From a0060c30f7a6e6f768ed4a9725d72f91678dc62e Mon Sep 17 00:00:00 2001 From: frxz751113 <156018267+frxz751113@users.noreply.github.com> Date: Tue, 1 Oct 2024 00:16:10 +0800 Subject: [PATCH] =?UTF-8?q?Delete=20py/=E4=B8=8D=E4=B9=B1=E5=BA=8F?= =?UTF-8?q?=E6=9C=89=E6=95=88=E6=80=A7=E6=A3=80=E6=B5=8B.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- py/不乱序有效性检测.py | 137 ----------------------------------------- 1 file changed, 137 deletions(-) delete mode 100644 py/不乱序有效性检测.py diff --git a/py/不乱序有效性检测.py b/py/不乱序有效性检测.py deleted file mode 100644 index 5c15635..0000000 --- a/py/不乱序有效性检测.py +++ /dev/null @@ -1,137 +0,0 @@ -import re -import os -def filter_lines(file_path): - with open('综合源.txt', 'r', encoding='utf-8') as file: - lines = file.readlines() - - filtered_lines = [] - for line in lines: - if ',' in line: - if 'epg' not in line and 'mitv' not in line and 'udp' not in line and 'rtp' not in line and 'tsfile' not in line \ - and 'P2p' not in line and 'p2p' not in line and 'p3p' not in line and 'P2P' not in line and 'P3p' not in line and 'P3P' not in line: - filtered_lines.append(line) - - return filtered_lines - -def write_filtered_lines(output_file_path, filtered_lines): - with open(output_file_path, 'w', encoding='utf-8') as output_file: - output_file.writelines(filtered_lines) - -if __name__ == "__main__": - input_file_path = '综合源.txt' - output_file_path = "有效检测结果.txt" - - filtered_lines = filter_lines(input_file_path) - write_filtered_lines(output_file_path, filtered_lines) - -print("/" * 80) - -# ###########################################定义替换规则的字典,对整行内的内容进行替换 -replacements = { - " ": "", -} - -# 打开原始文件读取内容,并写入新文件 -with open('有效检测结果.txt', 'r', encoding='utf-8') as file: - lines = file.readlines() - -# 创建新文件并写入替换后的内容 -with open('有效检测结果.txt', 'w', encoding='utf-8') as new_file: - for line in lines: - for old, new in replacements.items(): - line = line.replace(old, new) - new_file.write(line) - - -import requests -import time -import cv2 -from urllib.parse import urlparse -from tqdm import tqdm - -# 测试HTTP连接并尝试下载数据 -def test_connectivity_and_download(url, initial_timeout=1, retry_timeout=1): - parsed_url = urlparse(url) - if parsed_url.scheme not in ['http', 'https']: - # 非HTTP(s)协议,尝试RTSP检测 - return test_rtsp_connectivity(url, retry_timeout) - else: - # HTTP(s)协议,使用原始方法 - try: - with requests.get(url, stream=True, timeout=initial_timeout) as response: - if response.status_code == 200: - start_time = time.time() - while time.time() - start_time < initial_timeout: - chunk = response.raw.read(512) # 尝试下载1KB数据 - if chunk: - return True # 成功下载数据 - except requests.RequestException as e: - print(f"请求异常: {e}") - pass #这行删掉则会在下载不到数据流的时候进行连通性测试 - - return False # 默认返回False - -# 测试RTSP连接并尝试读取流 -def test_rtsp_connectivity(url, timeout=3): - cap = cv2.VideoCapture(url) - if not cap.isOpened(): - return False - start_time = time.time() - while time.time() - start_time < timeout: - ret, _ = cap.read() - if ret: - return True # 成功读取帧 - cap.release() - return False - -# 主函数 -def main(输入, 输出): - with open(输入, "r", encoding="utf-8") as source_file: - lines = source_file.readlines() - - results = [] - for line_number, line in enumerate(tqdm(lines, desc="检测中")): - parts = line.strip().split(",") - if len(parts) == 2 and parts[1]: # 确保有URL,并且URL不为空 - channel_name, channel_url = parts - try: - is_valid = test_connectivity_and_download(channel_url) - except Exception as e: - print(f"检测URL {channel_url} 时发生错误: {e}") - is_valid = False # 将异常的URL视为无效 - - status = "有效" if is_valid else "无效" - - if "genre" in line.lower() or status == "有效": - results.append((channel_name.strip(), channel_url.strip(), status)) - - # 写入文件 - with open(输出, "w", encoding="utf-8") as output_file: - for channel_name, channel_url, status in results: - output_file.write(f"{channel_name},{channel_url}\n") - - print(f"任务完成, 有效源数量: {len([x for x in results if x[2] == '有效'])}, 无效源数量: {len([x for x in results if x[2] == '无效'])}") - -if __name__ == "__main__": - 输入 = "有效检测结果.txt" #input('请输入utf-8编码的直播源文件路径:') - 输出 = "有效检测结果.txt" - main(输入, 输出) - -# ###########################################定义替换规则的字典,对整行内的内容进行替换 -replacements = { - ",None": "", - " ": "", - "\n\n": "", -} - -# 打开原始文件读取内容,并写入新文件 -with open('有效检测结果.txt', 'r', encoding='utf-8') as file: - lines = file.readlines() - -# 创建新文件并写入替换后的内容 -with open('有效检测结果.txt', 'w', encoding='utf-8') as new_file: - for line in lines: - for old, new in replacements.items(): - line = line.replace(old, new) - new_file.write(line) -