Update p2p.py
This commit is contained in:
@@ -8,15 +8,17 @@ import socket #check p3p源 rtp源
|
|||||||
import subprocess #check rtmp源
|
import subprocess #check rtmp源
|
||||||
|
|
||||||
timestart = datetime.now()
|
timestart = datetime.now()
|
||||||
|
#需屏蔽IP域名
|
||||||
BlackHost=["127.0.0.1:8080","live3.lalifeier.eu.org","newcntv.qcloudcdn.com"]
|
BlackHost=["127.0.0.1:8080","live3.lalifeier.eu.org","newcntv.qcloudcdn.com"]
|
||||||
|
|
||||||
# 读取文件内容
|
# 读取文件内容
|
||||||
def read_txt_file(file_path):
|
def read_txt_file(file_path):
|
||||||
skip_strings = ['#genre#'] # 定义需要跳过的字符串数组['#', '@', '#genre#']
|
# 定义需要跳过的字符串数组
|
||||||
required_strings = ['://'] # 定义需要包含的字符串数组['必需字符1', '必需字符2']
|
skip_strings = ['#genre#']
|
||||||
|
# 定义需要包含的字符串数组
|
||||||
|
required_strings = ['://']
|
||||||
with open(file_path, 'r', encoding='utf-8') as file:
|
with open(file_path, 'r', encoding='utf-8') as file:
|
||||||
|
# 列表推导式,筛选出既不包含跳过字符串且包含所需字符串的行
|
||||||
lines = [
|
lines = [
|
||||||
line for line in file
|
line for line in file
|
||||||
if not any(skip_str in line for skip_str in skip_strings) and all(req_str in line for req_str in required_strings)
|
if not any(skip_str in line for skip_str in skip_strings) and all(req_str in line for req_str in required_strings)
|
||||||
@@ -28,25 +30,31 @@ def check_url(url, timeout=6):
|
|||||||
start_time = time.time()
|
start_time = time.time()
|
||||||
elapsed_time = None
|
elapsed_time = None
|
||||||
success = False
|
success = False
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
# 如果 URL 以 http 开头
|
||||||
if url.startswith("http"):
|
if url.startswith("http"):
|
||||||
headers = {
|
headers = {
|
||||||
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
|
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
|
||||||
}
|
}
|
||||||
|
# 创建请求对象
|
||||||
req = urllib.request.Request(url, headers=headers)
|
req = urllib.request.Request(url, headers=headers)
|
||||||
|
# 发送请求并获取响应
|
||||||
with urllib.request.urlopen(req, timeout=timeout) as response:
|
with urllib.request.urlopen(req, timeout=timeout) as response:
|
||||||
|
# 如果响应状态码为 200,表示成功
|
||||||
if response.status == 200:
|
if response.status == 200:
|
||||||
success = True
|
success = True
|
||||||
|
# 如果 URL 以 p3p 开头
|
||||||
elif url.startswith("p3p"):
|
elif url.startswith("p3p"):
|
||||||
success = check_p3p_url(url, timeout)
|
success = check_p3p_url(url, timeout)
|
||||||
|
# 如果 URL 以 p2p 开头
|
||||||
elif url.startswith("p2p"):
|
elif url.startswith("p2p"):
|
||||||
success = check_p2p_url(url, timeout)
|
success = check_p2p_url(url, timeout)
|
||||||
|
# 如果 URL 以 rtmp 或 rtsp 开头
|
||||||
elif url.startswith("rtmp") or url.startswith("rtsp") :
|
elif url.startswith("rtmp") or url.startswith("rtsp") :
|
||||||
success = check_rtmp_url(url, timeout)
|
success = check_rtmp_url(url, timeout)
|
||||||
|
# 如果 URL 以 rtp 开头
|
||||||
elif url.startswith("rtp"):
|
elif url.startswith("rtp"):
|
||||||
success = check_rtp_url(url, timeout)
|
success = check_rtp_url(url, timeout)
|
||||||
|
|
||||||
# 如果执行到这一步,没有异常,计算时间
|
# 如果执行到这一步,没有异常,计算时间
|
||||||
elapsed_time = (time.time() - start_time) * 1000 # 转换为毫秒
|
elapsed_time = (time.time() - start_time) * 1000 # 转换为毫秒
|
||||||
|
|
||||||
@@ -55,17 +63,21 @@ def check_url(url, timeout=6):
|
|||||||
record_host(get_host_from_url(url))
|
record_host(get_host_from_url(url))
|
||||||
# 在发生异常的情况下,将 elapsed_time 设置为 None
|
# 在发生异常的情况下,将 elapsed_time 设置为 None
|
||||||
elapsed_time = None
|
elapsed_time = None
|
||||||
|
|
||||||
return elapsed_time, success
|
return elapsed_time, success
|
||||||
|
|
||||||
|
|
||||||
def check_rtmp_url(url, timeout):
|
def check_rtmp_url(url, timeout):
|
||||||
try:
|
try:
|
||||||
|
# 使用 subprocess 模块运行 ffprobe 命令来检查 rtmp 地址
|
||||||
result = subprocess.run(['ffprobe', url], stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=timeout)
|
result = subprocess.run(['ffprobe', url], stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=timeout)
|
||||||
|
# 如果返回码为 0,表示命令执行成功,即 rtmp 地址有效
|
||||||
if result.returncode == 0:
|
if result.returncode == 0:
|
||||||
return True
|
return True
|
||||||
except subprocess.TimeoutExpired:
|
except subprocess.TimeoutExpired:
|
||||||
|
# 如果超时,打印超时信息
|
||||||
print(f"Timeout checking {url}")
|
print(f"Timeout checking {url}")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
# 如果发生其他异常,打印错误信息
|
||||||
print(f"Error checking {url}: {e}")
|
print(f"Error checking {url}: {e}")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
@@ -78,9 +90,10 @@ def check_rtp_url(url, timeout):
|
|||||||
host = parsed_url.hostname
|
host = parsed_url.hostname
|
||||||
port = parsed_url.port
|
port = parsed_url.port
|
||||||
|
|
||||||
# 创建一个 socket 连接
|
# 创建一个 UDP 套接字连接
|
||||||
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
|
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
|
||||||
s.settimeout(timeout) # 设置超时时间
|
s.settimeout(timeout) # 设置超时时间
|
||||||
|
# 连接到指定的主机和端口
|
||||||
s.connect((host, port))
|
s.connect((host, port))
|
||||||
s.sendto(b'', (host, port)) # 发送空的 UDP 数据包
|
s.sendto(b'', (host, port)) # 发送空的 UDP 数据包
|
||||||
s.recv(1) # 尝试接收数据
|
s.recv(1) # 尝试接收数据
|
||||||
@@ -96,23 +109,24 @@ def check_p3p_url(url, timeout):
|
|||||||
port = parsed_url.port
|
port = parsed_url.port
|
||||||
path = parsed_url.path
|
path = parsed_url.path
|
||||||
|
|
||||||
# 检查解析是否成功
|
# 检查解析是否成功,如果主机名、端口号或路径有缺失,则抛出 ValueError 异常
|
||||||
if not host or not port or not path:
|
if not host or not port or not path:
|
||||||
raise ValueError("Invalid p3p URL")
|
raise ValueError("Invalid p3p URL")
|
||||||
|
|
||||||
# 创建一个 TCP 连接
|
# 创建一个 TCP 连接
|
||||||
with socket.create_connection((host, port), timeout=timeout) as s:
|
with socket.create_connection((host, port), timeout=timeout) as s:
|
||||||
# 发送一个简单的请求(根据协议定义可能需要调整)
|
# 构造请求,根据协议定义可能需要调整
|
||||||
request = f"GET {path} P3P/1.0\r\nHost: {host}\r\n\r\n"
|
request = f"GET {path} P3P/1.0\r\nHost: {host}\r\n\r\n"
|
||||||
s.sendall(request.encode())
|
s.sendall(request.encode()) # 发送请求
|
||||||
|
|
||||||
# 读取响应
|
# 读取响应
|
||||||
response = s.recv(1024)
|
response = s.recv(1024)
|
||||||
|
|
||||||
# 简单判断是否收到有效响应
|
# 简单判断是否收到有效响应,如果响应中包含 "P3P",则认为地址有效
|
||||||
if b"P3P" in response:
|
if b"P3P" in response:
|
||||||
return True
|
return True
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
# 如果发生异常,打印错误信息
|
||||||
print(f"Error checking {url}: {e}")
|
print(f"Error checking {url}: {e}")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
@@ -124,7 +138,7 @@ def check_p2p_url(url, timeout):
|
|||||||
port = parsed_url.port
|
port = parsed_url.port
|
||||||
path = parsed_url.path
|
path = parsed_url.path
|
||||||
|
|
||||||
# 检查解析是否成功
|
# 检查解析是否成功,如果主机名、端口号或路径有缺失,则抛出 ValueError 异常
|
||||||
if not host or not port or not path:
|
if not host or not port or not path:
|
||||||
raise ValueError("Invalid P2P URL")
|
raise ValueError("Invalid P2P URL")
|
||||||
|
|
||||||
@@ -132,29 +146,36 @@ def check_p2p_url(url, timeout):
|
|||||||
with socket.create_connection((host, port), timeout=timeout) as s:
|
with socket.create_connection((host, port), timeout=timeout) as s:
|
||||||
# 自定义请求,这里只是一个占位符,需根据具体协议定义
|
# 自定义请求,这里只是一个占位符,需根据具体协议定义
|
||||||
request = f"YOUR_CUSTOM_REQUEST {path}\r\nHost: {host}\r\n\r\n"
|
request = f"YOUR_CUSTOM_REQUEST {path}\r\nHost: {host}\r\n\r\n"
|
||||||
s.sendall(request.encode())
|
s.sendall(request.encode()) # 发送请求
|
||||||
|
|
||||||
# 读取响应
|
# 读取响应
|
||||||
response = s.recv(1024)
|
response = s.recv(1024)
|
||||||
|
|
||||||
# 自定义响应解析,这里简单示例
|
# 自定义响应解析,这里简单示例,如果响应中包含特定内容,则认为地址有效
|
||||||
if b"SOME_EXPECTED_RESPONSE" in response:
|
if b"SOME_EXPECTED_RESPONSE" in response:
|
||||||
return True
|
return True
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
# 如果发生异常,打印错误信息
|
||||||
print(f"Error checking {url}: {e}")
|
print(f"Error checking {url}: {e}")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
# 处理单行文本并检测 URL
|
# 处理单行文本并检测 URL
|
||||||
def process_line(line):
|
def process_line(line):
|
||||||
|
# 如果行中包含“#genre#”或者不包含“://”,则跳过该行
|
||||||
if "#genre#" in line or "://" not in line :
|
if "#genre#" in line or "://" not in line :
|
||||||
return None, None # 跳过包含“#genre#”的行
|
return None, None
|
||||||
parts = line.split(',')
|
parts = line.split(',')
|
||||||
|
# 如果该行按逗号分割后长度为 2,表示有名称和 URL
|
||||||
if len(parts) == 2:
|
if len(parts) == 2:
|
||||||
name, url = parts
|
name, url = parts
|
||||||
|
# 检测 URL 的有效性并获取响应时间
|
||||||
elapsed_time, is_valid = check_url(url.strip())
|
elapsed_time, is_valid = check_url(url.strip())
|
||||||
|
# 如果 URL 有效,返回响应时间和该行文本
|
||||||
if is_valid:
|
if is_valid:
|
||||||
return elapsed_time, line.strip()
|
return elapsed_time, line.strip()
|
||||||
else:
|
else:
|
||||||
|
# 如果 URL 无效,返回 None 和该行文本
|
||||||
return None, line.strip()
|
return None, line.strip()
|
||||||
return None, None
|
return None, None
|
||||||
|
|
||||||
@@ -162,21 +183,27 @@ def process_line(line):
|
|||||||
def process_urls_multithreaded(lines, max_workers=30):
|
def process_urls_multithreaded(lines, max_workers=30):
|
||||||
blacklist = []
|
blacklist = []
|
||||||
successlist = []
|
successlist = []
|
||||||
|
# 创建线程池执行器
|
||||||
with ThreadPoolExecutor(max_workers=max_workers) as executor:
|
with ThreadPoolExecutor(max_workers=max_workers) as executor:
|
||||||
|
# 提交每个行的处理任务,并将任务和对应的行存储在字典中
|
||||||
futures = {executor.submit(process_line, line): line for line in lines}
|
futures = {executor.submit(process_line, line): line for line in lines}
|
||||||
|
# 遍历已完成的任务
|
||||||
for future in as_completed(futures):
|
for future in as_completed(futures):
|
||||||
elapsed_time, result = future.result()
|
elapsed_time, result = future.result()
|
||||||
|
# 如果有结果
|
||||||
if result:
|
if result:
|
||||||
|
# 如果响应时间不为 None,将其添加到成功列表中
|
||||||
if elapsed_time is not None:
|
if elapsed_time is not None:
|
||||||
successlist.append(f"{elapsed_time:.2f}ms,{result}")
|
successlist.append(f"{elapsed_time:.2f}ms,{result}")
|
||||||
else:
|
else:
|
||||||
|
# 如果响应时间为 None,将其添加到黑名单中
|
||||||
blacklist.append(result)
|
blacklist.append(result)
|
||||||
return successlist, blacklist
|
return successlist, blacklist
|
||||||
|
|
||||||
# 写入文件
|
# 写入文件
|
||||||
def write_list(file_path, data_list):
|
def write_list(file_path, data_list):
|
||||||
with open(file_path, 'w', encoding='utf-8') as file:
|
with open(file_path, 'w', encoding='utf-8') as file:
|
||||||
|
# 遍历列表中的每个元素并写入文件
|
||||||
for item in data_list:
|
for item in data_list:
|
||||||
file.write(item + '\n')
|
file.write(item + '\n')
|
||||||
|
|
||||||
@@ -229,12 +256,16 @@ def process_url(url):
|
|||||||
data = response.read()
|
data = response.read()
|
||||||
# 将二进制数据解码为字符串
|
# 将二进制数据解码为字符串
|
||||||
text = data.decode('utf-8')
|
text = data.decode('utf-8')
|
||||||
|
# 如果 URL 的文件扩展名是.m3u 或.m3u8
|
||||||
if get_url_file_extension(url)==".m3u" or get_url_file_extension(url)==".m3u8":
|
if get_url_file_extension(url)==".m3u" or get_url_file_extension(url)==".m3u8":
|
||||||
m3u_lines=convert_m3u_to_txt(text)
|
m3u_lines=convert_m3u_to_txt(text)
|
||||||
|
# 记录 m3u 文件中的行数和 URL
|
||||||
url_statistics.append(f"{len(m3u_lines)},{url.strip()}")
|
url_statistics.append(f"{len(m3u_lines)},{url.strip()}")
|
||||||
urls_all_lines.extend(m3u_lines) # 注意:extend
|
urls_all_lines.extend(m3u_lines) # 注意:extend
|
||||||
|
# 如果 URL 的文件扩展名是.txt
|
||||||
elif get_url_file_extension(url)==".txt":
|
elif get_url_file_extension(url)==".txt":
|
||||||
lines = text.split('\n')
|
lines = text.split('\n')
|
||||||
|
# 记录 txt 文件中的行数和 URL
|
||||||
url_statistics.append(f"{len(lines)},{url.strip()}")
|
url_statistics.append(f"{len(lines)},{url.strip()}")
|
||||||
for line in lines:
|
for line in lines:
|
||||||
if "#genre#" not in line and "," in line and "://" in line:
|
if "#genre#" not in line and "," in line and "://" in line:
|
||||||
@@ -243,6 +274,7 @@ def process_url(url):
|
|||||||
urls_all_lines.append(line.strip())
|
urls_all_lines.append(line.strip())
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
# 如果处理 URL 时发生错误,打印错误信息
|
||||||
print(f"处理URL时发生错误:{e}")
|
print(f"处理URL时发生错误:{e}")
|
||||||
|
|
||||||
|
|
||||||
@@ -254,17 +286,16 @@ def remove_duplicates_url(lines):
|
|||||||
if "," in line and "://" in line:
|
if "," in line and "://" in line:
|
||||||
# channel_name=line.split(',')[0].strip()
|
# channel_name=line.split(',')[0].strip()
|
||||||
channel_url=line.split(',')[1].strip()
|
channel_url=line.split(',')[1].strip()
|
||||||
if channel_url not in urls: # 如果发现当前url不在清单中,则假如newlines
|
# 如果当前 URL 不在列表中,则添加到新列表和 URL 列表中
|
||||||
|
if channel_url not in urls:
|
||||||
urls.append(channel_url)
|
urls.append(channel_url)
|
||||||
newlines.append(line)
|
newlines.append(line)
|
||||||
return newlines
|
return newlines
|
||||||
|
|
||||||
|
|
||||||
|
python
|
||||||
|
复制
|
||||||
# 处理带$的 URL,把$之后的内容都去掉(包括$也去掉) 【2024-08-08 22:29:11】
|
# 处理带$的 URL,把$之后的内容都去掉(包括$也去掉) 【2024-08-08 22:29:11】
|
||||||
#def clean_url(url):
|
|
||||||
# last_dollar_index = url.rfind('$') # 安全起见找最后一个$处理
|
|
||||||
# if last_dollar_index != -1:
|
|
||||||
# return url[:last_dollar_index]
|
|
||||||
# return url
|
|
||||||
def clean_url(lines):
|
def clean_url(lines):
|
||||||
urls =[]
|
urls =[]
|
||||||
newlines=[]
|
newlines=[]
|
||||||
@@ -297,9 +328,12 @@ def split_url(lines):
|
|||||||
# 取得 host
|
# 取得 host
|
||||||
def get_host_from_url(url: str) -> str:
|
def get_host_from_url(url: str) -> str:
|
||||||
try:
|
try:
|
||||||
|
# 解析 URL
|
||||||
parsed_url = urlparse(url)
|
parsed_url = urlparse(url)
|
||||||
|
# 返回 URL 的 netloc(网络位置,通常是主机名和端口号)
|
||||||
return parsed_url.netloc
|
return parsed_url.netloc
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
# 如果发生错误,返回错误信息
|
||||||
return f"Error: {str(e)}"
|
return f"Error: {str(e)}"
|
||||||
|
|
||||||
# 使用字典来统计 blackhost 的记录次数
|
# 使用字典来统计 blackhost 的记录次数
|
||||||
@@ -312,8 +346,9 @@ def record_host(host):
|
|||||||
else:
|
else:
|
||||||
blacklist_dict[host] = 1
|
blacklist_dict[host] = 1
|
||||||
# 将结果保存为 txt 文件
|
# 将结果保存为 txt 文件
|
||||||
def save_blackhost_to_txt(filename=f"{datetime.now().strftime("%Y%m%d_%H_%M_%S")}_blackhost_count.txt"):
|
def save_blackhost_to_txt(filename=f"{datetime.now().strftime('%Y%m%d_%H_%M_%S')}_blackhost_count.txt"):
|
||||||
with open(filename, "w") as file:
|
with open(filename, "w") as file:
|
||||||
|
# 遍历字典,将 host 和对应的计数写入文件
|
||||||
for host, count in blacklist_dict.items():
|
for host, count in blacklist_dict.items():
|
||||||
file.write(f"{host}: {count}\n")
|
file.write(f"{host}: {count}\n")
|
||||||
print(f"结果已保存到 {filename}")
|
print(f"结果已保存到 {filename}")
|
||||||
@@ -329,7 +364,8 @@ if __name__ == "__main__":
|
|||||||
]
|
]
|
||||||
for url in urls:
|
for url in urls:
|
||||||
print(f"处理URL: {url}")
|
print(f"处理URL: {url}")
|
||||||
process_url(url) #读取上面url清单中直播源存入urls_all_lines
|
# 读取上面 url 清单中直播源存入 urls_all_lines
|
||||||
|
process_url(url)
|
||||||
|
|
||||||
# 获取当前脚本所在的目录
|
# 获取当前脚本所在的目录
|
||||||
current_dir = os.path.dirname(os.path.abspath(__file__))
|
current_dir = os.path.dirname(os.path.abspath(__file__))
|
||||||
|
|||||||
Reference in New Issue
Block a user