diff --git a/py/p2p.py b/py/p2p.py index c888c53..81adf7d 100644 --- a/py/p2p.py +++ b/py/p2p.py @@ -8,45 +8,53 @@ import socket #check p3p源 rtp源 import subprocess #check rtmp源 timestart = datetime.now() - -BlackHost=["127.0.0.1:8080","live3.lalifeier.eu.org","newcntv.qcloudcdn.com"] +#需屏蔽IP域名 +BlackHost=["127.0.0.1:8080","live3.lalifeier.eu.org","newcntv.qcloudcdn.com"] # 读取文件内容 def read_txt_file(file_path): - skip_strings = ['#genre#'] # 定义需要跳过的字符串数组['#', '@', '#genre#'] - required_strings = ['://'] # 定义需要包含的字符串数组['必需字符1', '必需字符2'] - + # 定义需要跳过的字符串数组 + skip_strings = ['#genre#'] + # 定义需要包含的字符串数组 + required_strings = ['://'] with open(file_path, 'r', encoding='utf-8') as file: + # 列表推导式,筛选出既不包含跳过字符串且包含所需字符串的行 lines = [ line for line in file if not any(skip_str in line for skip_str in skip_strings) and all(req_str in line for req_str in required_strings) ] return lines -# 检测URL是否可访问并记录响应时间 +# 检测 URL 是否可访问并记录响应时间 def check_url(url, timeout=6): start_time = time.time() elapsed_time = None success = False - try: + # 如果 URL 以 http 开头 if url.startswith("http"): headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36', } + # 创建请求对象 req = urllib.request.Request(url, headers=headers) + # 发送请求并获取响应 with urllib.request.urlopen(req, timeout=timeout) as response: + # 如果响应状态码为 200,表示成功 if response.status == 200: success = True + # 如果 URL 以 p3p 开头 elif url.startswith("p3p"): success = check_p3p_url(url, timeout) + # 如果 URL 以 p2p 开头 elif url.startswith("p2p"): success = check_p2p_url(url, timeout) + # 如果 URL 以 rtmp 或 rtsp 开头 elif url.startswith("rtmp") or url.startswith("rtsp") : success = check_rtmp_url(url, timeout) + # 如果 URL 以 rtp 开头 elif url.startswith("rtp"): success = check_rtp_url(url, timeout) - # 如果执行到这一步,没有异常,计算时间 elapsed_time = (time.time() - start_time) * 1000 # 转换为毫秒 @@ -55,34 +63,39 @@ def check_url(url, timeout=6): record_host(get_host_from_url(url)) # 在发生异常的情况下,将 elapsed_time 设置为 None elapsed_time = None - return elapsed_time, success + def check_rtmp_url(url, timeout): try: + # 使用 subprocess 模块运行 ffprobe 命令来检查 rtmp 地址 result = subprocess.run(['ffprobe', url], stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=timeout) + # 如果返回码为 0,表示命令执行成功,即 rtmp 地址有效 if result.returncode == 0: return True except subprocess.TimeoutExpired: + # 如果超时,打印超时信息 print(f"Timeout checking {url}") except Exception as e: + # 如果发生其他异常,打印错误信息 print(f"Error checking {url}: {e}") return False def check_rtp_url(url, timeout): try: - # 解析URL + # 解析 URL parsed_url = urlparse(url) - # 提取主机名(IP地址)和端口号 + # 提取主机名(IP 地址)和端口号 host = parsed_url.hostname port = parsed_url.port - # 创建一个 socket 连接 + # 创建一个 UDP 套接字连接 with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: s.settimeout(timeout) # 设置超时时间 + # 连接到指定的主机和端口 s.connect((host, port)) - s.sendto(b'', (host, port)) # 发送空的UDP数据包 + s.sendto(b'', (host, port)) # 发送空的 UDP 数据包 s.recv(1) # 尝试接收数据 return True except (socket.timeout, socket.error): @@ -90,41 +103,42 @@ def check_rtp_url(url, timeout): def check_p3p_url(url, timeout): try: - # 解析URL + # 解析 URL parsed_url = urlparse(url) host = parsed_url.hostname port = parsed_url.port path = parsed_url.path - # 检查解析是否成功 + # 检查解析是否成功,如果主机名、端口号或路径有缺失,则抛出 ValueError 异常 if not host or not port or not path: raise ValueError("Invalid p3p URL") # 创建一个 TCP 连接 with socket.create_connection((host, port), timeout=timeout) as s: - # 发送一个简单的请求(根据协议定义可能需要调整) + # 构造请求,根据协议定义可能需要调整 request = f"GET {path} P3P/1.0\r\nHost: {host}\r\n\r\n" - s.sendall(request.encode()) - + s.sendall(request.encode()) # 发送请求 + # 读取响应 response = s.recv(1024) - # 简单判断是否收到有效响应 + # 简单判断是否收到有效响应,如果响应中包含 "P3P",则认为地址有效 if b"P3P" in response: return True except Exception as e: + # 如果发生异常,打印错误信息 print(f"Error checking {url}: {e}") return False def check_p2p_url(url, timeout): try: - # 解析URL + # 解析 URL parsed_url = urlparse(url) host = parsed_url.hostname port = parsed_url.port path = parsed_url.path - # 检查解析是否成功 + # 检查解析是否成功,如果主机名、端口号或路径有缺失,则抛出 ValueError 异常 if not host or not port or not path: raise ValueError("Invalid P2P URL") @@ -132,60 +146,73 @@ def check_p2p_url(url, timeout): with socket.create_connection((host, port), timeout=timeout) as s: # 自定义请求,这里只是一个占位符,需根据具体协议定义 request = f"YOUR_CUSTOM_REQUEST {path}\r\nHost: {host}\r\n\r\n" - s.sendall(request.encode()) + s.sendall(request.encode()) # 发送请求 # 读取响应 response = s.recv(1024) - # 自定义响应解析,这里简单示例 + # 自定义响应解析,这里简单示例,如果响应中包含特定内容,则认为地址有效 if b"SOME_EXPECTED_RESPONSE" in response: return True except Exception as e: + # 如果发生异常,打印错误信息 print(f"Error checking {url}: {e}") return False -# 处理单行文本并检测URL + +# 处理单行文本并检测 URL def process_line(line): + # 如果行中包含“#genre#”或者不包含“://”,则跳过该行 if "#genre#" in line or "://" not in line : - return None, None # 跳过包含“#genre#”的行 + return None, None parts = line.split(',') + # 如果该行按逗号分割后长度为 2,表示有名称和 URL if len(parts) == 2: name, url = parts + # 检测 URL 的有效性并获取响应时间 elapsed_time, is_valid = check_url(url.strip()) + # 如果 URL 有效,返回响应时间和该行文本 if is_valid: return elapsed_time, line.strip() else: + # 如果 URL 无效,返回 None 和该行文本 return None, line.strip() return None, None -# 多线程处理文本并检测URL +# 多线程处理文本并检测 URL def process_urls_multithreaded(lines, max_workers=30): blacklist = [] successlist = [] - + # 创建线程池执行器 with ThreadPoolExecutor(max_workers=max_workers) as executor: + # 提交每个行的处理任务,并将任务和对应的行存储在字典中 futures = {executor.submit(process_line, line): line for line in lines} + # 遍历已完成的任务 for future in as_completed(futures): elapsed_time, result = future.result() + # 如果有结果 if result: + # 如果响应时间不为 None,将其添加到成功列表中 if elapsed_time is not None: successlist.append(f"{elapsed_time:.2f}ms,{result}") else: + # 如果响应时间为 None,将其添加到黑名单中 blacklist.append(result) return successlist, blacklist # 写入文件 def write_list(file_path, data_list): with open(file_path, 'w', encoding='utf-8') as file: + # 遍历列表中的每个元素并写入文件 for item in data_list: file.write(item + '\n') -# 增加外部url到检测清单,同时支持检测m3u格式url -# urls里所有的源都读到这里。 +# 增加外部 url 到检测清单,同时支持检测 m3u 格式 url +# urls 里所有的源都读到这里。 urls_all_lines = [] def get_url_file_extension(url): - # 解析URL + # 解析 URL parsed_url = urlparse(url) # 获取路径部分 path = parsed_url.path @@ -223,18 +250,22 @@ url_statistics=[] def process_url(url): try: - # 打开URL并读取内容 + # 打开 URL 并读取内容 with urllib.request.urlopen(url) as response: # 以二进制方式读取数据 data = response.read() # 将二进制数据解码为字符串 text = data.decode('utf-8') + # 如果 URL 的文件扩展名是.m3u 或.m3u8 if get_url_file_extension(url)==".m3u" or get_url_file_extension(url)==".m3u8": m3u_lines=convert_m3u_to_txt(text) + # 记录 m3u 文件中的行数和 URL url_statistics.append(f"{len(m3u_lines)},{url.strip()}") urls_all_lines.extend(m3u_lines) # 注意:extend + # 如果 URL 的文件扩展名是.txt elif get_url_file_extension(url)==".txt": lines = text.split('\n') + # 记录 txt 文件中的行数和 URL url_statistics.append(f"{len(lines)},{url.strip()}") for line in lines: if "#genre#" not in line and "," in line and "://" in line: @@ -243,10 +274,11 @@ def process_url(url): urls_all_lines.append(line.strip()) except Exception as e: + # 如果处理 URL 时发生错误,打印错误信息 print(f"处理URL时发生错误:{e}") -# 去重复源 2024-08-06 (检测前剔除重复url,提高检测效率) +# 去重复源 2024-08-06 (检测前剔除重复 url,提高检测效率) def remove_duplicates_url(lines): urls =[] newlines=[] @@ -254,35 +286,34 @@ def remove_duplicates_url(lines): if "," in line and "://" in line: # channel_name=line.split(',')[0].strip() channel_url=line.split(',')[1].strip() - if channel_url not in urls: # 如果发现当前url不在清单中,则假如newlines + # 如果当前 URL 不在列表中,则添加到新列表和 URL 列表中 + if channel_url not in urls: urls.append(channel_url) newlines.append(line) return newlines -# 处理带$的URL,把$之后的内容都去掉(包括$也去掉) 【2024-08-08 22:29:11】 -#def clean_url(url): -# last_dollar_index = url.rfind('$') # 安全起见找最后一个$处理 -# if last_dollar_index != -1: -# return url[:last_dollar_index] -# return url + +python + 复制 +# 处理带$的 URL,把$之后的内容都去掉(包括$也去掉) 【2024-08-08 22:29:11】 def clean_url(lines): urls =[] newlines=[] for line in lines: if "," in line and "://" in line: last_dollar_index = line.rfind('$') - if last_dollar_index != -1: + if last_dollar_index!= -1: line=line[:last_dollar_index] newlines.append(line) return newlines -# 处理带#的URL 【2024-08-09 23:53:26】 +# 处理带#的 URL 【2024-08-09 23:53:26】 def split_url(lines): newlines=[] for line in lines: - # 拆分成频道名和URL部分 + # 拆分成频道名和 URL 部分 channel_name, channel_address = line.split(',', 1) - #需要加处理带#号源=予加速源 + # 需要加处理带#号源=予加速源 if "#" not in channel_address: newlines.append(line) elif "#" in channel_address and "://" in channel_address: @@ -294,15 +325,18 @@ def split_url(lines): newlines.append(line) return newlines -# 取得host +# 取得 host def get_host_from_url(url: str) -> str: try: + # 解析 URL parsed_url = urlparse(url) + # 返回 URL 的 netloc(网络位置,通常是主机名和端口号) return parsed_url.netloc except Exception as e: + # 如果发生错误,返回错误信息 return f"Error: {str(e)}" -# 使用字典来统计blackhost的记录次数 +# 使用字典来统计 blackhost 的记录次数 blacklist_dict = {} def record_host(host): # 如果 host 已经在字典中,计数加 1 @@ -312,14 +346,15 @@ def record_host(host): else: blacklist_dict[host] = 1 # 将结果保存为 txt 文件 -def save_blackhost_to_txt(filename=f"{datetime.now().strftime("%Y%m%d_%H_%M_%S")}_blackhost_count.txt"): +def save_blackhost_to_txt(filename=f"{datetime.now().strftime('%Y%m%d_%H_%M_%S')}_blackhost_count.txt"): with open(filename, "w") as file: + # 遍历字典,将 host 和对应的计数写入文件 for host, count in blacklist_dict.items(): file.write(f"{host}: {count}\n") print(f"结果已保存到 {filename}") if __name__ == "__main__": - # 定义要访问的多个URL + # 定义要访问的多个 URL urls = [ #'https://raw.githubusercontent.com/YanG-1989/m3u/main/Gather.m3u', #'https://raw.githubusercontent.com/iptv-org/iptv/master/streams/cn.m3u', @@ -329,7 +364,8 @@ if __name__ == "__main__": ] for url in urls: print(f"处理URL: {url}") - process_url(url) #读取上面url清单中直播源存入urls_all_lines + # 读取上面 url 清单中直播源存入 urls_all_lines + process_url(url) # 获取当前脚本所在的目录 current_dir = os.path.dirname(os.path.abspath(__file__))