246 lines
9.8 KiB
Python
246 lines
9.8 KiB
Python
import json
|
||
import os
|
||
import time
|
||
import requests
|
||
from seleniumwire import webdriver
|
||
from selenium.webdriver.chrome.options import Options
|
||
from selenium.webdriver.common.by import By
|
||
from selenium.webdriver.support.ui import WebDriverWait
|
||
from selenium.webdriver.support import expected_conditions as EC
|
||
from datetime import datetime
|
||
|
||
AUTH_FILE = "auth.txt"
|
||
TARGET_URL = "https://live-liveapi.vzan.com/api/v1/topic/get_topicdatas"
|
||
RESET_URL = "https://live.vzan.com/NLive/ReSetStatus"
|
||
ADMIN_URL = "https://live.vzan.com/admin/index.html?zbid=951423954&v=638941728939484662"
|
||
USER_DATA_DIR = os.path.join(os.getcwd(), "chrome_user_data") # 浏览器登录状态持久化目录
|
||
|
||
# -------------------- 日志 -------------------- #
|
||
def log(msg, level="INFO"):
|
||
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] [{level}] {msg}")
|
||
|
||
# -------------------- Auth -------------------- #
|
||
def save_auth(headers, cookies):
|
||
data = {"headers": headers, "cookies": cookies}
|
||
with open(AUTH_FILE, "w", encoding="utf-8") as f:
|
||
json.dump(data, f, ensure_ascii=False, indent=4)
|
||
log(f"Headers 和 Cookies 已保存到 {AUTH_FILE}", "INFO")
|
||
|
||
def load_auth():
|
||
if os.path.exists(AUTH_FILE):
|
||
with open(AUTH_FILE, "r", encoding="utf-8") as f:
|
||
return json.load(f)
|
||
else:
|
||
log(f"{AUTH_FILE} 不存在,启动浏览器手动登录获取认证信息", "INFO")
|
||
return get_auth_from_browser()
|
||
|
||
def get_auth_from_browser():
|
||
chrome_options = Options()
|
||
chrome_options.add_argument("--start-maximized")
|
||
chrome_options.add_argument(f"--user-data-dir={USER_DATA_DIR}") # 保存登录状态
|
||
chrome_options.add_argument("--disable-blink-features=AutomationControlled")
|
||
chrome_options.add_argument("--ignore-certificate-errors")
|
||
driver = webdriver.Chrome(options=chrome_options)
|
||
driver.requests.clear()
|
||
|
||
driver.get("https://live.vzan.com/admin/index.html")
|
||
input("请手动登录网站,登录完成后按回车继续...")
|
||
driver.get(ADMIN_URL)
|
||
log("等待页面加载完成...", "INFO")
|
||
|
||
# 捕获 headers 和 cookies
|
||
timeout = 60
|
||
start_time = time.time()
|
||
captured_headers = None
|
||
captured_cookies = None
|
||
while time.time() - start_time < timeout:
|
||
for request in driver.requests:
|
||
if request.url.startswith(ADMIN_URL) and request.method.upper() == "GET":
|
||
captured_cookies = request.headers.get("Cookie", "")
|
||
if request.url.startswith(TARGET_URL) and request.response:
|
||
captured_headers = dict(request.headers)
|
||
if captured_headers and captured_cookies:
|
||
break
|
||
time.sleep(1)
|
||
|
||
driver.quit() # 登录状态已保存到 USER_DATA_DIR
|
||
if not captured_headers or not captured_cookies:
|
||
raise Exception("未捕获到 headers 或 cookies,请确认登录完成")
|
||
|
||
save_auth(captured_headers, captured_cookies)
|
||
return {"headers": captured_headers, "cookies": captured_cookies}
|
||
|
||
# -------------------- 获取话题 -------------------- #
|
||
def get_topics(headers, cookies):
|
||
payload = {
|
||
"keyword": "", "keytype": 1, "state": -2, "psize": 7, "tag": 0, "page": 1,
|
||
"livescene": -1, "typeid": -1, "types": -1, "isOnShelf": -1,
|
||
"starttime": "", "endtime": "", "chanid": 0, "isHQOut": 0, "isGHHQOut": 0
|
||
}
|
||
request_headers = headers.copy()
|
||
request_headers["Cookie"] = cookies
|
||
try:
|
||
resp = requests.post(TARGET_URL, headers=request_headers, json=payload, timeout=10)
|
||
resp.raise_for_status()
|
||
data = resp.json()
|
||
if data.get("code") != 0:
|
||
log(f"获取话题API返回错误码: {data.get('code')} 消息: {data.get('msg')}", "ERROR")
|
||
return []
|
||
topic_list = data.get("dataObj", {}).get("list", [])
|
||
log(f"共获取到 {len(topic_list)} 条话题", "INFO")
|
||
return topic_list
|
||
except Exception as e:
|
||
log(f"获取话题列表失败: {e}", "ERROR")
|
||
return []
|
||
|
||
# -------------------- 重置状态0话题 -------------------- #
|
||
def reset_status_to_minus1(tid, title, headers, cookies):
|
||
request_headers = headers.copy()
|
||
request_headers["Cookie"] = cookies
|
||
request_headers["content-type"] = "application/x-www-form-urlencoded"
|
||
data = {"tid": tid, "pstate": "-1"}
|
||
|
||
try:
|
||
resp = requests.post(RESET_URL, headers=request_headers, data=data, timeout=10)
|
||
resp.raise_for_status()
|
||
result = resp.json()
|
||
if result.get("isok"):
|
||
log(f"话题 {tid} ({title}) 状态0已重置为-1", "SUCCESS")
|
||
else:
|
||
log(f"话题 {tid} ({title}) 重置失败: {result.get('Msg')} (code: {result.get('code')})", "ERROR")
|
||
return result
|
||
except Exception as e:
|
||
log(f"重置话题 {tid} ({title}) 出错: {e}", "ERROR")
|
||
return None
|
||
|
||
# -------------------- 浏览器操作 -------------------- #
|
||
def find_element_with_retry(driver, selectors, timeout=10):
|
||
for selector in selectors:
|
||
try:
|
||
el = WebDriverWait(driver, timeout).until(
|
||
EC.presence_of_element_located((By.XPATH, selector))
|
||
)
|
||
driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", el)
|
||
return el
|
||
except:
|
||
continue
|
||
return None
|
||
|
||
def click_element_js(driver, element):
|
||
try:
|
||
driver.execute_script("arguments[0].click();", element)
|
||
return True
|
||
except Exception as e:
|
||
log(f"JS点击失败: {e}", "ERROR")
|
||
return False
|
||
|
||
def wait_status_minus1(tid, headers, cookies, retries=10, interval=1):
|
||
for _ in range(retries):
|
||
topics = get_topics(headers, cookies)
|
||
for t in topics:
|
||
if t.get("id") == tid and t.get("status") == -1:
|
||
return True
|
||
time.sleep(interval)
|
||
return False
|
||
|
||
def automate_browser(topic_id, title, driver, main_window, headers, cookies):
|
||
try:
|
||
driver.switch_to.new_window('tab')
|
||
url = f"https://live.vzan.com/admin/index.html?zbid=951423954&v=638941334981302363#/TopicManage/BaseSetting?topicId={topic_id}"
|
||
log(f"打开话题ID: {topic_id} ({title})", "INFO")
|
||
driver.get(url)
|
||
WebDriverWait(driver, 15).until(lambda d: str(topic_id) in d.current_url)
|
||
time.sleep(2)
|
||
|
||
# 隐藏tip遮挡
|
||
try:
|
||
tip = driver.find_element(By.CSS_SELECTOR, "div.tip-wrap")
|
||
if tip.is_displayed():
|
||
driver.execute_script("arguments[0].style.display='none';", tip)
|
||
except:
|
||
pass
|
||
|
||
fake_live_button = find_element_with_retry(driver, [
|
||
"//button[contains(text(), '伪直播')]",
|
||
"//span[contains(text(), '伪直播')]/.."
|
||
], timeout=10)
|
||
if fake_live_button:
|
||
click_element_js(driver, fake_live_button)
|
||
log(f"话题 {topic_id} ({title}) 已点击伪直播按钮", "INFO")
|
||
time.sleep(1)
|
||
|
||
save_button = find_element_with_retry(driver, [
|
||
"//button[contains(@class,'create-live-btn')]//span[contains(normalize-space(text()),'保 存')]",
|
||
"//button[contains(text(),'保 存')]"
|
||
], timeout=10)
|
||
if save_button:
|
||
click_element_js(driver, save_button)
|
||
log(f"话题 {topic_id} ({title}) 已点击保存按钮", "INFO")
|
||
time.sleep(2)
|
||
else:
|
||
log(f"话题 {topic_id} ({title}) 未找到保存按钮,截图保存", "ERROR")
|
||
driver.save_screenshot(f"save_not_found_{topic_id}.png")
|
||
driver.close()
|
||
driver.switch_to.window(main_window)
|
||
return False
|
||
|
||
if wait_status_minus1(topic_id, headers, cookies):
|
||
log(f"话题 {topic_id} ({title}) 状态已确认为-1,浏览器操作完成", "SUCCESS")
|
||
else:
|
||
log(f"话题 {topic_id} ({title}) 状态未生效,请检查", "ERROR")
|
||
|
||
driver.close()
|
||
driver.switch_to.window(main_window)
|
||
return True
|
||
except Exception as e:
|
||
log(f"处理话题 {topic_id} ({title}) 时出错: {e}", "ERROR")
|
||
driver.close()
|
||
driver.switch_to.window(main_window)
|
||
return False
|
||
|
||
# -------------------- 主循环 -------------------- #
|
||
def main_loop():
|
||
auth = load_auth()
|
||
headers, cookies = auth["headers"], auth["cookies"]
|
||
|
||
chrome_options = Options()
|
||
chrome_options.add_argument("--start-maximized")
|
||
chrome_options.add_argument(f"--user-data-dir={USER_DATA_DIR}") # 复用登录状态
|
||
chrome_options.add_argument("--headless=new")
|
||
chrome_options.add_argument("--disable-gpu")
|
||
chrome_options.add_argument("--window-size=1920,1080")
|
||
|
||
driver = webdriver.Chrome(options=chrome_options)
|
||
driver.get("https://live.vzan.com/admin/index.html")
|
||
main_window = driver.current_window_handle
|
||
|
||
try:
|
||
while True:
|
||
topics = get_topics(headers, cookies)
|
||
if not topics:
|
||
time.sleep(10)
|
||
continue
|
||
|
||
zero_topics = [t for t in topics if t.get("status") == 0]
|
||
for t in zero_topics:
|
||
tid = t.get("id")
|
||
title = t.get("title", "")
|
||
if tid:
|
||
reset_status_to_minus1(tid, title, headers, cookies)
|
||
|
||
minus1_topics = [t for t in topics if t.get("status") in (-1, 0)]
|
||
for t in minus1_topics:
|
||
tid = t.get("id")
|
||
title = t.get("title", "")
|
||
if tid:
|
||
automate_browser(tid, title, driver, main_window, headers, cookies)
|
||
|
||
log("等待10秒再次轮询...", "INFO")
|
||
time.sleep(10)
|
||
finally:
|
||
driver.quit()
|
||
|
||
if __name__ == "__main__":
|
||
log("程序启动,动态检测话题状态并处理", "INFO")
|
||
main_loop()
|