From ebd74e8721c2a6618d6be9f5877959aaf57cd286 Mon Sep 17 00:00:00 2001 From: frxz751113 <156018267+frxz751113@users.noreply.github.com> Date: Wed, 25 Sep 2024 16:29:12 +0800 Subject: [PATCH] =?UTF-8?q?Delete=20py/=E7=BD=91=E7=BB=9C=E6=94=B6?= =?UTF-8?q?=E9=9B=86.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- py/网络收集.py | 795 ------------------------------------------------- 1 file changed, 795 deletions(-) delete mode 100644 py/网络收集.py diff --git a/py/网络收集.py b/py/网络收集.py deleted file mode 100644 index 3bad929..0000000 --- a/py/网络收集.py +++ /dev/null @@ -1,795 +0,0 @@ -from lxml import etree -import time -import datetime -from datetime import datetime, timedelta # 确保 timedelta 被导入 -import concurrent.futures -from selenium import webdriver -from selenium.webdriver.chrome.options import Options -from concurrent.futures import ThreadPoolExecutor -import requests -import re -import os -import threading -from queue import Queue -import queue -from datetime import datetime -import fileinput -from tqdm import tqdm -from pypinyin import lazy_pinyin -from opencc import OpenCC -import base64 -import cv2 -from bs4 import BeautifulSoup -from urllib.parse import urlparse -from translate import Translator # 导入Translator类,用于文本翻译 -## 定义txt文件的URL列表 -urls = [ - 'https://dimaston.github.io/live.m3u', #假m3u - 'https://raw.githubusercontent.com/gaotianliuyun/gao/master/list.txt', #暂时保留 - 'https://raw.githubusercontent.com/Fairy8o/IPTV/main/PDX-V4.txt', - 'https://raw.githubusercontent.com/ddhola/file/d7afb504b1ba4fef31813e1166cb892215a9c063/0609test', - 'https://raw.githubusercontent.com/vbskycn/iptv/2738b3bec8c298f57e0e2052b155846ab6ea3787/dsyy/hd.txt', - 'https://raw.githubusercontent.com/frxz751113/AAAAA/main/IPTV/TW.txt', - 'https://raw.githubusercontent.com/ljlfct01/ljlfct01.github.io/main/list.%E8%87%AA%E7%94%A8', - 'https://notabug.org/qcfree/TVBox-api/raw/main/live.txt', #通用源 - 'https://raw.githubusercontent.com/KAN314go/A/e81a1c22cd1b9f459bc363bd916c13133e235510/tv/%E5%AE%89%E5%8D%9A8K.txt', - 'https://gitlab.com/tvtg/vip/-/raw/main/log.txt', - 'https://raw.githubusercontent.com/frxz751113/IPTVzb1/main/%E7%BB%BC%E5%90%88%E6%BA%90.txt', - 'https://raw.githubusercontent.com/ssili126/tv/main/itvlist.txt', - 'https://raw.githubusercontent.com/Supprise0901/TVBox_live/main/live.txt', - 'https://raw.githubusercontent.com/Guovin/TV/gd/result.txt', - 'https://raw.githubusercontent.com/gaotianliuyun/gao/master/list.txt', - 'https://gitee.com/xxy002/zhiboyuan/raw/master/zby.txt', - 'https://raw.githubusercontent.com/mlvjfchen/TV/main/iptv_list.txt', - 'https://raw.githubusercontent.com/fenxp/iptv/main/live/tvlive.txt', - 'https://raw.githubusercontent.com/zwc456baby/iptv_alive/master/live.txt', - 'https://gitlab.com/p2v5/wangtv/-/raw/main/lunbo.txt', - 'https://raw.githubusercontent.com/PizazzGY/TVBox/main/live.txt', - 'https://gitcode.net/MZ011/BHJK/-/raw/master/BHZB1.txt', - 'https://raw.githubusercontent.com/vbskycn/iptv/master/tv/iptv4.txt', - 'https://raw.githubusercontent.com/junge3333/juds6/main/yszb1.txt', - 'https://raw.githubusercontent.com/zzmaze/iptv/main/iptv.txt', - 'https://raw.githubusercontent.com/kimwang1978/collect-tv-txt/main/others_output.txt', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '' -] -# 合并文件的函数 -def merge_txt_files(urls, output_filename='汇总.txt'): - try: - with open(output_filename, 'w', encoding='utf-8') as outfile: - for url in urls: - try: - response = requests.get(url) - response.raise_for_status() # 确保请求成功 - # 尝试将响应内容解码为UTF-8,如果失败则尝试其他编码 - try: - content = response.content.decode('utf-8') - except UnicodeDecodeError: - content = response.content.decode('gbk') # 尝试GBK编码 - outfile.write(content + '\n') - except requests.RequestException as e: - print(f'Error downloading {url}: {e}') - except IOError as e: - print(f'Error writing to file: {e}') - -# 调用函数 -merge_txt_files(urls) - - - - - - - -#简体转繁体 -# 创建一个OpenCC对象,指定转换的规则为繁体字转简体字 -converter = OpenCC('t2s.json')#繁转简 -#converter = OpenCC('s2t.json')#简转繁 -# 打开txt文件 -with open('汇总.txt', 'r', encoding='utf-8') as file: - traditional_text = file.read() -# 进行繁体字转简体字的转换 -simplified_text = converter.convert(traditional_text) -# 将转换后的简体字写入txt文件 -with open('汇总.txt', 'w', encoding='utf-8') as file: - file.write(simplified_text) - - - -with open('汇总.txt', 'r', encoding="utf-8") as file: - # 读取所有行并存储到列表中 - lines = file.readlines() -#定义替换规则的字典对频道名替换 -replacements = { - "CCTV-1高清测试": "", - "CCTV-2高清测试": "", - "CCTV-7高清测试": "", - "CCTV-10高清测试": "", - "中央": "CCTV", - "高清""": "", - "HD": "", - "标清": "", - "amc": "AMC", - "CCTV1综合": "CCTV1", - "CCTV2财经": "CCTV2", - "CCTV3综艺": "CCTV3", - "国际": "", - "5体育": "5", - "6电影": "6", - "军农": "", - "8影视": "8", - "9纪录": "9", - "0科教": "0", - "2社会与法": "2", - "3新闻": "3", - "4少儿": "4", - "5音乐": "5", - "": "", - "": "", - "": "", - "": "", - "": "", - "": "", - "咪咕": "", - "": "", - "超清": "", - "频道": "", - "CCTV-": "CCTV", - "CCTV_": "CCTV", - " ": "", - "CCTV风云剧场": "风云剧场", - "CCTV第一剧场": "第一剧场", - "CCTV怀旧剧场": "怀旧剧场", - "熊猫影院": "熊猫电影", - "熊猫爱生活": "熊猫生活", - "爱宠宠物": "宠物生活", - "[ipv6]": "", - "专区": "", - "卫视超": "卫视", - "CCTV风云剧场": "风云剧场", - "CCTV第一剧场": "第一剧场", - "CCTV怀旧剧场": "怀旧剧场", - "IPTV": "", - "PLUS": "+", - "+": "+", - "(": "", - ")": "", - "CAV": "", - "美洲": "", - "北美": "", - "12M": "", - "高清测试CCTV-1": "", - "高清测试CCTV-2": "", - "高清测试CCTV-7": "", - "高清测试CCTV-10": "", - "LD": "", - "HEVC20M": "", - "S,": ",", - "测试": "", - "CCTW": "CCTV", - "试看": "", - "测试": "", - " ": "", - "测试cctv": "CCTV", - "CCTV1综合": "CCTV1", - "CCTV2财经": "CCTV2", - "CCTV3综艺": "CCTV3", - "CCTV4国际": "CCTV4", - "CCTV4中文国际": "CCTV4", - "CCTV4欧洲": "CCTV4", - "CCTV5体育": "CCTV5", - "CCTV5+体育": "CCTV5+", - "CCTV6电影": "CCTV6", - "CCTV7军事": "CCTV7", - "CCTV7军农": "CCTV7", - "CCTV7农业": "CCTV7", - "CCTV7国防军事": "CCTV7", - "CCTV8电视剧": "CCTV8", - "CCTV8影视": "CCTV8", - "CCTV8纪录": "CCTV9", - "CCTV9记录": "CCTV9", - "CCTV9纪录": "CCTV9", - "CCTV10科教": "CCTV10", - "CCTV11戏曲": "CCTV11", - "CCTV12社会与法": "CCTV12", - "CCTV13新闻": "CCTV13", - "CCTV新闻": "CCTV13", - "CCTV14少儿": "CCTV14", - "央视14少儿": "CCTV14", - "CCTV少儿超": "CCTV14", - "CCTV15音乐": "CCTV15", - "CCTV音乐": "CCTV15", - "CCTV16奥林匹克": "CCTV16", - "CCTV17农业农村": "CCTV17", - "CCTV17军农": "CCTV17", - "CCTV17农业": "CCTV17", - "CCTV5+体育赛视": "CCTV5+", - "CCTV5+赛视": "CCTV5+", - "CCTV5+体育赛事": "CCTV5+", - "CCTV5+赛事": "CCTV5+", - "CCTV5+体育": "CCTV5+", - "CCTV5赛事": "CCTV5+", - "凤凰中文台": "凤凰中文", - "凤凰资讯台": "凤凰资讯", - "(CCTV4K测试)": "CCTV4K", - "上海东方卫视": "上海卫视", - "东方卫视": "上海卫视", - "内蒙卫视": "内蒙古卫视", - "福建东南卫视": "东南卫视", - "广东南方卫视": "南方卫视", - "湖南金鹰卡通": "金鹰卡通", - "炫动卡通": "哈哈炫动", - "卡酷卡通": "卡酷少儿", - "卡酷动画": "卡酷少儿", - "BRTVKAKU少儿": "卡酷少儿", - "优曼卡通": "优漫卡通", - "优曼卡通": "优漫卡通", - "嘉佳卡通": "佳嘉卡通", - "世界地理": "地理世界", - "CCTV世界地理": "地理世界", - "BTV北京卫视": "北京卫视", - "BTV冬奥纪实": "冬奥纪实", - "东奥纪实": "冬奥纪实", - "卫视台": "卫视", - "湖南电视台": "湖南卫视", - "少儿科教": "少儿", - "影视剧": "影视", - "电视剧": "影视", - "CCTV1CCTV1": "CCTV1", - "CCTV2CCTV2": "CCTV2", - "CCTV7CCTV7": "CCTV7", - "CCTV10CCTV10": "CCTV10" -} -with open('汇总.txt', 'w', encoding='utf-8') as new_file: - for line in lines: - # 去除行尾的换行符 - line = line.rstrip('\n') - # 分割行,获取逗号前的字符串 - parts = line.split(',', 1) - if len(parts) > 0: - # 替换逗号前的字符串 - before_comma = parts[0] - for old, new in replacements.items(): - before_comma = before_comma.replace(old, new) - # 将替换后的逗号前部分和逗号后部分重新组合成一行,并写入新文件 - new_line = f'{before_comma},{parts[1]}\n' if len(parts) > 1 else f'{before_comma}\n' - new_file.write(new_line) - - - - - - -# 打开文本文件进行读取 -def read_and_process_file(input_filename, output_filename, encodings=['utf-8', 'gbk']): - for encoding in encodings: - try: - with open(input_filename, 'r', encoding=encoding) as file: - lines = file.readlines() - break - except UnicodeDecodeError: - continue - else: - raise ValueError(f"Cannot decode file '{input_filename}' with any of the provided encodings") - - with open(output_filename, 'w', encoding='utf-8') as outfile: - for line in lines: - if '$' in line: - processed_line = line.split('$')[0].rstrip('\n') - outfile.write(processed_line + '\n') - else: - outfile.write(line) - -# 调用函数 -read_and_process_file('汇总.txt', '汇总.txt') # 修改输出文件名以避免覆盖原始文件 - -###################################################################去重##################################### -def remove_duplicates(input_file, output_file): - # 用于存储已经遇到的URL和包含genre的行 - seen_urls = set() - seen_lines_with_genre = set() - # 用于存储最终输出的行 - output_lines = [] - # 打开输入文件并读取所有行 - with open(input_file, 'r', encoding='utf-8') as f: - lines = f.readlines() - print("去重前的行数:", len(lines)) - # 遍历每一行 - for line in lines: - # 使用正则表达式查找URL和包含genre的行,默认最后一行 - urls = re.findall(r'[https]?[http]?[rtsp]?[rtmp]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', line) - genre_line = re.search(r'\bgenre\b', line, re.IGNORECASE) is not None - # 如果找到URL并且该URL尚未被记录 - if urls and urls[0] not in seen_urls: - seen_urls.add(urls[0]) - output_lines.append(line) - # 如果找到包含genre的行,无论是否已被记录,都写入新文件 - if genre_line: - output_lines.append(line) - # 将结果写入输出文件 - with open(output_file, 'w', encoding='utf-8') as f: - f.writelines(output_lines) - print("去重后的行数:", len(output_lines)) -# 使用方法 -remove_duplicates('汇总.txt', '2.txt') - - - - - - -######################################################################################提取goodiptv -import re -import os -# 定义一个包含所有要排除的关键词的列表 -excluded_keywords = ['epg', 'mitv', 'udp', 'rtp', 'P2p', 'p2p', 'p3p', 'P2P', '新闻综合', 'P3p', 'jdshipin', '9930/qilu', 'gitcode.net', '151:99', '21dtv', 'txmov2', 'gcw.bdcdn', 'metshop', - 'shandong', 'goodiptv', '购物', '[', 'P3P', '腔', '曲', '//1', '/hls/', '春节', 'gat', '95.179', 'hlspull', 'github', 'lunbo', 'tw.ts138', '114:8278', '//tvb', 'extraott', - '22:8891', 'fanmingming', '43:22222', 'etv.xhgvip', 'free.xiptv', 'www.zhixun', 'xg.52sw'] #, '', '', '', '' - -# 定义一个包含所有要提取的关键词的列表 -extract_keywords = ['1905', '凤凰卫视', '人间卫视', '亚洲卫视', '香港卫视', '神乐', '翡翠台', '凤凰香港', '凤凰中文', '凤凰资讯', 'AXN', 'AMC', '香蕉', '电影台', '大爱', '东森', 'MTV', - '华视', '中天', '天良', '美亚', '星影', '纬来', '天映', '无线', '华剧台', '华丽台', '剧台', 'Movie', '八大', '采昌', '靖天', '民视', '探索', '旅游', '影视2', - '影视3', '中视', '豬哥亮', 'TVB', '公视', '寰宇', '戏剧', '靖', '龙华', '龙祥', '猪哥亮', '综艺', '新闻', '影迷', '影剧', '电视剧', '49号', '欢喜', '珍珠', '台视', '华视', - 'AMC电影台', 'BLOOM', 'CINEM', 'CMUSIC音乐台', 'CNBC', 'DWNEWS', 'Dbox', 'ELTV', 'FEST', 'GOOD2', 'HISTORY', 'HITS', 'HakkaTV', 'KOREA', 'LIFETIME', 'LISTENONSPOTIFY', 'LOVENATURE', - 'LUXE', 'MCE', 'Nickelodeon', 'PETCLUB', 'ROCK', 'ROCKACTION', 'TECHSTORM', 'TV5MONDE', 'TVBS', 'TiTV', 'VOA', 'elta', 'hokkotv', 'momo', 'nhkworld', 'nickjr', - '中华小当家', '中天娱乐', '互动英语', '亚洲旅游', '全球财经', '公视戏剧', '动漫', '动物星球', '动画台', '博斯', - '哆啦A梦', '壹新闻', '大立电视', '天良综合台', '幸福空间', '探案', '探索亚洲', '智林体育', '東森新聞', '环球电视', - '美亚C+', '超人', '镜新闻', '番薯'] - - -# 读取文件并处理每一行 -with open('2.txt', 'r', encoding='utf-8') as file: - lines = file.readlines() - - # 创建或打开一个输出文件用于写入处理后的数据 - with open('网络收集.txt', 'w', encoding='utf-8') as outfile: - for line in lines: - # 首先检查行是否包含任何提取关键词 - if any(keyword in line for keyword in extract_keywords): - # 如果包含提取关键词,进一步检查行是否不包含任何排除关键词 - if not any(keyword in line for keyword in excluded_keywords): - outfile.write(line) # 写入符合条件的行到文件 - - -############################################################### -import re -def parse_file(input_file_path, output_file_name): - # 正则表达式匹配从'//'开始到第一个'/'或第一个'::'结束的部分 - ip_or_domain_pattern = re.compile(r'//([^/:]*:[^/:]*::[^/:]*|[^/]*)') - # 用于存储每个IP或域名及其对应的行列表 - ip_or_domain_to_lines = {} - # 读取原始文件内容 - with open(input_file_path, 'r', encoding='utf-8') as file: - for line in file: - line = line.strip() - # 如果行是分类标签行,则跳过 - if ",#genre#" in line: - continue - # 检查行是否包含IP或域名 - match = ip_or_domain_pattern.search(line) - if match: - # 提取匹配到的IP或域名 - matched_text = match.group(1) - # 去除IP或域名后的剩余部分,只保留匹配到的IP或域名 - ip_or_domain = matched_text.split('://')[-1].split('/')[0].split('::')[0] - # 将行添加到对应的IP或域名列表中 - if ip_or_domain not in ip_or_domain_to_lines: - ip_or_domain_to_lines[ip_or_domain] = [] - ip_or_domain_to_lines[ip_or_domain].append(line) - ############################################################################### 过滤掉小于1500字节的IP或域名段 - filtered_ip_or_domain_to_lines = {ip_or_domain: lines for ip_or_domain, lines in ip_or_domain_to_lines.items() - if sum(len(line) for line in lines) >= 500} - # 如果没有满足条件的IP或域名段,则不生成文件 - if not filtered_ip_or_domain_to_lines: - print("没有满足条件的IP或域名段,不生成文件。") - return - # 合并所有满足条件的IP或域名的行到一个文件 - with open(output_file_name, 'w', encoding='utf-8') as output_file: - for ip_or_domain, lines in filtered_ip_or_domain_to_lines.items(): - # 写入IP或域名及其对应的行到输出文件 - output_file.write(f"频道,#genre#\n") - for line in lines: - output_file.write(line + '\n') - output_file.write('\n') # 在每个小段后添加一个空行作为分隔 -# 调用函数并传入文件路径和输出文件名 -parse_file('网络收集.txt', '网络收集.txt') - - - -import cv2 -import time -from tqdm import tqdm -import os - -# 存储文件路径 -file_path = "网络收集.txt" -output_file_path = "网络收集.txt" - -def get_ip_key(url): - """从 URL 中提取 IP 地址,并构造一个唯一的键""" - start = url.find('://') + 3 - end = url.find('/', start) - return url[start:end] if end!= -1 else None - -def merge_and_filter(): - with open(file_path, 'r', encoding='utf-8') as file: - lines = file.readlines() - - total_lines = len(lines) - - # 检查并创建失败 IP 文件如果不存在 - failed_ips_file_path = "failed_ips.txt" - if not os.path.exists(failed_ips_file_path): - open(failed_ips_file_path, 'w', encoding='utf-8').close() - - # 读取上次保存的失败 IP 文件 - failed_ips = set() - if os.path.exists(failed_ips_file_path): - with open(failed_ips_file_path, 'r', encoding='utf-8') as failed_file: - for line in failed_file.readlines(): - failed_ips.add(line) - - # 处理输入文件中的数据并进行检测 - with open(output_file_path, 'a', encoding='utf-8') as output_file: - for i, line in tqdm(enumerate(lines), total=total_lines, desc="Processing", unit='line'): - if 'genre' in line or line in failed_ips: - output_file.write(line) - continue - parts = line.split(',', 1) - if len(parts) == 2: - channel_name, url = parts - channel_name = channel_name.strip() - url = url.strip() - ip_key = get_ip_key(url) - if ip_key and ip_key in detected_ips: - if detected_ips[ip_key]['status'] == 'ok': - output_file.write(line) - elif ip_key: - cap = cv2.VideoCapture(url) - start_time = time.time() - frame_count = 0 - while frame_count < 50 and (time.time() - start_time) < 3: - ret, frame = cap.read() - if not ret: - break - frame_count += 1 - cap.release() - if frame_count >= 50: - detected_ips[ip_key] = {'status': 'ok'} - output_file.write(line) - else: - detected_ips[ip_key] = {'status': 'fail'} - # 将失败的行添加到失败 IP 文件集合中 - failed_ips.add(line) - - # 合并任意字符加上网络收集.txt 的文件 - all_files = [f for f in os.listdir(os.getcwd()) if f.endswith('网络收集.txt')] - with open(output_file_path, 'a', encoding='utf-8') as main_output: - for file_name in all_files: - if file_name!= output_file_path: - with open(file_name, 'r', encoding='utf-8') as other_file: - content = other_file.read() - if content: - main_output.write('\n') - main_output.write(content) - - # 将检测失败的 IP 行保存到另一个文件 - with open(failed_ips_file_path, 'w', encoding='utf-8') as failed_ips_file: - for failed_line in failed_ips: - failed_ips_file.write(failed_line) - - - - -def remove_duplicates(input_file, output_file): - # 用于存储已经遇到的URL和包含genre的行 - seen_urls = set() - seen_lines_with_genre = set() - # 用于存储最终输出的行 - output_lines = [] - # 打开输入文件并读取所有行 - with open(input_file, 'r', encoding='utf-8') as f: - lines = f.readlines() - print("去重前的行数:", len(lines)) - # 遍历每一行 - for line in lines: - # 使用正则表达式查找URL和包含genre的行,默认最后一行 - urls = re.findall(r'[https]?[http]?[P2p]?[mitv]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', line) - genre_line = re.search(r'\bgenre\b', line, re.IGNORECASE) is not None - # 如果找到URL并且该URL尚未被记录 - if urls and urls[0] not in seen_urls: - seen_urls.add(urls[0]) - output_lines.append(line) - # 如果找到包含genre的行,无论是否已被记录,都写入新文件 - if genre_line: - output_lines.append(line) - # 将结果写入输出文件 - with open(output_file, 'w', encoding='utf-8') as f: - f.writelines(output_lines) - print("去重后的行数:", len(output_lines)) -# 使用方法 -remove_duplicates('网络收集.txt', '网络收集.txt') - - - - - -############################################################################全部检测,防止IP段失效 -# 函数:获取视频分辨率 -def get_video_resolution(video_path, timeout=0.8): - cap = cv2.VideoCapture(video_path) - if not cap.isOpened(): - return None - width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) - height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) - cap.release() - return (width, height) -# 函数:处理每一行 -def process_line(line, output_file, order_list, valid_count, invalid_count, total_lines): - parts = line.strip().split(',') - if '#genre#' in line: - # 如果行包含 '#genre#',直接写入新文件 - with threading.Lock(): - output_file.write(line) - print(f"已写入genre行:{line.strip()}") - elif len(parts) == 2: - channel_name, channel_url = parts - resolution = get_video_resolution(channel_url, timeout=8) - if resolution and resolution[1] >= 720: # 检查分辨率是否大于等于720p - with threading.Lock(): - #output_file.write(f"{channel_name}[{resolution[1]}p],{channel_url}\n") - output_file.write(f"{channel_name},{channel_url}\n") - order_list.append((channel_name, resolution[1], channel_url)) - valid_count[0] += 1 - print(f"Channel '{channel_name}' accepted with resolution {resolution[1]}p at URL {channel_url}.") - else: - invalid_count[0] += 1 - with threading.Lock(): - print(f"有效: {valid_count[0]}, 无效: {invalid_count[0]}, 总数: {total_lines}, 进度: {(valid_count[0] + invalid_count[0]) / total_lines * 100:.2f}%") -# 函数:多线程工作 -def worker(task_queue, output_file, order_list, valid_count, invalid_count, total_lines): - while True: - try: - line = task_queue.get(timeout=1) - process_line(line, output_file, order_list, valid_count, invalid_count, total_lines) - except Queue.Empty: - break - finally: - task_queue.task_done() -# 主函数 -def main(source_file_path, output_file_path): - order_list = [] - valid_count = [0] - invalid_count = [0] - task_queue = Queue() - # 读取源文件 - with open(source_file_path, 'r', encoding='utf-8') as source_file: - lines = source_file.readlines() - with open(output_file_path + '.txt', 'w', encoding='utf-8') as output_file: - # 创建线程池 - with ThreadPoolExecutor(max_workers=64) as executor: - # 创建并启动工作线程 - for _ in range(64): - executor.submit(worker, task_queue, output_file, order_list, valid_count, invalid_count, len(lines)) - # 将所有行放入队列 - for line in lines: - task_queue.put(line) - # 等待队列中的所有任务完成 - task_queue.join() - print(f"任务完成,有效频道数:{valid_count[0]}, 无效频道数:{invalid_count[0]}, 总频道数:{len(lines)}") -if __name__ == "__main__": - source_file_path = '网络收集.txt' # 替换为你的源文件路径 - output_file_path = '网络收集' # 替换为你的输出文件路径,不要后缀名 - main(source_file_path, output_file_path) - - - - - -import re -from pypinyin import lazy_pinyin -# 打开一个utf-8编码的文本文件 -with open("网络收集.txt", "r", encoding="utf-8") as file: - # 读取所有行并存储到列表中 - lines = file.readlines() -# 定义一个函数,用于提取每行的第一个数字 -def extract_first_number(line): - match = re.search(r'\d+', line) - return int(match.group()) if match else float('inf') -# 对列表中的行进行排序,按照第一个数字的大小排列,其余行按中文排序 -sorted_lines = sorted(lines, key=lambda x: (not 'CCTV' in x, extract_first_number(x) if 'CCTV' in x else lazy_pinyin(x.strip()))) -# 将排序后的行写入新的utf-8编码的文本文件 -with open("网络收集.txt", "w", encoding="utf-8") as file: - for line in sorted_lines: - file.write(line) - - - - -def parse_file(input_file_path, output_file_name): # - # 正则表达式匹配从'//'开始到第一个'/'或第一个'::'结束的部分 - ip_or_domain_pattern = re.compile(r'//([^/:]*:[^/:]*::[^/:]*|[^/]*)') - # 用于存储每个IP或域名及其对应的行列表 - ip_or_domain_to_lines = {} - # 用于生成分类名的字母和数字计数器 - alphabet_counter = 0 # 字母计数器,从0开始 - number_counter = 1 # 数字计数器,从1开始 - # 读取原始文件内容 - with open(input_file_path, 'r', encoding='utf-8') as file: - for line in file: - line = line.strip() - # 如果行是分类标签行,则跳过 - if ",#genre#" in line: - continue - # 检查行是否包含IP或域名 - match = ip_or_domain_pattern.search(line) - if match: - # 提取匹配到的IP或域名 - matched_text = match.group(1) - # 去除IP或域名后的剩余部分,只保留匹配到的IP或域名 - ip_or_domain = matched_text.split('://')[-1].split('/')[0].split('::')[0] - # 将行添加到对应的IP或域名列表中 - if ip_or_domain not in ip_or_domain_to_lines: - ip_or_domain_to_lines[ip_or_domain] = [] - ip_or_domain_to_lines[ip_or_domain].append(line) - # 过滤掉小于1000字节的IP或域名段 - filtered_ip_or_domain_to_lines = {ip_or_domain: lines for ip_or_domain, lines in ip_or_domain_to_lines.items() - if sum(len(line) for line in lines) >= 500} # 过滤掉小于1000字节的IP或域名段 - # 如果没有满足条件的IP或域名段,则不生成文件 - if not filtered_ip_or_domain_to_lines: - print("没有满足条件的IP或域名段,不生成文件。") - return - # 合并所有满足条件的IP或域名的行到一个文件 -############################################################ - with open(output_file_name, 'w', encoding='utf-8') as output_file: #output_ - for ip_or_domain, lines in filtered_ip_or_domain_to_lines.items(): - # 检查是否需要递增数字计数器 - if alphabet_counter >= 26: - number_counter += 1 - alphabet_counter = 0 # 重置字母计数器 - ###################################################### - # 生成分类名 - genre_name = chr(65 + alphabet_counter)# + str(number_counter) - output_file.write(f"港澳{genre_name}组,#genre#\n") - for line in lines: - output_file.write(line + '\n') - output_file.write('\n') # 在每个小段后添加一个空行作为分隔 - alphabet_counter += 1 # 递增字母计数器 -# 调用函数并传入文件路径和输出文件名 -parse_file('网络收集.txt', '网络收集.txt') - - -def append_text_between_files(file1_path, file2_path): - with open(file1_path, 'r', encoding='utf-8') as file1: - content1 = file1.read() - lines1 = content1.split('\n') - seen = set() - unique_lines1 = [] - for line in lines1: - if line not in seen: - seen.add(line) - unique_lines1.append(line) - with open(file2_path, 'r', encoding='utf-8') as file2: - content2 = file2.read() - lines2 = content2.split('\n') - seen = set() - unique_lines2 = [] - for line in lines2: - if line not in seen: - seen.add(line) - unique_lines2.append(line) - combined_lines = unique_lines2 + unique_lines1 - with open(file2_path, 'w', encoding='utf-8') as file2: - file2.write('\n'.join(combined_lines)) -file_path1 = '网络收集.txt' -file_path2 = '综合源.txt' -append_text_between_files(file_path1, file_path2) - - -#TXT转M3U# -import datetime -def txt_to_m3u(input_file, output_file): - # 读取txt文件内容 - with open(input_file, 'r', encoding='utf-8') as f: - lines = f.readlines() - # 打开m3u文件并写入内容 - now = datetime.datetime.utcnow() + datetime.timedelta(hours=8) - current_time = now.strftime("%Y/%m/%d %H:%M") - with open(output_file, 'w', encoding='utf-8') as f: - f.write('#EXTM3U x-tvg-url="https://live.fanmingming.com/e.xml" catchup="append" catchup-source="?playseek=${(b)yyyyMMddHHmmss}-${(e)yyyyMMddHHmmss}"\n') - #f.write(f'#EXTINF:-1 group-title="更新时间",请您欣赏\n') - #f.write(f'https://vd2.bdstatic.com/mda-nk3am8nwdgqfy6nh/sc/cae_h264/1667555203921394810/mda-nk3am8nwdgqfy6nh.mp4\n') - #f.write(f'#EXTINF:-1 group-title="{current_time}",虚情的爱\n') - #f.write(f'https://vd2.bdstatic.com/mda-mi1dd05gmhwejdwn/sc/cae_h264/1630576203346678103/mda-mi1dd05gmhwejdwn.mp4\n') - # 初始化genre变量 - genre = '' - # 遍历txt文件内容 - for line in lines: - line = line.strip() - if "," in line: # 防止文件里面缺失",”号报错 - # if line: - # 检查是否是genre行 - channel_name, channel_url = line.split(',', 1) - if channel_url == '#genre#': - genre = channel_name - print(genre) - else: - # 将频道信息写入m3u文件 - f.write(f'#EXTINF:-1 tvg-logo="https://live.fanmingming.com/tv/{channel_name}.png" group-title="{genre}",{channel_name}\n') - f.write(f'{channel_url}\n') -# 将txt文件转换为m3u文件 -txt_to_m3u('综合源.txt', '综合源.m3u') - - -import datetime -now = datetime.datetime.utcnow() + datetime.timedelta(hours=8) -current_time = now.strftime("%Y/%m/%d %H:%M") #:%M -# 打开文本文件并将时间添加到开头 -file_path = "综合源.m3u" -with open(file_path, 'r+', encoding='utf-8') as f: - content = f.read() - f.seek(0, 0) - f.write(f'{content}\n') - #f.write(f'#EXTINF:-1 group-title="更新时间",请您欣赏\n') - #f.write(f'http://em.21dtv.com/songs/60144971.mkv\n') - f.write(f'#EXTINF:-1 group-title="{current_time}更新",虚情的爱\n') - f.write(f'https://vd2.bdstatic.com/mda-mi1dd05gmhwejdwn/sc/cae_h264/1630576203346678103/mda-mi1dd05gmhwejdwn.mp4\n') - - - - -import datetime -now = datetime.datetime.utcnow() + datetime.timedelta(hours=8) -current_time = now.strftime("%Y/%m/%d %H:%M") -# 打开文本文件并将时间添加到开头 -file_path = "综合源.txt" -with open(file_path, 'r+', encoding='utf-8') as f: - content = f.read() - f.seek(0, 0) - f.write(f'{content}\n') - f.write(f'') - #f.write(f'请您欣赏,https://vd2.bdstatic.com/mda-mi1dd05gmhwejdwn/sc/cae_h264/1630576203346678103/mda-mi1dd05gmhwejdwn.mp4\n') - f.write(f'{current_time}更新,#genre#\n') - f.write(f'虚情的爱,https://vd2.bdstatic.com/mda-mi1dd05gmhwejdwn/sc/cae_h264/1630576203346678103/mda-mi1dd05gmhwejdwn.mp4\n') - - -import datetime -now = datetime.datetime.utcnow() + datetime.timedelta(hours=8) -current_time = now.strftime("%Y/%m/%d %H:%M") -# 打开文本文件并将时间添加到开头 -file_path = "网络收集.txt" -with open(file_path, 'r+', encoding='utf-8') as f: - content = f.read() - f.seek(0, 0) - f.write(f'{current_time}更新,#genre#\n') - f.write(f'虚情的爱,https://vd2.bdstatic.com/mda-mi1dd05gmhwejdwn/sc/cae_h264/1630576203346678103/mda-mi1dd05gmhwejdwn.mp4\n{content}') - - -################################################################################################任务结束,删除不必要的过程文件 -files_to_remove = ["2.txt", "汇总.txt"] -for file in files_to_remove: - if os.path.exists(file): - os.remove(file) - else: # 如果文件不存在,则提示异常并打印提示信息 - print(f"文件 {file} 不存在,跳过删除。") -print("任务运行完毕,频道列表可查看文件夹内源.txt文件!")