Update IPTV+测绘站采集.py

This commit is contained in:
frxz751113
2024-08-21 12:51:54 +08:00
committed by GitHub
parent 4d2cb4410c
commit 975e0aac80
+52 -25
View File
@@ -3,23 +3,34 @@ import re
from lxml import etree from lxml import etree
import os import os
import requests
import re
from lxml import etree
import os
# Proxy used for every request to tonkiang.us.
# NOTE: the same plain-HTTP proxy endpoint is registered for both schemes;
# adjust the 'https' entry if the proxy in use does not support HTTPS traffic.
proxy = dict.fromkeys(('http', 'https'), 'http://139.9.119.20:80')

# Default request headers (desktop Chrome User-Agent) for the search POST.
header = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/123.0.0.0 Safari/537.36"
    ),
}
# 验证tonkiang可用IP # 验证tonkiang可用IP
def via_tonking(url): def via_tonking(url):
headers = { headers = {
'Referer': 'http://tonkiang.us/hotellist.html', 'Referer': 'http://tonkiang.us/hotellist.html',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36', 'User-Agent': 'Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2; Trident/6.0)',
} }
url = f'http://tonkiang.us/alllist.php?s={url}&c=false&y=false' url = f'http://tonkiang.us/alllist.php?s={url}&c=false&y=false'
response = requests.get( response = requests.get(
url=url, url=url,
headers=headers, headers=headers,
verify=False, # 注意:verify=False会忽略SSL证书验证 verify=False,
proxies=proxy, # 这里使用之前定义的代理 proxies=proxy,
timeout=10 timeout=10
) )
et = etree.HTML(response.text) et = etree.HTML(response.text)
@@ -27,45 +38,61 @@ def via_tonking(url):
return "暂时失效" not in div_text return "暂时失效" not in div_text
def get_tonkiang(keyword):
    """Search tonkiang.us for ``keyword`` and return the hosts that look alive.

    The keyword is POSTed to ``http://tonkiang.us/hoteliptv.php`` through the
    module-level ``proxy`` with the module-level ``header``. Every result row
    whose status text does not contain "暂时失效" ("temporarily unavailable")
    is re-checked with ``via_tonking``; rows that pass are collected.

    Args:
        keyword: Search term sent in the form's search field.

    Returns:
        A list of ``"http://<host>"`` strings, where ``<host>`` is the text
        scraped from the first cell of each accepted row (presumably an
        ``ip:port`` pair -- confirm against the live page). Empty when
        nothing usable was found.

    Raises:
        requests.RequestException: If the initial search POST itself fails.
            Failures while verifying an individual candidate are reported
            and skipped instead of aborting the search.
    """
    # NOTE(review): the misspelled "saerch" key is reproduced on purpose; it
    # is presumably the field name the site's form expects -- do not "fix" it.
    data = {
        "saerch": f"{keyword}",
        "Submit": " "
    }
    resp = requests.post(
        "http://tonkiang.us/hoteliptv.php",
        headers=header,
        data=data,
        timeout=10,
        proxies=proxy
    )
    resp.encoding = 'utf-8'
    et = etree.HTML(resp.text)
    if et is None:
        # Defensive: an empty or unparsable body may yield no tree at all.
        return []

    result_urls = []
    for div in et.xpath('//div[@class="tables"]/div'):
        try:
            status = div.xpath('./div[3]/div/text()')[0]
            if "暂时失效" in status:
                continue
            ip = div.xpath('./div[1]/a/b/text()')[0].strip()
            if via_tonking(ip):
                result_urls.append(f'http://{ip}')
        except (IndexError, ValueError):
            # Row does not have the expected layout (e.g. header/ad rows).
            continue
        except requests.RequestException as exc:
            # A timeout or connection error while verifying one candidate
            # must not abort the whole search; report it and move on.
            print(f"验证 {ip} 时请求失败: {exc}")
    return result_urls
def gen_files(valid_ips, province, isp):
    """Append a playlist for ``province``/``isp`` built from its rtp template.

    Reads ``rtp/{province}_{isp}.txt`` and, for every entry of ``valid_ips``,
    appends one copy of that text to ``playlist/{province}{isp}.txt`` with
    each ``rtp://`` prefix replaced by ``{url}/rtp/``.

    Args:
        valid_ips: Iterable of base URLs (e.g. ``"http://1.2.3.4:8080"``).
        province: Province part of the template and playlist file names.
        isp: Carrier part of the template and playlist file names.

    Returns:
        None. Problems are reported with ``print`` rather than raised.
    """
    udp_filename = f'rtp/{province}_{isp}.txt'
    txt_filename = f'playlist/{province}{isp}.txt'
    try:
        with open(udp_filename, 'r', encoding='utf-8') as file:
            data = file.read()
        # Guarantee a trailing newline so that consecutive per-server copies
        # do not run together on a single line.
        if data and not data.endswith('\n'):
            data += '\n'
        # Create the output directory up front; otherwise a missing
        # "playlist" folder would surface as FileNotFoundError and be
        # misreported below as the rtp template being absent.
        os.makedirs(os.path.dirname(txt_filename), exist_ok=True)
        # Append mode: repeated runs accumulate entries in the same playlist.
        with open(txt_filename, 'a', encoding='utf-8') as new_file:
            new_file.writelines(
                data.replace("rtp://", f"{url}/rtp/") for url in valid_ips
            )
    except FileNotFoundError:
        print(f"文件 '{udp_filename}' 不存在.")
    except Exception as e:
        # Best-effort script: report and carry on with the next template.
        print(f"生成文件时发生错误: {e}")
# Walk every "<province>_<isp>.txt" template in the rtp folder, look up live
# hosts for it on tonkiang, and generate the matching playlist.
rtp_folder = 'rtp'
playlist_folder = 'playlist'
# Make sure the output folder exists before any playlist is written.
os.makedirs(playlist_folder, exist_ok=True)
for filename in os.listdir(rtp_folder):
    if not (filename.endswith(".txt") and "_" in filename):
        continue
    # Split on the first underscore only: a stem containing more than one
    # underscore would otherwise fail to unpack into two names (ValueError).
    province, isp = filename[:-4].split("_", 1)
    key_word = f"{province}{isp}"  # search keyword, e.g. province + carrier
    valid_ips = get_tonkiang(key_word)
    if valid_ips:
        print(f"找到有效IP,正在生成播放列表: {province}{isp}")
        gen_files(valid_ips, province, isp)
    else:
        print(f"未找到有效IP: {province} {isp}")