diff --git a/py/收集.py b/py/收集.py index 5049f06..e1c6d4b 100644 --- a/py/收集.py +++ b/py/收集.py @@ -98,7 +98,7 @@ def read_and_process_file(input_filename, output_filename, encodings=['utf-8', ' # 调用函数 read_and_process_file('汇总.txt', '汇总.txt') -######################################################################################################## +###################################################################去重##################################### def remove_duplicates(input_file, output_file): # 用于存储已经遇到的URL和包含genre的行 seen_urls = set() @@ -128,7 +128,7 @@ def remove_duplicates(input_file, output_file): # 使用方法 remove_duplicates('汇总.txt', '汇总.txt') -######################################################################################################## +###############################################################################替换######################### # 导入fileinput模块 import fileinput # 定义替换规则的字典 @@ -264,17 +264,17 @@ print("替换完成,新文件已保存。") -######################################################################################################## +######################################################################################提取 import re import os # 定义一个包含所有要排除的关键词的列表 excluded_keywords = [ - 'epg', 'mitv', 'udp', 'rtp', 'P2p', 'p2p', 'p3p', 'P2P', 'P3p', '/hls/', '/tsfile/', 'P3P', '腔', '曲', '/zy.', '/xgj.', '春节' + 'epg', 'mitv', 'udp', 'rtp', 'P2p', 'p2p', 'p3p', 'P2P', '[', 'P3p', '/hls/', '/tsfile/', 'P3P', '腔', '曲', '/zy.', '/xgj.', '春节' ] # 定义一个包含所有要提取的关键词的列表 extract_keywords = [ - 'CCTV', '卫视', '动作电影', '风云剧场', '怀旧剧场', '影迷', '高清电影', '动作电影', '影院', '全球大片', '剧场', '家庭影院', '电影', '星光', '华语', '美国大片', '峨眉', '凤凰', '星空', '人间', '亚洲', '环球' + 'CCTV', '卫视', '动作电影', '风云剧场', '怀旧剧场', '影迷', '高清电影', '动作电影', '影院', '全球大片', '剧场', 'TVB', '家庭影院', '电影', '星光', '华语', '美国大片', '峨眉', '凤凰', '星空', '人间', '亚洲', '环球' # 在这里添加需要提取的关键词 ] @@ -291,15 +291,14 @@ with open('2.txt', 'r', encoding='utf-8') as file: if not any(keyword in line for keyword in excluded_keywords): outfile.write(line) # 写入符合条件的行到文件 -import re +############################################################### +import re def parse_file(input_file_path, output_file_name): # 正则表达式匹配从'//'开始到第一个'/'或第一个'::'结束的部分 ip_or_domain_pattern = re.compile(r'//([^/:]*:[^/:]*::[^/:]*|[^/]*)') - # 用于存储每个IP或域名及其对应的行列表 ip_or_domain_to_lines = {} - # 读取原始文件内容 with open(input_file_path, 'r', encoding='utf-8') as file: for line in file: @@ -318,16 +317,13 @@ def parse_file(input_file_path, output_file_name): if ip_or_domain not in ip_or_domain_to_lines: ip_or_domain_to_lines[ip_or_domain] = [] ip_or_domain_to_lines[ip_or_domain].append(line) - - # 过滤掉小于5000字节的IP或域名段 + ############################################################################### 过滤掉小于1500字节的IP或域名段 filtered_ip_or_domain_to_lines = {ip_or_domain: lines for ip_or_domain, lines in ip_or_domain_to_lines.items() - if sum(len(line) for line in lines) >= 500} - + if sum(len(line) for line in lines) >= 1500} # 如果没有满足条件的IP或域名段,则不生成文件 if not filtered_ip_or_domain_to_lines: print("没有满足条件的IP或域名段,不生成文件。") return - # 合并所有满足条件的IP或域名的行到一个文件 with open(output_file_name, 'w', encoding='utf-8') as output_file: for ip_or_domain, lines in filtered_ip_or_domain_to_lines.items(): @@ -336,13 +332,12 @@ def parse_file(input_file_path, output_file_name): for line in lines: output_file.write(line + '\n') output_file.write('\n') # 在每个小段后添加一个空行作为分隔 - # 调用函数并传入文件路径和输出文件名 parse_file('2.txt', '2.txt') - +############################################################################检测同IP第一个链接,缩小验源数量 import cv2 import time from tqdm import tqdm @@ -351,14 +346,12 @@ detected_ips = {} # 存储文件路径 file_path = "2.txt" output_file_path = "网络收集.txt" - def get_ip_key(url): """从URL中提取IP地址,并构造一个唯一的键""" # 找到'//'到第一个'/'之间的字符串 start = url.find('://') + 3 # '://'.length 是 3 end = url.find('/', start) # 找到第一个'/'的位置 return url[start:end] if end != -1 else None - # 打开输入文件和输出文件 with open(file_path, 'r', encoding='utf-8') as file: lines = file.readlines() @@ -390,7 +383,7 @@ with open(output_file_path, 'w', encoding='utf-8') as output_file: start_time = time.time() frame_count = 0 # 尝试捕获10秒内的帧 - while frame_count < 30 and (time.time() - start_time) < 5: + while frame_count < 10 and (time.time() - start_time) < 3: ret, frame = cap.read() if not ret: break @@ -398,7 +391,7 @@ with open(output_file_path, 'w', encoding='utf-8') as output_file: # 释放资源 cap.release() # 根据捕获的帧数判断状态并记录结果 - if frame_count >= 30: #10秒内超过230帧则写入 + if frame_count >= 10: #10秒内超过230帧则写入 detected_ips[ip_key] = {'status': 'ok'} output_file.write(line) # 写入检测通过的行 else: @@ -408,6 +401,9 @@ for ip_key, result in detected_ips.items(): print(f"IP Key: {ip_key}, Status: {result['status']}") + + +############################################################################全部检测,防止IP段失效 import requests import time import cv2 @@ -415,7 +411,7 @@ from urllib.parse import urlparse from tqdm import tqdm # 测试HTTP连接并尝试下载数据 -def test_connectivity_and_download(url, initial_timeout=1.1, retry_timeout=1): +def test_connectivity_and_download(url, initial_timeout=1, retry_timeout=2): parsed_url = urlparse(url) if parsed_url.scheme not in ['http', 'https']: # 非HTTP(s)协议,尝试RTSP检测 @@ -427,17 +423,15 @@ def test_connectivity_and_download(url, initial_timeout=1.1, retry_timeout=1): if 200 <= response.status_code <= 403: start_time = time.time() while time.time() - start_time < initial_timeout: - chunk = response.raw.read(128) # 尝试下载1KB数据 + chunk = response.raw.read(1024) # 尝试下载1KB数据 if chunk: return True # 成功下载数据 except requests.RequestException as e: print(f"请求异常: {e}") pass #这行删掉则会在下载不到数据流的时候进行连通性测试 - return False # 默认返回False print("/" * 80) - # 测试RTSP连接并尝试读取流 def test_rtsp_connectivity(url, timeout=3): cap = cv2.VideoCapture(url) @@ -455,7 +449,6 @@ def test_rtsp_connectivity(url, timeout=3): def main(输入, 输出): with open(输入, "r", encoding="utf-8") as source_file: lines = source_file.readlines() - results = [] for line_number, line in enumerate(tqdm(lines, desc="检测中")): parts = line.strip().split(",") @@ -466,19 +459,15 @@ def main(输入, 输出): except Exception as e: print(f"检测URL {channel_url} 时发生错误: {e}") is_valid = False # 将异常的URL视为无效 - status = "有效" if is_valid else "无效" - if "genre" in line.lower() or status == "有效": results.append((channel_name.strip(), channel_url.strip(), status)) - # 写入文件 with open(输出, "w", encoding="utf-8") as output_file: for channel_name, channel_url, status in results: output_file.write(f"{channel_name},{channel_url}\n") print(f"任务完成, 有效源数量: {len([x for x in results if x[2] == '有效'])}, 无效源数量: {len([x for x in results if x[2] == '无效'])}") - if __name__ == "__main__": 输入 = "网络收集.txt" #input('请输入utf-8编码的直播源文件路径:') 输出 = "网络收集.txt" @@ -539,13 +528,11 @@ def parse_file(input_file_path, output_file_name): ip_or_domain_to_lines[ip_or_domain].append(line) # 过滤掉小于1000字节的IP或域名段 filtered_ip_or_domain_to_lines = {ip_or_domain: lines for ip_or_domain, lines in ip_or_domain_to_lines.items() - if sum(len(line) for line in lines) >= 500} - + if sum(len(line) for line in lines) >= 800} # 过滤掉小于1000字节的IP或域名段 # 如果没有满足条件的IP或域名段,则不生成文件 if not filtered_ip_or_domain_to_lines: print("没有满足条件的IP或域名段,不生成文件。") return - # 合并所有满足条件的IP或域名的行到一个文件 with open(output_file_name, 'w', encoding='utf-8') as output_file: for ip_or_domain, lines in filtered_ip_or_domain_to_lines.items(): @@ -553,7 +540,6 @@ def parse_file(input_file_path, output_file_name): if alphabet_counter >= 26: number_counter += 1 alphabet_counter = 0 # 重置字母计数器 - # 生成分类名 genre_name = chr(65 + alphabet_counter) + str(number_counter) output_file.write(f"{genre_name},#genre#\n") @@ -561,18 +547,14 @@ def parse_file(input_file_path, output_file_name): output_file.write(line + '\n') output_file.write('\n') # 在每个小段后添加一个空行作为分隔 alphabet_counter += 1 # 递增字母计数器 - # 调用函数并传入文件路径和输出文件名 parse_file('网络收集.txt', '网络收集.txt') ################################################################################################任务结束,删除不必要的过程文件 files_to_remove = ["2.txt", "汇总.txt"] - for file in files_to_remove: if os.path.exists(file): os.remove(file) else: # 如果文件不存在,则提示异常并打印提示信息 print(f"文件 {file} 不存在,跳过删除。") - - print("任务运行完毕,频道列表可查看文件夹内源.txt文件!")