Update IPTV+测绘站采集.py

This commit is contained in:
frxz751113
2024-08-21 13:08:10 +08:00
committed by GitHub
parent 9c114646fe
commit 20de0f7d07
+36 -35
View File
@@ -1,18 +1,6 @@
import requests
import re
from lxml import etree
import os
import requests
import re
from lxml import etree
import os
# 定义代理
proxy = {
'http': '139.9.119.20:80',
'http': '47.106.144.184:7890',
}
# 定义请求头
header = {
@@ -23,34 +11,36 @@ header = {
def via_tonking(url):
headers = {
'Referer': 'http://tonkiang.us/hotellist.html',
'User-Agent': 'Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2; Trident/6.0)',
'User-Agent': header["User-Agent"],
}
url = f'http://tonkiang.us/alllist.php?s={url}&c=false&y=false'
try:
response = requests.get(
url=url,
url=f'http://tonkiang.us/alllist.php?s={url}&c=false&y=false',
headers=headers,
verify=False,
proxies=proxy,
timeout=10
)
response.raise_for_status()
et = etree.HTML(response.text)
div_text = et.xpath('//div[@class="result"]/div/text()')[1]
return "暂时失效" not in div_text
except Exception as e:
print(f"验证IP时发生错误: {e}")
return False
# 从tonkiang获取可用IP
def get_tonkiang(keyword):
# 构造POST数据
data = {
"saerch": f"{keyword}",
"Submit": " "
}
try:
resp = requests.post(
"http://tonkiang.us/hoteliptv.php",
headers=header,
data=data,
timeout=10,
proxies=proxy
timeout=10
)
resp.raise_for_status()
resp.encoding = 'utf-8'
et = etree.HTML(resp.text)
divs = et.xpath('//div[@class="tables"]/div')
@@ -65,34 +55,45 @@ def get_tonkiang(keyword):
except (IndexError, ValueError):
continue
return result_urls
except Exception as e:
print(f"获取IP时发生错误: {e}")
return []
# 生成文件
def gen_files(valid_ips, province, isp):
# 生成节目列表 省份运营商.txt
index = 0
print(valid_ips)
udp_filename = f'rtp/{province}_{isp}.txt'
txt_filename = f'playlist/{province}{isp}.txt'
try:
with open(udp_filename, 'r', encoding='utf-8') as file:
data = file.read()
with open(txt_filename, 'a', encoding='utf-8') as new_file: # 修改为'a'以追加文件
txt_filename = f'playlist/{province}{isp}.txt'
with open(txt_filename, 'a', encoding='utf-8') as new_file:
new_file.write(f'{province}{isp},#genre#\n')
for url in valid_ips:
new_data = data.replace("rtp://", f"{url}/rtp/")
if index < 3:
new_data = data.replace("rtp://", f"{url[0]}/rtp/")
new_file.write(new_data)
except FileNotFoundError:
print(f"文件 '{udp_filename}' 不存在.")
except Exception as e:
print(f"生成文件时发生错误: {e}")
new_file.write('\n')
index += 1
else:
continue
print(f'已生成播放列表,保存至{txt_filename}')
# 遍历rtp文件夹中的所有文件
rtp_folder = 'rtp'
playlist_folder = 'playlist'
# 确保playlist目录存在
os.makedirs(playlist_folder, exist_ok=True)
for filename in os.listdir(rtp_folder):
if filename.endswith(".txt") and "_" in filename:
province, isp = filename[:-4].split("_") # 假设文件名格式为"省份_运营商.txt"
key_word = f"{province}{isp}" # 构造关键词
valid_ips = get_tonkiang(key_word) # 搜索有效IP
if filename.endswith(".txt"):
province_isp = filename[:-4] # 获取不包含扩展名的文件名
keyword = province_isp.replace('_', '') # 假设文件名格式为"省份_运营商"
valid_ips = get_tonkiang(keyword) # 搜索有效IP
if valid_ips:
print(f"找到有效IP,正在生成播放列表: {province}{isp}")
gen_files(valid_ips, province, isp) # 生成播放列表文件
print(f"找到有效IP,正在生成文本文件: {provinceisp}")
gen_txt_files(valid_ips, province_isp.split('_')[0], province_isp.split('_')[1]) # 生成文本文件
else:
print(f"未找到有效IP: {province} {isp}")
print(f"未找到有效IP: {province_isp}")