From e2d58503344d71230ca2e90e49fa885e1762dd08 Mon Sep 17 00:00:00 2001 From: frxz751113 <156018267+frxz751113@users.noreply.github.com> Date: Wed, 21 Aug 2024 10:07:14 +0800 Subject: [PATCH] =?UTF-8?q?Update=20=E7=BB=BC=E5=90=88=E6=9B=B4=E6=96=B0.p?= =?UTF-8?q?y?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- py/综合更新.py | 147 +++++++++++++++++++++++-------------------------- 1 file changed, 69 insertions(+), 78 deletions(-) diff --git a/py/综合更新.py b/py/综合更新.py index 4c2ab7e..290e83b 100644 --- a/py/综合更新.py +++ b/py/综合更新.py @@ -158,84 +158,6 @@ remove_duplicates_keep_order(folder_path) print('文件去重完成!移除存储的旧文件!') -import os -import cv2 -import time -from tqdm import tqdm -import sys - -# 初始化字典以存储IP检测结果 -detected_ips = {} - -def get_ip_key(url): - """从URL中提取IP地址,并构造一个唯一的键""" - start = url.find('://') + 3 - end = url.find('/', start) - if end == -1: - end = len(url) - return url[start:end].strip() - -# 设置固定的文件夹路径 -folder_path = 'playlist' - -# 确保文件夹路径存在 -if not os.path.isdir(folder_path): - print("指定的文件夹不存在。") - sys.exit() - -# 遍历文件夹中的所有.txt文件 -for filename in os.listdir(folder_path): - if filename.endswith('.txt'): - file_path = os.path.join(folder_path, filename) - # 读取文件内容 - with open(file_path, 'r', encoding='utf-8') as file: - lines = file.readlines() - - # 准备写回文件 - with open(file_path, 'w', encoding='utf-8') as output_file: - # 使用 tqdm 显示进度条 - for line in tqdm(lines, total=len(lines), desc=f"Processing {filename}"): - parts = line.split(',', 1) - if len(parts) >= 2: - channel_name, url = parts - channel_name = channel_name.strip() - url = url.strip() - ip_key = get_ip_key(url) - - # 检查IP是否已经被检测过 - if ip_key in detected_ips: - # 如果之前检测成功,则写入该行 - if detected_ips[ip_key]['status'] == 'ok': - output_file.write(line) - continue # 无论之前检测结果如何,都不重新检测 - - # 初始化帧计数器和成功标志 - frame_count = 0 - success = False - # 尝试打开视频流 - cap = cv2.VideoCapture(url) - start_time = time.time() - while (time.time() - start_time) < 3: - ret, frame = cap.read() - if not ret: - break - frame_count += 1 - # 如果在3秒内读取到60帧以上,设置成功标志 - if frame_count >= 60: - success = True - break - cap.release() - - # 根据检测结果更新字典 - if success: - detected_ips[ip_key] = {'status': 'ok'} - output_file.write(line) - else: - detected_ips[ip_key] = {'status': 'fail'} - -# 打印检测结果 -for ip_key, result in detected_ips.items(): - print(f"IP Key: {ip_key}, Status: {result['status']}") @@ -495,6 +417,75 @@ with open('2.txt', "w", encoding="utf-8") as file: file.write(line) print(f"文件已排序并保存为: {output_file_path}") +以下为组播IP流畅性检测 +import cv2 +import time +from tqdm import tqdm +# 初始化酒店源字典 +detected_ips = {} +# 存储文件路径 +file_path = "2.txt" +output_file_path = "2.txt" +def get_ip_key(url): + """从URL中提取IP地址,并构造一个唯一的键""" + # 找到'//'到第三个'.'之间的字符串 + start = url.find('://') + 3 # '://'.length 是 3 + end = start + dot_count = 0 + while dot_count < 3: + end = url.find('.', end) + if end == -1: # 如果没有找到第三个'.',就结束 + break + dot_count += 1 + return url[start:end] if dot_count == 3 else None +# 打开输入文件和输出文件 +with open(file_path, 'r', encoding='utf-8') as file: + lines = file.readlines() +# 获取总行数用于进度条 +total_lines = len(lines) +# 写入通过检测的行到新文件 +with open(output_file_path, 'w', encoding='utf-8') as output_file: + # 使用tqdm显示进度条 + for i, line in tqdm(enumerate(lines), total=total_lines, desc="Processing", unit='line'): + # 检查是否包含 'genre' + if 'genre' in line: + output_file.write(line) + continue + # 分割频道名称和URL,并去除空白字符 + parts = line.split(',', 1) + if len(parts) == 2: + channel_name, url = parts + channel_name = channel_name.strip() + url = url.strip() + # 构造IP键 + ip_key = get_ip_key(url) + if ip_key and ip_key in detected_ips: + # 如果IP键已存在,根据之前的结果决定是否写入新文件 + if detected_ips[ip_key]['status'] == 'ok': + output_file.write(line) + elif ip_key: # 新IP键,进行检测 + # 进行检测 + cap = cv2.VideoCapture(url) + start_time = time.time() + frame_count = 0 + # 尝试捕获10秒内的帧 + while frame_count < 60 and (time.time() - start_time) < 3: + ret, frame = cap.read() + if not ret: + break + frame_count += 1 + # 释放资源 + cap.release() + # 根据捕获的帧数判断状态并记录结果 + if frame_count >= 65: #10秒内超过230帧则写入 + detected_ips[ip_key] = {'status': 'ok'} + output_file.write(line) # 写入检测通过的行 + else: + detected_ips[ip_key] = {'status': 'fail'} +# 打印酒店源 +for ip_key, result in detected_ips.items(): + print(f"IP Key: {ip_key}, Status: {result['status']}") + ######################################################################################################################################################################################## ################################################################定义关键词分割规则 def check_and_write_file(input_file, output_file, keywords):