439 lines
17 KiB
Python
439 lines
17 KiB
Python
import json
|
||
import os
|
||
import time
|
||
import requests
|
||
from seleniumwire import webdriver
|
||
from selenium.webdriver.chrome.options import Options
|
||
from selenium.webdriver.common.by import By
|
||
from selenium.webdriver.support.ui import WebDriverWait
|
||
from selenium.webdriver.support import expected_conditions as EC
|
||
from datetime import datetime, timedelta
|
||
import threading
|
||
|
||
# 清除代理环境变量
|
||
os.environ['HTTP_PROXY'] = ''
|
||
os.environ['HTTPS_PROXY'] = ''
|
||
os.environ['http_proxy'] = ''
|
||
os.environ['https_proxy'] = ''
|
||
|
||
AUTH_FILE = "auth.txt"
|
||
TARGET_URL = "https://live-liveapi.vzan.com/api/v1/topic/get_topicdatas"
|
||
RESET_URL = "https://live.vzan.com/NLive/ReSetStatus"
|
||
ADMIN_URL = "https://live.vzan.com/admin/index.html?zbid=951423954&v=638941728939484662"
|
||
USER_DATA_DIR = os.path.join(os.getcwd(), "chrome_user_data")
|
||
WEBHOOK_URL = "https://你的webhook地址" # 替换为实际的webhook地址
|
||
|
||
# 全局变量
|
||
last_login_check = datetime.now()
|
||
LOGIN_CHECK_INTERVAL = 1800 # 30分钟
|
||
is_logged_in = True
|
||
|
||
# -------------------- 日志 -------------------- #
|
||
def log(msg, level="INFO"):
|
||
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] [{level}] {msg}")
|
||
|
||
# -------------------- Webhook通知 -------------------- #
|
||
def send_webhook_notification(message, is_emergency=False):
|
||
"""发送通知到webhook[9,11](@ref)"""
|
||
if not WEBHOOK_URL or "你的webhook地址" in WEBHOOK_URL:
|
||
log("Webhook地址未配置,跳过通知发送", "WARNING")
|
||
return False
|
||
|
||
try:
|
||
payload = {
|
||
"msgtype": "text",
|
||
"text": {
|
||
"content": f"直播状态监控告警\n时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n信息: {message}"
|
||
}
|
||
}
|
||
|
||
if is_emergency:
|
||
payload["text"]["content"] = "🔴 紧急告警: " + payload["text"]["content"]
|
||
|
||
headers = {'Content-Type': 'application/json'}
|
||
response = requests.post(WEBHOOK_URL, json=payload, headers=headers, timeout=10)
|
||
|
||
if response.status_code == 200:
|
||
log(f"Webhook通知发送成功: {message}", "INFO")
|
||
return True
|
||
else:
|
||
log(f"Webhook通知发送失败: {response.status_code}", "ERROR")
|
||
return False
|
||
except Exception as e:
|
||
log(f"发送Webhook通知异常: {e}", "ERROR")
|
||
return False
|
||
|
||
# -------------------- 登录状态检查 -------------------- #
|
||
def check_login_status(driver):
|
||
"""检查当前登录状态是否有效[8](@ref)"""
|
||
global is_logged_in
|
||
|
||
try:
|
||
# 尝试访问需要登录的页面
|
||
driver.get(ADMIN_URL)
|
||
time.sleep(3)
|
||
|
||
# 检查是否跳转到登录页面或显示登录表单
|
||
current_url = driver.current_url
|
||
page_source = driver.page_source
|
||
|
||
# 登录失效的判定条件
|
||
login_indicators = [
|
||
"login" in current_url.lower(),
|
||
"登录" in page_source,
|
||
"password" in page_source.lower(),
|
||
"username" in page_source.lower()
|
||
]
|
||
|
||
if any(login_indicators):
|
||
log("检测到登录已失效", "WARNING")
|
||
is_logged_in = False
|
||
return False
|
||
else:
|
||
log("登录状态正常", "INFO")
|
||
is_logged_in = True
|
||
return True
|
||
|
||
except Exception as e:
|
||
log(f"检查登录状态时出错: {e}", "ERROR")
|
||
is_logged_in = False
|
||
return False
|
||
|
||
def refresh_login_status():
|
||
"""定期刷新登录状态[1,2](@ref)"""
|
||
global last_login_check, is_logged_in
|
||
|
||
log("执行定期登录状态检查", "INFO")
|
||
|
||
chrome_options = Options()
|
||
chrome_options.add_argument("--start-maximized")
|
||
chrome_options.add_argument(f"--user-data-dir={USER_DATA_DIR}")
|
||
chrome_options.add_argument("--disable-blink-features=AutomationControlled")
|
||
chrome_options.add_argument("--ignore-certificate-errors")
|
||
chrome_options.add_argument("--headless=new") # 无头模式减少干扰
|
||
|
||
driver = None
|
||
try:
|
||
driver = webdriver.Chrome(options=chrome_options)
|
||
|
||
# 检查当前登录状态
|
||
if not check_login_status(driver):
|
||
log("登录已失效,尝试重新登录", "WARNING")
|
||
send_webhook_notification("直播系统登录已失效,需要手动重新登录", True)
|
||
# 这里可以添加自动重新登录逻辑,但考虑到安全性,建议手动登录
|
||
else:
|
||
log("登录状态刷新成功", "INFO")
|
||
# 模拟一些操作以保持会话活跃
|
||
driver.get(ADMIN_URL)
|
||
time.sleep(2)
|
||
|
||
except Exception as e:
|
||
log(f"刷新登录状态时出错: {e}", "ERROR")
|
||
send_webhook_notification(f"登录状态检查异常: {e}", True)
|
||
finally:
|
||
if driver:
|
||
driver.quit()
|
||
|
||
last_login_check = datetime.now()
|
||
|
||
def should_check_login():
|
||
"""判断是否应该执行登录检查"""
|
||
global last_login_check
|
||
time_since_last_check = (datetime.now() - last_login_check).total_seconds()
|
||
return time_since_last_check >= LOGIN_CHECK_INTERVAL
|
||
|
||
# -------------------- Auth -------------------- #
|
||
def save_auth(headers, cookies):
|
||
data = {"headers": headers, "cookies": cookies}
|
||
with open(AUTH_FILE, "w", encoding="utf-8") as f:
|
||
json.dump(data, f, ensure_ascii=False, indent=4)
|
||
log(f"Headers 和 Cookies 已保存到 {AUTH_FILE}", "INFO")
|
||
|
||
def load_auth():
|
||
if os.path.exists(AUTH_FILE):
|
||
with open(AUTH_FILE, "r", encoding="utf-8") as f:
|
||
return json.load(f)
|
||
else:
|
||
log(f"{AUTH_FILE} 不存在,启动浏览器手动登录获取认证信息", "INFO")
|
||
return get_auth_from_browser()
|
||
|
||
def get_auth_from_browser():
|
||
chrome_options = Options()
|
||
chrome_options.add_argument("--start-maximized")
|
||
chrome_options.add_argument(f"--user-data-dir={USER_DATA_DIR}")
|
||
chrome_options.add_argument("--disable-blink-features=AutomationControlled")
|
||
chrome_options.add_argument("--ignore-certificate-errors")
|
||
driver = webdriver.Chrome(options=chrome_options)
|
||
driver.requests.clear()
|
||
|
||
driver.get("https://live.vzan.com/admin/index.html")
|
||
input("请手动登录网站,登录完成后按回车继续...")
|
||
driver.get(ADMIN_URL)
|
||
log("等待页面加载完成...", "INFO")
|
||
|
||
# 捕获 headers 和 cookies
|
||
timeout = 60
|
||
start_time = time.time()
|
||
captured_headers = None
|
||
captured_cookies = None
|
||
while time.time() - start_time < timeout:
|
||
for request in driver.requests:
|
||
if request.url.startswith(ADMIN_URL) and request.method.upper() == "GET":
|
||
captured_cookies = request.headers.get("Cookie", "")
|
||
if request.url.startswith(TARGET_URL) and request.response:
|
||
captured_headers = dict(request.headers)
|
||
if captured_headers and captured_cookies:
|
||
break
|
||
time.sleep(1)
|
||
|
||
driver.quit()
|
||
if not captured_headers or not captured_cookies:
|
||
raise Exception("未捕获到 headers 或 cookies,请确认登录完成")
|
||
|
||
save_auth(captured_headers, captured_cookies)
|
||
return {"headers": captured_headers, "cookies": captured_cookies}
|
||
|
||
# -------------------- 读取ID文件 -------------------- #
|
||
def read_topic_ids():
|
||
id_file = "id.txt"
|
||
if not os.path.exists(id_file):
|
||
log(f"{id_file} 文件不存在,请创建该文件并添加话题ID", "ERROR")
|
||
return []
|
||
|
||
try:
|
||
with open(id_file, "r", encoding="utf-8") as f:
|
||
ids = [line.strip() for line in f.readlines() if line.strip()]
|
||
log(f"从 {id_file} 读取到 {len(ids)} 个话题ID", "INFO")
|
||
return ids
|
||
except Exception as e:
|
||
log(f"读取ID文件失败: {e}", "ERROR")
|
||
return []
|
||
|
||
# -------------------- 获取单个话题 -------------------- #
|
||
def get_single_topic(tid, headers, cookies):
|
||
payload = {
|
||
"keyword": tid,
|
||
"keytype": 1,
|
||
"state": -2, "psize": 1, "tag": 0, "page": 1,
|
||
"keytype":2,"type":-1,
|
||
"livescene": -1, "typeid": -1, "types": -1, "isOnShelf": -1,
|
||
"starttime": "", "endtime": "", "chanid": 0, "isHQOut": 0, "isGHHQOut": 0
|
||
}
|
||
request_headers = headers.copy()
|
||
request_headers["Cookie"] = cookies
|
||
try:
|
||
resp = requests.post(TARGET_URL, headers=request_headers, json=payload, timeout=10)
|
||
resp.raise_for_status()
|
||
data = resp.json()
|
||
if data.get("code") != 0:
|
||
log(f"查询话题 {tid} 返回错误码: {data.get('code')} 消息: {data.get('msg')}", "ERROR")
|
||
return None
|
||
|
||
topic_list = data.get("dataObj", {}).get("list", [])
|
||
if topic_list:
|
||
return topic_list[0]
|
||
else:
|
||
log(f"未找到ID为 {tid} 的话题", "WARNING")
|
||
return None
|
||
except Exception as e:
|
||
log(f"查询话题 {tid} 失败: {e}", "ERROR")
|
||
return None
|
||
|
||
# -------------------- 重置状态0话题 -------------------- #
|
||
def reset_status_to_minus1(tid, title, headers, cookies):
|
||
request_headers = headers.copy()
|
||
request_headers["Cookie"] = cookies
|
||
request_headers["content-type"] = "application/x-www-form-urlencoded"
|
||
data = {"tid": tid, "pstate": "-1"}
|
||
|
||
try:
|
||
resp = requests.post(RESET_URL, headers=request_headers, data=data, timeout=10)
|
||
resp.raise_for_status()
|
||
result = resp.json()
|
||
if result.get("isok"):
|
||
log(f"话题 {tid} ({title}) 状态0已重置为-1", "SUCCESS")
|
||
else:
|
||
log(f"话题 {tid} ({title}) 重置失败: {result.get('Msg')} (code: {result.get('code')})", "ERROR")
|
||
return result
|
||
except Exception as e:
|
||
log(f"重置话题 {tid} ({title}) 出错: {e}", "ERROR")
|
||
return None
|
||
|
||
# -------------------- 浏览器操作 -------------------- #
|
||
def find_element_with_retry(driver, selectors, timeout=10):
|
||
for selector in selectors:
|
||
try:
|
||
el = WebDriverWait(driver, timeout).until(
|
||
EC.presence_of_element_located((By.XPATH, selector))
|
||
)
|
||
driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", el)
|
||
return el
|
||
except:
|
||
continue
|
||
return None
|
||
|
||
def click_element_js(driver, element):
|
||
try:
|
||
driver.execute_script("arguments[0].click();", element)
|
||
return True
|
||
except Exception as e:
|
||
log(f"JS点击失败: {e}", "ERROR")
|
||
return False
|
||
|
||
def wait_status_minus1(tid, headers, cookies, retries=10, interval=1):
|
||
for _ in range(retries):
|
||
topic = get_single_topic(tid, headers, cookies)
|
||
if topic and topic.get("status") == -1:
|
||
return True
|
||
time.sleep(interval)
|
||
return False
|
||
|
||
def automate_browser(topic_id, title, driver, main_window, headers, cookies):
|
||
try:
|
||
driver.switch_to.new_window('tab')
|
||
url = f"https://live.vzan.com/admin/index.html?zbid=951423954&v=638941334981302363#/TopicManage/BaseSetting?topicId={topic_id}"
|
||
log(f"打开话题ID: {topic_id} ({title})", "INFO")
|
||
driver.get(url)
|
||
WebDriverWait(driver, 15).until(lambda d: str(topic_id) in d.current_url)
|
||
time.sleep(2)
|
||
|
||
# 隐藏tip遮挡
|
||
try:
|
||
tip = driver.find_element(By.CSS_SELECTOR, "div.tip-wrap")
|
||
if tip.is_displayed():
|
||
driver.execute_script("arguments[0].style.display='none';", tip)
|
||
except:
|
||
pass
|
||
|
||
fake_live_button = find_element_with_retry(driver, [
|
||
"//button[contains(text(), '伪直播')]",
|
||
"//span[contains(text(), '伪直播')]/.."
|
||
], timeout=10)
|
||
if fake_live_button:
|
||
click_element_js(driver, fake_live_button)
|
||
log(f"话题 {topic_id} ({title}) 已点击伪直播按钮", "INFO")
|
||
time.sleep(1)
|
||
|
||
save_button = find_element_with_retry(driver, [
|
||
"//button[contains(@class,'create-live-btn')]//span[contains(normalize-space(text()),'保 存')]",
|
||
"//button[contains(text(),'保 存')]"
|
||
], timeout=10)
|
||
if save_button:
|
||
click_element_js(driver, save_button)
|
||
log(f"话题 {topic_id} ({title}) 已点击保存按钮", "INFO")
|
||
time.sleep(2)
|
||
else:
|
||
log(f"话题 {topic_id} ({title}) 未找到保存按钮,截图保存", "ERROR")
|
||
driver.save_screenshot(f"save_not_found_{topic_id}.png")
|
||
|
||
# 检查登录状态
|
||
if not check_login_status(driver):
|
||
log("检测到登录已失效", "WARNING")
|
||
global is_logged_in
|
||
is_logged_in = False
|
||
send_webhook_notification("直播系统登录已失效,需要手动重新登录", True)
|
||
|
||
driver.close()
|
||
driver.switch_to.window(main_window)
|
||
return False
|
||
|
||
if wait_status_minus1(topic_id, headers, cookies):
|
||
log(f"话题 {topic_id} ({title}) 状态已确认为-1,浏览器操作完成", "SUCCESS")
|
||
else:
|
||
log(f"话题 {topic_id} ({title}) 状态未生效,请检查", "ERROR")
|
||
|
||
driver.close()
|
||
driver.switch_to.window(main_window)
|
||
return True
|
||
except Exception as e:
|
||
log(f"处理话题 {topic_id} ({title}) 时出错: {e}", "ERROR")
|
||
driver.close()
|
||
driver.switch_to.window(main_window)
|
||
return False
|
||
|
||
# -------------------- 主循环 -------------------- #
|
||
def main_loop():
|
||
global last_login_check
|
||
|
||
auth = load_auth()
|
||
headers, cookies = auth["headers"], auth["cookies"]
|
||
|
||
# 发送启动通知
|
||
send_webhook_notification("直播状态监控程序已启动")
|
||
|
||
while True:
|
||
# 检查是否需要执行登录状态刷新
|
||
if should_check_login():
|
||
refresh_login_status()
|
||
|
||
# 只有在登录状态下才执行正常业务逻辑
|
||
if not is_logged_in:
|
||
log("登录已失效,暂停业务处理", "WARNING")
|
||
time.sleep(60) # 等待1分钟后重试
|
||
continue
|
||
|
||
chrome_options = Options()
|
||
chrome_options.add_argument("--start-maximized")
|
||
chrome_options.add_argument(f"--user-data-dir={USER_DATA_DIR}")
|
||
chrome_options.add_argument("--headless=new")
|
||
chrome_options.add_argument("--disable-gpu")
|
||
chrome_options.add_argument("--window-size=1920,1080")
|
||
|
||
driver = webdriver.Chrome(options=chrome_options)
|
||
driver.get("https://live.vzan.com/admin/index.html")
|
||
main_window = driver.current_window_handle
|
||
|
||
try:
|
||
# 读取ID列表
|
||
topic_ids = read_topic_ids()
|
||
|
||
if not topic_ids:
|
||
log("未获取到任何话题ID,等待10秒后重试", "WARNING")
|
||
time.sleep(10)
|
||
continue
|
||
|
||
# 逐个查询ID对应的话题
|
||
topics = []
|
||
for tid in topic_ids:
|
||
topic = get_single_topic(tid, headers, cookies)
|
||
if topic:
|
||
topics.append(topic)
|
||
if not topics:
|
||
log("未找到任何有效话题,等待10秒后重试", "INFO")
|
||
time.sleep(10)
|
||
continue
|
||
|
||
# 处理状态为0的话题(重置为-1)
|
||
zero_topics = [t for t in topics if t.get("status") == 0]
|
||
for t in zero_topics:
|
||
tid = t.get("id")
|
||
title = t.get("title", "")
|
||
if tid:
|
||
reset_status_to_minus1(tid, title, headers, cookies)
|
||
|
||
# 处理状态为-1的话题(浏览器操作)
|
||
minus1_topics = [t for t in topics if t.get("status") == -1]
|
||
for t in minus1_topics:
|
||
tid = t.get("id")
|
||
title = t.get("title", "")
|
||
if tid:
|
||
automate_browser(tid, title, driver, main_window, headers, cookies)
|
||
|
||
log("等待20秒再次轮询...", "INFO")
|
||
time.sleep(20)
|
||
except Exception as e:
|
||
log(f"主循环执行异常: {e}", "ERROR")
|
||
send_webhook_notification(f"主循环执行异常: {e}", True)
|
||
finally:
|
||
driver.quit()
|
||
|
||
if __name__ == "__main__":
|
||
log("程序启动,仅处理id.txt中的话题ID", "INFO")
|
||
try:
|
||
main_loop()
|
||
except KeyboardInterrupt:
|
||
log("程序被用户中断", "INFO")
|
||
send_webhook_notification("直播状态监控程序已手动停止")
|
||
except Exception as e:
|
||
log(f"程序异常退出: {e}", "ERROR")
|
||
send_webhook_notification(f"程序异常退出: {e}", True) |