Update 网络收集.py

This commit is contained in:
frxz751113
2024-09-15 17:28:39 +08:00
committed by GitHub
parent 9c2587327b
commit 44f98784d7
+59 -54
View File
@@ -409,64 +409,69 @@ parse_file('2.txt', '2.txt')
import cv2 import cv2
import time import time
from tqdm import tqdm from tqdm import tqdm
# NOTE(review): these lines come from a side-by-side diff view; where text
# appears twice on one line, the left half is the removed column and the
# right half is the added column. Code text is left exactly as found.
# Initialise the dictionary of detection results (keyed by IP key) import os
detected_ips = {}
# File paths used by the script # File paths used by the script
file_path = "2.txt" file_path = "2.txt"
output_file_path = "网络收集.txt" output_file_path = "网络收集.txt"
def get_ip_key(url):
    """Extract the authority part (host[:port]) of a URL to use as a cache key.

    The key is the text between '://' and the first '/' that follows it.

    Args:
        url: The stream URL, e.g. 'http://1.2.3.4:8080/live/1.m3u8'.

    Returns:
        The host[:port] string, or None when the URL has no '://' separator
        or no '/' after the host.
    """
    scheme_sep = url.find('://')
    if scheme_sep == -1:
        # Without this guard, find() returning -1 made start == 2 and the
        # function returned an arbitrary slice of the text as the key.
        return None
    start = scheme_sep + 3  # len('://') == 3
    end = url.find('/', start)  # first '/' after the host
    return url[start:end] if end != -1 else None
# Removed-column version of the detection pass: read every line of the input
# file, probe each previously unseen IP key once with OpenCV, and append the
# lines whose key probed 'ok' to the output file.
# Open the input file and the output file
with open(file_path, 'r', encoding='utf-8') as file:
lines = file.readlines()
# Total number of lines, used for the progress bar
total_lines = len(lines)
# Append the lines that pass detection to the output file
with open(output_file_path, 'a', encoding='utf-8') as output_file:
# Show a progress bar with tqdm
for i, line in tqdm(enumerate(lines), total=total_lines, desc="Processing", unit='line'):
# Lines containing 'genre' are copied through without probing
if 'genre' in line:
output_file.write(line)
continue
# Split into channel name and URL, then strip surrounding whitespace
parts = line.split(',', 1)
if len(parts) == 2:
channel_name, url = parts
channel_name = channel_name.strip()
url = url.strip()
# Build the IP key
ip_key = get_ip_key(url)
# Check whether this IP key has already been probed
if ip_key and ip_key in detected_ips:
# Known key: reuse the earlier result to decide whether to write the line
if detected_ips[ip_key]['status'] == 'ok':
output_file.write(line)
elif ip_key: # New IP key: probe it
# Run the probe
cap = cv2.VideoCapture(url) # supports the http(s)/rts(m)p protocols
start_time = time.time()
frame_count = 0
# Read frames until 50 have been captured, 3 seconds have elapsed, or a read fails
while frame_count < 50 and (time.time() - start_time) < 3:
ret, frame = cap.read()
if not ret:
break
frame_count += 1
# Release the capture
cap.release()
# NOTE(review): from here to the end of this block each line fuses the
# removed column (tail of this loop) with the added column (the new
# get_ip_key definition); the code text is left exactly as found.
# Decide the status from the number of captured frames and record it def get_ip_key(url):
if frame_count >= 50: # 50 frames were read before the 3-second limit, so keep the line """Extract the IP address from the URL and build a unique key"""
detected_ips[ip_key] = {'status': 'ok'} start = url.find('://') + 3
output_file.write(line) # write the line that passed detection end = url.find('/', start)
else: return url[start:end] if end!= -1 else None
detected_ips[ip_key] = {'status': 'fail'}
def _stream_is_alive(url, min_frames=50, timeout=3):
    """Return True when `min_frames` frames can be read from `url` in time.

    Frames are read until `min_frames` have been captured, `timeout` seconds
    have elapsed since the capture was opened, or a read fails.
    """
    cap = cv2.VideoCapture(url)
    try:
        start_time = time.time()
        frame_count = 0
        while frame_count < min_frames and (time.time() - start_time) < timeout:
            ret, _frame = cap.read()
            if not ret:
                break
            frame_count += 1
    finally:
        # Release the capture even if a read raises.
        cap.release()
    return frame_count >= min_frames


def _append_sibling_collections(main_output):
    """Append every other '*网络收集.txt' file in the working directory.

    Files are visited in sorted order so repeated runs produce the same
    output; the main output file itself is skipped.
    """
    for file_name in sorted(os.listdir(os.getcwd())):
        if not file_name.endswith('网络收集.txt') or file_name == output_file_path:
            continue
        if not os.path.isfile(file_name):
            # A directory with a matching name cannot be opened as text.
            continue
        with open(file_name, 'r', encoding='utf-8') as other_file:
            content = other_file.read()
        if content:
            main_output.write('\n')
            main_output.write(content)


def merge_and_filter():
    """Filter the input playlist by probing streams, then merge sibling files.

    Reads every line of the module-level `file_path`. Lines containing
    'genre' are copied to `output_file_path` unchanged. Other lines are split
    at the first comma into a channel name and a URL; each distinct key from
    `get_ip_key(url)` is probed once, the outcome is stored in the
    module-level `detected_ips` dict as {'status': 'ok' | 'fail'}, and the
    line is appended only when its key's status is 'ok'. Lines without a
    comma or without a usable key are skipped.

    Afterwards the content of every other file in the working directory whose
    name ends with '网络收集.txt' is appended to the same output file.
    """
    with open(file_path, 'r', encoding='utf-8') as file:
        lines = file.readlines()

    with open(output_file_path, 'a', encoding='utf-8') as output_file:
        for line in tqdm(lines, total=len(lines), desc="Processing", unit='line'):
            if 'genre' in line:
                output_file.write(line)
                continue

            parts = line.split(',', 1)
            if len(parts) != 2:
                continue
            url = parts[1].strip()

            ip_key = get_ip_key(url)
            if not ip_key:
                continue

            if ip_key not in detected_ips:
                # First time this key is seen: probe once and cache the result.
                status = 'ok' if _stream_is_alive(url) else 'fail'
                detected_ips[ip_key] = {'status': status}

            if detected_ips[ip_key]['status'] == 'ok':
                output_file.write(line)

        # Still in append mode, so merged content lands after the filtered lines.
        _append_sibling_collections(output_file)
# Cache of probe results: IP key -> {'status': 'ok' | 'fail'}; it is filled in
# by merge_and_filter() and must exist before that function is called.
detected_ips = {}
merge_and_filter()
# Report the recorded status of every probed IP key.
# NOTE(review): the two lines below show the unchanged statement twice (left
# and right columns of the side-by-side diff); the text is left as found.
for ip_key, result in detected_ips.items(): for ip_key, result in detected_ips.items():
print(f"IP Key: {ip_key}, Status: {result['status']}") print(f"IP Key: {ip_key}, Status: {result['status']}")