Update GAT.py
This commit is contained in:
@@ -174,50 +174,62 @@ with open('gat.txt', 'w', encoding='utf-8') as new_file:
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
import cv2
|
import requests
|
||||||
import time
|
|
||||||
from tqdm import tqdm
|
from tqdm import tqdm
|
||||||
|
import threading
|
||||||
# 存储文件路径
|
import re
|
||||||
file_path = "gat.txt"
|
def test_connectivity(url, max_attempts=1):
|
||||||
output_file_path = "gat.txt"
|
video_formats = ["m3u", "/", "rtsp"]
|
||||||
|
if not any(re.search(keyword, url, re.I) for keyword in video_formats):
|
||||||
# 打开输入文件和输出文件
|
print("\n特殊网址: 跳过检测")
|
||||||
with open(file_path, 'r', encoding='utf-8') as file:
|
return False
|
||||||
lines = file.readlines()
|
for _ in range(max_attempts):
|
||||||
|
try:
|
||||||
# 获取总行数用于进度条
|
response = requests.get(url, timeout=0.3)
|
||||||
total_lines = len(lines)
|
return response.status_code == 200
|
||||||
|
except requests.RequestException:
|
||||||
# 写入通过检测的行到新文件
|
pass
|
||||||
with open(output_file_path, 'w', encoding='utf-8') as output_file:
|
return False
|
||||||
# 使用tqdm显示进度条
|
# 处理每一行的函数
|
||||||
for i, line in tqdm(enumerate(lines), total=total_lines, desc="Processing", unit='line'):
|
def process_line(line, output_file, valid_count, invalid_count):
|
||||||
# 检查是否包含 'genre'
|
parts = line.strip().split(",")
|
||||||
if 'genre' in line:
|
if len(parts) == 2:
|
||||||
output_file.write(line)
|
channel_name, channel_url = parts
|
||||||
continue
|
if "genre" in line.lower():
|
||||||
# 分割频道名称和URL,并去除空白字符
|
with threading.Lock():
|
||||||
parts = line.split(',', 1)
|
output_file.write("\n" +line) # 直接写入原始行
|
||||||
if len(parts) == 2:
|
elif test_connectivity(channel_url):
|
||||||
channel_name, url = parts
|
with threading.Lock():
|
||||||
channel_name = channel_name.strip()
|
output_file.write(f"{channel_name},{channel_url}\n")
|
||||||
url = url.strip()
|
valid_count[0] += 1
|
||||||
# 进行检测
|
else:
|
||||||
cap = cv2.VideoCapture(url)
|
with threading.Lock():
|
||||||
start_time = time.time()
|
invalid_count[0] += 1
|
||||||
frame_count = 0
|
else:
|
||||||
# 尝试捕获10秒内的帧
|
with threading.Lock():
|
||||||
while frame_count < 25 and (time.time() - start_time) < 2:
|
invalid_count[0] += 1
|
||||||
ret, frame = cap.read()
|
# 主函数
|
||||||
if not ret:
|
def main(source_file_path, output_file_path):
|
||||||
break
|
with open(source_file_path, "r", encoding="utf-8") as source_file:
|
||||||
frame_count += 1
|
lines = source_file.readlines()
|
||||||
# 释放资源
|
valid_count = [0]
|
||||||
cap.release()
|
invalid_count = [0]
|
||||||
# 根据捕获的帧数判断状态并记录结果
|
with open(output_file_path, "w", encoding="utf-8") as output_file:
|
||||||
if frame_count >= 25: # 10秒内超过200帧则写入
|
threads = []
|
||||||
output_file.write(line) # 写入检测通过的行
|
for line in tqdm(lines, desc="地址有效"):
|
||||||
|
thread = threading.Thread(target=process_line, args=(line, output_file, valid_count, invalid_count))
|
||||||
|
thread.start()
|
||||||
|
threads.append(thread)
|
||||||
|
for thread in threads:
|
||||||
|
thread.join()
|
||||||
|
print(f"任务完成,有效源数量: {valid_count[0]}, 无效源数量: {invalid_count[0]}")
|
||||||
|
if __name__ == "__main__":
|
||||||
|
try:
|
||||||
|
source_file_path = "gat.txt"
|
||||||
|
output_file_path = "gat.txt"
|
||||||
|
main(source_file_path, output_file_path)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"程序发生错误: {e}")
|
||||||
|
|
||||||
# 无需再打印酒店源,因为这里是对所有URL进行检测,而不是基于IP分组检测
|
# 无需再打印酒店源,因为这里是对所有URL进行检测,而不是基于IP分组检测
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user