diff --git a/py/p2p.py b/py/p2p.py index 81adf7d..678e2b5 100644 --- a/py/p2p.py +++ b/py/p2p.py @@ -8,10 +8,10 @@ import socket #check p3p源 rtp源 import subprocess #check rtmp源 timestart = datetime.now() -#需屏蔽IP域名 +#定义需屏蔽IP域名 BlackHost=["127.0.0.1:8080","live3.lalifeier.eu.org","newcntv.qcloudcdn.com"] -# 读取文件内容 +################################################ 读取文件内容 def read_txt_file(file_path): # 定义需要跳过的字符串数组 skip_strings = ['#genre#'] @@ -25,7 +25,8 @@ def read_txt_file(file_path): ] return lines -# 检测 URL 是否可访问并记录响应时间 + +################################################# 检测 URL 是否可访问并记录响应时间 def check_url(url, timeout=6): start_time = time.time() elapsed_time = None @@ -57,7 +58,6 @@ def check_url(url, timeout=6): success = check_rtp_url(url, timeout) # 如果执行到这一步,没有异常,计算时间 elapsed_time = (time.time() - start_time) * 1000 # 转换为毫秒 - except Exception as e: print(f"Error checking {url}: {e}") record_host(get_host_from_url(url)) @@ -66,6 +66,7 @@ def check_url(url, timeout=6): return elapsed_time, success +################################################ def check_rtmp_url(url, timeout): try: # 使用 subprocess 模块运行 ffprobe 命令来检查 rtmp 地址 @@ -81,15 +82,14 @@ def check_rtmp_url(url, timeout): print(f"Error checking {url}: {e}") return False +################################################ def check_rtp_url(url, timeout): try: # 解析 URL parsed_url = urlparse(url) - # 提取主机名(IP 地址)和端口号 host = parsed_url.hostname port = parsed_url.port - # 创建一个 UDP 套接字连接 with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: s.settimeout(timeout) # 设置超时时间 @@ -101,6 +101,7 @@ def check_rtp_url(url, timeout): except (socket.timeout, socket.error): return False +################################################ def check_p3p_url(url, timeout): try: # 解析 URL @@ -108,20 +109,16 @@ def check_p3p_url(url, timeout): host = parsed_url.hostname port = parsed_url.port path = parsed_url.path - # 检查解析是否成功,如果主机名、端口号或路径有缺失,则抛出 ValueError 异常 if not host or not port or not path: raise ValueError("Invalid p3p URL") - # 创建一个 TCP 连接 with socket.create_connection((host, port), timeout=timeout) as s: # 构造请求,根据协议定义可能需要调整 request = f"GET {path} P3P/1.0\r\nHost: {host}\r\n\r\n" s.sendall(request.encode()) # 发送请求 - # 读取响应 response = s.recv(1024) - # 简单判断是否收到有效响应,如果响应中包含 "P3P",则认为地址有效 if b"P3P" in response: return True @@ -130,6 +127,7 @@ def check_p3p_url(url, timeout): print(f"Error checking {url}: {e}") return False +################################################ def check_p2p_url(url, timeout): try: # 解析 URL @@ -137,20 +135,16 @@ def check_p2p_url(url, timeout): host = parsed_url.hostname port = parsed_url.port path = parsed_url.path - # 检查解析是否成功,如果主机名、端口号或路径有缺失,则抛出 ValueError 异常 if not host or not port or not path: raise ValueError("Invalid P2P URL") - # 创建一个 TCP 连接 with socket.create_connection((host, port), timeout=timeout) as s: # 自定义请求,这里只是一个占位符,需根据具体协议定义 request = f"YOUR_CUSTOM_REQUEST {path}\r\nHost: {host}\r\n\r\n" s.sendall(request.encode()) # 发送请求 - # 读取响应 response = s.recv(1024) - # 自定义响应解析,这里简单示例,如果响应中包含特定内容,则认为地址有效 if b"SOME_EXPECTED_RESPONSE" in response: return True @@ -160,7 +154,8 @@ def check_p2p_url(url, timeout): return False -# 处理单行文本并检测 URL + +################################################# 处理单行文本并检测 URL def process_line(line): # 如果行中包含“#genre#”或者不包含“://”,则跳过该行 if "#genre#" in line or "://" not in line : @@ -179,7 +174,8 @@ def process_line(line): return None, line.strip() return None, None -# 多线程处理文本并检测 URL + +################################################# 多线程处理文本并检测 URL def process_urls_multithreaded(lines, max_workers=30): blacklist = [] successlist = [] @@ -200,7 +196,8 @@ def process_urls_multithreaded(lines, max_workers=30): blacklist.append(result) return successlist, blacklist -# 写入文件 + +################################################# 写入文件 def write_list(file_path, data_list): with open(file_path, 'w', encoding='utf-8') as file: # 遍历列表中的每个元素并写入文件 @@ -211,6 +208,7 @@ def write_list(file_path, data_list): # urls 里所有的源都读到这里。 urls_all_lines = [] +################################################ def get_url_file_extension(url): # 解析 URL parsed_url = urlparse(url) @@ -220,16 +218,14 @@ def get_url_file_extension(url): extension = os.path.splitext(path)[1] return extension +################################################ def convert_m3u_to_txt(m3u_content): # 分行处理 lines = m3u_content.split('\n') - # 用于存储结果的列表 txt_lines = [] - # 临时变量用于存储频道名称 channel_name = "" - for line in lines: # 过滤掉 #EXTM3U 开头的行 if line.startswith("#EXTM3U"): @@ -241,7 +237,6 @@ def convert_m3u_to_txt(m3u_content): # 处理 URL 行 elif line.startswith("http"): txt_lines.append(f"{channel_name},{line.strip()}") - # 将结果合并成一个字符串,以换行符分隔 # return '\n'.join(txt_lines) return txt_lines @@ -272,7 +267,6 @@ def process_url(url): #channel_name=line.split(',')[0].strip() #channel_address=line.split(',')[1].strip() urls_all_lines.append(line.strip()) - except Exception as e: # 如果处理 URL 时发生错误,打印错误信息 print(f"处理URL时发生错误:{e}")