Update iptv-cv2-check.py

This commit is contained in:
frxz751113
2024-08-11 01:03:06 +08:00
committed by GitHub
parent d46f14c3db
commit d16e213f0f
+36 -14
View File
@@ -527,7 +527,7 @@ def filter_lines(input_file, output_file):
filtered_lines = [] filtered_lines = []
for line in lines: for line in lines:
#if ('hls' in line and 'm3u' in line) or ('tsfile' in line and 'm3u' in line): #行中包含m3u的同时还要包含hls或者tsfile #if ('hls' in line and 'm3u' in line) or ('tsfile' in line and 'm3u' in line): #行中包含m3u的同时还要包含hls或者tsfile
if '凤凰' in line or '东森' in line or '天映' in line or '酒店' in line or '龙祥' in line or '美亚' in line or '好莱坞' in line: #仅提取港澳频道 if '凤凰' in line or '东森' in line or '天映' in line or '酒店' in line or '龙祥' in line or '美亚' in line or '好莱坞' in line or '珠江' in line: #仅提取港澳频道
if 'udp' not in line and 'rtp' not in line and 'CCTV' not in line and '卫视' not in line: # 排除组播地址及央卫 if 'udp' not in line and 'rtp' not in line and 'CCTV' not in line and '卫视' not in line: # 排除组播地址及央卫
filtered_lines.append(line) filtered_lines.append(line)
with open(output_file, 'w', encoding='utf-8') as output_file: with open(output_file, 'w', encoding='utf-8') as output_file:
@@ -575,73 +575,95 @@ deduplicate_lines(input_file_path, output_file_path)
#################################################### 对整理好的频道列表测试HTTP连接 #################################################### 对整理好的频道列表测试HTTP连接
# 函数:获取视频分辨率 # 函数:获取视频分辨率
def get_video_resolution(video_path, timeout=2): def get_video_resolution(video_path, timeout=2):
# 使用OpenCV创建视频捕获对象
cap = cv2.VideoCapture(video_path) cap = cv2.VideoCapture(video_path)
# 检查视频是否成功打开
if not cap.isOpened(): if not cap.isOpened():
return None return None
# 获取视频的宽度和高度
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# 释放视频捕获对象
cap.release() cap.release()
# 返回视频的分辨率
return (width, height) return (width, height)
# 函数:处理每一行 # 函数:处理每一行
def process_line(line, output_file, order_list, valid_count, invalid_count, total_lines): def process_line(line, output_file, order_list, valid_count, invalid_count, total_lines):
# 去除行尾的空白字符并按逗号分割行
parts = line.strip().split(',') parts = line.strip().split(',')
# 如果行包含特定的标签'#genre#',则直接写入新文件
if '#genre#' in line: if '#genre#' in line:
# 如果行包含 '#genre#',直接写入新文件 with threading.Lock(): # 使用线程锁保证写入操作的原子性
with threading.Lock():
output_file.write(line) output_file.write(line)
print(f"已写入genre行:{line.strip()}") print(f"已写入genre行:{line.strip()}")
# 如果分割后的部分数量为2,则继续处理
elif len(parts) == 2: elif len(parts) == 2:
channel_name, channel_url = parts channel_name, channel_url = parts
# 获取视频的分辨率
resolution = get_video_resolution(channel_url, timeout=2) resolution = get_video_resolution(channel_url, timeout=2)
if resolution and resolution[1] >= 720: # 检查分辨率是否大于等于720p # 如果分辨率有效且高度大于等于720p
with threading.Lock(): if resolution and resolution[1] >= 720:
with threading.Lock(): # 使用线程锁
output_file.write(f"{channel_name}[{resolution[1]}p],{channel_url}\n") output_file.write(f"{channel_name}[{resolution[1]}p],{channel_url}\n")
# 将频道名、分辨率和URL添加到列表中
order_list.append((channel_name, resolution[1], channel_url)) order_list.append((channel_name, resolution[1], channel_url))
# 有效计数增加
valid_count[0] += 1 valid_count[0] += 1
print(f"Channel '{channel_name}' accepted with resolution {resolution[1]}p at URL {channel_url}.") print(f"Channel '{channel_name}' accepted with resolution {resolution[1]}p at URL {channel_url}.")
else: else:
# 如果分辨率不满足条件,无效计数增加
invalid_count[0] += 1 invalid_count[0] += 1
# 打印当前处理进度
with threading.Lock(): with threading.Lock():
print(f"有效: {valid_count[0]}, 无效: {invalid_count[0]}, 总数: {total_lines}, 进度: {(valid_count[0] + invalid_count[0]) / total_lines * 100:.2f}%") print(f"有效: {valid_count[0]}, 无效: {invalid_count[0]}, 总数: {total_lines}, 进度: {(valid_count[0] + invalid_count[0]) / total_lines * 100:.2f}%")
# 函数:多线程工作 # 函数:多线程工作
def worker(task_queue, output_file, order_list, valid_count, invalid_count, total_lines): def worker(task_queue, output_file, order_list, valid_count, invalid_count, total_lines):
# 循环直到队列为空
while True: while True:
try: try:
# 从队列中获取任务,超时时间为1秒
line = task_queue.get(timeout=1) line = task_queue.get(timeout=1)
# 处理获取的任务
process_line(line, output_file, order_list, valid_count, invalid_count, total_lines) process_line(line, output_file, order_list, valid_count, invalid_count, total_lines)
except Queue.Empty: except Queue.Empty: # 如果队列为空,捕获异常
break break
finally: finally:
# 标记任务已完成
task_queue.task_done() task_queue.task_done()
# 主函数 # 主函数
def main(source_file_path, output_file_path): def main(source_file_path, output_file_path):
# 初始化列表和计数器
order_list = [] order_list = []
valid_count = [0] valid_count = [0]
invalid_count = [0] invalid_count = [0]
task_queue = Queue() task_queue = Queue()
# 读取源文件 # 使用with语句打开源文件并读取所有行
with open(source_file_path, 'r', encoding='utf-8') as source_file: with open(source_file_path, 'r', encoding='utf-8') as source_file:
lines = source_file.readlines() lines = source_file.readlines()
# 使用with语句打开输出文件准备写入
with open(output_file_path, 'w', encoding='utf-8') as output_file: with open(output_file_path, 'w', encoding='utf-8') as output_file:
# 创建线程池 # 创建线程池,最大工作线程数为64
with ThreadPoolExecutor(max_workers=64) as executor: with ThreadPoolExecutor(max_workers=64) as executor:
# 创建并启动工作线程 # 为线程池中的每个线程提交worker函数
for _ in range(64): for _ in range(64):
executor.submit(worker, task_queue, output_file, order_list, valid_count, invalid_count, len(lines)) executor.submit(worker, task_queue, output_file, order_list, valid_count, invalid_count, len(lines))
# 将所有行放入队列 # 将所有行放入任务队列
for line in lines: for line in lines:
task_queue.put(line) task_queue.put(line)
# 等待队列中的所有任务完成 # 等待队列中的所有任务完成
task_queue.join() task_queue.join()
# 打印任务完成的统计信息
print(f"任务完成,有效频道数:{valid_count[0]}, 无效频道数:{invalid_count[0]}, 总频道数:{len(lines)}") print(f"任务完成,有效频道数:{valid_count[0]}, 无效频道数:{invalid_count[0]}, 总频道数:{len(lines)}")
# 程序入口点
if __name__ == "__main__": if __name__ == "__main__":
# 定义源文件和输出文件的路径
source_file_path = 'iptv.txt' # 替换为你的源文件路径 source_file_path = 'iptv.txt' # 替换为你的源文件路径
output_file_path = '检测结果.txt' # 替换为你的输出文件路径,不要后缀名 output_file_path = '检测结果.txt' # 替换为你的输出文件路径
# 调用主函数
main(source_file_path, output_file_path) main(source_file_path, output_file_path)