From ec0e2dbe57ffa857ea45a72ae49ac6a8934277b2 Mon Sep 17 00:00:00 2001 From: frxz751113 <156018267+frxz751113@users.noreply.github.com> Date: Tue, 1 Oct 2024 03:47:35 +0800 Subject: [PATCH] Update p2p.py --- py/p2p.py | 388 ++++-------------------------------------------------- 1 file changed, 29 insertions(+), 359 deletions(-) diff --git a/py/p2p.py b/py/p2p.py index 6c6a52f..02571de 100644 --- a/py/p2p.py +++ b/py/p2p.py @@ -4,99 +4,72 @@ import time from datetime import datetime import os from urllib.parse import urlparse -import socket #check p3p源 rtp源 -import subprocess #check rtmp源 +import socket +import subprocess +# 当前日期 timestart = datetime.now() -#定义需屏蔽IP域名 -BlackHost=["127.0.0.1:8080","live3.lalifeier.eu.org","newcntv.qcloudcdn.com"] + +# 定义需屏蔽 IP 域名 +BlackHost = ["127.0.0.1:8080", "live3.lalifeier.eu.org", "newcntv.qcloudcdn.com"] ################################################ 读取文件内容 def read_txt_file(file_path): - # 定义需要跳过的字符串数组 - skip_strings = ['#genre#'] - # 定义需要包含的字符串数组 - required_strings = ['://'] with open(file_path, 'r', encoding='utf-8') as file: - # 列表推导式,筛选出既不包含跳过字符串且包含所需字符串的行 - lines = [ - line for line in file - if not any(skip_str in line for skip_str in skip_strings) and all(req_str in line for req_str in required_strings) - ] + lines = file.readlines() return lines - ################################################# 检测 URL 是否可访问并记录响应时间 def check_url(url, timeout=6): start_time = time.time() elapsed_time = None success = False try: - # 如果 URL 以 http 开头 if url.startswith("http"): headers = { - 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36', + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36', } - # 创建请求对象 req = urllib.request.Request(url, headers=headers) - # 发送请求并获取响应 with urllib.request.urlopen(req, timeout=timeout) as response: - # 如果响应状态码为 200,表示成功 if response.status == 200: success = True - # 如果 URL 以 p3p 开头 elif url.startswith("p3p"): success = check_p3p_url(url, timeout) - # 如果 URL 以 p2p 开头 elif url.startswith("p2p"): - success = check_p2p_url(url, timeout) - # 如果 URL 以 rtmp 或 rtsp 开头 - elif url.startswith("rtmp") or url.startswith("rtsp") : + success = check_p2p_url(url, timeout) + elif url.startswith("rtmp") or url.startswith("rtsp"): success = check_rtmp_url(url, timeout) - # 如果 URL 以 rtp 开头 elif url.startswith("rtp"): success = check_rtp_url(url, timeout) - # 如果执行到这一步,没有异常,计算时间 - elapsed_time = (time.time() - start_time) * 1000 # 转换为毫秒 + elapsed_time = (time.time() - start_time) * 1000 except Exception as e: print(f"Error checking {url}: {e}") - record_host(get_host_from_url(url)) - # 在发生异常的情况下,将 elapsed_time 设置为 None elapsed_time = None return elapsed_time, success - ################################################ def check_rtmp_url(url, timeout): try: - # 使用 subprocess 模块运行 ffprobe 命令来检查 rtmp 地址 result = subprocess.run(['ffprobe', url], stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=timeout) - # 如果返回码为 0,表示命令执行成功,即 rtmp 地址有效 if result.returncode == 0: return True except subprocess.TimeoutExpired: - # 如果超时,打印超时信息 print(f"Timeout checking {url}") except Exception as e: - # 如果发生其他异常,打印错误信息 print(f"Error checking {url}: {e}") return False ################################################ def check_rtp_url(url, timeout): try: - # 解析 URL parsed_url = urlparse(url) - # 提取主机名(IP 地址)和端口号 host = parsed_url.hostname port = parsed_url.port - # 创建一个 UDP 套接字连接 with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: - s.settimeout(timeout) # 设置超时时间 - # 连接到指定的主机和端口 + s.settimeout(timeout) s.connect((host, port)) - s.sendto(b'', (host, port)) # 发送空的 UDP 数据包 - s.recv(1) # 尝试接收数据 + s.sendto(b'', (host, port)) + s.recv(1) return True except (socket.timeout, socket.error): return False @@ -104,376 +77,73 @@ def check_rtp_url(url, timeout): ################################################ def check_p3p_url(url, timeout): try: - # 解析 URL parsed_url = urlparse(url) host = parsed_url.hostname port = parsed_url.port path = parsed_url.path - # 检查解析是否成功,如果主机名、端口号或路径有缺失,则抛出 ValueError 异常 if not host or not port or not path: raise ValueError("Invalid p3p URL") - # 创建一个 TCP 连接 with socket.create_connection((host, port), timeout=timeout) as s: - # 构造请求,根据协议定义可能需要调整 request = f"GET {path} P3P/1.0\r\nHost: {host}\r\n\r\n" - s.sendall(request.encode()) # 发送请求 - # 读取响应 + s.sendall(request.encode()) response = s.recv(1024) - # 简单判断是否收到有效响应,如果响应中包含 "P3P",则认为地址有效 if b"P3P" in response: return True except Exception as e: - # 如果发生异常,打印错误信息 print(f"Error checking {url}: {e}") return False ################################################ def check_p2p_url(url, timeout): try: - # 解析 URL parsed_url = urlparse(url) host = parsed_url.hostname port = parsed_url.port path = parsed_url.path - # 检查解析是否成功,如果主机名、端口号或路径有缺失,则抛出 ValueError 异常 if not host or not port or not path: raise ValueError("Invalid P2P URL") - # 创建一个 TCP 连接 with socket.create_connection((host, port), timeout=timeout) as s: - # 自定义请求,这里只是一个占位符,需根据具体协议定义 request = f"YOUR_CUSTOM_REQUEST {path}\r\nHost: {host}\r\n\r\n" - s.sendall(request.encode()) # 发送请求 - # 读取响应 + s.sendall(request.encode()) response = s.recv(1024) - # 自定义响应解析,这里简单示例,如果响应中包含特定内容,则认为地址有效 if b"SOME_EXPECTED_RESPONSE" in response: return True except Exception as e: - # 如果发生异常,打印错误信息 print(f"Error checking {url}: {e}") return False - - ################################################# 处理单行文本并检测 URL def process_line(line): - # 如果行中包含“#genre#”或者不包含“://”,则跳过该行 - if "#genre#" in line or "://" not in line : - return None, None + if "#genre#" in line: + return line.strip() parts = line.split(',') - # 如果该行按逗号分割后长度为 2,表示有名称和 URL if len(parts) == 2: name, url = parts - # 检测 URL 的有效性并获取响应时间 elapsed_time, is_valid = check_url(url.strip()) - # 如果 URL 有效,返回响应时间和该行文本 if is_valid: - return elapsed_time, line.strip() - else: - # 如果 URL 无效,返回 None 和该行文本 - return None, line.strip() - return None, None - + return f"{elapsed_time:.2f}ms,{name},{url.strip()}" + return None ################################################# 多线程处理文本并检测 URL def process_urls_multithreaded(lines, max_workers=30): - blacklist = [] - successlist = [] - # 创建线程池执行器 + results = [] with ThreadPoolExecutor(max_workers=max_workers) as executor: - # 提交每个行的处理任务,并将任务和对应的行存储在字典中 futures = {executor.submit(process_line, line): line for line in lines} - # 遍历已完成的任务 for future in as_completed(futures): - elapsed_time, result = future.result() - # 如果有结果 + result = future.result() if result: - # 如果响应时间不为 None,将其添加到成功列表中 - if elapsed_time is not None: - successlist.append(f"{elapsed_time:.2f}ms,{result}") - else: - # 如果响应时间为 None,将其添加到黑名单中 - blacklist.append(result) - return successlist, blacklist - + results.append(result) + return results ################################################# 写入文件 def write_list(file_path, data_list): with open(file_path, 'w', encoding='utf-8') as file: - # 遍历列表中的每个元素并写入文件 for item in data_list: file.write(item + '\n') -# 增加外部 url 到检测清单,同时支持检测 m3u 格式 url -# urls 里所有的源都读到这里。 -urls_all_lines = [] - -################################################ -def get_url_file_extension(url): - # 解析 URL - parsed_url = urlparse(url) - # 获取路径部分 - path = parsed_url.path - # 提取文件扩展名 - extension = os.path.splitext(path)[1] - return extension - -################################################ -def convert_m3u_to_txt(m3u_content): - # 分行处理 - lines = m3u_content.split('\n') - # 用于存储结果的列表 - txt_lines = [] - # 临时变量用于存储频道名称 - channel_name = "" - for line in lines: - # 过滤掉 #EXTM3U 开头的行 - if line.startswith("#EXTM3U"): - continue - # 处理 #EXTINF 开头的行 - if line.startswith("#EXTINF"): - # 获取频道名称(假设频道名称在引号后) - channel_name = line.split(',')[-1].strip() - # 处理 URL 行 - elif line.startswith("http"): - txt_lines.append(f"{channel_name},{line.strip()}") - # 将结果合并成一个字符串,以换行符分隔 - # return '\n'.join(txt_lines) - return txt_lines - -url_statistics=[] - -def process_url(url): - try: - # 打开 URL 并读取内容 - with urllib.request.urlopen(url) as response: - # 以二进制方式读取数据 - data = response.read() - # 将二进制数据解码为字符串 - text = data.decode('utf-8') - # 如果 URL 的文件扩展名是.m3u 或.m3u8 - if get_url_file_extension(url)==".m3u" or get_url_file_extension(url)==".m3u8": - m3u_lines=convert_m3u_to_txt(text) - # 记录 m3u 文件中的行数和 URL - url_statistics.append(f"{len(m3u_lines)},{url.strip()}") - urls_all_lines.extend(m3u_lines) # 注意:extend - # 如果 URL 的文件扩展名是.txt - elif get_url_file_extension(url)==".txt": - lines = text.split('\n') - # 记录 txt 文件中的行数和 URL - url_statistics.append(f"{len(lines)},{url.strip()}") - for line in lines: - if "#genre#" not in line and "," in line and "://" in line: - #channel_name=line.split(',')[0].strip() - #channel_address=line.split(',')[1].strip() - urls_all_lines.append(line.strip()) - except Exception as e: - # 如果处理 URL 时发生错误,打印错误信息 - print(f"处理URL时发生错误:{e}") - - -# 去重复源 2024-08-06 (检测前剔除重复 url,提高检测效率) -def remove_duplicates_url(lines): - urls =[] - newlines=[] - for line in lines: - if "," in line and "://" in line: - # channel_name=line.split(',')[0].strip() - channel_url=line.split(',')[1].strip() - # 如果当前 URL 不在列表中,则添加到新列表和 URL 列表中 - if channel_url not in urls: - urls.append(channel_url) - newlines.append(line) - return newlines - - - -# 处理带$的 URL,把$之后的内容都去掉(包括$也去掉) 【2024-08-08 22:29:11】 -def clean_url(lines): - urls =[] - newlines=[] - for line in lines: - if "," in line and "://" in line: - last_dollar_index = line.rfind('$') - if last_dollar_index!= -1: - line=line[:last_dollar_index] - newlines.append(line) - return newlines - -# 处理带#的 URL 【2024-08-09 23:53:26】 -def split_url(lines): - newlines=[] - for line in lines: - # 拆分成频道名和 URL 部分 - channel_name, channel_address = line.split(',', 1) - # 需要加处理带#号源=予加速源 - if "#" not in channel_address: - newlines.append(line) - elif "#" in channel_address and "://" in channel_address: - # 如果有“#”号,则根据“#”号分隔 - url_list = channel_address.split('#') - for url in url_list: - if "://" in url: - newline=f'{channel_name},{url}' - newlines.append(line) - return newlines - -# 取得 host -def get_host_from_url(url: str) -> str: - try: - # 解析 URL - parsed_url = urlparse(url) - # 返回 URL 的 netloc(网络位置,通常是主机名和端口号) - return parsed_url.netloc - except Exception as e: - # 如果发生错误,返回错误信息 - return f"Error: {str(e)}" - -# 使用字典来统计 blackhost 的记录次数 -blacklist_dict = {} -def record_host(host): - # 如果 host 已经在字典中,计数加 1 - if host in blacklist_dict: - blacklist_dict[host] += 1 - # 如果 host 不在字典中,加入并初始化计数为 1 - else: - blacklist_dict[host] = 1 -# 将结果保存为 txt 文件 -def save_blackhost_to_txt(filename=f"{datetime.now().strftime('%Y%m%d_%H_%M_%S')}_blackhost_count.txt"): - with open(filename, "w") as file: - # 遍历字典,将 host 和对应的计数写入文件 - for host, count in blacklist_dict.items(): - file.write(f"{host}: {count}\n") - print(f"结果已保存到 {filename}") - if __name__ == "__main__": - # 定义要访问的多个 URL - urls = [ - #'https://raw.githubusercontent.com/YanG-1989/m3u/main/Gather.m3u', - #'https://raw.githubusercontent.com/iptv-org/iptv/master/streams/cn.m3u', - 'https://raw.bgithub.xyz/frxz751113/IPTVzb1/refs/heads/main/%E7%BB%BC%E5%90%88%E6%BA%90.txt', - 'https://raw.bgithub.xyz/newrecha/TVBOX/33d46519cbe0deb5f62d5d979dcdc8833295c66e/live/240919-1.txt' - #'' - ] - for url in urls: - print(f"处理URL: {url}") - # 读取上面 url 清单中直播源存入 urls_all_lines - process_url(url) - - # 获取当前脚本所在的目录 - current_dir = os.path.dirname(os.path.abspath(__file__)) - # 获取上一层目录 - parent_dir = os.path.dirname(current_dir) - - input_file1 = os.path.join(parent_dir, '综合源.txt') # 输入文件路径1 - input_file2 = os.path.join(current_dir, 'gat.txt') # 输入文件路径2 - success_file = os.path.join(current_dir, 'whitelist_auto.txt') # 成功清单文件路径 - success_file_tv = os.path.join(current_dir, 'whitelist_auto_tv.txt') # 成功清单文件路径(另存一份直接引用源) - blacklist_file = os.path.join(current_dir, 'blacklist_auto.txt') # 黑名单文件路径 - - # 读取输入文件内容 - lines1 = read_txt_file(input_file1) - lines2 = read_txt_file(input_file2) - lines=urls_all_lines + lines1 + lines2 # 从list变成集合提供检索效率⇒发现用了set后加#合并多行url,故去掉 - #lines=urls_all_lines # Test - - # 计算合并后合计个数 - urls_hj_before = len(lines) - - # 分级带#号直播源地址 - lines=split_url(lines) - urls_hj_before2 = len(lines) - - # 去$ - lines=clean_url(lines) - urls_hj_before3 = len(lines) - - # 去重 - lines=remove_duplicates_url(lines) - urls_hj = len(lines) - - # 处理URL并生成成功清单和黑名单 - successlist, blacklist = process_urls_multithreaded(lines) - - # 给successlist, blacklist排序 - # 定义排序函数 - def successlist_sort_key(item): - time_str = item.split(',')[0].replace('ms', '') - return float(time_str) - - successlist=sorted(successlist, key=successlist_sort_key) - blacklist=sorted(blacklist) - - # 计算check后ok和ng个数 - urls_ok = len(successlist) - urls_ng = len(blacklist) - - # 把successlist整理一下,生成一个可以直接引用的源,方便用zyplayer手动check - def remove_prefix_from_lines(lines): - result = [] - for line in lines: - if "#genre#" not in line and "," in line and "://" in line: - parts = line.split(",") - result.append(",".join(parts[1:])) - return result - - - # 加时间戳等 - version=datetime.now().strftime("%Y%m%d-%H-%M-%S")+",url" - successlist_tv = ["更新时间,#genre#"] +[version] + ['\n'] +\ - ["whitelist,#genre#"] + remove_prefix_from_lines(successlist) - successlist = ["更新时间,#genre#"] +[version] + ['\n'] +\ - ["RespoTime,whitelist,#genre#"] + successlist - blacklist = ["更新时间,#genre#"] +[version] + ['\n'] +\ - ["blacklist,#genre#"] + blacklist - - # 写入成功清单文件 - write_list(success_file, successlist) - write_list(success_file_tv, successlist_tv) - - # 写入黑名单文件 - write_list(blacklist_file, blacklist) - - print(f"成功清单文件已生成: {success_file}") - print(f"成功清单文件已生成(tv): {success_file_tv}") - print(f"黑名单文件已生成: {blacklist_file}") - - # 写入history - timenow=datetime.now().strftime("%Y%m%d_%H_%M_%S") - history_success_file = f'history/blacklist/{timenow}_whitelist_auto.txt' - history_blacklist_file = f'history/blacklist/{timenow}_blacklist_auto.txt' - write_list(history_success_file, successlist) - write_list(history_blacklist_file, blacklist) - print(f"history成功清单文件已生成: {history_success_file}") - print(f"history黑名单文件已生成: {history_blacklist_file}") - - # 执行的代码 - timeend = datetime.now() - - # 计算时间差 - elapsed_time = timeend - timestart - total_seconds = elapsed_time.total_seconds() - - # 转换为分钟和秒 - minutes = int(total_seconds // 60) - seconds = int(total_seconds % 60) - - # 格式化开始和结束时间 - timestart_str = timestart.strftime("%Y%m%d_%H_%M_%S") - timeend_str = timeend.strftime("%Y%m%d_%H_%M_%S") - - print(f"开始时间: {timestart_str}") - print(f"结束时间: {timeend_str}") - print(f"执行时间: {minutes} 分 {seconds} 秒") - print(f"urls_hj最初: {urls_hj_before} ") - print(f"urls_hj分解井号源后: {urls_hj_before2} ") - print(f"urls_hj去$后: {urls_hj_before3} ") - print(f"urls_hj去重后: {urls_hj} ") - print(f" urls_ok: {urls_ok} ") - print(f" urls_ng: {urls_ng} ") - - save_blackhost_to_txt() - -for statistics in url_statistics: #查看各个url的量有多少 2024-08-19 - print(statistics) - + input_file_path = "综合源.txt" # 替换为你的输入文件路径 + output_file_path = "your.txt" # 替换为你的输出文件路径 + lines = read_txt_file(input_file_path) + results = process_urls_multithreaded(lines) + write_list(output_file_path, results)