Update p2p.py

This commit is contained in:
frxz751113
2024-10-01 02:28:51 +08:00
committed by GitHub
parent 4c01b91011
commit a7b1ff7a5b
+16 -22
View File
@@ -8,10 +8,10 @@ import socket #check p3p源 rtp源
import subprocess #check rtmp源
timestart = datetime.now()
#需屏蔽IP域名
#定义需屏蔽IP域名
BlackHost=["127.0.0.1:8080","live3.lalifeier.eu.org","newcntv.qcloudcdn.com"]
# 读取文件内容
################################################ 读取文件内容
def read_txt_file(file_path):
# 定义需要跳过的字符串数组
skip_strings = ['#genre#']
@@ -25,7 +25,8 @@ def read_txt_file(file_path):
]
return lines
# 检测 URL 是否可访问并记录响应时间
################################################# 检测 URL 是否可访问并记录响应时间
def check_url(url, timeout=6):
start_time = time.time()
elapsed_time = None
@@ -57,7 +58,6 @@ def check_url(url, timeout=6):
success = check_rtp_url(url, timeout)
# 如果执行到这一步,没有异常,计算时间
elapsed_time = (time.time() - start_time) * 1000 # 转换为毫秒
except Exception as e:
print(f"Error checking {url}: {e}")
record_host(get_host_from_url(url))
@@ -66,6 +66,7 @@ def check_url(url, timeout=6):
return elapsed_time, success
################################################
def check_rtmp_url(url, timeout):
try:
# 使用 subprocess 模块运行 ffprobe 命令来检查 rtmp 地址
@@ -81,15 +82,14 @@ def check_rtmp_url(url, timeout):
print(f"Error checking {url}: {e}")
return False
################################################
def check_rtp_url(url, timeout):
try:
# 解析 URL
parsed_url = urlparse(url)
# 提取主机名(IP 地址)和端口号
host = parsed_url.hostname
port = parsed_url.port
# 创建一个 UDP 套接字连接
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
s.settimeout(timeout) # 设置超时时间
@@ -101,6 +101,7 @@ def check_rtp_url(url, timeout):
except (socket.timeout, socket.error):
return False
################################################
def check_p3p_url(url, timeout):
try:
# 解析 URL
@@ -108,20 +109,16 @@ def check_p3p_url(url, timeout):
host = parsed_url.hostname
port = parsed_url.port
path = parsed_url.path
# 检查解析是否成功,如果主机名、端口号或路径有缺失,则抛出 ValueError 异常
if not host or not port or not path:
raise ValueError("Invalid p3p URL")
# 创建一个 TCP 连接
with socket.create_connection((host, port), timeout=timeout) as s:
# 构造请求,根据协议定义可能需要调整
request = f"GET {path} P3P/1.0\r\nHost: {host}\r\n\r\n"
s.sendall(request.encode()) # 发送请求
# 读取响应
response = s.recv(1024)
# 简单判断是否收到有效响应,如果响应中包含 "P3P",则认为地址有效
if b"P3P" in response:
return True
@@ -130,6 +127,7 @@ def check_p3p_url(url, timeout):
print(f"Error checking {url}: {e}")
return False
################################################
def check_p2p_url(url, timeout):
try:
# 解析 URL
@@ -137,20 +135,16 @@ def check_p2p_url(url, timeout):
host = parsed_url.hostname
port = parsed_url.port
path = parsed_url.path
# 检查解析是否成功,如果主机名、端口号或路径有缺失,则抛出 ValueError 异常
if not host or not port or not path:
raise ValueError("Invalid P2P URL")
# 创建一个 TCP 连接
with socket.create_connection((host, port), timeout=timeout) as s:
# 自定义请求,这里只是一个占位符,需根据具体协议定义
request = f"YOUR_CUSTOM_REQUEST {path}\r\nHost: {host}\r\n\r\n"
s.sendall(request.encode()) # 发送请求
# 读取响应
response = s.recv(1024)
# 自定义响应解析,这里简单示例,如果响应中包含特定内容,则认为地址有效
if b"SOME_EXPECTED_RESPONSE" in response:
return True
@@ -160,7 +154,8 @@ def check_p2p_url(url, timeout):
return False
# 处理单行文本并检测 URL
################################################# 处理单行文本并检测 URL
def process_line(line):
# 如果行中包含“#genre#”或者不包含“://”,则跳过该行
if "#genre#" in line or "://" not in line :
@@ -179,7 +174,8 @@ def process_line(line):
return None, line.strip()
return None, None
# 多线程处理文本并检测 URL
################################################# 多线程处理文本并检测 URL
def process_urls_multithreaded(lines, max_workers=30):
blacklist = []
successlist = []
@@ -200,7 +196,8 @@ def process_urls_multithreaded(lines, max_workers=30):
blacklist.append(result)
return successlist, blacklist
# 写入文件
################################################# 写入文件
def write_list(file_path, data_list):
with open(file_path, 'w', encoding='utf-8') as file:
# 遍历列表中的每个元素并写入文件
@@ -211,6 +208,7 @@ def write_list(file_path, data_list):
# urls 里所有的源都读到这里。
urls_all_lines = []
################################################
def get_url_file_extension(url):
# 解析 URL
parsed_url = urlparse(url)
@@ -220,16 +218,14 @@ def get_url_file_extension(url):
extension = os.path.splitext(path)[1]
return extension
################################################
def convert_m3u_to_txt(m3u_content):
# 分行处理
lines = m3u_content.split('\n')
# 用于存储结果的列表
txt_lines = []
# 临时变量用于存储频道名称
channel_name = ""
for line in lines:
# 过滤掉 #EXTM3U 开头的行
if line.startswith("#EXTM3U"):
@@ -241,7 +237,6 @@ def convert_m3u_to_txt(m3u_content):
# 处理 URL 行
elif line.startswith("http"):
txt_lines.append(f"{channel_name},{line.strip()}")
# 将结果合并成一个字符串,以换行符分隔
# return '\n'.join(txt_lines)
return txt_lines
@@ -272,7 +267,6 @@ def process_url(url):
#channel_name=line.split(',')[0].strip()
#channel_address=line.split(',')[1].strip()
urls_all_lines.append(line.strip())
except Exception as e:
# 如果处理 URL 时发生错误,打印错误信息
print(f"处理URL时发生错误:{e}")