上传文件至 XYQTVBox/lib

This commit is contained in:
2025-06-21 09:02:32 +00:00
parent 76e7ad9098
commit f878679ad9
4 changed files with 698 additions and 0 deletions
+155
View File
@@ -0,0 +1,155 @@
import base64
import sys
import time
import json
import requests
import re
sys.path.append('..')
from base.spider import Spider
from bs4 import BeautifulSoup
class Spider(Spider):
def getName(self):
return "Litv"
def init(self, extend):
self.extend = extend
try:
self.extendDict = json.loads(extend)
except:
self.extendDict = {}
proxy = self.extendDict.get('proxy', None)
if proxy is None:
self.is_proxy = False
else:
self.proxy = proxy
self.is_proxy = True
pass
def getDependence(self):
return []
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def natural_sort_key(self, s):
"""
自然排序辅助函数
"""
return [
int(part) if part.isdigit() else part.lower()
for part in re.split(r'(\d+)', s)
]
def liveContent(self, url):
m3u_content = ['#EXTM3U']
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36',
'Referer': 'https://360pai.xyz/',
}
try:
response = requests.get('https://360pai.xyz/live', headers=headers, timeout=10)
response.raise_for_status()
res = response.text
soup = BeautifulSoup(res, 'html.parser')
result = []
cards = soup.select('div.anchor-grid_anchor-card-wrap__NR9Ov')
for card in cards:
a_tag = card.find('a', class_='anchor-grid_anchor-card__nJf0J')
if not a_tag:
continue
href = a_tag.get('href', '')
live_id_match = re.search(r'/live/([^/]+)', href)
if not live_id_match:
continue
live_id = live_id_match.group(1)
title_div = a_tag.find('div', class_='anchor-grid_anchor-avatar-title__5hTsp')
title = title_div.get_text(strip=True) if title_div else ''
title = title.replace(" vs ","vs")
if not title:
continue
extinf = f'#EXTINF:-1 tvg-name="{title}" group-title="360pai",{title}'
ch_url = f"video://https://360pai.xyz/live/{live_id}"
m3u_content.extend([extinf, ch_url])
except requests.exceptions.RequestException as e:
print(f"网络请求失败: {e}")
except Exception as e:
print(f"处理过程中发生错误: {e}")
return '\n'.join(m3u_content)
def homeContent(self, filter):
return {}
def homeVideoContent(self):
return {}
def categoryContent(self, cid, page, filter, ext):
return {}
def detailContent(self, did):
return {}
def searchContent(self, key, quick, page='1'):
return {}
def searchContentPage(self, keywords, quick, page):
return {}
def playerContent(self, flag, pid, vipFlags):
return {}
def localProxy(self, params):
if params['type'] == "m3u8":
return self.proxyM3u8(params)
if params['type'] == "ts":
return self.get_ts(params)
return [302, "text/plain", None, {'Location': 'https://sf1-cdn-tos.huoshanstatic.com/obj/media-fe/xgplayer_doc_video/mp4/xgplayer-demo-720p.mp4'}]
def proxyM3u8(self, params):
pid = params['pid']
info = pid.split(',')
a = info[0]
b = info[1]
c = info[2]
timestamp = int(time.time() / 4 - 355017625)
t = timestamp * 4
m3u8_text = f'#EXTM3U\n#EXT-X-VERSION:3\n#EXT-X-TARGETDURATION:4\n#EXT-X-MEDIA-SEQUENCE:{timestamp}\n'
for i in range(10):
url = f'https://ntd-tgc.cdn.hinet.net/live/pool/{a}/litv-pc/{a}-avc1_6000000={b}-mp4a_134000_zho={c}-begin={t}0000000-dur=40000000-seq={timestamp}.ts'
if self.is_proxy:
url = f'http://127.0.0.1:9978/proxy?do=py&type=ts&url={self.b64encode(url)}'
m3u8_text += f'#EXTINF:4,\n{url}\n'
timestamp += 1
t += 4
return [200, "application/vnd.apple.mpegurl", m3u8_text]
def get_ts(self, params):
url = self.b64decode(params['url'])
headers = {'User-Agent': 'Mozilla/5.0'}
response = requests.get(url, headers=headers, stream=True, proxies=self.proxy)
return [206, "application/octet-stream", response.content]
def destroy(self):
return '正在Destroy'
def b64encode(self, data):
return base64.b64encode(data.encode('utf-8')).decode('utf-8')
def b64decode(self, data):
return base64.b64decode(data.encode('utf-8')).decode('utf-8')
if __name__ == '__main__':
pass
+178
View File
@@ -0,0 +1,178 @@
# -*- coding: utf-8 -*-
# @Author : Doubebly
# @Time : 2025/3/23 21:55
import base64
import sys
import time
import json
import requests
import re
from datetime import datetime
sys.path.append('..')
from base.spider import Spider
from bs4 import BeautifulSoup
class Spider(Spider):
def getName(self):
return "Litv"
def init(self, extend):
self.extend = extend
try:
self.extendDict = json.loads(extend)
except:
self.extendDict = {}
proxy = self.extendDict.get('proxy', None)
if proxy is None:
self.is_proxy = False
else:
self.proxy = proxy
self.is_proxy = True
pass
def getDependence(self):
return []
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def liveContent(self, url):
cookies = {
'_ga': 'GA1.1.782627561.1745936696',
'_oredge_rl': 'u1ORyxILkguVwZ3LWQNJTpqceYPzrL/Cgugwu74xDwA=',
'dailyMessageShown515': 'shown',
'_ga_F2ET4TBC70': 'GS2.1.s1747232930$o3$g1$t1747233259$j0$l0$h0',
}
headers = {
'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'accept-language': 'zh-CN,zh;q=0.9',
'cache-control': 'max-age=0',
'priority': 'u=0, i',
'sec-ch-ua': '"Chromium";v="136", "Google Chrome";v="136", "Not.A/Brand";v="99"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
'sec-fetch-dest': 'document',
'sec-fetch-mode': 'navigate',
'sec-fetch-site': 'none',
'sec-fetch-user': '?1',
'upgrade-insecure-requests': '1',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36',
# 'cookie': '_ga=GA1.1.782627561.1745936696; _oredge_rl=u1ORyxILkguVwZ3LWQNJTpqceYPzrL/Cgugwu74xDwA=; dailyMessageShown515=shown; _ga_F2ET4TBC70=GS2.1.s1747232930$o3$g1$t1747233259$j0$l0$h0',
}
response = requests.get('https://www.515001.tv/', cookies=cookies, headers=headers)
#print(response.text) #自己添加,表示输出响应的内容
html = response.text
soup = BeautifulSoup(html, 'html.parser')
matches = []
for a in soup.find_all('a', class_='clearfix'):
# 提取链接
link = a.get('href', '')
link = f"video://{link}"
# 提取赛事名称和时间
event_time_p = a.find('p', class_='eventtime_wuy')
if event_time_p:
em = event_time_p.find('em')
event_name = em.get_text(strip=True) if em else ''
i_tag = event_time_p.find('i')
time = i_tag.get_text(strip=True) if i_tag else ''
else:
event_name = ''
time = ''
# 提取主队名称
home_div = a.find('div', class_=lambda c: c and 'zhudui' in c)
home_team = home_div.find('p').get_text(strip=True) if home_div else ''
# 提取客队名称
away_div = a.find('div', class_='kedui')
away_team = away_div.find('p').get_text(strip=True) if away_div else ''
if event_name and home_team and away_team and time and link:
match_str = f"[{event_name}]{home_team} VS {away_team}{time},{link}"
matches.append(match_str)
m3u_content = ['#EXTM3U']
for match in matches:
title = match.split(",")[0]
ch_url = match.split(",")[1]
extinf = f'#EXTINF:-1 tvg-name="{title}" group-title="515001",{title}'
m3u_content.extend([extinf, ch_url])
return '\n'.join(m3u_content)
def homeContent(self, filter):
return {}
def homeVideoContent(self):
return {}
def categoryContent(self, cid, page, filter, ext):
return {}
def detailContent(self, did):
return {}
def searchContent(self, key, quick, page='1'):
return {}
def searchContentPage(self, keywords, quick, page):
return {}
def playerContent(self, flag, pid, vipFlags):
return {}
def localProxy(self, params):
if params['type'] == "m3u8":
return self.proxyM3u8(params)
if params['type'] == "ts":
return self.get_ts(params)
return [302, "text/plain", None, {'Location': 'https://sf1-cdn-tos.huoshanstatic.com/obj/media-fe/xgplayer_doc_video/mp4/xgplayer-demo-720p.mp4'}]
def proxyM3u8(self, params):
pid = params['pid']
info = pid.split(',')
a = info[0]
b = info[1]
c = info[2]
timestamp = int(time.time() / 4 - 355017625)
t = timestamp * 4
m3u8_text = f'#EXTM3U\n#EXT-X-VERSION:3\n#EXT-X-TARGETDURATION:4\n#EXT-X-MEDIA-SEQUENCE:{timestamp}\n'
for i in range(10):
url = f'https://ntd-tgc.cdn.hinet.net/live/pool/{a}/litv-pc/{a}-avc1_6000000={b}-mp4a_134000_zho={c}-begin={t}0000000-dur=40000000-seq={timestamp}.ts'
if self.is_proxy:
url = f'http://127.0.0.1:9978/proxy?do=py&type=ts&url={self.b64encode(url)}'
m3u8_text += f'#EXTINF:4,\n{url}\n'
timestamp += 1
t += 4
return [200, "application/vnd.apple.mpegurl", m3u8_text]
def get_ts(self, params):
url = self.b64decode(params['url'])
headers = {'User-Agent': 'Mozilla/5.0'}
response = requests.get(url, headers=headers, stream=True, proxies=self.proxy)
return [206, "application/octet-stream", response.content]
def destroy(self):
return '正在Destroy'
def b64encode(self, data):
return base64.b64encode(data.encode('utf-8')).decode('utf-8')
def b64decode(self, data):
return base64.b64decode(data.encode('utf-8')).decode('utf-8')
if __name__ == '__main__':
pass
+164
View File
@@ -0,0 +1,164 @@
# -*- coding: utf-8 -*-
# @Author : Doubebly
# @Time : 2025/3/23 21:55
import base64
import sys
import time
import json
import requests
import re
from datetime import datetime
sys.path.append('..')
from base.spider import Spider
class Spider(Spider):
def getName(self):
return "Litv"
def init(self, extend):
self.extend = extend
try:
self.extendDict = json.loads(extend)
except:
self.extendDict = {}
proxy = self.extendDict.get('proxy', None)
if proxy is None:
self.is_proxy = False
else:
self.proxy = proxy
self.is_proxy = True
pass
def getDependence(self):
return []
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def liveContent(self, url):
m3u_content = ['#EXTM3U']
try:
starttime = datetime.now().strftime("%Y-%m-%d")
api_url = f"https://kzb29rda.com/prod-api/match/list/new?isfanye=1&type=0&cid=0&ishot=1&pn=1&ps=50&level=&name=&langtype=zh&starttime={starttime}&pid=4&zoneId=Asia%2FShanghai&zhuboType=1"
response = requests.get(api_url, timeout=10)
response.raise_for_status()
data = response.json()
for match in data.get("data", {}).get("topList", []):
hteam = match.get("hteam_name", "Unknown Home")
ateam = match.get("ateam_name", "Unknown Away")
name = match.get("name", "Unnamed Match")
matchtime = match.get("matchtime", "Unknown Time")
status = match.get("status_up_name", "Unknown Status")
for url_info in match.get("live_urls", []):
url = url_info.get("url", "")
if url:
extinf = f'#EXTINF:-1 tvg-name="{name}({hteam}-{ateam}){status}{matchtime}" group-title="{name}",({hteam}-{ateam}){status}{matchtime}'
m3u_content.extend([extinf, url])
for match in data.get("data", {}).get("dataList", []):
hteam = match.get("hteam_name", "Unknown Home")
ateam = match.get("ateam_name", "Unknown Away")
name = match.get("name", "Unnamed Match")
matchtime = match.get("matchtime", "Unknown Time")
status = match.get("status_up_name", "Unknown Status")
for url_info in match.get("live_urls", []):
url = url_info.get("url", "")
if url:
extinf = f'#EXTINF:-1 tvg-name="{name}({hteam}-{ateam}){status}{matchtime}" group-title="{name}",({hteam}-{ateam}){status}{matchtime}'
m3u_content.extend([extinf, url])
except requests.exceptions.RequestException as e:
print(f"网络请求异常: {str(e)}")
m3u_content.append('# 错误:无法获取直播数据')
except json.JSONDecodeError:
print("响应内容不是有效的JSON")
m3u_content.append('# 错误:无效的API响应格式')
except Exception as e:
print(f"未知错误: {str(e)}")
m3u_content.append('# 错误:数据处理异常')
return '\n'.join(m3u_content)
def homeContent(self, filter):
return {}
def homeVideoContent(self):
return {}
def categoryContent(self, cid, page, filter, ext):
return {}
def detailContent(self, did):
return {}
def searchContent(self, key, quick, page='1'):
return {}
def searchContentPage(self, keywords, quick, page):
return {}
def playerContent(self, flag, pid, vipFlags):
return {}
def localProxy(self, params):
if params['type'] == "m3u8":
return self.proxyM3u8(params)
if params['type'] == "ts":
return self.get_ts(params)
return [302, "text/plain", None, {'Location': 'https://sf1-cdn-tos.huoshanstatic.com/obj/media-fe/xgplayer_doc_video/mp4/xgplayer-demo-720p.mp4'}]
def proxyM3u8(self, params):
pid = params['pid']
info = pid.split(',')
a = info[0]
b = info[1]
c = info[2]
timestamp = int(time.time() / 4 - 355017625)
t = timestamp * 4
m3u8_text = f'#EXTM3U\n#EXT-X-VERSION:3\n#EXT-X-TARGETDURATION:4\n#EXT-X-MEDIA-SEQUENCE:{timestamp}\n'
for i in range(10):
url = f'https://ntd-tgc.cdn.hinet.net/live/pool/{a}/litv-pc/{a}-avc1_6000000={b}-mp4a_134000_zho={c}-begin={t}0000000-dur=40000000-seq={timestamp}.ts'
if self.is_proxy:
url = f'http://127.0.0.1:9978/proxy?do=py&type=ts&url={self.b64encode(url)}'
m3u8_text += f'#EXTINF:4,\n{url}\n'
timestamp += 1
t += 4
return [200, "application/vnd.apple.mpegurl", m3u8_text]
def get_ts(self, params):
url = self.b64decode(params['url'])
headers = {'User-Agent': 'Mozilla/5.0'}
response = requests.get(url, headers=headers, stream=True, proxies=self.proxy)
return [206, "application/octet-stream", response.content]
def destroy(self):
return '正在Destroy'
def b64encode(self, data):
return base64.b64encode(data.encode('utf-8')).decode('utf-8')
def b64decode(self, data):
return base64.b64decode(data.encode('utf-8')).decode('utf-8')
if __name__ == '__main__':
pass
+201
View File
@@ -0,0 +1,201 @@
import base64
import sys
import time
import json
import requests
import re
from datetime import datetime
sys.path.append('..')
from base.spider import Spider
from bs4 import BeautifulSoup
from urllib.parse import urlparse
class Spider(Spider):
def getName(self):
return "Litv"
def init(self, extend):
self.extend = extend
try:
self.extendDict = json.loads(extend)
except:
self.extendDict = {}
proxy = self.extendDict.get('proxy', None)
if proxy is None:
self.is_proxy = False
else:
self.proxy = proxy
self.is_proxy = True
pass
def getDependence(self):
return []
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def liveContent(self, url):
m3u_content = ['#EXTM3U']
headers = {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Language': 'zh,zh-CN;q=0.9,en-US;q=0.8,en;q=0.7',
'Cache-Control': 'max-age=0',
'Connection': 'keep-alive',
'If-Modified-Since': 'Sun, 27 Apr 2025 02:30:02 GMT',
'If-None-Match': 'W/"680d96aa-36851"',
'Upgrade-Insecure-Requests': '1',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36',
}
response = requests.get('http://www.yoozb.live/', headers=headers, verify=False)
html_content = response.content.decode('utf-8-sig')
soup = BeautifulSoup(html_content, 'html.parser')
data_div = soup.find('div', class_='data')
rows = data_div.find_all('tr')
# 初始化变量
current_date = ""
matches = {
"结束": [],
"直播": [],
"预告": []
}
for row in rows:
# 处理日期行
if 'class' in row.attrs and 'date' in row['class']:
date_text = row.td.get_text(strip=True).split('&nbsp')[0]
try:
dt = datetime.strptime(date_text, "%Y年%m月%d")
current_date = dt.strftime("%m-%d") # 格式化为 月-日
except:
current_date = ""
continue
# 跳过表头
if 'class' in row.attrs and 'head' in row['class']:
continue
# 处理比赛行
if row.find('td', class_='matcha'):
tds = row.find_all('td')
try:
# 提取基础信息
category = tds[1].get_text(strip=True)
time = f"{current_date} {tds[2].get_text(strip=True)}" if current_date else tds[2].get_text(strip=True)
status = tds[3].get_text(strip=True) or "预告"
home_team = tds[4].get_text(strip=True)
away_team = tds[6].get_text(strip=True)
live_links = [a['href'] for a in tds[7].find_all('a') if a.has_attr('href')]
# 状态标准化
status_key = "直播" if "直播" in status else "结束" if "结束" in status else "预告"
# 添加到对应分组
matches[status_key].append({
"时间": time,
"分类": category,
"主队": home_team,
"客队": away_team,
"直播链接": live_links
})
except IndexError:
continue
m3u_content = []
# 分组输出结果
for status_group in ['直播', '结束', '预告']:
#print(f"\n===== {status_group}的比赛 =====")
if status_group == "直播" or status_group == "结束":
for i, match in enumerate(matches[status_group], 1):
#ch_name = f"{i}. [{match['时间']}] {match['分类']}-{match['主队']} vs {match['客队']}"
ch_name = f"[{match['时间']}] {match['分类']}-{match['主队']}vs{match['客队']}"
links = match['直播链接'][:3]
#print("links:",links)
for k, link in enumerate(links, 1):
link = link.replace("\n","").replace(" ","")
if link:
ch_url = f"video://{link}"
extinf = f'#EXTINF:-1 tvg-name="{ch_name}{k}" group-title="{status_group}",{ch_name}{k}'
#print(f"{ch_name}[{k}],{ch_url}")
m3u_content.extend([extinf, ch_url])
elif status_group == "预告":
for i, match in enumerate(matches[status_group], 1):
ch_name = f"{i}. [{match['时间']}] {match['分类']}-{match['主队']} vs {match['客队']}"
ch_url = "https://gh-proxy.com/raw.githubusercontent.com/cqshushu/tvjk/master/yootv.mp4"
#print(f"{ch_name},{ch_url}")
extinf = f'#EXTINF:-1 tvg-name="{ch_name}]" group-title="{status_group}",{ch_name}'
#print(f"{ch_name}[{k}],{ch_url}")
m3u_content.extend([extinf, ch_url])
return '\n'.join(m3u_content)
def homeContent(self, filter):
return {}
def homeVideoContent(self):
return {}
def categoryContent(self, cid, page, filter, ext):
return {}
def detailContent(self, did):
return {}
def searchContent(self, key, quick, page='1'):
return {}
def searchContentPage(self, keywords, quick, page):
return {}
def playerContent(self, flag, pid, vipFlags):
return {}
def localProxy(self, params):
if params['type'] == "m3u8":
return self.proxyM3u8(params)
if params['type'] == "ts":
return self.get_ts(params)
return [302, "text/plain", None, {'Location': 'https://sf1-cdn-tos.huoshanstatic.com/obj/media-fe/xgplayer_doc_video/mp4/xgplayer-demo-720p.mp4'}]
def proxyM3u8(self, params):
pid = params['pid']
info = pid.split(',')
a = info[0]
b = info[1]
c = info[2]
timestamp = int(time.time() / 4 - 355017625)
t = timestamp * 4
m3u8_text = f'#EXTM3U\n#EXT-X-VERSION:3\n#EXT-X-TARGETDURATION:4\n#EXT-X-MEDIA-SEQUENCE:{timestamp}\n'
for i in range(10):
url = f'https://ntd-tgc.cdn.hinet.net/live/pool/{a}/litv-pc/{a}-avc1_6000000={b}-mp4a_134000_zho={c}-begin={t}0000000-dur=40000000-seq={timestamp}.ts'
if self.is_proxy:
url = f'http://127.0.0.1:9978/proxy?do=py&type=ts&url={self.b64encode(url)}'
m3u8_text += f'#EXTINF:4,\n{url}\n'
timestamp += 1
t += 4
return [200, "application/vnd.apple.mpegurl", m3u8_text]
def get_ts(self, params):
url = self.b64decode(params['url'])
headers = {'User-Agent': 'Mozilla/5.0'}
response = requests.get(url, headers=headers, stream=True, proxies=self.proxy)
return [206, "application/octet-stream", response.content]
def destroy(self):
return '正在Destroy'
def b64encode(self, data):
return base64.b64encode(data.encode('utf-8')).decode('utf-8')
def b64decode(self, data):
return base64.b64decode(data.encode('utf-8')).decode('utf-8')
if __name__ == '__main__':
pass