Files
gyj07 a794bbcfa1 1
2026-05-21 16:39:48 +08:00

400 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
import base64
import sys
import time
import json
import requests
import re
from datetime import datetime, timedelta
sys.path.append('..')
from base.spider import Spider
class Spider(Spider):
# 类级别的缓存(所有实例共享)
_cache_data = None
_cache_time = None
_cache_ttl = 3600 # 1小时
# 央视频道顺序定义
CCTV_ORDER = {
'cctv1': 1, 'cctv2': 2, 'cctv3': 3, 'cctv4': 4, 'cctv5': 5,
'cctv5plus': 6, 'cctv6': 7, 'cctv7': 8, 'cctv8': 9, 'cctv9': 10,
'cctv10': 11, 'cctv11': 12, 'cctv12': 13, 'cctv13': 14, 'cctv14': 15,
'cctv15': 16, 'cctv16': 17, 'cctv17': 18
}
# 卫视频道顺序定义
WEISHI_ORDER = {
'湖南卫视': 1, '东方卫视': 2, '浙江卫视': 3, '江苏卫视': 4, '北京卫视': 5,
'深圳卫视': 6, '广东卫视': 7, '安徽卫视': 8, '湖北卫视': 9, '山东卫视': 10,
'江西卫视': 11, '海南卫视': 12, '黑龙江卫视': 13, '东南卫视': 14, '河北卫视': 15,
'天津卫视': 16, '重庆卫视': 17, '辽宁卫视': 18, '四川卫视': 19, '河南卫视': 20,
'广西卫视': 21, '吉林卫视': 22, '山西卫视': 23, '陕西卫视': 24, '云南卫视': 25,
'贵州卫视': 26, '甘肃卫视': 27, '宁夏卫视': 28, '青海卫视': 29, '内蒙古卫视': 30,
'新疆卫视': 31, '西藏卫视': 32, '厦门卫视': 33, '金鹰卡通': 34, '卡酷少儿': 35,
'嘉佳卡通': 36, '优漫卡通': 37
}
def getName(self):
return "IPTV"
def init(self, extend):
self.extend = extend
try:
self.extendDict = json.loads(extend)
except:
self.extendDict = {}
proxy = self.extendDict.get('proxy', None)
if proxy is None:
self.is_proxy = False
else:
self.proxy = proxy
self.is_proxy = True
pass
def getDependence(self):
return []
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def _get_cached_data(self):
"""获取缓存数据"""
# 检查缓存是否存在且未过期
if (Spider._cache_data is not None and
Spider._cache_time is not None):
elapsed = (datetime.now() - Spider._cache_time).total_seconds()
if elapsed < Spider._cache_ttl:
print(f"使用内存缓存(剩余有效期: {Spider._cache_ttl - elapsed:.0f}秒)")
return Spider._cache_data
print("缓存失效,重新请求API")
return None
def _set_cached_data(self, data):
"""设置缓存数据"""
Spider._cache_data = data
Spider._cache_time = datetime.now()
print(f"已缓存数据,有效期: {Spider._cache_ttl}")
def _is_cctv_channel(self, channel_name, channel_code):
"""判断是否为央视频道"""
# 通过频道代码判断
if channel_code and channel_code.lower().startswith('cctv'):
return True
# 通过频道名称判断
if channel_name and channel_name.startswith('CCTV'):
return True
return False
def _get_cctv_order(self, channel_name, channel_code):
"""获取央视频道排序值"""
# 先通过频道代码获取
if channel_code:
code_lower = channel_code.lower()
if code_lower in self.CCTV_ORDER:
return self.CCTV_ORDER[code_lower]
# 通过频道名称获取
if channel_name:
# 处理如 "CCTV1"、"CCTV-1"、"CCTV1综合" 等格式
match = re.search(r'CCTV[-]?(\d+)', channel_name, re.IGNORECASE)
if match:
num = int(match.group(1))
if 1 <= num <= 18:
return num
# 特殊处理 CCTV5+
if 'CCTV5+' in channel_name or 'CCTV5plus' in channel_name:
return 6
return 999 # 无法识别的央视频道放在最后
def _get_weishi_order(self, channel_name):
"""获取卫视频道排序值"""
# 精确匹配
if channel_name in self.WEISHI_ORDER:
return self.WEISHI_ORDER[channel_name]
# 模糊匹配(去除"卫视"、"TV"等后缀)
clean_name = channel_name.replace('卫视', '').replace('TV', '').strip()
for key in self.WEISHI_ORDER:
if key.startswith(clean_name) or clean_name in key:
return self.WEISHI_ORDER[key]
return 999 # 无法识别的卫视放在最后
def _fetch_iptv_data(self):
"""获取IPTV数据(带内存缓存)"""
# 尝试从缓存获取
cached_data = self._get_cached_data()
if cached_data is not None:
return cached_data
# 缓存失效,请求新数据
api_url = "https://kzb29rda.com/prod-api/iptv/getIptvList?liveType=0&deviceType=1"
try:
# 创建会话
session = requests.Session()
# 第一次请求(会返回跳转页面)
headers_first = {
"User-Agent": "Mozilla/5.0 (Linux; Android 10; SM-G973F) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Mobile Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
"Connection": "keep-alive"
}
response = session.get(api_url, headers=headers_first, timeout=15)
# 提取跳转地址
target_url = None
# 方法1: location.replace
match = re.search(r'location\.replace\("([^"]+)"\)', response.text)
if match:
target_url = match.group(1)
# 方法2: meta refresh
if not target_url:
match = re.search(r'url=([^"\'\s>]+)', response.text)
if match:
target_url = match.group(1)
# 如果找到跳转地址,则请求真实数据
if target_url:
# 清理URL
target_url = target_url.replace('&amp;', '&')
# 延迟一下
time.sleep(1)
# 请求真实数据
headers_data = {
"User-Agent": headers_first["User-Agent"],
"Referer": api_url,
"Accept": "application/json, text/plain, */*",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
"Connection": "keep-alive",
"X-Requested-With": "XMLHttpRequest"
}
data_response = session.get(target_url, headers=headers_data, timeout=15)
data_response.raise_for_status()
# 处理可能的BOM
text = data_response.text.strip()
if text.startswith('\ufeff'):
text = text[1:]
data = json.loads(text)
else:
# 如果没有跳转,尝试直接解析
response.raise_for_status()
text = response.text.strip()
if text.startswith('\ufeff'):
text = text[1:]
data = json.loads(text)
# 存入缓存
self._set_cached_data(data)
return data
except requests.exceptions.RequestException as e:
print(f"网络请求异常: {str(e)}")
return None
except json.JSONDecodeError as e:
print(f"JSON解析错误: {str(e)}")
return None
except Exception as e:
print(f"未知错误: {str(e)}")
return None
def _sort_channels(self, channels):
"""对频道进行排序:央视按顺序,卫视按顺序,其他放最后"""
cctv_channels = []
weishi_channels = []
other_channels = []
for channel in channels:
channel_name = channel.get("play_source_name", "")
channel_code = channel.get("play_source_code", "")
if self._is_cctv_channel(channel_name, channel_code):
# 计算央视频道排序值
order = self._get_cctv_order(channel_name, channel_code)
cctv_channels.append((order, channel))
elif '卫视' in channel_name or 'TV' in channel_name:
# 计算卫视频道排序值
order = self._get_weishi_order(channel_name)
weishi_channels.append((order, channel))
else:
# 其他频道
other_channels.append((999, channel))
# 按排序值排序
cctv_channels.sort(key=lambda x: x[0])
weishi_channels.sort(key=lambda x: x[0])
other_channels.sort(key=lambda x: x[0])
# 合并结果
result = []
result.extend([ch for _, ch in cctv_channels])
result.extend([ch for _, ch in weishi_channels])
result.extend([ch for _, ch in other_channels])
return result
def _parse_iptv_data(self, data):
"""解析IPTV数据为M3U格式"""
m3u_content = ['#EXTM3U']
if not data:
m3u_content.append('# 错误:无法获取直播数据')
return '\n'.join(m3u_content)
try:
# 检查响应状态
if data.get("code") != "0":
m3u_content.append(f'# 错误:{data.get("msg", "未知错误")}')
return '\n'.join(m3u_content)
# 获取频道列表
channel_list = data.get("list", [])
if not channel_list:
m3u_content.append('# 错误:没有获取到频道数据')
return '\n'.join(m3u_content)
# 对频道进行排序
sorted_channels = self._sort_channels(channel_list)
# 添加分组注释
m3u_content.append('# 央视频道')
cctv_count = 0
weishi_count = 0
# 遍历排序后的频道列表
for channel in sorted_channels:
channel_name = channel.get("play_source_name", "未知频道")
channel_url = channel.get("play_source_url", "")
channel_code = channel.get("play_source_code", "")
if channel_url:
# 检测分组变化
if self._is_cctv_channel(channel_name, channel_code):
if weishi_count == 0 and cctv_count > 0:
# 已经在央视分组中,不需要添加注释
pass
cctv_count += 1
elif '卫视' in channel_name or 'TV' in channel_name:
if cctv_count > 0 and weishi_count == 0:
# 从央视切换到卫视,添加分组注释
m3u_content.append('\n# 卫视频道')
weishi_count += 1
else:
if (cctv_count > 0 or weishi_count > 0) and (cctv_count + weishi_count) == len([ch for ch in sorted_channels if self._is_cctv_channel(ch.get("play_source_name", ""), ch.get("play_source_code", "")) or ('卫视' in ch.get("play_source_name", "") or 'TV' in ch.get("play_source_name", ""))]):
# 从央视/卫视切换到其他频道
m3u_content.append('\n# 其他频道')
# 构建EXTINF行
tvg_name = channel_code if channel_code else channel_name
extinf = f'#EXTINF:-1 tvg-id="{channel_code}" tvg-name="{tvg_name}" group-title="直播频道",{channel_name}'
m3u_content.extend([extinf, channel_url])
print(f"成功解析 {len(channel_list)} 个频道(央视: {cctv_count},卫视: {weishi_count},其他: {len(channel_list) - cctv_count - weishi_count}")
except Exception as e:
print(f"解析数据失败: {str(e)}")
m3u_content.append('# 错误:数据处理异常')
return '\n'.join(m3u_content)
def liveContent(self, url):
"""主入口方法,返回M3U内容"""
# 获取IPTV数据(带内存缓存)
iptv_data = self._fetch_iptv_data()
# 解析数据为M3U格式
m3u_content = self._parse_iptv_data(iptv_data)
return m3u_content
def homeContent(self, filter):
return {}
def homeVideoContent(self):
return {}
def categoryContent(self, cid, page, filter, ext):
return {}
def detailContent(self, did):
return {}
def searchContent(self, key, quick, page='1'):
return {}
def searchContentPage(self, keywords, quick, page):
return {}
def playerContent(self, flag, pid, vipFlags):
return {}
def localProxy(self, params):
if params['type'] == "m3u8":
return self.proxyM3u8(params)
if params['type'] == "ts":
return self.get_ts(params)
return [302, "text/plain", None, {'Location': 'https://sf1-cdn-tos.huoshanstatic.com/obj/media-fe/xgplayer_doc_video/mp4/xgplayer-demo-720p.mp4'}]
def proxyM3u8(self, params):
# 这个方法保留但可能不会被使用,因为IPTV的URL直接是m3u8地址
pid = params['pid']
info = pid.split(',')
if len(info) >= 3:
a = info[0]
b = info[1]
c = info[2]
timestamp = int(time.time() / 4 - 355017625)
t = timestamp * 4
m3u8_text = f'#EXTM3U\n#EXT-X-VERSION:3\n#EXT-X-TARGETDURATION:4\n#EXT-X-MEDIA-SEQUENCE:{timestamp}\n'
for i in range(10):
url = f'https://ntd-tgc.cdn.hinet.net/live/pool/{a}/litv-pc/{a}-avc1_6000000={b}-mp4a_134000_zho={c}-begin={t}0000000-dur=40000000-seq={timestamp}.ts'
if self.is_proxy:
url = f'http://127.0.0.1:9978/proxy?do=py&type=ts&url={self.b64encode(url)}'
m3u8_text += f'#EXTINF:4,\n{url}\n'
timestamp += 1
t += 4
return [200, "application/vnd.apple.mpegurl", m3u8_text]
return [200, "application/vnd.apple.mpegurl", "#EXTM3U\n# 错误:无效的代理参数"]
def get_ts(self, params):
url = self.b64decode(params['url'])
headers = {'User-Agent': 'Mozilla/5.0'}
if self.is_proxy:
response = requests.get(url, headers=headers, stream=True, proxies=self.proxy)
else:
response = requests.get(url, headers=headers, stream=True)
return [206, "application/octet-stream", response.content]
def destroy(self):
return '正在Destroy'
def b64encode(self, data):
return base64.b64encode(data.encode('utf-8')).decode('utf-8')
def b64decode(self, data):
return base64.b64decode(data.encode('utf-8')).decode('utf-8')
if __name__ == '__main__':
pass