From 45dad0d775b34d14aff0c683a06e9a17eaed1154 Mon Sep 17 00:00:00 2001 From: frxz751113 <156018267+frxz751113@users.noreply.github.com> Date: Mon, 30 Sep 2024 00:26:00 +0800 Subject: [PATCH] Update GAT.py --- py/GAT.py | 118 +++++++++++++++++++++++++++++------------------------- 1 file changed, 64 insertions(+), 54 deletions(-) diff --git a/py/GAT.py b/py/GAT.py index b17da16..5c0a057 100644 --- a/py/GAT.py +++ b/py/GAT.py @@ -175,86 +175,96 @@ with open('gat.txt', 'w', encoding='utf-8') as new_file: -import cv2 +import requests import time +import cv2 +from urllib.parse import urlparse from tqdm import tqdm - -# 存储文件路径 -file_path = "gat.txt" -output_file_path = "gat.txt" - -# 打开输入文件和输出文件 -with open(file_path, 'r', encoding='utf-8') as file: - lines = file.readlines() - -# 获取总行数用于进度条 -total_lines = len(lines) - -# 写入通过检测的行到新文件 -with open(output_file_path, 'w', encoding='utf-8') as output_file: - # 使用tqdm显示进度条 - for i, line in tqdm(enumerate(lines), total=total_lines, desc="Processing", unit='line'): - # 检查是否包含 'genre' - if 'genre' in line: - output_file.write(line) - continue - # 分割频道名称和URL,并去除空白字符 - parts = line.split(',', 1) - if len(parts) == 2: - channel_name, url = parts - channel_name = channel_name.strip() - url = url.strip() - # 进行检测 - cap = cv2.VideoCapture(url) - start_time = time.time() - frame_count = 0 - # 尝试捕获10秒内的帧 - while frame_count < 25 and (time.time() - start_time) < 2: - ret, frame = cap.read() - if not ret: - break - frame_count += 1 - # 释放资源 - cap.release() - # 根据捕获的帧数判断状态并记录结果 - if frame_count >= 25: # 10秒内超过200帧则写入 - output_file.write(line) # 写入检测通过的行 - -# 无需再打印酒店源,因为这里是对所有URL进行检测,而不是基于IP分组检测 - +# 测试HTTP连接并尝试下载数据 +def test_connectivity_and_download(url, initial_timeout=1, retry_timeout=1): + parsed_url = urlparse(url) + if parsed_url.scheme not in ['http', 'https']: + # 非HTTP(s)协议,尝试RTSP检测 + return test_rtsp_connectivity(url, retry_timeout) + else: + # HTTP(s)协议,使用原始方法 + try: + with requests.get(url, stream=True, timeout=initial_timeout) as response: + if response.status_code == 200: + start_time = time.time() + while time.time() - start_time < initial_timeout: + chunk = response.raw.read(512) # 尝试下载1KB数据 + if chunk: + return True # 成功下载数据 + except requests.RequestException as e: + print(f"请求异常: {e}") + pass #这行删掉则会在下载不到数据流的时候进行连通性测试 + return False # 默认返回False +print("/" * 80) +# 测试RTSP连接并尝试读取流 +def test_rtsp_connectivity(url, timeout=3): + cap = cv2.VideoCapture(url) + if not cap.isOpened(): + return False + start_time = time.time() + while time.time() - start_time < timeout: + ret, _ = cap.read() + if ret: + return True # 成功读取帧 + cap.release() + return False +# 主函数 +def main(输入, 输出): + with open(输入, "r", encoding="utf-8") as source_file: + lines = source_file.readlines() + results = [] + for line_number, line in enumerate(tqdm(lines, desc="检测中")): + parts = line.strip().split(",") + if len(parts) == 2 and parts[1]: # 确保有URL,并且URL不为空 + channel_name, channel_url = parts + try: + is_valid = test_connectivity_and_download(channel_url) + except Exception as e: + print(f"检测URL {channel_url} 时发生错误: {e}") + is_valid = False # 将异常的URL视为无效 + status = "有效" if is_valid else "无效" + if "genre" in line.lower() or status == "有效": + results.append((channel_name.strip(), channel_url.strip(), status)) + # 写入文件 + with open(输出, "w", encoding="utf-8") as output_file: + for channel_name, channel_url, status in results: + output_file.write(f"{channel_name},{channel_url}\n") + print(f"任务完成, 有效源数量: {len([x for x in results if x[2] == '有效'])}, 无效源数量: {len([x for x in results if x[2] == '无效'])}") +if __name__ == "__main__": + 输入 = "gat.txt" #input('请输入utf-8编码的直播源文件路径:') + 输出 = "gat.txt" + main(输入, 输出) +#######################普通排序 with open('gat.txt', 'r', encoding='UTF-8') as f: lines = f.readlines() - lines.sort() - with open('gat.txt', 'w', encoding='UTF-8') as f: for line in lines: f.write(line) - +#######################拼音排序 import re from pypinyin import lazy_pinyin - # 打开一个utf-8编码的文本文件 with open("gat.txt", "r", encoding="utf-8") as file: # 读取所有行并存储到列表中 lines = file.readlines() - # 定义一个函数,用于提取每行的第一个数字 def extract_first_number(line): match = re.search(r'\d+', line) return int(match.group()) if match else float('inf') - # 对列表中的行进行排序,按照第一个数字的大小排列,其余行按中文排序 sorted_lines = sorted(lines, key=lambda x: (not 'CCTV' in x, extract_first_number(x) if 'CCTV' in x else lazy_pinyin(x.strip()))) - # 将排序后的行写入新的utf-8编码的文本文件 with open("gat.txt", "w", encoding="utf-8") as file: for line in sorted_lines: file.write(line) - - print("任务运行完毕,分类频道列表可查看文件夹内综合源.txt文件!")