更新按照话题id操作,每30分钟打开浏览器保活,登录状态失效提醒
This commit is contained in:
258
main.py
258
main.py
@@ -7,18 +7,141 @@ from selenium.webdriver.chrome.options import Options
|
||||
from selenium.webdriver.common.by import By
|
||||
from selenium.webdriver.support.ui import WebDriverWait
|
||||
from selenium.webdriver.support import expected_conditions as EC
|
||||
from datetime import datetime
|
||||
from datetime import datetime, timedelta
|
||||
import threading
|
||||
|
||||
# 清除代理环境变量
|
||||
os.environ['HTTP_PROXY'] = ''
|
||||
os.environ['HTTPS_PROXY'] = ''
|
||||
os.environ['http_proxy'] = ''
|
||||
os.environ['https_proxy'] = ''
|
||||
|
||||
AUTH_FILE = "auth.txt"
|
||||
TARGET_URL = "https://live-liveapi.vzan.com/api/v1/topic/get_topicdatas"
|
||||
RESET_URL = "https://live.vzan.com/NLive/ReSetStatus"
|
||||
ADMIN_URL = "https://live.vzan.com/admin/index.html?zbid=951423954&v=638941728939484662"
|
||||
USER_DATA_DIR = os.path.join(os.getcwd(), "chrome_user_data") # 浏览器登录状态持久化目录
|
||||
USER_DATA_DIR = os.path.join(os.getcwd(), "chrome_user_data")
|
||||
WEBHOOK_URL = "https://你的webhook地址" # 替换为实际的webhook地址
|
||||
|
||||
# 全局变量
|
||||
last_login_check = datetime.now()
|
||||
LOGIN_CHECK_INTERVAL = 1800 # 30分钟
|
||||
is_logged_in = True
|
||||
|
||||
# -------------------- 日志 -------------------- #
|
||||
def log(msg, level="INFO"):
|
||||
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] [{level}] {msg}")
|
||||
|
||||
# -------------------- Webhook通知 -------------------- #
|
||||
def send_webhook_notification(message, is_emergency=False):
|
||||
"""发送通知到webhook[9,11](@ref)"""
|
||||
if not WEBHOOK_URL or "你的webhook地址" in WEBHOOK_URL:
|
||||
log("Webhook地址未配置,跳过通知发送", "WARNING")
|
||||
return False
|
||||
|
||||
try:
|
||||
payload = {
|
||||
"msgtype": "text",
|
||||
"text": {
|
||||
"content": f"直播状态监控告警\n时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n信息: {message}"
|
||||
}
|
||||
}
|
||||
|
||||
if is_emergency:
|
||||
payload["text"]["content"] = "🔴 紧急告警: " + payload["text"]["content"]
|
||||
|
||||
headers = {'Content-Type': 'application/json'}
|
||||
response = requests.post(WEBHOOK_URL, json=payload, headers=headers, timeout=10)
|
||||
|
||||
if response.status_code == 200:
|
||||
log(f"Webhook通知发送成功: {message}", "INFO")
|
||||
return True
|
||||
else:
|
||||
log(f"Webhook通知发送失败: {response.status_code}", "ERROR")
|
||||
return False
|
||||
except Exception as e:
|
||||
log(f"发送Webhook通知异常: {e}", "ERROR")
|
||||
return False
|
||||
|
||||
# -------------------- 登录状态检查 -------------------- #
|
||||
def check_login_status(driver):
|
||||
"""检查当前登录状态是否有效[8](@ref)"""
|
||||
global is_logged_in
|
||||
|
||||
try:
|
||||
# 尝试访问需要登录的页面
|
||||
driver.get(ADMIN_URL)
|
||||
time.sleep(3)
|
||||
|
||||
# 检查是否跳转到登录页面或显示登录表单
|
||||
current_url = driver.current_url
|
||||
page_source = driver.page_source
|
||||
|
||||
# 登录失效的判定条件
|
||||
login_indicators = [
|
||||
"login" in current_url.lower(),
|
||||
"登录" in page_source,
|
||||
"password" in page_source.lower(),
|
||||
"username" in page_source.lower()
|
||||
]
|
||||
|
||||
if any(login_indicators):
|
||||
log("检测到登录已失效", "WARNING")
|
||||
is_logged_in = False
|
||||
return False
|
||||
else:
|
||||
log("登录状态正常", "INFO")
|
||||
is_logged_in = True
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
log(f"检查登录状态时出错: {e}", "ERROR")
|
||||
is_logged_in = False
|
||||
return False
|
||||
|
||||
def refresh_login_status():
|
||||
"""定期刷新登录状态[1,2](@ref)"""
|
||||
global last_login_check, is_logged_in
|
||||
|
||||
log("执行定期登录状态检查", "INFO")
|
||||
|
||||
chrome_options = Options()
|
||||
chrome_options.add_argument("--start-maximized")
|
||||
chrome_options.add_argument(f"--user-data-dir={USER_DATA_DIR}")
|
||||
chrome_options.add_argument("--disable-blink-features=AutomationControlled")
|
||||
chrome_options.add_argument("--ignore-certificate-errors")
|
||||
chrome_options.add_argument("--headless=new") # 无头模式减少干扰
|
||||
|
||||
driver = None
|
||||
try:
|
||||
driver = webdriver.Chrome(options=chrome_options)
|
||||
|
||||
# 检查当前登录状态
|
||||
if not check_login_status(driver):
|
||||
log("登录已失效,尝试重新登录", "WARNING")
|
||||
send_webhook_notification("直播系统登录已失效,需要手动重新登录", True)
|
||||
# 这里可以添加自动重新登录逻辑,但考虑到安全性,建议手动登录
|
||||
else:
|
||||
log("登录状态刷新成功", "INFO")
|
||||
# 模拟一些操作以保持会话活跃
|
||||
driver.get(ADMIN_URL)
|
||||
time.sleep(2)
|
||||
|
||||
except Exception as e:
|
||||
log(f"刷新登录状态时出错: {e}", "ERROR")
|
||||
send_webhook_notification(f"登录状态检查异常: {e}", True)
|
||||
finally:
|
||||
if driver:
|
||||
driver.quit()
|
||||
|
||||
last_login_check = datetime.now()
|
||||
|
||||
def should_check_login():
|
||||
"""判断是否应该执行登录检查"""
|
||||
global last_login_check
|
||||
time_since_last_check = (datetime.now() - last_login_check).total_seconds()
|
||||
return time_since_last_check >= LOGIN_CHECK_INTERVAL
|
||||
|
||||
# -------------------- Auth -------------------- #
|
||||
def save_auth(headers, cookies):
|
||||
data = {"headers": headers, "cookies": cookies}
|
||||
@@ -37,7 +160,7 @@ def load_auth():
|
||||
def get_auth_from_browser():
|
||||
chrome_options = Options()
|
||||
chrome_options.add_argument("--start-maximized")
|
||||
chrome_options.add_argument(f"--user-data-dir={USER_DATA_DIR}") # 保存登录状态
|
||||
chrome_options.add_argument(f"--user-data-dir={USER_DATA_DIR}")
|
||||
chrome_options.add_argument("--disable-blink-features=AutomationControlled")
|
||||
chrome_options.add_argument("--ignore-certificate-errors")
|
||||
driver = webdriver.Chrome(options=chrome_options)
|
||||
@@ -63,17 +186,36 @@ def get_auth_from_browser():
|
||||
break
|
||||
time.sleep(1)
|
||||
|
||||
driver.quit() # 登录状态已保存到 USER_DATA_DIR
|
||||
driver.quit()
|
||||
if not captured_headers or not captured_cookies:
|
||||
raise Exception("未捕获到 headers 或 cookies,请确认登录完成")
|
||||
|
||||
save_auth(captured_headers, captured_cookies)
|
||||
return {"headers": captured_headers, "cookies": captured_cookies}
|
||||
|
||||
# -------------------- 获取话题 -------------------- #
|
||||
def get_topics(headers, cookies):
|
||||
# -------------------- 读取ID文件 -------------------- #
|
||||
def read_topic_ids():
|
||||
id_file = "id.txt"
|
||||
if not os.path.exists(id_file):
|
||||
log(f"{id_file} 文件不存在,请创建该文件并添加话题ID", "ERROR")
|
||||
return []
|
||||
|
||||
try:
|
||||
with open(id_file, "r", encoding="utf-8") as f:
|
||||
ids = [line.strip() for line in f.readlines() if line.strip()]
|
||||
log(f"从 {id_file} 读取到 {len(ids)} 个话题ID", "INFO")
|
||||
return ids
|
||||
except Exception as e:
|
||||
log(f"读取ID文件失败: {e}", "ERROR")
|
||||
return []
|
||||
|
||||
# -------------------- 获取单个话题 -------------------- #
|
||||
def get_single_topic(tid, headers, cookies):
|
||||
payload = {
|
||||
"keyword": "", "keytype": 1, "state": -2, "psize": 7, "tag": 0, "page": 1,
|
||||
"keyword": tid,
|
||||
"keytype": 1,
|
||||
"state": -2, "psize": 1, "tag": 0, "page": 1,
|
||||
"keytype":2,"type":-1,
|
||||
"livescene": -1, "typeid": -1, "types": -1, "isOnShelf": -1,
|
||||
"starttime": "", "endtime": "", "chanid": 0, "isHQOut": 0, "isGHHQOut": 0
|
||||
}
|
||||
@@ -84,14 +226,18 @@ def get_topics(headers, cookies):
|
||||
resp.raise_for_status()
|
||||
data = resp.json()
|
||||
if data.get("code") != 0:
|
||||
log(f"获取话题API返回错误码: {data.get('code')} 消息: {data.get('msg')}", "ERROR")
|
||||
return []
|
||||
log(f"查询话题 {tid} 返回错误码: {data.get('code')} 消息: {data.get('msg')}", "ERROR")
|
||||
return None
|
||||
|
||||
topic_list = data.get("dataObj", {}).get("list", [])
|
||||
log(f"共获取到 {len(topic_list)} 条话题", "INFO")
|
||||
return topic_list
|
||||
if topic_list:
|
||||
return topic_list[0]
|
||||
else:
|
||||
log(f"未找到ID为 {tid} 的话题", "WARNING")
|
||||
return None
|
||||
except Exception as e:
|
||||
log(f"获取话题列表失败: {e}", "ERROR")
|
||||
return []
|
||||
log(f"查询话题 {tid} 失败: {e}", "ERROR")
|
||||
return None
|
||||
|
||||
# -------------------- 重置状态0话题 -------------------- #
|
||||
def reset_status_to_minus1(tid, title, headers, cookies):
|
||||
@@ -136,10 +282,9 @@ def click_element_js(driver, element):
|
||||
|
||||
def wait_status_minus1(tid, headers, cookies, retries=10, interval=1):
|
||||
for _ in range(retries):
|
||||
topics = get_topics(headers, cookies)
|
||||
for t in topics:
|
||||
if t.get("id") == tid and t.get("status") == -1:
|
||||
return True
|
||||
topic = get_single_topic(tid, headers, cookies)
|
||||
if topic and topic.get("status") == -1:
|
||||
return True
|
||||
time.sleep(interval)
|
||||
return False
|
||||
|
||||
@@ -200,27 +345,57 @@ def automate_browser(topic_id, title, driver, main_window, headers, cookies):
|
||||
|
||||
# -------------------- 主循环 -------------------- #
|
||||
def main_loop():
|
||||
global last_login_check
|
||||
|
||||
auth = load_auth()
|
||||
headers, cookies = auth["headers"], auth["cookies"]
|
||||
|
||||
chrome_options = Options()
|
||||
chrome_options.add_argument("--start-maximized")
|
||||
chrome_options.add_argument(f"--user-data-dir={USER_DATA_DIR}") # 复用登录状态
|
||||
chrome_options.add_argument("--headless=new")
|
||||
chrome_options.add_argument("--disable-gpu")
|
||||
chrome_options.add_argument("--window-size=1920,1080")
|
||||
# 发送启动通知
|
||||
send_webhook_notification("直播状态监控程序已启动")
|
||||
|
||||
driver = webdriver.Chrome(options=chrome_options)
|
||||
driver.get("https://live.vzan.com/admin/index.html")
|
||||
main_window = driver.current_window_handle
|
||||
while True:
|
||||
# 检查是否需要执行登录状态刷新
|
||||
if should_check_login():
|
||||
refresh_login_status()
|
||||
|
||||
# 只有在登录状态下才执行正常业务逻辑
|
||||
if not is_logged_in:
|
||||
log("登录已失效,暂停业务处理", "WARNING")
|
||||
time.sleep(60) # 等待1分钟后重试
|
||||
continue
|
||||
|
||||
try:
|
||||
while True:
|
||||
topics = get_topics(headers, cookies)
|
||||
chrome_options = Options()
|
||||
chrome_options.add_argument("--start-maximized")
|
||||
chrome_options.add_argument(f"--user-data-dir={USER_DATA_DIR}")
|
||||
chrome_options.add_argument("--headless=new")
|
||||
chrome_options.add_argument("--disable-gpu")
|
||||
chrome_options.add_argument("--window-size=1920,1080")
|
||||
|
||||
driver = webdriver.Chrome(options=chrome_options)
|
||||
driver.get("https://live.vzan.com/admin/index.html")
|
||||
main_window = driver.current_window_handle
|
||||
|
||||
try:
|
||||
# 读取ID列表
|
||||
topic_ids = read_topic_ids()
|
||||
|
||||
if not topic_ids:
|
||||
log("未获取到任何话题ID,等待10秒后重试", "WARNING")
|
||||
time.sleep(10)
|
||||
continue
|
||||
|
||||
# 逐个查询ID对应的话题
|
||||
topics = []
|
||||
for tid in topic_ids:
|
||||
topic = get_single_topic(tid, headers, cookies)
|
||||
if topic:
|
||||
topics.append(topic)
|
||||
if not topics:
|
||||
log("未找到任何有效话题,等待10秒后重试", "INFO")
|
||||
time.sleep(10)
|
||||
continue
|
||||
|
||||
# 处理状态为0的话题(重置为-1)
|
||||
zero_topics = [t for t in topics if t.get("status") == 0]
|
||||
for t in zero_topics:
|
||||
tid = t.get("id")
|
||||
@@ -228,18 +403,29 @@ def main_loop():
|
||||
if tid:
|
||||
reset_status_to_minus1(tid, title, headers, cookies)
|
||||
|
||||
minus1_topics = [t for t in topics if t.get("status") in (-1, 0)]
|
||||
# 处理状态为-1的话题(浏览器操作)
|
||||
minus1_topics = [t for t in topics if t.get("status") == -1]
|
||||
for t in minus1_topics:
|
||||
tid = t.get("id")
|
||||
title = t.get("title", "")
|
||||
if tid:
|
||||
automate_browser(tid, title, driver, main_window, headers, cookies)
|
||||
|
||||
log("等待10秒再次轮询...", "INFO")
|
||||
time.sleep(10)
|
||||
finally:
|
||||
driver.quit()
|
||||
log("等待20秒再次轮询...", "INFO")
|
||||
time.sleep(20)
|
||||
except Exception as e:
|
||||
log(f"主循环执行异常: {e}", "ERROR")
|
||||
send_webhook_notification(f"主循环执行异常: {e}", True)
|
||||
finally:
|
||||
driver.quit()
|
||||
|
||||
if __name__ == "__main__":
|
||||
log("程序启动,动态检测话题状态并处理", "INFO")
|
||||
main_loop()
|
||||
log("程序启动,仅处理id.txt中的话题ID", "INFO")
|
||||
try:
|
||||
main_loop()
|
||||
except KeyboardInterrupt:
|
||||
log("程序被用户中断", "INFO")
|
||||
send_webhook_notification("直播状态监控程序已手动停止")
|
||||
except Exception as e:
|
||||
log(f"程序异常退出: {e}", "ERROR")
|
||||
send_webhook_notification(f"程序异常退出: {e}", True)
|
||||
Reference in New Issue
Block a user