diff --git a/.github/Toos/大秀直播/aisimu.py b/.github/Toos/大秀直播/aisimu.py new file mode 100644 index 000000000..ace0557ca --- /dev/null +++ b/.github/Toos/大秀直播/aisimu.py @@ -0,0 +1,287 @@ +import requests +import json +import time +import os +from bs4 import BeautifulSoup +from urllib.parse import urljoin +from concurrent.futures import ThreadPoolExecutor, as_completed + +class AisiMuScraper: + + def __init__(self, config_path="config.json"): + self.config = self._load_config(config_path) + + self.session = requests.Session() + self.session.headers.update({ + "User-Agent": self.config.get("user_agent") + }) + + self.category_urls = {} # {url: 分类名} + self.results = {} # {直播url: 主播/房间名} + self.old_urls = set() + self.new_urls = set() + + self.output_dir = "output" + os.makedirs(self.output_dir, exist_ok=True) + + self._load_history() + + # ================= 基础 ================= + + def _load_config(self, path): + with open(path, "r", encoding="utf-8") as f: + return json.load(f) + + def tg(self, text): + token = self.config.get("tg_token") + chat_id = self.config.get("tg_chat_id") + if not token or not chat_id: + return + try: + requests.post( + f"https://api.telegram.org/bot{token}/sendMessage", + data={"chat_id": chat_id, "text": text}, + timeout=10 + ) + except Exception as e: + print(f"[AISIMU] Telegram 消息发送失败: {e}") + + def tg_file(self, filepath, caption=""): + token = self.config.get("tg_token") + chat_id = self.config.get("tg_chat_id") + if not token or not chat_id: + return + try: + with open(filepath, "rb") as f: + requests.post( + f"https://api.telegram.org/bot{token}/sendDocument", + data={ + "chat_id": chat_id, + "caption": caption + }, + files={"document": f}, + timeout=30 + ) + except Exception as e: + print(f"[AISIMU] Telegram 文件发送失败: {e}") + + # ================= 登录 ================= + + def login(self): + print("[AISIMU] 尝试登录:", self.config["login_url"]) + try: + r = self.session.get(self.config["login_url"], timeout=10) + soup = BeautifulSoup(r.text, "html.parser") + + payload = { + self.config["username_field"]: self.config["username"], + self.config["password_field"]: self.config["password"] + } + + token_field = self.config.get("csrf_token_field") + if token_field: + token = soup.find("input", {"name": token_field}) + if token: + payload[token_field] = token.get("value") + + r = self.session.post( + self.config["login_url"], + data=payload, + allow_redirects=True, + timeout=10 + ) + + if self.config["login_failed_check_text"] in r.text: + print("[AISIMU] ❌ 登录失败") + self.tg("❌ AISIMU 登录失败") + return False + + print("[AISIMU] 登录成功") + return True + + except Exception as e: + print("[AISIMU] 登录异常:", e) + self.tg("❌ AISIMU 登录异常") + return False + + # ================= 分类页 ================= + + def fetch_index(self): + r = self.session.get(self.config["logged_in_expected_url"], timeout=10) + soup = BeautifulSoup(r.text, "html.parser") + + for a in soup.select('a[href*="zblist.php"]'): + name = a.text.strip() + url = urljoin(self.config["logged_in_expected_url"], a["href"]) + self.category_urls[url] = name + + print(f"[AISIMU] 发现分类页: {len(self.category_urls)}") + + # ======== 抓取 + 过滤规则(按你要求修正)======== + def fetch_category(self, url, cname, idx, total): + try: + r = self.session.get(url, timeout=10) + soup = BeautifulSoup(r.text, "html.parser") + + for tr in soup.select("table tr"): + tds = tr.find_all("td") + if len(tds) < 4: + continue + + room_name = tds[2].get_text(strip=True) + live = tds[3].get_text(strip=True) + + if not live.startswith("http"): + continue + + # === 你的核心过滤规则 === + banned_words = ["广播", "查看主播", "支付宝风控解除,之声,实力带飞,财经"] + if any(w in room_name for w in banned_words): + continue + + # 只保留真正的主播名 + self.results[live] = room_name + + print(f"[AISIMU] 分类页进度: {idx}/{total}") + + except Exception as e: + print(f"[AISIMU] ✖ 分类失败: {cname} -> {e}") + + # ================= 增量 ================= + + def _load_history(self): + path = os.path.join(self.output_dir, "history.txt") + if os.path.exists(path): + with open(path, "r", encoding="utf-8") as f: + self.old_urls = set(x.strip() for x in f if x.strip()) + + def _save_history(self): + path = os.path.join(self.output_dir, "history.txt") + with open(path, "w", encoding="utf-8") as f: + for u in sorted(self.results.keys()): + f.write(u + "\n") + + # ================= 多线程检测 ================= + + def check_stream(self, url): + try: + r = self.session.head( + url, + timeout=(2, 4), + allow_redirects=True + ) + ok = r.status_code in (200, 301, 302) + r.close() + return ok + except Exception: + return False + + def validate_streams(self): + print("[AISIMU] 多线程检测直播源可用性...") + + valid = {} + total = len(self.results) + + with ThreadPoolExecutor(max_workers=10) as pool: + future_map = { + pool.submit(self.check_stream, url): (url, name) + for url, name in self.results.items() + } + + for i, future in enumerate(as_completed(future_map), 1): + url, name = future_map[future] + try: + if future.result(): + valid[url] = name + except Exception: + pass + + if i % 20 == 0 or i == total: + print(f"[AISIMU] 检测进度: {i}/{total}") + + self.results = valid + print(f"[AISIMU] ✅ 检测完成,可用源: {len(valid)}/{total}") + + # ================= M3U 导出(按你要求修正) ================= + + def export_m3u(self): + lines = ["#EXTM3U"] + + for url, room_name in self.results.items(): + # 👉 这里已经去掉 group-title="查看主播" + lines.append(f'#EXTINF:-1,{room_name}') + lines.append(url) + + path = os.path.join(self.output_dir, "aisimu.m3u") + with open(path, "w", encoding="utf-8") as f: + f.write("\n".join(lines)) + + print("[AISIMU] M3U 导出完成:", path) + return path + + # ================= TXT 导出 ================= + + def export_txt(self): + path = os.path.join(self.output_dir, "aisimu.txt") + + with open(path, "w", encoding="utf-8") as f: + for url, room_name in self.results.items(): + f.write(f"{room_name}\t{url}\n") + + print("[AISIMU] TXT 导出完成:", path) + return path + + # ================= JSON 导出 ================= + + def export_json(self): + path = os.path.join(self.output_dir, "aisimu.json") + + data = [ + {"name": name, "url": url} + for url, name in self.results.items() + ] + + with open(path, "w", encoding="utf-8") as f: + json.dump(data, f, ensure_ascii=False, indent=2) + + print("[AISIMU] JSON 导出完成:", path) + return path + + # ================= 主流程 ================= + + def run(self): + if not self.login(): + return + + self.fetch_index() + + total = len(self.category_urls) + with ThreadPoolExecutor(max_workers=6) as pool: + tasks = [] + for i, (url, name) in enumerate(self.category_urls.items(), 1): + tasks.append(pool.submit(self.fetch_category, url, name, i, total)) + for _ in as_completed(tasks): + pass + + self.validate_streams() + + self.new_urls = set(self.results) - self.old_urls + if self.new_urls: + self.tg(f"🆕 新增直播源 {len(self.new_urls)} 条") + + m3u_path = self.export_m3u() + txt_path = self.export_txt() + json_path = self.export_json() + + self._save_history() + + self.tg_file( + m3u_path, + caption=f"✅ AISIMU 采集完成\n有效源: {len(self.results)}" + ) + + print("[AISIMU] 全流程完成,脚本退出") + + +if __name__ == "__main__": + AisiMuScraper().run()