# File-viewer header (extraction residue): Python · Executable File · 288 lines · 9.3 KiB
import json
import os
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urljoin

import requests
from bs4 import BeautifulSoup


class AisiMuScraper:
    """Scrape live-stream URLs from the AISIMU site and export them as playlists.

    The pipeline implemented by :meth:`run` is: log in, discover category
    pages, collect ``stream URL -> room name`` pairs from each category
    table, probe every URL with a HEAD request, export M3U/TXT/JSON files,
    persist the URL history and, when configured, notify a Telegram chat.

    Configuration keys read from the JSON config file:
    ``user_agent``, ``tg_token``, ``tg_chat_id``, ``login_url``,
    ``username_field``, ``username``, ``password_field``, ``password``,
    ``csrf_token_field``, ``login_failed_check_text`` and
    ``logged_in_expected_url``.
    """

    # Room names containing any of these substrings are dropped.  Every
    # keyword is its own entry: a single comma-joined string would only match
    # names that contain that entire literal, commas included.
    BANNED_WORDS = (
        "广播",
        "查看主播",
        "支付宝风控解除",
        "之声",
        "实力带飞",
        "财经",
    )

    def __init__(self, config_path="config.json"):
        """Load the configuration and prepare the session and output folder.

        Args:
            config_path: Path to the JSON configuration file.

        Raises:
            OSError: If the config file cannot be opened or the output
                directory cannot be created.
            ValueError: If the config file is not valid JSON.
        """
        self.config = self._load_config(config_path)

        self.session = requests.Session()
        self.session.headers.update({
            "User-Agent": self.config.get("user_agent")
        })

        self.category_urls = {}  # {category page URL: category name}
        self.results = {}        # {stream URL: streamer / room name}
        self.old_urls = set()    # URLs loaded from history.txt
        self.new_urls = set()    # URLs of this run that are not in old_urls

        self.output_dir = "output"
        os.makedirs(self.output_dir, exist_ok=True)

        self._load_history()

    # ================= Basics =================

    def _load_config(self, path):
        """Return the parsed JSON content of the file at ``path``."""
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)

    def tg(self, text):
        """Send ``text`` to the configured Telegram chat (best effort).

        Does nothing when ``tg_token`` or ``tg_chat_id`` is missing from the
        config.  Any exception raised while sending is printed, not raised.
        """
        token = self.config.get("tg_token")
        chat_id = self.config.get("tg_chat_id")
        if not token or not chat_id:
            return
        try:
            requests.post(
                f"https://api.telegram.org/bot{token}/sendMessage",
                data={"chat_id": chat_id, "text": text},
                timeout=10
            )
        except Exception as e:
            print(f"[AISIMU] Telegram 消息发送失败: {e}")

    def tg_file(self, filepath, caption=""):
        """Upload the file at ``filepath`` to the Telegram chat (best effort).

        Does nothing when ``tg_token`` or ``tg_chat_id`` is missing from the
        config.  Any exception (including failure to open the file) is
        printed, not raised.

        Args:
            filepath: Path of the file to send as a document.
            caption: Optional caption attached to the document.
        """
        token = self.config.get("tg_token")
        chat_id = self.config.get("tg_chat_id")
        if not token or not chat_id:
            return
        try:
            with open(filepath, "rb") as f:
                requests.post(
                    f"https://api.telegram.org/bot{token}/sendDocument",
                    data={
                        "chat_id": chat_id,
                        "caption": caption
                    },
                    files={"document": f},
                    timeout=30
                )
        except Exception as e:
            print(f"[AISIMU] Telegram 文件发送失败: {e}")

    # ================= Login =================

    def login(self):
        """Log in through the configured login form.

        Fetches the login page, builds the form payload from the config
        (adding the CSRF token read from the page when ``csrf_token_field``
        is configured and present), and posts it back to ``login_url``.

        Returns:
            True when the response does not contain
            ``login_failed_check_text``; False when it does, or when any
            exception occurs inside the login attempt.
        """
        print("[AISIMU] 尝试登录:", self.config["login_url"])
        try:
            r = self.session.get(self.config["login_url"], timeout=10)
            soup = BeautifulSoup(r.text, "html.parser")

            payload = {
                self.config["username_field"]: self.config["username"],
                self.config["password_field"]: self.config["password"]
            }

            token_field = self.config.get("csrf_token_field")
            if token_field:
                token = soup.find("input", {"name": token_field})
                if token:
                    payload[token_field] = token.get("value")

            r = self.session.post(
                self.config["login_url"],
                data=payload,
                allow_redirects=True,
                timeout=10
            )

            if self.config["login_failed_check_text"] in r.text:
                print("[AISIMU] ❌ 登录失败")
                self.tg("❌ AISIMU 登录失败")
                return False

            print("[AISIMU] 登录成功")
            return True

        except Exception as e:
            print("[AISIMU] 登录异常:", e)
            self.tg("❌ AISIMU 登录异常")
            return False

    # ================= Category pages =================

    def fetch_index(self):
        """Collect category page links from the post-login landing page.

        Every ``<a>`` whose ``href`` contains ``zblist.php`` is stored in
        ``self.category_urls`` as ``absolute URL -> link text``.
        """
        r = self.session.get(self.config["logged_in_expected_url"], timeout=10)
        soup = BeautifulSoup(r.text, "html.parser")

        for a in soup.select('a[href*="zblist.php"]'):
            name = a.text.strip()
            url = urljoin(self.config["logged_in_expected_url"], a["href"])
            self.category_urls[url] = name

        print(f"[AISIMU] 发现分类页: {len(self.category_urls)}")

    # ======== Fetching + filter rules ========

    @classmethod
    def _is_banned(cls, room_name):
        """Return True when ``room_name`` contains any banned keyword."""
        return any(word in room_name for word in cls.BANNED_WORDS)

    def fetch_category(self, url, cname, idx, total):
        """Scrape one category page and merge its streams into ``self.results``.

        Rows with fewer than four cells are skipped.  The third cell is taken
        as the room name and the fourth as the stream URL; rows whose URL
        does not start with ``http`` or whose name is banned are skipped.
        Any exception is printed together with ``cname`` and not raised.

        Args:
            url: Category page URL.
            cname: Category name, used only in the failure message.
            idx: 1-based position of this category, for the progress line.
            total: Total number of categories, for the progress line.
        """
        try:
            r = self.session.get(url, timeout=10)
            soup = BeautifulSoup(r.text, "html.parser")

            for tr in soup.select("table tr"):
                tds = tr.find_all("td")
                if len(tds) < 4:
                    continue

                room_name = tds[2].get_text(strip=True)
                live = tds[3].get_text(strip=True)

                if not live.startswith("http"):
                    continue

                # Core filter rule: drop rooms whose name hits a banned keyword.
                if self._is_banned(room_name):
                    continue

                # Keep only real streamer names.
                self.results[live] = room_name

            print(f"[AISIMU] 分类页进度: {idx}/{total}")

        except Exception as e:
            print(f"[AISIMU] ✖ 分类失败: {cname} -> {e}")

    # ================= Incremental history =================

    def _load_history(self):
        """Load previously saved URLs from ``history.txt`` into ``old_urls``.

        Blank lines are ignored.  A missing file leaves ``old_urls`` as is.
        """
        path = os.path.join(self.output_dir, "history.txt")
        if os.path.exists(path):
            with open(path, "r", encoding="utf-8") as f:
                self.old_urls = set(x.strip() for x in f if x.strip())

    def _save_history(self):
        """Overwrite ``history.txt`` with the current result URLs, sorted."""
        path = os.path.join(self.output_dir, "history.txt")
        with open(path, "w", encoding="utf-8") as f:
            for u in sorted(self.results.keys()):
                f.write(u + "\n")

    # ================= Multithreaded validation =================

    def check_stream(self, url):
        """Return True when a HEAD request to ``url`` ends in 200, 301 or 302.

        Redirects are followed.  The timeout tuple is requests'
        ``(connect, read)`` form.  Any exception counts as unavailable.
        """
        try:
            r = self.session.head(
                url,
                timeout=(2, 4),
                allow_redirects=True
            )
            ok = r.status_code in (200, 301, 302)
            r.close()
            return ok
        except Exception:
            return False

    def validate_streams(self):
        """Probe every collected URL in a thread pool and keep the live ones.

        Replaces ``self.results`` with the subset for which
        :meth:`check_stream` returned True.
        """
        print("[AISIMU] 多线程检测直播源可用性...")

        valid = {}
        total = len(self.results)

        with ThreadPoolExecutor(max_workers=10) as pool:
            future_map = {
                pool.submit(self.check_stream, url): (url, name)
                for url, name in self.results.items()
            }

            for i, future in enumerate(as_completed(future_map), 1):
                url, name = future_map[future]
                try:
                    if future.result():
                        valid[url] = name
                except Exception as e:
                    # check_stream already swallows its own errors, so this
                    # is unexpected: report it and treat the URL as invalid.
                    print(f"[AISIMU] 检测异常: {url} -> {e}")

                if i % 20 == 0 or i == total:
                    print(f"[AISIMU] 检测进度: {i}/{total}")

        self.results = valid
        print(f"[AISIMU] ✅ 检测完成,可用源: {len(valid)}/{total}")

    # ================= M3U export =================

    def export_m3u(self):
        """Write the results to ``aisimu.m3u`` and return the file path.

        Each stream becomes an ``#EXTINF:-1,<room name>`` line followed by
        its URL.  No ``group-title`` attribute is emitted.
        """
        lines = ["#EXTM3U"]

        for url, room_name in self.results.items():
            lines.append(f'#EXTINF:-1,{room_name}')
            lines.append(url)

        path = os.path.join(self.output_dir, "aisimu.m3u")
        with open(path, "w", encoding="utf-8") as f:
            f.write("\n".join(lines))

        print("[AISIMU] M3U 导出完成:", path)
        return path

    # ================= TXT export =================

    def export_txt(self):
        """Write ``<room name>\\t<url>`` lines to ``aisimu.txt``; return its path."""
        path = os.path.join(self.output_dir, "aisimu.txt")

        with open(path, "w", encoding="utf-8") as f:
            for url, room_name in self.results.items():
                f.write(f"{room_name}\t{url}\n")

        print("[AISIMU] TXT 导出完成:", path)
        return path

    # ================= JSON export =================

    def export_json(self):
        """Write a list of ``{"name", "url"}`` objects to ``aisimu.json``.

        Returns:
            The path of the written file.
        """
        path = os.path.join(self.output_dir, "aisimu.json")

        data = [
            {"name": name, "url": url}
            for url, name in self.results.items()
        ]

        with open(path, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)

        print("[AISIMU] JSON 导出完成:", path)
        return path

    # ================= Main flow =================

    def run(self):
        """Run the full scrape → validate → export → notify cycle.

        Returns early (doing nothing else) when :meth:`login` fails.
        """
        if not self.login():
            return

        self.fetch_index()

        total = len(self.category_urls)
        with ThreadPoolExecutor(max_workers=6) as pool:
            tasks = []
            for i, (url, name) in enumerate(self.category_urls.items(), 1):
                tasks.append(pool.submit(self.fetch_category, url, name, i, total))
            # Drain the futures so every category finishes before validation.
            for _ in as_completed(tasks):
                pass

        self.validate_streams()

        self.new_urls = set(self.results) - self.old_urls
        if self.new_urls:
            self.tg(f"🆕 新增直播源 {len(self.new_urls)} 条")

        m3u_path = self.export_m3u()
        txt_path = self.export_txt()
        json_path = self.export_json()

        self._save_history()

        self.tg_file(
            m3u_path,
            caption=f"✅ AISIMU 采集完成\n有效源: {len(self.results)}"
        )

        print("[AISIMU] 全流程完成,脚本退出")


if __name__ == "__main__":
    # Script entry point: run one full cycle using the default config.json.
    AisiMuScraper().run()