Compare commits

4 Commits
main ... dev

4 changed files with 235 additions and 38 deletions

3
.gitignore vendored
View File

@@ -176,4 +176,5 @@ cython_debug/
# cookie
auth.txt
chrome_user_data
chrome_user_data
id.txt

View File

@@ -4,4 +4,6 @@
在这个页面下载https://googlechromelabs.github.io/chrome-for-testing/#stable
需要你的chrome和driver是相同版本
需要你的chrome和driver是相同版本
需要配置id.txt每一行一个话题id

0
id.txt Normal file
View File

266
main.py
View File

@@ -7,18 +7,141 @@ from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from datetime import datetime
from datetime import datetime, timedelta
import threading
# 清除代理环境变量
os.environ['HTTP_PROXY'] = ''
os.environ['HTTPS_PROXY'] = ''
os.environ['http_proxy'] = ''
os.environ['https_proxy'] = ''
AUTH_FILE = "auth.txt"
TARGET_URL = "https://live-liveapi.vzan.com/api/v1/topic/get_topicdatas"
RESET_URL = "https://live.vzan.com/NLive/ReSetStatus"
ADMIN_URL = "https://live.vzan.com/admin/index.html?zbid=951423954&v=638941728939484662"
USER_DATA_DIR = os.path.join(os.getcwd(), "chrome_user_data") # 浏览器登录状态持久化目录
USER_DATA_DIR = os.path.join(os.getcwd(), "chrome_user_data")
WEBHOOK_URL = "https://你的webhook地址" # 替换为实际的webhook地址
# 全局变量
last_login_check = datetime.now()
LOGIN_CHECK_INTERVAL = 1800 # 30分钟
is_logged_in = True
# -------------------- 日志 -------------------- #
def log(msg, level="INFO"):
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] [{level}] {msg}")
# -------------------- Webhook通知 -------------------- #
def send_webhook_notification(message, is_emergency=False):
"""发送通知到webhook[9,11](@ref)"""
if not WEBHOOK_URL or "你的webhook地址" in WEBHOOK_URL:
log("Webhook地址未配置跳过通知发送", "WARNING")
return False
try:
payload = {
"msgtype": "text",
"text": {
"content": f"直播状态监控告警\n时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n信息: {message}"
}
}
if is_emergency:
payload["text"]["content"] = "🔴 紧急告警: " + payload["text"]["content"]
headers = {'Content-Type': 'application/json'}
response = requests.post(WEBHOOK_URL, json=payload, headers=headers, timeout=10)
if response.status_code == 200:
log(f"Webhook通知发送成功: {message}", "INFO")
return True
else:
log(f"Webhook通知发送失败: {response.status_code}", "ERROR")
return False
except Exception as e:
log(f"发送Webhook通知异常: {e}", "ERROR")
return False
# -------------------- 登录状态检查 -------------------- #
def check_login_status(driver):
"""检查当前登录状态是否有效[8](@ref)"""
global is_logged_in
try:
# 尝试访问需要登录的页面
driver.get(ADMIN_URL)
time.sleep(3)
# 检查是否跳转到登录页面或显示登录表单
current_url = driver.current_url
page_source = driver.page_source
# 登录失效的判定条件
login_indicators = [
"login" in current_url.lower(),
"登录" in page_source,
"password" in page_source.lower(),
"username" in page_source.lower()
]
if any(login_indicators):
log("检测到登录已失效", "WARNING")
is_logged_in = False
return False
else:
log("登录状态正常", "INFO")
is_logged_in = True
return True
except Exception as e:
log(f"检查登录状态时出错: {e}", "ERROR")
is_logged_in = False
return False
def refresh_login_status():
"""定期刷新登录状态[1,2](@ref)"""
global last_login_check, is_logged_in
log("执行定期登录状态检查", "INFO")
chrome_options = Options()
chrome_options.add_argument("--start-maximized")
chrome_options.add_argument(f"--user-data-dir={USER_DATA_DIR}")
chrome_options.add_argument("--disable-blink-features=AutomationControlled")
chrome_options.add_argument("--ignore-certificate-errors")
chrome_options.add_argument("--headless=new") # 无头模式减少干扰
driver = None
try:
driver = webdriver.Chrome(options=chrome_options)
# 检查当前登录状态
if not check_login_status(driver):
log("登录已失效,尝试重新登录", "WARNING")
send_webhook_notification("直播系统登录已失效,需要手动重新登录", True)
# 这里可以添加自动重新登录逻辑,但考虑到安全性,建议手动登录
else:
log("登录状态刷新成功", "INFO")
# 模拟一些操作以保持会话活跃
driver.get(ADMIN_URL)
time.sleep(2)
except Exception as e:
log(f"刷新登录状态时出错: {e}", "ERROR")
send_webhook_notification(f"登录状态检查异常: {e}", True)
finally:
if driver:
driver.quit()
last_login_check = datetime.now()
def should_check_login():
"""判断是否应该执行登录检查"""
global last_login_check
time_since_last_check = (datetime.now() - last_login_check).total_seconds()
return time_since_last_check >= LOGIN_CHECK_INTERVAL
# -------------------- Auth -------------------- #
def save_auth(headers, cookies):
data = {"headers": headers, "cookies": cookies}
@@ -37,7 +160,7 @@ def load_auth():
def get_auth_from_browser():
chrome_options = Options()
chrome_options.add_argument("--start-maximized")
chrome_options.add_argument(f"--user-data-dir={USER_DATA_DIR}") # 保存登录状态
chrome_options.add_argument(f"--user-data-dir={USER_DATA_DIR}")
chrome_options.add_argument("--disable-blink-features=AutomationControlled")
chrome_options.add_argument("--ignore-certificate-errors")
driver = webdriver.Chrome(options=chrome_options)
@@ -63,17 +186,36 @@ def get_auth_from_browser():
break
time.sleep(1)
driver.quit() # 登录状态已保存到 USER_DATA_DIR
driver.quit()
if not captured_headers or not captured_cookies:
raise Exception("未捕获到 headers 或 cookies请确认登录完成")
save_auth(captured_headers, captured_cookies)
return {"headers": captured_headers, "cookies": captured_cookies}
# -------------------- 获取话题 -------------------- #
def get_topics(headers, cookies):
# -------------------- 读取ID文件 -------------------- #
def read_topic_ids():
id_file = "id.txt"
if not os.path.exists(id_file):
log(f"{id_file} 文件不存在请创建该文件并添加话题ID", "ERROR")
return []
try:
with open(id_file, "r", encoding="utf-8") as f:
ids = [line.strip() for line in f.readlines() if line.strip()]
log(f"{id_file} 读取到 {len(ids)} 个话题ID", "INFO")
return ids
except Exception as e:
log(f"读取ID文件失败: {e}", "ERROR")
return []
# -------------------- 获取单个话题 -------------------- #
def get_single_topic(tid, headers, cookies):
payload = {
"keyword": "", "keytype": 1, "state": -2, "psize": 7, "tag": 0, "page": 1,
"keyword": tid,
"keytype": 1,
"state": -2, "psize": 1, "tag": 0, "page": 1,
"keytype":2,"type":-1,
"livescene": -1, "typeid": -1, "types": -1, "isOnShelf": -1,
"starttime": "", "endtime": "", "chanid": 0, "isHQOut": 0, "isGHHQOut": 0
}
@@ -84,14 +226,18 @@ def get_topics(headers, cookies):
resp.raise_for_status()
data = resp.json()
if data.get("code") != 0:
log(f"获取话题API返回错误码: {data.get('code')} 消息: {data.get('msg')}", "ERROR")
return []
log(f"查询话题 {tid} 返回错误码: {data.get('code')} 消息: {data.get('msg')}", "ERROR")
return None
topic_list = data.get("dataObj", {}).get("list", [])
log(f"共获取到 {len(topic_list)} 条话题", "INFO")
return topic_list
if topic_list:
return topic_list[0]
else:
log(f"未找到ID为 {tid} 的话题", "WARNING")
return None
except Exception as e:
log(f"获取话题列表失败: {e}", "ERROR")
return []
log(f"查询话题 {tid} 失败: {e}", "ERROR")
return None
# -------------------- 重置状态0话题 -------------------- #
def reset_status_to_minus1(tid, title, headers, cookies):
@@ -136,10 +282,9 @@ def click_element_js(driver, element):
def wait_status_minus1(tid, headers, cookies, retries=10, interval=1):
for _ in range(retries):
topics = get_topics(headers, cookies)
for t in topics:
if t.get("id") == tid and t.get("status") == -1:
return True
topic = get_single_topic(tid, headers, cookies)
if topic and topic.get("status") == -1:
return True
time.sleep(interval)
return False
@@ -180,6 +325,14 @@ def automate_browser(topic_id, title, driver, main_window, headers, cookies):
else:
log(f"话题 {topic_id} ({title}) 未找到保存按钮,截图保存", "ERROR")
driver.save_screenshot(f"save_not_found_{topic_id}.png")
# 检查登录状态
if not check_login_status(driver):
log("检测到登录已失效", "WARNING")
global is_logged_in
is_logged_in = False
send_webhook_notification("直播系统登录已失效,需要手动重新登录", True)
driver.close()
driver.switch_to.window(main_window)
return False
@@ -200,27 +353,57 @@ def automate_browser(topic_id, title, driver, main_window, headers, cookies):
# -------------------- 主循环 -------------------- #
def main_loop():
global last_login_check
auth = load_auth()
headers, cookies = auth["headers"], auth["cookies"]
chrome_options = Options()
chrome_options.add_argument("--start-maximized")
chrome_options.add_argument(f"--user-data-dir={USER_DATA_DIR}") # 复用登录状态
chrome_options.add_argument("--headless=new")
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument("--window-size=1920,1080")
# 发送启动通知
send_webhook_notification("直播状态监控程序已启动")
driver = webdriver.Chrome(options=chrome_options)
driver.get("https://live.vzan.com/admin/index.html")
main_window = driver.current_window_handle
while True:
# 检查是否需要执行登录状态刷新
if should_check_login():
refresh_login_status()
# 只有在登录状态下才执行正常业务逻辑
if not is_logged_in:
log("登录已失效,暂停业务处理", "WARNING")
time.sleep(60) # 等待1分钟后重试
continue
try:
while True:
topics = get_topics(headers, cookies)
chrome_options = Options()
chrome_options.add_argument("--start-maximized")
chrome_options.add_argument(f"--user-data-dir={USER_DATA_DIR}")
chrome_options.add_argument("--headless=new")
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument("--window-size=1920,1080")
driver = webdriver.Chrome(options=chrome_options)
driver.get("https://live.vzan.com/admin/index.html")
main_window = driver.current_window_handle
try:
# 读取ID列表
topic_ids = read_topic_ids()
if not topic_ids:
log("未获取到任何话题ID等待10秒后重试", "WARNING")
time.sleep(10)
continue
# 逐个查询ID对应的话题
topics = []
for tid in topic_ids:
topic = get_single_topic(tid, headers, cookies)
if topic:
topics.append(topic)
if not topics:
log("未找到任何有效话题等待10秒后重试", "INFO")
time.sleep(10)
continue
# 处理状态为0的话题重置为-1
zero_topics = [t for t in topics if t.get("status") == 0]
for t in zero_topics:
tid = t.get("id")
@@ -228,18 +411,29 @@ def main_loop():
if tid:
reset_status_to_minus1(tid, title, headers, cookies)
minus1_topics = [t for t in topics if t.get("status") in (-1, 0)]
# 处理状态为-1的话题浏览器操作
minus1_topics = [t for t in topics if t.get("status") == -1]
for t in minus1_topics:
tid = t.get("id")
title = t.get("title", "")
if tid:
automate_browser(tid, title, driver, main_window, headers, cookies)
log("等待10秒再次轮询...", "INFO")
time.sleep(10)
finally:
driver.quit()
log("等待20秒再次轮询...", "INFO")
time.sleep(20)
except Exception as e:
log(f"主循环执行异常: {e}", "ERROR")
send_webhook_notification(f"主循环执行异常: {e}", True)
finally:
driver.quit()
if __name__ == "__main__":
log("程序启动,动态检测话题状态并处理", "INFO")
main_loop()
log("程序启动,仅处理id.txt中的话题ID", "INFO")
try:
main_loop()
except KeyboardInterrupt:
log("程序被用户中断", "INFO")
send_webhook_notification("直播状态监控程序已手动停止")
except Exception as e:
log(f"程序异常退出: {e}", "ERROR")
send_webhook_notification(f"程序异常退出: {e}", True)