Files
iptv/py/收集.py
T
2024-08-26 17:14:56 +08:00

359 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from lxml import etree
import time
from datetime import datetime, timedelta # 确保 timedelta 被导入
import concurrent.futures
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from concurrent.futures import ThreadPoolExecutor
import requests
import re
import os
import threading
from queue import Queue
import queue
from datetime import datetime
import replace
import fileinput
from tqdm import tqdm
from pypinyin import lazy_pinyin
from opencc import OpenCC
import base64
import cv2
from bs4 import BeautifulSoup
from urllib.parse import urlparse
from translate import Translator # 导入Translator类,用于文本翻译
# 定义txt文件的URL列表
urls = [
'http://124.223.177.85:88/svip/%E7%9B%B4%E6%92%AD%E6%8C%BA%E5%BF%AB.txt',
'',
'https://jihulab.com/jiayan/tv/-/raw/main/zhibo.txt?ref_type=heads',
'https://raw.githubusercontent.com/kimwang1978/tvbox/main/%E5%A4%A9%E5%A4%A9%E5%BC%80%E5%BF%83/lives/%E2%91%AD%E5%BC%80%E5%BF%83%E7%BA%BF%E8%B7%AF.txt',#################
'https://raw.githubusercontent.com/pxiptv/live/main/iptv.txt', #ADD 【2024-08-02 16:48:40】#每日更新1次
'https://notabug.org/vnjd/yydu/raw/master/yyfug.txt', #ADD 【2024-08-06】
'https://pan.beecld.com/f/OXMcA/%E6%98%A5%E8%B5%A2%E5%A4%A9%E4%B8%8B.txt', #ADD 【2024-08-06】
'https://raw.githubusercontent.com/yuanzl77/IPTV/main/live.txt', #ADD 2024-08-05 每天更新一次,量太多转到blacklist处理
'http://mywlkj.ddns.net:5212/f/EErCL/%E5%8F%B0%E6%B9%BE%E7%94%B5%E8%A7%86TV.txt', #ADD 【2024-08-10】
'https://raw.githubusercontent.com/Guovin/TV/gd/result.txt', #每天自动更新1次
'https://raw.githubusercontent.com/ssili126/tv/main/itvlist.txt', #每天自动更新1次
'https://raw.githubusercontent.com/mlvjfchen/TV/main/iptv_list.txt', #每天早晚各自动更新1次 2024-06-03 17:50
'https://raw.githubusercontent.com/fenxp/iptv/main/live/ipv6.txt', #1小时自动更新1次11:11 2024/05/13
'https://raw.githubusercontent.com/fenxp/iptv/main/live/tvlive.txt', #1小时自动更新1次11:11 2024/05/13
'https://raw.githubusercontent.com/zwc456baby/iptv_alive/master/live.txt', #每天自动更新1次 2024-06-24 16:37
'http://ttkx.live:55/lib/kx2024.txt', #ADD 2024-08-11 每天更新3次
'https://raw.githubusercontent.com/vbskycn/iptv/master/tv/iptv4.txt', #ADD 2024-08-12 每天更新3次
]
# 合并文件的函数
def merge_txt_files(urls, output_filename='汇总.txt'):
try:
# 打开文件准备写入
with open(output_filename, 'w', encoding='utf-8') as outfile:
for url in urls:
try:
# 发送HTTP GET请求
response = requests.get(url)
# 检查请求是否成功
response.raise_for_status()
# 读取内容并写入输出文件
outfile.write(response.text + '\n')
except requests.RequestException as e:
# 打印错误信息并继续下一个循环
print(f'Error downloading {url}: {e}')
except IOError as e:
# 处理文件写入错误
print(f'Error writing to file: {e}')
# 调用函数
merge_txt_files(urls)
# 打开文本文件并读取内容
def process_file(file_path, encodings=['utf-8', 'gbk', 'latin1']):
for encoding in encodings:
try:
with open(file_path, 'r', encoding=encoding) as file:
lines = file.readlines()
# 如果没有异常发生,说明找到了正确的编码
break
except UnicodeDecodeError:
# 如果出现解码错误,尝试下一种编码
continue
# 处理每一行,去除每一行中第一个$及其后的所有内容
processed_lines = [line.split('$', 1)[0].rstrip('\n') + '\n' for line in lines]
# 将处理后的内容写回到文件,使用找到的正确编码
with open(file_path, 'w', encoding=encoding) as file:
file.writelines(processed_lines)
# 调用函数并传入文件路径
file_path = '汇总.txt' # 替换为你的文件路径
process_file(file_path)
########################################################################################################
def remove_duplicates(input_file, output_file):
# 用于存储已经遇到的URL和包含genre的行
seen_urls = set()
seen_lines_with_genre = set()
# 用于存储最终输出的行
output_lines = []
# 打开输入文件并读取所有行
with open(input_file, 'r', encoding='utf-8') as f:
lines = f.readlines()
print("去重前的行数:", len(lines))
# 遍历每一行
for line in lines:
# 使用正则表达式查找URL和包含genre的行,默认最后一行
urls = re.findall(r'[https]?[http]?[rtsp]?[rtmp]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', line)
genre_line = re.search(r'\bgenre\b', line, re.IGNORECASE) is not None
# 如果找到URL并且该URL尚未被记录
if urls and urls[0] not in seen_urls:
seen_urls.add(urls[0])
output_lines.append(line)
# 如果找到包含genre的行,无论是否已被记录,都写入新文件
if genre_line:
output_lines.append(line)
# 将结果写入输出文件
with open(output_file, 'w', encoding='utf-8') as f:
f.writelines(output_lines)
print("去重后的行数:", len(output_lines))
# 使用方法
remove_duplicates('汇总.txt', '汇总.txt')
# 字符替换字典
replacement_dict = {
"CCTV-1高清测试": "",
"CCTV-2高清测试": "",
"CCTV-7高清测试": "",
"CCTV-10高清测试": "",
"中央": "CCTV",
"高清""": "",
"HD": "",
"标清": "",
"-": "",
"超清": "",
"频道": "",
"-": "",
"-": "",
"CCTV_": "CCTV",
" ": "",
"CCTV风云剧场": "风云剧场",
"CCTV第一剧场": "第一剧场",
"CCTV怀旧剧场": "怀旧剧场",
"熊猫影院": "熊猫电影",
"熊猫爱生活": "熊猫生活",
"爱宠宠物": "宠物生活",
"[ipv6]": "",
"专区": "",
"卫视超": "卫视",
"CCTV风云剧场": "风云剧场",
"CCTV第一剧场": "第一剧场",
"CCTV怀旧剧场": "怀旧剧场",
"IPTV": "",
"PLUS": "+",
"": "+",
"(": "",
")": "",
"CAV": "",
"美洲": "",
"北美": "",
"12M": "",
"高清测试CCTV-1": "",
"高清测试CCTV-2": "",
"高清测试CCTV-7": "",
"高清测试CCTV-10": "",
"LD": "",
"HEVC20M": "",
"S,": ",",
"测试": "",
"CCTW": "CCTV",
"试看": "",
"测试": "",
"测试cctv": "CCTV",
"CCTV1综合": "CCTV1",
"CCTV2财经": "CCTV2",
"CCTV3综艺": "CCTV3",
"CCTV4国际": "CCTV4",
"CCTV4中文国际": "CCTV4",
"CCTV4欧洲": "CCTV4",
"CCTV5体育": "CCTV5",
"CCTV5+体育": "CCTV5+",
"CCTV6电影": "CCTV6",
"CCTV7军事": "CCTV7",
"CCTV7军农": "CCTV7",
"CCTV7农业": "CCTV7",
"CCTV7国防军事": "CCTV7",
"CCTV8电视剧": "CCTV8",
"CCTV8影视": "CCTV8",
"CCTV8纪录": "CCTV9",
"CCTV9记录": "CCTV9",
"CCTV9纪录": "CCTV9",
"CCTV10科教": "CCTV10",
"CCTV11戏曲": "CCTV11",
"CCTV12社会与法": "CCTV12",
"CCTV13新闻": "CCTV13",
"CCTV新闻": "CCTV13",
"CCTV14少儿": "CCTV14",
"央视14少儿": "CCTV14",
"CCTV少儿超": "CCTV14",
"CCTV15音乐": "CCTV15",
"CCTV音乐": "CCTV15",
"CCTV16奥林匹克": "CCTV16",
"CCTV17农业农村": "CCTV17",
"CCTV17军农": "CCTV17",
"CCTV17农业": "CCTV17",
"CCTV5+体育赛视": "CCTV5+",
"CCTV5+赛视": "CCTV5+",
"CCTV5+体育赛事": "CCTV5+",
"CCTV5+赛事": "CCTV5+",
"CCTV5+体育": "CCTV5+",
"CCTV5赛事": "CCTV5+",
"凤凰中文台": "凤凰中文",
"凤凰资讯台": "凤凰资讯",
"(CCTV4K测试)": "CCTV4K",
"上海东方卫视": "上海卫视",
"东方卫视": "上海卫视",
"内蒙卫视": "内蒙古卫视",
"福建东南卫视": "东南卫视",
"广东南方卫视": "南方卫视",
"湖南金鹰卡通": "金鹰卡通",
"炫动卡通": "哈哈炫动",
"卡酷卡通": "卡酷少儿",
"卡酷动画": "卡酷少儿",
"BRTVKAKU少儿": "卡酷少儿",
"优曼卡通": "优漫卡通",
"优曼卡通": "优漫卡通",
"嘉佳卡通": "佳嘉卡通",
"世界地理": "地理世界",
"CCTV世界地理": "地理世界",
"BTV北京卫视": "北京卫视",
"BTV冬奥纪实": "冬奥纪实",
"东奥纪实": "冬奥纪实",
"卫视台": "卫视",
"湖南电视台": "湖南卫视",
"少儿科教": "少儿",
"影视剧": "影视",
"电视剧": "影视",
"CCTV1CCTV1": "CCTV1",
"CCTV2CCTV2": "CCTV2",
"CCTV7CCTV7": "CCTV7",
"CCTV10CCTV10": "CCTV10"
}
# 原始文件名
input_filename = '汇总.txt'
# 新文件名
output_filename = '2.txt'
# 打开原始文件和新文件
with open(input_filename, 'r') as infile, open(output_filename, 'w') as outfile:
# 逐行读取文件
for line in infile:
# 分割每一行
parts = line.strip().split(',')
# 对逗号前面的部分进行字符替换
modified_part = ''.join(replacement_dict.get(char, char) for char in parts[0])
# 写入新文件
outfile.write(f"{modified_part},{parts[1]}\n")
print(f"文件 {input_filename} 已处理,结果保存在 {output_filename}")
########################################################################################################
import re
import os
def filter_lines(file_path):
with open(input_file_path, 'r', encoding='utf-8') as file:
lines = file.readlines()
filtered_lines = []
for line in lines:
if ',' in line:
if 'epg' not in line and 'mitv' not in line and 'udp' not in line and 'rtp' not in line and 'tsfile' not in line and '/hls/' not in line and '嘻嘻嘻' not in line \
and 'P2p' not in line and 'p2p' not in line and 'p3p' not in line and 'P2P' not in line and 'P3p' not in line and 'P3P' not in line and '' not in line and '' not in line and '春节' not in line:
filtered_lines.append(line)
return filtered_lines
def write_filtered_lines(output_file_path, filtered_lines):
with open(output_file_path, 'w', encoding='utf-8') as output_file:
output_file.writelines(filtered_lines)
if __name__ == "__main__":
input_file_path = "2.txt"
output_file_path = "2.txt"
filtered_lines = filter_lines(input_file_path)
write_filtered_lines(output_file_path, filtered_lines)
print(f"文件已过滤完成")
import re
from pypinyin import lazy_pinyin
# 打开一个utf-8编码的文本文件
with open("2.txt", "r", encoding="utf-8") as file:
# 读取所有行并存储到列表中
lines = file.readlines()
# 定义一个函数,用于提取每行的第一个数字
def extract_first_number(line):
match = re.search(r'\d+', line)
return int(match.group()) if match else float('inf')
# 对列表中的行进行排序,按照第一个数字的大小排列,其余行按中文排序
sorted_lines = sorted(lines, key=lambda x: (not 'CCTV' in x, extract_first_number(x) if 'CCTV' in x else lazy_pinyin(x.strip())))
# 将排序后的行写入新的utf-8编码的文本文件
with open("2.txt", "w", encoding="utf-8") as file:
for line in sorted_lines:
file.write(line)
import re
def parse_file(input_file_path, output_file_name):
# 正则表达式匹配从'//'开始到第一个'/'或第一个'::'结束的部分
ip_or_domain_pattern = re.compile(r'//([^/:]*:[^/:]*::[^/:]*|[^/]*)')
# 用于存储每个IP或域名及其对应的行列表
ip_or_domain_to_lines = {}
# 读取原始文件内容
with open(input_file_path, 'r', encoding='utf-8') as file:
for line in file:
line = line.strip()
# 如果行是分类标签行,则跳过
if ",#genre#" in line:
continue
# 检查行是否包含IP或域名
match = ip_or_domain_pattern.search(line)
if match:
# 提取匹配到的IP或域名
matched_text = match.group(1)
# 去除IP或域名后的剩余部分,只保留匹配到的IP或域名
ip_or_domain = matched_text.split('://')[-1].split('/')[0].split('::')[0]
# 将行添加到对应的IP或域名列表中
if ip_or_domain not in ip_or_domain_to_lines:
ip_or_domain_to_lines[ip_or_domain] = []
ip_or_domain_to_lines[ip_or_domain].append(line)
# 过滤掉小于5000字节的IP或域名段
filtered_ip_or_domain_to_lines = {ip_or_domain: lines for ip_or_domain, lines in ip_or_domain_to_lines.items()
if sum(len(line) for line in lines) >= 3000}
# 如果没有满足条件的IP或域名段,则不生成文件
if not filtered_ip_or_domain_to_lines:
print("没有满足条件的IP或域名段,不生成文件。")
return
# 合并所有满足条件的IP或域名的行到一个文件
with open(output_file_name, 'w', encoding='utf-8') as output_file:
for ip_or_domain, lines in filtered_ip_or_domain_to_lines.items():
# 写入IP或域名及其对应的行到输出文件
output_file.write(f"{ip_or_domain},#genre#\n")
for line in lines:
output_file.write(line + '\n')
output_file.write('\n') # 在每个小段后添加一个空行作为分隔
# 调用函数并传入文件路径和输出文件名
parse_file('2.txt', '网络收集.txt')
################################################################################################任务结束,删除不必要的过程文件
files_to_remove = ["2.txt", "汇总.txt"]
for file in files_to_remove:
if os.path.exists(file):
os.remove(file)
else: # 如果文件不存在,则提示异常并打印提示信息
print(f"文件 {file} 不存在,跳过删除。")
print("任务运行完毕,分类频道列表可查看文件夹内综合源.txt文件!")