更改鉴权信息获取逻辑
This commit is contained in:
3
.gitignore
vendored
3
.gitignore
vendored
@@ -174,3 +174,6 @@ cython_debug/
|
||||
# PyPI configuration file
|
||||
.pypirc
|
||||
|
||||
# cookie
|
||||
auth.txt
|
||||
chrome_user_data
|
||||
62
main.py
62
main.py
@@ -2,7 +2,7 @@ import json
|
||||
import os
|
||||
import time
|
||||
import requests
|
||||
from seleniumwire import webdriver
|
||||
from seleniumwire import webdriver # pip install selenium-wire
|
||||
from selenium.webdriver.chrome.options import Options
|
||||
from selenium.webdriver.common.by import By
|
||||
from selenium.webdriver.support.ui import WebDriverWait
|
||||
@@ -12,18 +12,63 @@ from datetime import datetime
|
||||
AUTH_FILE = "auth.txt"
|
||||
TARGET_URL = "https://live-liveapi.vzan.com/api/v1/topic/get_topicdatas"
|
||||
RESET_URL = "https://live.vzan.com/NLive/ReSetStatus"
|
||||
USER_DATA_DIR = os.path.join(os.getcwd(), "chrome_user_data")
|
||||
ADMIN_URL = "https://live.vzan.com/admin/index.html?zbid=951423954&v=638941728939484662"
|
||||
USER_DATA_DIR = os.path.join(os.getcwd(), "chrome_user_data") # 浏览器登录状态持久化目录
|
||||
|
||||
# -------------------- 日志 -------------------- #
|
||||
def log(msg, level="INFO"):
|
||||
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] [{level}] {msg}")
|
||||
|
||||
# -------------------- Auth -------------------- #
|
||||
def save_auth(headers, cookies):
|
||||
data = {"headers": headers, "cookies": cookies}
|
||||
with open(AUTH_FILE, "w", encoding="utf-8") as f:
|
||||
json.dump(data, f, ensure_ascii=False, indent=4)
|
||||
log(f"Headers 和 Cookies 已保存到 {AUTH_FILE}", "INFO")
|
||||
|
||||
def load_auth():
|
||||
if os.path.exists(AUTH_FILE):
|
||||
with open(AUTH_FILE, "r", encoding="utf-8") as f:
|
||||
return json.load(f)
|
||||
raise Exception(f"{AUTH_FILE} 不存在,请先生成认证信息")
|
||||
else:
|
||||
log(f"{AUTH_FILE} 不存在,启动浏览器手动登录获取认证信息", "INFO")
|
||||
return get_auth_from_browser()
|
||||
|
||||
def get_auth_from_browser():
|
||||
chrome_options = Options()
|
||||
chrome_options.add_argument("--start-maximized")
|
||||
chrome_options.add_argument(f"--user-data-dir={USER_DATA_DIR}") # 保存登录状态
|
||||
chrome_options.add_argument("--disable-blink-features=AutomationControlled")
|
||||
chrome_options.add_argument("--ignore-certificate-errors")
|
||||
driver = webdriver.Chrome(options=chrome_options)
|
||||
driver.requests.clear()
|
||||
|
||||
driver.get("https://live.vzan.com/admin/index.html")
|
||||
input("请手动登录网站,登录完成后按回车继续...")
|
||||
driver.get(ADMIN_URL)
|
||||
log("等待页面加载完成...", "INFO")
|
||||
|
||||
# 捕获 headers 和 cookies
|
||||
timeout = 60
|
||||
start_time = time.time()
|
||||
captured_headers = None
|
||||
captured_cookies = None
|
||||
while time.time() - start_time < timeout:
|
||||
for request in driver.requests:
|
||||
if request.url.startswith(ADMIN_URL) and request.method.upper() == "GET":
|
||||
captured_cookies = request.headers.get("Cookie", "")
|
||||
if request.url.startswith(TARGET_URL) and request.response:
|
||||
captured_headers = dict(request.headers)
|
||||
if captured_headers and captured_cookies:
|
||||
break
|
||||
time.sleep(1)
|
||||
|
||||
driver.quit() # 登录状态已保存到 USER_DATA_DIR
|
||||
if not captured_headers or not captured_cookies:
|
||||
raise Exception("未捕获到 headers 或 cookies,请确认登录完成")
|
||||
|
||||
save_auth(captured_headers, captured_cookies)
|
||||
return {"headers": captured_headers, "cookies": captured_cookies}
|
||||
|
||||
# -------------------- 获取话题 -------------------- #
|
||||
def get_topics(headers, cookies):
|
||||
@@ -90,7 +135,6 @@ def click_element_js(driver, element):
|
||||
return False
|
||||
|
||||
def wait_status_minus1(tid, headers, cookies, retries=10, interval=1):
|
||||
"""确认话题状态为-1再进行关闭页面"""
|
||||
for _ in range(retries):
|
||||
topics = get_topics(headers, cookies)
|
||||
for t in topics:
|
||||
@@ -113,7 +157,6 @@ def automate_browser(topic_id, title, driver, main_window, headers, cookies):
|
||||
tip = driver.find_element(By.CSS_SELECTOR, "div.tip-wrap")
|
||||
if tip.is_displayed():
|
||||
driver.execute_script("arguments[0].style.display='none';", tip)
|
||||
log(f"话题 {topic_id} ({title}) tip-wrap遮挡已隐藏", "INFO")
|
||||
except:
|
||||
pass
|
||||
|
||||
@@ -141,11 +184,10 @@ def automate_browser(topic_id, title, driver, main_window, headers, cookies):
|
||||
driver.switch_to.window(main_window)
|
||||
return False
|
||||
|
||||
# 等待状态真正变为-1再关闭页面
|
||||
if wait_status_minus1(topic_id, headers, cookies):
|
||||
log(f"话题 {topic_id} ({title}) 状态已确认为-1,浏览器操作完成", "SUCCESS")
|
||||
else:
|
||||
log(f"话题 {topic_id} ({title}) 状态未生效,可能需要手动检查", "ERROR")
|
||||
log(f"话题 {topic_id} ({title}) 状态未生效,请检查", "ERROR")
|
||||
|
||||
driver.close()
|
||||
driver.switch_to.window(main_window)
|
||||
@@ -163,7 +205,7 @@ def main_loop():
|
||||
|
||||
chrome_options = Options()
|
||||
chrome_options.add_argument("--start-maximized")
|
||||
chrome_options.add_argument(f"--user-data-dir={USER_DATA_DIR}")
|
||||
chrome_options.add_argument(f"--user-data-dir={USER_DATA_DIR}") # 复用登录状态
|
||||
chrome_options.add_argument("--headless=new")
|
||||
chrome_options.add_argument("--disable-gpu")
|
||||
chrome_options.add_argument("--window-size=1920,1080")
|
||||
@@ -179,7 +221,6 @@ def main_loop():
|
||||
time.sleep(10)
|
||||
continue
|
||||
|
||||
# 状态为0的先重置为-1
|
||||
zero_topics = [t for t in topics if t.get("status") == 0]
|
||||
for t in zero_topics:
|
||||
tid = t.get("id")
|
||||
@@ -187,8 +228,7 @@ def main_loop():
|
||||
if tid:
|
||||
reset_status_to_minus1(tid, title, headers, cookies)
|
||||
|
||||
# 状态为-1的处理浏览器操作
|
||||
minus1_topics = [t for t in topics if t.get("status") == -1 or t.get("status") == 0]
|
||||
minus1_topics = [t for t in topics if t.get("status") in (-1, 0)]
|
||||
for t in minus1_topics:
|
||||
tid = t.get("id")
|
||||
title = t.get("title", "")
|
||||
|
||||
Reference in New Issue
Block a user