cluntop upload /.github/Toos/大秀直播/aisimu.py
This commit is contained in:
@@ -0,0 +1,287 @@
|
||||
import requests
|
||||
import json
|
||||
import time
|
||||
import os
|
||||
from bs4 import BeautifulSoup
|
||||
from urllib.parse import urljoin
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
|
||||
class AisiMuScraper:
    """Log in to the configured site, collect live-stream URLs and export them.

    The workflow (see ``run``) is: log in, read the index page to discover
    category pages, scrape every category table for (room name, stream URL)
    pairs, probe each URL with a HEAD request, then write M3U/TXT/JSON files
    into ``output/`` and optionally notify a Telegram chat.

    All site-specific values (URLs, form field names, credentials, Telegram
    token) are read from the JSON config file.
    """

    # Substrings that disqualify a room name from being exported.
    # Each keyword must be its own entry: the check is a per-keyword substring
    # match, so a single comma-joined string would only match a room name that
    # contains the entire joined text.
    BANNED_WORDS = (
        "广播",
        "查看主播",
        "支付宝风控解除",
        "之声",
        "实力带飞",
        "财经",
    )

    def __init__(self, config_path="config.json"):
        """Load the config, prepare the HTTP session and read saved history.

        Args:
            config_path: Path of the JSON configuration file.
        """
        self.config = self._load_config(config_path)

        self.session = requests.Session()
        # Only override the library's default User-Agent when one is
        # configured, instead of storing a None header value.
        user_agent = self.config.get("user_agent")
        if user_agent:
            self.session.headers["User-Agent"] = user_agent

        self.category_urls = {}  # {category page url: category name}
        self.results = {}        # {stream url: streamer/room name}
        self.old_urls = set()    # stream urls recorded by the previous run
        self.new_urls = set()    # stream urls not present in old_urls

        self.output_dir = "output"
        os.makedirs(self.output_dir, exist_ok=True)

        self._load_history()

    # ================= Basics =================

    def _load_config(self, path):
        """Return the parsed JSON document stored at ``path``."""
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)

    def tg(self, text):
        """Send ``text`` to the configured Telegram chat (best effort).

        Does nothing when ``tg_token`` or ``tg_chat_id`` is missing from the
        config. Failures are printed and never raised.
        """
        token = self.config.get("tg_token")
        chat_id = self.config.get("tg_chat_id")
        if not token or not chat_id:
            return
        try:
            requests.post(
                f"https://api.telegram.org/bot{token}/sendMessage",
                data={"chat_id": chat_id, "text": text},
                timeout=10,
            )
        except Exception as e:
            print(f"[AISIMU] Telegram 消息发送失败: {e}")

    def tg_file(self, filepath, caption=""):
        """Upload ``filepath`` as a document to the Telegram chat (best effort).

        Does nothing when ``tg_token`` or ``tg_chat_id`` is missing from the
        config. Failures (including an unreadable file) are printed and never
        raised.
        """
        token = self.config.get("tg_token")
        chat_id = self.config.get("tg_chat_id")
        if not token or not chat_id:
            return
        try:
            with open(filepath, "rb") as f:
                requests.post(
                    f"https://api.telegram.org/bot{token}/sendDocument",
                    data={"chat_id": chat_id, "caption": caption},
                    files={"document": f},
                    timeout=30,
                )
        except Exception as e:
            print(f"[AISIMU] Telegram 文件发送失败: {e}")

    # ================= Login =================

    def login(self):
        """Submit the login form and report whether it appears to have worked.

        The login page is fetched first so that an optional CSRF token (config
        key ``csrf_token_field``) can be copied from its hidden input into the
        POST payload. The login counts as failed when the response body
        contains ``login_failed_check_text``.

        Returns:
            True on success, False on failure or on any exception raised while
            talking to the site.
        """
        print("[AISIMU] 尝试登录:", self.config["login_url"])
        try:
            r = self.session.get(self.config["login_url"], timeout=10)
            soup = BeautifulSoup(r.text, "html.parser")

            payload = {
                self.config["username_field"]: self.config["username"],
                self.config["password_field"]: self.config["password"],
            }

            token_field = self.config.get("csrf_token_field")
            if token_field:
                token = soup.find("input", {"name": token_field})
                if token:
                    payload[token_field] = token.get("value")

            r = self.session.post(
                self.config["login_url"],
                data=payload,
                allow_redirects=True,
                timeout=10,
            )

            if self.config["login_failed_check_text"] in r.text:
                print("[AISIMU] ❌ 登录失败")
                self.tg("❌ AISIMU 登录失败")
                return False

            print("[AISIMU] 登录成功")
            return True

        except Exception as e:
            print("[AISIMU] 登录异常:", e)
            self.tg("❌ AISIMU 登录异常")
            return False

    # ================= Category pages =================

    def fetch_index(self):
        """Collect category page links from the post-login index page.

        Every anchor whose href contains ``zblist.php`` is stored in
        ``self.category_urls`` as ``{absolute url: link text}``.
        """
        index_url = self.config["logged_in_expected_url"]
        r = self.session.get(index_url, timeout=10)
        soup = BeautifulSoup(r.text, "html.parser")

        for a in soup.select('a[href*="zblist.php"]'):
            name = a.text.strip()
            url = urljoin(index_url, a["href"])
            self.category_urls[url] = name

        print(f"[AISIMU] 发现分类页: {len(self.category_urls)}")

    # ======== Scraping + filter rules ========

    @classmethod
    def _is_banned(cls, room_name):
        """Return True when ``room_name`` contains any banned keyword."""
        return any(word in room_name for word in cls.BANNED_WORDS)

    def fetch_category(self, url, cname, idx, total):
        """Scrape one category page and add its streams to ``self.results``.

        A table row is kept when it has at least four cells, its fourth cell
        starts with ``http`` and its third cell (the room name) contains no
        banned keyword. Any exception is printed and swallowed so one broken
        category does not abort the others.

        Args:
            url: Category page URL.
            cname: Category name, used only in the failure message.
            idx: Position of this category in submission order (for progress).
            total: Total number of categories (for progress).
        """
        try:
            r = self.session.get(url, timeout=10)
            soup = BeautifulSoup(r.text, "html.parser")

            for tr in soup.select("table tr"):
                tds = tr.find_all("td")
                if len(tds) < 4:
                    continue

                room_name = tds[2].get_text(strip=True)
                live = tds[3].get_text(strip=True)

                if not live.startswith("http"):
                    continue

                # Core filter rule: drop rows that are not real streamers.
                if self._is_banned(room_name):
                    continue

                self.results[live] = room_name

            print(f"[AISIMU] 分类页进度: {idx}/{total}")

        except Exception as e:
            print(f"[AISIMU] ✖ 分类失败: {cname} -> {e}")

    # ================= Incremental history =================

    def _load_history(self):
        """Load previously saved stream URLs into ``self.old_urls`` if present."""
        path = os.path.join(self.output_dir, "history.txt")
        if os.path.exists(path):
            with open(path, "r", encoding="utf-8") as f:
                self.old_urls = set(x.strip() for x in f if x.strip())

    def _save_history(self):
        """Overwrite ``history.txt`` with the current result URLs, sorted."""
        path = os.path.join(self.output_dir, "history.txt")
        with open(path, "w", encoding="utf-8") as f:
            for u in sorted(self.results):
                f.write(u + "\n")

    # ================= Multi-threaded validation =================

    def check_stream(self, url):
        """Return True when a HEAD request to ``url`` answers 200/301/302.

        Any exception (timeout, connection error, ...) counts as unavailable.
        """
        try:
            r = self.session.head(
                url,
                timeout=(2, 4),
                allow_redirects=True,
            )
            ok = r.status_code in (200, 301, 302)
            r.close()
            return ok
        except Exception:
            return False

    def validate_streams(self):
        """Probe every collected URL concurrently and keep only reachable ones.

        Replaces ``self.results`` with the subset for which ``check_stream``
        returned True.
        """
        print("[AISIMU] 多线程检测直播源可用性...")

        valid = {}
        total = len(self.results)

        with ThreadPoolExecutor(max_workers=10) as pool:
            future_map = {
                pool.submit(self.check_stream, url): (url, name)
                for url, name in self.results.items()
            }

            for i, future in enumerate(as_completed(future_map), 1):
                url, name = future_map[future]
                try:
                    if future.result():
                        valid[url] = name
                except Exception as e:
                    # check_stream handles its own errors; report anything
                    # unexpected instead of hiding it.
                    print(f"[AISIMU] 检测异常: {url} -> {e}")

                if i % 20 == 0 or i == total:
                    print(f"[AISIMU] 检测进度: {i}/{total}")

        self.results = valid
        print(f"[AISIMU] ✅ 检测完成,可用源: {len(valid)}/{total}")

    # ================= M3U export =================

    def export_m3u(self):
        """Write the results as an M3U playlist and return the file path.

        Entries carry no ``group-title`` attribute, only the room name.
        """
        lines = ["#EXTM3U"]

        for url, room_name in self.results.items():
            lines.append(f'#EXTINF:-1,{room_name}')
            lines.append(url)

        path = os.path.join(self.output_dir, "aisimu.m3u")
        with open(path, "w", encoding="utf-8") as f:
            f.write("\n".join(lines))

        print("[AISIMU] M3U 导出完成:", path)
        return path

    # ================= TXT export =================

    def export_txt(self):
        """Write ``name<TAB>url`` lines to a text file and return its path."""
        path = os.path.join(self.output_dir, "aisimu.txt")

        with open(path, "w", encoding="utf-8") as f:
            for url, room_name in self.results.items():
                f.write(f"{room_name}\t{url}\n")

        print("[AISIMU] TXT 导出完成:", path)
        return path

    # ================= JSON export =================

    def export_json(self):
        """Write the results as a JSON list of ``{"name", "url"}`` objects.

        Returns:
            Path of the written file.
        """
        path = os.path.join(self.output_dir, "aisimu.json")

        data = [
            {"name": name, "url": url}
            for url, name in self.results.items()
        ]

        with open(path, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)

        print("[AISIMU] JSON 导出完成:", path)
        return path

    # ================= Main workflow =================

    def run(self):
        """Run the full pipeline: login, scrape, validate, export, notify.

        Returns early without scraping when the login fails.
        """
        if not self.login():
            return

        self.fetch_index()

        total = len(self.category_urls)
        with ThreadPoolExecutor(max_workers=6) as pool:
            for i, (url, name) in enumerate(self.category_urls.items(), 1):
                pool.submit(self.fetch_category, url, name, i, total)
            # Leaving the ``with`` block waits for every submitted task;
            # fetch_category reports its own failures.

        self.validate_streams()

        self.new_urls = set(self.results) - self.old_urls
        if self.new_urls:
            self.tg(f"🆕 新增直播源 {len(self.new_urls)} 条")

        m3u_path = self.export_m3u()
        txt_path = self.export_txt()
        json_path = self.export_json()

        self._save_history()

        self.tg_file(
            m3u_path,
            caption=f"✅ AISIMU 采集完成\n有效源: {len(self.results)}"
        )

        print("[AISIMU] 全流程完成,脚本退出")
|
||||
|
||||
|
||||
# Script entry point: build a scraper from the default config path and run it.
if __name__ == "__main__":
    scraper = AisiMuScraper()
    scraper.run()
|
||||
Reference in New Issue
Block a user