diff --git a/py/IP更新.py b/py/IP更新检测.py similarity index 83% rename from py/IP更新.py rename to py/IP更新检测.py index 0b2d9d7..aac54d1 100644 --- a/py/IP更新.py +++ b/py/IP更新检测.py @@ -6,11 +6,6 @@ import cv2 # 导入OpenCV库 #本段代码更新文件方式-----追加写入 #同省份多城市查询IP - - - - - # 更新四川电信组播定义fofa链接列表 #/////////////////////////////////////// fofa_urls = [ 'https://fofa.info/result?qbase64=InVkcHh5IiAmJiBjaXR5PSJjaGVuZ2R1Ig%3D%3D', # 成都 #/////////////////////////////////////// @@ -333,3 +328,87 @@ if valid_ips_ports: update_and_write_ips(local_file_path, valid_ips_ports) else: print("没有找到可访问的视频流服务或者没有提取到IP地址和端口号。") + + +###############检测playlist文件夹内所有txt文件内的组播 +###############检测playlist文件夹内所有txt文件内的组播 +###############检测playlist文件夹内所有txt文件内的组播 + +import os +import cv2 +import time +from tqdm import tqdm +import sys + +# 初始化字典以存储IP检测结果 +detected_ips = {} + +def get_ip_key(url): + """从URL中提取IP地址,并构造一个唯一的键""" + start = url.find('://') + 3 + end = url.find('/', start) + if end == -1: + end = len(url) + return url[start:end].strip() + +# 设置固定的文件夹路径 +folder_path = 'playlist' + +# 确保文件夹路径存在 +if not os.path.isdir(folder_path): + print("指定的文件夹不存在。") + sys.exit() + +# 遍历文件夹中的所有.txt文件 +for filename in os.listdir(folder_path): + if filename.endswith('.txt'): + file_path = os.path.join(folder_path, filename) + # 读取文件内容 + with open(file_path, 'r', encoding='utf-8') as file: + lines = file.readlines() + + # 准备写回文件 + with open(file_path, 'w', encoding='utf-8') as output_file: + # 使用 tqdm 显示进度条 + for line in tqdm(lines, total=len(lines), desc=f"Processing {filename}"): + parts = line.split(',', 1) + if len(parts) >= 2: + channel_name, url = parts + channel_name = channel_name.strip() + url = url.strip() + ip_key = get_ip_key(url) + + # 检查IP是否已经被检测过 + if ip_key in detected_ips: + # 如果之前检测成功,则写入该行 + if detected_ips[ip_key]['status'] == 'ok': + output_file.write(line) + continue # 无论之前检测结果如何,都不重新检测 + + # 初始化帧计数器和成功标志 + frame_count = 0 + success = False + # 尝试打开视频流 + cap = cv2.VideoCapture(url) + start_time = time.time() + while (time.time() - start_time) < 3: + ret, frame = cap.read() + if not ret: + break + frame_count += 1 + # 如果在3秒内读取到60帧以上,设置成功标志 + if frame_count >= 60: + success = True + break + cap.release() + + # 根据检测结果更新字典 + if success: + detected_ips[ip_key] = {'status': 'ok'} + output_file.write(line) + else: + detected_ips[ip_key] = {'status': 'fail'} + +# 打印检测结果 +for ip_key, result in detected_ips.items(): + print(f"IP Key: {ip_key}, Status: {result['status']}")