Update blacklist.py

This commit is contained in:
frxz751113
2024-08-17 09:56:41 +08:00
committed by GitHub
parent 2690eb926c
commit 202995d860
-44
View File
@@ -6,9 +6,7 @@ import os
from urllib.parse import urlparse
import socket #check p3p源 rtp源
import subprocess #check rtmp源
timestart = datetime.now()
# 读取文件内容
# 定义一个函数,用于读取文本文件并过滤出符合条件的行
def read_txt_file(file_path):
@@ -16,7 +14,6 @@ def read_txt_file(file_path):
skip_strings = ['#genre#']
# 定义一个列表,包含每行必须包含的字符串,当前只要求包含 '://' 即URL协议标识
required_strings = ['://']
# 使用with语句打开文件,确保最后文件会被正确关闭
with open(file_path, 'r', encoding='utf-8') as file:
# 使用列表推导式读取文件,并对每一行进行过滤
@@ -28,7 +25,6 @@ def read_txt_file(file_path):
]
# 返回过滤后的行列表
return lines
# 定义一个函数,用于检测一个URL是否可访问,并记录访问的响应时间
def check_url(url, timeout=6):
# 记录开始检查的时间
@@ -60,7 +56,6 @@ def check_url(url, timeout=6):
success = check_rtmp_url(url, timeout)
elif url.startswith("rtp"):
success = check_rtp_url(url, timeout)
# 如果没有异常发生,计算从开始到当前的经过时间,并转换为毫秒
elapsed_time = (time.time() - start_time) * 1000
except Exception as e:
@@ -68,11 +63,9 @@ def check_url(url, timeout=6):
print(f"Error checking {url}: {e}")
# 并将elapsed_time设置为None
elapsed_time = None
# 返回经过时间和访问成功标志
return elapsed_time, success
# 定义一个函数,用于检查RTMP URL是否可访问
def check_rtmp_url(url, timeout):
try:
@@ -95,7 +88,6 @@ def check_rtmp_url(url, timeout):
print(f"Error checking {url}: {e}")
# 如果发生异常或ffprobe命令执行失败,则返回False
return False
# 定义一个函数,用于检查RTP URL是否可访问
def check_rtp_url(url, timeout):
try:
@@ -103,7 +95,6 @@ def check_rtp_url(url, timeout):
parsed_url = urlparse(url)
host = parsed_url.hostname
port = parsed_url.port
# 创建一个UDP socket连接,用于RTP协议通信
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
s.settimeout(timeout) # 设置socket超时时间
@@ -118,7 +109,6 @@ def check_rtp_url(url, timeout):
except (socket.timeout, socket.error):
# 如果发生超时或socket错误,则认为RTP服务不可访问
return False
# 定义一个函数,用于检查P3P URL是否可访问
def check_p3p_url(url, timeout):
try:
@@ -131,7 +121,6 @@ def check_p3p_url(url, timeout):
# 检查解析结果是否有效,如果主机名、端口号或路径为空,则抛出异常
if not host or not port or not path:
raise ValueError("Invalid p3p URL")
# 创建TCP连接
with socket.create_connection((host, port), timeout=timeout) as s:
# 构建一个简单的HTTP请求
@@ -151,8 +140,6 @@ def check_p3p_url(url, timeout):
return False
# 定义一个函数,用于处理单行文本并检测其中的URL是否有效
def process_line(line):
# 如果行中包含 "#genre#" 或者不包含 "://" 则跳过该行
@@ -173,12 +160,10 @@ def process_line(line):
return None, line.strip()
# 如果行格式不正确,返回None
return None, None
# 定义一个函数,使用多线程处理文本并检测每个URL
def process_urls_multithreaded(lines, max_workers=28):
blacklist = [] # 用于存储不可访问的URL列表
successlist = [] # 用于存储可访问的URL列表
# 创建一个ThreadPoolExecutor,它是concurrent.futures模块中用于多线程的接口
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# 使用字典推导式为每一行创建一个future,并启动process_line函数
@@ -197,7 +182,6 @@ def process_urls_multithreaded(lines, max_workers=28):
blacklist.append(result)
# 返回可访问和不可访问的URL列表
return successlist, blacklist
# 定义一个函数,用于将数据列表写入到文件
def write_list(file_path, data_list):
# 使用with语句打开文件,确保最后文件会被正确关闭
@@ -207,13 +191,10 @@ def write_list(file_path, data_list):
# 写入每一行数据到文件,每个item后面添加换行符
file.write(item + '\n')
# 增加外部url到检测清单,同时支持检测m3u格式url
# urls里所有的源都读到这里。
urls_all_lines = []
def get_url_file_extension(url):
# 解析URL
parsed_url = urlparse(url)
@@ -222,7 +203,6 @@ def get_url_file_extension(url):
# 提取文件扩展名
extension = os.path.splitext(path)[1]
return extension
def convert_m3u_to_txt(m3u_content):
# 分行处理
lines = m3u_content.split('\n')
@@ -248,7 +228,6 @@ def convert_m3u_to_txt(m3u_content):
# 将结果合并成一个字符串,以换行符分隔
return '\n'.join(txt_lines)
def process_url(url):
try:
# 打开URL并读取内容
@@ -270,7 +249,6 @@ def process_url(url):
except Exception as e:
print(f"处理URL时发生错误:{e}")
# 去重复源 2024-08-06 (检测前剔除重复url,提高检测效率)
def remove_duplicates_url(lines):
urls =[]
@@ -283,7 +261,6 @@ def remove_duplicates_url(lines):
urls.append(channel_url)
newlines.append(line)
return newlines
# 处理带$的URL,把$之后的内容都去掉(包括$也去掉) 【2024-08-08 22:29:11】
#def clean_url(url):
# last_dollar_index = url.rfind('$') # 安全起见找最后一个$处理
@@ -300,7 +277,6 @@ def clean_url(lines):
line=line[:last_dollar_index]
newlines.append(line)
return newlines
# 处理带#的URL 【2024-08-09 23:53:26】
def split_url(lines):
newlines=[]
@@ -318,7 +294,6 @@ def split_url(lines):
newline=f'{channel_name},{url}'
newlines.append(line)
return newlines
# 判断是否是直接运行此脚本
if __name__ == "__main__":
# 定义一个URL列表,这些URL将被用来获取直播源数据
@@ -334,12 +309,10 @@ if __name__ == "__main__":
for url in urls:
print(f"处理URL: {url}")
process_url(url) # 调用process_url函数读取直播源并存储到urls_all_lines列表
# 获取当前脚本文件所在的目录
current_dir = os.path.dirname(os.path.abspath(__file__))
# 获取当前脚本文件所在目录的上一级目录
parent_dir = os.path.dirname(current_dir)
# 定义输入文件路径,它们包含之前处理得到的数据
input_file1 = os.path.join(parent_dir, 'merged_output.txt')
input_file2 = os.path.join(current_dir, 'blacklist_auto.txt')
@@ -349,31 +322,25 @@ if __name__ == "__main__":
success_file_tv = os.path.join(current_dir, 'whitelist_auto_tv.txt')
# 定义黑名单文件路径,存储无效直播源
blacklist_file = os.path.join(current_dir, 'blacklist_auto.txt')
# 读取输入文件内容并存储到lines1和lines2
lines1 = read_txt_file(input_file1)
lines2 = read_txt_file(input_file2)
# 将从URL获取的直播源、input_file1和input_file2中的行合并到lines
lines = urls_all_lines + lines1 + lines2
# 计算合并后的直播源总数
urls_hj_before = len(lines)
# 分级处理带#号的直播源地址,这可能意味着处理不同的直播源质量或选项
lines=split_url(lines)
# 计算处理后直播源的数量
urls_hj_before2 = len(lines)
# 清除直播源URL中的$符号,这可能是为了规范化URL
lines=clean_url(lines)
# 计算清除$符号后直播源的数量
urls_hj_before3 = len(lines)
# 去除重复的直播源URL
lines=remove_duplicates_url(lines)
# 计算去重后的直播源数量
urls_hj = len(lines)
# 使用多线程处理直播源并生成有效和无效的直播源列表
successlist, blacklist = process_urls_multithreaded(lines)
@@ -386,11 +353,9 @@ if __name__ == "__main__":
successlist=sorted(successlist, key=successlist_sort_key)
# 对黑名单进行排序
blacklist=sorted(blacklist)
# 计算有效和无效直播源的数量
urls_ok = len(successlist)
urls_ng = len(blacklist)
# 整理successlist,生成可以直接引用的直播源列表
def remove_prefix_from_lines(lines):
result = []
@@ -399,24 +364,20 @@ if __name__ == "__main__":
parts = line.split(",")
result.append(",".join(parts[1:]))
return result
# 添加时间戳和其他信息到成功和黑名单
version = datetime.now().strftime("%Y%m%d-%H-%M-%S") + ",url"
successlist_tv = ["更新时间,#genre#"] + [version] + ['\n'] + ["whitelist,#genre#"] + remove_prefix_from_lines(successlist)
successlist = ["更新时间,#genre#"] + [version] + ['\n'] + ["RespoTime,whitelist,#genre#"] + successlist
blacklist = ["更新时间,#genre#"] + [version] + ['\n'] + ["blacklist,#genre#"] + blacklist
# 将整理后的直播源列表写入文件
write_list(success_file, successlist)
write_list(success_file_tv, successlist_tv)
# 将黑名单写入文件
write_list(blacklist_file, blacklist)
# 打印成功清单和黑名单文件的生成信息
print(f"成功清单文件已生成: {success_file}")
print(f"成功清单文件已生成(tv): {success_file_tv}")
print(f"黑名单文件已生成: {blacklist_file}")
# 写入历史记录文件
timenow = datetime.now().strftime("%Y%m%d_%H_%M_%S")
history_success_file = f'history/blacklist/{timenow}_whitelist_auto.txt'
@@ -426,23 +387,18 @@ if __name__ == "__main__":
# 打印历史记录文件的生成信息
print(f"history成功清单文件已生成: {history_success_file}")
print(f"history黑名单文件已生成: {history_blacklist_file}")
# 计算脚本执行结束时间
timeend = datetime.now()
# 计算脚本执行所用的总时间
elapsed_time = timeend - timestart
total_seconds = elapsed_time.total_seconds()
# 将总时间转换为分钟和秒
minutes = int(total_seconds // 60)
seconds = int(total_seconds % 60)
# 格式化脚本开始和结束的时间字符串
timestart_str = timestart.strftime("%Y%m%d_%H_%M_%S")
timeend_str = timeend.strftime("%Y%m%d_%H_%M_%S")
# 打印脚本开始、结束和执行时间信息
print(f"开始时间: {timestart_str}")
print(f"结束时间: {timeend_str}")