From e29604d27dcc085a6fe8be117b1d0a20310195d1 Mon Sep 17 00:00:00 2001 From: frxz751113 <156018267+frxz751113@users.noreply.github.com> Date: Mon, 16 Jun 2025 07:10:25 +0800 Subject: [PATCH] =?UTF-8?q?Update=20and=20rename=20=E9=85=92=E5=BA=97?= =?UTF-8?q?=E6=BA=90=20-=20=E5=89=AF=E6=9C=AC.py=20to=20=E9=85=92=E5=BA=97?= =?UTF-8?q?=E6=BA=90=E5=89=AF=E6=9C=AC.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- py/{酒店源 - 副本.py => 酒店源副本.py} | 75 +++++++++++++++++++++++++- 1 file changed, 74 insertions(+), 1 deletion(-) rename py/{酒店源 - 副本.py => 酒店源副本.py} (90%) diff --git a/py/酒店源 - 副本.py b/py/酒店源副本.py similarity index 90% rename from py/酒店源 - 副本.py rename to py/酒店源副本.py index 7a5e656..d787cfa 100644 --- a/py/酒店源 - 副本.py +++ b/py/酒店源副本.py @@ -505,6 +505,79 @@ remove_duplicates('iptv.txt', 'iptv_temp.txt') filter_lines('iptv_temp.txt', 'iptv.txt') print("去重和过滤完成!最终结果保存在 iptv.txt") +import requests +from tqdm import tqdm +import threading +import re +print("本程序只检测地址中带m3u/php/live的,其它的无论是否有效一律不检测不输出\n") +print("如果你确定不含这种关键词的地址是有效源请预先备份\n") + +# 测试HTTP连接 +def test_connectivity(url, max_attempts=2): + #if "udp" in url or "rtp" in url: + #print("\n组播地址: 跳过检测") + #return False + + video_formats = ["m3u", "1", "/", "rtsp"] + if not any(re.search(keyword, url, re.I) for keyword in video_formats): + print("\n特殊网址: 跳过检测") + return False + + for _ in range(max_attempts): + try: + response = requests.get(url, timeout=0.3) + return response.status_code == 200 + except requests.RequestException: + pass + + return False + +# 处理每一行的函数 +def process_line(line, output_file, valid_count, invalid_count): + parts = line.strip().split(",") + if len(parts) == 2: + channel_name, channel_url = parts + if "genre" in line.lower(): + with threading.Lock(): + output_file.write("\n" +line) # 直接写入原始行 + elif test_connectivity(channel_url): + with threading.Lock(): + output_file.write(f"{channel_name},{channel_url}\n") + valid_count[0] += 1 + else: + with threading.Lock(): + invalid_count[0] += 1 + else: + with threading.Lock(): + invalid_count[0] += 1 + +# 主函数 +def main(source_file_path, output_file_path): + with open(source_file_path, "r", encoding="utf-8") as source_file: + lines = source_file.readlines() + + valid_count = [0] + invalid_count = [0] + + with open(output_file_path, "w", encoding="utf-8") as output_file: + threads = [] + for line in tqdm(lines, desc="地址有效"): + thread = threading.Thread(target=process_line, args=(line, output_file, valid_count, invalid_count)) + thread.start() + threads.append(thread) + + for thread in threads: + thread.join() + + print(f"任务完成,有效源数量: {valid_count[0]}, 无效源数量: {invalid_count[0]}") +if __name__ == "__main__": + try: + source_file_path = "iptv.txt"('请动动你发财的小手吧\n\n拖入utf-8直播源文件回车后运行:') + output_file_path = "酒店源.txt" + main(source_file_path, output_file_path) + except Exception as e: + print(f"程序发生错误: {e}") + +os.remove("iptv.txt") os.remove("iptv_temp.txt") -input("按任意键退出...") \ No newline at end of file