Update 收集.py
This commit is contained in:
@@ -260,25 +260,120 @@ print("替换完成,新文件已保存。")
|
|||||||
########################################################################################################
|
########################################################################################################
|
||||||
import re
|
import re
|
||||||
import os
|
import os
|
||||||
def filter_lines(file_path):
|
# 定义一个包含所有要排除的关键词的列表
|
||||||
with open(input_file_path, 'r', encoding='utf-8') as file:
|
excluded_keywords = [
|
||||||
lines = file.readlines()
|
'epg', 'mitv', 'udp', 'rtp', 'tsfile', 'hls', '[', 'P2p', 'p2p', 'p3p', 'P2P', 'P3p', 'P3P', '腔', '曲', '春节'
|
||||||
filtered_lines = []
|
]
|
||||||
for line in lines:
|
|
||||||
if ',' in line:
|
# 定义一个包含所有要提取的关键词的列表
|
||||||
if 'epg' not in line and 'mitv' not in line and 'udp' not in line and 'rtp' not in line and 'tsfile' not in line and '/hls/' not in line and '[' not in line \
|
extract_keywords = [
|
||||||
and 'P2p' not in line and 'p2p' not in line and 'p3p' not in line and 'P2P' not in line and 'P3p' not in line and 'P3P' not in line and '腔' not in line and '曲' not in line and '春节' not in line:
|
'CCTV', '卫视', '动作电影', '风云剧场', '怀旧剧场', '影迷', '高清电影', '动作电影', '每日影院', '全球大片', '第一剧场', '家庭影院', '影迷电影', '星光', '华语', '美国大片', '峨眉', '凤凰', '星空', '人间', '亚洲', '环球'
|
||||||
filtered_lines.append(line)
|
# 在这里添加需要提取的关键词
|
||||||
return filtered_lines
|
]
|
||||||
def write_filtered_lines(output_file_path, filtered_lines):
|
|
||||||
with open(output_file_path, 'w', encoding='utf-8') as output_file:
|
# 读取文件并处理每一行
|
||||||
output_file.writelines(filtered_lines)
|
with open('2.txt', 'r', encoding='utf-8') as file:
|
||||||
|
lines = file.readlines()
|
||||||
|
|
||||||
|
# 创建或打开一个输出文件用于写入处理后的数据
|
||||||
|
with open('2.txt', 'w', encoding='utf-8') as outfile:
|
||||||
|
for line in lines:
|
||||||
|
# 首先检查行是否包含任何提取关键词
|
||||||
|
if any(keyword in line for keyword in extract_keywords):
|
||||||
|
# 如果包含提取关键词,进一步检查行是否不包含任何排除关键词
|
||||||
|
if not any(keyword in line for keyword in excluded_keywords):
|
||||||
|
outfile.write(line) # 写入符合条件的行到文件
|
||||||
|
|
||||||
|
#################################################### 对整理好的频道列表测试HTTP连接
|
||||||
|
def test_connectivity(url, max_attempts=2): #定义测试HTTP连接的次数
|
||||||
|
# 尝试连接指定次数
|
||||||
|
for _ in range(max_attempts):
|
||||||
|
try:
|
||||||
|
response = requests.head(url, timeout=1) # 发送HEAD请求,仅支持V4,修改此行数字可定义链接超时##////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
#response = requests.get(url, timeout=1) # 发送get请求,支持V6,修改此行数字可定义链接超时##############################//////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
return response.status_code == 200 # 返回True如果状态码为200
|
||||||
|
except requests.RequestException: # 捕获requests引发的异常
|
||||||
|
pass # 发生异常时忽略
|
||||||
|
#return False # 如果所有尝试都失败,返回False
|
||||||
|
pass
|
||||||
|
# 使用队列来收集结果的函数
|
||||||
|
def process_line(line, result_queue):
|
||||||
|
parts = line.strip().split(",") # 去除行首尾空白并按逗号分割
|
||||||
|
if len(parts) == 2 and parts[1]: # 确保有URL,并且URL不为空
|
||||||
|
channel_name, channel_url = parts # 分别赋值频道名称和URL
|
||||||
|
if test_connectivity(channel_url): # 测试URL是否有效
|
||||||
|
result_queue.put((channel_name, channel_url, "有效")) # 将结果放入队列
|
||||||
|
else:
|
||||||
|
result_queue.put((channel_name, channel_url, "无效")) # 将结果放入队列
|
||||||
|
else:
|
||||||
|
# 格式不正确的行不放入队列
|
||||||
|
pass
|
||||||
|
# 主函数
|
||||||
|
def main(source_file_path, output_file_path):
|
||||||
|
with open(source_file_path, "r", encoding="utf-8") as source_file: # 打开源文件
|
||||||
|
lines = source_file.readlines() # 读取所有行s
|
||||||
|
result_queue = queue.Queue() # 创建队列
|
||||||
|
threads = [] # 初始化线程列表
|
||||||
|
for line in tqdm(lines, desc="检测进行中"): # 显示进度条
|
||||||
|
thread = threading.Thread(target=process_line, args=(line, result_queue)) # 创建线程
|
||||||
|
thread.start() # 启动线程
|
||||||
|
threads.append(thread) # 将线程加入线程列表
|
||||||
|
for thread in threads: # 等待所有线程完成
|
||||||
|
thread.join()
|
||||||
|
# 初始化计数器
|
||||||
|
valid_count = 0
|
||||||
|
invalid_count = 0
|
||||||
|
with open(output_file_path, "w", encoding="utf-8") as output_file: # 打开输出文件
|
||||||
|
for _ in range(result_queue.qsize()): # 使用队列的大小来循环
|
||||||
|
item = result_queue.get() # 获取队列中的项目
|
||||||
|
# 只有在队列中存在有效的项目时才写入文件
|
||||||
|
if item[0] and item[1]: # 确保channel_name和channel_url都不为None
|
||||||
|
output_file.write(f"{item[0]},{item[1]},{item[2]}\n") # 写入文件
|
||||||
|
if item[2] == "有效": # 统计有效源数量
|
||||||
|
valid_count += 1
|
||||||
|
else: # 统计无效源数量
|
||||||
|
invalid_count += 1
|
||||||
|
print(f"任务完成, 有效源数量: {valid_count}, 无效源数量: {invalid_count}") # 打印结果
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
input_file_path = "2.txt"
|
try:
|
||||||
output_file_path = "2.txt"
|
source_file_path = "2.txt" # 输入源文件路径
|
||||||
filtered_lines = filter_lines(input_file_path)
|
output_file_path = "2.txt" # 设置输出文件路径
|
||||||
write_filtered_lines(output_file_path, filtered_lines)
|
main(source_file_path, output_file_path) # 调用main函数
|
||||||
print(f"文件已过滤完成")
|
except Exception as e:
|
||||||
|
print(f"程序发生错误: {e}") # 打印错误信息
|
||||||
|
|
||||||
|
#########################################################################提取2中的有效行
|
||||||
|
def filter_lines(file_path):
|
||||||
|
with open(file_path, 'r', encoding='utf-8') as file: # 打开文件
|
||||||
|
lines = file.readlines() # 读取所有行
|
||||||
|
filtered_lines = [] # 初始化过滤后的行列表
|
||||||
|
for line in lines: # 遍历所有行
|
||||||
|
if 'genre' in line or '有效' in line: # 如果行中包含'genre'或'有效'
|
||||||
|
filtered_lines.append(line) # 将行添加到过滤后的行列表
|
||||||
|
return filtered_lines # 返回过滤后的行列表
|
||||||
|
def write_filtered_lines(output_file_path, filtered_lines):
|
||||||
|
with open(output_file_path, 'w', encoding='utf-8') as output_file: # 打开输出文件
|
||||||
|
output_file.writelines(filtered_lines) # 写入过滤后的行
|
||||||
|
if __name__ == "__main__":
|
||||||
|
input_file_path = "2.txt" # 设置输入文件路径
|
||||||
|
output_file_path = "2.txt" # 设置输出文件路径
|
||||||
|
filtered_lines = filter_lines(input_file_path) # 调用filter_lines函数
|
||||||
|
write_filtered_lines(output_file_path, filtered_lines) # 调用write_filtered_lines函数
|
||||||
|
###################################################################################定义替换规则的字典,对整行内的内容进行替换
|
||||||
|
replacements = {
|
||||||
|
",有效": "", # 将",有效"替换为空字符串
|
||||||
|
"#genre#,无效": "#genre#", # 将"#genre#,无效"替换为"#genre#"
|
||||||
|
}
|
||||||
|
# 打开原始文件读取内容,并写入新文件
|
||||||
|
with open('2.txt', 'r', encoding='utf-8') as file:
|
||||||
|
lines = file.readlines()
|
||||||
|
# 创建新文件并写入替换后的内容
|
||||||
|
with open('2.txt', 'w', encoding='utf-8') as new_file:
|
||||||
|
for line in lines:
|
||||||
|
for old, new in replacements.items(): # 遍历替换规则字典
|
||||||
|
line = line.replace(old, new) # 替换行中的内容
|
||||||
|
new_file.write(line) # 写入新文件
|
||||||
|
print("新文件已保存。") # 打印完成信息
|
||||||
|
|
||||||
|
|
||||||
import re
|
import re
|
||||||
@@ -347,7 +442,7 @@ parse_file('2.txt', '网络收集.txt')
|
|||||||
import cv2
|
import cv2
|
||||||
import time
|
import time
|
||||||
from tqdm import tqdm
|
from tqdm import tqdm
|
||||||
# 初始化酒店源字典
|
# 初始化2字典
|
||||||
detected_ips = {}
|
detected_ips = {}
|
||||||
# 存储文件路径
|
# 存储文件路径
|
||||||
file_path = "网络收集.txt"
|
file_path = "网络收集.txt"
|
||||||
@@ -395,7 +490,7 @@ with open(output_file_path, 'w', encoding='utf-8') as output_file:
|
|||||||
start_time = time.time()
|
start_time = time.time()
|
||||||
frame_count = 0
|
frame_count = 0
|
||||||
# 尝试捕获10秒内的帧
|
# 尝试捕获10秒内的帧
|
||||||
while frame_count < 240 and (time.time() - start_time) < 10:
|
while frame_count < 30 and (time.time() - start_time) < 5:
|
||||||
ret, frame = cap.read()
|
ret, frame = cap.read()
|
||||||
if not ret:
|
if not ret:
|
||||||
break
|
break
|
||||||
@@ -403,12 +498,12 @@ with open(output_file_path, 'w', encoding='utf-8') as output_file:
|
|||||||
# 释放资源
|
# 释放资源
|
||||||
cap.release()
|
cap.release()
|
||||||
# 根据捕获的帧数判断状态并记录结果
|
# 根据捕获的帧数判断状态并记录结果
|
||||||
if frame_count >= 240: #10秒内超过230帧则写入
|
if frame_count >= 30: #10秒内超过230帧则写入
|
||||||
detected_ips[ip_key] = {'status': 'ok'}
|
detected_ips[ip_key] = {'status': 'ok'}
|
||||||
output_file.write(line) # 写入检测通过的行
|
output_file.write(line) # 写入检测通过的行
|
||||||
else:
|
else:
|
||||||
detected_ips[ip_key] = {'status': 'fail'}
|
detected_ips[ip_key] = {'status': 'fail'}
|
||||||
# 打印酒店源
|
# 打印2
|
||||||
for ip_key, result in detected_ips.items():
|
for ip_key, result in detected_ips.items():
|
||||||
print(f"IP Key: {ip_key}, Status: {result['status']}")
|
print(f"IP Key: {ip_key}, Status: {result['status']}")
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user