diff --git a/py/酒店源.py b/py/酒店源.py index d853f8a..ddcd318 100644 --- a/py/酒店源.py +++ b/py/酒店源.py @@ -46,62 +46,98 @@ def modify_urls(url): ip_address = url[ip_start_index:ip_end_index] port = url[ip_end_index:] ip_end = "/ZHGXTV/Public/json/live_interface.txt" - -# 定义超时时间以及是否返回正确的状态码 + for i in range(1, 256): + modified_ip = f"{ip_address[:-1]}{i}" + modified_url = f"{base_url}{modified_ip}{port}{ip_end}" + modified_urls.append(modified_url) + return modified_urls +#定义超时时间以及是否返回正确的状态码 def is_url_accessible(url): try: - response = requests.get(url, timeout=5) - # 检查HTTP状态码是否在200到401之间 + response = requests.get(url, timeout=1) #////////////////// + #if response.status_code == 200: if 200 <= response.status_code <= 401: return url except requests.exceptions.RequestException: pass return None - -# 遍历网址列表,检查每个URL的可访问性 -valid_urls = [] +results = [] for url in urls: - # 检查URL是否可访问,并添加到有效URL列表 - accessible_url = is_url_accessible(url) - if accessible_url: - valid_urls.append(accessible_url) + # 创建一个Chrome WebDriver实例 + chrome_options = Options() + chrome_options.add_argument('--headless') + chrome_options.add_argument('--no-sandbox') + chrome_options.add_argument('--disable-dev-shm-usage') + driver = webdriver.Chrome(options=chrome_options) + # 使用WebDriver访问网页 + driver.get(url) # 将网址替换为你要访问的网页地址 + time.sleep(10) + # 获取网页内容 + page_content = driver.page_source + # 关闭WebDriver + driver.quit() + # 查找所有符合指定格式的网址 + pattern = r"http://\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\d+" # 设置匹配的格式,如http://8.8.8.8:8888 + urls_all = re.findall(pattern, page_content) + # urls = list(set(urls_all)) # 去重得到唯一的URL列表 + urls = set(urls_all) # 去重得到唯一的URL列表 + x_urls = [] + for url in urls: # 对urls进行处理,ip第四位修改为1,并去重 + url = url.strip() + ip_start_index = url.find("//") + 2 + ip_end_index = url.find(":", ip_start_index) + ip_dot_start = url.find(".") + 1 + ip_dot_second = url.find(".", ip_dot_start) + 1 + ip_dot_three = url.find(".", ip_dot_second) + 1 + base_url = url[:ip_start_index] # http:// or https:// + ip_address = url[ip_start_index:ip_dot_three] + port = url[ip_end_index:] + ip_end = "1" + modified_ip = f"{ip_address}{ip_end}" + x_url = f"{base_url}{modified_ip}{port}" + x_urls.append(x_url) + urls = set(x_urls) # 去重得到唯一的URL列表 + valid_urls = [] + # 多线程获取可用url + with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor: + futures = [] + for url in urls: + url = url.strip() + modified_urls = modify_urls(url) + for modified_url in modified_urls: + futures.append(executor.submit(is_url_accessible, modified_url)) + for future in concurrent.futures.as_completed(futures): + result = future.result() + if result: + valid_urls.append(result) + for url in valid_urls: + print(url) -# 打开文件准备写入 +# 从有效URL提取数据并替换IP后写入文件 with open('iptv.txt', 'w', encoding='utf-8') as outfile: - # 遍历有效URL列表,获取JSON文件并解析 for json_url in valid_urls: try: - # 发送GET请求获取JSON文件,设置超时时间为0.5秒 - response = requests.get(json_url, timeout=5) + response = requests.get(json_url, timeout=1) json_data = response.content.decode('utf-8') - - # 按行分割数据 - lines = json_data.split('\n') + lines = re.split(r'\r?\n', json_data) for line in lines: - # 行中需包含hls,但排除udp和rtp - if 'hls' in line and ('udp' not in line and 'rtp' not in line): - line = line.strip() - if line: - # 解析json_url以提取IP和端口 - parsed_json_url = urlparse(json_url) - json_ip = parsed_json_url.hostname - json_port = ':' + parsed_json_url.port if parsed_json_url.port else '' - - # 分割行以获取频道名和原始URL - name, channel_url = line.split(',') - - # 替换原始URL中的IP地址和端口为json_url中的IP地址和端口 - new_channel_url = channel_url.replace(channel_url.split('/')[2], json_ip) - if json_port: - new_channel_url = new_channel_url.replace(f":{channel_url.split(':')[1]}", json_port) - - # 写入到文件中 - outfile.write(f"{name},{new_channel_url}\n") + if 'hls' in line and 'udp' not in line and 'rtp' not in line: + name, channel_url = line.split(',', 1) + parsed_channel_url = urlparse(channel_url) + channel_ip = parsed_channel_url.hostname + channel_port = f":{parsed_channel_url.port}" if parsed_channel_url.port else '' + original_parsed_url = urlparse(json_url) + original_ip = original_parsed_url.hostname + original_port = f":{original_parsed_url.port}" if original_parsed_url.port else '' + # 替换频道URL中的IP和端口 + new_channel_url = channel_url.replace(f"{channel_ip}{channel_port}", f"{original_ip}{original_port}") + outfile.write(f"{name},{new_channel_url}\n") + print(f"写入行: {name},{new_channel_url}") # 打印写入的行 except requests.exceptions.RequestException as e: print(f"Error fetching or processing the JSON data: {e}") print("频道列表文件iptv.txt获取完成!") -print("频道列表文件iptv.txt获取完成!") +