diff --git a/py/酒店源255.py b/py/酒店源255.py index cc28add..1ecddbf 100644 --- a/py/酒店源255.py +++ b/py/酒店源255.py @@ -30,7 +30,9 @@ from translate import Translator # 导入Translator类,用于文本翻译 ###########################################################ZHGX采集#################################################### ###################################################################################################################### ###################################################################################################################### -import requests +import random +import concurrent.futures +from fake_useragent import UserAgent # 需要先安装:pip install fake-useragent urls = [ #"https://fofa.info/result?qbase64=IlpIR1hUViIgJiYgcG9ydD0iODA4Ig%3D%3D", #808 @@ -44,126 +46,81 @@ urls = [ ] def modify_urls(url): - # 创建一个空列表用于存储修改后的 URL modified_urls = [] - # 找到 URL 中 IP 地址开始的索引位置,"//" 后两个字符开始为 IP 地址起始位置 ip_start_index = url.find("//") + 2 - # 找到 URL 中 IP 地址结束的索引位置,从 ip_start_index 开始查找第一个 ":" 的位置 ip_end_index = url.find(":", ip_start_index) - # 找到 URL 中 IP 地址结束的索引位置,从 ip_start_index 开始查找第一个 ":" 的位置 base_url = url[:ip_start_index] - # 获取 URL 中的 IP 地址部分 ip_address = url[ip_start_index:ip_end_index] - # 获取 URL 中的端口部分 port = url[ip_end_index:] - # 定义一个字符串,表示 IP 地址的结尾部分 ip_end = "/ZHGXTV/Public/json/live_interface.txt" - # 遍历 1 到 255 的数字 for i in range(1, 256): - # 修改 IP 地址的最后一位数字 modified_ip = f"{ip_address[:-1]}{i}" - # 组合成新的 URL modified_url = f"{base_url}{modified_ip}{port}{ip_end}" - # 将新的 URL 添加到列表中 modified_urls.append(modified_url) - # 返回修改后的 URL 列表 return modified_urls def is_url_accessible(url): try: - # 发送 GET 请求,设置超时时间为 3 秒 - response = requests.get(url, timeout=3) - # 如果响应状态码在 200 到 401 之间(包括 200 和 401),则认为 URL 可访问 + response = requests.get(url, timeout=3, headers={'User-Agent': UserAgent().random}) if 200 <= response.status_code <= 401: return url except requests.exceptions.RequestException: - # 如果请求过程中出现异常,不做任何处理,直接跳过 pass return None -# 创建一个空列表用于存储结果 results = [] for url in urls: - # 发送 GET 请求获取 URL 的内容 - response = requests.get(url) - # 获取响应的文本内容 + response = requests.get(url, headers={'User-Agent': UserAgent().random}) page_content = response.text - - # 查找所有符合指定格式的网址 - pattern = r"http://\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\d+" # 设置匹配的格式,如 http://8.8.8.8:8888 - # 使用正则表达式在页面内容中查找所有符合格式的 URL - urls_all = re.findall(pattern, page_content) - # urls = list(set(urls_all)) # 去重得到唯一的URL列表 - urls = set(urls_all) # 去重得到唯一的URL列表 + urls_all = re.findall(r"http://\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\d+", page_content) + urls = set(urls_all) x_urls = [] - for url in urls: # 对urls进行处理,ip第四位修改为1,并去重 + for url in urls: url = url.strip() - # 找到 URL 中 IP 地址开始的索引位置,"//" 后两个字符开始为 IP 地址起始位置 ip_start_index = url.find("//") + 2 - # 找到 URL 中 IP 地址结束的索引位置,从 ip_start_index 开始查找第一个 ":" 的位置 ip_end_index = url.find(":", ip_start_index) - # 找到 IP 地址中第一个 "." 的位置 ip_dot_start = url.find(".") + 1 - # 找到 IP 地址中第二个 "." 的位置 ip_dot_second = url.find(".", ip_dot_start) + 1 - # 找到 IP 地址中第三个 "." 的位置 ip_dot_three = url.find(".", ip_dot_second) + 1 - # 获取 URL 的基础部分,即从开头到 IP 地址开始的部分 - base_url = url[:ip_start_index] # http:// or https:// - # 获取 URL 中的 IP 地址部分,截取到第三个 "." 的位置 + base_url = url[:ip_start_index] ip_address = url[ip_start_index:ip_dot_three] - # 获取 URL 中的端口部分 port = url[ip_end_index:] - # 定义一个字符串,表示 IP 地址的结尾部分为 "1" ip_end = "1" - # 修改 IP 地址的最后一位为 "1" modified_ip = f"{ip_address}{ip_end}" - # 组合成新的 URL x_url = f"{base_url}{modified_ip}{port}" - # 将新的 URL 添加到列表中 x_urls.append(x_url) - urls = set(x_urls) # 去重得到唯一的URL列表 + urls = set(x_urls) valid_urls = [] - # 多线程获取可用url with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor: futures = [] for url in urls: - url = url.strip() - # 获取修改后的 URL 列表 modified_urls = modify_urls(url) for modified_url in modified_urls: - # 提交任务,检查每个修改后的 URL 是否可访问 futures.append(executor.submit(is_url_accessible, modified_url)) for future in concurrent.futures.as_completed(futures): result = future.result() if result: - # 如果 URL 可访问,将其添加到有效 URL 列表中 valid_urls.append(result) for url in valid_urls: print(url) - # 遍历网址列表,获取JSON文件并解析 for url in valid_urls: try: - # 发送GET请求获取JSON文件,设置超时时间为0.5秒 json_url = f"{url}" - response = requests.get(json_url, timeout=3)################################ + response = requests.get(json_url, timeout=3, headers={'User-Agent': UserAgent().random}) json_data = response.content.decode('utf-8') - try: - # 按行分割数据 - lines = json_data.split('\n') - excluded_keywords = ['udp', 'rtp'] - for line in lines: - if 'hls' in line and all(keyword not in line for keyword in excluded_keywords): - line = line.strip() - if line: - name, channel_url = line.split(',') - urls = channel_url.split('/', 3) - url_data = json_url.split('/', 3) - if len(urls) >= 3: - urld = (f"{urls[0]}//{url_data[2]}/{urls[3]}") - else: - urld = (f"{urls}") - #print(f"{name},{urld}") #关闭频道名称和频道地址打印,缩短运行时间 + lines = json_data.split('\n') + excluded_keywords = ['udp', 'rtp'] + for line in lines: + if 'hls' in line and all(keyword not in line for keyword in excluded_keywords): + line = line.strip() + if line: + name, channel_url = line.split(',') + urls = channel_url.split('/', 3) + url_data = json_url.split('/', 3) + if len(urls) >= 3: + urld = (f"{urls[0]}//{url_data[2]}/{urls[3]}") + else: + urld = (f"{urls}") if name and urld: name = name.replace("高清电影", "影迷电影") name = name.replace("中央", "CCTV")