diff --git a/py/GAT.py b/py/GAT.py index 0afb9b0..f82b489 100644 --- a/py/GAT.py +++ b/py/GAT.py @@ -174,50 +174,62 @@ with open('gat.txt', 'w', encoding='utf-8') as new_file: -import cv2 -import time +import requests from tqdm import tqdm - -# 存储文件路径 -file_path = "gat.txt" -output_file_path = "gat.txt" - -# 打开输入文件和输出文件 -with open(file_path, 'r', encoding='utf-8') as file: - lines = file.readlines() - -# 获取总行数用于进度条 -total_lines = len(lines) - -# 写入通过检测的行到新文件 -with open(output_file_path, 'w', encoding='utf-8') as output_file: - # 使用tqdm显示进度条 - for i, line in tqdm(enumerate(lines), total=total_lines, desc="Processing", unit='line'): - # 检查是否包含 'genre' - if 'genre' in line: - output_file.write(line) - continue - # 分割频道名称和URL,并去除空白字符 - parts = line.split(',', 1) - if len(parts) == 2: - channel_name, url = parts - channel_name = channel_name.strip() - url = url.strip() - # 进行检测 - cap = cv2.VideoCapture(url) - start_time = time.time() - frame_count = 0 - # 尝试捕获10秒内的帧 - while frame_count < 25 and (time.time() - start_time) < 2: - ret, frame = cap.read() - if not ret: - break - frame_count += 1 - # 释放资源 - cap.release() - # 根据捕获的帧数判断状态并记录结果 - if frame_count >= 25: # 10秒内超过200帧则写入 - output_file.write(line) # 写入检测通过的行 +import threading +import re +def test_connectivity(url, max_attempts=1): + video_formats = ["m3u", "/", "rtsp"] + if not any(re.search(keyword, url, re.I) for keyword in video_formats): + print("\n特殊网址: 跳过检测") + return False + for _ in range(max_attempts): + try: + response = requests.get(url, timeout=0.3) + return response.status_code == 200 + except requests.RequestException: + pass + return False +# 处理每一行的函数 +def process_line(line, output_file, valid_count, invalid_count): + parts = line.strip().split(",") + if len(parts) == 2: + channel_name, channel_url = parts + if "genre" in line.lower(): + with threading.Lock(): + output_file.write("\n" +line) # 直接写入原始行 + elif test_connectivity(channel_url): + with threading.Lock(): + output_file.write(f"{channel_name},{channel_url}\n") + valid_count[0] += 1 + else: + with threading.Lock(): + invalid_count[0] += 1 + else: + with threading.Lock(): + invalid_count[0] += 1 +# 主函数 +def main(source_file_path, output_file_path): + with open(source_file_path, "r", encoding="utf-8") as source_file: + lines = source_file.readlines() + valid_count = [0] + invalid_count = [0] + with open(output_file_path, "w", encoding="utf-8") as output_file: + threads = [] + for line in tqdm(lines, desc="地址有效"): + thread = threading.Thread(target=process_line, args=(line, output_file, valid_count, invalid_count)) + thread.start() + threads.append(thread) + for thread in threads: + thread.join() + print(f"任务完成,有效源数量: {valid_count[0]}, 无效源数量: {invalid_count[0]}") +if __name__ == "__main__": + try: + source_file_path = "gat.txt" + output_file_path = "gat.txt" + main(source_file_path, output_file_path) + except Exception as e: + print(f"程序发生错误: {e}") # 无需再打印酒店源,因为这里是对所有URL进行检测,而不是基于IP分组检测