diff --git a/py/iptv流畅度检测.py b/py/iptv流畅度检测.py index 275a73c..1a28d77 100644 --- a/py/iptv流畅度检测.py +++ b/py/iptv流畅度检测.py @@ -158,7 +158,84 @@ remove_duplicates_keep_order(folder_path) print('文件去重完成!移除存储的旧文件!') +import os +import cv2 +import time +from tqdm import tqdm +import sys +# 初始化字典以存储IP检测结果 +detected_ips = {} + +def get_ip_key(url): + """从URL中提取IP地址,并构造一个唯一的键""" + start = url.find('://') + 3 + end = url.find('/', start) + if end == -1: + end = len(url) + return url[start:end].strip() + +# 设置固定的文件夹路径 +folder_path = 'playlist' + +# 确保文件夹路径存在 +if not os.path.isdir(folder_path): + print("指定的文件夹不存在。") + sys.exit() + +# 遍历文件夹中的所有.txt文件 +for filename in os.listdir(folder_path): + if filename.endswith('.txt'): + file_path = os.path.join(folder_path, filename) + # 读取文件内容 + with open(file_path, 'r', encoding='utf-8') as file: + lines = file.readlines() + + # 准备写回文件 + with open(file_path, 'w', encoding='utf-8') as output_file: + # 使用 tqdm 显示进度条 + for line in tqdm(lines, total=len(lines), desc=f"Processing {filename}"): + parts = line.split(',', 1) + if len(parts) >= 2: + channel_name, url = parts + channel_name = channel_name.strip() + url = url.strip() + ip_key = get_ip_key(url) + + # 检查IP是否已经被检测过 + if ip_key in detected_ips: + # 如果之前检测成功,则写入该行 + if detected_ips[ip_key]['status'] == 'ok': + output_file.write(line) + continue # 无论之前检测结果如何,都不重新检测 + + # 初始化帧计数器和成功标志 + frame_count = 0 + success = False + # 尝试打开视频流 + cap = cv2.VideoCapture(url) + start_time = time.time() + while (time.time() - start_time) < 3: + ret, frame = cap.read() + if not ret: + break + frame_count += 1 + # 如果在3秒内读取到60帧以上,设置成功标志 + if frame_count >= 60: + success = True + break + cap.release() + + # 根据检测结果更新字典 + if success: + detected_ips[ip_key] = {'status': 'ok'} + output_file.write(line) + else: + detected_ips[ip_key] = {'status': 'fail'} + +# 打印检测结果 +for ip_key, result in detected_ips.items(): + print(f"IP Key: {ip_key}, Status: {result['status']}") @@ -413,77 +490,11 @@ sorted_lines = sorted(lines, key=lambda x: (not 'CCTV' in x, extract_first_numbe # 将排序后的行写入新的utf-8编码的文本文件,文件名基于原文件名 output_file_path = "sorted_" + os.path.basename(file_path) # 写入新文件 -with open('5.txt', "w", encoding="utf-8") as file: +with open('2.txt', "w", encoding="utf-8") as file: for line in sorted_lines: file.write(line) print(f"文件已排序并保存为: {output_file_path}") -import cv2 -import time -from tqdm import tqdm -# 初始化酒店源字典 -detected_ips = {} -# 存储文件路径 -file_path = "5.txt" -output_file_path = "2.txt" -def get_ip_key(url): - """从URL中提取IP地址,并构造一个唯一的键""" - # 找到'//'到第三个'.'之间的字符串 - start = url.find('://') + 3 # '://'.length 是 3 - end = start - dot_count = 0 - while dot_count < 3: - end = url.find('.', end) - if end == -1: # 如果没有找到第三个'.',就结束 - break - dot_count += 1 - return url[start:end] if dot_count == 3 else None -# 打开输入文件和输出文件 -with open(file_path, 'r', encoding='utf-8') as file: - lines = file.readlines() -# 获取总行数用于进度条 -total_lines = len(lines) -# 写入通过检测的行到新文件 -with open(output_file_path, 'w', encoding='utf-8') as output_file: - # 使用tqdm显示进度条 - for i, line in tqdm(enumerate(lines), total=total_lines, desc="Processing", unit='line'): - # 检查是否包含 'genre' - if 'genre' in line: - output_file.write(line) - continue - # 分割频道名称和URL,并去除空白字符 - parts = line.split(',', 1) - if len(parts) == 2: - channel_name, url = parts - channel_name = channel_name.strip() - url = url.strip() - # 构造IP键 - ip_key = get_ip_key(url) - if ip_key and ip_key in detected_ips: - # 如果IP键已存在,根据之前的结果决定是否写入新文件 - if detected_ips[ip_key]['status'] == 'ok': - output_file.write(line) - elif ip_key: # 新IP键,进行检测 - # 进行检测 - cap = cv2.VideoCapture(url) - start_time = time.time() - frame_count = 0 - # 尝试捕获10秒内的帧 - while frame_count < 235 and (time.time() - start_time) < 10: - ret, frame = cap.read() - if not ret: - break - frame_count += 1 - # 释放资源 - cap.release() - # 根据捕获的帧数判断状态并记录结果 - if frame_count >= 235: #10秒内超过230帧则写入 - detected_ips[ip_key] = {'status': 'ok'} - output_file.write(line) # 写入检测通过的行 - else: - detected_ips[ip_key] = {'status': 'fail'} -# 打印酒店源 -for ip_key, result in detected_ips.items(): - print(f"IP Key: {ip_key}, Status: {result['status']}") + ######################################################################################################################################################################################## ################################################################定义关键词分割规则 def check_and_write_file(input_file, output_file, keywords):