# -*- coding: utf-8 -*- import base64 import sys import time import json import requests import re from datetime import datetime, timedelta sys.path.append('..') from base.spider import Spider class Spider(Spider): # 类级别的缓存(所有实例共享) _cache_data = None _cache_time = None _cache_ttl = 3600 # 1小时 def getName(self): return "Litv" def init(self, extend): self.extend = extend try: self.extendDict = json.loads(extend) except: self.extendDict = {} proxy = self.extendDict.get('proxy', None) if proxy is None: self.is_proxy = False else: self.proxy = proxy self.is_proxy = True pass def getDependence(self): return [] def isVideoFormat(self, url): pass def manualVideoCheck(self): pass def _get_cached_data(self): """获取缓存数据(自动清空过期缓存)""" if Spider._cache_data is None or Spider._cache_time is None: return None elapsed = (datetime.now() - Spider._cache_time).total_seconds() if elapsed < Spider._cache_ttl: return Spider._cache_data # 缓存过期,主动清空 Spider._cache_data = None Spider._cache_time = None return None def _set_cached_data(self, data): """设置缓存数据""" Spider._cache_data = data Spider._cache_time = datetime.now() def _fetch_live_data(self): """获取直播数据(带内存缓存)""" # 尝试从缓存获取 cached_data = self._get_cached_data() if cached_data is not None: return cached_data # 缓存失效,请求新数据 starttime = datetime.now().strftime("%Y-%m-%d") api_url = f"https://kzb29rda.com/prod-api/match/list/new?isfanye=1&type=0&cid=0&ishot=1&pn=1&ps=50&level=&name=&langtype=zh&starttime={starttime}&pid=4&zoneId=Asia%2FShanghai&zhuboType=1" try: # 创建会话 session = requests.Session() # 第一次请求(会返回跳转页面) headers_first = { "User-Agent": "Mozilla/5.0 (Linux; Android 10; SM-G973F) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Mobile Safari/537.36", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8", "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8", "Connection": "keep-alive" } response = session.get(api_url, headers=headers_first, timeout=15) # 提取跳转地址 target_url = None # 方法1: location.replace match = re.search(r'location\.replace\("([^"]+)"\)', response.text) if match: target_url = match.group(1) # 方法2: meta refresh if not target_url: match = re.search(r'url=([^"\'\s>]+)', response.text) if match: target_url = match.group(1) # 如果找到跳转地址,则请求真实数据 if target_url: # 清理URL target_url = target_url.replace('&', '&') # 延迟一下 time.sleep(1) # 请求真实数据 headers_data = { "User-Agent": headers_first["User-Agent"], "Referer": api_url, "Accept": "application/json, text/plain, */*", "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8", "Connection": "keep-alive", "X-Requested-With": "XMLHttpRequest" } data_response = session.get(target_url, headers=headers_data, timeout=15) data_response.raise_for_status() # 处理可能的BOM text = data_response.text.strip() if text.startswith('\ufeff'): text = text[1:] data = json.loads(text) else: # 如果没有跳转,尝试直接解析 response.raise_for_status() text = response.text.strip() if text.startswith('\ufeff'): text = text[1:] data = json.loads(text) # 存入缓存 self._set_cached_data(data) return data except: return None def _parse_live_data(self, data): """解析直播数据为M3U格式""" m3u_content = ['#EXTM3U'] if not data: m3u_content.append('# 错误:无法获取直播数据') return '\n'.join(m3u_content) try: # 处理 topList for match in data.get("data", {}).get("topList", []): hteam = match.get("hteam_name", "Unknown Home") ateam = match.get("ateam_name", "Unknown Away") name = match.get("name", "Unnamed Match") matchtime = match.get("matchtime", "Unknown Time") status = match.get("status_up_name", "Unknown Status") for url_info in match.get("live_urls", []): url = url_info.get("url", "") if url: extinf = f'#EXTINF:-1 tvg-name="{name}({hteam}-{ateam}){status}{matchtime}" group-title="{name}",({hteam}-{ateam}){status}{matchtime}' m3u_content.extend([extinf, url]) # 处理 dataList for match in data.get("data", {}).get("dataList", []): hteam = match.get("hteam_name", "Unknown Home") ateam = match.get("ateam_name", "Unknown Away") name = match.get("name", "Unnamed Match") matchtime = match.get("matchtime", "Unknown Time") status = match.get("status_up_name", "Unknown Status") for url_info in match.get("live_urls", []): url = url_info.get("url", "") if url: extinf = f'#EXTINF:-1 tvg-name="{name}({hteam}-{ateam}){status}{matchtime}" group-title="{name}",({hteam}-{ateam}){status}{matchtime}' m3u_content.extend([extinf, url]) except: m3u_content.append('# 错误:数据处理异常') return '\n'.join(m3u_content) def liveContent(self, url): """主入口方法,返回M3U内容""" live_data = self._fetch_live_data() m3u_content = self._parse_live_data(live_data) return m3u_content def homeContent(self, filter): return {} def homeVideoContent(self): return {} def categoryContent(self, cid, page, filter, ext): return {} def detailContent(self, did): return {} def searchContent(self, key, quick, page='1'): return {} def searchContentPage(self, keywords, quick, page): return {} def playerContent(self, flag, pid, vipFlags): return {} def localProxy(self, params): if params['type'] == "m3u8": return self.proxyM3u8(params) if params['type'] == "ts": return self.get_ts(params) return [302, "text/plain", None, {'Location': 'https://sf1-cdn-tos.huoshanstatic.com/obj/media-fe/xgplayer_doc_video/mp4/xgplayer-demo-720p.mp4'}] def proxyM3u8(self, params): pid = params['pid'] info = pid.split(',') a = info[0] b = info[1] c = info[2] timestamp = int(time.time() / 4 - 355017625) t = timestamp * 4 m3u8_text = f'#EXTM3U\n#EXT-X-VERSION:3\n#EXT-X-TARGETDURATION:4\n#EXT-X-MEDIA-SEQUENCE:{timestamp}\n' for i in range(10): url = f'https://ntd-tgc.cdn.hinet.net/live/pool/{a}/litv-pc/{a}-avc1_6000000={b}-mp4a_134000_zho={c}-begin={t}0000000-dur=40000000-seq={timestamp}.ts' if self.is_proxy: url = f'http://127.0.0.1:9978/proxy?do=py&type=ts&url={self.b64encode(url)}' m3u8_text += f'#EXTINF:4,\n{url}\n' timestamp += 1 t += 4 return [200, "application/vnd.apple.mpegurl", m3u8_text] def get_ts(self, params): url = self.b64decode(params['url']) headers = {'User-Agent': 'Mozilla/5.0'} if self.is_proxy: response = requests.get(url, headers=headers, stream=True, proxies=self.proxy) else: response = requests.get(url, headers=headers, stream=True) return [206, "application/octet-stream", response.content] def destroy(self): return '正在Destroy' def b64encode(self, data): return base64.b64encode(data.encode('utf-8')).decode('utf-8') def b64decode(self, data): return base64.b64decode(data.encode('utf-8')).decode('utf-8') if __name__ == '__main__': pass