From 5447a48bba8167e586cd05b81f987b3d8770d0ec Mon Sep 17 00:00:00 2001 From: frxz751113 <156018267+frxz751113@users.noreply.github.com> Date: Fri, 20 Sep 2024 23:30:31 +0800 Subject: [PATCH] =?UTF-8?q?Update=20=E7=BD=91=E7=BB=9C=E6=94=B6=E9=9B=86.p?= =?UTF-8?q?y?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- py/网络收集.py | 124 ++++++++++++++++++++++++------------------------- 1 file changed, 60 insertions(+), 64 deletions(-) diff --git a/py/网络收集.py b/py/网络收集.py index cf99f03..25b21fa 100644 --- a/py/网络收集.py +++ b/py/网络收集.py @@ -523,75 +523,71 @@ remove_duplicates('网络收集.txt', '网络收集.txt') ############################################################################全部检测,防止IP段失效 -import requests -import time -import cv2 -from urllib.parse import urlparse -from tqdm import tqdm - -# 测试HTTP连接并尝试下载数据 -def test_connectivity_and_download(url, initial_timeout=3, retry_timeout=5): - parsed_url = urlparse(url) - if parsed_url.scheme not in ['http', 'https']: - # 非HTTP(s)协议,尝试RTSP检测 - return test_rtsp_connectivity(url, retry_timeout) - else: - # HTTP(s)协议,使用原始方法 - try: - with requests.get(url, stream=True, timeout=initial_timeout) as response: - if 200 <= response.status_code <= 403: - start_time = time.time() - while time.time() - start_time < initial_timeout: - chunk = response.raw.read(256) # 尝试下载1KB数据 - if chunk: - return True # 成功下载数据 - except requests.RequestException as e: - print(f"请求异常: {e}") - pass #这行删掉则会在下载不到数据流的时候进行连通性测试 - return False # 默认返回False - -print("/" * 80) -# 测试RTSP连接并尝试读取流 -def test_rtsp_connectivity(url, timeout=5): - cap = cv2.VideoCapture(url) +# 函数:获取视频分辨率 +def get_video_resolution(video_path, timeout=0.8): + cap = cv2.VideoCapture(video_path) if not cap.isOpened(): - return False - start_time = time.time() - while time.time() - start_time < timeout: - ret, _ = cap.read() - if ret: - return True # 成功读取帧 + return None + width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) cap.release() - return False - + return (width, height) +# 函数:处理每一行 +def process_line(line, output_file, order_list, valid_count, invalid_count, total_lines): + parts = line.strip().split(',') + if '#genre#' in line: + # 如果行包含 '#genre#',直接写入新文件 + with threading.Lock(): + output_file.write(line) + print(f"已写入genre行:{line.strip()}") + elif len(parts) == 2: + channel_name, channel_url = parts + resolution = get_video_resolution(channel_url, timeout=8) + if resolution and resolution[1] >= 720: # 检查分辨率是否大于等于720p + with threading.Lock(): + output_file.write(f"{channel_name}[{resolution[1]}p],{channel_url}\n") + order_list.append((channel_name, resolution[1], channel_url)) + valid_count[0] += 1 + print(f"Channel '{channel_name}' accepted with resolution {resolution[1]}p at URL {channel_url}.") + else: + invalid_count[0] += 1 + with threading.Lock(): + print(f"有效: {valid_count[0]}, 无效: {invalid_count[0]}, 总数: {total_lines}, 进度: {(valid_count[0] + invalid_count[0]) / total_lines * 100:.2f}%") +# 函数:多线程工作 +def worker(task_queue, output_file, order_list, valid_count, invalid_count, total_lines): + while True: + try: + line = task_queue.get(timeout=1) + process_line(line, output_file, order_list, valid_count, invalid_count, total_lines) + except Queue.Empty: + break + finally: + task_queue.task_done() # 主函数 -def main(输入, 输出): - with open(输入, "r", encoding="utf-8") as source_file: +def main(source_file_path, output_file_path): + order_list = [] + valid_count = [0] + invalid_count = [0] + task_queue = Queue() + # 读取源文件 + with open(source_file_path, 'r', encoding='utf-8') as source_file: lines = source_file.readlines() - results = [] - for line_number, line in enumerate(tqdm(lines, desc="检测中")): - parts = line.strip().split(",") - if len(parts) == 2 and parts[1]: # 确保有URL,并且URL不为空 - channel_name, channel_url = parts - try: - is_valid = test_connectivity_and_download(channel_url) - except Exception as e: - print(f"检测URL {channel_url} 时发生错误: {e}") - is_valid = False # 将异常的URL视为无效 - status = "有效" if is_valid else "无效" - if "genre" in line.lower() or status == "有效": - results.append((channel_name.strip(), channel_url.strip(), status)) - # 写入文件 - with open(输出, "w", encoding="utf-8") as output_file: - for channel_name, channel_url, status in results: - output_file.write(f"{channel_name},{channel_url}\n") - - print(f"任务完成, 有效源数量: {len([x for x in results if x[2] == '有效'])}, 无效源数量: {len([x for x in results if x[2] == '无效'])}") + with open(output_file_path + '.txt', 'w', encoding='utf-8') as output_file: + # 创建线程池 + with ThreadPoolExecutor(max_workers=64) as executor: + # 创建并启动工作线程 + for _ in range(64): + executor.submit(worker, task_queue, output_file, order_list, valid_count, invalid_count, len(lines)) + # 将所有行放入队列 + for line in lines: + task_queue.put(line) + # 等待队列中的所有任务完成 + task_queue.join() + print(f"任务完成,有效频道数:{valid_count[0]}, 无效频道数:{invalid_count[0]}, 总频道数:{len(lines)}") if __name__ == "__main__": - 输入 = "网络收集.txt" #input('请输入utf-8编码的直播源文件路径:') - 输出 = "网络收集.txt" - main(输入, 输出) - + source_file_path = '网络收集.txt' # 替换为你的源文件路径 + output_file_path = '网络收集' # 替换为你的输出文件路径,不要后缀名 + main(source_file_path, output_file_path)