Update 收集.py

This commit is contained in:
frxz751113
2024-08-27 01:44:34 +08:00
committed by GitHub
parent b85706c2d6
commit 496384f5db
+18 -36
View File
@@ -98,7 +98,7 @@ def read_and_process_file(input_filename, output_filename, encodings=['utf-8', '
# 调用函数 # 调用函数
read_and_process_file('汇总.txt', '汇总.txt') read_and_process_file('汇总.txt', '汇总.txt')
######################################################################################################## ###################################################################去重#####################################
def remove_duplicates(input_file, output_file): def remove_duplicates(input_file, output_file):
# 用于存储已经遇到的URL和包含genre的行 # 用于存储已经遇到的URL和包含genre的行
seen_urls = set() seen_urls = set()
@@ -128,7 +128,7 @@ def remove_duplicates(input_file, output_file):
# 使用方法 # 使用方法
remove_duplicates('汇总.txt', '汇总.txt') remove_duplicates('汇总.txt', '汇总.txt')
######################################################################################################## ###############################################################################替换#########################
# 导入fileinput模块 # 导入fileinput模块
import fileinput import fileinput
# 定义替换规则的字典 # 定义替换规则的字典
@@ -264,17 +264,17 @@ print("替换完成,新文件已保存。")
######################################################################################################## ######################################################################################提取
import re import re
import os import os
# 定义一个包含所有要排除的关键词的列表 # 定义一个包含所有要排除的关键词的列表
excluded_keywords = [ excluded_keywords = [
'epg', 'mitv', 'udp', 'rtp', 'P2p', 'p2p', 'p3p', 'P2P', 'P3p', '/hls/', '/tsfile/', 'P3P', '', '', '/zy.', '/xgj.', '春节' 'epg', 'mitv', 'udp', 'rtp', 'P2p', 'p2p', 'p3p', 'P2P', '[', 'P3p', '/hls/', '/tsfile/', 'P3P', '', '', '/zy.', '/xgj.', '春节'
] ]
# 定义一个包含所有要提取的关键词的列表 # 定义一个包含所有要提取的关键词的列表
extract_keywords = [ extract_keywords = [
'CCTV', '卫视', '动作电影', '风云剧场', '怀旧剧场', '影迷', '高清电影', '动作电影', '影院', '全球大片', '剧场', '家庭影院', '电影', '星光', '华语', '美国大片', '峨眉', '凤凰', '星空', '人间', '亚洲', '环球' 'CCTV', '卫视', '动作电影', '风云剧场', '怀旧剧场', '影迷', '高清电影', '动作电影', '影院', '全球大片', '剧场', 'TVB', '家庭影院', '电影', '星光', '华语', '美国大片', '峨眉', '凤凰', '星空', '人间', '亚洲', '环球'
# 在这里添加需要提取的关键词 # 在这里添加需要提取的关键词
] ]
@@ -291,15 +291,14 @@ with open('2.txt', 'r', encoding='utf-8') as file:
if not any(keyword in line for keyword in excluded_keywords): if not any(keyword in line for keyword in excluded_keywords):
outfile.write(line) # 写入符合条件的行到文件 outfile.write(line) # 写入符合条件的行到文件
import re
###############################################################
import re
def parse_file(input_file_path, output_file_name): def parse_file(input_file_path, output_file_name):
# 正则表达式匹配从'//'开始到第一个'/'或第一个'::'结束的部分 # 正则表达式匹配从'//'开始到第一个'/'或第一个'::'结束的部分
ip_or_domain_pattern = re.compile(r'//([^/:]*:[^/:]*::[^/:]*|[^/]*)') ip_or_domain_pattern = re.compile(r'//([^/:]*:[^/:]*::[^/:]*|[^/]*)')
# 用于存储每个IP或域名及其对应的行列表 # 用于存储每个IP或域名及其对应的行列表
ip_or_domain_to_lines = {} ip_or_domain_to_lines = {}
# 读取原始文件内容 # 读取原始文件内容
with open(input_file_path, 'r', encoding='utf-8') as file: with open(input_file_path, 'r', encoding='utf-8') as file:
for line in file: for line in file:
@@ -318,16 +317,13 @@ def parse_file(input_file_path, output_file_name):
if ip_or_domain not in ip_or_domain_to_lines: if ip_or_domain not in ip_or_domain_to_lines:
ip_or_domain_to_lines[ip_or_domain] = [] ip_or_domain_to_lines[ip_or_domain] = []
ip_or_domain_to_lines[ip_or_domain].append(line) ip_or_domain_to_lines[ip_or_domain].append(line)
############################################################################### 过滤掉小于1500字节的IP或域名段
# 过滤掉小于5000字节的IP或域名段
filtered_ip_or_domain_to_lines = {ip_or_domain: lines for ip_or_domain, lines in ip_or_domain_to_lines.items() filtered_ip_or_domain_to_lines = {ip_or_domain: lines for ip_or_domain, lines in ip_or_domain_to_lines.items()
if sum(len(line) for line in lines) >= 500} if sum(len(line) for line in lines) >= 1500}
# 如果没有满足条件的IP或域名段,则不生成文件 # 如果没有满足条件的IP或域名段,则不生成文件
if not filtered_ip_or_domain_to_lines: if not filtered_ip_or_domain_to_lines:
print("没有满足条件的IP或域名段,不生成文件。") print("没有满足条件的IP或域名段,不生成文件。")
return return
# 合并所有满足条件的IP或域名的行到一个文件 # 合并所有满足条件的IP或域名的行到一个文件
with open(output_file_name, 'w', encoding='utf-8') as output_file: with open(output_file_name, 'w', encoding='utf-8') as output_file:
for ip_or_domain, lines in filtered_ip_or_domain_to_lines.items(): for ip_or_domain, lines in filtered_ip_or_domain_to_lines.items():
@@ -336,13 +332,12 @@ def parse_file(input_file_path, output_file_name):
for line in lines: for line in lines:
output_file.write(line + '\n') output_file.write(line + '\n')
output_file.write('\n') # 在每个小段后添加一个空行作为分隔 output_file.write('\n') # 在每个小段后添加一个空行作为分隔
# 调用函数并传入文件路径和输出文件名 # 调用函数并传入文件路径和输出文件名
parse_file('2.txt', '2.txt') parse_file('2.txt', '2.txt')
############################################################################检测同IP第一个链接,缩小验源数量
import cv2 import cv2
import time import time
from tqdm import tqdm from tqdm import tqdm
@@ -351,14 +346,12 @@ detected_ips = {}
# 存储文件路径 # 存储文件路径
file_path = "2.txt" file_path = "2.txt"
output_file_path = "网络收集.txt" output_file_path = "网络收集.txt"
def get_ip_key(url): def get_ip_key(url):
"""从URL中提取IP地址,并构造一个唯一的键""" """从URL中提取IP地址,并构造一个唯一的键"""
# 找到'//'到第一个'/'之间的字符串 # 找到'//'到第一个'/'之间的字符串
start = url.find('://') + 3 # '://'.length 是 3 start = url.find('://') + 3 # '://'.length 是 3
end = url.find('/', start) # 找到第一个'/'的位置 end = url.find('/', start) # 找到第一个'/'的位置
return url[start:end] if end != -1 else None return url[start:end] if end != -1 else None
# 打开输入文件和输出文件 # 打开输入文件和输出文件
with open(file_path, 'r', encoding='utf-8') as file: with open(file_path, 'r', encoding='utf-8') as file:
lines = file.readlines() lines = file.readlines()
@@ -390,7 +383,7 @@ with open(output_file_path, 'w', encoding='utf-8') as output_file:
start_time = time.time() start_time = time.time()
frame_count = 0 frame_count = 0
# 尝试捕获10秒内的帧 # 尝试捕获10秒内的帧
while frame_count < 30 and (time.time() - start_time) < 5: while frame_count < 10 and (time.time() - start_time) < 3:
ret, frame = cap.read() ret, frame = cap.read()
if not ret: if not ret:
break break
@@ -398,7 +391,7 @@ with open(output_file_path, 'w', encoding='utf-8') as output_file:
# 释放资源 # 释放资源
cap.release() cap.release()
# 根据捕获的帧数判断状态并记录结果 # 根据捕获的帧数判断状态并记录结果
if frame_count >= 30: #10秒内超过230帧则写入 if frame_count >= 10: #10秒内超过230帧则写入
detected_ips[ip_key] = {'status': 'ok'} detected_ips[ip_key] = {'status': 'ok'}
output_file.write(line) # 写入检测通过的行 output_file.write(line) # 写入检测通过的行
else: else:
@@ -408,6 +401,9 @@ for ip_key, result in detected_ips.items():
print(f"IP Key: {ip_key}, Status: {result['status']}") print(f"IP Key: {ip_key}, Status: {result['status']}")
############################################################################全部检测,防止IP段失效
import requests import requests
import time import time
import cv2 import cv2
@@ -415,7 +411,7 @@ from urllib.parse import urlparse
from tqdm import tqdm from tqdm import tqdm
# 测试HTTP连接并尝试下载数据 # 测试HTTP连接并尝试下载数据
def test_connectivity_and_download(url, initial_timeout=1.1, retry_timeout=1): def test_connectivity_and_download(url, initial_timeout=1, retry_timeout=2):
parsed_url = urlparse(url) parsed_url = urlparse(url)
if parsed_url.scheme not in ['http', 'https']: if parsed_url.scheme not in ['http', 'https']:
# 非HTTP(s)协议,尝试RTSP检测 # 非HTTP(s)协议,尝试RTSP检测
@@ -427,17 +423,15 @@ def test_connectivity_and_download(url, initial_timeout=1.1, retry_timeout=1):
if 200 <= response.status_code <= 403: if 200 <= response.status_code <= 403:
start_time = time.time() start_time = time.time()
while time.time() - start_time < initial_timeout: while time.time() - start_time < initial_timeout:
chunk = response.raw.read(128) # 尝试下载1KB数据 chunk = response.raw.read(1024) # 尝试下载1KB数据
if chunk: if chunk:
return True # 成功下载数据 return True # 成功下载数据
except requests.RequestException as e: except requests.RequestException as e:
print(f"请求异常: {e}") print(f"请求异常: {e}")
pass #这行删掉则会在下载不到数据流的时候进行连通性测试 pass #这行删掉则会在下载不到数据流的时候进行连通性测试
return False # 默认返回False return False # 默认返回False
print("/" * 80) print("/" * 80)
# 测试RTSP连接并尝试读取流 # 测试RTSP连接并尝试读取流
def test_rtsp_connectivity(url, timeout=3): def test_rtsp_connectivity(url, timeout=3):
cap = cv2.VideoCapture(url) cap = cv2.VideoCapture(url)
@@ -455,7 +449,6 @@ def test_rtsp_connectivity(url, timeout=3):
def main(输入, 输出): def main(输入, 输出):
with open(输入, "r", encoding="utf-8") as source_file: with open(输入, "r", encoding="utf-8") as source_file:
lines = source_file.readlines() lines = source_file.readlines()
results = [] results = []
for line_number, line in enumerate(tqdm(lines, desc="检测中")): for line_number, line in enumerate(tqdm(lines, desc="检测中")):
parts = line.strip().split(",") parts = line.strip().split(",")
@@ -466,19 +459,15 @@ def main(输入, 输出):
except Exception as e: except Exception as e:
print(f"检测URL {channel_url} 时发生错误: {e}") print(f"检测URL {channel_url} 时发生错误: {e}")
is_valid = False # 将异常的URL视为无效 is_valid = False # 将异常的URL视为无效
status = "有效" if is_valid else "无效" status = "有效" if is_valid else "无效"
if "genre" in line.lower() or status == "有效": if "genre" in line.lower() or status == "有效":
results.append((channel_name.strip(), channel_url.strip(), status)) results.append((channel_name.strip(), channel_url.strip(), status))
# 写入文件 # 写入文件
with open(输出, "w", encoding="utf-8") as output_file: with open(输出, "w", encoding="utf-8") as output_file:
for channel_name, channel_url, status in results: for channel_name, channel_url, status in results:
output_file.write(f"{channel_name},{channel_url}\n") output_file.write(f"{channel_name},{channel_url}\n")
print(f"任务完成, 有效源数量: {len([x for x in results if x[2] == '有效'])}, 无效源数量: {len([x for x in results if x[2] == '无效'])}") print(f"任务完成, 有效源数量: {len([x for x in results if x[2] == '有效'])}, 无效源数量: {len([x for x in results if x[2] == '无效'])}")
if __name__ == "__main__": if __name__ == "__main__":
输入 = "网络收集.txt" #input('请输入utf-8编码的直播源文件路径:') 输入 = "网络收集.txt" #input('请输入utf-8编码的直播源文件路径:')
输出 = "网络收集.txt" 输出 = "网络收集.txt"
@@ -539,13 +528,11 @@ def parse_file(input_file_path, output_file_name):
ip_or_domain_to_lines[ip_or_domain].append(line) ip_or_domain_to_lines[ip_or_domain].append(line)
# 过滤掉小于1000字节的IP或域名段 # 过滤掉小于1000字节的IP或域名段
filtered_ip_or_domain_to_lines = {ip_or_domain: lines for ip_or_domain, lines in ip_or_domain_to_lines.items() filtered_ip_or_domain_to_lines = {ip_or_domain: lines for ip_or_domain, lines in ip_or_domain_to_lines.items()
if sum(len(line) for line in lines) >= 500} if sum(len(line) for line in lines) >= 800} # 过滤掉小于1000字节的IP或域名段
# 如果没有满足条件的IP或域名段,则不生成文件 # 如果没有满足条件的IP或域名段,则不生成文件
if not filtered_ip_or_domain_to_lines: if not filtered_ip_or_domain_to_lines:
print("没有满足条件的IP或域名段,不生成文件。") print("没有满足条件的IP或域名段,不生成文件。")
return return
# 合并所有满足条件的IP或域名的行到一个文件 # 合并所有满足条件的IP或域名的行到一个文件
with open(output_file_name, 'w', encoding='utf-8') as output_file: with open(output_file_name, 'w', encoding='utf-8') as output_file:
for ip_or_domain, lines in filtered_ip_or_domain_to_lines.items(): for ip_or_domain, lines in filtered_ip_or_domain_to_lines.items():
@@ -553,7 +540,6 @@ def parse_file(input_file_path, output_file_name):
if alphabet_counter >= 26: if alphabet_counter >= 26:
number_counter += 1 number_counter += 1
alphabet_counter = 0 # 重置字母计数器 alphabet_counter = 0 # 重置字母计数器
# 生成分类名 # 生成分类名
genre_name = chr(65 + alphabet_counter) + str(number_counter) genre_name = chr(65 + alphabet_counter) + str(number_counter)
output_file.write(f"{genre_name},#genre#\n") output_file.write(f"{genre_name},#genre#\n")
@@ -561,18 +547,14 @@ def parse_file(input_file_path, output_file_name):
output_file.write(line + '\n') output_file.write(line + '\n')
output_file.write('\n') # 在每个小段后添加一个空行作为分隔 output_file.write('\n') # 在每个小段后添加一个空行作为分隔
alphabet_counter += 1 # 递增字母计数器 alphabet_counter += 1 # 递增字母计数器
# 调用函数并传入文件路径和输出文件名 # 调用函数并传入文件路径和输出文件名
parse_file('网络收集.txt', '网络收集.txt') parse_file('网络收集.txt', '网络收集.txt')
################################################################################################任务结束,删除不必要的过程文件 ################################################################################################任务结束,删除不必要的过程文件
files_to_remove = ["2.txt", "汇总.txt"] files_to_remove = ["2.txt", "汇总.txt"]
for file in files_to_remove: for file in files_to_remove:
if os.path.exists(file): if os.path.exists(file):
os.remove(file) os.remove(file)
else: # 如果文件不存在,则提示异常并打印提示信息 else: # 如果文件不存在,则提示异常并打印提示信息
print(f"文件 {file} 不存在,跳过删除。") print(f"文件 {file} 不存在,跳过删除。")
print("任务运行完毕,频道列表可查看文件夹内源.txt文件!") print("任务运行完毕,频道列表可查看文件夹内源.txt文件!")