From 857d96312c3beb47d8c684af223ab0acaf7d22a1 Mon Sep 17 00:00:00 2001 From: frxz751113 <156018267+frxz751113@users.noreply.github.com> Date: Mon, 26 Aug 2024 23:46:16 +0800 Subject: [PATCH] =?UTF-8?q?Update=20=E6=94=B6=E9=9B=86.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- py/收集.py | 77 +++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 76 insertions(+), 1 deletion(-) diff --git a/py/收集.py b/py/收集.py index c06e5f2..aab36cc 100644 --- a/py/收集.py +++ b/py/收集.py @@ -353,6 +353,81 @@ for ip_key, result in detected_ips.items(): print(f"IP Key: {ip_key}, Status: {result['status']}") +import requests +import time +import cv2 +from urllib.parse import urlparse +from tqdm import tqdm + +# 测试HTTP连接并尝试下载数据 +def test_connectivity_and_download(url, initial_timeout=0.3, retry_timeout=0.8): + parsed_url = urlparse(url) + if parsed_url.scheme not in ['http', 'https']: + # 非HTTP(s)协议,尝试RTSP检测 + return test_rtsp_connectivity(url, retry_timeout) + else: + # HTTP(s)协议,使用原始方法 + try: + with requests.get(url, stream=True, timeout=initial_timeout) as response: + if response.status_code == 200: + start_time = time.time() + while time.time() - start_time < initial_timeout: + chunk = response.raw.read(512) # 尝试下载1KB数据 + if chunk: + return True # 成功下载数据 + except requests.RequestException as e: + print(f"请求异常: {e}") + pass #这行删掉则会在下载不到数据流的时候进行连通性测试 + + return False # 默认返回False + +print("/" * 80) + +# 测试RTSP连接并尝试读取流 +def test_rtsp_connectivity(url, timeout=3): + cap = cv2.VideoCapture(url) + if not cap.isOpened(): + return False + start_time = time.time() + while time.time() - start_time < timeout: + ret, _ = cap.read() + if ret: + return True # 成功读取帧 + cap.release() + return False + +# 主函数 +def main(输入, 输出): + with open(输入, "r", encoding="utf-8") as source_file: + lines = source_file.readlines() + + results = [] + for line_number, line in enumerate(tqdm(lines, desc="检测中")): + parts = line.strip().split(",") + if len(parts) == 2 and parts[1]: # 确保有URL,并且URL不为空 + channel_name, channel_url = parts + try: + is_valid = test_connectivity_and_download(channel_url) + except Exception as e: + print(f"检测URL {channel_url} 时发生错误: {e}") + is_valid = False # 将异常的URL视为无效 + + status = "有效" if is_valid else "无效" + + if "genre" in line.lower() or status == "有效": + results.append((channel_name.strip(), channel_url.strip(), status)) + + # 写入文件 + with open(输出, "w", encoding="utf-8") as output_file: + for channel_name, channel_url, status in results: + output_file.write(f"{channel_name},{channel_url}\n") + + print(f"任务完成, 有效源数量: {len([x for x in results if x[2] == '有效'])}, 无效源数量: {len([x for x in results if x[2] == '无效'])}") + +if __name__ == "__main__": + 输入 = "网络收集.txt" #input('请输入utf-8编码的直播源文件路径:') + 输出 = "网络收集.txt" + main(输入, 输出) @@ -409,7 +484,7 @@ def parse_file(input_file_path, output_file_name): ip_or_domain_to_lines[ip_or_domain].append(line) # 过滤掉小于1000字节的IP或域名段 filtered_ip_or_domain_to_lines = {ip_or_domain: lines for ip_or_domain, lines in ip_or_domain_to_lines.items() - if sum(len(line) for line in lines) >= 200} + if sum(len(line) for line in lines) >= 500} # 如果没有满足条件的IP或域名段,则不生成文件 if not filtered_ip_or_domain_to_lines: