Update 测绘站采集.py

This commit is contained in:
frxz751113
2025-05-06 16:08:11 +08:00
committed by GitHub
parent e655aad4f6
commit 5cf7581a4f
+468 -597
View File
@@ -1,10 +1,15 @@
from lxml import etree #本程序主体构造如下
#搜素有效IP并生成文件追加写入到相应列表文件后去重
#检测组播列表所有文件中IP有效性
#合并整理自用直播源,与组播无关
#合并所有组播文件并过滤严重掉帧的视频以保证流畅性
#提取检测后的频道进行分类输出优选组播源
#提取优选组播源中分类追加到自用直播源
#后续整理
#没了!!!!!!!!!!!!
import time import time
import datetime
from datetime import datetime, timedelta # 确保 timedelta 被导入 from datetime import datetime, timedelta # 确保 timedelta 被导入
import concurrent.futures import concurrent.futures
#from selenium import webdriver
#from selenium.webdriver.chrome.options import Options
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
import requests import requests
import re import re
@@ -13,6 +18,7 @@ import threading
from queue import Queue from queue import Queue
import queue import queue
from datetime import datetime from datetime import datetime
import replace
import fileinput import fileinput
from tqdm import tqdm from tqdm import tqdm
from pypinyin import lazy_pinyin from pypinyin import lazy_pinyin
@@ -22,479 +28,436 @@ import cv2
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
from urllib.parse import urlparse from urllib.parse import urlparse
from translate import Translator # 导入Translator类,用于文本翻译 from translate import Translator # 导入Translator类,用于文本翻译
## 定义txt文件的URL列表 # -*- coding: utf-8 -*-
urls = [ import random
'https://dimaston.github.io/live.m3u', #假m3u from fake_useragent import UserAgent # 需要先安装:pip install fake-useragent
'https://raw.githubusercontent.com/xiongjian83/TvBox/refs/heads/main/live.txt',
'https://raw.github.com/ddhola/file/d7afb504b1ba4fef31813e1166cb892215a9c063/0609test',
'https://raw.github.com/vbskycn/iptv/2738b3bec8c298f57e0e2052b155846ab6ea3787/dsyy/hd.txt',
'https://raw.github.com/frxz751113/AAAAA/main/IPTV/TW.txt',
'https://raw.github.com/ljlfct01/ljlfct01.github.io/main/list.%E8%87%AA%E7%94%A8',
'https://notabug.org/qcfree/TVBox-api/raw/main/live.txt', #通用源
'https://raw.github.com/KAN314go/A/e81a1c22cd1b9f459bc363bd916c13133e235510/tv/%E5%AE%89%E5%8D%9A8K.txt',
'https://gitlab.com/tvtg/vip/-/raw/main/log.txt',
'https://raw.github.com/frxz751113/IPTVzb1/main/%E7%BB%BC%E5%90%88%E6%BA%90.txt',
'https://raw.github.com/ssili126/tv/main/itvlist.txt',
'https://raw.github.com/Supprise0901/TVBox_live/main/live.txt',
'https://raw.github.com/Guovin/TV/gd/result.txt',
'https://raw.github.com/gaotianliuyun/gao/master/list.txt',
'https://gitee.com/xxy002/zhiboyuan/raw/master/zby.txt',
'https://raw.github.com/mlvjfchen/TV/main/iptv_list.txt',
'https://raw.github.com/fenxp/iptv/main/live/tvlive.txt',
'https://raw.github.com/zwc456baby/iptv_alive/master/live.txt',
'https://gitlab.com/p2v5/wangtv/-/raw/main/lunbo.txt',
'https://raw.github.com/PizazzGY/TVBox/main/live.txt',
'https://gitcode.net/MZ011/BHJK/-/raw/master/BHZB1.txt',
'https://raw.github.com/vbskycn/iptv/master/tv/iptv4.txt',
'https://raw.github.com/junge3333/juds6/main/yszb1.txt',
'https://raw.github.com/zzmaze/iptv/main/iptv.txt',
'https://raw.github.com/kimwang1978/collect-tv-txt/main/others_output.txt',
'https://raw.github.com/newrecha/TVBOX/5cdd7dcc228f14e4c7f343278e330481aae84eee/live/free.txt',
'https://bbs.ysctv.cn/tv/live.txt',
'https://axzzz-my.sharepoint.com/personal/axzzzpan4_axzzz_top/_layouts/15/download.aspx?UniqueId=a357e310-4e3d-45ee-aa68-6913ec533fcc&Translate=false&tempauth=v1.eyJzaXRlaWQiOiIzZGU4M2VmZS0xYWNlLTQ1YWEtODVhNS02YmVlMWM4MTVhZjMiLCJhcHBfZGlzcGxheW5hbWUiOiJBeHp6euS6keebmChvZDQpIiwiYXBwaWQiOiI2NjM3NjZiNi1kYjgwLTQwZjYtYjRkMi1kNWM2NDY0NmQxMjYiLCJhdWQiOiIwMDAwMDAwMy0wMDAwLTBmZjEtY2UwMC0wMDAwMDAwMDAwMDAvYXh6enotbXkuc2hhcmVwb2ludC5jb21AYzdhMzU1YTUtODgxMS00YzQwLWI1NTktMDk0ZDIzMDMxMjdiIiwiZXhwIjoiMTcyNzkyMjY5MCJ9.CgoKBHNuaWQSAjY0EgsI0ITAwqTVsT0QBRoOMjAuMTkwLjE0NC4xNzEqLGpUa2s3dW1UWFJxYUdMbm9KSVdYS28wbzZnUjJFSWdHemRmcGlpNkliSVk9MJQBOAFCEKFWItxKYAAw1b_et5n_RcNKEGhhc2hlZHByb29mdG9rZW5yKTBoLmZ8bWVtYmVyc2hpcHwxMDAzMjAwMzJjNzIwZWQ2QGxpdmUuY29tegEyggESCaVVo8cRiEBMEbVZCU0jAxJ7kgEEcGFuNJoBBUF4enp6ogETYXh6enpwYW40QGF4enp6LnRvcKoBEDEwMDMyMDAzMkM3MjBFRDayAQ5hbGxmaWxlcy53cml0ZcgBAQ._ZmDMLJ917RtVteaBChKC03v2IfufJb1x2SSLpDihu4&ApiVersion=2.0',
'https://d.kstore.space/download/8209/港澳台终极版.txt',
'http://wp.wadg.pro/down.php/c7a364c3b23d0b7d3b01e9e731414efc.txt;',
'https://aiwoa2003.oss-cn-hangzhou.aliyuncs.com/%E5%A4%A9%E5%91%BD%E4%BA%BA.txt',
'https://fs-im-kefu.7moor-fs1.com/ly/4d2c3f00-7d4c-11e5-af15-41bf63ae4ea0/1715581924111/live1.txt',
'https://gitea.moe/xiangjiao-ge/yifa169/raw/branch/main/yiyifafa.txt',
'https://d.kstore.space/download/3701/%E8%8A%B8%E6%B1%90%E7%94%B5%E8%A7%86.txt',
'https://bbs.ysctv.cn/tv/live.txt',
'https://d.kstore.space/download/8209/%E6%B8%AF%E6%BE%B3%E5%8F%B0%E7%BB%88%E6%9E%81%E7%89%88.txt',
'https://fs-im-kefu.7moor-fs1.com/ly/4d2c3f00-7d4c-11e5-af15-41bf63ae4ea0/1724505579710/%E6%80%BB%E7%BB%9F%E7%94%B5%E8%A7%8627.txt',
'https://raw.bgithub.xyz/luoye20230624/NETZB/6d8798a6a9aef8fce48597b283011e123e0caa2f/%E7%BB%BC%E5%90%88%E6%BA%90.txt',
'',
'',
'',
'',
'',
'',
'',
'',
'',
'',
'',
'',
'',
'',
'',
'',
'',
'',
'',
'',
'',
''
]
# 合并文件的函数
def merge_txt_files(urls, output_filename='汇总.txt'):
try:
with open(output_filename, 'w', encoding='utf-8') as outfile:
for url in urls:
try:
response = requests.get(url)
response.raise_for_status() # 确保请求成功
# 尝试将响应内容解码为UTF-8,如果失败则尝试其他编码
try:
content = response.content.decode('utf-8')
except UnicodeDecodeError:
content = response.content.decode('gbk') # 尝试GBK编码
outfile.write(content + '\n')
except requests.RequestException as e:
print(f'Error downloading {url}: {e}')
except IOError as e:
print(f'Error writing to file: {e}')
# 调用函数 # 创建输出目录
merge_txt_files(urls) os.makedirs('playlist', exist_ok=True)
# 配置参数
DELAY_RANGE = (3, 6) # 随机延迟时间范围(秒)
MAX_RETRIES = 3 # 最大重试次数
REQUEST_TIMEOUT = 10 # 请求超时时间(秒)
def get_random_header():
"""生成随机请求头"""
return {
'User-Agent': UserAgent().random,
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
#简体转繁体 'Referer': 'https://fofa.info/'
# 创建一个OpenCC对象,指定转换的规则为繁体字转简体字
converter = OpenCC('t2s.json')#繁转简
#converter = OpenCC('s2t.json')#简转繁
# 打开txt文件
with open('汇总.txt', 'r', encoding='utf-8') as file:
traditional_text = file.read()
# 进行繁体字转简体字的转换
simplified_text = converter.convert(traditional_text)
# 将转换后的简体字写入txt文件
with open('汇总.txt', 'w', encoding='utf-8') as file:
file.write(simplified_text)
with open('汇总.txt', 'r', encoding="utf-8") as file:
# 读取所有行并存储到列表中
lines = file.readlines()
#定义替换规则的字典对频道名替换
replacements = {
"CCTV-1高清测试": "",
"CCTV-2高清测试": "",
"CCTV-7高清测试": "",
"CCTV-10高清测试": "",
"中央": "CCTV",
"高清""": "",
"HD": "",
"标清": "",
"amc": "AMC",
"CCTV1综合": "CCTV1",
"CCTV2财经": "CCTV2",
"CCTV3综艺": "CCTV3",
"国际": "",
"5体育": "5",
"6电影": "6",
"军农": "",
"8影视": "8",
"9纪录": "9",
"0科教": "0",
"2社会与法": "2",
"3新闻": "3",
"4少儿": "4",
"5音乐": "5",
"": "",
"": "",
"": "",
"": "",
"": "",
"": "",
"咪咕": "",
"": "",
"超清": "",
"频道": "",
"CCTV-": "CCTV",
"CCTV_": "CCTV",
" ": "",
"CCTV风云剧场": "风云剧场",
"CCTV第一剧场": "第一剧场",
"CCTV怀旧剧场": "怀旧剧场",
"熊猫影院": "熊猫电影",
"熊猫爱生活": "熊猫生活",
"爱宠宠物": "宠物生活",
"[ipv6]": "",
"专区": "",
"卫视超": "卫视",
"CCTV风云剧场": "风云剧场",
"CCTV第一剧场": "第一剧场",
"CCTV怀旧剧场": "怀旧剧场",
"IPTV": "",
"PLUS": "+",
"": "+",
"(": "",
")": "",
"CAV": "",
"美洲": "",
"北美": "",
"12M": "",
"高清测试CCTV-1": "",
"高清测试CCTV-2": "",
"高清测试CCTV-7": "",
"高清测试CCTV-10": "",
"LD": "",
"HEVC20M": "",
"S,": ",",
"测试": "",
"CCTW": "CCTV",
"试看": "",
"测试": "",
" ": "",
"测试cctv": "CCTV",
"CCTV1综合": "CCTV1",
"CCTV2财经": "CCTV2",
"CCTV3综艺": "CCTV3",
"CCTV4国际": "CCTV4",
"CCTV4中文国际": "CCTV4",
"CCTV4欧洲": "CCTV4",
"CCTV5体育": "CCTV5",
"CCTV5+体育": "CCTV5+",
"CCTV6电影": "CCTV6",
"CCTV7军事": "CCTV7",
"CCTV7军农": "CCTV7",
"CCTV7农业": "CCTV7",
"CCTV7国防军事": "CCTV7",
"CCTV8电视剧": "CCTV8",
"CCTV8影视": "CCTV8",
"CCTV8纪录": "CCTV9",
"CCTV9记录": "CCTV9",
"CCTV9纪录": "CCTV9",
"CCTV10科教": "CCTV10",
"CCTV11戏曲": "CCTV11",
"CCTV12社会与法": "CCTV12",
"CCTV13新闻": "CCTV13",
"CCTV新闻": "CCTV13",
"CCTV14少儿": "CCTV14",
"央视14少儿": "CCTV14",
"CCTV少儿超": "CCTV14",
"CCTV15音乐": "CCTV15",
"CCTV音乐": "CCTV15",
"CCTV16奥林匹克": "CCTV16",
"CCTV17农业农村": "CCTV17",
"CCTV17军农": "CCTV17",
"CCTV17农业": "CCTV17",
"CCTV5+体育赛视": "CCTV5+",
"CCTV5+赛视": "CCTV5+",
"CCTV5+体育赛事": "CCTV5+",
"CCTV5+赛事": "CCTV5+",
"CCTV5+体育": "CCTV5+",
"CCTV5赛事": "CCTV5+",
"凤凰中文台": "凤凰中文",
"凤凰资讯台": "凤凰资讯",
"(CCTV4K测试)": "CCTV4K",
"上海东方卫视": "上海卫视",
"东方卫视": "上海卫视",
"内蒙卫视": "内蒙古卫视",
"福建东南卫视": "东南卫视",
"广东南方卫视": "南方卫视",
"湖南金鹰卡通": "金鹰卡通",
"炫动卡通": "哈哈炫动",
"卡酷卡通": "卡酷少儿",
"卡酷动画": "卡酷少儿",
"BRTVKAKU少儿": "卡酷少儿",
"优曼卡通": "优漫卡通",
"优曼卡通": "优漫卡通",
"嘉佳卡通": "佳嘉卡通",
"世界地理": "地理世界",
"CCTV世界地理": "地理世界",
"BTV北京卫视": "北京卫视",
"BTV冬奥纪实": "冬奥纪实",
"东奥纪实": "冬奥纪实",
"卫视台": "卫视",
"湖南电视台": "湖南卫视",
"少儿科教": "少儿",
"影视剧": "影视",
"电视剧": "影视",
"CCTV1CCTV1": "CCTV1",
"CCTV2CCTV2": "CCTV2",
"CCTV7CCTV7": "CCTV7",
"CCTV10CCTV10": "CCTV10"
} }
with open('汇总.txt', 'w', encoding='utf-8') as new_file:
for line in lines:
# 去除行尾的换行符
line = line.rstrip('\n')
# 分割行,获取逗号前的字符串
parts = line.split(',', 1)
if len(parts) > 0:
# 替换逗号前的字符串
before_comma = parts[0]
for old, new in replacements.items():
before_comma = before_comma.replace(old, new)
# 将替换后的逗号前部分和逗号后部分重新组合成一行,并写入新文件
new_line = f'{before_comma},{parts[1]}\n' if len(parts) > 1 else f'{before_comma}\n'
new_file.write(new_line)
def safe_request(url):
"""带重试机制的请求函数"""
for attempt in range(MAX_RETRIES):
# 打开文本文件进行读取
def read_and_process_file(input_filename, output_filename, encodings=['utf-8', 'gbk']):
for encoding in encodings:
try: try:
with open(input_filename, 'r', encoding=encoding) as file: # 随机延迟防止被封
lines = file.readlines() time.sleep(random.uniform(*DELAY_RANGE))
response = requests.get(
url,
headers=get_random_header(),
timeout=REQUEST_TIMEOUT
)
# 检查HTTP状态码
if response.status_code == 429:
wait_time = 30 # 遇到反爬等待30秒
print(f"遇到反爬机制,等待{wait_time}秒后重试")
time.sleep(wait_time)
continue
response.raise_for_status()
return response.text
except Exception as e:
print(f"请求失败(第{attempt+1}次重试): {str(e)}")
if attempt == MAX_RETRIES - 1:
raise
def validate_video(url, mcast):
"""验证视频流有效性"""
video_url = f"{url}/rtp/{mcast}"
print(f"正在验证: {video_url}")
try:
# 发送请求,尝试下载 1 千字节的数据
response = requests.get(video_url, headers=get_random_header(), timeout=REQUEST_TIMEOUT, stream=True)
response.raise_for_status()
content_length = 0
for chunk in response.iter_content(chunk_size=1024):
if chunk:
content_length += len(chunk)
if content_length >= 64:
break break
except UnicodeDecodeError: return content_length >= 16
except Exception as e:
print(f"视频验证异常: {str(e)}")
return False
def main():
# 获取需要处理的文件列表
files = [f.split('.')[0] for f in os.listdir('rtp') if f.endswith('.txt')]
print(f"待处理频道列表: {files}")
for filename in files:
province_isp = filename.split('_')
if len(province_isp) != 2:
continue continue
else:
raise ValueError(f"Cannot decode file '{input_filename}' with any of the provided encodings")
with open(output_filename, 'w', encoding='utf-8') as outfile: province, isp = province_isp
for line in lines: print(f"\n正在处理: {province}{isp}")
if '$' in line:
processed_line = line.split('$')[0].rstrip('\n')
outfile.write(processed_line + '\n')
else:
outfile.write(line)
# 调用函数 # 读取组播地址
read_and_process_file('汇总.txt', '汇总.txt') # 修改输出文件名以避免覆盖原始文件 try:
with open(f'rtp/{filename}.txt', 'r', encoding='utf-8') as f:
mcast = f.readline().split('rtp://')[1].split()[0].strip()
except Exception as e:
print(f"文件读取失败: {str(e)}")
continue
###################################################################去重##################################### # 构造搜索请求
def remove_duplicates(input_file, output_file): search_txt = f'"udpxy" && country="CN" && region="{province}"'
# 用于存储已经遇到的URL和包含genre的行 encoded_query = base64.b64encode(search_txt.encode()).decode()
seen_urls = set() search_url = f'https://fofa.info/result?qbase64={encoded_query}'
seen_lines_with_genre = set()
# 用于存储最终输出的行 # 执行搜索
output_lines = [] try:
# 打开输入文件并读取所有行 html = safe_request(search_url)
with open(input_file, 'r', encoding='utf-8') as f: except Exception as e:
lines = f.readlines() print(f"搜索失败: {str(e)}")
print("去重前的行数:", len(lines)) continue
# 遍历每一行
for line in lines: # 解析搜索结果,修改正则表达式以匹配IP和域名
# 使用正则表达式查找URL和包含genre的行,默认最后一行 soup = BeautifulSoup(html, 'html.parser')
urls = re.findall(r'[https]?[http]?[rtsp]?[rtmp]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', line) pattern = re.compile(r"http://(?:\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}|\w[\w.-]*\w):\d+")
genre_line = re.search(r'\bgenre\b', line, re.IGNORECASE) is not None found_urls = set(pattern.findall(html))
# 如果找到URL并且该URL尚未被记录 print(f"找到{len(found_urls)}个有效地址")
if urls and urls[0] not in seen_urls:
seen_urls.add(urls[0]) # 验证地址有效性
output_lines.append(line) valid_urls = [url for url in found_urls if validate_video(url, mcast)]
# 如果找到包含genre的行,无论是否已被记录,都写入新文件 print(f"验证通过{len(valid_urls)}个有效地址")
if genre_line:
output_lines.append(line) # 生成播放列表
# 将结果写入输出文件 if valid_urls:
with open(output_file, 'w', encoding='utf-8') as f: output_file = f'playlist/{province}{isp}.txt'
f.writelines(output_lines) with open(f'rtp/{filename}.txt', 'r') as src, open(output_file, 'w') as dst:
print("去重后的行数:", len(output_lines)) original_content = src.read()
# 使用方法 for url in valid_urls:
remove_duplicates('汇总.txt', '2.txt') modified = original_content.replace('rtp://', f'{url}/rtp/')
dst.write(modified + '\n')
print(f"已生成播放列表: {output_file}")
if __name__ == '__main__':
main()
print('对playlist文件夹里面的所有txt文件进行去重处理')
def remove_duplicates_keep_order(folder_path):
for filename in os.listdir(folder_path):
if filename.endswith('.txt'):
######################################################################################提取goodiptv file_path = os.path.join(folder_path, filename)
import re lines = set()
import os unique_lines = []
# 定义一个包含所有要排除的关键词的列表 with open(file_path, 'r', encoding='utf-8') as file:
excluded_keywords = ['epg', 'mitv', 'udp', 'rtp', 'P2p', 'p2p', 'p3p', 'P2P', '新闻综合', 'P3p', 'jdshipin#', '9930/qilu', 'gitcode.net', '151:99', '21dtv', 'txmov2', 'gcw.bdcdn', 'metshop',
'shandong', 'goodiptv', '购物', '[', 'P3P', '', '', '//1', '/hls/', '春节', '网络收集', '95.179', 'hlspull', 'github', 'lunbo', 'tw.ts138', '114:8278', '//tvb', 'extraott',
'22:8891', 'fanmingming', '43:22222', 'etv.xhgvip', 'free.xiptv', 'www.zhixun', 'xg.52sw', 'iptv.yjxfz.com', 'zb.qc', 'CHC', '/vd', '/TV2/'] #, 'CHC', '/TV2/'
# 定义一个包含所有要提取的关键词的列表
extract_keywords = ['1905', '凤凰卫视', '人间卫视', '亚洲卫视', '香港卫视', '神乐', '翡翠台', '凤凰香港', '凤凰中文', '凤凰资讯', 'AXN', 'AMC', '电影台', '大爱', '东森',
'华视', '中天', '天良', '美亚', '星影', '纬来', '天映', '无线', '华剧台', '华丽台', '剧台', '三立', '八大', '采昌', '民视', '数位', '影视2',
'影视3', '中视', '豬哥亮', 'TVB', '公视', '寰宇', '戏剧', '靖天', '靖洋', '龙华', '龙祥', '猪哥亮', '影迷', '影剧', '电视剧',
'中华小当家', '中天娱乐', '公视戏剧', '珍珠台', '台视', '华视', '环球电视', '美亚C+', '番薯']
# 读取文件并处理每一行
with open('2.txt', 'r', encoding='utf-8') as file:
lines = file.readlines()
# 创建或打开一个输出文件用于写入处理后的数据
with open('网络收集.txt', 'w', encoding='utf-8') as outfile:
for line in lines:
# 首先检查行是否包含任何提取关键词
if any(keyword in line for keyword in extract_keywords):
# 如果包含提取关键词,进一步检查行是否不包含任何排除关键词
if not any(keyword in line for keyword in excluded_keywords):
outfile.write(line) # 写入符合条件的行到文件
###############################################################
import re
def parse_file(input_file_path, output_file_name):
# 正则表达式匹配从'//'开始到第一个'/'或第一个'::'结束的部分
ip_or_domain_pattern = re.compile(r'//([^/:]*:[^/:]*::[^/:]*|[^/]*)')
# 用于存储每个IP或域名及其对应的行列表
ip_or_domain_to_lines = {}
# 读取原始文件内容
with open(input_file_path, 'r', encoding='utf-8') as file:
for line in file: for line in file:
line = line.strip() if line not in lines:
# 如果行是分类标签行,则跳过 unique_lines.append(line)
if ",#genre#" in line: lines.add(line)
continue # 将保持顺序的去重后的内容写回原文件
# 检查行是否包含IP或域名 with open(file_path, 'w', encoding='utf-8') as file:
match = ip_or_domain_pattern.search(line) file.writelines(unique_lines)
if match: # 使用示例
# 提取匹配到的IP或域名 folder_path = 'playlist' # 替换为你的文件夹路径
matched_text = match.group(1) remove_duplicates_keep_order(folder_path)
# 去除IP或域名后的剩余部分,只保留匹配到的IP或域名 print('文件去重完成!移除存储的旧文件!')
ip_or_domain = matched_text.split('://')[-1].split('/')[0].split('::')[0]
# 将行添加到对应的IP或域名列表中
if ip_or_domain not in ip_or_domain_to_lines:
ip_or_domain_to_lines[ip_or_domain] = []
ip_or_domain_to_lines[ip_or_domain].append(line)
############################################################################### 过滤掉小于1500字节的IP或域名段
filtered_ip_or_domain_to_lines = {ip_or_domain: lines for ip_or_domain, lines in ip_or_domain_to_lines.items()
if sum(len(line) for line in lines) >= 300}
# 如果没有满足条件的IP或域名段,则不生成文件
if not filtered_ip_or_domain_to_lines:
print("没有满足条件的IP或域名段,不生成文件。")
return
# 合并所有满足条件的IP或域名的行到一个文件
with open(output_file_name, 'w', encoding='utf-8') as output_file:
for ip_or_domain, lines in filtered_ip_or_domain_to_lines.items():
# 写入IP或域名及其对应的行到输出文件
output_file.write(f"频道,#genre#\n")
for line in lines:
output_file.write(line + '\n')
output_file.write('\n') # 在每个小段后添加一个空行作为分隔
# 调用函数并传入文件路径和输出文件名
parse_file('网络收集.txt', '网络收集.txt')
######################################################
#####################################################
######################################################################################################################
#################################################################################
###############检测playlist文件夹内所有txt文件内的组播
###############检测playlist文件夹内所有txt文件内的组播
###############检测playlist文件夹内所有txt文件内的组播
import os
import cv2 import cv2
import time import time
from tqdm import tqdm from tqdm import tqdm
import os import sys
# 存储文件路径 # 初始化字典以存储IP检测结果
file_path = "网络收集.txt" detected_ips = {}
output_file_path = "网络收集.txt"
def get_ip_key(url): def get_ip_key(url):
""" URL 中提取 IP 地址,并构造一个唯一的键""" """从URL中提取IP地址或域名,并构造一个唯一的键"""
start = url.find('://') + 3 start = url.find('://') + 3
end = url.find('/', start) end = url.find('/', start)
return url[start:end] if end!= -1 else None if end == -1:
end = len(url)
return url[start:end].strip()
def merge_and_filter(): # 设置固定的文件夹路径
folder_path = 'playlist'
# 确保文件夹路径存在
if not os.path.isdir(folder_path):
print("指定的文件夹不存在。")
sys.exit()
# 遍历文件夹中的所有.txt文件
for filename in os.listdir(folder_path):
if filename.endswith('.txt'):
file_path = os.path.join(folder_path, filename)
# 读取文件内容
with open(file_path, 'r', encoding='utf-8') as file: with open(file_path, 'r', encoding='utf-8') as file:
lines = file.readlines() lines = file.readlines()
total_lines = len(lines) # 准备写回文件
with open(file_path, 'w', encoding='utf-8') as output_file:
# 处理输入文件中的数据并进行检测 # 使用 tqdm 显示进度条
with open(output_file_path, 'a', encoding='utf-8') as output_file: for line in tqdm(lines, total=len(lines), desc=f"Processing {filename}"):
for i, line in tqdm(enumerate(lines), total=total_lines, desc="Processing", unit='line'):
if 'genre' in line:
output_file.write(line)
continue
parts = line.split(',', 1) parts = line.split(',', 1)
if len(parts) == 2: if len(parts) >= 2:
channel_name, url = parts channel_name, url = parts
channel_name = channel_name.strip() channel_name = channel_name.strip()
url = url.strip() url = url.strip()
ip_key = get_ip_key(url) ip_key = get_ip_key(url)
if ip_key and ip_key in detected_ips:
# 检查IP或域名是否已经被检测过
if ip_key in detected_ips:
# 如果之前检测成功,则写入该行
if detected_ips[ip_key]['status'] == 'ok': if detected_ips[ip_key]['status'] == 'ok':
output_file.write(line) output_file.write(line)
elif ip_key: continue # 无论之前检测结果如何,都不重新检测
# 初始化帧计数器和成功标志
frame_count = 0
success = False
# 尝试打开视频流
cap = cv2.VideoCapture(url) cap = cv2.VideoCapture(url)
start_time = time.time() start_time = time.time()
frame_count = 0 while (time.time() - start_time) < 8:
while frame_count < 50 and (time.time() - start_time) < 3:
ret, frame = cap.read() ret, frame = cap.read()
if not ret: if not ret:
break break
frame_count += 1 frame_count += 1
# 如果在8秒内读取到1帧以上,设置成功标志
if frame_count >= 1:
success = True
break
cap.release() cap.release()
if frame_count >= 50:
# 根据检测结果更新字典
if success:
detected_ips[ip_key] = {'status': 'ok'} detected_ips[ip_key] = {'status': 'ok'}
output_file.write(line) output_file.write(line)
else: else:
detected_ips[ip_key] = {'status': 'fail'} detected_ips[ip_key] = {'status': 'fail'}
# 合并任意字符加上网络收集.txt 的文件 # 打印检测结果
all_files = [f for f in os.listdir(os.getcwd()) if f.endswith('网络收集.txt')]
with open(output_file_path, 'a', encoding='utf-8') as main_output:
for file_name in all_files:
if file_name!= output_file_path:
with open(file_name, 'r', encoding='utf-8') as other_file:
content = other_file.read()
if content:
main_output.write('\n')
main_output.write(content)
detected_ips = {}
merge_and_filter()
for ip_key, result in detected_ips.items(): for ip_key, result in detected_ips.items():
print(f"IP Key: {ip_key}, Status: {result['status']}") print(f"IP Key: {ip_key}, Status: {result['status']}")
######################################################################################################################
######################################################################################################################
# 获取远程直播源文件,打开文件并输出临时文件
url = "https://raw.githubusercontent.com/frxz751113/AAAAA/refs/heads/main/IPTV/%E6%B1%87%E6%B1%87.txt" #源采集地址
r = requests.get(url)
open('综合源.txt','wb').write(r.content) #打开源文件并临时写入
#简体转繁体#
#简体转繁体
# 创建一个OpenCC对象,指定转换的规则为繁体字转简体字
converter = OpenCC('t2s.json')#繁转简
#converter = OpenCC('s2t.json')#简转繁
# 打开txt文件
with open('综合源.txt', 'r', encoding='utf-8') as file:
traditional_text = file.read()
# 进行繁体字转简体字的转换
simplified_text = converter.convert(traditional_text)
# 将转换后的简体字写入txt文件
with open('综合源.txt', 'w', encoding='utf-8') as file:
file.write(simplified_text)
#任务结束,删除不必要的过程文件#
files_to_remove = ['组播源.txt', "TW.txt", "a.txt", "主.txt", "b.txt", "b1.txt", "港澳.txt"]
for file in files_to_remove:
if os.path.exists(file):
os.remove(file)
else: # 如果文件不存在,则提示异常并打印提示信息
print(f"文件 {file} 不存在,跳过删除。")
print("任务运行完毕,分类频道列表可查看文件夹内综合源.txt文件!")
######################################################################################################################
######################################################################################################################
######################################################################################################################
######################################################################################################################
######################################################################################################################
# 合并自定义频道文件,优选源整理
# 假设filter_files是一个自定义函数,它返回playlist目录下所有.txt文件的路径列表
def filter_files(directory, extension):
return [f for f in os.listdir(directory) if f.endswith(extension)]
# 获取playlist目录下的所有.txt文件
files = filter_files('playlist', '.txt')
# 打开输出文件
with open("4.txt", "w", encoding="utf-8") as output:
for file_path in files:
with open(os.path.join('playlist', file_path), 'r', encoding="utf-8") as file:
content = file.read()
output.write(content + '\n\n')
print("电视频道成功写入")
#################文本排序
# 打开原始文件读取内容,并写入新文件
with open('4.txt', 'r', encoding='utf-8') as file:
lines = file.readlines()
# 定义一个函数,用于提取每行的第一个数字
def extract_first_number(line):
match = re.search(r'\d+', line)
return int(match.group()) if match else float('inf')
# 对列表中的行进行排序
# 按照第一个数字的大小排列,如果不存在数字则按中文拼音排序
sorted_lines = sorted(lines, key=lambda x: (not 'CCTV' in x, extract_first_number(x) if 'CCTV' in x else lazy_pinyin(x.strip())))
# 将排序后的行写入新的utf-8编码的文本文件,文件名基于原文件名
output_file_path = "sorted_" + os.path.basename(file_path)
# 写入新文件
with open('5.txt', "w", encoding="utf-8") as file:
for line in sorted_lines:
file.write(line)
print(f"文件已排序并保存为: {output_file_path}")
import cv2
import time
from tqdm import tqdm
# 初始化酒店源字典
detected_ips = {}
# 存储文件路径
file_path = "5.txt"
output_file_path = "2.txt"
def get_ip_key(url):
"""从URL中提取IP地址,并构造一个唯一的键"""
# 找到'//'到第三个'.'之间的字符串
start = url.find('://') + 3 # '://'.length 是 3
end = start
dot_count = 0
while dot_count < 3:
end = url.find('.', end)
if end == -1: # 如果没有找到第三个'.',就结束
break
dot_count += 1
return url[start:end] if dot_count == 3 else None
# 打开输入文件和输出文件
with open(file_path, 'r', encoding='utf-8') as file:
lines = file.readlines()
# 获取总行数用于进度条
total_lines = len(lines)
# 写入通过检测的行到新文件
with open(output_file_path, 'w', encoding='utf-8') as output_file:
# 使用tqdm显示进度条
for i, line in tqdm(enumerate(lines), total=total_lines, desc="Processing", unit='line'):
# 检查是否包含 'genre'
if 'genre' in line:
output_file.write(line)
continue
# 分割频道名称和URL,并去除空白字符
parts = line.split(',', 1)
if len(parts) == 2:
channel_name, url = parts
channel_name = channel_name.strip()
url = url.strip()
# 构造IP键
ip_key = get_ip_key(url)
if ip_key and ip_key in detected_ips:
# 如果IP键已存在,根据之前的结果决定是否写入新文件
if detected_ips[ip_key]['status'] == 'ok':
output_file.write(line)
elif ip_key: # 新IP键,进行检测
# 进行检测
cap = cv2.VideoCapture(url)
start_time = time.time()
frame_count = 0
# 尝试捕获10秒内的帧
while frame_count < 200 and (time.time() - start_time) < 10:
ret, frame = cap.read()
if not ret:
break
frame_count += 1
# 释放资源
cap.release()
# 根据捕获的帧数判断状态并记录结果
if frame_count >= 200: #10秒内超过230帧则写入
detected_ips[ip_key] = {'status': 'ok'}
output_file.write(line) # 写入检测通过的行
else:
detected_ips[ip_key] = {'status': 'fail'}
# 打印酒店源
for ip_key, result in detected_ips.items():
print(f"IP Key: {ip_key}, Status: {result['status']}")
########################################################################################################################################################################################
################################################################定义关键词分割规则
def check_and_write_file(input_file, output_file, keywords):
# 使用 split(', ') 来分割关键词
keywords_list = keywords.split(', ')
pattern = '|'.join(re.escape(keyword) for keyword in keywords_list)
# 读取输入文件并提取包含关键词的行
extracted_lines = []
with open(input_file, 'r', encoding='utf-8') as file:
for line in file:
if "genre" not in line:
if re.search(pattern, line):
extracted_lines.append(line)
# 如果至少提取到一行,写入头部信息和提取的行到输出文件
if extracted_lines:
with open(output_file, 'w', encoding='utf-8') as out_file:
out_file.write(f"{keywords_list[0]},#genre#\n") # 写入头部信息
out_file.writelines(extracted_lines) # 写入提取的行
# 获取头部信息的大小
header_size = len(f"{keywords_list[0]},#genre#\n")
# 检查文件的总大小
file_size = os.path.getsize(output_file)
# 如果文件大小小于30字节(假设的最小文件大小),删除文件
if file_size < 200:
os.remove(output_file)
print(f"文件只包含头部信息,{output_file} 已被删除。")
else:
print(f"文件已提取关键词并保存为: {output_file}")
else:
print(f"未提取到关键词,不创建输出文件 {output_file}")
# 按类别提取关键词并写入文件
check_and_write_file('2.txt', 'a.txt', keywords="央视频道, CCTV, CHC, 全球大片, 星光院线, 8K, 4K, 4k")
check_and_write_file('2.txt', 'b.txt', keywords="卫视频道, 卫视, 凤凰, 星空")
check_and_write_file('2.txt', 'c0.txt', keywords="组播剧场, 第一剧场, 怀旧剧场, 风云音乐, 风云剧场, 欢笑剧场, 都市剧场, 高清电影, 家庭影院, 动作电影, 影迷, 峨眉, 重温, 女性, 地理")
check_and_write_file('2.txt', 'c.txt', keywords="组播剧场, 爱动漫, SiTV, 爱怀旧, 爱经典, 爱科幻, 爱青春, 爱悬疑, 爱幼教, 爱院线")
check_and_write_file('2.txt', 'd.txt', keywords="北京频道, 北京")
###############################################################################################################################################################################################################################
##############################################################对生成的文件进行合并
file_contents = []
file_paths = ["a.txt", "b.txt", "c0.txt", "c.txt", "d.txt"] # 替换为实际的文件路径列表
for file_path in file_paths:
if os.path.exists(file_path):
with open(file_path, 'r', encoding="utf-8") as file:
content = file.read()
file_contents.append(content)
else: # 如果文件不存在,则提示异常并打印提示信息
print(f"文件 {file_path} 不存在,跳过")
# 写入合并后的文件
with open("去重.txt", "w", encoding="utf-8") as output:
output.write('\n'.join(file_contents))
###############################################################################################################################################################################################################################
##############################################################对生成的文件进行网址及文本去重复,避免同一个频道出现在不同的类中
def remove_duplicates(input_file, output_file): def remove_duplicates(input_file, output_file):
# 用于存储已经遇到的URL和包含genre的行 # 用于存储已经遇到的URL和包含genre的行
seen_urls = set() seen_urls = set()
@@ -522,186 +485,94 @@ def remove_duplicates(input_file, output_file):
f.writelines(output_lines) f.writelines(output_lines)
print("去重后的行数:", len(output_lines)) print("去重后的行数:", len(output_lines))
# 使用方法 # 使用方法
remove_duplicates('网络收集.txt', '网络收集.txt') remove_duplicates('去重.txt', '分类.txt')
#从整理好的文本中进行特定关键词替换以规范频道名#
for line in fileinput.input("分类.txt", inplace=True): #打开临时文件原地替换关键字
line = line.replace("CCTV1,", "CCTV1-综合,")
line = line.replace("CCTV2,", "CCTV2-财经,")
line = line.replace("CCTV3,", "CCTV3-综艺,")
line = line.replace("CCTV4,", "CCTV4-国际,")
line = line.replace("CCTV5,", "CCTV5-体育,")
line = line.replace("CCTV5+,", "CCTV5-体育plus,")
line = line.replace("CCTV6,", "CCTV6-电影,")
line = line.replace("CCTV7,", "CCTV7-军事,")
line = line.replace("CCTV8,", "CCTV8-电视剧,")
line = line.replace("CCTV9,", "CCTV9-纪录,")
line = line.replace("CCTV10,", "CCTV10-科教,")
line = line.replace("CCTV11,", "CCTV11-戏曲,")
line = line.replace("CCTV11+,", "CCTV11-戏曲,")
line = line.replace("CCTV12,", "CCTV12-社会与法,")
line = line.replace("CCTV13,", "CCTV13-新闻,")
line = line.replace("CCTV14,", "CCTV14-少儿,")
line = line.replace("CCTV15,", "CCTV15-音乐,")
line = line.replace("CCTV16,", "CCTV16-奥林匹克,")
line = line.replace("CCTV17,", "CCTV17-农业农村,")
line = line.replace("CHC", "")
print(line, end="")
# 打开文档并读取所有行
with open('分类.txt', 'r', encoding="utf-8") as file:
######################连通性检测
import requests
import time
import cv2
from urllib.parse import urlparse
from tqdm import tqdm
# 测试HTTP连接并尝试下载数据
def test_connectivity_and_download(url, initial_timeout=1, retry_timeout=1):
parsed_url = urlparse(url)
if parsed_url.scheme not in ['http', 'https']:
# 非HTTP(s)协议,尝试RTSP检测
return test_rtsp_connectivity(url, retry_timeout)
else:
# HTTP(s)协议,使用原始方法
try:
with requests.get(url, stream=True, timeout=initial_timeout) as response:
if response.status_code == 200:
start_time = time.time()
while time.time() - start_time < initial_timeout:
chunk = response.raw.read(51200) # 尝试下载1KB数据
if chunk:
return True # 成功下载数据
except requests.RequestException as e:
print(f"请求异常: {e}")
pass #这行删掉则会在下载不到数据流的时候进行连通性测试
return False # 默认返回False
print("/" * 80)
# 测试RTSP连接并尝试读取流
def test_rtsp_connectivity(url, timeout=3):
cap = cv2.VideoCapture(url)
if not cap.isOpened():
return False
start_time = time.time()
while time.time() - start_time < timeout:
ret, _ = cap.read()
if ret:
return True # 成功读取帧
cap.release()
return False
# 主函数
def main(输入, 输出):
with open(输入, "r", encoding="utf-8") as source_file:
lines = source_file.readlines()
results = []
for line_number, line in enumerate(tqdm(lines, desc="检测中")):
parts = line.strip().split(",")
if len(parts) == 2 and parts[1]: # 确保有URL,并且URL不为空
channel_name, channel_url = parts
try:
is_valid = test_connectivity_and_download(channel_url)
except Exception as e:
print(f"检测URL {channel_url} 时发生错误: {e}")
is_valid = False # 将异常的URL视为无效
status = "有效" if is_valid else "无效"
if "genre" in line.lower() or status == "有效":
results.append((channel_name.strip(), channel_url.strip(), status))
# 写入文件
with open(输出, "w", encoding="utf-8") as output_file:
for channel_name, channel_url, status in results:
output_file.write(f"{channel_name},{channel_url}\n")
print(f"任务完成, 有效源数量: {len([x for x in results if x[2] == '有效'])}, 无效源数量: {len([x for x in results if x[2] == '无效'])}")
if __name__ == "__main__":
输入 = "网络收集.txt" #input('请输入utf-8编码的直播源文件路径:')
输出 = "网络收集.txt"
main(输入, 输出)
import re
from pypinyin import lazy_pinyin
# 打开一个utf-8编码的文本文件
with open("网络收集.txt", "r", encoding="utf-8") as file:
# 读取所有行并存储到列表中
lines = file.readlines() lines = file.readlines()
# 定义一个函数,用于提取每行的第一个数字 # 使用列表来存储唯一的行的顺序
def extract_first_number(line): unique_lines = []
match = re.search(r'\d+', line) seen_lines = set()
return int(match.group()) if match else float('inf') # 遍历每一行,如果是新的就加入unique_lines
# 对列表中的行进行排序,按照第一个数字的大小排列,其余行按中文排序
sorted_lines = sorted(lines, key=lambda x: (not 'CCTV' in x, extract_first_number(x) if 'CCTV' in x else lazy_pinyin(x.strip())))
# 将排序后的行写入新的utf-8编码的文本文件
with open("网络收集.txt", "w", encoding="utf-8") as file:
for line in sorted_lines:
file.write(line)
def parse_file(input_file_path, output_file_name): #
# 正则表达式匹配从'//'开始到第一个'/'或第一个'::'结束的部分
ip_or_domain_pattern = re.compile(r'//([^/:]*:[^/:]*::[^/:]*|[^/]*)')
# 用于存储每个IP或域名及其对应的行列表
ip_or_domain_to_lines = {}
# 用于生成分类名的字母和数字计数器
alphabet_counter = 0 # 字母计数器,从0开始
number_counter = 1 # 数字计数器,从1开始
# 读取原始文件内容
with open(input_file_path, 'r', encoding='utf-8') as file:
for line in file:
line = line.strip()
# 如果行是分类标签行,则跳过
if ",#genre#" in line:
continue
# 检查行是否包含IP或域名
match = ip_or_domain_pattern.search(line)
if match:
# 提取匹配到的IP或域名
matched_text = match.group(1)
# 去除IP或域名后的剩余部分,只保留匹配到的IP或域名
ip_or_domain = matched_text.split('://')[-1].split('/')[0].split('::')[0]
# 将行添加到对应的IP或域名列表中
if ip_or_domain not in ip_or_domain_to_lines:
ip_or_domain_to_lines[ip_or_domain] = []
ip_or_domain_to_lines[ip_or_domain].append(line)
# 过滤掉小于1000字节的IP或域名段
filtered_ip_or_domain_to_lines = {ip_or_domain: lines for ip_or_domain, lines in ip_or_domain_to_lines.items()
if sum(len(line) for line in lines) >= 250} # 过滤掉小于1000字节的IP或域名段
# 如果没有满足条件的IP或域名段,则不生成文件
if not filtered_ip_or_domain_to_lines:
print("没有满足条件的IP或域名段,不生成文件。")
return
# 合并所有满足条件的IP或域名的行到一个文件
############################################################
with open(output_file_name, 'w', encoding='utf-8') as output_file: #output_
for ip_or_domain, lines in filtered_ip_or_domain_to_lines.items():
# 检查是否需要递增数字计数器
if alphabet_counter >= 26:
number_counter += 1
alphabet_counter = 0 # 重置字母计数器
######################################################
# 生成分类名
genre_name = chr(65 + alphabet_counter)# + str(number_counter)
output_file.write(f"港澳{genre_name}组,#genre#\n")
for line in lines: for line in lines:
output_file.write(line + '\n') if line not in seen_lines:
output_file.write('\n') # 在每个小段后添加一个空行作为分隔 unique_lines.append(line)
alphabet_counter += 1 # 递增字母计数器 seen_lines.add(line)
# 调用函数并传入文件路径和输出文件名
parse_file('网络收集.txt', '网络收集.txt') # 将唯一的行写入第一个文件
with open('组播优选.txt', 'w', encoding="utf-8") as file:
for line in unique_lines:
file.write(line) # 确保每行后面有换行符 + '\n'
# 将唯一的行追加到第二个文件
#with open('综合源.txt', 'a', encoding="utf-8") as file:
#for line in unique_lines:
#file.write(line) # 确保每行后面有换行符 + '\n'
# 定义要排除的关键词列表
excluded_keywords = ['CCTV', '卫视', '关键词3']
# 定义例外关键词列表,即使它们在排除列表中,也应该被保留
exception_keywords = ['4K', '8K', '例外关键词']
# 打开原始文本文件并读取内容
with open('组播优选.txt', 'r', encoding='utf-8') as file:
lines = file.readlines()
# 过滤掉包含关键词的行,但是允许含有例外关键词的行
filtered_lines = []
for line in lines:
# 检查行是否包含排除关键词
contains_excluded = any(keyword in line for keyword in excluded_keywords)
# 检查行是否包含例外关键词
contains_exception = any(keyword in line for keyword in exception_keywords)
# 如果行包含排除关键词,但是不包含例外关键词,则过滤掉该行
if contains_excluded and not contains_exception:
continue
else:
# 如果行不包含排除关键词,或者同时包含排除关键词和例外关键词,则保留该行
filtered_lines.append(line)
# 将过滤后的内容追加写入新的文本文件
with open('综合源.txt', 'a', encoding='utf-8') as file:
file.writelines(filtered_lines)
#从整理好的文本中进行特定关键词替换以规范频道名#
for line in fileinput.input("综合源.txt", inplace=True): #打开临时文件原地替换关键字
line = line.replace("CCTV164K", "CCTV16-4K")
line = line.replace("CCTV4K", "CCTV-4K")
print(line, end="")
################################################################################################任务结束,删除不必要的过程文件
files_to_remove = ['去重.txt', '分类.txt', "2.txt", "4.txt", "5.txt", "a.txt", "b.txt", "c0.txt", "c.txt", "d.txt"]
import datetime
now = datetime.datetime.utcnow() + datetime.timedelta(hours=8)
current_time = now.strftime("%Y/%m/%d %H:%M")
# 打开文本文件并将时间添加到开头
file_path = "网络收集.txt"
with open(file_path, 'r+', encoding='utf-8') as f:
content = f.read()
f.seek(0, 0)
f.write(f'{current_time}更新,#genre#\n')
f.write(f'虚情的爱,https://vd2.bdstatic.com/mda-mi1dd05gmhwejdwn/sc/cae_h264/1630576203346678103/mda-mi1dd05gmhwejdwn.mp4\n{content}')
################################################################################################任务结束,删除不必要的过程文件
files_to_remove = ["2.txt", "汇总.txt"]
for file in files_to_remove: for file in files_to_remove:
if os.path.exists(file): if os.path.exists(file):
os.remove(file) os.remove(file)
else: # 如果文件不存在则提示异常并打印提示信息 else: # 如果文件不存在,则提示异常并打印提示信息
print(f"文件 {file} 不存在跳过删除。") print(f"文件 {file} 不存在,跳过删除。")
print("任务运行完毕频道列表可查看文件夹内源.txt文件!") print("任务运行完毕,分类频道列表可查看文件夹内综合源.txt文件!")
# 打印酒店源
for ip_key, result in detected_ips.items():
print(f"IP Key: {ip_key}, Status: {result['status']}")