245 lines
8.7 KiB
Python
245 lines
8.7 KiB
Python
#本程序主体构造如下
|
|
#搜素有效IP并生成文件追加写入到相应列表文件后去重
|
|
#检测组播列表所有文件中IP有效性
|
|
#合并整理自用直播源,与组播无关
|
|
#合并所有组播文件并过滤严重掉帧的视频以保证流畅性
|
|
#提取检测后的频道进行分类输出优选组播源
|
|
#提取优选组播源中分类追加到自用直播源
|
|
#后续整理
|
|
#没了!!!!!!!!!!!!
|
|
import time
|
|
from datetime import datetime, timedelta # 确保 timedelta 被导入
|
|
import concurrent.futures
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
import requests
|
|
import re
|
|
import os
|
|
import threading
|
|
from queue import Queue
|
|
import queue
|
|
from datetime import datetime
|
|
import replace
|
|
import fileinput
|
|
from tqdm import tqdm
|
|
from pypinyin import lazy_pinyin
|
|
from opencc import OpenCC
|
|
import base64
|
|
import cv2
|
|
from bs4 import BeautifulSoup
|
|
from urllib.parse import urlparse
|
|
from translate import Translator # 导入Translator类,用于文本翻译
|
|
# -*- coding: utf-8 -*-
|
|
import random
|
|
from fake_useragent import UserAgent # 需要先安装:pip install fake-useragent
|
|
|
|
# 创建输出目录
|
|
os.makedirs('playlist', exist_ok=True)
|
|
|
|
# 配置参数
|
|
DELAY_RANGE = (3, 6) # 随机延迟时间范围(秒)
|
|
MAX_RETRIES = 3 # 最大重试次数
|
|
REQUEST_TIMEOUT = 10 # 请求超时时间(秒)
|
|
|
|
def get_random_header():
|
|
"""生成随机请求头"""
|
|
return {
|
|
'User-Agent': UserAgent().random,
|
|
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
|
|
'Referer': 'https://fofa.info/'
|
|
}
|
|
|
|
def safe_request(url):
|
|
"""带重试机制的请求函数"""
|
|
for attempt in range(MAX_RETRIES):
|
|
try:
|
|
# 随机延迟防止被封
|
|
time.sleep(random.uniform(*DELAY_RANGE))
|
|
|
|
response = requests.get(
|
|
url,
|
|
headers=get_random_header(),
|
|
timeout=REQUEST_TIMEOUT
|
|
)
|
|
|
|
# 检查HTTP状态码
|
|
if response.status_code == 429:
|
|
wait_time = 30 # 遇到反爬等待30秒
|
|
print(f"遇到反爬机制,等待{wait_time}秒后重试")
|
|
time.sleep(wait_time)
|
|
continue
|
|
|
|
response.raise_for_status()
|
|
return response.text
|
|
|
|
except Exception as e:
|
|
print(f"请求失败(第{attempt+1}次重试): {str(e)}")
|
|
if attempt == MAX_RETRIES - 1:
|
|
raise
|
|
|
|
def validate_video(url, mcast):
|
|
"""验证视频流有效性"""
|
|
video_url = f"{url}/rtp/{mcast}"
|
|
print(f"正在验证: {video_url}")
|
|
|
|
try:
|
|
# 发送请求,尝试下载 1 千字节的数据
|
|
response = requests.get(video_url, headers=get_random_header(), timeout=REQUEST_TIMEOUT, stream=True)
|
|
response.raise_for_status()
|
|
|
|
content_length = 0
|
|
for chunk in response.iter_content(chunk_size=1024):
|
|
if chunk:
|
|
content_length += len(chunk)
|
|
if content_length >= 64:
|
|
break
|
|
return content_length >= 16
|
|
|
|
except Exception as e:
|
|
print(f"视频验证异常: {str(e)}")
|
|
return False
|
|
|
|
def main():
|
|
# 获取需要处理的文件列表
|
|
files = [f.split('.')[0] for f in os.listdir('rtp') if f.endswith('.txt')]
|
|
print(f"待处理频道列表: {files}")
|
|
|
|
for filename in files:
|
|
province_isp = filename.split('_')
|
|
if len(province_isp) != 2:
|
|
continue
|
|
|
|
province, isp = province_isp
|
|
print(f"\n正在处理: {province}{isp}")
|
|
|
|
# 读取组播地址
|
|
try:
|
|
with open(f'rtp/{filename}.txt', 'r', encoding='utf-8') as f:
|
|
mcast = f.readline().split('rtp://')[1].split()[0].strip()
|
|
except Exception as e:
|
|
print(f"文件读取失败: {str(e)}")
|
|
continue
|
|
|
|
# 构造搜索请求
|
|
search_txt = f'"udpxy" && country="CN" && region="{province}" && is_domain=true'
|
|
encoded_query = base64.b64encode(search_txt.encode()).decode()
|
|
search_url = f'https://fofa.info/result?qbase64={encoded_query}'
|
|
|
|
# 执行搜索
|
|
try:
|
|
html = safe_request(search_url)
|
|
except Exception as e:
|
|
print(f"搜索失败: {str(e)}")
|
|
continue
|
|
|
|
# 解析搜索结果,修改正则表达式以匹配IP和域名
|
|
soup = BeautifulSoup(html, 'html.parser')
|
|
pattern = re.compile(r"http://(?:\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}|\w[\w.-]*\w):\d+")
|
|
found_urls = set(pattern.findall(html))
|
|
print(f"找到{len(found_urls)}个有效地址")
|
|
|
|
# 验证地址有效性
|
|
valid_urls = [url for url in found_urls if validate_video(url, mcast)]
|
|
print(f"验证通过{len(valid_urls)}个有效地址")
|
|
|
|
# 生成播放列表
|
|
if valid_urls:
|
|
output_file = f'playlist/{province}{isp}.txt'
|
|
with open(f'rtp/{filename}.txt', 'r') as src, open(output_file, 'a') as dst:
|
|
original_content = src.read()
|
|
for url in valid_urls:
|
|
modified = original_content.replace('rtp://', f'{url}/rtp/')
|
|
dst.write(modified + '\n')
|
|
print(f"已生成播放列表: {output_file}")
|
|
|
|
if __name__ == '__main__':
|
|
main()
|
|
|
|
|
|
print('对playlist文件夹里面的所有txt文件进行去重处理')
|
|
def remove_duplicates_keep_order(folder_path):
|
|
for filename in os.listdir(folder_path):
|
|
if filename.endswith('.txt'):
|
|
file_path = os.path.join(folder_path, filename)
|
|
lines = set()
|
|
unique_lines = []
|
|
with open(file_path, 'r', encoding='utf-8') as file:
|
|
for line in file:
|
|
if line not in lines:
|
|
unique_lines.append(line)
|
|
lines.add(line)
|
|
# 将保持顺序的去重后的内容写回原文件
|
|
with open(file_path, 'w', encoding='utf-8') as file:
|
|
file.writelines(unique_lines)
|
|
# 使用示例
|
|
folder_path = 'playlist' # 替换为你的文件夹路径
|
|
remove_duplicates_keep_order(folder_path)
|
|
print('文件去重完成!移除存储的旧文件!')
|
|
import os
|
|
import time
|
|
from tqdm import tqdm
|
|
import sys
|
|
import requests # 新增requests库
|
|
|
|
# 初始化字典以存储IP检测结果
|
|
detected_ips = {}
|
|
|
|
def get_ip_key(url):
|
|
"""从URL中提取IP地址或域名,并构造一个唯一的键"""
|
|
start = url.find('://') + 3
|
|
end = url.find('/', start)
|
|
if end == -1:
|
|
end = len(url)
|
|
return url[start:end].strip()
|
|
|
|
folder_path = 'playlist'
|
|
|
|
if not os.path.isdir(folder_path):
|
|
print("指定的文件夹不存在。")
|
|
sys.exit()
|
|
|
|
for filename in os.listdir(folder_path):
|
|
if filename.endswith('.txt'):
|
|
file_path = os.path.join(folder_path, filename)
|
|
with open(file_path, 'r', encoding='utf-8') as file:
|
|
lines = file.readlines()
|
|
|
|
with open(file_path, 'w', encoding='utf-8') as output_file:
|
|
for line in tqdm(lines, total=len(lines), desc=f"Processing {filename}"):
|
|
parts = line.split(',', 1)
|
|
if len(parts) >= 2:
|
|
channel_name, url = parts
|
|
channel_name = channel_name.strip()
|
|
url = url.strip()
|
|
ip_key = get_ip_key(url)
|
|
|
|
if ip_key in detected_ips:
|
|
if detected_ips[ip_key]['status'] == 'ok':
|
|
output_file.write(line)
|
|
continue
|
|
|
|
# 修改后的下载检测逻辑
|
|
success = False
|
|
start_time = time.time()
|
|
try:
|
|
with requests.get(url, stream=True, timeout=8) as r: # 总超时8秒
|
|
r.raise_for_status()
|
|
downloaded = 0
|
|
for chunk in r.iter_content(chunk_size=1024):
|
|
if chunk: # 过滤保活帧
|
|
downloaded += len(chunk)
|
|
if downloaded >= 1024 * 1024: # 达到1024KB
|
|
success = True
|
|
break
|
|
if time.time() - start_time > 8: # 超时中断
|
|
break
|
|
except Exception as e:
|
|
pass
|
|
|
|
detected_ips[ip_key] = {'status': 'ok' if success else 'fail'}
|
|
if success:
|
|
output_file.write(line)
|
|
|
|
# 打印检测结果(保持不变)
|
|
for ip_key, result in detected_ips.items():
|
|
print(f"IP Key: {ip_key}, Status: {result['status']}")
|