diff --git a/py/IPTV+测绘站采集.py b/py/IPTV+测绘站采集.py index fb3576c..1d75649 100644 --- a/py/IPTV+测绘站采集.py +++ b/py/IPTV+测绘站采集.py @@ -1,18 +1,6 @@ import requests -import re -from lxml import etree import os - -import requests -import re from lxml import etree -import os - -# 定义代理 -proxy = { - 'http': '139.9.119.20:80', - 'http': '47.106.144.184:7890', -} # 定义请求头 header = { @@ -23,76 +11,89 @@ header = { def via_tonking(url): headers = { 'Referer': 'http://tonkiang.us/hotellist.html', - 'User-Agent': 'Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2; Trident/6.0)', + 'User-Agent': header["User-Agent"], } - url = f'http://tonkiang.us/alllist.php?s={url}&c=false&y=false' - response = requests.get( - url=url, - headers=headers, - verify=False, - proxies=proxy, - timeout=10 - ) - et = etree.HTML(response.text) - div_text = et.xpath('//div[@class="result"]/div/text()')[1] - return "暂时失效" not in div_text + try: + response = requests.get( + url=f'http://tonkiang.us/alllist.php?s={url}&c=false&y=false', + headers=headers, + timeout=10 + ) + response.raise_for_status() + et = etree.HTML(response.text) + div_text = et.xpath('//div[@class="result"]/div/text()')[1] + return "暂时失效" not in div_text + except Exception as e: + print(f"验证IP时发生错误: {e}") + return False # 从tonkiang获取可用IP def get_tonkiang(keyword): - # 构造POST数据 data = { "saerch": f"{keyword}", "Submit": " " } - resp = requests.post( - "http://tonkiang.us/hoteliptv.php", - headers=header, - data=data, - timeout=10, - proxies=proxy - ) - resp.encoding = 'utf-8' - et = etree.HTML(resp.text) - divs = et.xpath('//div[@class="tables"]/div') - result_urls = [] - for div in divs: - try: - status = div.xpath('./div[3]/div/text()')[0] - if "暂时失效" not in status: - ip = div.xpath('./div[1]/a/b/text()')[0].strip() - if via_tonking(ip): - result_urls.append(f'http://{ip}') - except (IndexError, ValueError): - continue - return result_urls + try: + resp = requests.post( + "http://tonkiang.us/hoteliptv.php", + headers=header, + data=data, + timeout=10 + ) + resp.raise_for_status() + resp.encoding = 'utf-8' + et = etree.HTML(resp.text) + divs = et.xpath('//div[@class="tables"]/div') + result_urls = [] + for div in divs: + try: + status = div.xpath('./div[3]/div/text()')[0] + if "暂时失效" not in status: + ip = div.xpath('./div[1]/a/b/text()')[0].strip() + if via_tonking(ip): + result_urls.append(f'http://{ip}') + except (IndexError, ValueError): + continue + return result_urls + except Exception as e: + print(f"获取IP时发生错误: {e}") + return [] # 生成文件 def gen_files(valid_ips, province, isp): + # 生成节目列表 省份运营商.txt + index = 0 + print(valid_ips) udp_filename = f'rtp/{province}_{isp}.txt' + with open(udp_filename, 'r', encoding='utf-8') as file: + data = file.read() txt_filename = f'playlist/{province}{isp}.txt' - try: - with open(udp_filename, 'r', encoding='utf-8') as file: - data = file.read() - with open(txt_filename, 'a', encoding='utf-8') as new_file: # 修改为'a'以追加文件 - for url in valid_ips: - new_data = data.replace("rtp://", f"{url}/rtp/") + with open(txt_filename, 'a', encoding='utf-8') as new_file: + new_file.write(f'{province}{isp},#genre#\n') + for url in valid_ips: + if index < 3: + new_data = data.replace("rtp://", f"{url[0]}/rtp/") new_file.write(new_data) - except FileNotFoundError: - print(f"文件 '{udp_filename}' 不存在.") - except Exception as e: - print(f"生成文件时发生错误: {e}") + new_file.write('\n') + index += 1 + else: + continue + print(f'已生成播放列表,保存至{txt_filename}') # 遍历rtp文件夹中的所有文件 rtp_folder = 'rtp' playlist_folder = 'playlist' +# 确保playlist目录存在 +os.makedirs(playlist_folder, exist_ok=True) + for filename in os.listdir(rtp_folder): - if filename.endswith(".txt") and "_" in filename: - province, isp = filename[:-4].split("_") # 假设文件名格式为"省份_运营商.txt" - key_word = f"{province}{isp}" # 构造关键词 - valid_ips = get_tonkiang(key_word) # 搜索有效IP + if filename.endswith(".txt"): + province_isp = filename[:-4] # 获取不包含扩展名的文件名 + keyword = province_isp.replace('_', '') # 假设文件名格式为"省份_运营商" + valid_ips = get_tonkiang(keyword) # 搜索有效IP if valid_ips: - print(f"找到有效IP,正在生成播放列表: {province}{isp}") - gen_files(valid_ips, province, isp) # 生成播放列表文件 + print(f"找到有效IP,正在生成文本文件: {provinceisp}") + gen_txt_files(valid_ips, province_isp.split('_')[0], province_isp.split('_')[1]) # 生成文本文件 else: - print(f"未找到有效IP: {province} {isp}") + print(f"未找到有效IP: {province_isp}")