Update 酒店源.py

This commit is contained in:
frxz751113
2024-08-29 19:42:57 +08:00
committed by GitHub
parent 6fd0f61aee
commit 50e790c579
+73 -37
View File
@@ -46,62 +46,98 @@ def modify_urls(url):
ip_address = url[ip_start_index:ip_end_index] ip_address = url[ip_start_index:ip_end_index]
port = url[ip_end_index:] port = url[ip_end_index:]
ip_end = "/ZHGXTV/Public/json/live_interface.txt" ip_end = "/ZHGXTV/Public/json/live_interface.txt"
for i in range(1, 256):
# 定义超时时间以及是否返回正确的状态码 modified_ip = f"{ip_address[:-1]}{i}"
modified_url = f"{base_url}{modified_ip}{port}{ip_end}"
modified_urls.append(modified_url)
return modified_urls
#定义超时时间以及是否返回正确的状态码
def is_url_accessible(url): def is_url_accessible(url):
try: try:
response = requests.get(url, timeout=5) response = requests.get(url, timeout=1) #//////////////////
# 检查HTTP状态码是否在200到401之间 #if response.status_code == 200:
if 200 <= response.status_code <= 401: if 200 <= response.status_code <= 401:
return url return url
except requests.exceptions.RequestException: except requests.exceptions.RequestException:
pass pass
return None return None
results = []
# 遍历网址列表,检查每个URL的可访问性
valid_urls = []
for url in urls: for url in urls:
# 检查URL是否可访问,并添加到有效URL列表 # 创建一个Chrome WebDriver实例
accessible_url = is_url_accessible(url) chrome_options = Options()
if accessible_url: chrome_options.add_argument('--headless')
valid_urls.append(accessible_url) chrome_options.add_argument('--no-sandbox')
chrome_options.add_argument('--disable-dev-shm-usage')
driver = webdriver.Chrome(options=chrome_options)
# 使用WebDriver访问网页
driver.get(url) # 将网址替换为你要访问的网页地址
time.sleep(10)
# 获取网页内容
page_content = driver.page_source
# 关闭WebDriver
driver.quit()
# 查找所有符合指定格式的网址
pattern = r"http://\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\d+" # 设置匹配的格式,如http://8.8.8.8:8888
urls_all = re.findall(pattern, page_content)
# urls = list(set(urls_all)) # 去重得到唯一的URL列表
urls = set(urls_all) # 去重得到唯一的URL列表
x_urls = []
for url in urls: # 对urls进行处理,ip第四位修改为1,并去重
url = url.strip()
ip_start_index = url.find("//") + 2
ip_end_index = url.find(":", ip_start_index)
ip_dot_start = url.find(".") + 1
ip_dot_second = url.find(".", ip_dot_start) + 1
ip_dot_three = url.find(".", ip_dot_second) + 1
base_url = url[:ip_start_index] # http:// or https://
ip_address = url[ip_start_index:ip_dot_three]
port = url[ip_end_index:]
ip_end = "1"
modified_ip = f"{ip_address}{ip_end}"
x_url = f"{base_url}{modified_ip}{port}"
x_urls.append(x_url)
urls = set(x_urls) # 去重得到唯一的URL列表
valid_urls = []
# 多线程获取可用url
with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor:
futures = []
for url in urls:
url = url.strip()
modified_urls = modify_urls(url)
for modified_url in modified_urls:
futures.append(executor.submit(is_url_accessible, modified_url))
for future in concurrent.futures.as_completed(futures):
result = future.result()
if result:
valid_urls.append(result)
for url in valid_urls:
print(url)
# 打开文件准备写入 # 从有效URL提取数据并替换IP后写入文件
with open('iptv.txt', 'w', encoding='utf-8') as outfile: with open('iptv.txt', 'w', encoding='utf-8') as outfile:
# 遍历有效URL列表,获取JSON文件并解析
for json_url in valid_urls: for json_url in valid_urls:
try: try:
# 发送GET请求获取JSON文件,设置超时时间为0.5秒 response = requests.get(json_url, timeout=1)
response = requests.get(json_url, timeout=5)
json_data = response.content.decode('utf-8') json_data = response.content.decode('utf-8')
lines = re.split(r'\r?\n', json_data)
# 按行分割数据
lines = json_data.split('\n')
for line in lines: for line in lines:
# 行中需包含hls,但排除udp和rtp if 'hls' in line and 'udp' not in line and 'rtp' not in line:
if 'hls' in line and ('udp' not in line and 'rtp' not in line): name, channel_url = line.split(',', 1)
line = line.strip() parsed_channel_url = urlparse(channel_url)
if line: channel_ip = parsed_channel_url.hostname
# 解析json_url以提取IP和端口 channel_port = f":{parsed_channel_url.port}" if parsed_channel_url.port else ''
parsed_json_url = urlparse(json_url) original_parsed_url = urlparse(json_url)
json_ip = parsed_json_url.hostname original_ip = original_parsed_url.hostname
json_port = ':' + parsed_json_url.port if parsed_json_url.port else '' original_port = f":{original_parsed_url.port}" if original_parsed_url.port else ''
# 替换频道URL中的IP和端口
# 分割行以获取频道名和原始URL new_channel_url = channel_url.replace(f"{channel_ip}{channel_port}", f"{original_ip}{original_port}")
name, channel_url = line.split(',')
# 替换原始URL中的IP地址和端口为json_url中的IP地址和端口
new_channel_url = channel_url.replace(channel_url.split('/')[2], json_ip)
if json_port:
new_channel_url = new_channel_url.replace(f":{channel_url.split(':')[1]}", json_port)
# 写入到文件中
outfile.write(f"{name},{new_channel_url}\n") outfile.write(f"{name},{new_channel_url}\n")
print(f"写入行: {name},{new_channel_url}") # 打印写入的行
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:
print(f"Error fetching or processing the JSON data: {e}") print(f"Error fetching or processing the JSON data: {e}")
print("频道列表文件iptv.txt获取完成!") print("频道列表文件iptv.txt获取完成!")
print("频道列表文件iptv.txt获取完成!")