Update 网络收集.py

This commit is contained in:
frxz751113
2024-09-19 12:47:34 +08:00
committed by GitHub
parent dbf647c1dc
commit 9b96695c5d
+74 -59
View File
@@ -522,74 +522,89 @@ remove_duplicates('网络收集.txt', '网络收集.txt')
############################################################################全部检测,防止IP段失效 ############################################################################全部检测,防止IP段失效
import requests import requests
import time
import cv2
from urllib.parse import urlparse
from tqdm import tqdm from tqdm import tqdm
import cv2
import threading
from queue import Queue
from concurrent.futures import ThreadPoolExecutor
import time
# Test an HTTP(S) URL by trying to download the first bytes of its body.
def test_connectivity_and_download(url, initial_timeout=3, retry_timeout=5):
    """Return True when *url* delivers stream data, otherwise False.

    Args:
        url: Stream address to probe.
        initial_timeout: Timeout in seconds passed to ``requests.get``.
        retry_timeout: Timeout in seconds forwarded to
            ``test_rtsp_connectivity`` for non-HTTP(S) URLs.

    Returns:
        For ``http``/``https`` URLs, True if the status code is in the
        200..403 range and the first 256-byte read yields data. For every
        other scheme, the result of ``test_rtsp_connectivity``.

    NOTE(review): ``response.raw.read`` presumably raises urllib3
    exceptions rather than ``requests.RequestException``; those are not
    caught here and propagate to the caller - confirm against the
    Requests/urllib3 documentation.
    """
    parsed_url = urlparse(url)
    if parsed_url.scheme not in ('http', 'https'):
        # Not HTTP(S): fall back to the RTSP probe.
        return test_rtsp_connectivity(url, retry_timeout)

    try:
        with requests.get(url, stream=True, timeout=initial_timeout) as response:
            # The upper bound of 403 is kept from the original check.
            if 200 <= response.status_code <= 403:
                # A single read is enough: an empty result means the stream
                # has already ended, so polling again until the timeout
                # expires would only spin without ever returning data.
                chunk = response.raw.read(256)
                if chunk:
                    return True  # Data was downloaded successfully.
    except requests.RequestException as e:
        print(f"请求异常: {e}")
    return False  # Default: treat the URL as unusable.
print("/" * 80) # 函数:获取视频分辨率
# 测试RTSP连接并尝试读取流 def get_video_resolution(video_path, timeout=0.8):
def test_rtsp_connectivity(url, timeout=5): cap = cv2.VideoCapture(video_path)
cap = cv2.VideoCapture(url)
if not cap.isOpened(): if not cap.isOpened():
return False return None
start_time = time.time() width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
while time.time() - start_time < timeout: height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
ret, _ = cap.read()
if ret:
return True # 成功读取帧
cap.release() cap.release()
return False return (width, height)
# Lock shared by all worker threads. A lock only excludes other threads when
# they acquire the same object, so it is created once at module level instead
# of once per ``with`` statement.
_OUTPUT_LOCK = threading.Lock()


# Function: process a single line of the source list.
def process_line(line, output_file, order_list, valid_count, invalid_count, total_lines):
    """Check one ``name,url`` line and record it when the stream is >= 720p.

    Lines containing ``#genre#`` are copied to the output unchanged. Lines
    with exactly two comma-separated fields are probed with
    ``get_video_resolution``; accepted channels are written with a
    ``[<height>p]`` suffix on the name. Any other line is neither written
    nor counted. A progress line is printed after every call.

    Args:
        line: Raw line from the source file, including its line ending.
        output_file: Writable text file shared by all worker threads.
        order_list: List receiving ``(name, height, url)`` for accepted
            channels.
        valid_count: One-element list used as a shared counter of accepted
            channels.
        invalid_count: One-element list used as a shared counter of
            rejected channels.
        total_lines: Total number of input lines, used for the percentage.
    """
    parts = line.strip().split(',')
    if '#genre#' in line:
        # Group header lines are passed through untouched.
        with _OUTPUT_LOCK:
            output_file.write(line)
            print(f"已写入genre行:{line.strip()}")
    elif len(parts) == 2:
        channel_name, channel_url = parts
        # The probe runs outside the lock so workers can test URLs in parallel.
        resolution = get_video_resolution(channel_url, timeout=8)
        if resolution and resolution[1] >= 720:  # Keep only 720p and above.
            with _OUTPUT_LOCK:
                output_file.write(f"{channel_name}[{resolution[1]}p],{channel_url}\n")
                order_list.append((channel_name, resolution[1], channel_url))
                valid_count[0] += 1
                print(f"Channel '{channel_name}' accepted with resolution {resolution[1]}p at URL {channel_url}.")
        else:
            # ``+=`` on a shared counter is a read-modify-write, so guard it.
            with _OUTPUT_LOCK:
                invalid_count[0] += 1
    with _OUTPUT_LOCK:
        print(f"有效: {valid_count[0]}, 无效: {invalid_count[0]}, 总数: {total_lines}, 进度: {(valid_count[0] + invalid_count[0]) / total_lines * 100:.2f}%")
# Function: worker loop run by each thread.
def worker(task_queue, output_file, order_list, valid_count, invalid_count, total_lines, poll_timeout=1):
    """Process lines from *task_queue* until it stays empty for *poll_timeout*.

    Args:
        task_queue: ``queue.Queue`` of raw input lines.
        output_file: Writable text file, forwarded to ``process_line``.
        order_list: Result list, forwarded to ``process_line``.
        valid_count: Shared one-element counter, forwarded to ``process_line``.
        invalid_count: Shared one-element counter, forwarded to
            ``process_line``.
        total_lines: Total number of input lines, forwarded to
            ``process_line``.
        poll_timeout: Seconds to wait for a new item before the worker
            concludes that the queue is drained and returns.
    """
    # The exception lives in the ``queue`` module, not on the ``Queue`` class;
    # it is imported here because the file only imports the class.
    from queue import Empty

    while True:
        try:
            line = task_queue.get(timeout=poll_timeout)
        except Empty:
            # Nothing was fetched, so task_done() must not be called here.
            break
        try:
            process_line(line, output_file, order_list, valid_count, invalid_count, total_lines)
        except Exception as exc:
            # One bad line must not kill the worker: if every worker died,
            # the remaining items would never be marked done and
            # task_queue.join() would block forever.
            print(f"Error while processing line {line.strip()!r}: {exc}")
        finally:
            # Exactly one task_done() per successful get().
            task_queue.task_done()
# 主函数 # 主函数
def main(输入, 输出): def main(source_file_path, output_file_path):
with open(输入, "r", encoding="utf-8") as source_file: order_list = []
lines = source_file.readlines() valid_count = [0]
results = [] invalid_count = [0]
for line_number, line in enumerate(tqdm(lines, desc="检测中")): task_queue = Queue()
parts = line.strip().split(",")
if len(parts) == 2 and parts[1]: # 确保有URL,并且URL不为空 # 读取源文件
channel_name, channel_url = parts with open(source_file_path, 'r', encoding='utf-8') as source_file:
try: lines = source_file.readlines()
is_valid = test_connectivity_and_download(channel_url)
except Exception as e: with open(output_file_path + '.txt', 'w', encoding='utf-8') as output_file:
print(f"检测URL {channel_url} 时发生错误: {e}") # 创建线程池
is_valid = False # 将异常的URL视为无效 with ThreadPoolExecutor(max_workers=64) as executor:
status = "有效" if is_valid else "无效" # 创建并启动工作线程
if "genre" in line.lower() or status == "有效": for _ in range(64):
results.append((channel_name.strip(), channel_url.strip(), status)) executor.submit(worker, task_queue, output_file, order_list, valid_count, invalid_count, len(lines))
# 写入文件
with open(输出, "w", encoding="utf-8") as output_file: # 将所有行放入队列
for channel_name, channel_url, status in results: for line in lines:
output_file.write(f"{channel_name},{channel_url}\n") task_queue.put(line)
# 等待队列中的所有任务完成
task_queue.join()
print(f"任务完成,有效频道数:{valid_count[0]}, 无效频道数:{invalid_count[0]}, 总频道数:{len(lines)}")
print(f"任务完成, 有效源数量: {len([x for x in results if x[2] == '有效'])}, 无效源数量: {len([x for x in results if x[2] == '无效'])}")
if __name__ == "__main__": if __name__ == "__main__":
输入 = "网络收集.txt" #input('请输入utf-8编码的直播源文件路径:') source_file_path = '网络收集.txt' # 替换为你的源文件路径
输出 = "网络收集.txt" output_file_path = '网络收集' # 替换为你的输出文件路径,不要后缀名
main(输入, 输出) main(source_file_path, output_file_path)