上传文件至「/」

This commit is contained in:
2026-03-24 06:47:45 +01:00
parent 49197a7972
commit 7e161af3d4
+190
View File
@@ -0,0 +1,190 @@
# -*- coding: utf-8 -*-
import base64
import sys
import time
import json
import requests
import re
from datetime import datetime, timedelta
sys.path.append('..')
from base.spider import Spider
class Spider(Spider):
def getName(self):
return "山东电视台"
def init(self, extend):
self.extend = extend
try:
self.extendDict = json.loads(extend)
except:
self.extendDict = {}
proxy = self.extendDict.get('proxy', None)
if proxy is None:
self.is_proxy = False
else:
self.proxy = proxy
self.is_proxy = True
pass
def getDependence(self):
return []
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def _get_stream_by_orgid(self, orgid):
"""根据orgid获取该电视台的所有频道流"""
api_url = f"https://app.litenews.cn/v1/app/play/tv/live?_orgid_={orgid}"
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Referer': 'https://app.litenews.cn/',
'Accept': 'application/json, text/plain, */*'
}
try:
if self.is_proxy:
response = requests.get(api_url, headers=headers, timeout=10, proxies=self.proxy)
else:
response = requests.get(api_url, headers=headers, timeout=10)
if response.status_code != 200:
return []
text = response.text.strip()
if text.startswith('\ufeff'):
text = text[1:]
data = json.loads(text)
# 检查返回码
if data.get("code") != 1:
return []
# 返回data数组
return data.get("data", [])
except Exception:
return []
def _fetch_all_channels(self):
"""获取所有山东电视台频道数据"""
# 从之前测试成功的orgid列表
orgid_list = [
171, 303, 85, 261, 257, 97, 195, 537, 29, 163, 237, 269, 153,
403, 227, 221, 305, 173, 295, 253, 45, 1, 71, 133, 125, 175,
189, 255, 57, 239, 109, 165, 55, 17, 61, 23, 203, 75, 101,
243, 233, 103, 37, 207, 211, 169, 277, 11, 193, 19, 33, 235,
27, 135, 273, 13, 73, 117, 231, 31, 265, 65, 183, 81, 191,
113, 201, 167, 147, 145, 159, 289, 187, 293, 51, 59, 213, 77, 91
]
all_channels = []
for orgid in orgid_list:
channels = self._get_stream_by_orgid(orgid)
if channels:
all_channels.extend(channels)
time.sleep(0.1)
return all_channels
def liveContent(self, url):
"""主入口方法,返回M3U内容"""
# 获取所有频道数据
channels = self._fetch_all_channels()
if not channels:
return "#EXTM3U\n# 错误:无法获取直播数据"
# 生成M3U内容(严格按照模板格式)
m3u_content = ['#EXTM3U']
for channel in channels:
channel_name = channel.get("name", "未知频道")
channel_url = channel.get("stream", "")
if channel_url:
# 严格按照模板格式:EXTINF行 + URL行
# 格式:#EXTINF:-1 tvg-name="频道名称" group-title="山东电视台",频道名称
extinf = f'#EXTINF:-1 tvg-name="{channel_name}" group-title="山东电视台",{channel_name}'
m3u_content.extend([extinf, channel_url])
return '\n'.join(m3u_content)
def homeContent(self, filter):
return {}
def homeVideoContent(self):
return {}
def categoryContent(self, cid, page, filter, ext):
return {}
def detailContent(self, did):
return {}
def searchContent(self, key, quick, page='1'):
return {}
def searchContentPage(self, keywords, quick, page):
return {}
def playerContent(self, flag, pid, vipFlags):
return {}
def localProxy(self, params):
if params['type'] == "m3u8":
return self.proxyM3u8(params)
if params['type'] == "ts":
return self.get_ts(params)
return [302, "text/plain", None, {'Location': 'https://sf1-cdn-tos.huoshanstatic.com/obj/media-fe/xgplayer_doc_video/mp4/xgplayer-demo-720p.mp4'}]
def proxyM3u8(self, params):
pid = params['pid']
info = pid.split(',')
if len(info) >= 3:
a = info[0]
b = info[1]
c = info[2]
timestamp = int(time.time() / 4 - 355017625)
t = timestamp * 4
m3u8_text = f'#EXTM3U\n#EXT-X-VERSION:3\n#EXT-X-TARGETDURATION:4\n#EXT-X-MEDIA-SEQUENCE:{timestamp}\n'
for i in range(10):
url = f'https://ntd-tgc.cdn.hinet.net/live/pool/{a}/litv-pc/{a}-avc1_6000000={b}-mp4a_134000_zho={c}-begin={t}0000000-dur=40000000-seq={timestamp}.ts'
if self.is_proxy:
url = f'http://127.0.0.1:9978/proxy?do=py&type=ts&url={self.b64encode(url)}'
m3u8_text += f'#EXTINF:4,\n{url}\n'
timestamp += 1
t += 4
return [200, "application/vnd.apple.mpegurl", m3u8_text]
return [200, "application/vnd.apple.mpegurl", "#EXTM3U\n# 错误:无效的代理参数"]
def get_ts(self, params):
url = self.b64decode(params['url'])
headers = {'User-Agent': 'Mozilla/5.0'}
if self.is_proxy:
response = requests.get(url, headers=headers, stream=True, proxies=self.proxy)
else:
response = requests.get(url, headers=headers, stream=True)
return [206, "application/octet-stream", response.content]
def destroy(self):
return '正在Destroy'
def b64encode(self, data):
return base64.b64encode(data.encode('utf-8')).decode('utf-8')
def b64decode(self, data):
return base64.b64decode(data.encode('utf-8')).decode('utf-8')
if __name__ == '__main__':
pass