Update GAT.py
This commit is contained in:
@@ -175,86 +175,96 @@ with open('gat.txt', 'w', encoding='utf-8') as new_file:
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
import cv2
|
import requests
|
||||||
import time
|
import time
|
||||||
|
import cv2
|
||||||
|
from urllib.parse import urlparse
|
||||||
from tqdm import tqdm
|
from tqdm import tqdm
|
||||||
|
# 测试HTTP连接并尝试下载数据
|
||||||
# 存储文件路径
|
def test_connectivity_and_download(url, initial_timeout=1, retry_timeout=1):
|
||||||
file_path = "gat.txt"
|
parsed_url = urlparse(url)
|
||||||
output_file_path = "gat.txt"
|
if parsed_url.scheme not in ['http', 'https']:
|
||||||
|
# 非HTTP(s)协议,尝试RTSP检测
|
||||||
# 打开输入文件和输出文件
|
return test_rtsp_connectivity(url, retry_timeout)
|
||||||
with open(file_path, 'r', encoding='utf-8') as file:
|
else:
|
||||||
lines = file.readlines()
|
# HTTP(s)协议,使用原始方法
|
||||||
|
try:
|
||||||
# 获取总行数用于进度条
|
with requests.get(url, stream=True, timeout=initial_timeout) as response:
|
||||||
total_lines = len(lines)
|
if response.status_code == 200:
|
||||||
|
start_time = time.time()
|
||||||
# 写入通过检测的行到新文件
|
while time.time() - start_time < initial_timeout:
|
||||||
with open(output_file_path, 'w', encoding='utf-8') as output_file:
|
chunk = response.raw.read(512) # 尝试下载1KB数据
|
||||||
# 使用tqdm显示进度条
|
if chunk:
|
||||||
for i, line in tqdm(enumerate(lines), total=total_lines, desc="Processing", unit='line'):
|
return True # 成功下载数据
|
||||||
# 检查是否包含 'genre'
|
except requests.RequestException as e:
|
||||||
if 'genre' in line:
|
print(f"请求异常: {e}")
|
||||||
output_file.write(line)
|
pass #这行删掉则会在下载不到数据流的时候进行连通性测试
|
||||||
continue
|
return False # 默认返回False
|
||||||
# 分割频道名称和URL,并去除空白字符
|
print("/" * 80)
|
||||||
parts = line.split(',', 1)
|
# 测试RTSP连接并尝试读取流
|
||||||
if len(parts) == 2:
|
def test_rtsp_connectivity(url, timeout=3):
|
||||||
channel_name, url = parts
|
cap = cv2.VideoCapture(url)
|
||||||
channel_name = channel_name.strip()
|
if not cap.isOpened():
|
||||||
url = url.strip()
|
return False
|
||||||
# 进行检测
|
start_time = time.time()
|
||||||
cap = cv2.VideoCapture(url)
|
while time.time() - start_time < timeout:
|
||||||
start_time = time.time()
|
ret, _ = cap.read()
|
||||||
frame_count = 0
|
if ret:
|
||||||
# 尝试捕获10秒内的帧
|
return True # 成功读取帧
|
||||||
while frame_count < 25 and (time.time() - start_time) < 2:
|
cap.release()
|
||||||
ret, frame = cap.read()
|
return False
|
||||||
if not ret:
|
# 主函数
|
||||||
break
|
def main(输入, 输出):
|
||||||
frame_count += 1
|
with open(输入, "r", encoding="utf-8") as source_file:
|
||||||
# 释放资源
|
lines = source_file.readlines()
|
||||||
cap.release()
|
results = []
|
||||||
# 根据捕获的帧数判断状态并记录结果
|
for line_number, line in enumerate(tqdm(lines, desc="检测中")):
|
||||||
if frame_count >= 25: # 10秒内超过200帧则写入
|
parts = line.strip().split(",")
|
||||||
output_file.write(line) # 写入检测通过的行
|
if len(parts) == 2 and parts[1]: # 确保有URL,并且URL不为空
|
||||||
|
channel_name, channel_url = parts
|
||||||
# 无需再打印酒店源,因为这里是对所有URL进行检测,而不是基于IP分组检测
|
try:
|
||||||
|
is_valid = test_connectivity_and_download(channel_url)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"检测URL {channel_url} 时发生错误: {e}")
|
||||||
|
is_valid = False # 将异常的URL视为无效
|
||||||
|
status = "有效" if is_valid else "无效"
|
||||||
|
if "genre" in line.lower() or status == "有效":
|
||||||
|
results.append((channel_name.strip(), channel_url.strip(), status))
|
||||||
|
# 写入文件
|
||||||
|
with open(输出, "w", encoding="utf-8") as output_file:
|
||||||
|
for channel_name, channel_url, status in results:
|
||||||
|
output_file.write(f"{channel_name},{channel_url}\n")
|
||||||
|
print(f"任务完成, 有效源数量: {len([x for x in results if x[2] == '有效'])}, 无效源数量: {len([x for x in results if x[2] == '无效'])}")
|
||||||
|
if __name__ == "__main__":
|
||||||
|
输入 = "gat.txt" #input('请输入utf-8编码的直播源文件路径:')
|
||||||
|
输出 = "gat.txt"
|
||||||
|
main(输入, 输出)
|
||||||
|
|
||||||
|
|
||||||
|
#######################普通排序
|
||||||
with open('gat.txt', 'r', encoding='UTF-8') as f:
|
with open('gat.txt', 'r', encoding='UTF-8') as f:
|
||||||
lines = f.readlines()
|
lines = f.readlines()
|
||||||
|
|
||||||
lines.sort()
|
lines.sort()
|
||||||
|
|
||||||
with open('gat.txt', 'w', encoding='UTF-8') as f:
|
with open('gat.txt', 'w', encoding='UTF-8') as f:
|
||||||
for line in lines:
|
for line in lines:
|
||||||
f.write(line)
|
f.write(line)
|
||||||
|
|
||||||
|
|
||||||
|
#######################拼音排序
|
||||||
import re
|
import re
|
||||||
from pypinyin import lazy_pinyin
|
from pypinyin import lazy_pinyin
|
||||||
|
|
||||||
# 打开一个utf-8编码的文本文件
|
# 打开一个utf-8编码的文本文件
|
||||||
with open("gat.txt", "r", encoding="utf-8") as file:
|
with open("gat.txt", "r", encoding="utf-8") as file:
|
||||||
# 读取所有行并存储到列表中
|
# 读取所有行并存储到列表中
|
||||||
lines = file.readlines()
|
lines = file.readlines()
|
||||||
|
|
||||||
# 定义一个函数,用于提取每行的第一个数字
|
# 定义一个函数,用于提取每行的第一个数字
|
||||||
def extract_first_number(line):
|
def extract_first_number(line):
|
||||||
match = re.search(r'\d+', line)
|
match = re.search(r'\d+', line)
|
||||||
return int(match.group()) if match else float('inf')
|
return int(match.group()) if match else float('inf')
|
||||||
|
|
||||||
# 对列表中的行进行排序,按照第一个数字的大小排列,其余行按中文排序
|
# 对列表中的行进行排序,按照第一个数字的大小排列,其余行按中文排序
|
||||||
sorted_lines = sorted(lines, key=lambda x: (not 'CCTV' in x, extract_first_number(x) if 'CCTV' in x else lazy_pinyin(x.strip())))
|
sorted_lines = sorted(lines, key=lambda x: (not 'CCTV' in x, extract_first_number(x) if 'CCTV' in x else lazy_pinyin(x.strip())))
|
||||||
|
|
||||||
# 将排序后的行写入新的utf-8编码的文本文件
|
# 将排序后的行写入新的utf-8编码的文本文件
|
||||||
with open("gat.txt", "w", encoding="utf-8") as file:
|
with open("gat.txt", "w", encoding="utf-8") as file:
|
||||||
for line in sorted_lines:
|
for line in sorted_lines:
|
||||||
file.write(line)
|
file.write(line)
|
||||||
|
|
||||||
|
|
||||||
print("任务运行完毕,分类频道列表可查看文件夹内综合源.txt文件!")
|
print("任务运行完毕,分类频道列表可查看文件夹内综合源.txt文件!")
|
||||||
|
|||||||
Reference in New Issue
Block a user