From a7cb245274033ec1d9283ced7441829e30e2c1da Mon Sep 17 00:00:00 2001 From: frxz751113 <156018267+frxz751113@users.noreply.github.com> Date: Mon, 12 Aug 2024 19:34:05 +0800 Subject: [PATCH] =?UTF-8?q?Delete=20py/iptv=E5=8E=9F.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- py/iptv原.py | 721 --------------------------------------------------- 1 file changed, 721 deletions(-) delete mode 100644 py/iptv原.py diff --git a/py/iptv原.py b/py/iptv原.py deleted file mode 100644 index 9b21925..0000000 --- a/py/iptv原.py +++ /dev/null @@ -1,721 +0,0 @@ -#######################################################简易排序 -with open('iptv.txt', 'r', encoding='UTF-8') as f: - lines = f.readlines() -lines.sort() -with open('iptv.txt', 'w', encoding='UTF-8') as f: - for line in lines: - f.write(line) - -##########################################################IP段去重 -import re -def deduplicate_lines(input_file_path, output_file_path): - seen_combinations = {} - unique_lines = [] - with open(input_file_path, 'r', encoding='utf-8') as file: - for line in file: - # 使用正则表达式查找行中的所有URL,并捕获IP地址、端口号和端口号之后的部分 - urls = re.findall(r'http://([\d.]+):(\d+)(/.*)?', line) - # 为每个URL生成一个去重键 - for full_url in urls: - ip, port, path = full_url - ip_parts = ip.split('.') - if len(ip_parts) < 3: - continue - # 使用IP的前三个字段和端口号之后的部分生成去重键 - combination_key = f"{ip_parts[0]}.{ip_parts[1]}.{ip_parts[2]}-{port}-{path or ''}" - # 检查这个组合是否已经出现过 - if combination_key not in seen_combinations: - # 如果没有出现过,记录当前行和去重键 - seen_combinations[combination_key] = line.strip() - else: - # 如果已经出现过,更新为最后出现的行 - seen_combinations[combination_key] = line.strip() - # 将去重后的所有唯一行写入新文件 - with open(output_file_path, 'w', encoding='utf-8') as file: - for line in seen_combinations.values(): - file.write(line + '\n') -# 调用函数 -input_file_path = 'iptv.txt' -output_file_path = 'iptv.txt' -deduplicate_lines(input_file_path, output_file_path) -################################################################################ - - - - - - - - - - - - - - - - - - - -import time -import concurrent.futures -from selenium import webdriver -from selenium.webdriver.chrome.options import Options -import requests -import re -import os -import threading -from queue import Queue -from datetime import datetime -import replace -import fileinput - -# 获取远程港澳台直播源文件 -url = "https://raw.githubusercontent.com/frxz751113/AAAAA/main/IPTV/TW.txt" #源采集地址 -r = requests.get(url) -open('ott移动v4.txt','wb').write(r.content) #打开源文件并临时写入 - -keywords = [''] # 需要提取的关键字列表,留空则全局选择 -pattern = '|'.join(keywords) # 创建正则表达式模式,匹配任意一个关键字 -#pattern = r"^(.*?),(?!#genre#)(.*?)$" #去掉genre行直接复制 -with open('ott移动v4.txt', 'r', encoding='utf-8') as file, open('TW.txt', 'w', encoding='utf-8') as TW: - #TW.write('\n央视频道,#genre#\n') - for line in file: - if "genre" not in line: - if re.search(pattern, line): # 如果行中有任意关键字 - TW.write(line) # 将该行写入输出文件 - - - - -# 扫源测绘空间地址 -# 搜素关键词:"iptv/live/zh_cn.js" && country="CN" && region="Hunan" && city="changsha" -# 搜素关键词:"ZHGXTV" && country="CN" && region="Hunan" && city="changsha" -#"isShowLoginJs"智能KUTV管理 - -urls = [ - #"https://fofa.info/result?qbase64=ImlwdHYvbGl2ZS96aF9jbi5qcyIgJiYgcmVnaW9uPSJmdWppYW4i",#福建 - "https://fofa.info/result?qbase64=ImlwdHYvbGl2ZS96aF9jbi5qcyIgJiYgcG9ydD0iODE4MSIgJiYgY2l0eT0iR3VpZ2FuZyI%3D", #贵港8181 - #"https://fofa.info/result?qbase64=ImlwdHYvbGl2ZS96aF9jbi5qcyI%3D", #随机 - "https://fofa.info/result?qbase64=ImlwdHYvbGl2ZS96aF9jbi5qcyIgJiYgY2l0eT0ieXVsaW4i",#玉林 - #"https://fofa.info/result?qbase64=ImlwdHYvbGl2ZS96aF9jbi5qcyIgJiYgcmVnaW9uPSJIdWJlaSIg",#湖北 - #"https://fofa.info/result?qbase64=ImlwdHYvbGl2ZS96aF9jbi5qcyIgJiYgcG9ydD0iODE4MSI%3D",#8181 - #"https://fofa.info/result?qbase64=ImlwdHYvbGl2ZS96aF9jbi5qcyIgJiYgY2l0eT0iemhhbmd6aG91Ig%3D%3D", #漳州 - #"https://fofa.info/result?qbase64=ImlwdHYvbGl2ZS96aF9jbi5qcyIgJiYgY291bnRyeT0iQ04iICYmIGNpdHk9Im5hbm5pbmci", #广西南宁 - #"https://fofa.info/result?qbase64=ImlwdHYvbGl2ZS96aF9jbi5qcyIgJiYgY291bnRyeT0iQ04iICYmIHJlZ2lvbj0i5bm%2F6KW%2FIg%3D%3D", #广西 壮族iptv -] -def modify_urls(url): - modified_urls = [] - ip_start_index = url.find("//") + 2 - ip_end_index = url.find(":", ip_start_index) - base_url = url[:ip_start_index] # http:// or https:// - ip_address = url[ip_start_index:ip_end_index] - port = url[ip_end_index:] - ip_end = "/iptv/live/1000.json?key=txiptv" - for i in range(1, 256): - modified_ip = f"{ip_address[:-1]}{i}" - modified_url = f"{base_url}{modified_ip}{port}{ip_end}" - modified_urls.append(modified_url) - - return modified_urls - - -def is_url_accessible(url): - try: - response = requests.get(url, timeout=2) ###////////////////// - if response.status_code == 200: - return url - except requests.exceptions.RequestException: - pass - return None - - -results = [] - -for url in urls: - # 创建一个Chrome WebDriver实例 - chrome_options = Options() - chrome_options.add_argument('--headless') - chrome_options.add_argument('--no-sandbox') - chrome_options.add_argument('--disable-dev-shm-usage') - - driver = webdriver.Chrome(options=chrome_options) - # 使用WebDriver访问网页 - driver.get(url) # 将网址替换为你要访问的网页地址 - time.sleep(10) - # 获取网页内容 - page_content = driver.page_source - - # 关闭WebDriver - driver.quit() - - # 查找所有符合指定格式的网址 - pattern = r"http://\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\d+" # 设置匹配的格式,如http://8.8.8.8:8888 - urls_all = re.findall(pattern, page_content) - # urls = list(set(urls_all)) # 去重得到唯一的URL列表 - urls = set(urls_all) # 去重得到唯一的URL列表 - x_urls = [] - for url in urls: # 对urls进行处理,ip第四位修改为1,并去重 - url = url.strip() - ip_start_index = url.find("//") + 2 - ip_end_index = url.find(":", ip_start_index) - ip_dot_start = url.find(".") + 1 - ip_dot_second = url.find(".", ip_dot_start) + 1 - ip_dot_three = url.find(".", ip_dot_second) + 1 - base_url = url[:ip_start_index] # http:// or https:// - ip_address = url[ip_start_index:ip_dot_three] - port = url[ip_end_index:] - ip_end = "1" - modified_ip = f"{ip_address}{ip_end}" - x_url = f"{base_url}{modified_ip}{port}" - x_urls.append(x_url) - urls = set(x_urls) # 去重得到唯一的URL列表 - - valid_urls = [] - # 多线程获取可用url - with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor: - futures = [] - for url in urls: - url = url.strip() - modified_urls = modify_urls(url) - for modified_url in modified_urls: - futures.append(executor.submit(is_url_accessible, modified_url)) - - for future in concurrent.futures.as_completed(futures): - result = future.result() - if result: - valid_urls.append(result) - - for url in valid_urls: - print(url) - # 遍历网址列表,获取JSON文件并解析 - for url in valid_urls: - try: - # 发送GET请求获取JSON文件,设置超时时间为0.5秒 - ip_start_index = url.find("//") + 2 - ip_dot_start = url.find(".") + 1 - ip_index_second = url.find("/", ip_dot_start) - base_url = url[:ip_start_index] # http:// or https:// - ip_address = url[ip_start_index:ip_index_second] - url_x = f"{base_url}{ip_address}" - - json_url = f"{url}" - response = requests.get(json_url, timeout=2) ####/////////////// - json_data = response.json() - - try: - # 解析JSON文件,获取name和url字段 - for item in json_data['data']: - if isinstance(item, dict): - name = item.get('name') - urlx = item.get('url') - if ',' in urlx: - if 'udp' not in urlx and 'rtp' not in urlx and ':1111' not in urlx: - urlx = f"aaaaaaaa" - - #if 'http' in urlx or 'udp' in urlx or 'rtp' in urlx: - if 'http' in urlx: - urld = f"{urlx}" - else: - urld = f"{url_x}{urlx}" - - if name and urld: - name = name.replace("高清电影", "影迷电影") - name = name.replace("中央", "CCTV") - name = name.replace("高清", "") - name = name.replace("HD", "") - name = name.replace("标清", "") - name = name.replace("超高", "") - name = name.replace("频道", "") - name = name.replace("汉1", "汉") - name = name.replace("汉2", "汉") - name = name.replace("汉3", "汉") - name = name.replace("汉4", "汉") - name = name.replace("汉5", "汉") - name = name.replace("汉6", "汉") - name = name.replace("CHC动", "动") - name = name.replace("CHC家", "家") - name = name.replace("CHC影", "影") - name = name.replace("-", "") - name = name.replace(" ", "") - name = name.replace("PLUS", "+") - name = name.replace("+", "+") - name = name.replace("(", "") - name = name.replace(")", "") - name = name.replace("CHC", "") - name = name.replace("L", "") - name = name.replace("002", "AA酒店MV") - name = name.replace("测试002", "凤凰卫视") - name = name.replace("测试003", "凤凰卫视") - name = name.replace("测试004", "私人影院") - name = name.replace("测试005", "私人影院") - name = name.replace("测试006", "东森洋片") - name = name.replace("测试007", "东森电影") - name = name.replace("测试008", "AXN电影") - name = name.replace("测试009", "好莱坞电影") - name = name.replace("测试010", "龙祥电影") - name = name.replace("莲花台", "凤凰香港") - name = name.replace("测试014", "凤凰资讯") - name = name.replace("测试015", "未知影视") - name = name.replace("TV星河", "空") - name = name.replace("305", "酒店影视1") - name = name.replace("306", "酒店影视2") - name = name.replace("307", "酒店影视3") - name = name.replace("CMIPTV", "") - name = name.replace("cctv", "CCTV") - name = re.sub(r"CCTV(\d+)台", r"CCTV\1", name) - name = name.replace("CCTV1综合", "CCTV1") - name = name.replace("CCTV2财经", "CCTV2") - name = name.replace("CCTV3综艺", "CCTV3") - name = name.replace("CCTV4国际", "CCTV4") - name = name.replace("CCTV4中文国际", "CCTV4") - name = name.replace("CCTV4欧洲", "CCTV4") - name = name.replace("CCTV5体育", "CCTV5") - name = name.replace("CCTV5+体育", "CCTV5+") - name = name.replace("CCTV6电影", "CCTV6") - name = name.replace("CCTV7军事", "CCTV7") - name = name.replace("CCTV7军农", "CCTV7") - name = name.replace("CCTV7农业", "CCTV7") - name = name.replace("CCTV7国防军事", "CCTV7") - name = name.replace("CCTV8电视剧", "CCTV8") - name = name.replace("CCTV8纪录", "CCTV9") - name = name.replace("CCTV9记录", "CCTV9") - name = name.replace("CCTV9纪录", "CCTV9") - name = name.replace("CCTV10科教", "CCTV10") - name = name.replace("CCTV11戏曲", "CCTV11") - name = name.replace("CCTV12社会与法", "CCTV12") - name = name.replace("CCTV13新闻", "CCTV13") - name = name.replace("CCTV新闻", "CCTV13") - name = name.replace("CCTV14少儿", "CCTV14") - name = name.replace("央视14少儿", "CCTV14") - name = name.replace("CCTV少儿超", "CCTV14") - name = name.replace("CCTV15音乐", "CCTV15") - name = name.replace("CCTV音乐", "CCTV15") - name = name.replace("CCTV16奥林匹克", "CCTV16") - name = name.replace("CCTV17农业农村", "CCTV17") - name = name.replace("CCTV17军农", "CCTV17") - name = name.replace("CCTV17农业", "CCTV17") - name = name.replace("CCTV5+体育赛视", "CCTV5+") - name = name.replace("CCTV5+赛视", "CCTV5+") - name = name.replace("CCTV5+体育赛事", "CCTV5+") - name = name.replace("CCTV5+赛事", "CCTV5+") - name = name.replace("CCTV5+体育", "CCTV5+") - name = name.replace("CCTV5赛事", "CCTV5+") - name = name.replace("凤凰中文台", "凤凰中文") - name = name.replace("凤凰资讯台", "凤凰资讯") - name = name.replace("CCTV4K测试)", "CCTV4") - name = name.replace("CCTV164K", "CCTV16") - name = name.replace("上海东方卫视", "上海卫视") - name = name.replace("东方卫视", "上海卫视") - name = name.replace("内蒙卫视", "内蒙古卫视") - name = name.replace("福建东南卫视", "东南卫视") - name = name.replace("广东南方卫视", "南方卫视") - name = name.replace("湖南金鹰卡通", "金鹰卡通") - name = name.replace("炫动卡通", "哈哈炫动") - name = name.replace("卡酷卡通", "卡酷少儿") - name = name.replace("卡酷动画", "卡酷少儿") - name = name.replace("BRTVKAKU少儿", "卡酷少儿") - name = name.replace("优曼卡通", "优漫卡通") - name = name.replace("优曼卡通", "优漫卡通") - name = name.replace("嘉佳卡通", "佳嘉卡通") - name = name.replace("世界地理", "地理世界") - name = name.replace("CCTV世界地理", "地理世界") - name = name.replace("BTV北京卫视", "北京卫视") - name = name.replace("BTV冬奥纪实", "冬奥纪实") - name = name.replace("东奥纪实", "冬奥纪实") - name = name.replace("卫视台", "卫视") - name = name.replace("湖南电视台", "湖南卫视") - name = name.replace("少儿科教", "少儿") - name = name.replace("TV星河2)", "星河") - name = name.replace("影视剧", "影视") - name = name.replace("电视剧", "影视") - name = name.replace("奥运匹克", "") - name = name.replace("星空卫视", "动物杂技") - results.append(f"{name},{urld}") - except: - continue - except: - continue - -channels = [] - -for result in results: - line = result.strip() - if result: - channel_name, channel_url = result.split(',') - channels.append((channel_name, channel_url)) - -with open("iptv.txt", 'w', encoding='utf-8') as file: - for result in results: - file.write(result + "\n") - print(result) -print("频道列表文件iptv.txt获取完成!") -############################################################ -def filter_lines(input_file, output_file): - with open(input_file, 'r', encoding='utf-8') as file: - lines = file.readlines() - filtered_lines = [] - for line in lines: - if ',' in line and 'udp' not in line and 'rtp' not in line: - filtered_lines.append(line) - with open(output_file, 'w', encoding='utf-8') as output_file: - output_file.writelines(filtered_lines) -# 使用函数 -filter_lines("iptv.txt", "iptv.txt") -################################################################# - -def remove_duplicates(input_file, output_file): - # 用于存储已经遇到的URL和包含genre的行 - seen_urls = set() - seen_lines_with_genre = set() - # 用于存储最终输出的行 - output_lines = [] - # 打开输入文件并读取所有行 - with open(input_file, 'r', encoding='utf-8') as f: - lines = f.readlines() - print("去重前的行数:", len(lines)) - # 遍历每一行 - for line in lines: - # 使用正则表达式查找URL和包含genre的行,默认最后一行 - urls = re.findall(r'[https]?[http]?[P2p]?[mitv]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', line) - genre_line = re.search(r'\bgenre\b', line, re.IGNORECASE) is not None - # 如果找到URL并且该URL尚未被记录 - if urls and urls[0] not in seen_urls: - seen_urls.add(urls[0]) - output_lines.append(line) - # 如果找到包含genre的行,无论是否已被记录,都写入新文件 - if genre_line: - output_lines.append(line) - # 将结果写入输出文件 - with open(output_file, 'w', encoding='utf-8') as f: - f.writelines(output_lines) - print("去重后的行数:", len(output_lines)) - -# 使用方法 -remove_duplicates('iptv.txt', 'iptv.txt') - - - -for line in fileinput.input("iptv.txt", inplace=True): #打开文件,并对其进行关键词原地替换 ########### - line = line.replace("CHC电影", "影迷电影") ########### ########### - print(line, end="") #设置end="",避免输出多余的换行符 - -import eventlet - -eventlet.monkey_patch() - -# 线程安全的队列,用于存储下载任务 -task_queue = Queue() - -# 线程安全的列表,用于存储结果 -results = [] - -channels = [] -error_channels = [] -# 从iptv.txt文件内提取其他频道进行检测并分组 -with open("iptv.txt", 'r', encoding='utf-8') as file: - lines = file.readlines() - for line in lines: - line = line.strip() - if line: - channel_name, channel_url = line.split(',') - if 'genre' not in channel_url: - channels.append((channel_name, channel_url)) - - -# 定义工作线程函数 -def worker(): - while True: - # 从队列中获取一个任务 - channel_name, channel_url = task_queue.get() - try: - channel_url_t = channel_url.rstrip(channel_url.split('/')[-1]) # m3u8链接前缀 - lines = requests.get(channel_url).text.strip().split('\n') # 获取m3u8文件内容 - ts_lists = [line.split('/')[-1] for line in lines if line.startswith('#') == False] # 获取m3u8文件下视频流后缀 - ts_lists_0 = ts_lists[0].rstrip(ts_lists[0].split('.ts')[-1]) # m3u8链接前缀 - ts_url = channel_url_t + ts_lists[0] # 拼接单个视频片段下载链接 - - - # 获取的视频数据进行5秒钟限制 - with eventlet.Timeout(30, False): #################//////////////////////////////// - start_time = time.time() - content = requests.get(ts_url).content - end_time = time.time() - response_time = (end_time - start_time) * 1 - - if content: - with open(ts_lists_0, 'ab') as f: - f.write(content) # 写入文件 - file_size = len(content) - # print(f"文件大小:{file_size} 字节") - download_speed = file_size / response_time / 1024 - # print(f"下载速度:{download_speed:.3f} kB/s") - normalized_speed = min(max(download_speed / 1024, 0.001), 100) # 将速率从kB/s转换为MB/s并限制在1~100之间 - # print(f"标准化后的速率:{normalized_speed:.3f} MB/s") - - # 删除下载的文件 - os.remove(ts_lists_0) - result = channel_name, channel_url, f"{normalized_speed:.3f} MB/s" - results.append(result) - numberx = (len(results) + len(error_channels)) / len(channels) * 100 - print( - f"可用频道:{len(results)} 个 , 不可用频道:{len(error_channels)} 个 , 总频道:{len(channels)} 个 ,总进度:{numberx:.2f} %。") - except: - error_channel = channel_name, channel_url - error_channels.append(error_channel) - numberx = (len(results) + len(error_channels)) / len(channels) * 100 - print( - f"可用频道:{len(results)} 个 , 不可用频道:{len(error_channels)} 个 , 总频道:{len(channels)} 个 ,总进度:{numberx:.2f} %。") - - # 标记任务完成 - task_queue.task_done() - - -# 创建多个工作线程 -num_threads = 128 -for _ in range(num_threads): - t = threading.Thread(target=worker, daemon=True) - # t = threading.Thread(target=worker, args=(event,len(channels))) # 将工作线程设置为守护线程 - t.start() - # event.set() - -# 添加下载任务到队列 -for channel in channels: - task_queue.put(channel) - -# 等待所有任务完成 -task_queue.join() - - -def channel_key(channel_name): - match = re.search(r'\d+', channel_name) - if match: - return int(match.group()) - else: - return float('inf') # 返回一个无穷大的数字作为关键字 - - -# 对频道进行排序 -results.sort(key=lambda x: (x[0], -float(x[2].split()[0]))) -results.sort(key=lambda x: channel_key(x[0])) -result_counter = 88 # 每个频道需要的个数 - -with open("hn.txt", 'w', encoding='utf-8') as file: - channel_counters = {} - file.write('央视频道1,#genre#\n') - for result in results: - channel_name, channel_url, speed = result - if 'CCTV' in channel_name or '动作' in channel_name or '家庭' in channel_name or '影迷' in channel_name or '爱上' in channel_name or 'CETV' in channel_name: - if '剧场' not in channel_name and '风云' not in channel_name and '教育' not in channel_name and '经典' not in channel_name: - if channel_name in channel_counters: - if channel_counters[channel_name] >= result_counter: - continue - else: - file.write(f"{channel_name},{channel_url}\n") - channel_counters[channel_name] += 1 - else: - file.write(f"{channel_name},{channel_url}\n") - channel_counters[channel_name] = 1 - - - channel_counters = {} - file.write('卫视频道1,#genre#\n') - for result in results: - channel_name, channel_url, speed = result - if '重温经典' in channel_name: - if '凤凰' not in channel_name: - if channel_name in channel_counters: - if channel_counters[channel_name] >= result_counter: - continue - else: - file.write(f"{channel_name},{channel_url}\n") - channel_counters[channel_name] += 1 - else: - file.write(f"{channel_name},{channel_url}\n") - channel_counters[channel_name] = 1 - - - - channel_counters = {} - file.write('卫视频道1,#genre#\n') - for result in results: - channel_name, channel_url, speed = result - if '卫视' in channel_name: - if '凤凰' not in channel_name: - if channel_name in channel_counters: - if channel_counters[channel_name] >= result_counter: - continue - else: - file.write(f"{channel_name},{channel_url}\n") - channel_counters[channel_name] += 1 - else: - file.write(f"{channel_name},{channel_url}\n") - channel_counters[channel_name] = 1 - - - - channel_counters = {} - file.write('省市频道,#genre#\n') - for result in results: - channel_name, channel_url, speed = result - if '湖' in channel_name or '河北' in channel_name: - if 'CCTV' not in channel_name and '卫视' not in channel_name and '购物' not in channel_name: - if channel_name in channel_counters: - if channel_counters[channel_name] >= result_counter: - continue - else: - file.write(f"{channel_name},{channel_url}\n") - channel_counters[channel_name] += 1 - else: - file.write(f"{channel_name},{channel_url}\n") - channel_counters[channel_name] = 1 - - channel_counters = {} - file.write('省市频道,#genre#\n') - for result in results: - channel_name, channel_url, speed = result - if '武汉' in channel_name: - if 'CCTV' not in channel_name and '卫视' not in channel_name and '购物' not in channel_name: - if channel_name in channel_counters: - if channel_counters[channel_name] >= result_counter: - continue - else: - file.write(f"{channel_name},{channel_url}\n") - channel_counters[channel_name] += 1 - else: - file.write(f"{channel_name},{channel_url}\n") - channel_counters[channel_name] = 1 - channel_counters = {} - file.write('省市频道,#genre#\n') - for result in results: - channel_name, channel_url, speed = result - if '河北' in channel_name: - if 'CCTV' not in channel_name and '卫视' not in channel_name and '购物' not in channel_name: - if channel_name in channel_counters: - if channel_counters[channel_name] >= result_counter: - continue - else: - file.write(f"{channel_name},{channel_url}\n") - channel_counters[channel_name] += 1 - else: - file.write(f"{channel_name},{channel_url}\n") - channel_counters[channel_name] = 1 - - - channel_counters = {} - file.write('省市频道,#genre#\n') - for result in results: - channel_name, channel_url, speed = result - if '广' in channel_name or '珠江台测试' in channel_name or '南宁' in channel_name: - if 'CCTV' not in channel_name and '卫视' not in channel_name and '购物' not in channel_name: - if channel_name in channel_counters: - if channel_counters[channel_name] >= result_counter: - continue - else: - file.write(f"{channel_name},{channel_url}\n") - channel_counters[channel_name] += 1 - else: - file.write(f"{channel_name},{channel_url}\n") - channel_counters[channel_name] = 1 - - - - channel_counters = {} - file.write('少儿频道,#genre#\n') - for result in results: - channel_name, channel_url, speed = result - if '卡通' in channel_name or '少儿动画' in channel_name or '哈哈' in channel_name or '动漫秀场' in channel_name: - if 'CCTV' not in channel_name and '卫视' not in channel_name and '湖' not in channel_name and '广' not in channel_name and '河' not in channel_name and '黑' not in channel_name and '保' not in channel_name and '宁' not in channel_name and '家庭' not in channel_name and '影迷' not in channel_name and '动作' not in channel_name and '武汉' not in channel_name and 'CETV' not in channel_name and '交通' not in channel_name and '冬' not in channel_name: - if channel_name in channel_counters: - if channel_counters[channel_name] >= result_counter: - continue - else: - file.write(f"{channel_name},{channel_url}\n") - channel_counters[channel_name] += 1 - else: - file.write(f"{channel_name},{channel_url}\n") - channel_counters[channel_name] = 1 - - - - - - - - - channel_counters = {} - file.write('港澳频道,#genre#\n') - for result in results: - channel_name, channel_url, speed = result - if '龙祥' in channel_name or '翡翠' in channel_name or '酒店' in channel_name or 'AXN' in channel_name or '东森' in channel_name or '莲花' in channel_name or '天映' in channel_name or '好莱坞' in channel_name or '星河' in channel_name or '私人' in channel_name or '哔哩' in channel_name or '凤凰' in channel_name: - #if 'CCTV' not in channel_name and '卫视' not in channel_name and 'TV' not in channel_name and '儿' not in channel_name and '文' not in channel_name and 'CHC' not in channel_name and '新' not in channel_name and '山东' not in channel_name and '河北' not in channel_name and '哈哈' not in channel_name and '临沂' not in channel_name and '公共' not in channel_name and 'CETV' not in channel_name and '交通' not in channel_name and '冬' not in channel_name and '梨园' not in channel_name and '民生' not in channel_name and '综合' not in channel_name and '法制' not in channel_name and '齐鲁' not in channel_name and '自办' not in channel_name and '都市' not in channel_name: - if channel_name in channel_counters: - if channel_counters[channel_name] >= result_counter: - continue - else: - file.write(f"{channel_name},{channel_url}\n") - channel_counters[channel_name] += 1 - else: - file.write(f"{channel_name},{channel_url}\n") - channel_counters[channel_name] = 1 - - - channel_counters = {} - file.write('其他频道,#genre#\n') - for result in results: - channel_name, channel_url, speed = result - if 'CCTV' not in channel_name and '卫视' not in channel_name: - if channel_name in channel_counters: - if channel_counters[channel_name] >= result_counter: - continue - else: - file.write(f"{channel_name},{channel_url}\n") - channel_counters[channel_name] += 1 - else: - file.write(f"{channel_name},{channel_url}\n") - channel_counters[channel_name] = 1 - - - - - - -# 合并自定义频道文件内容 -file_contents = [] -file_paths = ["hn.txt"] # 替换为实际的文件路径列表 -for file_path in file_paths: - with open(file_path, 'r', encoding="utf-8") as file: - content = file.read() - file_contents.append(content) - -# 写入合并后的文件 -with open("酒店源.txt", "w", encoding="utf-8") as output: - output.write(''.join(file_contents))#\n -for line in fileinput.input("酒店源.txt", inplace=True): #打开文件,并对其进行关键词原地替换 - line = line.replace("AA", "") - line = line.replace("\n电影,", "\n影迷电影,") - print(line, end="") #设置end="",避免输出多余的换行符 - -#########原始顺序去重,以避免同一个频道出现在不同的类中 -with open('酒店源.txt', 'r', encoding="utf-8") as file: - lines = file.readlines() -# 使用列表来存储唯一的行的顺序 - unique_lines = [] - seen_lines = set() -# 遍历每一行,如果是新的就加入unique_lines -for line in lines: - if line not in seen_lines: - unique_lines.append(line) - seen_lines.add(line) -# 将唯一的行写入新的文档 -with open('酒店源.txt', 'w', encoding="utf-8") as file: - file.writelines(unique_lines) -##################### - - -os.remove("iptv.txt") -os.remove("hn.txt") -os.remove("TW.txt") -os.remove("ott移动v4.txt") -print("任务运行完毕")