From ed4ddbdf06265cd2589c00f08792cbd6db1943be Mon Sep 17 00:00:00 2001 From: frxz751113 <156018267+frxz751113@users.noreply.github.com> Date: Sun, 4 May 2025 12:47:28 +0800 Subject: [PATCH] =?UTF-8?q?Update=20=E9=85=92=E5=BA=97=E6=BA=90255.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- py/酒店源255.py | 109 ++++++++++++++++++++++++++++++++++-------------- 1 file changed, 77 insertions(+), 32 deletions(-) diff --git a/py/酒店源255.py b/py/酒店源255.py index 3d2047d..cc28add 100644 --- a/py/酒店源255.py +++ b/py/酒店源255.py @@ -30,9 +30,7 @@ from translate import Translator # 导入Translator类,用于文本翻译 ###########################################################ZHGX采集#################################################### ###################################################################################################################### ###################################################################################################################### -import random -import concurrent.futures -from fake_useragent import UserAgent # 需要先安装:pip install fake-useragent +import requests urls = [ #"https://fofa.info/result?qbase64=IlpIR1hUViIgJiYgcG9ydD0iODA4Ig%3D%3D", #808 @@ -42,87 +40,132 @@ urls = [ #"https://fofa.info/result?qbase64=IlpIR1hUViIgJiYgcG9ydD0iODAwMyI%3D" , #8003 "https://fofa.info/result?qbase64=IlpIR1hUViIgJiYgcG9ydD0iODg4Ig%3D%3D" , #888 #"https://fofa.info/result?qbase64=IlpIR1hUViIgJiYgcG9ydD0iODA4OSI%3D" , #8089 - "https://fofa.info/result?qbase64=IlpIR1hUViIgJiYgcG9ydD0iODgi" + #"https://fofa.info/result?qbase64=IlpIR1hUViIgJiYgcG9ydD0iODAwOSI%3D", #8009 ] def modify_urls(url): + # 创建一个空列表用于存储修改后的 URL modified_urls = [] + # 找到 URL 中 IP 地址开始的索引位置,"//" 后两个字符开始为 IP 地址起始位置 ip_start_index = url.find("//") + 2 + # 找到 URL 中 IP 地址结束的索引位置,从 ip_start_index 开始查找第一个 ":" 的位置 ip_end_index = url.find(":", ip_start_index) + # 找到 URL 中 IP 地址结束的索引位置,从 ip_start_index 开始查找第一个 ":" 的位置 base_url = url[:ip_start_index] + # 获取 URL 中的 IP 地址部分 ip_address = url[ip_start_index:ip_end_index] + # 获取 URL 中的端口部分 port = url[ip_end_index:] + # 定义一个字符串,表示 IP 地址的结尾部分 ip_end = "/ZHGXTV/Public/json/live_interface.txt" + # 遍历 1 到 255 的数字 for i in range(1, 256): + # 修改 IP 地址的最后一位数字 modified_ip = f"{ip_address[:-1]}{i}" + # 组合成新的 URL modified_url = f"{base_url}{modified_ip}{port}{ip_end}" + # 将新的 URL 添加到列表中 modified_urls.append(modified_url) + # 返回修改后的 URL 列表 return modified_urls def is_url_accessible(url): try: - response = requests.get(url, timeout=3, headers={'User-Agent': UserAgent().random}) + # 发送 GET 请求,设置超时时间为 3 秒 + response = requests.get(url, timeout=3) + # 如果响应状态码在 200 到 401 之间(包括 200 和 401),则认为 URL 可访问 if 200 <= response.status_code <= 401: return url except requests.exceptions.RequestException: + # 如果请求过程中出现异常,不做任何处理,直接跳过 pass return None +# 创建一个空列表用于存储结果 results = [] for url in urls: - response = requests.get(url, headers={'User-Agent': UserAgent().random}) + # 发送 GET 请求获取 URL 的内容 + response = requests.get(url) + # 获取响应的文本内容 page_content = response.text - urls_all = re.findall(r"http://\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\d+", page_content) - urls = set(urls_all) + + # 查找所有符合指定格式的网址 + pattern = r"http://\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\d+" # 设置匹配的格式,如 http://8.8.8.8:8888 + # 使用正则表达式在页面内容中查找所有符合格式的 URL + urls_all = re.findall(pattern, page_content) + # urls = list(set(urls_all)) # 去重得到唯一的URL列表 + urls = set(urls_all) # 去重得到唯一的URL列表 x_urls = [] - for url in urls: # 缩进修正:内层循环与外层for对齐 + for url in urls: # 对urls进行处理,ip第四位修改为1,并去重 url = url.strip() + # 找到 URL 中 IP 地址开始的索引位置,"//" 后两个字符开始为 IP 地址起始位置 ip_start_index = url.find("//") + 2 + # 找到 URL 中 IP 地址结束的索引位置,从 ip_start_index 开始查找第一个 ":" 的位置 ip_end_index = url.find(":", ip_start_index) + # 找到 IP 地址中第一个 "." 的位置 ip_dot_start = url.find(".") + 1 + # 找到 IP 地址中第二个 "." 的位置 ip_dot_second = url.find(".", ip_dot_start) + 1 + # 找到 IP 地址中第三个 "." 的位置 ip_dot_three = url.find(".", ip_dot_second) + 1 - base_url = url[:ip_start_index] + # 获取 URL 的基础部分,即从开头到 IP 地址开始的部分 + base_url = url[:ip_start_index] # http:// or https:// + # 获取 URL 中的 IP 地址部分,截取到第三个 "." 的位置 ip_address = url[ip_start_index:ip_dot_three] + # 获取 URL 中的端口部分 port = url[ip_end_index:] + # 定义一个字符串,表示 IP 地址的结尾部分为 "1" ip_end = "1" + # 修改 IP 地址的最后一位为 "1" modified_ip = f"{ip_address}{ip_end}" + # 组合成新的 URL x_url = f"{base_url}{modified_ip}{port}" + # 将新的 URL 添加到列表中 x_urls.append(x_url) - urls = set(x_urls) + urls = set(x_urls) # 去重得到唯一的URL列表 valid_urls = [] + # 多线程获取可用url with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor: futures = [] - for url in urls: # 缩进修正:内层循环与with块对齐 + for url in urls: + url = url.strip() + # 获取修改后的 URL 列表 modified_urls = modify_urls(url) for modified_url in modified_urls: + # 提交任务,检查每个修改后的 URL 是否可访问 futures.append(executor.submit(is_url_accessible, modified_url)) for future in concurrent.futures.as_completed(futures): result = future.result() if result: + # 如果 URL 可访问,将其添加到有效 URL 列表中 valid_urls.append(result) - for url in valid_urls: # 缩进修正:与外层for对齐 + for url in valid_urls: print(url) - for url in valid_urls: # 缩进修正:与外层for对齐 + # 遍历网址列表,获取JSON文件并解析 + for url in valid_urls: try: + # 发送GET请求获取JSON文件,设置超时时间为0.5秒 json_url = f"{url}" - response = requests.get(json_url, timeout=3, headers={'User-Agent': UserAgent().random}) + response = requests.get(json_url, timeout=3)################################ json_data = response.content.decode('utf-8') - lines = json_data.split('\n') - excluded_keywords = ['udp', 'rtp'] - for line in lines: # 缩进修正:内层循环与try块对齐 - if 'hls' in line and all(keyword not in line for keyword in excluded_keywords): - line = line.strip() - if line: - name, channel_url = line.split(',') - urls = channel_url.split('/', 3) - url_data = json_url.split('/', 3) - if len(urls) >= 3: - urld = (f"{urls[0]}//{url_data[2]}/{urls[3]}") - else: - urld = (f"{urls}") + try: + # 按行分割数据 + lines = json_data.split('\n') + excluded_keywords = ['udp', 'rtp'] + for line in lines: + if 'hls' in line and all(keyword not in line for keyword in excluded_keywords): + line = line.strip() + if line: + name, channel_url = line.split(',') + urls = channel_url.split('/', 3) + url_data = json_url.split('/', 3) + if len(urls) >= 3: + urld = (f"{urls[0]}//{url_data[2]}/{urls[3]}") + else: + urld = (f"{urls}") + #print(f"{name},{urld}") #关闭频道名称和频道地址打印,缩短运行时间 if name and urld: - name = name.replace("高清电影", "影迷电影") + name = name.replace("高清电影", "影迷电影") name = name.replace("中央", "CCTV") name = name.replace("高清", "") name = name.replace("HD", "") @@ -229,15 +272,17 @@ for url in urls: name = name.replace("奥运匹克", "") urld = urld.replace("index.m3u8", "index.m3u8?$智慧光迅听说名字越长越好看") results.append(f"{name},{urld}") - except: # 缩进修正:与外层for对齐 + except: + continue + except: continue channels = [] -for result in results: # 缩进修正:与外层代码块对齐 +for result in results: line = result.strip() if result: channel_name, channel_url = result.split(',') channels.append((channel_name, channel_url)) -with open("iptv.txt", 'w', encoding='utf-8') as file: # 缩进修正:与外层代码块对齐 +with open("iptv.txt", 'w', encoding='utf-8') as file: for result in results: file.write(result + "\n") print(result) #关闭频道名称和频道地址打印,缩短运行时间