Update Up

This commit is contained in:
cluntop
2026-03-21 09:07:47 +08:00
parent 8ccd7acf77
commit c578e4b071
5 changed files with 119 additions and 15742 deletions
+118 -69
View File
@@ -10,13 +10,40 @@ from concurrent.futures import ThreadPoolExecutor, as_completed
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# === 配置日志 ===
def setup_logger():
# 确保日志目录存在
os.makedirs("logs", exist_ok=True)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.handlers.clear() # 清除已有 handler 避免重复
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
# 控制台输出
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
# 文件输出
file_handler = logging.FileHandler("logs/iptv_update.log", encoding="utf-8")
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
return logger
logger = setup_logger()
# 全局锁,用于文件写入
write_lock = threading.Lock()
def ensure_dir(file_path):
"""确保文件所在的目录存在"""
dirname = os.path.dirname(file_path)
if dirname:
os.makedirs(dirname, exist_ok=True)
def get_session():
"""创建一个带有重试机制的requests Session"""
session = requests.Session()
@@ -34,7 +61,8 @@ def load_urls_from_file(file_path):
return urls
try:
with open(file_path, "r", encoding="utf-8") as f:
# 使用 utf-8-sig 安全过滤由于记事本编辑可能产生的 \ufeff BOM 头
with open(file_path, "r", encoding="utf-8-sig") as f:
for line in f:
line = line.strip()
if line and not line.startswith("#"):
@@ -50,7 +78,8 @@ def parse_template(template_file):
current_category = None
try:
with open(template_file, "r", encoding="utf-8") as f:
# 使用 utf-8-sig 避免首行解析出错
with open(template_file, "r", encoding="utf-8-sig") as f:
for line in f:
line = line.strip()
if not line or line.startswith("#"):
@@ -63,68 +92,73 @@ def parse_template(template_file):
channel_name = line.split(",")[0].strip()
template_channels[current_category].append(channel_name)
except FileNotFoundError:
# 这是一个正常的流程控制,如果文件不存在可能只是没配置测试文件
logger.warning(f"模板文件未找到: {template_file}")
return None # 返回 None 表示失败
return None
return template_channels
def fetch_channels(url):
"""从URL获取频道列表"""
channels = OrderedDict()
session = get_session()
try:
response = session.get(url, timeout=30)
response.raise_for_status()
response.encoding = response.apparent_encoding or "utf-8"
lines = [line.strip() for line in response.text.splitlines() if line.strip()]
if not lines:
# 使用上下文管理器确保 socket 资源正确释放
with get_session() as session:
try:
with session.get(url, timeout=30) as response:
response.raise_for_status()
# 优化编码解析:跳过极其缓慢的 apparent_encoding 计算,直接指定 utf-8
if response.encoding is None or response.encoding.lower() == 'iso-8859-1':
response.encoding = 'utf-8'
text_content = response.text
except Exception as e:
logger.error(f"处理 {url} 时出错: {e}")
return channels
is_m3u = any("#EXTINF" in line for line in lines[:10])
if is_m3u:
current_category = "默认分类"
current_name = "未知频道"
re_group = re.compile(r'group-title="([^"]*)"')
re_name = re.compile(r',([^,]*)$')
for line in lines:
if line.startswith("#EXTINF"):
group_match = re_group.search(line)
if group_match:
current_category = group_match.group(1).strip()
name_match = re_name.search(line)
if name_match:
current_name = name_match.group(1).strip()
elif not line.startswith("#") and "://" in line:
if current_category not in channels:
channels[current_category] = []
if current_name and current_name != "未知频道":
channels[current_category].append((current_name, line))
current_name = "未知频道"
else:
current_category = None
for line in lines:
if "#genre#" in line:
current_category = line.split(",")[0].strip()
if current_category not in channels:
channels[current_category] = []
elif current_category and "," in line:
parts = line.split(",", 1)
if len(parts) == 2:
name, url = parts
if name.strip() and url.strip():
channels[current_category].append((name.strip(), url.strip()))
lines = [line.strip() for line in text_content.splitlines() if line.strip()]
if not lines:
return channels
except Exception as e:
logger.error(f"处理 {url} 时出错: {e}")
return OrderedDict()
is_m3u = any("#EXTINF" in line for line in lines[:10])
if is_m3u:
current_category = "默认分类"
current_name = "未知频道"
re_group = re.compile(r'group-title="([^"]*)"')
re_name = re.compile(r',([^,]*)$')
for line in lines:
if line.startswith("#EXTINF"):
group_match = re_group.search(line)
if group_match:
current_category = group_match.group(1).strip()
name_match = re_name.search(line)
if name_match:
current_name = name_match.group(1).strip()
elif not line.startswith("#") and "://" in line:
if current_category not in channels:
channels[current_category] = []
if current_name and current_name != "未知频道":
channels[current_category].append((current_name, line))
current_name = "未知频道"
else:
current_category = None
for line in lines:
if "#genre#" in line:
current_category = line.split(",")[0].strip()
if current_category not in channels:
channels[current_category] = []
elif current_category and "," in line:
parts = line.split(",", 1)
if len(parts) == 2:
name, url_part = parts
if name.strip() and url_part.strip():
channels[current_category].append((name.strip(), url_part.strip()))
return channels
def match_channels(template_channels, all_channels):
matched = OrderedDict()
@@ -158,7 +192,6 @@ def match_channels(template_channels, all_channels):
variants = list(OrderedDict.fromkeys(variants_raw))
primary_name = variants[0]
found_for_this_template = False
for variant in variants:
@@ -202,8 +235,9 @@ def generate_outputs(channels, template_channels, m3u_path, txt_path):
"""生成文件 - 路径参数化"""
written_urls = set()
# 确保输出目录存在
os.makedirs(os.path.dirname(m3u_path), exist_ok=True)
# 安全地确保输出目录存在
ensure_dir(m3u_path)
ensure_dir(txt_path)
try:
with write_lock:
@@ -249,14 +283,17 @@ def generate_outputs(channels, template_channels, m3u_path, txt_path):
logger.info(f"输出完成: {m3u_path}, {txt_path}")
except Exception as e:
logger.error(f"写入文件失败: {e}")
logger.error(f"写入输出文件失败: {e}")
def generate_unmatched_report(unmatched_template, unmatched_source, report_file):
"""生成未匹配报告 - 路径参数化"""
# 确保配置目录存在
os.makedirs(os.path.dirname(report_file), exist_ok=True)
"""生成未匹配报告"""
total_template_lost = sum(len(v) for v in unmatched_template.values())
# 如果未指定报告文件路径,则仅计算丢失数量,不执行文件写入
if not report_file:
return total_template_lost
ensure_dir(report_file)
try:
with open(report_file, "w", encoding="utf-8") as f:
@@ -286,7 +323,7 @@ def remove_unmatched_from_template(template_file, unmatched_template):
backup_file = template_file + ".backup"
try:
shutil.copy2(template_file, backup_file)
with open(template_file, "r", encoding="utf-8") as f:
with open(template_file, "r", encoding="utf-8-sig") as f:
lines = f.readlines()
new_lines = []
@@ -307,6 +344,9 @@ def remove_unmatched_from_template(template_file, unmatched_template):
if current_cat in to_remove and name in to_remove[current_cat]:
continue
new_lines.append(line)
else:
# 修复: 若不在任何 category 内的内容(如异常格式),不应被错误丢弃
new_lines.append(line)
with open(template_file, "w", encoding="utf-8") as f:
f.writelines(new_lines)
@@ -327,7 +367,9 @@ def process_iptv_task(template_file, tv_urls, output_m3u, output_txt, report_fil
logger.info(f"开始从 {len(tv_urls)} 个源获取数据...")
all_channels = OrderedDict()
# 这里使用临时Executor,或者可以将Executor传进来复用
success_count = 0
fail_count = 0
with ThreadPoolExecutor(max_workers=5) as executor:
future_to_url = {executor.submit(fetch_channels, url): url for url in tv_urls}
for future in as_completed(future_to_url):
@@ -335,14 +377,20 @@ def process_iptv_task(template_file, tv_urls, output_m3u, output_txt, report_fil
try:
data = future.result()
if data:
success_count += 1
for cat, chans in data.items():
if cat not in all_channels:
all_channels[cat] = []
all_channels[cat].extend(chans)
else:
fail_count += 1
except Exception as e:
fail_count += 1
logger.error(f"{url} 异常: {e}")
logger.info(f"数据获取完毕: 成功解析 {success_count} 个源,失败/空数据 {fail_count} 个源。")
logger.info("开始匹配频道...")
matched, unmatched_tmpl, unmatched_src = match_channels(template, all_channels)
generate_outputs(matched, template, output_m3u, output_txt)
@@ -370,7 +418,7 @@ if __name__ == "__main__":
tv_urls=TV_URLS,
output_m3u="lib/iptv.m3u",
output_txt="lib/iptv.txt",
report_file="py/config/unmatched.txt", # 修改报告名,避免覆盖测试配置
report_file="py/config/iptv.log",
auto_clean=True
)
@@ -380,10 +428,11 @@ if __name__ == "__main__":
process_iptv_task(
template_file=TEST_TEMPLATE_FILE,
tv_urls=TV_URLS,
output_m3u="lib/iptv_test.m3u", # 输出到 lib 目录
output_m3u="lib/iptv_test.m3u",
output_txt="lib/iptv_test.txt",
report_file="py/config/unmatched_test.txt", # 测试的报告单独存放
auto_clean=False # 测试列表建议不自动删除,方便调试
report_file=None,
auto_clean=False
)
else:
logger.info(f"未检测到测试配置 {TEST_TEMPLATE_FILE},跳过测试生成。")