From 65b6c80b915d9d6a43b5f951cb2801d2bbc58dfb Mon Sep 17 00:00:00 2001 From: frxz751113 <156018267+frxz751113@users.noreply.github.com> Date: Tue, 6 May 2025 08:21:31 +0800 Subject: [PATCH] =?UTF-8?q?Update=20=E6=B5=8B=E7=BB=98=E7=AB=99=E9=87=87?= =?UTF-8?q?=E9=9B=86.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- py/测绘站采集.py | 55 +++++++++++++++++++++++++----------------------- 1 file changed, 29 insertions(+), 26 deletions(-) diff --git a/py/测绘站采集.py b/py/测绘站采集.py index 245023d..9577463 100644 --- a/py/测绘站采集.py +++ b/py/测绘站采集.py @@ -40,6 +40,7 @@ DELAY_RANGE = (3, 6) # 随机延迟时间范围(秒) MAX_RETRIES = 3 # 最大重试次数 REQUEST_TIMEOUT = 10 # 请求超时时间(秒) + def get_random_header(): """生成随机请求头""" return { @@ -48,6 +49,7 @@ def get_random_header(): 'Referer': 'https://fofa.info/' } + def safe_request(url): """带重试机制的请求函数""" for attempt in range(MAX_RETRIES): @@ -72,17 +74,18 @@ def safe_request(url): return response.text except Exception as e: - print(f"请求失败(第{attempt+1}次重试): {str(e)}") + print(f"请求失败(第{attempt + 1}次重试): {str(e)}") if attempt == MAX_RETRIES - 1: raise + def validate_video(url, mcast): """验证视频流有效性""" video_url = f"{url}/rtp/{mcast}" print(f"正在验证: {video_url}") try: - # 发送请求,尝试下载 1 千字节的数据 + # 发送请求,尝试下载1千字节的数据 response = requests.get(video_url, headers=get_random_header(), timeout=REQUEST_TIMEOUT, stream=True) response.raise_for_status() @@ -98,6 +101,7 @@ def validate_video(url, mcast): print(f"视频验证异常: {str(e)}") return False + def main(): # 获取需要处理的文件列表 files = [f.split('.')[0] for f in os.listdir('rtp') if f.endswith('.txt')] @@ -151,11 +155,14 @@ def main(): dst.write(modified + '\n') print(f"已生成播放列表: {output_file}") + if __name__ == '__main__': main() print('对playlist文件夹里面的所有txt文件进行去重处理') + + def remove_duplicates_keep_order(folder_path): for filename in os.listdir(folder_path): if filename.endswith('.txt'): @@ -170,11 +177,14 @@ def remove_duplicates_keep_order(folder_path): # 将保持顺序的去重后的内容写回原文件 with open(file_path, 'w', encoding='utf-8') as file: file.writelines(unique_lines) + + # 使用示例 folder_path = 'playlist' # 替换为你的文件夹路径 remove_duplicates_keep_order(folder_path) print('文件去重完成!移除存储的旧文件!') + ###################################################### ##################################################### ###################################################################################################################### @@ -185,14 +195,15 @@ print('文件去重完成!移除存储的旧文件!') ###############检测playlist文件夹内所有txt文件内的组播 import os -import cv2 import time from tqdm import tqdm import sys + # 初始化字典以存储IP检测结果 detected_ips = {} + def get_ip_key(url): """从URL中提取IP地址或域名,并构造一个唯一的键""" start = url.find('://') + 3 @@ -201,6 +212,7 @@ def get_ip_key(url): end = len(url) return url[start:end].strip() + # 设置固定的文件夹路径 folder_path = 'playlist' @@ -209,6 +221,7 @@ if not os.path.isdir(folder_path): print("指定的文件夹不存在。") sys.exit() + # 遍历文件夹中的所有.txt文件 for filename in os.listdir(folder_path): if filename.endswith('.txt'): @@ -219,7 +232,7 @@ for filename in os.listdir(folder_path): # 准备写回文件 with open(file_path, 'w', encoding='utf-8') as output_file: - # 使用 tqdm 显示进度条 + # 使用tqdm显示进度条 for line in tqdm(lines, total=len(lines), desc=f"Processing {filename}"): parts = line.split(',', 1) if len(parts) >= 2: @@ -227,33 +240,24 @@ for filename in os.listdir(folder_path): channel_name = channel_name.strip() url = url.strip() ip_key = get_ip_key(url) - + # 检查IP或域名是否已经被检测过 if ip_key in detected_ips: # 如果之前检测成功,则写入该行 if detected_ips[ip_key]['status'] == 'ok': output_file.write(line) continue # 无论之前检测结果如何,都不重新检测 - - # 初始化帧计数器和成功标志 - frame_count = 0 - success = False - # 尝试打开视频流 - cap = cv2.VideoCapture(url) - start_time = time.time() - while (time.time() - start_time) < 8: - ret, frame = cap.read() - if not ret: - break - frame_count += 1 - # 如果在8秒内读取到1帧以上,设置成功标志 - if frame_count >= 1: - success = True - break - cap.release() - - # 根据检测结果更新字典 - if success: + + # 读取组播地址 + try: + with open(f'rtp/{filename.split("_")[0]}_{filename.split("_")[1]}.txt', 'r', encoding='utf-8') as f: + mcast = f.readline().split('rtp://')[1].split()[0].strip() + except Exception as e: + print(f"文件读取失败: {str(e)}") + continue + + # 验证地址有效性 + if validate_video(url, mcast): detected_ips[ip_key] = {'status': 'ok'} output_file.write(line) else: @@ -263,7 +267,6 @@ for filename in os.listdir(folder_path): for ip_key, result in detected_ips.items(): print(f"IP Key: {ip_key}, Status: {result['status']}") - ###################################################################################################################### ######################################################################################################################