From 22f9fd621766fdffd8b57324d01f40aa84bf48f1 Mon Sep 17 00:00:00 2001 From: frxz751113 <156018267+frxz751113@users.noreply.github.com> Date: Mon, 30 Sep 2024 13:07:51 +0800 Subject: [PATCH] Update GAT.py --- py/GAT.py | 109 +++++++++++++++++++++++++++++------------------------- 1 file changed, 59 insertions(+), 50 deletions(-) diff --git a/py/GAT.py b/py/GAT.py index 3945cea..4e5dbfe 100644 --- a/py/GAT.py +++ b/py/GAT.py @@ -174,63 +174,72 @@ with open('gat.txt', 'w', encoding='utf-8') as new_file: -import requests -from tqdm import tqdm -import threading -import re -def test_connectivity(url, max_attempts=2): - video_formats = ["m3u", "/", "rtsp"] - if not any(re.search(keyword, url, re.I) for keyword in video_formats): - print("\n特殊网址: 跳过检测") - return False - for _ in range(max_attempts): - try: - response = requests.get(url, timeout=0.3) - return response.status_code == 200 - except requests.RequestException: - pass - return False -# 处理每一行的函数 -def process_line(line, output_file, valid_count, invalid_count): - parts = line.strip().split(",") - if len(parts) == 2: - channel_name, channel_url = parts - if "genre" in line.lower(): - with threading.Lock(): - output_file.write("\n" +line) # 直接写入原始行 - elif test_connectivity(channel_url): - with threading.Lock(): - output_file.write(f"{channel_name},{channel_url}\n") - valid_count[0] += 1 - else: - with threading.Lock(): - invalid_count[0] += 1 - else: +# 函数:获取视频分辨率 +def get_video_resolution(video_path, timeout=0.8): + cap = cv2.VideoCapture(video_path) + if not cap.isOpened(): + return None + width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + cap.release() + return (width, height) +# 函数:处理每一行 +def process_line(line, output_file, order_list, valid_count, invalid_count, total_lines): + parts = line.strip().split(',') + if '#genre#' in line: + # 如果行包含 '#genre#',直接写入新文件 with threading.Lock(): + output_file.write(line) + print(f"已写入genre行:{line.strip()}") + elif len(parts) == 2: + channel_name, channel_url = parts + resolution = get_video_resolution(channel_url, timeout=8) + if resolution and resolution[1] >= 720: # 检查分辨率是否大于等于720p + with threading.Lock(): + output_file.write(f"{channel_name}[{resolution[1]}p],{channel_url}\n") + order_list.append((channel_name, resolution[1], channel_url)) + valid_count[0] += 1 + print(f"Channel '{channel_name}' accepted with resolution {resolution[1]}p at URL {channel_url}.") + else: invalid_count[0] += 1 + with threading.Lock(): + print(f"有效: {valid_count[0]}, 无效: {invalid_count[0]}, 总数: {total_lines}, 进度: {(valid_count[0] + invalid_count[0]) / total_lines * 100:.2f}%") +# 函数:多线程工作 +def worker(task_queue, output_file, order_list, valid_count, invalid_count, total_lines): + while True: + try: + line = task_queue.get(timeout=1) + process_line(line, output_file, order_list, valid_count, invalid_count, total_lines) + except Queue.Empty: + break + finally: + task_queue.task_done() + # 主函数 def main(source_file_path, output_file_path): - with open(source_file_path, "r", encoding="utf-8") as source_file: - lines = source_file.readlines() + order_list = [] valid_count = [0] invalid_count = [0] - with open(output_file_path, "w", encoding="utf-8") as output_file: - threads = [] - for line in tqdm(lines, desc="地址有效"): - thread = threading.Thread(target=process_line, args=(line, output_file, valid_count, invalid_count)) - thread.start() - threads.append(thread) - for thread in threads: - thread.join() - print(f"任务完成,有效源数量: {valid_count[0]}, 无效源数量: {invalid_count[0]}") + task_queue = Queue() + # 读取源文件 + with open(source_file_path, 'r', encoding='utf-8') as source_file: + lines = source_file.readlines() + with open(output_file_path + '.txt', 'w', encoding='utf-8') as output_file: + # 创建线程池 + with ThreadPoolExecutor(max_workers=64) as executor: + # 创建并启动工作线程 + for _ in range(128): + executor.submit(worker, task_queue, output_file, order_list, valid_count, invalid_count, len(lines)) + # 将所有行放入队列 + for line in lines: + task_queue.put(line) + # 等待队列中的所有任务完成 + task_queue.join() + print(f"任务完成,有效频道数:{valid_count[0]}, 无效频道数:{invalid_count[0]}, 总频道数:{len(lines)}") if __name__ == "__main__": - try: - source_file_path = "gat.txt" - output_file_path = "gat.txt" - main(source_file_path, output_file_path) - except Exception as e: - print(f"程序发生错误: {e}") - + source_file_path = 'gat.txt' # 替换为你的源文件路径 + output_file_path = 'gat' # 替换为你的输出文件路径,不要后缀名 + main(source_file_path, output_file_path) # 无需再打印酒店源,因为这里是对所有URL进行检测,而不是基于IP分组检测