diff --git a/py/iptv-cv2-check.py b/py/iptv-cv2-check.py index c9056b3..18847b5 100644 --- a/py/iptv-cv2-check.py +++ b/py/iptv-cv2-check.py @@ -527,7 +527,7 @@ def filter_lines(input_file, output_file): filtered_lines = [] for line in lines: #if ('hls' in line and 'm3u' in line) or ('tsfile' in line and 'm3u' in line): #行中包含m3u的同时还要包含hls或者tsfile - if '凤凰' in line or '东森' in line or '天映' in line or '酒店' in line or '龙祥' in line or '美亚' in line or '好莱坞' in line: #仅提取港澳频道 + if '凤凰' in line or '东森' in line or '天映' in line or '酒店' in line or '龙祥' in line or '美亚' in line or '好莱坞' in line or '珠江' in line: #仅提取港澳频道 if 'udp' not in line and 'rtp' not in line and 'CCTV' not in line and '卫视' not in line: # 排除组播地址及央卫 filtered_lines.append(line) with open(output_file, 'w', encoding='utf-8') as output_file: @@ -575,73 +575,95 @@ deduplicate_lines(input_file_path, output_file_path) #################################################### 对整理好的频道列表测试HTTP连接 # 函数:获取视频分辨率 def get_video_resolution(video_path, timeout=2): + # 使用OpenCV创建视频捕获对象 cap = cv2.VideoCapture(video_path) + # 检查视频是否成功打开 if not cap.isOpened(): return None + # 获取视频的宽度和高度 width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + # 释放视频捕获对象 cap.release() + # 返回视频的分辨率 return (width, height) - # 函数:处理每一行 def process_line(line, output_file, order_list, valid_count, invalid_count, total_lines): + # 去除行尾的空白字符并按逗号分割行 parts = line.strip().split(',') + # 如果行包含特定的标签'#genre#',则直接写入新文件 if '#genre#' in line: - # 如果行包含 '#genre#',直接写入新文件 - with threading.Lock(): + with threading.Lock(): # 使用线程锁保证写入操作的原子性 output_file.write(line) print(f"已写入genre行:{line.strip()}") + # 如果分割后的部分数量为2,则继续处理 elif len(parts) == 2: channel_name, channel_url = parts + # 获取视频的分辨率 resolution = get_video_resolution(channel_url, timeout=2) - if resolution and resolution[1] >= 720: # 检查分辨率是否大于等于720p - with threading.Lock(): + # 如果分辨率有效且高度大于等于720p + if resolution and resolution[1] >= 720: + with threading.Lock(): # 使用线程锁 output_file.write(f"{channel_name}[{resolution[1]}p],{channel_url}\n") + # 将频道名、分辨率和URL添加到列表中 order_list.append((channel_name, resolution[1], channel_url)) + # 有效计数增加 valid_count[0] += 1 print(f"Channel '{channel_name}' accepted with resolution {resolution[1]}p at URL {channel_url}.") else: + # 如果分辨率不满足条件,无效计数增加 invalid_count[0] += 1 + # 打印当前处理进度 with threading.Lock(): print(f"有效: {valid_count[0]}, 无效: {invalid_count[0]}, 总数: {total_lines}, 进度: {(valid_count[0] + invalid_count[0]) / total_lines * 100:.2f}%") - # 函数:多线程工作 def worker(task_queue, output_file, order_list, valid_count, invalid_count, total_lines): + # 循环直到队列为空 while True: try: + # 从队列中获取任务,超时时间为1秒 line = task_queue.get(timeout=1) + # 处理获取的任务 process_line(line, output_file, order_list, valid_count, invalid_count, total_lines) - except Queue.Empty: + except Queue.Empty: # 如果队列为空,捕获异常 break finally: + # 标记任务已完成 task_queue.task_done() - + # 主函数 def main(source_file_path, output_file_path): + # 初始化列表和计数器 order_list = [] valid_count = [0] invalid_count = [0] task_queue = Queue() - # 读取源文件 + # 使用with语句打开源文件并读取所有行 with open(source_file_path, 'r', encoding='utf-8') as source_file: lines = source_file.readlines() + # 使用with语句打开输出文件准备写入 with open(output_file_path, 'w', encoding='utf-8') as output_file: - # 创建线程池 + # 创建线程池,最大工作线程数为64 with ThreadPoolExecutor(max_workers=64) as executor: - # 创建并启动工作线程 + # 为线程池中的每个线程提交worker函数 for _ in range(64): executor.submit(worker, task_queue, output_file, order_list, valid_count, invalid_count, len(lines)) - # 将所有行放入队列 + # 将所有行放入任务队列 for line in lines: task_queue.put(line) # 等待队列中的所有任务完成 task_queue.join() + # 打印任务完成的统计信息 print(f"任务完成,有效频道数:{valid_count[0]}, 无效频道数:{invalid_count[0]}, 总频道数:{len(lines)}") + +# 程序入口点 if __name__ == "__main__": + # 定义源文件和输出文件的路径 source_file_path = 'iptv.txt' # 替换为你的源文件路径 - output_file_path = '检测结果.txt' # 替换为你的输出文件路径,不要后缀名 + output_file_path = '检测结果.txt' # 替换为你的输出文件路径 + # 调用主函数 main(source_file_path, output_file_path)