This commit is contained in:
pricema
2025-10-08 01:35:03 +08:00
parent acdd8fe01c
commit c766586c4d
3 changed files with 663 additions and 315 deletions
+256 -235
View File
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
# by @嗷呜
# 🌈 Love
import json
import random
import re
@@ -8,6 +8,7 @@ import threading
import time
from base64 import b64decode, b64encode
from urllib.parse import urlparse
import requests
from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad
@@ -19,135 +20,237 @@ from base.spider import Spider
class Spider(Spider):
def init(self, extend=""):
try:self.proxies = json.loads(extend).get('proxy',{})
try:self.proxies = json.loads(extend)
except:self.proxies = {}
self.headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36',
'Pragma': 'no-cache',
'Cache-Control': 'no-cache',
'sec-ch-ua-platform': '"macOS"',
'sec-ch-ua': '"Not/A)Brand";v="8", "Chromium";v="134", "Google Chrome";v="134"',
'DNT': '1',
'sec-ch-ua-mobile': '?0',
'Origin': '',
'Sec-Fetch-Site': 'cross-site',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Dest': 'empty',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Language': 'zh-CN,zh;q=0.9',
'Connection': 'keep-alive',
'Cache-Control': 'no-cache',
}
self.host=self.host_late(self.gethosts())
# Use working dynamic URLs directly
self.host = self.get_working_host()
self.headers.update({'Origin': self.host, 'Referer': f"{self.host}/"})
self.getcnh()
self.log(f"使用站点: {self.host}")
print(f"使用站点: {self.host}")
pass
def getName(self):
pass
return "🌈 老僧酿酒、吃瓜群众"
def isVideoFormat(self, url):
pass
# Treat direct media formats as playable without parsing
return any(ext in (url or '') for ext in ['.m3u8', '.mp4', '.ts'])
def manualVideoCheck(self):
pass
return False
def destroy(self):
pass
def homeContent(self, filter):
data=self.getpq(requests.get(self.host, headers=self.headers,proxies=self.proxies).text)
result = {}
classes = []
for k in list(data('.navbar-nav.mr-auto').children('li').items())[1:-3]:
if k('ul'):
for j in k('ul li').items():
try:
response = requests.get(self.host, headers=self.headers, proxies=self.proxies, timeout=15)
if response.status_code != 200:
return {'class': [], 'list': []}
data = self.getpq(response.text)
result = {}
classes = []
# Try to get categories from different possible locations
category_selectors = [
'.category-list ul li',
'.nav-menu li',
'.menu li',
'nav ul li'
]
for selector in category_selectors:
for k in data(selector).items():
link = k('a')
href = (link.attr('href') or '').strip()
name = (link.text() or '').strip()
# Skip placeholder or invalid entries
if not href or href == '#' or not name:
continue
classes.append({
'type_name': j('a').text(),
'type_id': j('a').attr('href').strip()+'/',
'type_name': name,
'type_id': href
})
else:
classes.append({
'type_name': k('a').text(),
'type_id': k('a').attr('href').strip()+'/',
})
result['class'] = classes
result['list'] = self.getlist(data('#index article a'))
return result
def getcnh(self):
data=self.getpq(requests.get(f"{self.host}/homeway.html", headers=self.headers,proxies=self.proxies).text)
url=data('.post-content[itemprop="articleBody"] blockquote p').eq(0)('a').attr('href')
parsed_url = urlparse(url)
host = parsed_url.scheme + "://" + parsed_url.netloc
self.setCache('host_51cn',host)
if classes:
break
# If no categories found, create some default ones
if not classes:
classes = [
{'type_name': '首页', 'type_id': '/'},
{'type_name': '最新', 'type_id': '/latest/'},
{'type_name': '热门', 'type_id': '/hot/'}
]
result['class'] = classes
result['list'] = self.getlist(data('#index article a'))
return result
except Exception as e:
print(f"homeContent error: {e}")
return {'class': [], 'list': []}
def homeVideoContent(self):
pass
try:
response = requests.get(self.host, headers=self.headers, proxies=self.proxies, timeout=15)
if response.status_code != 200:
return {'list': []}
data = self.getpq(response.text)
return {'list': self.getlist(data('#index article a, #archive article a'))}
except Exception as e:
print(f"homeVideoContent error: {e}")
return {'list': []}
def categoryContent(self, tid, pg, filter, extend):
if '@folder' in tid:
id=tid.replace('@folder','')
videos=self.getfod(id)
else:
data=self.getpq(requests.get(f"{self.host}{tid}{pg}", headers=self.headers,proxies=self.proxies).text)
videos=self.getlist(data('#archive article a'),tid)
result = {}
result['list'] = videos
result['page'] = pg
result['pagecount'] = 1 if '@folder' in tid else 99999
result['limit'] = 90
result['total'] = 999999
return result
def getfod(self, id):
url = f"{self.host}{id}"
data = self.getpq(requests.get(url, headers=self.headers, proxies=self.proxies).text)
vdata=data('.post-content[itemprop="articleBody"]')
r=['.txt-apps','.line','blockquote','.tags','.content-tabs']
for i in r:vdata.remove(i)
p=vdata('p')
videos=[]
for i,x in enumerate(vdata('h2').items()):
c=i*2
videos.append({
'vod_id': p.eq(c)('a').attr('href'),
'vod_name': p.eq(c).text(),
'vod_pic': f"{self.getProxyUrl()}&url={p.eq(c+1)('img').attr('data-xkrkllgl')}&type=img",
'vod_remarks':x.text()
})
return videos
try:
if '@folder' in tid:
id = tid.replace('@folder', '')
videos = self.getfod(id)
else:
# Build URL properly
if tid.startswith('/'):
if pg and pg != '1':
url = f"{self.host}{tid}page/{pg}/"
else:
url = f"{self.host}{tid}"
else:
url = f"{self.host}/{tid}"
response = requests.get(url, headers=self.headers, proxies=self.proxies, timeout=15)
if response.status_code != 200:
return {'list': [], 'page': pg, 'pagecount': 1, 'limit': 90, 'total': 0}
data = self.getpq(response.text)
videos = self.getlist(data('#archive article a, #index article a'), tid)
result = {}
result['list'] = videos
result['page'] = pg
result['pagecount'] = 1 if '@folder' in tid else 99999
result['limit'] = 90
result['total'] = 999999
return result
except Exception as e:
print(f"categoryContent error: {e}")
return {'list': [], 'page': pg, 'pagecount': 1, 'limit': 90, 'total': 0}
def detailContent(self, ids):
url=f"{self.host}{ids[0]}"
data=self.getpq(requests.get(url, headers=self.headers,proxies=self.proxies).text)
vod = {'vod_play_from': '老僧酿酒'}
try:
clist = []
if data('.tags .keywords a'):
for k in data('.tags .keywords a').items():
title = k.text()
href = k.attr('href')
clist.append('[a=cr:' + json.dumps({'id': href, 'name': title}) + '/]' + title + '[/a]')
vod['vod_content'] = ' '.join(clist)
except:
vod['vod_content'] = data('.post-title').text()
try:
plist=[]
if data('.dplayer'):
for c, k in enumerate(data('.dplayer').items(), start=1):
config = json.loads(k.attr('data-config'))
plist.append(f"视频{c}${config['video']['url']}")
vod['vod_play_url']='#'.join(plist)
except:
vod['vod_play_url']=f"可能没有视频${url}"
return {'list':[vod]}
url = f"{self.host}{ids[0]}" if not ids[0].startswith('http') else ids[0]
response = requests.get(url, headers=self.headers, proxies=self.proxies, timeout=15)
if response.status_code != 200:
return {'list': [{'vod_play_from': '老僧酿酒、吃瓜群众', 'vod_play_url': f'页面加载失败${url}'}]}
data = self.getpq(response.text)
vod = {'vod_play_from': '老僧酿酒、吃瓜群众'}
# Get content/description
try:
clist = []
if data('.tags .keywords a'):
for k in data('.tags .keywords a').items():
title = k.text()
href = k.attr('href')
if title and href:
clist.append('[a=cr:' + json.dumps({'id': href, 'name': title}) + '/]' + title + '[/a]')
vod['vod_content'] = ' '.join(clist) if clist else data('.post-title').text()
except:
vod['vod_content'] = data('.post-title').text() or '老僧酿酒、吃瓜群众视频'
# Get video URLs (build episode list when multiple players exist)
try:
plist = []
used_names = set()
if data('.dplayer'):
for c, k in enumerate(data('.dplayer').items(), start=1):
config_attr = k.attr('data-config')
if config_attr:
try:
config = json.loads(config_attr)
video_url = config.get('video', {}).get('url', '')
# Determine a readable episode name from nearby headings if present
ep_name = ''
try:
parent = k.parents().eq(0)
# search up to a few ancestors for a heading text
for _ in range(3):
if not parent: break
heading = parent.find('h2, h3, h4').eq(0).text() or ''
heading = heading.strip()
if heading:
ep_name = heading
break
parent = parent.parents().eq(0)
except Exception:
ep_name = ''
base_name = ep_name if ep_name else f"视频{c}"
name = base_name
count = 2
# Ensure the name is unique
while name in used_names:
name = f"{base_name} {count}"
count += 1
used_names.add(name)
if video_url:
self.log(f"解析到视频: {name} -> {video_url}")
print(f"解析到视频: {name} -> {video_url}")
plist.append(f"{name}${video_url}")
except:
continue
if plist:
self.log(f"拼装播放列表,共{len(plist)}")
print(f"拼装播放列表,共{len(plist)}")
vod['vod_play_url'] = '#'.join(plist)
else:
vod['vod_play_url'] = f"未找到视频源${url}"
except Exception as e:
vod['vod_play_url'] = f"视频解析失败${url}"
return {'list': [vod]}
except Exception as e:
print(f"detailContent error: {e}")
return {'list': [{'vod_play_from': '老僧酿酒、吃瓜群众', 'vod_play_url': f'详情页加载失败${ids[0] if ids else ""}'}]}
def searchContent(self, key, quick, pg="1"):
data=self.getpq(requests.get(f"{self.host}/search/{key}/{pg}", headers=self.headers,proxies=self.proxies).text)
return {'list':self.getlist(data('#archive article a')),'page':pg}
try:
url = f"{self.host}/search/{key}/{pg}" if pg != "1" else f"{self.host}/search/{key}/"
response = requests.get(url, headers=self.headers, proxies=self.proxies, timeout=15)
if response.status_code != 200:
return {'list': [], 'page': pg}
data = self.getpq(response.text)
videos = self.getlist(data('#archive article a, #index article a'))
return {'list': videos, 'page': pg}
except Exception as e:
print(f"searchContent error: {e}")
return {'list': [], 'page': pg}
def playerContent(self, flag, id, vipFlags):
p=1
if '.m3u8' in id:p,id=0,self.proxy(id)
return {'parse': p, 'url': id, 'header': self.headers}
url = id
p = 1
if self.isVideoFormat(url):
# m3u8/mp4 direct play; when using proxy setting, wrap to proxy for m3u8
if '.m3u8' in url:
url = self.proxy(url)
p = 0
self.log(f"播放请求: parse={p}, url={url}")
print(f"播放请求: parse={p}, url={url}")
return {'parse': p, 'url': url, 'header': self.headers}
def localProxy(self, param):
if param.get('type') == 'img':
@@ -163,7 +266,6 @@ class Spider(Spider):
def m3Proxy(self, url):
url=self.d64(url)
ydata = requests.get(url, headers=self.headers, proxies=self.proxies, allow_redirects=False)
print(ydata.text)
data = ydata.content.decode('utf-8')
if ydata.headers.get('Location'):
url = ydata.headers['Location']
@@ -212,154 +314,73 @@ class Spider(Spider):
print(f"Base64解码错误: {str(e)}")
return ""
def gethosts(self):
url='https://51cg.fun'
curl=self.getCache('host_51cn')
if curl:
def get_working_host(self):
"""Get working host from known dynamic URLs"""
# Known working URLs from the dynamic gateway
dynamic_urls = [
'https://artist.vgwtswi.xyz',
'https://ability.vgwtswi.xyz',
'https://am.vgwtswi.xyz'
]
# Test each URL to find a working one
for url in dynamic_urls:
try:
data=self.getpq(requests.get(curl, headers=self.headers, proxies=self.proxies).text)('a').attr('href')
if data:
parsed_url = urlparse(data)
url = parsed_url.scheme + "://" + parsed_url.netloc
except:
pass
try:
html = self.getpq(requests.get(url, headers=self.headers, proxies=self.proxies).text)
html_pattern = r"Base64\.decode\('([^']+)'\)"
html_match = re.search(html_pattern, html('script').eq(-1).text(), re.DOTALL)
if not html_match:raise Exception("未找到html")
html = self.getpq(b64decode(html_match.group(1)).decode())('script').eq(-4).text()
return self.hstr(html)
except Exception as e:
self.log(f"获取: {str(e)}")
return ""
def hstr(self, html):
pattern = r"(backupLine\s*=\s*\[\])\s+(words\s*=)"
replacement = r"\1, \2"
html = re.sub(pattern, replacement, html)
data = f"""
var Vx = {{
range: function(start, end) {{
const result = [];
for (let i = start; i < end; i++) {{
result.push(i);
}}
return result;
}},
map: function(array, callback) {{
const result = [];
for (let i = 0; i < array.length; i++) {{
result.push(callback(array[i], i, array));
}}
return result;
}}
}};
Array.prototype.random = function() {{
return this[Math.floor(Math.random() * this.length)];
}};
var location = {{
protocol: "https:"
}};
function executeAndGetResults() {{
var allLines = lineAry.concat(backupLine);
var resultStr = JSON.stringify(allLines);
return resultStr;
}};
{html}
executeAndGetResults();
"""
return self.p_qjs(data)
def p_qjs(self, js_code):
try:
from com.whl.quickjs.wrapper import QuickJSContext
ctx = QuickJSContext.create()
result_json = ctx.evaluate(js_code)
ctx.destroy()
return json.loads(result_json)
except Exception as e:
self.log(f"执行失败: {e}")
return []
def get_domains(self):
html = self.getpq(requests.get("https://51cg.fun", headers=self.headers,proxies=self.proxies).text)
html_pattern = r"Base64\.decode\('([^']+)'\)"
html_match = re.search(html_pattern, html('script').eq(-1).text(), re.DOTALL)
if not html_match:
raise Exception("未找到html")
html = b64decode(html_match.group(1)).decode()
words_pattern = r"words\s*=\s*'([^']+)'"
words_match = re.search(words_pattern, html, re.DOTALL)
if not words_match:
raise Exception("未找到words")
words = words_match.group(1).split(',')
main_pattern = r"lineAry\s*=.*?words\.random\(\)\s*\+\s*'\.([^']+)'"
domain_match = re.search(main_pattern, html, re.DOTALL)
if not domain_match:
raise Exception("未找到主域名")
domain_suffix = domain_match.group(1)
domains = []
for _ in range(3):
random_word = random.choice(words)
domain = f"https://{random_word}.{domain_suffix}"
domains.append(domain)
return domains
def host_late(self, url_list):
if isinstance(url_list, str):
urls = [u.strip() for u in url_list.split(',')]
else:
urls = url_list
if len(urls) <= 1:
return urls[0] if urls else ''
results = {}
threads = []
def test_host(url):
try:
start_time = time.time()
response = requests.head(url,headers=self.headers,proxies=self.proxies,timeout=1.0, allow_redirects=False)
delay = (time.time() - start_time) * 1000
results[url] = delay
response = requests.get(url, headers=self.headers, proxies=self.proxies, timeout=10)
if response.status_code == 200:
# Verify it has the expected content structure
data = self.getpq(response.text)
articles = data('#index article a')
if len(articles) > 0:
self.log(f"选用可用站点: {url}")
print(f"选用可用站点: {url}")
return url
except Exception as e:
results[url] = float('inf')
continue
# Fallback to first URL if none work (better than crashing)
self.log(f"未检测到可用站点,回退: {dynamic_urls[0]}")
print(f"未检测到可用站点,回退: {dynamic_urls[0]}")
return dynamic_urls[0]
for url in urls:
t = threading.Thread(target=test_host, args=(url,))
threads.append(t)
t.start()
for t in threads:
t.join()
return min(results.items(), key=lambda x: x[1])[0]
def getlist(self,data,tid=''):
def getlist(self, data, tid=''):
videos = []
l='/mrdg' in tid
l = '/mrdg' in tid
for k in data.items():
a=k.attr('href')
b=k('h2').text()
c=k('span[itemprop="datePublished"]').text()
if a and b and c:
a = k.attr('href')
b = k('h2').text()
# Some pages might not include datePublished; use a fallback
c = k('span[itemprop="datePublished"]').text() or k('.post-meta, .entry-meta, time').text()
if a and b:
videos.append({
'vod_id': f"{a}{'@folder' if l else ''}",
'vod_name': b.replace('\n', ' '),
'vod_pic': self.getimg(k('script').text()),
'vod_remarks': c,
'vod_tag':'folder' if l else '',
'vod_remarks': c or '',
'vod_tag': 'folder' if l else '',
'style': {"type": "rect", "ratio": 1.33}
})
return videos
def getfod(self, id):
url = f"{self.host}{id}"
data = self.getpq(requests.get(url, headers=self.headers, proxies=self.proxies).text)
vdata=data('.post-content[itemprop="articleBody"]')
r=['.txt-apps','.line','blockquote','.tags','.content-tabs']
for i in r:vdata.remove(i)
p=vdata('p')
videos=[]
for i,x in enumerate(vdata('h2').items()):
c=i*2
videos.append({
'vod_id': p.eq(c)('a').attr('href'),
'vod_name': p.eq(c).text(),
'vod_pic': f"{self.getProxyUrl()}&url={p.eq(c+1)('img').attr('data-xkrkllgl')}&type=img",
'vod_remarks':x.text()
})
return videos
def getimg(self, text):
match = re.search(r"loadBannerDirect\('([^']+)'", text)
if match:
@@ -380,4 +401,4 @@ class Spider(Spider):
return pq(data)
except Exception as e:
print(f"{str(e)}")
return pq(data.encode('utf-8'))
return pq(data.encode('utf-8'))