diff --git a/py/blacklist.py b/py/blacklist.py index dee7846..83c9c1f 100644 --- a/py/blacklist.py +++ b/py/blacklist.py @@ -6,9 +6,7 @@ import os from urllib.parse import urlparse import socket #check p3p源 rtp源 import subprocess #check rtmp源 - timestart = datetime.now() - # 读取文件内容 # 定义一个函数,用于读取文本文件并过滤出符合条件的行 def read_txt_file(file_path): @@ -16,7 +14,6 @@ def read_txt_file(file_path): skip_strings = ['#genre#'] # 定义一个列表,包含每行必须包含的字符串,当前只要求包含 '://' 即URL协议标识 required_strings = ['://'] - # 使用with语句打开文件,确保最后文件会被正确关闭 with open(file_path, 'r', encoding='utf-8') as file: # 使用列表推导式读取文件,并对每一行进行过滤 @@ -28,7 +25,6 @@ def read_txt_file(file_path): ] # 返回过滤后的行列表 return lines - # 定义一个函数,用于检测一个URL是否可访问,并记录访问的响应时间 def check_url(url, timeout=6): # 记录开始检查的时间 @@ -60,7 +56,6 @@ def check_url(url, timeout=6): success = check_rtmp_url(url, timeout) elif url.startswith("rtp"): success = check_rtp_url(url, timeout) - # 如果没有异常发生,计算从开始到当前的经过时间,并转换为毫秒 elapsed_time = (time.time() - start_time) * 1000 except Exception as e: @@ -68,11 +63,9 @@ def check_url(url, timeout=6): print(f"Error checking {url}: {e}") # 并将elapsed_time设置为None elapsed_time = None - # 返回经过时间和访问成功标志 return elapsed_time, success - # 定义一个函数,用于检查RTMP URL是否可访问 def check_rtmp_url(url, timeout): try: @@ -95,7 +88,6 @@ def check_rtmp_url(url, timeout): print(f"Error checking {url}: {e}") # 如果发生异常或ffprobe命令执行失败,则返回False return False - # 定义一个函数,用于检查RTP URL是否可访问 def check_rtp_url(url, timeout): try: @@ -103,7 +95,6 @@ def check_rtp_url(url, timeout): parsed_url = urlparse(url) host = parsed_url.hostname port = parsed_url.port - # 创建一个UDP socket连接,用于RTP协议通信 with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: s.settimeout(timeout) # 设置socket超时时间 @@ -118,7 +109,6 @@ def check_rtp_url(url, timeout): except (socket.timeout, socket.error): # 如果发生超时或socket错误,则认为RTP服务不可访问 return False - # 定义一个函数,用于检查P3P URL是否可访问 def check_p3p_url(url, timeout): try: @@ -131,7 +121,6 @@ def check_p3p_url(url, timeout): # 检查解析结果是否有效,如果主机名、端口号或路径为空,则抛出异常 if not host or not port or not path: raise ValueError("Invalid p3p URL") - # 创建TCP连接 with socket.create_connection((host, port), timeout=timeout) as s: # 构建一个简单的HTTP请求 @@ -151,8 +140,6 @@ def check_p3p_url(url, timeout): return False - - # 定义一个函数,用于处理单行文本并检测其中的URL是否有效 def process_line(line): # 如果行中包含 "#genre#" 或者不包含 "://" 则跳过该行 @@ -173,12 +160,10 @@ def process_line(line): return None, line.strip() # 如果行格式不正确,返回None return None, None - # 定义一个函数,使用多线程处理文本并检测每个URL def process_urls_multithreaded(lines, max_workers=28): blacklist = [] # 用于存储不可访问的URL列表 successlist = [] # 用于存储可访问的URL列表 - # 创建一个ThreadPoolExecutor,它是concurrent.futures模块中用于多线程的接口 with ThreadPoolExecutor(max_workers=max_workers) as executor: # 使用字典推导式为每一行创建一个future,并启动process_line函数 @@ -197,7 +182,6 @@ def process_urls_multithreaded(lines, max_workers=28): blacklist.append(result) # 返回可访问和不可访问的URL列表 return successlist, blacklist - # 定义一个函数,用于将数据列表写入到文件 def write_list(file_path, data_list): # 使用with语句打开文件,确保最后文件会被正确关闭 @@ -207,13 +191,10 @@ def write_list(file_path, data_list): # 写入每一行数据到文件,每个item后面添加换行符 file.write(item + '\n') - - # 增加外部url到检测清单,同时支持检测m3u格式url # urls里所有的源都读到这里。 urls_all_lines = [] - def get_url_file_extension(url): # 解析URL parsed_url = urlparse(url) @@ -222,7 +203,6 @@ def get_url_file_extension(url): # 提取文件扩展名 extension = os.path.splitext(path)[1] return extension - def convert_m3u_to_txt(m3u_content): # 分行处理 lines = m3u_content.split('\n') @@ -248,7 +228,6 @@ def convert_m3u_to_txt(m3u_content): # 将结果合并成一个字符串,以换行符分隔 return '\n'.join(txt_lines) - def process_url(url): try: # 打开URL并读取内容 @@ -270,7 +249,6 @@ def process_url(url): except Exception as e: print(f"处理URL时发生错误:{e}") - # 去重复源 2024-08-06 (检测前剔除重复url,提高检测效率) def remove_duplicates_url(lines): urls =[] @@ -283,7 +261,6 @@ def remove_duplicates_url(lines): urls.append(channel_url) newlines.append(line) return newlines - # 处理带$的URL,把$之后的内容都去掉(包括$也去掉) 【2024-08-08 22:29:11】 #def clean_url(url): # last_dollar_index = url.rfind('$') # 安全起见找最后一个$处理 @@ -300,7 +277,6 @@ def clean_url(lines): line=line[:last_dollar_index] newlines.append(line) return newlines - # 处理带#的URL 【2024-08-09 23:53:26】 def split_url(lines): newlines=[] @@ -318,7 +294,6 @@ def split_url(lines): newline=f'{channel_name},{url}' newlines.append(line) return newlines - # 判断是否是直接运行此脚本 if __name__ == "__main__": # 定义一个URL列表,这些URL将被用来获取直播源数据 @@ -334,12 +309,10 @@ if __name__ == "__main__": for url in urls: print(f"处理URL: {url}") process_url(url) # 调用process_url函数读取直播源并存储到urls_all_lines列表 - # 获取当前脚本文件所在的目录 current_dir = os.path.dirname(os.path.abspath(__file__)) # 获取当前脚本文件所在目录的上一级目录 parent_dir = os.path.dirname(current_dir) - # 定义输入文件路径,它们包含之前处理得到的数据 input_file1 = os.path.join(parent_dir, 'merged_output.txt') input_file2 = os.path.join(current_dir, 'blacklist_auto.txt') @@ -349,31 +322,25 @@ if __name__ == "__main__": success_file_tv = os.path.join(current_dir, 'whitelist_auto_tv.txt') # 定义黑名单文件路径,存储无效直播源 blacklist_file = os.path.join(current_dir, 'blacklist_auto.txt') - # 读取输入文件内容并存储到lines1和lines2 lines1 = read_txt_file(input_file1) lines2 = read_txt_file(input_file2) # 将从URL获取的直播源、input_file1和input_file2中的行合并到lines lines = urls_all_lines + lines1 + lines2 - # 计算合并后的直播源总数 urls_hj_before = len(lines) - # 分级处理带#号的直播源地址,这可能意味着处理不同的直播源质量或选项 lines=split_url(lines) # 计算处理后直播源的数量 urls_hj_before2 = len(lines) - # 清除直播源URL中的$符号,这可能是为了规范化URL lines=clean_url(lines) # 计算清除$符号后直播源的数量 urls_hj_before3 = len(lines) - # 去除重复的直播源URL lines=remove_duplicates_url(lines) # 计算去重后的直播源数量 urls_hj = len(lines) - # 使用多线程处理直播源并生成有效和无效的直播源列表 successlist, blacklist = process_urls_multithreaded(lines) @@ -386,11 +353,9 @@ if __name__ == "__main__": successlist=sorted(successlist, key=successlist_sort_key) # 对黑名单进行排序 blacklist=sorted(blacklist) - # 计算有效和无效直播源的数量 urls_ok = len(successlist) urls_ng = len(blacklist) - # 整理successlist,生成可以直接引用的直播源列表 def remove_prefix_from_lines(lines): result = [] @@ -399,24 +364,20 @@ if __name__ == "__main__": parts = line.split(",") result.append(",".join(parts[1:])) return result - # 添加时间戳和其他信息到成功和黑名单 version = datetime.now().strftime("%Y%m%d-%H-%M-%S") + ",url" successlist_tv = ["更新时间,#genre#"] + [version] + ['\n'] + ["whitelist,#genre#"] + remove_prefix_from_lines(successlist) successlist = ["更新时间,#genre#"] + [version] + ['\n'] + ["RespoTime,whitelist,#genre#"] + successlist blacklist = ["更新时间,#genre#"] + [version] + ['\n'] + ["blacklist,#genre#"] + blacklist - # 将整理后的直播源列表写入文件 write_list(success_file, successlist) write_list(success_file_tv, successlist_tv) # 将黑名单写入文件 write_list(blacklist_file, blacklist) - # 打印成功清单和黑名单文件的生成信息 print(f"成功清单文件已生成: {success_file}") print(f"成功清单文件已生成(tv): {success_file_tv}") print(f"黑名单文件已生成: {blacklist_file}") - # 写入历史记录文件 timenow = datetime.now().strftime("%Y%m%d_%H_%M_%S") history_success_file = f'history/blacklist/{timenow}_whitelist_auto.txt' @@ -426,23 +387,18 @@ if __name__ == "__main__": # 打印历史记录文件的生成信息 print(f"history成功清单文件已生成: {history_success_file}") print(f"history黑名单文件已生成: {history_blacklist_file}") - # 计算脚本执行结束时间 timeend = datetime.now() - # 计算脚本执行所用的总时间 elapsed_time = timeend - timestart total_seconds = elapsed_time.total_seconds() - # 将总时间转换为分钟和秒 minutes = int(total_seconds // 60) seconds = int(total_seconds % 60) - # 格式化脚本开始和结束的时间字符串 timestart_str = timestart.strftime("%Y%m%d_%H_%M_%S") timeend_str = timeend.strftime("%Y%m%d_%H_%M_%S") - # 打印脚本开始、结束和执行时间信息 print(f"开始时间: {timestart_str}") print(f"结束时间: {timeend_str}")