Update 酒店源.py

This commit is contained in:
frxz751113
2024-08-29 19:01:08 +08:00
committed by GitHub
parent 60bb227e07
commit cef355ce32
+17 -77
View File
@@ -46,105 +46,45 @@ def modify_urls(url):
ip_address = url[ip_start_index:ip_end_index] ip_address = url[ip_start_index:ip_end_index]
port = url[ip_end_index:] port = url[ip_end_index:]
ip_end = "/ZHGXTV/Public/json/live_interface.txt" ip_end = "/ZHGXTV/Public/json/live_interface.txt"
for i in range(1, 256):
modified_ip = f"{ip_address[:-1]}{i}" # 定义超时时间以及是否返回正确的状态码
modified_url = f"{base_url}{modified_ip}{port}{ip_end}"
modified_urls.append(modified_url)
return modified_urls
#定义超时时间以及是否返回正确的状态码
def is_url_accessible(url): def is_url_accessible(url):
try: try:
response = requests.get(url, timeout=1) #////////////////// response = requests.get(url, timeout=5)
#if response.status_code == 200: # 检查HTTP状态码是否在200到401之间
if 200 <= response.status_code <= 401: if 200 <= response.status_code <= 401:
return url return url
except requests.exceptions.RequestException: except requests.exceptions.RequestException:
pass pass
return None return None
results = []
# 遍历网址列表,检查每个URL的可访问性
valid_urls = []
for url in urls: for url in urls:
# 创建一个Chrome WebDriver实例 # 检查URL是否可访问,并添加到有效URL列表
chrome_options = Options() accessible_url = is_url_accessible(url)
chrome_options.add_argument('--headless') if accessible_url:
chrome_options.add_argument('--no-sandbox') valid_urls.append(accessible_url)
chrome_options.add_argument('--disable-dev-shm-usage')
driver = webdriver.Chrome(options=chrome_options)
# 使用WebDriver访问网页
driver.get(url) # 将网址替换为你要访问的网页地址
time.sleep(10)
# 获取网页内容
page_content = driver.page_source
# 关闭WebDriver
driver.quit()
# 查找所有符合指定格式的网址
pattern = r"http://\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\d+" # 设置匹配的格式,如http://8.8.8.8:8888
urls_all = re.findall(pattern, page_content)
# urls = list(set(urls_all)) # 去重得到唯一的URL列表
urls = set(urls_all) # 去重得到唯一的URL列表
x_urls = []
for url in urls: # 对urls进行处理,ip第四位修改为1,并去重
url = url.strip()
ip_start_index = url.find("//") + 2
ip_end_index = url.find(":", ip_start_index)
ip_dot_start = url.find(".") + 1
ip_dot_second = url.find(".", ip_dot_start) + 1
ip_dot_three = url.find(".", ip_dot_second) + 1
base_url = url[:ip_start_index] # http:// or https://
ip_address = url[ip_start_index:ip_dot_three]
port = url[ip_end_index:]
ip_end = "1"
modified_ip = f"{ip_address}{ip_end}"
x_url = f"{base_url}{modified_ip}{port}"
x_urls.append(x_url)
urls = set(x_urls) # 去重得到唯一的URL列表
valid_urls = []
# 多线程获取可用url
with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor:
futures = []
for url in urls:
url = url.strip()
modified_urls = modify_urls(url)
for modified_url in modified_urls:
futures.append(executor.submit(is_url_accessible, modified_url))
for future in concurrent.futures.as_completed(futures):
result = future.result()
if result:
valid_urls.append(result)
for url in valid_urls:
print(url)
# 打开文件准备写入 # 打开文件准备写入
with open('iptv.txt', 'w', encoding='utf-8') as outfile: with open('iptv.txt', 'w', encoding='utf-8') as outfile:
# 遍历网址列表,获取JSON文件并解析 # 遍历有效URL列表获取JSON文件并解析
for json_url in valid_urls: for json_url in valid_urls:
try: try:
# 发送GET请求获取JSON文件,设置超时时间为0.5秒 # 发送GET请求获取JSON文件设置超时时间为0.5秒
response = requests.get(json_url, timeout=0.5) response = requests.get(json_url, timeout=5)
json_data = response.content.decode('utf-8') json_data = response.content.decode('utf-8')
# 按行分割数据 # 按行分割数据
lines = json_data.split('\n') lines = json_data.split('\n')
for line in lines: for line in lines:
# 行中需包含hls,但排除udp和rtp # 行中需包含hls,但排除udp和rtp
if 'hls' in line and ('udp' not in line and 'rtp' not in line): if 'hls' in line and ('udp' not in line and 'rtp' not in line):
line = line.strip() line = line.strip()
if line: if line:
# 分割行以获取频道名和原始URL # 直接使用原始URL,不进行IP变体生成
name, channel_url = line.split(',')
# 从json_url中提取IP地址和端口号
parsed_json_url = urlparse(json_url)
new_ip = parsed_json_url.hostname
new_port = parsed_json_url.port if parsed_json_url.port else ''
# 替换原始URL中的IP地址和端口号
if new_port:
new_channel_url = line.replace(f"://{parsed_json_url.hostname}", f"://{new_ip}:{new_port}")
else:
new_channel_url = line.replace(f"://{parsed_json_url.hostname}", f"://{new_ip}")
# 写入到文件中 # 写入到文件中
outfile.write(new_channel_url + '\n') outfile.write(line + '\n')
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:
print(f"Error fetching or processing the JSON data: {e}") print(f"Error fetching or processing the JSON data: {e}")
print("频道列表文件iptv.txt获取完成!") print("频道列表文件iptv.txt获取完成!")