Files
vzan_autoreplay/main.py
2025-09-24 08:52:25 +08:00

246 lines
9.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import json
import os
import time
import requests
from seleniumwire import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from datetime import datetime
# File in which save_auth() caches the captured request headers and cookies.
AUTH_FILE = "auth.txt"
# Topic-list API endpoint; get_topics() POSTs to it, and the browser's own
# request to it is the one whose headers are captured during login.
TARGET_URL = "https://live-liveapi.vzan.com/api/v1/topic/get_topicdatas"
# Endpoint that reset_status_to_minus1() POSTs to in order to change a topic's state.
RESET_URL = "https://live.vzan.com/NLive/ReSetStatus"
# Admin console page opened after the manual login; the Cookie header of the
# GET request to it is what gets captured.
ADMIN_URL = "https://live.vzan.com/admin/index.html?zbid=951423954&v=638941728939484662"
USER_DATA_DIR = os.path.join(os.getcwd(), "chrome_user_data") # Chrome profile directory used to persist the browser login state
# -------------------- 日志 -------------------- #
def log(msg, level="INFO"):
    """Print *msg* to stdout, prefixed with the local time and a *level* tag.

    Output format: ``[YYYY-MM-DD HH:MM:SS] [LEVEL] message``.
    """
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(f"[{timestamp}] [{level}] {msg}")
# -------------------- Auth -------------------- #
def save_auth(headers, cookies):
    """Write the captured headers and cookie string to ``AUTH_FILE`` as JSON.

    Args:
        headers: Request headers to cache (stored under the ``"headers"`` key).
        cookies: Cookie header value to cache (stored under ``"cookies"``).
    """
    record = {"headers": headers, "cookies": cookies}
    with open(AUTH_FILE, "w", encoding="utf-8") as auth_file:
        # Keep non-ASCII characters readable in the file instead of \u escapes.
        auth_file.write(json.dumps(record, ensure_ascii=False, indent=4))
    log(f"Headers 和 Cookies 已保存到 {AUTH_FILE}", "INFO")
def load_auth():
    """Return the auth record, preferring the cached file over a browser login.

    The record is read from ``AUTH_FILE`` when that file exists and contains a
    usable ``{"headers": dict, "cookies": str}`` JSON object. If the file is
    missing, unreadable, not valid JSON, or lacks either entry, the function
    falls back to get_auth_from_browser() instead of crashing at startup.

    Returns:
        dict: A mapping with ``"headers"`` and ``"cookies"`` keys.
    """
    if not os.path.exists(AUTH_FILE):
        log(f"{AUTH_FILE} 不存在,启动浏览器手动登录获取认证信息", "INFO")
        return get_auth_from_browser()

    try:
        with open(AUTH_FILE, "r", encoding="utf-8") as f:
            auth = json.load(f)
    except (OSError, ValueError) as e:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError
        # subclasses, so a truncated or mis-encoded file lands here too.
        log(f"{AUTH_FILE} 读取或解析失败: {e},重新通过浏览器获取认证信息", "ERROR")
        return get_auth_from_browser()

    # Callers index auth["headers"] / auth["cookies"] and call headers.copy(),
    # so reject anything that would fail later with a less helpful error.
    usable = (
        isinstance(auth, dict)
        and isinstance(auth.get("headers"), dict)
        and bool(auth.get("cookies"))
    )
    if not usable:
        log(f"{AUTH_FILE} 内容不完整,重新通过浏览器获取认证信息", "ERROR")
        return get_auth_from_browser()
    return auth
def get_auth_from_browser():
    """Capture request headers and cookies from a manually logged-in Chrome.

    Opens a visible Chrome window (profile stored in ``USER_DATA_DIR``), waits
    for the operator to log in and press Enter, then loads ``ADMIN_URL`` and
    inspects the traffic recorded by selenium-wire for up to 60 seconds:

    * cookies are taken from the ``Cookie`` header of a GET request whose URL
      starts with ``ADMIN_URL``;
    * headers are taken from a request whose URL starts with ``TARGET_URL``
      and that has already received a response.

    The captured values are written to disk with save_auth().

    Returns:
        dict: ``{"headers": <dict>, "cookies": <str>}``.

    Raises:
        RuntimeError: If either value was not captured before the timeout.
    """
    chrome_options = Options()
    chrome_options.add_argument("--start-maximized")
    chrome_options.add_argument(f"--user-data-dir={USER_DATA_DIR}")  # persist the login state
    chrome_options.add_argument("--disable-blink-features=AutomationControlled")
    chrome_options.add_argument("--ignore-certificate-errors")
    driver = webdriver.Chrome(options=chrome_options)

    captured_headers = None
    captured_cookies = None
    try:
        # selenium-wire returns a fresh list from `driver.requests`, so calling
        # .clear() on it has no effect; `del` is the documented way to reset
        # the capture store.
        del driver.requests
        driver.get("https://live.vzan.com/admin/index.html")
        input("请手动登录网站,登录完成后按回车继续...")
        driver.get(ADMIN_URL)
        log("等待页面加载完成...", "INFO")

        # Poll the recorded traffic until both pieces are present or we time out.
        timeout = 60
        deadline = time.time() + timeout
        while time.time() < deadline:
            for request in driver.requests:
                if request.url.startswith(ADMIN_URL) and request.method.upper() == "GET":
                    cookie_header = request.headers.get("Cookie", "")
                    # Do not let a later cookie-less request wipe a captured value.
                    if cookie_header:
                        captured_cookies = cookie_header
                if request.url.startswith(TARGET_URL) and request.response:
                    captured_headers = dict(request.headers)
            if captured_headers and captured_cookies:
                break
            time.sleep(1)
    finally:
        # Always release the browser, even if input() or navigation fails.
        # The login state itself stays on disk in USER_DATA_DIR.
        driver.quit()

    if not captured_headers or not captured_cookies:
        raise RuntimeError("未捕获到 headers 或 cookies请确认登录完成")
    save_auth(captured_headers, captured_cookies)
    return {"headers": captured_headers, "cookies": captured_cookies}
# -------------------- 获取话题 -------------------- #
def get_topics(headers, cookies):
    """Fetch the first page of topics from the topic-list API.

    Args:
        headers: Base request headers; copied, never mutated.
        cookies: Value sent as the ``Cookie`` header.

    Returns:
        list: The ``dataObj.list`` entries of the response, or ``[]`` when the
        request fails, the API reports a non-zero ``code``, or the response
        cannot be processed. Errors are logged, never raised.
    """
    query = {
        "keyword": "",
        "keytype": 1,
        "state": -2,
        "psize": 7,
        "tag": 0,
        "page": 1,
        "livescene": -1,
        "typeid": -1,
        "types": -1,
        "isOnShelf": -1,
        "starttime": "",
        "endtime": "",
        "chanid": 0,
        "isHQOut": 0,
        "isGHHQOut": 0,
    }
    request_headers = headers.copy()
    request_headers.update({"Cookie": cookies})
    try:
        response = requests.post(TARGET_URL, headers=request_headers, json=query, timeout=10)
        response.raise_for_status()
        body = response.json()
        code = body.get("code")
        if code != 0:
            log(f"获取话题API返回错误码: {code} 消息: {body.get('msg')}", "ERROR")
            return []
        topics = body.get("dataObj", {}).get("list", [])
        log(f"共获取到 {len(topics)} 条话题", "INFO")
        return topics
    except Exception as err:
        # Deliberately broad: the polling loop depends on this never raising.
        log(f"获取话题列表失败: {err}", "ERROR")
        return []
# -------------------- 重置状态0话题 -------------------- #
def reset_status_to_minus1(tid, title, headers, cookies):
    """Ask the server to set a topic's state to -1.

    Args:
        tid: Topic id sent as the ``tid`` form field.
        title: Topic title, used only in log messages.
        headers: Base request headers; copied, never mutated.
        cookies: Value sent as the ``Cookie`` header.

    Returns:
        The decoded JSON response (whether or not it reports success), or
        ``None`` if the request or decoding raised.
    """
    request_headers = headers.copy()
    request_headers.update({
        "Cookie": cookies,
        "content-type": "application/x-www-form-urlencoded",
    })
    form = {"tid": tid, "pstate": "-1"}
    try:
        response = requests.post(RESET_URL, headers=request_headers, data=form, timeout=10)
        response.raise_for_status()
        outcome = response.json()
        if not outcome.get("isok"):
            log(f"话题 {tid} ({title}) 重置失败: {outcome.get('Msg')} (code: {outcome.get('code')})", "ERROR")
        else:
            log(f"话题 {tid} ({title}) 状态0已重置为-1", "SUCCESS")
        return outcome
    except Exception as err:
        log(f"重置话题 {tid} ({title}) 出错: {err}", "ERROR")
        return None
# -------------------- 浏览器操作 -------------------- #
def find_element_with_retry(driver, selectors, timeout=10):
    """Return the first element matched by any of the XPath *selectors*.

    Each selector is tried in order and given up to *timeout* seconds to
    appear in the DOM. A matched element is scrolled to the centre of the
    viewport before being returned.

    Args:
        driver: Selenium WebDriver to search with.
        selectors: Iterable of XPath expressions, in priority order.
        timeout: Seconds to wait for each individual selector.

    Returns:
        The located element, or ``None`` if no selector matched.
    """
    for selector in selectors:
        try:
            el = WebDriverWait(driver, timeout).until(
                EC.presence_of_element_located((By.XPATH, selector))
            )
            driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", el)
            return el
        except Exception:
            # A timeout or script failure just means "try the next selector".
            # Catching Exception (not a bare except) lets Ctrl+C and
            # SystemExit still interrupt the wait.
            continue
    return None
def click_element_js(driver, element):
    """Click *element* through injected JavaScript rather than a native click.

    Returns:
        bool: ``True`` if the script ran without raising, ``False`` (after
        logging the error) otherwise.
    """
    try:
        driver.execute_script("arguments[0].click();", element)
    except Exception as err:
        log(f"JS点击失败: {err}", "ERROR")
        return False
    return True
def wait_status_minus1(tid, headers, cookies, retries=10, interval=1):
    """Poll the topic list until topic *tid* reports status -1.

    Args:
        tid: Topic id to look for.
        headers: Request headers passed through to get_topics().
        cookies: Cookie string passed through to get_topics().
        retries: Maximum number of polls.
        interval: Seconds to sleep after each unsuccessful poll.

    Returns:
        bool: ``True`` as soon as a poll shows the topic with status -1,
        ``False`` if every poll came back without it.
    """
    for _attempt in range(retries):
        confirmed = any(
            topic.get("id") == tid and topic.get("status") == -1
            for topic in get_topics(headers, cookies)
        )
        if confirmed:
            return True
        time.sleep(interval)
    return False
def automate_browser(topic_id, title, driver, main_window, headers, cookies):
    """Open a topic's settings page in a new tab, pick "伪直播" and save.

    The tab opened for the topic is always closed again, and focus is
    returned to *main_window*, regardless of how the function exits.

    Args:
        topic_id: Id of the topic to edit.
        title: Topic title, used only in log messages.
        driver: Selenium WebDriver that owns *main_window*.
        main_window: Window handle to switch back to when done.
        headers: Request headers used for the status confirmation poll.
        cookies: Cookie string used for the status confirmation poll.

    Returns:
        bool: ``False`` if the save button was not found or an exception was
        raised; ``True`` otherwise. A status that could not be confirmed as -1
        is only logged and still yields ``True``.
    """
    try:
        driver.switch_to.new_window('tab')
        url = f"https://live.vzan.com/admin/index.html?zbid=951423954&v=638941334981302363#/TopicManage/BaseSetting?topicId={topic_id}"
        log(f"打开话题ID: {topic_id} ({title})", "INFO")
        driver.get(url)
        WebDriverWait(driver, 15).until(lambda d: str(topic_id) in d.current_url)
        time.sleep(2)  # presumably gives the page time to render its form

        # Best effort: hide the tip overlay so it cannot cover the buttons.
        # A missing overlay is the normal case, so failures are ignored.
        try:
            tip = driver.find_element(By.CSS_SELECTOR, "div.tip-wrap")
            if tip.is_displayed():
                driver.execute_script("arguments[0].style.display='none';", tip)
        except Exception:
            pass

        fake_live_button = find_element_with_retry(driver, [
            "//button[contains(text(), '伪直播')]",
            "//span[contains(text(), '伪直播')]/.."
        ], timeout=10)
        if fake_live_button:
            # Only report the click when it actually went through;
            # click_element_js() logs its own error on failure.
            if click_element_js(driver, fake_live_button):
                log(f"话题 {topic_id} ({title}) 已点击伪直播按钮", "INFO")
            time.sleep(1)

        save_button = find_element_with_retry(driver, [
            "//button[contains(@class,'create-live-btn')]//span[contains(normalize-space(text()),'保 存')]",
            "//button[contains(text(),'保 存')]"
        ], timeout=10)
        if not save_button:
            log(f"话题 {topic_id} ({title}) 未找到保存按钮,截图保存", "ERROR")
            driver.save_screenshot(f"save_not_found_{topic_id}.png")
            return False

        if click_element_js(driver, save_button):
            log(f"话题 {topic_id} ({title}) 已点击保存按钮", "INFO")
        time.sleep(2)

        if wait_status_minus1(topic_id, headers, cookies):
            log(f"话题 {topic_id} ({title}) 状态已确认为-1浏览器操作完成", "SUCCESS")
        else:
            log(f"话题 {topic_id} ({title}) 状态未生效,请检查", "ERROR")
        return True
    except Exception as e:
        log(f"处理话题 {topic_id} ({title}) 时出错: {e}", "ERROR")
        return False
    finally:
        # Single cleanup path for every exit. Only close the current window
        # when it is not the main one: if opening the tab failed, focus is
        # still on the main window and closing it would break the session.
        try:
            if driver.current_window_handle != main_window:
                driver.close()
            driver.switch_to.window(main_window)
        except Exception as cleanup_error:
            log(f"关闭话题 {topic_id} ({title}) 标签页失败: {cleanup_error}", "ERROR")
# -------------------- 主循环 -------------------- #
def main_loop(poll_interval=10):
    """Poll the topic list forever and process topics with status 0 or -1.

    On each pass the function:

    1. fetches the topic list (sleeping and retrying when it is empty);
    2. calls reset_status_to_minus1() for every topic with status 0;
    3. calls automate_browser() for every topic whose fetched status was
       -1 or 0 (the list is not re-fetched after the reset step).

    A headless Chrome instance reusing the profile in ``USER_DATA_DIR`` is
    kept open for the whole run and is always quit on exit.

    Args:
        poll_interval: Seconds to sleep between passes, and before retrying
            after an empty topic list. Defaults to 10.
    """
    auth = load_auth()
    headers, cookies = auth["headers"], auth["cookies"]

    chrome_options = Options()
    chrome_options.add_argument("--start-maximized")
    chrome_options.add_argument(f"--user-data-dir={USER_DATA_DIR}")  # reuse the saved login state
    chrome_options.add_argument("--headless=new")
    chrome_options.add_argument("--disable-gpu")
    chrome_options.add_argument("--window-size=1920,1080")
    driver = webdriver.Chrome(options=chrome_options)
    try:
        # Inside the try so a failed first navigation still quits Chrome.
        driver.get("https://live.vzan.com/admin/index.html")
        main_window = driver.current_window_handle

        while True:
            topics = get_topics(headers, cookies)
            if not topics:
                time.sleep(poll_interval)
                continue

            # Reset every status-0 topic first, before any browser work.
            for t in topics:
                tid = t.get("id")
                if tid and t.get("status") == 0:
                    reset_status_to_minus1(tid, t.get("title", ""), headers, cookies)

            # Status-0 topics are included because the list predates the reset.
            for t in topics:
                tid = t.get("id")
                if tid and t.get("status") in (-1, 0):
                    automate_browser(tid, t.get("title", ""), driver, main_window, headers, cookies)

            log(f"等待{poll_interval}秒再次轮询...", "INFO")
            time.sleep(poll_interval)
    finally:
        driver.quit()
if __name__ == "__main__":
    # Script entry point: log a startup message, then hand over to the
    # polling loop, which runs until the process is stopped.
    log("程序启动,动态检测话题状态并处理", "INFO")
    main_loop()