Update 酒店源255.py

This commit is contained in:
frxz751113
2025-05-04 12:47:28 +08:00
committed by GitHub
parent f9129dd6e3
commit ed4ddbdf06
+77 -32
View File
@@ -30,9 +30,7 @@ from translate import Translator # 导入Translator类,用于文本翻译
###########################################################ZHGX采集####################################################
######################################################################################################################
######################################################################################################################
import random
import concurrent.futures
from fake_useragent import UserAgent # 需要先安装:pip install fake-useragent
import requests
urls = [
#"https://fofa.info/result?qbase64=IlpIR1hUViIgJiYgcG9ydD0iODA4Ig%3D%3D", #808
@@ -42,87 +40,132 @@ urls = [
#"https://fofa.info/result?qbase64=IlpIR1hUViIgJiYgcG9ydD0iODAwMyI%3D" , #8003
"https://fofa.info/result?qbase64=IlpIR1hUViIgJiYgcG9ydD0iODg4Ig%3D%3D" , #888
#"https://fofa.info/result?qbase64=IlpIR1hUViIgJiYgcG9ydD0iODA4OSI%3D" , #8089
"https://fofa.info/result?qbase64=IlpIR1hUViIgJiYgcG9ydD0iODgi"
#"https://fofa.info/result?qbase64=IlpIR1hUViIgJiYgcG9ydD0iODAwOSI%3D", #8009
]
def modify_urls(url):
# 创建一个空列表用于存储修改后的 URL
modified_urls = []
# 找到 URL 中 IP 地址开始的索引位置,"//" 后两个字符开始为 IP 地址起始位置
ip_start_index = url.find("//") + 2
# 找到 URL 中 IP 地址结束的索引位置,从 ip_start_index 开始查找第一个 ":" 的位置
ip_end_index = url.find(":", ip_start_index)
# 找到 URL 中 IP 地址结束的索引位置,从 ip_start_index 开始查找第一个 ":" 的位置
base_url = url[:ip_start_index]
# 获取 URL 中的 IP 地址部分
ip_address = url[ip_start_index:ip_end_index]
# 获取 URL 中的端口部分
port = url[ip_end_index:]
# 定义一个字符串,表示 IP 地址的结尾部分
ip_end = "/ZHGXTV/Public/json/live_interface.txt"
# 遍历 1 到 255 的数字
for i in range(1, 256):
# 修改 IP 地址的最后一位数字
modified_ip = f"{ip_address[:-1]}{i}"
# 组合成新的 URL
modified_url = f"{base_url}{modified_ip}{port}{ip_end}"
# 将新的 URL 添加到列表中
modified_urls.append(modified_url)
# 返回修改后的 URL 列表
return modified_urls
def is_url_accessible(url):
try:
response = requests.get(url, timeout=3, headers={'User-Agent': UserAgent().random})
# 发送 GET 请求,设置超时时间为 3 秒
response = requests.get(url, timeout=3)
# 如果响应状态码在 200 到 401 之间(包括 200 和 401),则认为 URL 可访问
if 200 <= response.status_code <= 401:
return url
except requests.exceptions.RequestException:
# 如果请求过程中出现异常,不做任何处理,直接跳过
pass
return None
# 创建一个空列表用于存储结果
results = []
for url in urls:
response = requests.get(url, headers={'User-Agent': UserAgent().random})
# 发送 GET 请求获取 URL 的内容
response = requests.get(url)
# 获取响应的文本内容
page_content = response.text
urls_all = re.findall(r"http://\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\d+", page_content)
urls = set(urls_all)
# 查找所有符合指定格式的网址
pattern = r"http://\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\d+" # 设置匹配的格式,如 http://8.8.8.8:8888
# 使用正则表达式在页面内容中查找所有符合格式的 URL
urls_all = re.findall(pattern, page_content)
# urls = list(set(urls_all)) # 去重得到唯一的URL列表
urls = set(urls_all) # 去重得到唯一的URL列表
x_urls = []
for url in urls: # 缩进修正:内层循环与外层for对齐
for url in urls: # 对urls进行处理,ip第四位修改为1,并去重
url = url.strip()
# 找到 URL 中 IP 地址开始的索引位置,"//" 后两个字符开始为 IP 地址起始位置
ip_start_index = url.find("//") + 2
# 找到 URL 中 IP 地址结束的索引位置,从 ip_start_index 开始查找第一个 ":" 的位置
ip_end_index = url.find(":", ip_start_index)
# 找到 IP 地址中第一个 "." 的位置
ip_dot_start = url.find(".") + 1
# 找到 IP 地址中第二个 "." 的位置
ip_dot_second = url.find(".", ip_dot_start) + 1
# 找到 IP 地址中第三个 "." 的位置
ip_dot_three = url.find(".", ip_dot_second) + 1
base_url = url[:ip_start_index]
# 获取 URL 的基础部分,即从开头到 IP 地址开始的部分
base_url = url[:ip_start_index] # http:// or https://
# 获取 URL 中的 IP 地址部分,截取到第三个 "." 的位置
ip_address = url[ip_start_index:ip_dot_three]
# 获取 URL 中的端口部分
port = url[ip_end_index:]
# 定义一个字符串,表示 IP 地址的结尾部分为 "1"
ip_end = "1"
# 修改 IP 地址的最后一位为 "1"
modified_ip = f"{ip_address}{ip_end}"
# 组合成新的 URL
x_url = f"{base_url}{modified_ip}{port}"
# 将新的 URL 添加到列表中
x_urls.append(x_url)
urls = set(x_urls)
urls = set(x_urls) # 去重得到唯一的URL列表
valid_urls = []
# 多线程获取可用url
with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor:
futures = []
for url in urls: # 缩进修正:内层循环与with块对齐
for url in urls:
url = url.strip()
# 获取修改后的 URL 列表
modified_urls = modify_urls(url)
for modified_url in modified_urls:
# 提交任务,检查每个修改后的 URL 是否可访问
futures.append(executor.submit(is_url_accessible, modified_url))
for future in concurrent.futures.as_completed(futures):
result = future.result()
if result:
# 如果 URL 可访问,将其添加到有效 URL 列表中
valid_urls.append(result)
for url in valid_urls: # 缩进修正:与外层for对齐
for url in valid_urls:
print(url)
for url in valid_urls: # 缩进修正:与外层for对齐
# 遍历网址列表,获取JSON文件并解析
for url in valid_urls:
try:
# 发送GET请求获取JSON文件,设置超时时间为0.5秒
json_url = f"{url}"
response = requests.get(json_url, timeout=3, headers={'User-Agent': UserAgent().random})
response = requests.get(json_url, timeout=3)################################
json_data = response.content.decode('utf-8')
lines = json_data.split('\n')
excluded_keywords = ['udp', 'rtp']
for line in lines: # 缩进修正:内层循环与try块对齐
if 'hls' in line and all(keyword not in line for keyword in excluded_keywords):
line = line.strip()
if line:
name, channel_url = line.split(',')
urls = channel_url.split('/', 3)
url_data = json_url.split('/', 3)
if len(urls) >= 3:
urld = (f"{urls[0]}//{url_data[2]}/{urls[3]}")
else:
urld = (f"{urls}")
try:
# 按行分割数据
lines = json_data.split('\n')
excluded_keywords = ['udp', 'rtp']
for line in lines:
if 'hls' in line and all(keyword not in line for keyword in excluded_keywords):
line = line.strip()
if line:
name, channel_url = line.split(',')
urls = channel_url.split('/', 3)
url_data = json_url.split('/', 3)
if len(urls) >= 3:
urld = (f"{urls[0]}//{url_data[2]}/{urls[3]}")
else:
urld = (f"{urls}")
#print(f"{name},{urld}") #关闭频道名称和频道地址打印,缩短运行时间
if name and urld:
name = name.replace("高清电影", "影迷电影")
name = name.replace("高清电影", "影迷电影")
name = name.replace("中央", "CCTV")
name = name.replace("高清", "")
name = name.replace("HD", "")
@@ -229,15 +272,17 @@ for url in urls:
name = name.replace("奥运匹克", "")
urld = urld.replace("index.m3u8", "index.m3u8?$智慧光迅听说名字越长越好看")
results.append(f"{name},{urld}")
except: # 缩进修正:与外层for对齐
except:
continue
except:
continue
channels = []
for result in results: # 缩进修正:与外层代码块对齐
for result in results:
line = result.strip()
if result:
channel_name, channel_url = result.split(',')
channels.append((channel_name, channel_url))
with open("iptv.txt", 'w', encoding='utf-8') as file: # 缩进修正:与外层代码块对齐
with open("iptv.txt", 'w', encoding='utf-8') as file:
for result in results:
file.write(result + "\n")
print(result) #关闭频道名称和频道地址打印,缩短运行时间