diff --git a/py/IPTV搜索引擎采集.py b/py/IPTV搜索引擎采集.py index 1dfb1ef..b73c2fc 100644 --- a/py/IPTV搜索引擎采集.py +++ b/py/IPTV搜索引擎采集.py @@ -100,3 +100,106 @@ for filename in os.listdir(rtp_folder): gen_files(valid_ips, province_isp.split('_')[0], province_isp.split('_')[1]) # 生成文本文件 else: print(f"未找到有效IP: {province_isp}") + +print('对playlist文件夹里面的所有txt文件进行去重处理') +def remove_duplicates_keep_order(folder_path): + for filename in os.listdir(folder_path): + if filename.endswith('.txt'): + file_path = os.path.join(folder_path, filename) + lines = set() + unique_lines = [] + with open(file_path, 'r', encoding='utf-8') as file: + for line in file: + if line not in lines: + unique_lines.append(line) + lines.add(line) + # 将保持顺序的去重后的内容写回原文件 + with open(file_path, 'w', encoding='utf-8') as file: + file.writelines(unique_lines) +# 使用示例 +folder_path = 'playlist' # 替换为你的文件夹路径 +remove_duplicates_keep_order(folder_path) +print('文件去重完成!移除存储的旧文件!') + +###############检测playlist文件夹内所有txt文件内的组播 +###############检测playlist文件夹内所有txt文件内的组播 +###############检测playlist文件夹内所有txt文件内的组播 + +import os +import cv2 +import time +from tqdm import tqdm +import sys + +# 初始化字典以存储IP检测结果 +detected_ips = {} + +def get_ip_key(url): + """从URL中提取IP地址,并构造一个唯一的键""" + start = url.find('://') + 3 + end = url.find('/', start) + if end == -1: + end = len(url) + return url[start:end].strip() + +# 设置固定的文件夹路径 +folder_path = 'playlist' + +# 确保文件夹路径存在 +if not os.path.isdir(folder_path): + print("指定的文件夹不存在。") + sys.exit() + +# 遍历文件夹中的所有.txt文件 +for filename in os.listdir(folder_path): + if filename.endswith('.txt'): + file_path = os.path.join(folder_path, filename) + # 读取文件内容 + with open(file_path, 'r', encoding='utf-8') as file: + lines = file.readlines() + + # 准备写回文件 + with open(file_path, 'w', encoding='utf-8') as output_file: + # 使用 tqdm 显示进度条 + for line in tqdm(lines, total=len(lines), desc=f"Processing {filename}"): + parts = line.split(',', 1) + if len(parts) >= 2: + channel_name, url = parts + channel_name = channel_name.strip() + url = url.strip() + ip_key = get_ip_key(url) + + # 检查IP是否已经被检测过 + if ip_key in detected_ips: + # 如果之前检测成功,则写入该行 + if detected_ips[ip_key]['status'] == 'ok': + output_file.write(line) + continue # 无论之前检测结果如何,都不重新检测 + + # 初始化帧计数器和成功标志 + frame_count = 0 + success = False + # 尝试打开视频流 + cap = cv2.VideoCapture(url) + start_time = time.time() + while (time.time() - start_time) < 5: + ret, frame = cap.read() + if not ret: + break + frame_count += 1 + # 如果在3秒内读取到60帧以上,设置成功标志 + if frame_count >= 5: + success = True + break + cap.release() + + # 根据检测结果更新字典 + if success: + detected_ips[ip_key] = {'status': 'ok'} + output_file.write(line) + else: + detected_ips[ip_key] = {'status': 'fail'} + +# 打印检测结果 +for ip_key, result in detected_ips.items(): + print(f"IP Key: {ip_key}, Status: {result['status']}")