Create GAT.py

This commit is contained in:
frxz751113
2024-09-28 23:47:50 +08:00
committed by GitHub
parent e3cb246d85
commit 149058db21
+267
View File
@@ -0,0 +1,267 @@
import requests
from bs4 import BeautifulSoup
import re
import os
from opencc import OpenCC
from tqdm import tqdm
import cv2
import threading
from queue import Queue
from concurrent.futures import ThreadPoolExecutor
import time
keywords = ['1905', '凤凰卫视', '人间卫视', '亚洲卫视', '香港卫视', '神乐', '翡翠台', '凤凰香港', '凤凰中文', '凤凰资讯', 'AXN', 'AMC', '好莱坞', '东森', 'TVB', \
'龙华', '龙祥', '猪哥亮', '数位', 'AMC', '美亚', '番薯', '八大', '三立', '电影台', '戏剧台'] # 这里定义你的搜索关键词列表
output_file = 'gat.txt'
with open(output_file, 'w', encoding='utf-8') as f:
for keyword in keywords:
url = f'http://tonkiang.us/?&iqtv={keyword}'
response = requests.get(url)
if response.status_code == 200:
# 使用BeautifulSoup解析网页内容并提取文本
soup = BeautifulSoup(response.text, 'html.parser')
text_content = soup.get_text()
f.write(text_content + '\n')
else:
print(f'请求 {url} 失败,状态码:{response.status_code}')
with open('gat.txt', 'r', encoding='utf-8') as infile:
lines = infile.readlines()
new_lines = []
for i in range(len(lines)):
line = lines[i]
if 'http' in line:
# 找到当前行之前的非空行作为频道名称
for j in range(i - 1, -1, -1):
if lines[j].strip():
channel_name = lines[j].strip()
break
channel_url = line.strip()
new_lines.append(f'{channel_name},{channel_url}\n')
with open('gat.txt', 'w', encoding='utf-8') as outfile:
outfile.writelines(new_lines)
def remove_duplicates(input_file, output_file):
# 用于存储已经遇到的URL和包含genre的行
seen_urls = set()
seen_lines_with_genre = set()
# 用于存储最终输出的行
output_lines = []
# 打开输入文件并读取所有行
with open(input_file, 'r', encoding='utf-8') as f:
lines = f.readlines()
print("去重前的行数:", len(lines))
# 遍历每一行
for line in lines:
# 使用正则表达式查找URL和包含genre的行,默认最后一行
urls = re.findall(r'://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', line)
genre_line = re.search(r'\bgenre\b', line, re.IGNORECASE) is not None
# 如果找到URL并且该URL尚未被记录
if urls and urls[0] not in seen_urls:
seen_urls.add(urls[0])
output_lines.append(line)
# 如果找到包含genre的行,无论是否已被记录,都写入新文件
if genre_line:
output_lines.append(line)
# 将结果写入输出文件
with open(output_file, 'w', encoding='utf-8') as f:
f.writelines(output_lines)
print("去重后的行数:", len(output_lines))
# 使用方法
remove_duplicates('gat.txt', 'gat.txt')
print("处理完成,去重完成")
################简体转繁体
# 创建一个OpenCC对象,指定转换的规则为繁体字转简体字
converter = OpenCC('t2s.json')#繁转简
#converter = OpenCC('s2t.json')#简转繁
# 打开txt文件
with open('gat.txt', 'r', encoding='utf-8') as file:
traditional_text = file.read()
# 进行繁体字转简体字的转换
simplified_text = converter.convert(traditional_text)
# 将转换后的简体字写入txt文件
with open('gat.txt', 'w', encoding='utf-8') as file:
file.write(simplified_text)
print("处理完成,繁体转换完成")
def filter_lines(file_path):
with open('gat.txt', 'r', encoding='utf-8') as file:
lines = file.readlines()
filtered_lines = []
for line in lines:
if ',' in line:
if 'epg' not in line and 'mitv' not in line and 'udp' not in line and 'rtp' not in line \
and 'P2p' not in line and 'p2p' not in line and 'p3p' not in line and 'P2P' not in line and 'P3p' not in line and 'P3P' not in line:
filtered_lines.append(line)
return filtered_lines
def write_filtered_lines(output_file_path, filtered_lines):
with open(output_file_path, 'w', encoding='utf-8') as output_file:
output_file.writelines(filtered_lines)
if __name__ == "__main__":
input_file_path = 'gat.txt'
output_file_path = "gat.txt"
filtered_lines = filter_lines(input_file_path)
write_filtered_lines(output_file_path, filtered_lines)
print("/" * 80)
# ###########################################定义替换规则的字典,对整行内的内容进行替换
replacements = {
" ": "",
}
# 打开原始文件读取内容,并写入新文件
with open('gat.txt', 'r', encoding='utf-8') as file:
lines = file.readlines()
# 创建新文件并写入替换后的内容
with open('gat.txt', 'w', encoding='utf-8') as new_file:
for line in lines:
for old, new in replacements.items():
line = line.replace(old, new)
new_file.write(line)
# 函数:获取视频分辨率
def get_video_resolution(video_path, timeout=0.8):
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
return None
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
cap.release()
return (width, height)
# 函数:处理每一行
def process_line(line, output_file, order_list, valid_count, invalid_count, total_lines):
parts = line.strip().split(',')
if '#genre#' in line:
# 如果行包含 '#genre#',直接写入新文件
with threading.Lock():
output_file.write(line)
print(f"已写入genre行:{line.strip()}")
elif len(parts) == 2:
channel_name, channel_url = parts
resolution = get_video_resolution(channel_url, timeout=8)
if resolution and resolution[1] >= 720: # 检查分辨率是否大于等于720p
with threading.Lock():
output_file.write(f"{channel_name}[{resolution[1]}p],{channel_url}\n")
order_list.append((channel_name, resolution[1], channel_url))
valid_count[0] += 1
print(f"Channel '{channel_name}' accepted with resolution {resolution[1]}p at URL {channel_url}.")
else:
invalid_count[0] += 1
with threading.Lock():
print(f"有效: {valid_count[0]}, 无效: {invalid_count[0]}, 总数: {total_lines}, 进度: {(valid_count[0] + invalid_count[0]) / total_lines * 100:.2f}%")
# 函数:多线程工作
def worker(task_queue, output_file, order_list, valid_count, invalid_count, total_lines):
while True:
try:
line = task_queue.get(timeout=1)
process_line(line, output_file, order_list, valid_count, invalid_count, total_lines)
except Queue.Empty:
break
finally:
task_queue.task_done()
# 主函数
def main(source_file_path, output_file_path):
order_list = []
valid_count = [0]
invalid_count = [0]
task_queue = Queue()
# 读取源文件
with open(source_file_path, 'r', encoding='utf-8') as source_file:
lines = source_file.readlines()
with open(output_file_path + '.txt', 'w', encoding='utf-8') as output_file:
# 创建线程池
with ThreadPoolExecutor(max_workers=64) as executor:
# 创建并启动工作线程
for _ in range(64):
executor.submit(worker, task_queue, output_file, order_list, valid_count, invalid_count, len(lines))
# 将所有行放入队列
for line in lines:
task_queue.put(line)
# 等待队列中的所有任务完成
task_queue.join()
print(f"任务完成,有效频道数:{valid_count[0]}, 无效频道数:{invalid_count[0]}, 总频道数:{len(lines)}")
if __name__ == "__main__":
source_file_path = 'gat.txt' # 替换为你的源文件路径
output_file_path = 'gat' # 替换为你的输出文件路径,不要后缀名
main(source_file_path, output_file_path)
with open('gat.txt', 'r', encoding='UTF-8') as f:
lines = f.readlines()
lines.sort()
with open('gat.txt', 'w', encoding='UTF-8') as f:
for line in lines:
f.write(line)
import re
from pypinyin import lazy_pinyin
# 打开一个utf-8编码的文本文件
with open("gat.txt", "r", encoding="utf-8") as file:
# 读取所有行并存储到列表中
lines = file.readlines()
# 定义一个函数,用于提取每行的第一个数字
def extract_first_number(line):
match = re.search(r'\d+', line)
return int(match.group()) if match else float('inf')
# 对列表中的行进行排序,按照第一个数字的大小排列,其余行按中文排序
sorted_lines = sorted(lines, key=lambda x: (not 'CCTV' in x, extract_first_number(x) if 'CCTV' in x else lazy_pinyin(x.strip())))
# 将排序后的行写入新的utf-8编码的文本文件
with open("gat.txt", "w", encoding="utf-8") as file:
for line in sorted_lines:
file.write(line)
print("任务运行完毕,分类频道列表可查看文件夹内综合源.txt文件!")