Files
vzan_autoreplay/main.py

439 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import json
import os
import time
import requests
from seleniumwire import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from datetime import datetime, timedelta
import threading
# 清除代理环境变量
os.environ['HTTP_PROXY'] = ''
os.environ['HTTPS_PROXY'] = ''
os.environ['http_proxy'] = ''
os.environ['https_proxy'] = ''
AUTH_FILE = "auth.txt"
TARGET_URL = "https://live-liveapi.vzan.com/api/v1/topic/get_topicdatas"
RESET_URL = "https://live.vzan.com/NLive/ReSetStatus"
ADMIN_URL = "https://live.vzan.com/admin/index.html?zbid=951423954&v=638941728939484662"
USER_DATA_DIR = os.path.join(os.getcwd(), "chrome_user_data")
WEBHOOK_URL = "https://你的webhook地址" # 替换为实际的webhook地址
# 全局变量
last_login_check = datetime.now()
LOGIN_CHECK_INTERVAL = 1800 # 30分钟
is_logged_in = True
# -------------------- 日志 -------------------- #
def log(msg, level="INFO"):
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] [{level}] {msg}")
# -------------------- Webhook通知 -------------------- #
def send_webhook_notification(message, is_emergency=False):
"""发送通知到webhook[9,11](@ref)"""
if not WEBHOOK_URL or "你的webhook地址" in WEBHOOK_URL:
log("Webhook地址未配置跳过通知发送", "WARNING")
return False
try:
payload = {
"msgtype": "text",
"text": {
"content": f"直播状态监控告警\n时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n信息: {message}"
}
}
if is_emergency:
payload["text"]["content"] = "🔴 紧急告警: " + payload["text"]["content"]
headers = {'Content-Type': 'application/json'}
response = requests.post(WEBHOOK_URL, json=payload, headers=headers, timeout=10)
if response.status_code == 200:
log(f"Webhook通知发送成功: {message}", "INFO")
return True
else:
log(f"Webhook通知发送失败: {response.status_code}", "ERROR")
return False
except Exception as e:
log(f"发送Webhook通知异常: {e}", "ERROR")
return False
# -------------------- 登录状态检查 -------------------- #
def check_login_status(driver):
"""检查当前登录状态是否有效[8](@ref)"""
global is_logged_in
try:
# 尝试访问需要登录的页面
driver.get(ADMIN_URL)
time.sleep(3)
# 检查是否跳转到登录页面或显示登录表单
current_url = driver.current_url
page_source = driver.page_source
# 登录失效的判定条件
login_indicators = [
"login" in current_url.lower(),
"登录" in page_source,
"password" in page_source.lower(),
"username" in page_source.lower()
]
if any(login_indicators):
log("检测到登录已失效", "WARNING")
is_logged_in = False
return False
else:
log("登录状态正常", "INFO")
is_logged_in = True
return True
except Exception as e:
log(f"检查登录状态时出错: {e}", "ERROR")
is_logged_in = False
return False
def refresh_login_status():
"""定期刷新登录状态[1,2](@ref)"""
global last_login_check, is_logged_in
log("执行定期登录状态检查", "INFO")
chrome_options = Options()
chrome_options.add_argument("--start-maximized")
chrome_options.add_argument(f"--user-data-dir={USER_DATA_DIR}")
chrome_options.add_argument("--disable-blink-features=AutomationControlled")
chrome_options.add_argument("--ignore-certificate-errors")
chrome_options.add_argument("--headless=new") # 无头模式减少干扰
driver = None
try:
driver = webdriver.Chrome(options=chrome_options)
# 检查当前登录状态
if not check_login_status(driver):
log("登录已失效,尝试重新登录", "WARNING")
send_webhook_notification("直播系统登录已失效,需要手动重新登录", True)
# 这里可以添加自动重新登录逻辑,但考虑到安全性,建议手动登录
else:
log("登录状态刷新成功", "INFO")
# 模拟一些操作以保持会话活跃
driver.get(ADMIN_URL)
time.sleep(2)
except Exception as e:
log(f"刷新登录状态时出错: {e}", "ERROR")
send_webhook_notification(f"登录状态检查异常: {e}", True)
finally:
if driver:
driver.quit()
last_login_check = datetime.now()
def should_check_login():
"""判断是否应该执行登录检查"""
global last_login_check
time_since_last_check = (datetime.now() - last_login_check).total_seconds()
return time_since_last_check >= LOGIN_CHECK_INTERVAL
# -------------------- Auth -------------------- #
def save_auth(headers, cookies):
data = {"headers": headers, "cookies": cookies}
with open(AUTH_FILE, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=4)
log(f"Headers 和 Cookies 已保存到 {AUTH_FILE}", "INFO")
def load_auth():
if os.path.exists(AUTH_FILE):
with open(AUTH_FILE, "r", encoding="utf-8") as f:
return json.load(f)
else:
log(f"{AUTH_FILE} 不存在,启动浏览器手动登录获取认证信息", "INFO")
return get_auth_from_browser()
def get_auth_from_browser():
chrome_options = Options()
chrome_options.add_argument("--start-maximized")
chrome_options.add_argument(f"--user-data-dir={USER_DATA_DIR}")
chrome_options.add_argument("--disable-blink-features=AutomationControlled")
chrome_options.add_argument("--ignore-certificate-errors")
driver = webdriver.Chrome(options=chrome_options)
driver.requests.clear()
driver.get("https://live.vzan.com/admin/index.html")
input("请手动登录网站,登录完成后按回车继续...")
driver.get(ADMIN_URL)
log("等待页面加载完成...", "INFO")
# 捕获 headers 和 cookies
timeout = 60
start_time = time.time()
captured_headers = None
captured_cookies = None
while time.time() - start_time < timeout:
for request in driver.requests:
if request.url.startswith(ADMIN_URL) and request.method.upper() == "GET":
captured_cookies = request.headers.get("Cookie", "")
if request.url.startswith(TARGET_URL) and request.response:
captured_headers = dict(request.headers)
if captured_headers and captured_cookies:
break
time.sleep(1)
driver.quit()
if not captured_headers or not captured_cookies:
raise Exception("未捕获到 headers 或 cookies请确认登录完成")
save_auth(captured_headers, captured_cookies)
return {"headers": captured_headers, "cookies": captured_cookies}
# -------------------- 读取ID文件 -------------------- #
def read_topic_ids():
id_file = "id.txt"
if not os.path.exists(id_file):
log(f"{id_file} 文件不存在请创建该文件并添加话题ID", "ERROR")
return []
try:
with open(id_file, "r", encoding="utf-8") as f:
ids = [line.strip() for line in f.readlines() if line.strip()]
log(f"{id_file} 读取到 {len(ids)} 个话题ID", "INFO")
return ids
except Exception as e:
log(f"读取ID文件失败: {e}", "ERROR")
return []
# -------------------- 获取单个话题 -------------------- #
def get_single_topic(tid, headers, cookies):
payload = {
"keyword": tid,
"keytype": 1,
"state": -2, "psize": 1, "tag": 0, "page": 1,
"keytype":2,"type":-1,
"livescene": -1, "typeid": -1, "types": -1, "isOnShelf": -1,
"starttime": "", "endtime": "", "chanid": 0, "isHQOut": 0, "isGHHQOut": 0
}
request_headers = headers.copy()
request_headers["Cookie"] = cookies
try:
resp = requests.post(TARGET_URL, headers=request_headers, json=payload, timeout=10)
resp.raise_for_status()
data = resp.json()
if data.get("code") != 0:
log(f"查询话题 {tid} 返回错误码: {data.get('code')} 消息: {data.get('msg')}", "ERROR")
return None
topic_list = data.get("dataObj", {}).get("list", [])
if topic_list:
return topic_list[0]
else:
log(f"未找到ID为 {tid} 的话题", "WARNING")
return None
except Exception as e:
log(f"查询话题 {tid} 失败: {e}", "ERROR")
return None
# -------------------- 重置状态0话题 -------------------- #
def reset_status_to_minus1(tid, title, headers, cookies):
request_headers = headers.copy()
request_headers["Cookie"] = cookies
request_headers["content-type"] = "application/x-www-form-urlencoded"
data = {"tid": tid, "pstate": "-1"}
try:
resp = requests.post(RESET_URL, headers=request_headers, data=data, timeout=10)
resp.raise_for_status()
result = resp.json()
if result.get("isok"):
log(f"话题 {tid} ({title}) 状态0已重置为-1", "SUCCESS")
else:
log(f"话题 {tid} ({title}) 重置失败: {result.get('Msg')} (code: {result.get('code')})", "ERROR")
return result
except Exception as e:
log(f"重置话题 {tid} ({title}) 出错: {e}", "ERROR")
return None
# -------------------- 浏览器操作 -------------------- #
def find_element_with_retry(driver, selectors, timeout=10):
for selector in selectors:
try:
el = WebDriverWait(driver, timeout).until(
EC.presence_of_element_located((By.XPATH, selector))
)
driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", el)
return el
except:
continue
return None
def click_element_js(driver, element):
try:
driver.execute_script("arguments[0].click();", element)
return True
except Exception as e:
log(f"JS点击失败: {e}", "ERROR")
return False
def wait_status_minus1(tid, headers, cookies, retries=10, interval=1):
for _ in range(retries):
topic = get_single_topic(tid, headers, cookies)
if topic and topic.get("status") == -1:
return True
time.sleep(interval)
return False
def automate_browser(topic_id, title, driver, main_window, headers, cookies):
try:
driver.switch_to.new_window('tab')
url = f"https://live.vzan.com/admin/index.html?zbid=951423954&v=638941334981302363#/TopicManage/BaseSetting?topicId={topic_id}"
log(f"打开话题ID: {topic_id} ({title})", "INFO")
driver.get(url)
WebDriverWait(driver, 15).until(lambda d: str(topic_id) in d.current_url)
time.sleep(2)
# 隐藏tip遮挡
try:
tip = driver.find_element(By.CSS_SELECTOR, "div.tip-wrap")
if tip.is_displayed():
driver.execute_script("arguments[0].style.display='none';", tip)
except:
pass
fake_live_button = find_element_with_retry(driver, [
"//button[contains(text(), '伪直播')]",
"//span[contains(text(), '伪直播')]/.."
], timeout=10)
if fake_live_button:
click_element_js(driver, fake_live_button)
log(f"话题 {topic_id} ({title}) 已点击伪直播按钮", "INFO")
time.sleep(1)
save_button = find_element_with_retry(driver, [
"//button[contains(@class,'create-live-btn')]//span[contains(normalize-space(text()),'保 存')]",
"//button[contains(text(),'保 存')]"
], timeout=10)
if save_button:
click_element_js(driver, save_button)
log(f"话题 {topic_id} ({title}) 已点击保存按钮", "INFO")
time.sleep(2)
else:
log(f"话题 {topic_id} ({title}) 未找到保存按钮,截图保存", "ERROR")
driver.save_screenshot(f"save_not_found_{topic_id}.png")
# 检查登录状态
if not check_login_status(driver):
log("检测到登录已失效", "WARNING")
global is_logged_in
is_logged_in = False
send_webhook_notification("直播系统登录已失效,需要手动重新登录", True)
driver.close()
driver.switch_to.window(main_window)
return False
if wait_status_minus1(topic_id, headers, cookies):
log(f"话题 {topic_id} ({title}) 状态已确认为-1浏览器操作完成", "SUCCESS")
else:
log(f"话题 {topic_id} ({title}) 状态未生效,请检查", "ERROR")
driver.close()
driver.switch_to.window(main_window)
return True
except Exception as e:
log(f"处理话题 {topic_id} ({title}) 时出错: {e}", "ERROR")
driver.close()
driver.switch_to.window(main_window)
return False
# -------------------- 主循环 -------------------- #
def main_loop():
global last_login_check
auth = load_auth()
headers, cookies = auth["headers"], auth["cookies"]
# 发送启动通知
send_webhook_notification("直播状态监控程序已启动")
while True:
# 检查是否需要执行登录状态刷新
if should_check_login():
refresh_login_status()
# 只有在登录状态下才执行正常业务逻辑
if not is_logged_in:
log("登录已失效,暂停业务处理", "WARNING")
time.sleep(60) # 等待1分钟后重试
continue
chrome_options = Options()
chrome_options.add_argument("--start-maximized")
chrome_options.add_argument(f"--user-data-dir={USER_DATA_DIR}")
chrome_options.add_argument("--headless=new")
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument("--window-size=1920,1080")
driver = webdriver.Chrome(options=chrome_options)
driver.get("https://live.vzan.com/admin/index.html")
main_window = driver.current_window_handle
try:
# 读取ID列表
topic_ids = read_topic_ids()
if not topic_ids:
log("未获取到任何话题ID等待10秒后重试", "WARNING")
time.sleep(10)
continue
# 逐个查询ID对应的话题
topics = []
for tid in topic_ids:
topic = get_single_topic(tid, headers, cookies)
if topic:
topics.append(topic)
if not topics:
log("未找到任何有效话题等待10秒后重试", "INFO")
time.sleep(10)
continue
# 处理状态为0的话题重置为-1
zero_topics = [t for t in topics if t.get("status") == 0]
for t in zero_topics:
tid = t.get("id")
title = t.get("title", "")
if tid:
reset_status_to_minus1(tid, title, headers, cookies)
# 处理状态为-1的话题浏览器操作
minus1_topics = [t for t in topics if t.get("status") == -1]
for t in minus1_topics:
tid = t.get("id")
title = t.get("title", "")
if tid:
automate_browser(tid, title, driver, main_window, headers, cookies)
log("等待20秒再次轮询...", "INFO")
time.sleep(20)
except Exception as e:
log(f"主循环执行异常: {e}", "ERROR")
send_webhook_notification(f"主循环执行异常: {e}", True)
finally:
driver.quit()
if __name__ == "__main__":
log("程序启动仅处理id.txt中的话题ID", "INFO")
try:
main_loop()
except KeyboardInterrupt:
log("程序被用户中断", "INFO")
send_webhook_notification("直播状态监控程序已手动停止")
except Exception as e:
log(f"程序异常退出: {e}", "ERROR")
send_webhook_notification(f"程序异常退出: {e}", True)