diff --git a/py/iptv流畅度检测.py b/py/iptv流畅度检测.py index 71e1e59..d39bf77 100644 --- a/py/iptv流畅度检测.py +++ b/py/iptv流畅度检测.py @@ -715,7 +715,7 @@ with open(output_file_path, 'w', encoding='utf-8') as output_file: start_time = time.time() frame_count = 0 # 尝试捕获5秒内的帧 - while frame_count < 66 and (time.time() - start_time) < 3:#//////////////////////////////////////////////////////////////////////////////////////########### + while frame_count < 72 and (time.time() - start_time) < 3:#//////////////////////////////////////////////////////////////////////////////////////########### ret, frame = cap.read() if not ret: break @@ -723,7 +723,7 @@ with open(output_file_path, 'w', encoding='utf-8') as output_file: # 释放资源 cap.release() # 根据捕获的帧数判断状态并记录结果#////////////////////////////////////////////////////////////////////////////////////////////////////////////////########### - if frame_count >= 66: #5秒内超过100帧则写入#/////////////////////////////////////////////////////////////////////////////////////////////////////########### + if frame_count >= 72: #5秒内超过100帧则写入#/////////////////////////////////////////////////////////////////////////////////////////////////////########### detected_ips[ip_key] = {'status': 'ok'} output_file.write(line) # 写入检测通过的行 else: @@ -777,98 +777,98 @@ from opencc import OpenCC import base64 import cv2 #################################################### 对整理好的频道列表测试HTTP连接 -# 函数:获取视频分辨率 -def get_video_resolution(video_path, timeout=2): - # 使用OpenCV创建视频捕获对象 - cap = cv2.VideoCapture(video_path) - # 检查视频是否成功打开 - if not cap.isOpened(): - return None - # 获取视频的宽度和高度 - width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) - height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) - # 释放视频捕获对象 - cap.release() - # 返回视频的分辨率 - return (width, height) - -# 函数:处理每一行 -def process_line(line, output_file, order_list, valid_count, invalid_count, total_lines): - # 去除行尾的空白字符并按逗号分割行 - parts = line.strip().split(',') - # 如果行包含特定的标签'#genre#',则直接写入新文件 - if '#genre#' in line: - with threading.Lock(): # 使用线程锁保证写入操作的原子性 - output_file.write(line) - print(f"已写入genre行:{line.strip()}") - # 如果分割后的部分数量为2,则继续处理 - elif len(parts) == 2: - channel_name, channel_url = parts - # 获取视频的分辨率 - resolution = get_video_resolution(channel_url, timeout=2) - # 如果分辨率有效且高度大于等于720p - if resolution and resolution[1] >= 720: - with threading.Lock(): # 使用线程锁 - output_file.write(f"{channel_name}[{resolution[1]}p],{channel_url}\n") - # 将频道名、分辨率和URL添加到列表中 - order_list.append((channel_name, resolution[1], channel_url)) - # 有效计数增加 - valid_count[0] += 1 - print(f"Channel '{channel_name}' accepted with resolution {resolution[1]}p at URL {channel_url}.") - else: - # 如果分辨率不满足条件,无效计数增加 - invalid_count[0] += 1 - # 打印当前处理进度 - with threading.Lock(): - print(f"有效: {valid_count[0]}, 无效: {invalid_count[0]}, 总数: {total_lines}, 进度: {(valid_count[0] + invalid_count[0]) / total_lines * 100:.2f}%") - -# 函数:多线程工作 -def worker(task_queue, output_file, order_list, valid_count, invalid_count, total_lines): - # 循环直到队列为空 - while True: - try: - # 从队列中获取任务,超时时间为1秒 - line = task_queue.get(timeout=1) - # 处理获取的任务 - process_line(line, output_file, order_list, valid_count, invalid_count, total_lines) - except Queue.Empty: # 如果队列为空,捕获异常 - break - finally: - # 标记任务已完成 - task_queue.task_done() - -# 主函数 -def main(source_file_path, output_file_path): - # 初始化列表和计数器 - order_list = [] - valid_count = [0] - invalid_count = [0] - task_queue = Queue() - # 使用with语句打开源文件并读取所有行 - with open(source_file_path, 'r', encoding='utf-8') as source_file: - lines = source_file.readlines() - # 使用with语句打开输出文件准备写入 - with open(output_file_path, 'w', encoding='utf-8') as output_file: - # 创建线程池,最大工作线程数为64 - with ThreadPoolExecutor(max_workers=264) as executor: - # 为线程池中的每个线程提交worker函数 - for _ in range(264): - executor.submit(worker, task_queue, output_file, order_list, valid_count, invalid_count, len(lines)) - # 将所有行放入任务队列 - for line in lines: - task_queue.put(line) - # 等待队列中的所有任务完成 - task_queue.join() - # 打印任务完成的统计信息 - print(f"任务完成,有效频道数:{valid_count[0]}, 无效频道数:{invalid_count[0]}, 总频道数:{len(lines)}") - -# 程序入口点 -if __name__ == "__main__": - # 定义源文件和输出文件的路径 - source_file_path = '酒店优选.txt' # 替换为你的源文件路径 - output_file_path = '酒店优选.txt' # 替换为你的输出文件路径 - # 调用主函数 - main(source_file_path, output_file_path) +# 函数:获取视频分辨率\\ +def get_video_resolution(video_path, timeout=2):\\ + # 使用OpenCV创建视频捕获对象\\ + cap = cv2.VideoCapture(video_path)\\ + # 检查视频是否成功打开\\ + if not cap.isOpened():\\ + return None\\ + # 获取视频的宽度和高度\\ + width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))\\ + height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))\\ + # 释放视频捕获对象\\ + cap.release()\\ + # 返回视频的分辨率\\ + return (width, height)\\ +\\ +# 函数:处理每一行\\ +def process_line(line, output_file, order_list, valid_count, invalid_count, total_lines):\\ + # 去除行尾的空白字符并按逗号分割行\\ + parts = line.strip().split(',')\\ + # 如果行包含特定的标签'#genre#',则直接写入新文件\\ + if '#genre#' in line:\\ + with threading.Lock(): # 使用线程锁保证写入操作的原子性\\ + output_file.write(line)\\ + print(f"已写入genre行:{line.strip()}")\\ + # 如果分割后的部分数量为2,则继续处理\\ + elif len(parts) == 2:\\ + channel_name, channel_url = parts\\ + # 获取视频的分辨率\\ + resolution = get_video_resolution(channel_url, timeout=2)\\ + # 如果分辨率有效且高度大于等于720p\\ + if resolution and resolution[1] >= 720:\\ + with threading.Lock(): # 使用线程锁\\ + output_file.write(f"{channel_name}[{resolution[1]}p],{channel_url}\n")\\ + # 将频道名、分辨率和URL添加到列表中\\ + order_list.append((channel_name, resolution[1], channel_url))\\ + # 有效计数增加\\ + valid_count[0] += 1\\ + print(f"Channel '{channel_name}' accepted with resolution {resolution[1]}p at URL {channel_url}.")\\ + else:\\ + # 如果分辨率不满足条件,无效计数增加\\ + invalid_count[0] += 1\\ + # 打印当前处理进度\\ + with threading.Lock():\\ + print(f"有效: {valid_count[0]}, 无效: {invalid_count[0]}, 总数: {total_lines}, 进度: {(valid_count[0] + invalid_count[0]) / total_lines * 100:.2f}%")\\ +\\ +# 函数:多线程工作\\ +def worker(task_queue, output_file, order_list, valid_count, invalid_count, total_lines):\\ + # 循环直到队列为空\\ + while True:\\ + try:\\ + # 从队列中获取任务,超时时间为1秒\\ + line = task_queue.get(timeout=1)\\ + # 处理获取的任务\\ + process_line(line, output_file, order_list, valid_count, invalid_count, total_lines)\\ + except Queue.Empty: # 如果队列为空,捕获异常\\ + break\\ + finally:\\ + # 标记任务已完成\\ + task_queue.task_done()\\ +\\ +# 主函数\\ +def main(source_file_path, output_file_path):\\ + # 初始化列表和计数器\\ + order_list = []\\ + valid_count = [0]\\ + invalid_count = [0]\\ + task_queue = Queue()\\ + # 使用with语句打开源文件并读取所有行\\ + with open(source_file_path, 'r', encoding='utf-8') as source_file:\\ + lines = source_file.readlines()\\ + # 使用with语句打开输出文件准备写入\\ + with open(output_file_path, 'w', encoding='utf-8') as output_file:\\ + # 创建线程池,最大工作线程数为64\\ + with ThreadPoolExecutor(max_workers=264) as executor:\\ + # 为线程池中的每个线程提交worker函数\\ + for _ in range(264):\\ + executor.submit(worker, task_queue, output_file, order_list, valid_count, invalid_count, len(lines))\\ + # 将所有行放入任务队列\\ + for line in lines:\\ + task_queue.put(line)\\ + # 等待队列中的所有任务完成\\ + task_queue.join()\\ + # 打印任务完成的统计信息\\ + print(f"任务完成,有效频道数:{valid_count[0]}, 无效频道数:{invalid_count[0]}, 总频道数:{len(lines)}")\\ +\\ +# 程序入口点\\ +if __name__ == "__main__":\\ + # 定义源文件和输出文件的路径\\ + source_file_path = '酒店优选.txt' # 替换为你的源文件路径\\ + output_file_path = '酒店优选.txt' # 替换为你的输出文件路径\\ + # 调用主函数\\ + main(source_file_path, output_file_path)\\ ####################### 提示用户输入文件名(拖入文件操作)打开用户指定的文件对不规范频道名再次替换