This commit is contained in:
pricema
2025-10-07 01:31:31 +08:00
parent 239a874feb
commit 6e8af5c540
238 changed files with 58798 additions and 36 deletions
+352
View File
@@ -0,0 +1,352 @@
# -*- coding: utf-8 -*-
# by @嗷呜
import json
import random
import re
import sys
import threading
import time
from base64 import b64decode, b64encode
from urllib.parse import urlparse
import requests
from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad
from pyquery import PyQuery as pq
sys.path.append('..')
from base.spider import Spider
class Spider(Spider):
def init(self, extend=""):
try:self.proxies = json.loads(extend)
except:self.proxies = {}
self.headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36',
'Pragma': 'no-cache',
'Cache-Control': 'no-cache',
'sec-ch-ua-platform': '"macOS"',
'sec-ch-ua': '"Not/A)Brand";v="8", "Chromium";v="134", "Google Chrome";v="134"',
'DNT': '1',
'sec-ch-ua-mobile': '?0',
'Origin': '',
'Sec-Fetch-Site': 'cross-site',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Dest': 'empty',
'Accept-Language': 'zh-CN,zh;q=0.9',
}
self.host=self.host_late(self.gethosts())
self.headers.update({'Origin': self.host, 'Referer': f"{self.host}/"})
self.getcnh()
pass
def getName(self):
pass
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def destroy(self):
pass
def homeContent(self, filter):
data=self.getpq(requests.get(self.host, headers=self.headers,proxies=self.proxies).text)
result = {}
classes = []
for k in data('.category-list ul li').items():
classes.append({
'type_name': k('a').text(),
'type_id': k('a').attr('href')
})
result['class'] = classes
result['list'] = self.getlist(data('#index article a'))
return result
def homeVideoContent(self):
pass
def categoryContent(self, tid, pg, filter, extend):
if '@folder' in tid:
id = tid.replace('@folder', '')
videos = self.getfod(id)
else:
data = self.getpq(requests.get(f"{self.host}{tid}{pg}", headers=self.headers, proxies=self.proxies).text)
videos = self.getlist(data('#archive article a'), tid)
result = {}
result['list'] = videos
result['page'] = pg
result['pagecount'] = 1 if '@folder' in tid else 99999
result['limit'] = 90
result['total'] = 999999
return result
def detailContent(self, ids):
url=f"{self.host}{ids[0]}"
data=self.getpq(requests.get(url, headers=self.headers,proxies=self.proxies).text)
vod = {'vod_play_from': '51吸瓜'}
try:
clist = []
if data('.tags .keywords a'):
for k in data('.tags .keywords a').items():
title = k.text()
href = k.attr('href')
clist.append('[a=cr:' + json.dumps({'id': href, 'name': title}) + '/]' + title + '[/a]')
vod['vod_content'] = ' '.join(clist)
except:
vod['vod_content'] = data('.post-title').text()
try:
plist=[]
if data('.dplayer'):
for c, k in enumerate(data('.dplayer').items(), start=1):
config = json.loads(k.attr('data-config'))
plist.append(f"视频{c}${config['video']['url']}")
vod['vod_play_url']='#'.join(plist)
except:
vod['vod_play_url']=f"请停止活塞运动,可能没有视频${url}"
return {'list':[vod]}
def searchContent(self, key, quick, pg="1"):
data=self.getpq(requests.get(f"{self.host}/search/{key}/{pg}", headers=self.headers,proxies=self.proxies).text)
return {'list':self.getlist(data('#archive article a')),'page':pg}
def playerContent(self, flag, id, vipFlags):
p=1
if '.m3u8' in id:p,id=0,self.proxy(id)
return {'parse': p, 'url': id, 'header': self.headers}
def localProxy(self, param):
if param.get('type') == 'img':
res=requests.get(param['url'], headers=self.headers, proxies=self.proxies, timeout=10)
return [200,res.headers.get('Content-Type'),self.aesimg(res.content)]
elif param.get('type') == 'm3u8':return self.m3Proxy(param['url'])
else:return self.tsProxy(param['url'])
def proxy(self, data, type='m3u8'):
if data and len(self.proxies):return f"{self.getProxyUrl()}&url={self.e64(data)}&type={type}"
else:return data
def m3Proxy(self, url):
url=self.d64(url)
ydata = requests.get(url, headers=self.headers, proxies=self.proxies, allow_redirects=False)
data = ydata.content.decode('utf-8')
if ydata.headers.get('Location'):
url = ydata.headers['Location']
data = requests.get(url, headers=self.headers, proxies=self.proxies).content.decode('utf-8')
lines = data.strip().split('\n')
last_r = url[:url.rfind('/')]
parsed_url = urlparse(url)
durl = parsed_url.scheme + "://" + parsed_url.netloc
iskey=True
for index, string in enumerate(lines):
if iskey and 'URI' in string:
pattern = r'URI="([^"]*)"'
match = re.search(pattern, string)
if match:
lines[index] = re.sub(pattern, f'URI="{self.proxy(match.group(1), "mkey")}"', string)
iskey=False
continue
if '#EXT' not in string:
if 'http' not in string:
domain = last_r if string.count('/') < 2 else durl
string = domain + ('' if string.startswith('/') else '/') + string
lines[index] = self.proxy(string, string.split('.')[-1].split('?')[0])
data = '\n'.join(lines)
return [200, "application/vnd.apple.mpegur", data]
def tsProxy(self, url):
url = self.d64(url)
data = requests.get(url, headers=self.headers, proxies=self.proxies, stream=True)
return [200, data.headers['Content-Type'], data.content]
def e64(self, text):
try:
text_bytes = text.encode('utf-8')
encoded_bytes = b64encode(text_bytes)
return encoded_bytes.decode('utf-8')
except Exception as e:
print(f"Base64编码错误: {str(e)}")
return ""
def d64(self, encoded_text):
try:
encoded_bytes = encoded_text.encode('utf-8')
decoded_bytes = b64decode(encoded_bytes)
return decoded_bytes.decode('utf-8')
except Exception as e:
print(f"Base64解码错误: {str(e)}")
return ""
def gethosts(self):
url = 'https://51cg.fun'
curl = self.getCache('host_51cn')
if curl:
try:
data = self.getpq(requests.get(curl, headers=self.headers, proxies=self.proxies).text)('a').attr('href')
if data:
parsed_url = urlparse(data)
url = parsed_url.scheme + "://" + parsed_url.netloc
except:
pass
try:
html = self.getpq(requests.get(url, headers=self.headers, proxies=self.proxies).text)
html_pattern = r"Base64\.decode\('([^']+)'\)"
html_match = re.search(html_pattern, html('script').eq(-1).text(), re.DOTALL)
if not html_match: raise Exception("未找到html")
html = self.getpq(b64decode(html_match.group(1)).decode())('script').eq(-4).text()
return self.hstr(html)
except Exception as e:
self.log(f"获取: {str(e)}")
return ""
def getcnh(self):
data=self.getpq(requests.get(f"{self.host}/ybml.html", headers=self.headers,proxies=self.proxies).text)
url=data('.post-content[itemprop="articleBody"] blockquote p').eq(0)('a').attr('href')
parsed_url = urlparse(url)
host = parsed_url.scheme + "://" + parsed_url.netloc
self.setCache('host_51cn',host)
def hstr(self, html):
pattern = r"(backupLine\s*=\s*\[\])\s+(words\s*=)"
replacement = r"\1, \2"
html = re.sub(pattern, replacement, html)
data = f"""
var Vx = {{
range: function(start, end) {{
const result = [];
for (let i = start; i < end; i++) {{
result.push(i);
}}
return result;
}},
map: function(array, callback) {{
const result = [];
for (let i = 0; i < array.length; i++) {{
result.push(callback(array[i], i, array));
}}
return result;
}}
}};
Array.prototype.random = function() {{
return this[Math.floor(Math.random() * this.length)];
}};
var location = {{
protocol: "https:"
}};
function executeAndGetResults() {{
var allLines = lineAry.concat(backupLine);
var resultStr = JSON.stringify(allLines);
return resultStr;
}};
{html}
executeAndGetResults();
"""
return self.p_qjs(data)
def p_qjs(self, js_code):
try:
from com.whl.quickjs.wrapper import QuickJSContext
ctx = QuickJSContext.create()
result_json = ctx.evaluate(js_code)
ctx.destroy()
return json.loads(result_json)
except Exception as e:
self.log(f"执行失败: {e}")
return []
def host_late(self, url_list):
if isinstance(url_list, str):
urls = [u.strip() for u in url_list.split(',')]
else:
urls = url_list
if len(urls) <= 1:
return urls[0] if urls else ''
results = {}
threads = []
def test_host(url):
try:
start_time = time.time()
response = requests.head(url,headers=self.headers,proxies=self.proxies,timeout=1.0, allow_redirects=False)
delay = (time.time() - start_time) * 1000
results[url] = delay
except Exception as e:
results[url] = float('inf')
for url in urls:
t = threading.Thread(target=test_host, args=(url,))
threads.append(t)
t.start()
for t in threads:
t.join()
return min(results.items(), key=lambda x: x[1])[0]
def getlist(self, data, tid=''):
videos = []
l = '/mrdg' in tid
for k in data.items():
a = k.attr('href')
b = k('h2').text()
c = k('span[itemprop="datePublished"]').text()
if a and b and c:
videos.append({
'vod_id': f"{a}{'@folder' if l else ''}",
'vod_name': b.replace('\n', ' '),
'vod_pic': self.getimg(k('script').text()),
'vod_remarks': c,
'vod_tag': 'folder' if l else '',
'style': {"type": "rect", "ratio": 1.33}
})
return videos
def getfod(self, id):
url = f"{self.host}{id}"
data = self.getpq(requests.get(url, headers=self.headers, proxies=self.proxies).text)
vdata=data('.post-content[itemprop="articleBody"]')
r=['.txt-apps','.line','blockquote','.tags','.content-tabs']
for i in r:vdata.remove(i)
p=vdata('p')
videos=[]
for i,x in enumerate(vdata('h2').items()):
c=i*2
videos.append({
'vod_id': p.eq(c)('a').attr('href'),
'vod_name': p.eq(c).text(),
'vod_pic': f"{self.getProxyUrl()}&url={p.eq(c+1)('img').attr('data-xkrkllgl')}&type=img",
'vod_remarks':x.text()
})
return videos
def getimg(self, text):
match = re.search(r"loadBannerDirect\('([^']+)'", text)
if match:
url = match.group(1)
return f"{self.getProxyUrl()}&url={url}&type=img"
else:
return ''
def aesimg(self, word):
key = b'f5d965df75336270'
iv = b'97b60394abc2fbe1'
cipher = AES.new(key, AES.MODE_CBC, iv)
decrypted = unpad(cipher.decrypt(word), AES.block_size)
return decrypted
def getpq(self, data):
try:
return pq(data)
except Exception as e:
print(f"{str(e)}")
return pq(data.encode('utf-8'))
+165
View File
@@ -0,0 +1,165 @@
# -*- coding: utf-8 -*-
# by @嗷呜
import time
import uuid
from base64 import b64decode, b64encode
import json
import sys
from urllib.parse import urlparse, urlunparse
from Crypto.Cipher import AES
from Crypto.Hash import MD5
from Crypto.Util.Padding import unpad, pad
sys.path.append('..')
from base.spider import Spider
class Spider(Spider):
def init(self, extend=""):
pass
def getName(self):
pass
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def destroy(self):
pass
host = "https://api.230110.xyz"
phost = "https://cdn.230110.xyz"
headers = {
'origin': host,
'referer': f'{host}/',
'user-agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 17_0_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.8 Mobile/15E148 Safari/604.1',
}
def homeContent(self, filter):
data='9XSPkyFMrOOG34JSg//ZosMof45cyBo9hwZMZ5rvI6Yz/ZZlXWIf8/644OzwW+FNIOdJ61R/Lxjy1tqN+ZzokxtiVzb8LjYAkh6GFudwAUXFt9yS1ZjAxC3tDKrQsJQLk3nym0s00DBBzLBntRBDFz7nbba+OOBuQOZpL3CESGL42l4opdoViQLhO/dIizY1kIOk2NxxpDC9Z751gPl1ctHWuLWhuLG/QWgNWi/iHScjKrMHJKcC9GQHst/4Q3dgZ03eQIIVB6jvoV1XXoBCz6fjM/jM3BXpzSttT4Stglwy93gWuNWuZiKypHK2Q0lO10oM0ceRW2a0fPGId+rNYMRO3cR/C0ZueD4cmTAVOuxVr9ZZSP8/nhD0bHyAPONXtchIDJb0O/kdFHk2KTJfQ5q4fHOyzezczc4iQDV/R0S8cGZKM14MF+wytA/iljfj43H0UYqq5pM+MCUGRTdYEtuxCp0+A+DiOhNZwY/Km/TgBoGZQWGbpljJ2LAVnWhxX+ickLH7zuR/FeIwP/R8zOuR+8C8UlT9eHTqtvfNzaGdFxt316atHy8TNjRO7J5a177mqsHs3ziG0toDDzLDCbhRUjFgVA3ktahhXiWaaCo/ZGSJAA8TDO5DYqnJ0JDaX0ILPj8QB5zxrHYmRE8PboIr3RBAjz1sREbaHfjrUjoh29ePhlolLV00EvgoxP5knaqt5Ws/sq5IG57qKCAPgqXzblPLHToJGBtukKhLp8jbGJrkb6PVn4/jysks0NGE'
return {'class':self.aes(data,False)}
def homeVideoContent(self):
pass
def categoryContent(self, tid, pg, filter, extend):
data = {"q": "", "filter": [f"type_id = {tid}"], "offset": (int(pg)-1) * 24, "limit": 24, "sort": ["video_time:desc"],"lang": "zh-cn", "route": "/videos/search"}
result = {}
if 'skey_' in tid:return self.searchContent(tid.split('_')[-1], True, pg)
result['list'] = self.getl(self.getdata(data))
result['page'] = pg
result['pagecount'] = 9999
result['limit'] = 90
result['total'] = 999999
return result
def detailContent(self, ids):
data={"limit":1,"filter":[f"video_id = {ids[0]}"],"lang":"zh-cn","route":"/videos/search"}
res = self.getdata(data)[0]
purl=urlunparse(urlparse(self.phost)._replace(path=urlparse(res.get('video_url')).path))
vod = {
'vod_play_from': 'dsysav',
'vod_play_url': f"{res.get('video_duration')}${purl}"
}
if res.get('video_tag'):
clist = []
tags=res['video_tag'].split(',')
for k in tags:
clist.append('[a=cr:' + json.dumps({'id': f'skey_{k}', 'name': k}) + '/]' + k + '[/a]')
vod['vod_content'] = ' '.join(clist)
return {'list':[vod]}
def searchContent(self, key, quick, pg="1"):
data={"q":key,"filter":[],"offset":(int(pg)-1) * 24,"limit":24,"sort":["video_time:desc"],"lang":"zh-cn","route":"/videos/search"}
return {'list':self.getl(self.getdata(data)),'page':pg}
def playerContent(self, flag, id, vipFlags):
if id.endswith('.mpd'):
id=f"{self.getProxyUrl()}&url={self.e64(id)}&type=mpd"
return {'parse': 0, 'url': id, 'header':self.headers}
def localProxy(self, param):
if param.get('type') and param['type']=='mpd':
url = self.d64(param.get('url'))
ids=url.split('/')
id=f"{ids[-3]}/{ids[-2]}/"
xpu = f"{self.getProxyUrl()}&path=".replace('&', '&amp;')
data = self.fetch(url, headers=self.headers).text
data = data.replace('initialization="', f'initialization="{xpu}{id}').replace('media="',f'media="{xpu}{id}')
return [200,'application/octet-stream',data]
else:
hsign=self.md5(f"AjPuom638LmWfWyeM5YueKuJ9PuWLdRn/mpd/{param.get('path')}1767196800")
bytes_data = bytes.fromhex(hsign)
sign = b64encode(bytes_data).decode('utf-8').replace('=','').replace('+','-').replace('/','_')
url=f"{self.phost}/mpd/{param.get('path')}?sign={sign}&expire=1767196800"
return [302,'text/plain',None,{'Location':url}]
def liveContent(self, url):
pass
def aes(self, text, operation=True):
key = b'OPQT123412FRANME'
iv = b'MRDCQP12QPM13412'
cipher = AES.new(key, AES.MODE_CBC, iv)
if operation:
ct_bytes = cipher.encrypt(pad(json.dumps(text).encode("utf-8"), AES.block_size))
ct = b64encode(ct_bytes).decode("utf-8")
return ct
else:
pt = unpad(cipher.decrypt(b64decode(text)), AES.block_size)
return json.loads(pt.decode("utf-8"))
def e64(self, text):
try:
text_bytes = text.encode('utf-8')
encoded_bytes = b64encode(text_bytes)
return encoded_bytes.decode('utf-8')
except Exception as e:
print(f"Base64编码错误: {str(e)}")
return ""
def d64(self,encoded_text):
try:
encoded_bytes = encoded_text.encode('utf-8')
decoded_bytes = b64decode(encoded_bytes)
return decoded_bytes.decode('utf-8')
except Exception as e:
print(f"Base64解码错误: {str(e)}")
return ""
def md5(self, text):
h = MD5.new()
h.update(text.encode('utf-8'))
return h.hexdigest()
def getl(self,data):
videos = []
for i in data:
img = i.get('video_cover')
if img and 'http' in img:img = urlunparse(urlparse(self.phost)._replace(path=urlparse(img).path))
videos.append({
'vod_id': i.get('video_id'),
'vod_name': i.get('video_title'),
'vod_pic': img,
'vod_remarks': i.get('video_duration'),
'style': {"type": "rect", "ratio": 1.33}
})
return videos
def getdata(self,data):
uid = str(uuid.uuid4())
t = int(time.time())
json_data = {
'sign': self.md5(f"{self.e64(json.dumps(data))}{uid}{t}AjPuom638LmWfWyeM5YueKuJ9PuWLdRn"),
'nonce': uid,
'timestamp': t,
'data': self.aes(data),
}
res = self.post(f"{self.host}/v1", json=json_data, headers=self.headers).json()
res = self.aes(res['data'], False)
return res
+351
View File
@@ -0,0 +1,351 @@
# -*- coding: utf-8 -*-
# by @嗷呜
import copy
import gzip
import json
import re
import sys
import time
import uuid
from base64 import b64decode
from urllib.parse import urlparse, urlunparse
from Crypto.Hash import SHA1, HMAC
from pyquery import PyQuery as pq
sys.path.append('..')
from base.spider import Spider
class Spider(Spider):
def init(self, extend=""):
'''
{
"": "",
"ext": {
"site": "https://missav.ai",
"cfproxy": ""
}
}
自备:过cf代理如https://xx.vvvv.cc/proxy?url=
'''
try:
ext=json.loads(extend)
self.host,self.pcf,self.phost=ext.get('site',''),ext.get('cfproxy',''),''
if self.pcf:
parsed_url=urlparse(self.pcf)
self.phost=parsed_url.scheme + "://" + parsed_url.netloc
except:
pass
self.headers = {
'referer': f'{self.host}',
'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36'
}
pass
def getName(self):
pass
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def destroy(self):
pass
xhost='https://client-rapi-missav.recombee.com'
fts = 'H4sIAAAAAAAAA23P30rDMBQG8FeRXM8X8FVGGZk90rA0HU3SMcZgXjn8V6p2BS2KoOiFAwUn2iK+TBP7GBpYXbG9/c6Pc77TnaABjNHOFtojVIDPUQcx7IJJvl9ydX30GwSYSpN0J4iZgTqJiywrPlN1vm/GJiPMJgGxJaZo2qnc3WXDuZIKMqSwUcX7Ui8O1DJRH3Gldh3CgMM2l31BhNGW8euq3PNFrac+PVNZ2NYzjMrbY53c6/Sm2uwDBczB7mGxqaDTWfkV6atXvXiu4FD2KeHOf3nxViahjv8YxwHYtWfyQ3NvFZYP85oSno3HvYDAiNevPqnosWFHAAPahnU6b2DXY8Jp0bO8QdfEmlo/SBd5PPUBAAA='
actfts = 'H4sIAAAAAAAAA5WVS2sUQRRG/0rT6xTcqq5Xiwjm/X6sQxZjbBLRBBeOIEGIIEgWrtwI4lJEQsjGhU6Iv2bGcf6FVUUydW/d1SxT55sDfbpmsn9WP+/e1A+q+rh7dnT8qp6rT3snXTz4N7icXH4OB697L/rxZP+sPo1g+Ot8PPg+vvoyOb+IOJ7Vb+fuqGxkJSrZmMOTexiORDjAGxs3GvDGinCANjp5NPbo4NHYo5PHYI8OHoM9JnkM9pjgMdhjksdijwkeiz02eSz22OCx2GOTx2GPDR6HPS55HPa44HHY45LHY48LHo89Pnk89vjg8djjk6fFHh88bfAcxNXduz/sv0Qvfnz74+/X65lf/OMqfzD9ndF8geYzWijQQkaLBVrMaKlASxktF2g5o5UCrWS0WqDVjNYKtJbReoHWM9oo0EZGmwXazGirQFsZbRdoO6OdAu1ktFug3Yz2CrRH70TvqEN3YvT75+TP+5nvxMNKwf0pCIWur4JwM5spVCAaRJtI9ZQ2IPBPg47UTKkGgb/wJlI7pQYE/ho/QsiCaFv61E+7J338Izj6MJi8+xSefnhzO/PTK1CmGt58G118zM+pDBloPtBk0PBBQwaKDxQZSD6QZAB8QN6UbNlAtmTg+cCTgeMDRwaWDywZ8JKSlJS8pCQlJS8pSUnJS0pSUvKSkpSUvKQkJYGXBFISeEkgJYGXBFISeEkgJYGXBFISeEkgJYGXBFISeEkgJYGXBFISeElI/7QO/gOZ7bAksggAAA=='
def homeContent(self, filter):
html = self.getpq(f"{self.host}/cn",headers=self.headers)
result = {}
filters = {}
classes=[]
for i in list(html('.mt-4.space-y-4').items())[:2]:
for j in i('ul li').items():
id=j('a').attr('href').split('/')[-1]
classes.append({
'type_name': j.text(),
'type_id': id
})
filters[id] = copy.deepcopy(self.ungzip(self.fts))
if id=='actresses':filters[id].extend(self.ungzip(self.actfts))
result['class'] = classes
result['filters'] = filters
result['list'] = self.getlist(html('.grid-cols-2.md\\:grid-cols-3 .thumbnail.group'))
return result
def homeVideoContent(self):
pass
def categoryContent(self, tid, pg, filter, extend):
params={
'page':'' if pg=='1' else pg
}
ft = {
'filters': extend.get('filters', ''),
'sort': extend.get('sort', '')
}
if tid in ['makers', 'genres']:
ft = {}
elif tid == 'actresses':
ft = {
'height': extend.get('height', ''),
'cup': extend.get('cup', ''),
'debut': extend.get('debut', ''),
'age': extend.get('age', ''),
'sort': extend.get('sort', '')
}
params.update(ft)
params = {k: v for k, v in params.items() if v != ""}
url=tid if 'http' in tid else f"{self.host}/cn/{tid}"
data=self.getpq(url,headers=self.headers,params=params)
result = {}
if tid in ['makers', 'genres']:
videos = self.gmsca(data)
elif tid == 'actresses':
videos = self.actca(data)
else:
videos = self.getlist(data('.grid-cols-2.md\\:grid-cols-3 .thumbnail.group'))
result['list'] = videos
result['page'] = pg
result['pagecount'] = 9999
result['limit'] = 90
result['total'] = 999999
return result
def detailContent(self, ids):
v=self.getpq(ids[0],headers=self.headers)
sctx=v('body script').text()
urls=self.execute_js(sctx)
if not urls:urls=f"嗅探${ids[0]}"
c=v('.space-y-2 .text-secondary')
ac,dt,bq=[],[],[]
for i in c.items():
if re.search(r"导演:|女优:",i.text()):
ac.extend(['[a=cr:' + json.dumps({'id': j.attr('href'), 'name': j.text()}) + '/]' + j.text() + '[/a]' for j in i('a').items()])
elif '发行商:' in i.text():
dt.extend(['[a=cr:' + json.dumps({'id': j.attr('href'), 'name': j.text()}) + '/]' + j.text() + '[/a]' for j in i('a').items()])
elif re.search(r"标籤:|系列:|类型:",i.text()):
bq.extend(['[a=cr:' + json.dumps({'id': j.attr('href'), 'name': j.text()}) + '/]' + j.text() + '[/a]' for j in i('a').items()])
np={'MissAV':urls,'相关视频':self.getfov(ids[0])}
vod = {
'type_name': c.eq(-3)('a').text(),
'vod_year': c.eq(0)('span').text(),
'vod_remarks': ' '.join(bq),
'vod_actor': ' '.join(ac),
'vod_director': ' '.join(dt),
'vod_content': v('.text-secondary.break-all').text()
}
names,plist=[],[]
for i,j in np.items():
if j:
names.append(i)
plist.append(j)
vod['vod_play_from']='$$$'.join(names)
vod['vod_play_url']='$$$'.join(plist)
return {'list': [vod]}
def searchContent(self, key, quick, pg="1"):
data=self.getpq(f"{self.host}/cn/search/{key}",headers=self.headers,params={'page':pg})
return {'list': self.getlist(data('.grid-cols-2.md\\:grid-cols-3 .thumbnail.group')),'page':pg}
def playerContent(self, flag, id, vipFlags):
p=0 if '' in flag else 1
if '相关' in flag:
try:
v = self.getpq(id, headers=self.headers)
sctx = v('body script').text()
urls = self.execute_js(sctx)
if not urls: raise Exception("没有找到地址")
p,id=0,urls.split('#')[0].split('$')[-1]
except:
p=1
return {'parse': p, 'url': id, 'header': self.headers}
def localProxy(self, param):
pass
def josn_to_params(self, params, skip_empty=False):
query = []
for k, v in params.items():
if skip_empty and not v:
continue
query.append(f"{k}={v}")
return "&".join(query)
def getpq(self, url, headers=None,params='',min=0,max=3):
if not min and self.phost in url:
url=url.replace(self.phost,self.host)
if params=={}:params=''
if params:
params=f"?{self.josn_to_params(params)}"
response=self.fetch(f"{self.pcf}{url}{params}", headers=headers,verify=False)
res=response.text
if 300 <= response.status_code < 400:
if min >= max:raise Exception(f"重定向次数过多: {res}")
match = re.search(r"url=['\"](https?://[^'\"]+)['\"]", res)
if match:
url = match.group(1).replace(self.phost, self.host)
return self.getpq(url, headers=headers,params='',min=min+1,max=max)
try:
return pq(res)
except Exception as e:
print(f"{str(e)}")
return pq(res.encode('utf-8'))
def getlist(self,data):
videos = []
names,ids=[],[]
for i in data.items():
k = i('.overflow-hidden.shadow-lg a')
id=k.eq(0).attr('href')
name=i('.text-secondary').text()
if id and id not in ids and name not in names:
ids.append(id)
names.append(name)
videos.append({
'vod_id': id,
'vod_name': name,
'vod_pic': k.eq(0)('img').attr('data-src'),
'vod_year': '' if len(list(k.items())) < 3 else k.eq(1).text(),
'vod_remarks': k.eq(-1).text(),
'style': {"type": "rect", "ratio": 1.33}
})
return videos
def gmsca(self,data):
acts=[]
for i in data('.grid.grid-cols-2.md\\:grid-cols-3 div').items():
acts.append({
'vod_id': i('.text-nord13').attr('href'),
'vod_name': i('.text-nord13').text(),
'vod_pic': '',
'vod_remarks': i('.text-nord10').text(),
'vod_tag': 'folder',
'style': {"type": "rect", "ratio": 1.33}
})
return acts
def actca(self,data):
acts=[]
for i in data('.max-w-full ul li').items():
acts.append({
'vod_id': i('a').attr('href'),
'vod_name': i('img').attr('alt'),
'vod_pic': i('img').attr('src'),
'vod_year': i('.text-nord10').eq(-1).text(),
'vod_remarks': i('.text-nord10').eq(0).text(),
'vod_tag': 'folder',
'style': {"type": "oval"}
})
return acts
def getfov(self, url):
try:
h=self.headers.copy()
ids=url.split('/')
h.update({'referer':f'{url}/'})
t=str(int(time.time()))
params = {
'frontend_timestamp': t,
'frontend_sign': self.getsign(f"/missav-default/batch/?frontend_timestamp={t}"),
}
uid=str(uuid.uuid4())
json_data = {
'requests': [
{
'method': 'POST',
'path': f'/recomms/items/{ids[-1]}/items/',
'params': {
'targetUserId': uid,
'count': 13,
'scenario': 'desktop-watch-next-side',
'returnProperties': True,
'includedProperties': [
'title_cn',
'duration',
'has_chinese_subtitle',
'has_english_subtitle',
'is_uncensored_leak',
'dm',
],
'cascadeCreate': True,
},
},
{
'method': 'POST',
'path': f'/recomms/items/{ids[-1]}/items/',
'params': {
'targetUserId': uid,
'count': 12,
'scenario': 'desktop-watch-next-bottom',
'returnProperties': True,
'includedProperties': [
'title_cn',
'duration',
'has_chinese_subtitle',
'has_english_subtitle',
'is_uncensored_leak',
'dm',
],
'cascadeCreate': True,
},
},
],
'distinctRecomms': True,
}
data = self.post(f'{self.xhost}/missav-default/batch/', params=params,headers=h, json=json_data).json()
vdata=[]
for i in data:
for j in i['json']['recomms']:
if j.get('id'):
vdata.append(f"{j['values']['title_cn']}${self.host}/cn/{j['id']}")
return '#'.join(vdata)
except Exception as e:
print(f"获取推荐失败: {e}")
return ''
def getsign(self, text):
message_bytes = text.encode('utf-8')
key_bytes = b'Ikkg568nlM51RHvldlPvc2GzZPE9R4XGzaH9Qj4zK9npbbbTly1gj9K4mgRn0QlV'
h = HMAC.new(key_bytes, digestmod=SHA1)
h.update(message_bytes)
signature = h.hexdigest()
return signature
def ungzip(self, data):
result=gzip.decompress(b64decode(data)).decode('utf-8')
return json.loads(result)
def execute_js(self, jstxt):
js_code = re.search(r"eval\(function\(p,a,c,k,e,d\).*?return p\}(.*?)\)\)", jstxt).group(0)
try:
from com.whl.quickjs.wrapper import QuickJSContext
ctx = QuickJSContext.create()
ctx.evaluate(js_code)
result = []
common_vars = ["source", "source842", "source1280"]
for var_name in common_vars:
try:
value = ctx.getGlobalObject().getProperty(var_name)
if value is not None:
if isinstance(value, str):
value_str = value
else:
value_str = value.toString()
if "http" in value_str:
result.append(f"{var_name}${value_str}")
self.log(f"找到变量 {var_name} = {value_str[:50]}...")
except Exception as var_err:
self.log(f"获取变量 {var_name} 失败: {var_err}")
ctx.destroy()
return '#'.join(result)
except Exception as e:
self.log(f"执行失败: {e}")
return None
+301
View File
@@ -0,0 +1,301 @@
# -*- coding: utf-8 -*-
# by @嗷呜
import json
import re
import sys
from base64 import b64decode, b64encode
from urllib.parse import urlparse
import requests
from pyquery import PyQuery as pq
from requests import Session
sys.path.append('..')
from base.spider import Spider
class Spider(Spider):
def init(self, extend=""):
'''
内置代理配置:真心jar为例
{
"key": "Phb",
"name": "Phb",
"type": 3,
"searchable": 1,
"quickSearch": 1,
"filterable": 1,
"api": "./py/Phb.py",
"ext": {
"http": "http://127.0.0.1:1072",
"https": "http://127.0.0.1:1072"
}
},
注:http(s)代理都是http
'''
try:self.proxies = json.loads(extend)
except:self.proxies = {}
self.headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.5410.0 Safari/537.36',
'pragma': 'no-cache',
'cache-control': 'no-cache',
'sec-ch-ua-platform': '"Windows"',
'sec-ch-ua': '"Not(A:Brand";v="99", "Google Chrome";v="133", "Chromium";v="133"',
'dnt': '1',
'sec-ch-ua-mobile': '?0',
'sec-fetch-site': 'cross-site',
'sec-fetch-mode': 'cors',
'sec-fetch-dest': 'empty',
'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8',
'priority': 'u=1, i',
}
self.host = self.gethost()
self.headers.update({'referer': f'{self.host}/', 'origin': self.host})
self.session = Session()
self.session.proxies.update(self.proxies)
self.session.headers.update(self.headers)
pass
def getName(self):
pass
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def destroy(self):
pass
def homeContent(self, filter):
result = {}
cateManual = {
"视频": "/video",
"片单": "/playlists",
"频道": "/channels",
"分类": "/categories",
"明星": "/pornstars"
}
classes = []
filters = {}
for k in cateManual:
classes.append({
'type_name': k,
'type_id': cateManual[k]
})
result['class'] = classes
result['filters'] = filters
return result
def homeVideoContent(self):
data = self.getpq('/recommended')
vhtml = data("#recommendedListings .pcVideoListItem .phimage")
return {'list': self.getlist(vhtml)}
def categoryContent(self, tid, pg, filter, extend):
vdata = []
result = {}
result['page'] = pg
result['pagecount'] = 9999
result['limit'] = 90
result['total'] = 999999
if tid == '/video' or '_this_video' in tid:
pagestr = f'&' if '?' in tid else f'?'
tid = tid.split('_this_video')[0]
data = self.getpq(f'{tid}{pagestr}page={pg}')
vdata = self.getlist(data('#videoCategory .pcVideoListItem'))
elif tid == '/playlists':
data = self.getpq(f'{tid}?page={pg}')
vhtml = data('#playListSection li')
vdata = []
for i in vhtml.items():
vdata.append({
'vod_id': 'playlists_click_' + i('.thumbnail-info-wrapper .display-block a').attr('href'),
'vod_name': i('.thumbnail-info-wrapper .display-block a').attr('title'),
'vod_pic': self.proxy(i('.largeThumb').attr('src')),
'vod_tag': 'folder',
'vod_remarks': i('.playlist-videos .number').text(),
'style': {"type": "rect", "ratio": 1.33}
})
elif tid == '/channels':
data = self.getpq(f'{tid}?o=rk&page={pg}')
vhtml = data('#filterChannelsSection li .description')
vdata = []
for i in vhtml.items():
vdata.append({
'vod_id': 'director_click_' + i('.avatar a').attr('href'),
'vod_name': i('.avatar img').attr('alt'),
'vod_pic': self.proxy(i('.avatar img').attr('src')),
'vod_tag': 'folder',
'vod_remarks': i('.descriptionContainer ul li').eq(-1).text(),
'style': {"type": "rect", "ratio": 1.33}
})
elif tid == '/categories' and pg == '1':
result['pagecount'] = 1
data = self.getpq(f'{tid}')
vhtml = data('.categoriesListSection li .relativeWrapper')
vdata = []
for i in vhtml.items():
vdata.append({
'vod_id': i('a').attr('href') + '_this_video',
'vod_name': i('a').attr('alt'),
'vod_pic': self.proxy(i('a img').attr('src')),
'vod_tag': 'folder',
'style': {"type": "rect", "ratio": 1.33}
})
elif tid == '/pornstars':
data = self.getpq(f'{tid}?o=t&page={pg}')
vhtml = data('#popularPornstars .performerCard .wrap')
vdata = []
for i in vhtml.items():
vdata.append({
'vod_id': 'pornstars_click_' + i('a').attr('href'),
'vod_name': i('.performerCardName').text(),
'vod_pic': self.proxy(i('a img').attr('src')),
'vod_tag': 'folder',
'vod_year': i('.performerVideosViewsCount span').eq(0).text(),
'vod_remarks': i('.performerVideosViewsCount span').eq(-1).text(),
'style': {"type": "rect", "ratio": 1.33}
})
elif 'playlists_click' in tid:
tid = tid.split('click_')[-1]
if pg == '1':
hdata = self.getpq(tid)
self.token = hdata('#searchInput').attr('data-token')
vdata = self.getlist(hdata('#videoPlaylist .pcVideoListItem .phimage'))
else:
tid = tid.split('playlist/')[-1]
data = self.getpq(f'/playlist/viewChunked?id={tid}&token={self.token}&page={pg}')
vdata = self.getlist(data('.pcVideoListItem .phimage'))
elif 'director_click' in tid:
tid = tid.split('click_')[-1]
data = self.getpq(f'{tid}/videos?page={pg}')
vdata = self.getlist(data('#showAllChanelVideos .pcVideoListItem .phimage'))
elif 'pornstars_click' in tid:
tid = tid.split('click_')[-1]
data = self.getpq(f'{tid}/videos?page={pg}')
vdata = self.getlist(data('#mostRecentVideosSection .pcVideoListItem .phimage'))
result['list'] = vdata
return result
def detailContent(self, ids):
url = f"{self.host}{ids[0]}"
data = self.getpq(ids[0])
vn = data('meta[property="og:title"]').attr('content')
dtext = data('.userInfo .usernameWrap a')
pdtitle = '[a=cr:' + json.dumps(
{'id': 'director_click_' + dtext.attr('href'), 'name': dtext.text()}) + '/]' + dtext.text() + '[/a]'
vod = {
'vod_name': vn,
'vod_director': pdtitle,
'vod_remarks': (data('.userInfo').text() + ' / ' + data('.ratingInfo').text()).replace('\n', ' / '),
'vod_play_from': 'Pornhub',
'vod_play_url': ''
}
js_content = data("#player script").eq(0).text()
plist = [f"{vn}${self.e64(f'{1}@@@@{url}')}"]
try:
pattern = r'"mediaDefinitions":\s*(\[.*?\]),\s*"isVertical"'
match = re.search(pattern, js_content, re.DOTALL)
if match:
json_str = match.group(1)
udata = json.loads(json_str)
plist = [
f"{media['height']}${self.e64(f'{0}@@@@{url}')}"
for media in udata[:-1]
if (url := media.get('videoUrl'))
]
except Exception as e:
print(f"提取mediaDefinitions失败: {str(e)}")
vod['vod_play_url'] = '#'.join(plist)
return {'list': [vod]}
def searchContent(self, key, quick, pg="1"):
data = self.getpq(f'/video/search?search={key}&page={pg}')
return {'list': self.getlist(data('#videoSearchResult .pcVideoListItem .phimage'))}
def playerContent(self, flag, id, vipFlags):
ids = self.d64(id).split('@@@@')
if '.m3u8' in ids[1]: ids[1] = self.proxy(ids[1], 'm3u8')
return {'parse': int(ids[0]), 'url': ids[1], 'header': self.headers}
def localProxy(self, param):
url = self.d64(param.get('url'))
if param.get('type') == 'm3u8':
return self.m3Proxy(url)
else:
return self.tsProxy(url)
def m3Proxy(self, url):
ydata = requests.get(url, headers=self.headers, proxies=self.proxies, allow_redirects=False)
data = ydata.content.decode('utf-8')
if ydata.headers.get('Location'):
url = ydata.headers['Location']
data = requests.get(url, headers=self.headers, proxies=self.proxies).content.decode('utf-8')
lines = data.strip().split('\n')
last_r = url[:url.rfind('/')]
parsed_url = urlparse(url)
durl = parsed_url.scheme + "://" + parsed_url.netloc
for index, string in enumerate(lines):
if '#EXT' not in string:
if 'http' not in string:
domain = last_r if string.count('/') < 2 else durl
string = domain + ('' if string.startswith('/') else '/') + string
lines[index] = self.proxy(string, string.split('.')[-1].split('?')[0])
data = '\n'.join(lines)
return [200, "application/vnd.apple.mpegur", data]
def tsProxy(self, url):
data = requests.get(url, headers=self.headers, proxies=self.proxies, stream=True)
return [200, data.headers['Content-Type'], data.content]
def gethost(self):
try:
response = requests.get('https://www.pornhub.com', headers=self.headers, proxies=self.proxies,
allow_redirects=False)
return response.headers['Location'][:-1]
except Exception as e:
print(f"获取主页失败: {str(e)}")
return "https://www.pornhub.com"
def e64(self, text):
try:
text_bytes = text.encode('utf-8')
encoded_bytes = b64encode(text_bytes)
return encoded_bytes.decode('utf-8')
except Exception as e:
print(f"Base64编码错误: {str(e)}")
return ""
def d64(self, encoded_text):
try:
encoded_bytes = encoded_text.encode('utf-8')
decoded_bytes = b64decode(encoded_bytes)
return decoded_bytes.decode('utf-8')
except Exception as e:
print(f"Base64解码错误: {str(e)}")
return ""
def getlist(self, data):
vlist = []
for i in data.items():
vlist.append({
'vod_id': i('a').attr('href'),
'vod_name': i('a').attr('title'),
'vod_pic': self.proxy(i('img').attr('src')),
'vod_remarks': i('.bgShadeEffect').text() or i('.duration').text(),
'style': {'ratio': 1.33, 'type': 'rect'}
})
return vlist
def getpq(self, path):
try:
response = self.session.get(f'{self.host}{path}').text
return pq(response.encode('utf-8'))
except Exception as e:
print(f"请求失败: , {str(e)}")
return None
def proxy(self, data, type='img'):
if data and len(self.proxies):return f"{self.getProxyUrl()}&url={self.e64(data)}&type={type}"
else:return data
+288
View File
@@ -0,0 +1,288 @@
# -*- coding: utf-8 -*-
# by @嗷呜
import json
import sys
from base64 import b64decode, b64encode
from urllib.parse import urlparse
import requests
from pyquery import PyQuery as pq
from requests import Session
sys.path.append('..')
from base.spider import Spider
class Spider(Spider):
def init(self, extend=""):
try:self.proxies = json.loads(extend)
except:self.proxies = {}
self.headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.5410.0 Safari/537.36',
'pragma': 'no-cache',
'cache-control': 'no-cache',
'sec-ch-ua-platform': '"Windows"',
'sec-ch-ua': '"Not(A:Brand";v="99", "Google Chrome";v="133", "Chromium";v="133"',
'dnt': '1',
'sec-ch-ua-mobile': '?0',
'origin': '',
'sec-fetch-site': 'cross-site',
'sec-fetch-mode': 'cors',
'sec-fetch-dest': 'empty',
'referer': f'',
'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8',
'priority': 'u=1, i',
}
self.host = self.gethost()
self.session = Session()
self.headers.update({'origin': self.host,'referer': f'{self.host}/'})
self.session.proxies.update(self.proxies)
self.session.headers.update(self.headers)
pass
def getName(self):
pass
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def destroy(self):
pass
def homeContent(self, filter):
result = {}
cateManual = {
"4K": "/4k",
"国产": "two_click_/categories/chinese",
"最新": "/newest",
"最佳": "/best",
"频道": "/channels",
"类别": "/categories",
"明星": "/pornstars"
}
classes = []
filters = {}
for k in cateManual:
classes.append({
'type_name': k,
'type_id': cateManual[k]
})
if k != '4K': filters[cateManual[k]] = [{'key': 'type', 'name': '类型', 'value': [{'n': '4K', 'v': '/4k'}]}]
result['class'] = classes
result['filters'] = filters
return result
def homeVideoContent(self):
data = self.getpq()
return {'list': self.getlist(data(".thumb-list--sidebar .thumb-list__item"))}
def categoryContent(self, tid, pg, filter, extend):
vdata = []
result = {}
result['page'] = pg
result['pagecount'] = 9999
result['limit'] = 90
result['total'] = 999999
if tid in ['/4k', '/newest', '/best'] or 'two_click_' in tid:
if 'two_click_' in tid: tid = tid.split('click_')[-1]
data = self.getpq(f'{tid}{extend.get("type", "")}/{pg}')
vdata = self.getlist(data(".thumb-list--sidebar .thumb-list__item"))
elif tid == '/channels':
data = self.getpq(f'{tid}/{pg}')
jsdata = self.getjsdata(data)
for i in jsdata['channels']:
vdata.append({
'vod_id': f"two_click_" + i.get('channelURL'),
'vod_name': i.get('channelName'),
'vod_pic': self.proxy(i.get('siteLogoURL')),
'vod_year': f'videos:{i.get("videoCount")}',
'vod_tag': 'folder',
'vod_remarks': f'subscribers:{i["subscriptionModel"].get("subscribers")}',
'style': {'ratio': 1.33, 'type': 'rect'}
})
elif tid == '/categories':
result['pagecount'] = pg
data = self.getpq(tid)
self.cdata = self.getjsdata(data)
for i in self.cdata['layoutPage']['store']['popular']['assignable']:
vdata.append({
'vod_id': "one_click_" + i.get('id'),
'vod_name': i.get('name'),
'vod_pic': '',
'vod_tag': 'folder',
'style': {'ratio': 1.33, 'type': 'rect'}
})
elif tid == '/pornstars':
data = self.getpq(f'{tid}/{pg}')
pdata = self.getjsdata(data)
for i in pdata['pagesPornstarsComponent']['pornstarListProps']['pornstars']:
vdata.append({
'vod_id': f"two_click_" + i.get('pageURL'),
'vod_name': i.get('name'),
'vod_pic': self.proxy(i.get('imageThumbUrl')),
'vod_remarks': i.get('translatedCountryName'),
'vod_tag': 'folder',
'style': {'ratio': 1.33, 'type': 'rect'}
})
elif 'one_click' in tid:
result['pagecount'] = pg
tid = tid.split('click_')[-1]
for i in self.cdata['layoutPage']['store']['popular']['assignable']:
if i.get('id') == tid:
for j in i['items']:
vdata.append({
'vod_id': f"two_click_" + j.get('url'),
'vod_name': j.get('name'),
'vod_pic': self.proxy(j.get('thumb')),
'vod_tag': 'folder',
'style': {'ratio': 1.33, 'type': 'rect'}
})
result['list'] = vdata
return result
def detailContent(self, ids):
data = self.getpq(ids[0])
djs = self.getjsdata(data)
vn = data('meta[property="og:title"]').attr('content')
dtext = data('#video-tags-list-container')
href = dtext('a').attr('href')
title = dtext('span[class*="body-bold-"]').eq(0).text()
pdtitle = ''
if href:
pdtitle = '[a=cr:' + json.dumps({'id': 'two_click_' + href, 'name': title}) + '/]' + title + '[/a]'
vod = {
'vod_name': vn,
'vod_director': pdtitle,
'vod_remarks': data('.rb-new__info').text(),
'vod_play_from': 'Xhamster',
'vod_play_url': ''
}
try:
plist = []
d = djs['xplayerSettings']['sources']
f = d.get('standard')
def custom_sort_key(url):
quality = url.split('$')[0]
number = ''.join(filter(str.isdigit, quality))
number = int(number) if number else 0
return -number, quality
if f:
for key, value in f.items():
if isinstance(value, list):
for info in value:
id = self.e64(f'{0}@@@@{info.get("url") or info.get("fallback")}')
plist.append(f"{info.get('label') or info.get('quality')}${id}")
plist.sort(key=custom_sort_key)
if d.get('hls'):
for format_type, info in d['hls'].items():
if url := info.get('url'):
encoded = self.e64(f'{0}@@@@{url}')
plist.append(f"{format_type}${encoded}")
except Exception as e:
plist = [f"{vn}${self.e64(f'{1}@@@@{ids[0]}')}"]
print(f"获取视频信息失败: {str(e)}")
vod['vod_play_url'] = '#'.join(plist)
return {'list': [vod]}
def searchContent(self, key, quick, pg="1"):
data = self.getpq(f'/search/{key}?page={pg}')
return {'list': self.getlist(data(".thumb-list--sidebar .thumb-list__item")), 'page': pg}
def playerContent(self, flag, id, vipFlags):
ids = self.d64(id).split('@@@@')
if '.m3u8' in ids[1]: ids[1] = self.proxy(ids[1], 'm3u8')
return {'parse': int(ids[0]), 'url': ids[1], 'header': self.headers}
def localProxy(self, param):
url = self.d64(param['url'])
if param.get('type') == 'm3u8':
return self.m3Proxy(url)
else:
return self.tsProxy(url)
def gethost(self):
try:
response = requests.get('https://xhamster.com',proxies=self.proxies,headers=self.headers,allow_redirects=False)
return response.headers['Location']
except Exception as e:
print(f"获取主页失败: {str(e)}")
return "https://zn.xhamster.com"
def e64(self, text):
try:
text_bytes = text.encode('utf-8')
encoded_bytes = b64encode(text_bytes)
return encoded_bytes.decode('utf-8')
except Exception as e:
print(f"Base64编码错误: {str(e)}")
return ""
def d64(self, encoded_text):
try:
encoded_bytes = encoded_text.encode('utf-8')
decoded_bytes = b64decode(encoded_bytes)
return decoded_bytes.decode('utf-8')
except Exception as e:
print(f"Base64解码错误: {str(e)}")
return ""
def getlist(self, data):
vlist = []
for i in data.items():
vlist.append({
'vod_id': i('.role-pop').attr('href'),
'vod_name': i('.video-thumb-info a').text(),
'vod_pic': self.proxy(i('.role-pop img').attr('src')),
'vod_year': i('.video-thumb-info .video-thumb-views').text().split(' ')[0],
'vod_remarks': i('.role-pop div[data-role="video-duration"]').text(),
'style': {'ratio': 1.33, 'type': 'rect'}
})
return vlist
def getpq(self, path=''):
h = '' if path.startswith('http') else self.host
response = self.session.get(f'{h}{path}').text
try:
return pq(response)
except Exception as e:
print(f"{str(e)}")
return pq(response.encode('utf-8'))
def getjsdata(self, data):
vhtml = data("script[id='initials-script']").text()
jst = json.loads(vhtml.split('initials=')[-1][:-1])
return jst
def m3Proxy(self, url):
ydata = requests.get(url, headers=self.headers, proxies=self.proxies, allow_redirects=False)
data = ydata.content.decode('utf-8')
if ydata.headers.get('Location'):
url = ydata.headers['Location']
data = requests.get(url, headers=self.headers, proxies=self.proxies).content.decode('utf-8')
lines = data.strip().split('\n')
last_r = url[:url.rfind('/')]
parsed_url = urlparse(url)
durl = parsed_url.scheme + "://" + parsed_url.netloc
for index, string in enumerate(lines):
if '#EXT' not in string:
if 'http' not in string:
domain = last_r if string.count('/') < 2 else durl
string = domain + ('' if string.startswith('/') else '/') + string
lines[index] = self.proxy(string, string.split('.')[-1].split('?')[0])
data = '\n'.join(lines)
return [200, "application/vnd.apple.mpegur", data]
def tsProxy(self, url):
data = requests.get(url, headers=self.headers, proxies=self.proxies, stream=True)
return [200, data.headers['Content-Type'], data.content]
def proxy(self, data, type='img'):
if data and len(self.proxies):return f"{self.getProxyUrl()}&url={self.e64(data)}&type={type}"
else:return data
+276
View File
@@ -0,0 +1,276 @@
# -*- coding: utf-8 -*-
# by @嗷呜
import json
import re
import sys
from urllib.parse import urlparse
import requests
from pyquery import PyQuery as pq
from base64 import b64decode, b64encode
from requests import Session
sys.path.append('..')
from base.spider import Spider
class Spider(Spider):
def init(self, extend=""):
try:self.proxies = json.loads(extend)
except:self.proxies = {}
self.session = Session()
self.session.proxies.update(self.proxies)
self.session.headers.update(self.headers)
pass
def getName(self):
pass
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def destroy(self):
pass
host = "https://www.xvideos.com"
headers = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.5410.0 Safari/537.36",
"pragma": "no-cache",
"cache-control": "no-cache",
"sec-ch-ua-platform": "\"Windows\"",
"sec-ch-ua": "\"Not(A:Brand\";v=\"99\", \"Google Chrome\";v=\"133\", \"Chromium\";v=\"133\"",
"dnt": "1",
"origin":host,
'referer':f'{host}/',
"sec-ch-ua-mobile": "?0",
"sec-fetch-site": "cross-site",
"sec-fetch-mode": "cors",
"sec-fetch-dest": "empty",
"accept-language": "zh-CN,zh;q=0.9,en;q=0.8",
"priority": "u=1, i"
}
def homeContent(self, filter):
result = {}
cateManual = {
"最新": "/new",
"最佳": "/best",
"频道": "/channels-index",
"标签": "/tags",
"明星": "/pornstars-index"
}
classes = []
for k in cateManual:
classes.append({
'type_name': k,
'type_id': cateManual[k]
})
result['class'] = classes
return result
def homeVideoContent(self):
data = self.getpq()
return {'list':self.getlist(data(".mozaique .frame-block"))}
def categoryContent(self, tid, pg, filter, extend):
vdata = []
result = {}
page = f"/{int(pg) - 1}" if pg != '1' else ''
result['page'] = pg
result['pagecount'] = 9999
result['limit'] = 90
result['total'] = 999999
if tid=='/new' or 'tags_click' in tid:
if 'tags_click' in tid:tid=tid.split('click_')[-1]
data=self.getpq(f'{tid}/{pg}')
vdata=self.getlist(data(".mozaique .frame-block"))
elif tid=='/best':
if pg=='1':
self.path=self.session.get(f'{self.host}{tid}',allow_redirects=False).headers['Location']
data=self.getpq(f'{self.path}{page}')
vdata=self.getlist(data(".mozaique .frame-block"))
elif tid=='/channels-index' or tid=='/pornstars-index':
data = self.getpq(f'{tid}{page}')
vhtml=data(".mozaique .thumb-block")
for i in vhtml.items():
a = i('.thumb-inside .thumb a')
match = re.search(r'src="([^"]+)"', a('script').text())
img=''
if match:
img = match.group(1).strip()
vdata.append({
'vod_id': f"channels_click_{'/channels'if tid=='/channels-index' else ''}"+a.attr('href'),
'vod_name': a('.profile-name').text() or i('.profile-name').text().replace('\xa0','/'),
'vod_pic': self.proxy(img),
'vod_tag': 'folder',
'vod_remarks': i('.thumb-under .profile-counts').text(),
'style': {'ratio': 1.33, 'type': 'rect'}
})
elif tid=='/tags':
result['pagecount'] = pg
vhtml = self.getpq(tid)
vhtml = vhtml('.tags-list')
for d in vhtml.items():
for i in d('li a').items():
vdata.append({
'vod_id': "tags_click_"+i.attr('href'),
'vod_name': i.attr('title') or i('b').text(),
'vod_pic': '',
'vod_tag': 'folder',
'vod_remarks': i('.navbadge').text(),
'style': {'ratio': 1.33, 'type': 'rect'}
})
elif 'channels_click' in tid:
tid=tid.split('click_')[-1]
vhtml=self.session.post(f'{self.host}{tid}/videos/best/{int(pg)-1}').json()
for i in vhtml['videos']:
vdata.append({
'vod_id': i.get('u'),
'vod_name': i.get('tf'),
'vod_pic': self.proxy(i.get('il')),
'vod_year': i.get('n'),
'vod_remarks': i.get('d'),
'style': {'ratio': 1.33, 'type': 'rect'}
})
result['list'] = vdata
return result
def detailContent(self, ids):
url = f"{self.host}{ids[0]}"
data = self.getpq(ids[0])
vn=data('meta[property="og:title"]').attr('content')
dtext=data('.main-uploader a')
href=dtext.attr('href')
pdtitle=''
if href and href.count('/') < 2:
href=f'/channels{href}'
pdtitle = '[a=cr:' + json.dumps({'id': 'channels_click_'+href, 'name': dtext('.name').text()}) + '/]' + dtext('.name').text() + '[/a]'
vod = {
'vod_name': vn,
'vod_director':pdtitle,
'vod_remarks': data('.page-title').text().replace(vn,''),
'vod_play_from': 'Xvideos',
'vod_play_url': ''
}
js_content = data("#video-player-bg script")
jstr=''
for script in js_content.items():
content = script.text()
if 'setVideoUrlLow' in content and 'html5player' in content:
jstr = content
break
plist = [f"{vn}${self.e64(f'{1}@@@@{url}')}"]
def extract_video_urls(js_content):
try:
low = re.search(r'setVideoUrlLow\([\'"]([^\'"]+)[\'"]\)', js_content)
high = re.search(r'setVideoUrlHigh\([\'"]([^\'"]+)[\'"]\)', js_content)
hls = re.search(r'setVideoHLS\([\'"]([^\'"]+)[\'"]\)', js_content)
return {
'hls': hls.group(1) if hls else None,
'high': high.group(1) if high else None,
'low': low.group(1) if low else None
}
except Exception as e:
print(f"提取视频URL失败: {str(e)}")
return {}
if jstr:
try:
urls = extract_video_urls(jstr)
plist = [
f"{quality}${self.e64(f'{0}@@@@{url}')}"
for quality, url in urls.items()
if url
]
except Exception as e:
print(f"提取url失败: {str(e)}")
vod['vod_play_url'] = '#'.join(plist)
return {'list':[vod]}
def searchContent(self, key, quick, pg="1"):
data=self.getpq(f'/?k={key}&p={int(pg)-1}')
return {'list':self.getlist(data(".mozaique .frame-block")),'page':pg}
def playerContent(self, flag, id, vipFlags):
ids=self.d64(id).split('@@@@')
if '.m3u8' in ids[1]: ids[1] = self.proxy(ids[1], 'm3u8')
return {'parse': int(ids[0]), 'url': ids[1], 'header': self.headers}
def localProxy(self, param):
url=self.d64(param['url'])
if param.get('type') == 'm3u8':
return self.m3Proxy(url)
else:
return self.tsProxy(url)
def e64(self, text):
try:
text_bytes = text.encode('utf-8')
encoded_bytes = b64encode(text_bytes)
return encoded_bytes.decode('utf-8')
except Exception as e:
print(f"Base64编码错误: {str(e)}")
return ""
def d64(self,encoded_text):
try:
encoded_bytes = encoded_text.encode('utf-8')
decoded_bytes = b64decode(encoded_bytes)
return decoded_bytes.decode('utf-8')
except Exception as e:
print(f"Base64解码错误: {str(e)}")
return ""
def getlist(self, data):
vlist=[]
for i in data.items():
a=i('.thumb-inside .thumb a')
b=i('.thumb-under .title a')
vlist.append({
'vod_id': a.attr('href'),
'vod_name': b('a').attr('title'),
'vod_pic': self.proxy(a('img').attr('data-src')),
'vod_year': a('.video-hd-mark').text(),
'vod_remarks': b('.duration').text(),
'style': {'ratio': 1.33, 'type': 'rect'}
})
return vlist
def getpq(self, path=''):
response = self.session.get(f'{self.host}{path}').text
try:
return pq(response)
except Exception as e:
print(f"{str(e)}")
return pq(response.encode('utf-8'))
def m3Proxy(self, url):
ydata = requests.get(url, headers=self.headers, proxies=self.proxies, allow_redirects=False)
data = ydata.content.decode('utf-8')
if ydata.headers.get('Location'):
url = ydata.headers['Location']
data = requests.get(url, headers=self.headers, proxies=self.proxies).content.decode('utf-8')
lines = data.strip().split('\n')
last_r = url[:url.rfind('/')]
parsed_url = urlparse(url)
durl = parsed_url.scheme + "://" + parsed_url.netloc
for index, string in enumerate(lines):
if '#EXT' not in string:
if 'http' not in string:
domain=last_r if string.count('/') < 2 else durl
string = domain + ('' if string.startswith('/') else '/') + string
lines[index] = self.proxy(string, string.split('.')[-1].split('?')[0])
data = '\n'.join(lines)
return [200, "application/vnd.apple.mpegur", data]
def tsProxy(self, url):
data = requests.get(url, headers=self.headers, proxies=self.proxies, stream=True)
return [200, data.headers['Content-Type'], data.content]
def proxy(self, data, type='img'):
if data and len(self.proxies):return f"{self.getProxyUrl()}&url={self.e64(data)}&type={type}"
else:return data
+212
View File
@@ -0,0 +1,212 @@
# -*- coding: utf-8 -*-
# by @嗷呜
import sys
from base64 import b64encode, b64decode
from Crypto.Hash import MD5, SHA256
sys.path.append('..')
from base.spider import Spider
from Crypto.Cipher import AES
import json
import time
class Spider(Spider):
def getName(self):
return "lav"
def init(self, extend=""):
self.id = self.ms(str(int(time.time() * 1000)))[:16]
pass
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def action(self, action):
pass
def destroy(self):
pass
host = "http://sir_new.tiansexyl.tv"
t = str(int(time.time() * 1000))
headers = {'User-Agent': 'okhttp-okgo/jeasonlzy', 'Connection': 'Keep-Alive',
'Content-Type': 'application/x-www-form-urlencoded'}
def homeContent(self, filter):
cateManual = {"演员": "actor", "分类": "avsearch", }
classes = []
for k in cateManual:
classes.append({'type_name': k, 'type_id': cateManual[k]})
j = {'code': 'homePage', 'mod': 'down', 'channel': 'self', 'via': 'agent', 'bundleId': 'com.tvlutv',
'app_type': 'rn', 'os_version': '12.0.5', 'version': '3.2.3', 'oauth_type': 'android_rn',
'oauth_id': self.id}
body = self.aes(j)
data = self.post(f'{self.host}/api.php?t={str(int(time.time() * 1000))}', data=body, headers=self.headers).json()['data']
data1 = self.aes(data, False)['data']
self.r = data1['r']
for i, d in enumerate(data1['avTag']):
# if i == 4:
# break
classes.append({'type_name': d['name'], 'type_id': d['tag']})
resutl = {}
resutl["class"] = classes
return resutl
def homeVideoContent(self):
pass
def categoryContent(self, tid, pg, filter, extend):
id = tid.split("@@")
result = {}
result["page"] = pg
result["pagecount"] = 9999
result["limit"] = 90
result["total"] = 999999
if id[0] == 'avsearch':
if pg == '1':
j = {'code': 'avsearch', 'mod': 'search', 'channel': 'self', 'via': 'agent', 'bundleId': 'com.tvlutv',
'app_type': 'rn', 'os_version': '12.0.5', 'version': '3.2.3', 'oauth_type': 'android_rn',
'oauth_id': self.id}
if len(id) > 1:
j = {'code': 'find', 'mod': 'tag', 'channel': 'self', 'via': 'agent', 'bundleId': 'com.tvlutv',
'app_type': 'rn', 'os_version': '12.0.5', 'version': '3.2.3', 'oauth_type': 'android_rn',
'oauth_id': self.id, 'type': 'av', 'dis': 'new', 'page': str(pg), 'tag': id[1]}
elif id[0] == 'actor':
j = {'mod': 'actor', 'channel': 'self', 'via': 'agent', 'bundleId': 'com.tvlutv', 'app_type': 'rn',
'os_version': '12.0.5', 'version': '3.2.3', 'oauth_type': 'android_rn', 'oauth_id': self.id,
'page': str(pg), 'filter': ''}
if len(id) > 1:
j = {'code': 'eq', 'mod': 'actor', 'channel': 'self', 'via': 'agent', 'bundleId': 'com.tvlutv',
'app_type': 'rn', 'os_version': '12.0.5', 'version': '3.2.3', 'oauth_type': 'android_rn',
'oauth_id': self.id, 'page': str(pg), 'id': id[1], 'actor': id[2]}
else:
j = {'code': 'search', 'mod': 'av', 'channel': 'self', 'via': 'agent', 'bundleId': 'com.tvlutv',
'app_type': 'rn', 'os_version': '12.0.5', 'version': '3.2.3', 'oauth_type': 'android_rn',
'oauth_id': self.id, 'page': str(pg), 'tag': id[0]}
body = self.aes(j)
data = self.post(f'{self.host}/api.php?t={str(int(time.time() * 1000))}', data=body, headers=self.headers).json()['data']
data1 = self.aes(data, False)['data']
videos = []
if tid == 'avsearch' and len(id) == 1:
for item in data1:
videos.append({"vod_id": id[0] + "@@" + str(item.get('tags')), 'vod_name': item.get('name'),
'vod_pic': self.imgs(item.get('ico')), 'vod_tag': 'folder',
'style': {"type": "rect", "ratio": 1.33}})
elif tid == 'actor' and len(id) == 1:
for item in data1:
videos.append({"vod_id": id[0] + "@@" + str(item.get('id')) + "@@" + item.get('name'),
'vod_name': item.get('name'), 'vod_pic': self.imgs(item.get('cover')),
'vod_tag': 'folder', 'style': {"type": "oval"}})
else:
for item in data1:
if item.get('_id'):
videos.append({"vod_id": str(item.get('id')), 'vod_name': item.get('title'),
'vod_pic': self.imgs(item.get('cover_thumb') or item.get('cover_full')),
'vod_remarks': item.get('good'), 'style': {"type": "rect", "ratio": 1.33}})
result["list"] = videos
return result
def detailContent(self, ids):
id = ids[0]
j = {'code': 'detail', 'mod': 'av', 'channel': 'self', 'via': 'agent', 'bundleId': 'com.tvlutv',
'app_type': 'rn', 'os_version': '12.0.5', 'version': '3.2.3', 'oauth_type': 'android_rn',
'oauth_id': self.id, 'id': id}
body = self.aes(j)
data = self.post(f'{self.host}/api.php?t={str(int(time.time() * 1000))}', data=body, headers=self.headers).json()['data']
data1 = self.aes(data, False)['line']
vod = {}
play = []
for itt in data1:
a = itt['line'].get('s720')
if a:
b = a.split('.')
b[0] = 'https://m3u8'
a = '.'.join(b)
play.append(itt['info']['tips'] + "$" + a)
break
vod["vod_play_from"] = 'LAV'
vod["vod_play_url"] = "#".join(play)
result = {"list": [vod]}
return result
def searchContent(self, key, quick, pg="1"):
pass
def playerContent(self, flag, id, vipFlags):
url = self.getProxyUrl() + "&url=" + b64encode(id.encode('utf-8')).decode('utf-8') + "&type=m3u8"
self.hh = {'User-Agent': 'dd', 'Connection': 'Keep-Alive', 'Referer': self.r}
result = {}
result["parse"] = 0
result["url"] = url
result["header"] = self.hh
return result
def localProxy(self, param):
url = param["url"]
if param.get('type') == "m3u8":
return self.vod(b64decode(url).decode('utf-8'))
else:
return self.img(url)
def vod(self, url):
data = self.fetch(url, headers=self.hh).text
key = bytes.fromhex("13d47399bda541b85e55830528d4e66f1791585b2d2216f23215c4c63ebace31")
iv = bytes.fromhex(data[:32])
data = data[32:]
cipher = AES.new(key, AES.MODE_CFB, iv, segment_size=128)
data_bytes = bytes.fromhex(data)
decrypted = cipher.decrypt(data_bytes)
encoded = decrypted.decode("utf-8").replace("\x08", "")
return [200, "application/vnd.apple.mpegur", encoded]
def imgs(self, url):
return self.getProxyUrl() + '&url=' + url
def img(self, url):
type = url.split('.')[-1]
data = self.fetch(url).text
key = bytes.fromhex("ba78f184208d775e1553550f2037f4af22cdcf1d263a65b4d5c74536f084a4b2")
iv = bytes.fromhex(data[:32])
data = data[32:]
cipher = AES.new(key, AES.MODE_CFB, iv, segment_size=128)
data_bytes = bytes.fromhex(data)
decrypted = cipher.decrypt(data_bytes)
return [200, f"image/{type}", decrypted]
def ms(self, data, m=False):
h = MD5.new()
if m:
h = SHA256.new()
h.update(data.encode('utf-8'))
return h.hexdigest()
def aes(self, data, operation=True):
key = bytes.fromhex("620f15cfdb5c79c34b3940537b21eda072e22f5d7151456dec3932d7a2b22c53")
t = str(int(time.time()))
ivt = self.ms(t)
if operation:
data = json.dumps(data, separators=(',', ':'))
iv = bytes.fromhex(ivt)
else:
iv = bytes.fromhex(data[:32])
data = data[32:]
cipher = AES.new(key, AES.MODE_CFB, iv, segment_size=128)
if operation:
data_bytes = data.encode('utf-8')
encrypted = cipher.encrypt(data_bytes)
ep = f'{ivt}{encrypted.hex()}'
edata = f"data={ep}&timestamp={t}0d27dfacef1338483561a46b246bf36d"
sign = self.ms(self.ms(edata, True))
edata = f"timestamp={t}&data={ep}&sign={sign}"
return edata
else:
data_bytes = bytes.fromhex(data)
decrypted = cipher.decrypt(data_bytes)
return json.loads(decrypted.decode('utf-8'))
+223
View File
@@ -0,0 +1,223 @@
# -*- coding: utf-8 -*-
# by @嗷呜
import json
import random
import re
import sys
import threading
import time
from base64 import b64decode
import requests
from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad
from pyquery import PyQuery as pq
sys.path.append('..')
from base.spider import Spider
class Spider(Spider):
def init(self, extend=""):
self.host=self.host_late(self.get_domains())
pass
def getName(self):
pass
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def destroy(self):
pass
headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'sec-ch-ua': '"Not/A)Brand";v="8", "Chromium";v="134", "Google Chrome";v="134"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"macOS"',
'dnt': '1',
'upgrade-insecure-requests': '1',
'sec-fetch-site': 'cross-site',
'sec-fetch-mode': 'navigate',
'sec-fetch-user': '?1',
'sec-fetch-dest': 'document',
'accept-language': 'zh-CN,zh;q=0.9',
'priority': 'u=0, i'
}
def homeContent(self, filter):
data=self.getpq(self.fetch(self.host, headers=self.headers).text)
result = {}
classes = []
for k in data('.category-list ul li').items():
classes.append({
'type_name': k('a').text(),
'type_id': k('a').attr('href')
})
result['class'] = classes
result['list'] = self.getlist(data('#index article a'))
return result
def homeVideoContent(self):
pass
def categoryContent(self, tid, pg, filter, extend):
data=self.getpq(self.fetch(f"{self.host}{tid}{pg}", headers=self.headers).text)
result = {}
result['list'] = self.getlist(data('#archive article a'))
result['page'] = pg
result['pagecount'] = 9999
result['limit'] = 90
result['total'] = 999999
return result
def detailContent(self, ids):
url=f"{self.host}{ids[0]}"
data=self.getpq(self.fetch(url, headers=self.headers).text)
vod = {
'vod_content': data('.post-title').text(),
'vod_play_from': '51吸瓜',
'vod_play_url': f"请停止活塞运动,可能没有视频${url}"
}
try:
clist = []
if data('.tags .keywords a'):
for k in data('.tags .keywords a').items():
title = k.text()
href = k.attr('href')
clist.append('[a=cr:' + json.dumps({'id': href, 'name': title}) + '/]' + title + '[/a]')
vod['vod_content'] = ' '.join(clist)
except:
pass
try:
plist=[]
if data('.dplayer'):
for c, k in enumerate(data('.dplayer').items(), start=1):
config = json.loads(k.attr('data-config'))
plist.append(f"视频{c}${config['video']['url']}")
vod['vod_play_url']='#'.join(plist)
except:
pass
return {'list':[vod]}
def searchContent(self, key, quick, pg="1"):
data=self.getpq(self.fetch(f"{self.host}/search/{key}/{pg}", headers=self.headers).text)
return {'list':self.getlist(data('#archive article a')),'page':pg}
def playerContent(self, flag, id, vipFlags):
headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36',
'Pragma': 'no-cache',
'Cache-Control': 'no-cache',
'sec-ch-ua-platform': '"macOS"',
'sec-ch-ua': '"Not/A)Brand";v="8", "Chromium";v="134", "Google Chrome";v="134"',
'DNT': '1',
'sec-ch-ua-mobile': '?0',
'Origin': self.host,
'Sec-Fetch-Site': 'cross-site',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Dest': 'empty',
'Accept-Language': 'zh-CN,zh;q=0.9',
}
return {'parse': 1, 'url': id, 'header': headers}
def localProxy(self, param):
res=self.fetch(param['url'], headers=self.headers, timeout=10)
return [200,res.headers.get('Content-Type'),self.aesimg(res.content)]
def get_domains(self):
html = self.getpq(self.fetch("https://www.qcbtifw.xyz", headers=self.headers).text)
html_pattern = r"Base64\.decode\('([^']+)'\)"
html_match = re.search(html_pattern, html('script').eq(-1).text(), re.DOTALL)
if not html_match:
raise Exception("未找到html_match")
html = b64decode(html_match.group(1)).decode()
words_pattern = r"words\s*=\s*'([^']+)'"
words_match = re.search(words_pattern, html)
if not words_match:
raise Exception("未找到words")
words = words_match.group(1).split(',')
main_pattern = r"lineAry\s*=.*?words\.random\(\)\s*\+\s*'\.([^']+)'"
domain_match = re.search(main_pattern, html, re.DOTALL)
if not domain_match:
raise Exception("未找到主域名")
domain_suffix = domain_match.group(1)
domains = []
for _ in range(3):
random_word = random.choice(words)
domain = f"https://{random_word}.{domain_suffix}"
domains.append(domain)
return domains
def host_late(self, url_list):
if isinstance(url_list, str):
urls = [u.strip() for u in url_list.split(',')]
else:
urls = url_list
if len(urls) <= 1:
return urls[0] if urls else ''
results = {}
threads = []
def test_host(url):
try:
start_time = time.time()
response = requests.head(url, timeout=1.0, allow_redirects=False)
delay = (time.time() - start_time) * 1000
results[url] = delay
except Exception as e:
results[url] = float('inf')
for url in urls:
t = threading.Thread(target=test_host, args=(url,))
threads.append(t)
t.start()
for t in threads:
t.join()
return min(results.items(), key=lambda x: x[1])[0]
def getlist(self,data):
videos = []
for k in data.items():
a=k.attr('href')
b=k('h2').text()
c=k('span[itemprop="datePublished"]').text()
if a and b and c:
videos.append({
'vod_id': a,
'vod_name': b.replace('\n', ' '),
'vod_pic': self.getimg(k('script').text()),
'vod_remarks': c,
'style': {"type": "rect", "ratio": 1.33}
})
return videos
def getimg(self, text):
match = re.search(r"loadBannerDirect\('([^']+)'", text)
if match:
url = match.group(1)
return f"{self.getProxyUrl()}&url={url}&type=img"
else:
return ''
def aesimg(self, word):
key = b'f5d965df75336270'
iv = b'97b60394abc2fbe1'
cipher = AES.new(key, AES.MODE_CBC, iv)
decrypted = unpad(cipher.decrypt(word), AES.block_size)
return decrypted
def getpq(self, data):
try:
return pq(data)
except Exception as e:
print(f"{str(e)}")
return pq(data.encode('utf-8'))
+186
View File
@@ -0,0 +1,186 @@
# -*- coding: utf-8 -*-
# by @嗷呜
import json
import random
import string
import sys
import time
from base64 import b64decode
from Crypto.Cipher import AES
from Crypto.Hash import MD5
from Crypto.Util.Padding import unpad
sys.path.append('..')
from base.spider import Spider
class Spider(Spider):
def init(self, extend=""):
self.did = self.getdid()
self.token,self.phost,self.host = self.gettoken()
pass
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def destroy(self):
pass
hs = ['fhoumpjjih', 'dyfcbkggxn', 'rggwiyhqtg', 'bpbbmplfxc']
def homeContent(self, filter):
data = self.fetch(f'{self.host}/api/video/queryClassifyList?mark=4', headers=self.headers()).json()['encData']
data1 = self.aes(data)
result = {}
classes = []
for k in data1['data']:
classes.append({'type_name': k['classifyTitle'], 'type_id': k['classifyId']})
result['class'] = classes
return result
def homeVideoContent(self):
pass
def categoryContent(self, tid, pg, filter, extend):
path=f'/api/short/video/getShortVideos?classifyId={tid}&videoMark=4&page={pg}&pageSize=20'
result = {}
videos = []
data=self.fetch(f'{self.host}{path}', headers=self.headers()).json()['encData']
vdata=self.aes(data)
for k in vdata['data']:
videos.append({"vod_id": k['videoId'], 'vod_name': k.get('title'), 'vod_pic': self.getProxyUrl() + '&url=' + k['coverImg'],
'vod_remarks': self.dtim(k.get('playTime')),'style': {"type": "rect", "ratio": 1.33}})
result["list"] = videos
result["page"] = pg
result["pagecount"] = 9999
result["limit"] = 90
result["total"] = 999999
return result
def detailContent(self, ids):
path = f'/api/video/getVideoById?videoId={ids[0]}'
data = self.fetch(f'{self.host}{path}', headers=self.headers()).json()['encData']
v = self.aes(data)
d=f'{v["title"]}$auth_key={v["authKey"]}&path={v["videoUrl"]}'
vod = {'vod_name': v["title"], 'type_name': ''.join(v.get('tagTitles',[])),'vod_play_from': v.get('nickName') or "小红书官方", 'vod_play_url': d}
result = {"list": [vod]}
return result
def searchContent(self, key, quick, pg='1'):
pass
def playerContent(self, flag, id, vipFlags):
h=self.headers()
h['Authorization'] = h.pop('aut')
del h['deviceid']
result = {"parse": 0, "url": f"{self.host}/api/m3u8/decode/authPath?{id}", "header": h}
return result
def localProxy(self, param):
return self.action(param)
def md5(self, text):
h = MD5.new()
h.update(text.encode('utf-8'))
return h.hexdigest()
def aes(self, word):
key = b64decode("SmhiR2NpT2lKSVV6STFOaQ==")
iv = key
cipher = AES.new(key, AES.MODE_CBC, iv)
decrypted = unpad(cipher.decrypt(b64decode(word)), AES.block_size)
return json.loads(decrypted.decode('utf-8'))
def dtim(self, seconds):
try:
seconds = int(seconds)
hours = seconds // 3600
remaining_seconds = seconds % 3600
minutes = remaining_seconds // 60
remaining_seconds = remaining_seconds % 60
formatted_minutes = str(minutes).zfill(2)
formatted_seconds = str(remaining_seconds).zfill(2)
if hours > 0:
formatted_hours = str(hours).zfill(2)
return f"{formatted_hours}:{formatted_minutes}:{formatted_seconds}"
else:
return f"{formatted_minutes}:{formatted_seconds}"
except:
return ''
def getdid(self):
did = self.getCache('did')
if not did:
t = str(int(time.time()))
did = self.md5(t)
self.setCache('did', did)
return did
def getsign(self):
t=str(int(time.time() * 1000))
return self.md5(t[3:8]),t
def gettoken(self, i=0, max_attempts=10):
if i >= len(self.hs) or i >= max_attempts:
return ''
current_domain = f"https://{''.join(random.choices(string.ascii_lowercase + string.digits, k=random.randint(5, 10)))}.{self.hs[i]}.work"
try:
sign,t=self.getsign()
url = f'{current_domain}/api/user/traveler'
headers = {
'User-Agent': 'Mozilla/5.0 (Linux; Android 11; M2012K10C Build/RP1A.200720.011; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/87.0.4280.141 Mobile Safari/537.36;SuiRui/xhs/ver=1.2.6',
'deviceid': self.did, 't': t, 's': sign, }
data = {'deviceId': self.did, 'tt': 'U', 'code': '', 'chCode': 'dafe13'}
data1 = self.post(url, json=data, headers=headers)
data1.raise_for_status()
data2 = data1.json()['data']
return data2['token'], data2['imgDomain'],current_domain
except:
return self.gettoken(i+1, max_attempts)
def headers(self):
sign,t=self.getsign()
henda = {
'User-Agent': 'Mozilla/5.0 (Linux; Android 11; M2012K10C Build/RP1A.200720.011; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/87.0.4280.141 Mobile Safari/537.36;SuiRui/xhs/ver=1.2.6',
'deviceid': self.did, 't': t, 's': sign, 'aut': self.token}
return henda
def action(self, param):
headers = {
'User-Agent': 'Dalvik/2.1.0 (Linux; U; Android 11; M2012K10C Build/RP1A.200720.011)'}
data = self.fetch(f'{self.phost}{param["url"]}', headers=headers)
type=data.headers.get('Content-Type').split(';')[0]
base64_data = self.img(data.content, 100, '2020-zq3-888')
return [200, type, base64_data]
def img(self, data: bytes, length: int, key: str):
GIF = b'\x47\x49\x46'
JPG = b'\xFF\xD8\xFF'
PNG = b'\x89\x50\x4E\x47\x0D\x0A\x1A\x0A'
def is_dont_need_decode_for_gif(data):
return len(data) > 2 and data[:3] == GIF
def is_dont_need_decode_for_jpg(data):
return len(data) > 7 and data[:3] == JPG
def is_dont_need_decode_for_png(data):
return len(data) > 7 and data[1:8] == PNG[1:8]
if is_dont_need_decode_for_png(data):
return data
elif is_dont_need_decode_for_gif(data):
return data
elif is_dont_need_decode_for_jpg(data):
return data
else:
key_bytes = key.encode('utf-8')
result = bytearray(data)
for i in range(length):
result[i] ^= key_bytes[i % len(key_bytes)]
return bytes(result)
+246
View File
@@ -0,0 +1,246 @@
# -*- coding: utf-8 -*-
# by @嗷呜
import json
import random
import string
import sys
import time
from base64 import b64decode
from urllib.parse import quote
from Crypto.Cipher import AES
from Crypto.Hash import MD5
from Crypto.Util.Padding import unpad
sys.path.append('..')
from base.spider import Spider
class Spider(Spider):
def init(self, extend=""):
self.did = self.getdid()
self.token,self.phost,self.host = self.gettoken()
pass
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def action(self, action):
pass
def destroy(self):
pass
hs=['wcyfhknomg','pdcqllfomw','alxhzjvean','bqeaaxzplt','hfbtpixjso']
ua='Mozilla/5.0 (Linux; Android 11; M2012K10C Build/RP1A.200720.011; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/87.0.4280.141 Mobile Safari/537.36;SuiRui/twitter/ver=1.4.4'
def homeContent(self, filter):
data = self.fetch(f'{self.host}/api/video/classifyList', headers=self.headers()).json()['encData']
data1 = self.aes(data)
result = {'filters': {"1": [{"key": "fl", "name": "分类",
"value": [{"n": "最近更新", "v": "1"}, {"n": "最多播放", "v": "2"},
{"n": "好评榜", "v": "3"}]}], "2": [{"key": "fl", "name": "分类",
"value": [
{"n": "最近更新", "v": "1"},
{"n": "最多播放", "v": "2"},
{"n": "好评榜", "v": "3"}]}],
"3": [{"key": "fl", "name": "分类",
"value": [{"n": "最近更新", "v": "1"}, {"n": "最多播放", "v": "2"},
{"n": "好评榜", "v": "3"}]}], "4": [{"key": "fl", "name": "分类",
"value": [
{"n": "最近更新", "v": "1"},
{"n": "最多播放", "v": "2"},
{"n": "好评榜", "v": "3"}]}],
"5": [{"key": "fl", "name": "分类",
"value": [{"n": "最近更新", "v": "1"}, {"n": "最多播放", "v": "2"},
{"n": "好评榜", "v": "3"}]}], "6": [{"key": "fl", "name": "分类",
"value": [
{"n": "最近更新", "v": "1"},
{"n": "最多播放", "v": "2"},
{"n": "好评榜", "v": "3"}]}],
"7": [{"key": "fl", "name": "分类",
"value": [{"n": "最近更新", "v": "1"}, {"n": "最多播放", "v": "2"},
{"n": "好评榜", "v": "3"}]}], "jx": [{"key": "type", "name": "精选",
"value": [{"n": "日榜", "v": "1"},
{"n": "周榜", "v": "2"},
{"n": "月榜", "v": "3"},
{"n": "总榜",
"v": "4"}]}]}}
classes = [{'type_name': "精选", 'type_id': "jx"}]
for k in data1['data']:
classes.append({'type_name': k['classifyTitle'], 'type_id': k['classifyId']})
result['class'] = classes
return result
def homeVideoContent(self):
pass
def categoryContent(self, tid, pg, filter, extend):
path = f'/api/video/queryVideoByClassifyId?pageSize=20&page={pg}&classifyId={tid}&sortType={extend.get("fl", "1")}'
if 'click' in tid:
path = f'/api/video/queryPersonVideoByType?pageSize=20&page={pg}&userId={tid.replace("click", "")}'
if tid == 'jx':
path = f'/api/video/getRankVideos?pageSize=20&page={pg}&type={extend.get("type", "1")}'
data = self.fetch(f'{self.host}{path}', headers=self.headers()).json()['encData']
data1 = self.aes(data)['data']
result = {}
videos = []
for k in data1:
id = f'{k.get("videoId")}?{k.get("userId")}?{k.get("nickName")}'
if 'click' in tid:
id = id + 'click'
videos.append({"vod_id": id, 'vod_name': k.get('title'), 'vod_pic': self.getProxyUrl() + f"&url={k.get('coverImg')[0]}",
'vod_remarks': self.dtim(k.get('playTime')),'style': {"type": "rect", "ratio": 1.33}})
result["list"] = videos
result["page"] = pg
result["pagecount"] = 9999
result["limit"] = 90
result["total"] = 999999
return result
def detailContent(self, ids):
vid = ids[0].replace('click', '').split('?')
path = f'/api/video/can/watch?videoId={vid[0]}'
data = self.fetch(f'{self.host}{path}', headers=self.headers()).json()['encData']
data1 = self.aes(data)['playPath']
clj = '[a=cr:' + json.dumps({'id': vid[1] + 'click', 'name': vid[2]}) + '/]' + vid[2] + '[/a]'
if 'click' in ids[0]:
clj = vid[2]
vod = {'vod_director': clj, 'vod_play_from': "推特", 'vod_play_url': vid[2] + "$" + data1}
result = {"list": [vod]}
return result
def searchContent(self, key, quick, pg='1'):
path = f'/api/search/keyWord?pageSize=20&page={pg}&searchWord={quote(key)}&searchType=1'
data = self.fetch(f'{self.host}{path}', headers=self.headers()).json()['encData']
data1 = self.aes(data)['videoList']
result = {}
videos = []
for k in data1:
id = f'{k.get("videoId")}?{k.get("userId")}?{k.get("nickName")}'
videos.append({"vod_id": id, 'vod_name': k.get('title'), 'vod_pic': self.getProxyUrl() + f"&url={k.get('coverImg')[0]}",
'vod_remarks': self.dtim(k.get('playTime')), 'style': {"type": "rect", "ratio": 1.33}})
result["list"] = videos
result["page"] = pg
result["pagecount"] = 9999
result["limit"] = 90
result["total"] = 999999
return result
def playerContent(self, flag, id, vipFlags):
return {"parse": 0, "url": id, "header": self.headers()}
def localProxy(self, param):
return self.imgs(param)
def getsign(self):
t = str(int(time.time() * 1000))
sign = self.md5(t)
return sign, t
def headers(self):
sign, t = self.getsign()
return {'User-Agent': self.ua,'deviceid': self.did, 't': t, 's': sign, 'aut': self.token}
def aes(self, word):
key = b64decode("SmhiR2NpT2lKSVV6STFOaQ==")
iv = key
cipher = AES.new(key, AES.MODE_CBC, iv)
decrypted = unpad(cipher.decrypt(b64decode(word)), AES.block_size)
return json.loads(decrypted.decode('utf-8'))
def dtim(self, seconds):
try:
seconds = int(seconds)
hours = seconds // 3600
remaining_seconds = seconds % 3600
minutes = remaining_seconds // 60
remaining_seconds = remaining_seconds % 60
formatted_minutes = str(minutes).zfill(2)
formatted_seconds = str(remaining_seconds).zfill(2)
if hours > 0:
formatted_hours = str(hours).zfill(2)
return f"{formatted_hours}:{formatted_minutes}:{formatted_seconds}"
else:
return f"{formatted_minutes}:{formatted_seconds}"
except:
return "666"
def gettoken(self, i=0, max_attempts=10):
if i >= len(self.hs) or i >= max_attempts:
return ''
current_domain = f"https://{''.join(random.choices(string.ascii_lowercase + string.digits, k=random.randint(5, 10)))}.{self.hs[i]}.work"
try:
url = f'{current_domain}/api/user/traveler'
sign, t = self.getsign()
headers = {
'User-Agent': self.ua,
'Accept': 'application/json',
'deviceid': self.did,
't': t,
's': sign,
}
data = {
'deviceId': self.did,
'tt': 'U',
'code': '##X-4m6Goo4zzPi1hF##',
'chCode': 'tt09'
}
response = self.post(url, json=data, headers=headers)
response.raise_for_status()
data1 = response.json()['data']
return data1['token'], data1['imgDomain'], current_domain
except Exception as e:
return self.gettoken(i + 1, max_attempts)
def getdid(self):
did = self.getCache('did')
if not did:
t = str(int(time.time()))
did = self.md5(t)
self.setCache('did', did)
return did
def md5(self, text):
h = MD5.new()
h.update(text.encode('utf-8'))
return h.hexdigest()
def imgs(self, param):
headers = {'User-Agent': self.ua}
url = param['url']
data = self.fetch(f"{self.phost}{url}",headers=headers)
bdata = self.img(data.content, 100, '2020-zq3-888')
return [200, data.headers.get('Content-Type'), bdata]
def img(self, data: bytes, length: int, key: str):
GIF = b'\x47\x49\x46'
JPG = b'\xFF\xD8\xFF'
PNG = b'\x89\x50\x4E\x47\x0D\x0A\x1A\x0A'
def is_dont_need_decode_for_gif(data):
return len(data) > 2 and data[:3] == GIF
def is_dont_need_decode_for_jpg(data):
return len(data) > 7 and data[:3] == JPG
def is_dont_need_decode_for_png(data):
return len(data) > 7 and data[1:8] == PNG[1:8]
if is_dont_need_decode_for_png(data):
return data
elif is_dont_need_decode_for_gif(data):
return data
elif is_dont_need_decode_for_jpg(data):
return data
else:
key_bytes = key.encode('utf-8')
result = bytearray(data)
for i in range(length):
result[i] ^= key_bytes[i % len(key_bytes)]
return bytes(result)
+349
View File
@@ -0,0 +1,349 @@
# -*- coding: utf-8 -*-
# by @嗷呜
import json
import re
import sys
import threading
import time
from base64 import b64decode, b64encode
import requests
from Crypto.Cipher import AES
from Crypto.Hash import MD5
from Crypto.Util.Padding import unpad
sys.path.append('..')
from base.spider import Spider
class Spider(Spider):
def init(self, extend=""):
self.did = self.getdid()
self.token=self.gettoken()
domain=self.domain()
self.phost=self.host_late(domain['domain_preview'])
self.bhost=domain['domain_original']
self.names=domain['name_original']
pass
def getName(self):
pass
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def destroy(self):
pass
host = 'https://lulu-api-92mizw.jcdwn.com'
headers = {
'User-Agent': 'okhttp/4.11.0',
'referer': 'https://app.nova-traffic-1688.com',
}
def homeContent(self, filter):
BASE_CATEGORIES = [
{'type_name': '片商', 'type_id': 'makers'},
{'type_name': '演员', 'type_id': 'actor'}
]
SORT_OPTIONS = {
'key': 'sortby',
'name': 'sortby',
'value': [
{'n': '最新', 'v': 'on_shelf_at'},
{'n': '最热', 'v': 'hot'}
]
}
tags = self.getdata('/api/v1/video/tag?current=1&pageSize=100&level=1')
producers = self.getdata('/api/v1/video/producer?current=1&pageSize=100&status=1')
regions = self.getdata('/api/v1/video/region?current=1&pageSize=100')
result = {'class': [], 'filters': {}}
result['class'].extend(BASE_CATEGORIES)
for category in BASE_CATEGORIES:
result['filters'][category['type_id']] = [SORT_OPTIONS]
if tags.get('data'):
main_tag = tags['data'][0]
result['class'].append({
'type_name': '发现',
'type_id': f'{main_tag["id"]}_tag'
})
tag_values = [
{'n': tag['name'], 'v': f"{tag['id']}_tag"}
for tag in tags['data'][1:]
if tag.get('id')
]
result['filters'][f'{main_tag["id"]}_tag'] = [
{'key': 'tagtype', 'name': 'tagtype', 'value': tag_values},
SORT_OPTIONS
]
region_filter = {
'key': 'region_ids',
'name': 'region_ids',
'value': [
{'n': region['name'], 'v': region['id']}
for region in regions['data'][1:]
if region.get('id')
]
}
self.aid=regions['data'][0]['id']
result['filters']['actor'].append({
'key': 'region_id',
'name': 'region_id',
'value': region_filter['value'][:2]
})
complex_sort = {
'key': 'sortby',
'name': 'sortby',
'value': [
{'n': '综合', 'v': 'complex'},
*SORT_OPTIONS['value']
]
}
producer_filters = [region_filter, complex_sort]
for producer in producers['data']:
result['class'].append({
'type_name': producer['name'],
'type_id': f'{producer["id"]}_sx'
})
result['filters'][f'{producer["id"]}_sx'] = producer_filters
return result
def homeVideoContent(self):
data=self.getdata('/api/v1/video?current=1&pageSize=60&region_ids=&sortby=complex')
return {'list':self.getlist(data)}
def categoryContent(self, tid, pg, filter, extend):
if 'act' in tid:
data=self.getact(tid, pg, filter, extend)
elif 'tag' in tid:
data=self.gettag(tid, pg, filter, extend)
elif 'sx' in tid:
data=self.getsx(tid, pg, filter, extend)
elif 'make' in tid:
data=self.getmake(tid, pg, filter, extend)
result = {}
result['list'] = data
result['page'] = pg
result['pagecount'] = 9999
result['limit'] = 90
result['total'] = 999999
return result
def detailContent(self, ids):
v=self.getdata(f'/api/v1/video?current=1&pageSize=1&id={ids[0]}&detail=1')
v=v['data'][0]
vod = {
'vod_name': v.get('title'),
'type_name': '/'.join(v.get('tag_names',[])),
'vod_play_from': '浴火社',
'vod_play_url': ''
}
p=[]
for i,j in enumerate(self.bhost):
p.append(f'{self.names[i]}${j}{v.get("highres_url") or v.get("preview_url")}@@@{v["id"]}')
vod['vod_play_url'] = '#'.join(p)
return {'list':[vod]}
def searchContent(self, key, quick, pg="1"):
data=self.getdata(f'/api/v1/video?current={pg}&pageSize=30&title={key}')
return {'list':self.getlist(data),'page':pg}
def playerContent(self, flag, id, vipFlags):
url=f'{self.getProxyUrl()}&url={self.e64(id)}&type=m3u8'
return {'parse': 0, 'url': url, 'header': self.headers}
def localProxy(self, param):
if param.get('type')=='image':
data=self.fetch(param.get('url'), headers=self.headers).text
content=b64decode(data.encode('utf-8'))
return [200, 'image/png', content]
if param.get('type')=='m3u8':
ids=self.d64(param.get('url')).split('@@@')
data=self.fetch(ids[0], headers=self.headers).text
lines = data.strip().split('\n')
for index, string in enumerate(lines):
if 'URI=' in string:
replacement = f'URI="{self.getProxyUrl()}&id={ids[1]}&type=mkey"'
lines[index]=re.sub(r'URI="[^"]+"', replacement, string)
continue
if '#EXT' not in string and 'http' not in string:
last_slash_index = ids[0].rfind('/')
lpath = ids[0][:last_slash_index + 1]
lines[index] = f'{lpath}{string}'
data = '\n'.join(lines)
return [200, 'audio/x-mpegurl', data]
if param.get('type')=='mkey':
id=param.get('id')
headers = {
'User-Agent': 'Mozilla/5.0 (Linux; Android 11; M2012K10C Build/RP1A.200720.011; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/87.0.4280.141 Mobile Safari/537.36',
'authdog': self.token
}
response = self.fetch(f'{self.host}/api/v1/video/key/{id}', headers=headers)
type=response.headers.get('Content-Type')
return [200, type, response.content]
def e64(self, text):
try:
text_bytes = text.encode('utf-8')
encoded_bytes = b64encode(text_bytes)
return encoded_bytes.decode('utf-8')
except Exception as e:
print(f"Base64编码错误: {str(e)}")
return ""
def d64(self,encoded_text):
try:
encoded_bytes = encoded_text.encode('utf-8')
decoded_bytes = b64decode(encoded_bytes)
return decoded_bytes.decode('utf-8')
except Exception as e:
print(f"Base64解码错误: {str(e)}")
return ""
def getdid(self):
did = self.md5(str(int(time.time() * 1000)))
try:
if self.getCache('did'):
return self.getCache('did')
else:
self.setCache('did', did)
return did
except Exception as e:
self.setCache('did', did)
return did
def host_late(self, url_list):
if isinstance(url_list, str):
urls = [u.strip() for u in url_list.split(',')]
else:
urls = url_list
if len(urls) <= 1:
return urls[0] if urls else ''
results = {}
threads = []
def test_host(url):
try:
start_time = time.time()
response = requests.head(url, timeout=1.0, allow_redirects=False)
delay = (time.time() - start_time) * 1000
results[url] = delay
except Exception as e:
results[url] = float('inf')
for url in urls:
t = threading.Thread(target=test_host, args=(url,))
threads.append(t)
t.start()
for t in threads:
t.join()
return min(results.items(), key=lambda x: x[1])[0]
def domain(self):
headers = {
'User-Agent': 'Mozilla/5.0 (Linux; Android 11; M2012K10C Build/RP1A.200720.011; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/87.0.4280.141 Mobile Safari/537.36',
}
response = self.fetch(f'{self.host}/api/v1/system/domain', headers=headers)
return self.aes(response.content)
def aes(self, word):
key = b64decode("amtvaWc5ZnJ2Ym5taml1eQ==")
iv = b64decode("AAEFAwQFCQcICQoLDA0ODw==")
cipher = AES.new(key, AES.MODE_CBC, iv)
decrypted = unpad(cipher.decrypt(word), AES.block_size)
return json.loads(decrypted.decode('utf-8'))
def md5(self, text):
h = MD5.new()
h.update(text.encode('utf-8'))
return h.hexdigest()
def gettoken(self):
headers = {
'User-Agent': 'Mozilla/5.0 (Linux; Android 11; M2012K10C Build/RP1A.200720.011; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/87.0.4280.141 Mobile Safari/537.36',
'cookei': self.md5(f'{self.did}+android'),
'siteid': '11',
'siteauthority': 'lls888.tv'
}
json_data = {
'app_id': 'jukjoe.zqgpi.hfzvde.sdot',
'phone_device': 'Redmi M2012K10C',
'device_id': self.did,
'device_type': 'android',
'invite_code': 'oi1o',
'is_first': 1,
'os_version': '11',
'version': '8.59',
}
response = self.post(f'{self.host}/api/v1/member/device', headers=headers, json=json_data)
tdata = self.aes(response.content)
return f'{tdata["token_type"]} {tdata["access_token"]}'
def getdata(self, path):
headers = {
'User-Agent': 'Mozilla/5.0 (Linux; Android 11; M2012K10C Build/RP1A.200720.011; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/87.0.4280.141 Mobile Safari/537.36',
'authdog': self.token
}
response = self.fetch(f'{self.host}{path}', headers=headers)
return self.aes(response.content)
def getimg(self, path):
if not path.startswith('/'):
path = f'/{path}'
return f'{self.getProxyUrl()}&url={self.phost}{path}&type=image'
def getlist(self,data):
videos = []
for i in data['data']:
videos.append({
'vod_id': i['id'],
'vod_name': i['title'],
'vod_pic': self.getimg(i.get('coverphoto_h' or i.get('coverphoto_v'))),
'style': {"type": "rect", "ratio": 1.33}})
return videos
def geticon(self, data, st='',style=None):
if style is None:style = {"type": "oval"}
videos = []
for i in data['data']:
videos.append({
'vod_id': f'{i["id"]}{st}',
'vod_name': i['name'],
'vod_pic': self.getimg(i.get('icon_path')),
'vod_tag': 'folder',
'style': style})
return videos
def getact(self, tid, pg, filter, extend):
if tid == 'actor' and pg=='1':
data = self.getdata(f'/api/v1/video/actor?current=1&pageSize=999&region_id={extend.get("region_id",self.aid)}&discover_page={pg}')
return self.geticon(data, '_act')
elif '_act' in tid:
data = self.getdata(f'/api/v1/video?current={pg}&pageSize=50&actor_ids={tid.split("_")[0]}&sortby={extend.get("sortby","on_shelf_at")}')
return self.getlist(data)
def gettag(self, tid, pg, filter, extend):
if '_tag' in tid:
tid=extend.get('tagtype',tid)
data=self.getdata(f'/api/v1/video/tag?current={pg}&pageSize=100&level=2&parent_id={tid.split("_")[0]}')
return self.geticon(data, '_stag',{"type": "rect", "ratio": 1.33})
elif '_stag' in tid:
data = self.getdata(f'/api/v1/video?current={pg}&pageSize=50&tag_ids={tid.split("_")[0]}&sortby={extend.get("sortby","on_shelf_at")}')
return self.getlist(data)
def getsx(self, tid, pg, filter, extend):
data=self.getdata(f'/api/v1/video?current={pg}&pageSize=20&producer_ids={tid.split("_")[0]}&region_ids={extend.get("region_ids","")}&sortby={extend.get("sortby","complex")}')
return self.getlist(data)
def getmake(self, tid, pg, filter, extend):
if pg=='1':
data=self.getdata('/api/v1/video/producer?current=1&pageSize=100&status=1')
return self.geticon(data, '_sx',{"type": "rect", "ratio": 1.33})
+243
View File
@@ -0,0 +1,243 @@
# coding=utf-8
# !/usr/bin/python
import sys
import requests
from bs4 import BeautifulSoup
import re
from base.spider import Spider
import json
sys.path.append('..')
xurl = "https://ee55ff.com/video.html"
headerx = {
'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.87 Safari/537.36'
}
class Spider(Spider):
global xurl
global headerx
def getName(self):
return "首页"
def init(self, extend):
pass
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def homeContent(self, filter):
# https://yaselulu.autos/?page_id=9
data = {"name": "John", "age": 31, "city": "New York"}
res = requests.post('https://spiderscloudcn2.51111666.com/getDataInit', headers=headerx, json=data)
res.encoding = "utf-8"
json_dict = json.loads(res.text)
menu0ListMap = json_dict["data"]["menu0ListMap"]
result = {}
result['class'] = []
for item in menu0ListMap:
if item['typeName'] == "传媒" or item['typeName'] == "视频" or item['typeName'] == "电影":
for item1 in item['menu2List']:
result['class'].append({'type_id': item1['typeId2'], 'type_name': item1['typeName2']})
return result
def homeVideoContent(self):
videos = []
try:
data = {
"command": "WEB_GET_INFO",
"pageNumber": 1,
"RecordsPage": 20,
"typeId": "24",
"typeMid": "1",
"languageType": "CN",
"content": ""
}
res = requests.post('https://spiderscloudcn2.51111666.com/forward', headers=headerx, json=data)
res.encoding = "utf-8"
json_dict = json.loads(res.text)
menu0ListMap = json_dict["data"]["resultList"]
for item in menu0ListMap:
name1 = item['vod_name'].replace("yy8ycom", "")
pattern = r'(.*?)-(.*?)-\d+\s+'
name = re.sub(pattern, '', name1)
id = item['id']
pic = item['vod_pic']
id2 = item['vod_server_id']
video = {
"vod_id": str(id) + '#' + str(id2),
"vod_name": name,
"vod_pic": pic,
"vod_remarks": ''
}
videos.append(video)
result = {'list': videos}
return result
except:
pass
def categoryContent(self, cid, pg, filter, ext):
result = {}
videos = []
if not pg:
pg = 1
# https://yaselulu.autos/?cat=3754&paged=1
videos = []
try:
data = {
"command": "WEB_GET_INFO",
"pageNumber": pg,
"RecordsPage": 20,
"typeId": cid,
"typeMid": "1",
"languageType": "CN",
"content": ""
}
res = requests.post('https://spiderscloudcn2.51111666.com/forward', headers=headerx, json=data)
res.encoding = "utf-8"
json_dict = json.loads(res.text)
menu0ListMap = json_dict["data"]["resultList"]
for item in menu0ListMap:
name1 = item['vod_name'].replace("yy8ycom", "")
pattern = r'(.*?)-(.*?)-\d+\s+'
name = re.sub(pattern, '', name1)
id = item['id']
pic = item['vod_pic']
id2 = item['vod_server_id']
video = {
"vod_id": str(id) + '#' + str(id2),
"vod_name": name,
"vod_pic": pic,
"vod_remarks": ''
}
videos.append(video)
except:
pass
result['list'] = videos
result['page'] = pg
result['pagecount'] = 9999
result['limit'] = 90
result['total'] = 999999
return result
def detailContent(self, ids):
l10 = "https://server10.vuljers.com"
l11 = 'https://server11.vuljers.com'
l12 = 'https://server12.xylhwdu.com'
l13 = 'https://server13.benpsbp.com'
l14 = 'https://server14.connectr.cn'
did = ids[0]
cid, svid = did.split("#")
videos = []
result = {}
data = {
"command": "WEB_GET_INFO_DETAIL",
"type_Mid": "1",
"id": cid,
"languageType": "CN"
}
res = requests.post('https://spiderscloudcn2.51111666.com/forward', headers=headerx, json=data)
res.encoding = "utf-8"
json_dict = json.loads(res.text)
if svid == "10":
purl = l10 + json_dict['data']["result"]["vod_url"]
elif svid == "11":
purl = l11 + json_dict['data']["result"]["vod_url"]
elif svid == "12":
purl = l12 + json_dict['data']["result"]["vod_url"]
elif svid == "13":
purl = l13 + json_dict['data']["result"]["vod_url"]
elif svid == "14":
purl = l14 + json_dict['data']["result"]["vod_url"]
else:
purl = json_dict['data']["result"]["vod_url"]
videos.append({
"vod_id": '',
"vod_name": '',
"vod_pic": "",
"type_name": "ぃぅおか🍬 คิดถึง",
"vod_year": "",
"vod_area": "",
"vod_remarks": "",
"vod_actor": "",
"vod_director": "",
"vod_content": "",
"vod_play_from": "直链播放",
"vod_play_url": purl
})
result['list'] = videos
return result
def playerContent(self, flag, id, vipFlags):
result = {}
result["parse"] = 0
result["playUrl"] = ''
result["url"] = id
result["header"] = headerx
return result
def searchContentPage(self, key, quick, page):
# https://yaselulu.autos/?s=%E6%88%91%E7%9A%84&paged=2
result = {}
videos = []
if not page:
page = 1
data = {
"command": "WEB_GET_INFO",
"pageNumber": page,
"RecordsPage": 20,
"typeId": "0",
"typeMid": "1",
"languageType": "CN",
"content": key,
"type": "1"
}
res = requests.post('https://spiderscloudcn2.51111666.com/forward', headers=headerx, json=data)
res.encoding = "utf-8"
json_dict = json.loads(res.text)
menu0ListMap = json_dict["data"]["resultList"]
for item in menu0ListMap:
name = item['vod_name'].replace("yy8ycom", "")
id = item['id']
pic = item['vod_pic']
id2 = item['vod_server_id']
video = {
"vod_id": str(id) + '#' + str(id2),
"vod_name": name,
"vod_pic": pic,
"vod_remarks": ''
}
videos.append(video)
result['list'] = videos
result['page'] = page
result['pagecount'] = 9999
result['limit'] = 90
result['total'] = 999999
return result
def searchContent(self, key, quick):
return self.searchContentPage(key, quick, '1')
def localProxy(self, params):
if params['type'] == "m3u8":
return self.proxyM3u8(params)
elif params['type'] == "media":
return self.proxyMedia(params)
elif params['type'] == "ts":
return self.proxyTs(params)
return None
+242
View File
@@ -0,0 +1,242 @@
# -*- coding: utf-8 -*-
# by @嗷呜
import json
import re
import sys
import threading
import time
from base64 import b64encode, b64decode
from urllib.parse import urlparse
import requests
from pyquery import PyQuery as pq
sys.path.append('..')
from base.spider import Spider
class Spider(Spider):
def init(self, extend=""):
'''
如果一直访问不了,手动访问导航页:https://a.hdys.top,替换:
self.host = 'https://xxx.xxx.xxx'
'''
self.session = requests.Session()
self.headers = {
'User-Agent': 'Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36',
'sec-ch-ua-platform': '"Android"',
'sec-ch-ua': '"Not/A)Brand";v="8", "Chromium";v="130", "Google Chrome";v="130"',
'dnt': '1',
'sec-ch-ua-mobile': '?1',
'sec-fetch-site': 'same-origin',
'sec-fetch-mode': 'no-cors',
'sec-fetch-dest': 'script',
'accept-language': 'zh-CN,zh;q=0.9',
'priority': 'u=2',
}
try:self.proxies = json.loads(extend)
except:self.proxies = {}
self.hsot=self.gethost()
# self.hsot='https://hd.hdys2.com'
self.headers.update({'referer': f"{self.hsot}/"})
self.session.proxies.update(self.proxies)
self.session.headers.update(self.headers)
pass
def getName(self):
pass
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def destroy(self):
pass
pheader={
'User-Agent': 'Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36',
'sec-ch-ua-platform': '"Android"',
'sec-ch-ua': '"Not/A)Brand";v="8", "Chromium";v="130", "Google Chrome";v="130"',
'dnt': '1',
'sec-ch-ua-mobile': '?1',
'origin': 'https://jx.8852.top',
'sec-fetch-site': 'cross-site',
'sec-fetch-mode': 'cors',
'sec-fetch-dest': 'empty',
'accept-language': 'zh-CN,zh;q=0.9',
'priority': 'u=1, i',
}
def homeContent(self, filter):
data=self.getpq(self.session.get(self.hsot))
cdata=data('.stui-header__menu.type-slide li')
ldata=data('.stui-vodlist.clearfix li')
result = {}
classes = []
for k in cdata.items():
i=k('a').attr('href')
if i and 'type' in i:
classes.append({
'type_name': k.text(),
'type_id': re.search(r'\d+', i).group(0)
})
result['class'] = classes
result['list'] = self.getlist(ldata)
return result
def homeVideoContent(self):
return {'list':''}
def categoryContent(self, tid, pg, filter, extend):
data=self.getpq(self.session.get(f"{self.hsot}/vodshow/{tid}--------{pg}---.html"))
result = {}
result['list'] = self.getlist(data('.stui-vodlist.clearfix li'))
result['page'] = pg
result['pagecount'] = 9999
result['limit'] = 90
result['total'] = 999999
return result
def detailContent(self, ids):
data=self.getpq(self.session.get(f"{self.hsot}{ids[0]}"))
v=data('.stui-vodlist__box a')
vod = {
'vod_play_from': '花都影视',
'vod_play_url': f"{v('img').attr('alt')}${v.attr('href')}"
}
return {'list':[vod]}
def searchContent(self, key, quick, pg="1"):
data=self.getpq(self.session.get(f"{self.hsot}/vodsearch/{key}----------{pg}---.html"))
return {'list':self.getlist(data('.stui-vodlist.clearfix li')),'page':pg}
def playerContent(self, flag, id, vipFlags):
try:
data=self.getpq(self.session.get(f"{self.hsot}{id}"))
jstr=data('.stui-player.col-pd script').eq(0).text()
jsdata=json.loads(jstr.split("=", maxsplit=1)[-1])
p,url=0,jsdata['url']
if '.m3u8' in url:url=self.proxy(url,'m3u8')
except Exception as e:
print(f"{str(e)}")
p,url=1,f"{self.hsot}{id}"
return {'parse': p, 'url': url, 'header': self.pheader}
def liveContent(self, url):
pass
def localProxy(self, param):
url = self.d64(param['url'])
if param.get('type') == 'm3u8':
return self.m3Proxy(url)
else:
return self.tsProxy(url,param['type'])
def gethost(self):
params = {
'v': '1',
}
self.headers.update({'referer': 'https://a.hdys.top/'})
response = self.session.get('https://a.hdys.top/assets/js/config.js',proxies=self.proxies, params=params, headers=self.headers)
return self.host_late(response.text.split(';')[:-4])
def getlist(self,data):
videos=[]
for i in data.items():
videos.append({
'vod_id': i('a').attr('href'),
'vod_name': i('img').attr('alt'),
'vod_pic': self.proxy(i('img').attr('data-original')),
'vod_year': i('.pic-tag-t').text(),
'vod_remarks': i('.pic-tag-b').text()
})
return videos
def getpq(self, data):
try:
return pq(data.text)
except Exception as e:
print(f"{str(e)}")
return pq(data.text.encode('utf-8'))
def host_late(self, url_list):
if isinstance(url_list, str):
urls = [u.strip() for u in url_list.split(',')]
else:
urls = url_list
if len(urls) <= 1:
return urls[0] if urls else ''
results = {}
threads = []
def test_host(url):
try:
url=re.findall(r'"([^"]*)"', url)[0]
start_time = time.time()
self.headers.update({'referer': f'{url}/'})
response = requests.head(url,proxies=self.proxies,headers=self.headers,timeout=1.0, allow_redirects=False)
delay = (time.time() - start_time) * 1000
results[url] = delay
except Exception as e:
results[url] = float('inf')
for url in urls:
t = threading.Thread(target=test_host, args=(url,))
threads.append(t)
t.start()
for t in threads:
t.join()
return min(results.items(), key=lambda x: x[1])[0]
def m3Proxy(self, url):
ydata = requests.get(url, headers=self.pheader, proxies=self.proxies, allow_redirects=False)
data = ydata.content.decode('utf-8')
if ydata.headers.get('Location'):
url = ydata.headers['Location']
data = requests.get(url, headers=self.pheader, proxies=self.proxies).content.decode('utf-8')
lines = data.strip().split('\n')
last_r = url[:url.rfind('/')]
parsed_url = urlparse(url)
durl = parsed_url.scheme + "://" + parsed_url.netloc
for index, string in enumerate(lines):
if '#EXT' not in string:
if 'http' not in string:
domain=last_r if string.count('/') < 2 else durl
string = domain + ('' if string.startswith('/') else '/') + string
lines[index] = self.proxy(string, string.split('.')[-1].split('?')[0])
data = '\n'.join(lines)
return [200, "application/vnd.apple.mpegur", data]
def tsProxy(self, url,type):
h=self.pheader.copy()
if type=='img':h=self.headers.copy()
data = requests.get(url, headers=h, proxies=self.proxies, stream=True)
return [200, data.headers['Content-Type'], data.content]
def proxy(self, data, type='img'):
if data and len(self.proxies):return f"{self.getProxyUrl()}&url={self.e64(data)}&type={type}"
else:return data
def e64(self, text):
try:
text_bytes = text.encode('utf-8')
encoded_bytes = b64encode(text_bytes)
return encoded_bytes.decode('utf-8')
except Exception as e:
print(f"Base64编码错误: {str(e)}")
return ""
def d64(self,encoded_text):
try:
encoded_bytes = encoded_text.encode('utf-8')
decoded_bytes = b64decode(encoded_bytes)
return decoded_bytes.decode('utf-8')
except Exception as e:
print(f"Base64解码错误: {str(e)}")
return ""
+132
View File
@@ -0,0 +1,132 @@
# -*- coding: utf-8 -*-
# by @嗷呜
import random
import string
import sys
sys.path.append('..')
from base.spider import Spider
class Spider(Spider):
def init(self, extend=""):
self.host,self.headers = self.getat()
pass
def getName(self):
pass
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def destroy(self):
pass
def homeContent(self, filter):
data=self.fetch(f'{self.host}/vod/listing-0-0-0-0-0-0-0-0-0-0',headers=self.headers).json()
result = {}
classes = [{
'type_name': '全部',
'type_id': '0'
}]
filters = {}
ft=[]
filter_keys = ['orders', 'areas', 'years', 'definitions', 'durations', 'mosaics', 'langvoices']
for key in filter_keys:
if key in data['data']:
filter_item = {
'key': key,
'name': key,
'value': []
}
for item in data['data'][key]:
first_two = dict(list(item.items())[:2])
filter_item['value'].append({
'v': list(first_two.values())[0],
'n': list(first_two.values())[1]
})
ft.append(filter_item)
filters['0']=ft
for k in data['data']['categories']:
classes.append({
'type_name': k['catename'],
'type_id': k['cateid']
})
filters[k['cateid']]=ft
result['class'] = classes
result['filters'] =filters
result['list'] = self.getlist(data['data']['vodrows'])
return result
def homeVideoContent(self):
pass
def categoryContent(self, tid, pg, filter, extend):
data=self.fetch(f'{self.host}/vod/listing-{tid}-{extend.get("areas","0")}-{extend.get("years","0")}-1-{extend.get("definitions","0")}-{extend.get("durations","0")}-{extend.get("mosaics","0")}-{extend.get("langvoices","0")}-{extend.get("orders","0")}-{pg}',headers=self.headers).json()
result = {}
result['list'] = self.getlist(data['data']['vodrows'])
result['page'] = pg
result['pagecount'] = 9999
result['limit'] = 90
result['total'] = 999999
return result
def detailContent(self, ids):
data=self.fetch(f'{self.host}/vod/reqplay/{ids[0]}',headers=self.headers).json()
vod = {
'vod_play_from': data['errmsg'],
'vod_play_url': '#'.join([f"{i['hdtype']}${i['httpurl']}" for i in data['data']['httpurls']]),
}
return {'list':[vod]}
def searchContent(self, key, quick, pg="1"):
data=self.fetch(f'{self.host}/search?page={pg}&wd={key}',headers=self.headers).json()
return {'list':self.getlist(data['data']['vodrows']),'page':pg}
def playerContent(self, flag, id, vipFlags):
return {'parse': 0, 'url': id, 'header': {'User-Agent':'ExoPlayer'}}
def localProxy(self, param):
pass
def getlist(self,data):
vlist=[]
for i in data:
if i['isvip'] !='1':
vlist.append({
'vod_id': i['vodid'],
'vod_name': i['title'],
'vod_pic': i['coverpic'],
'vod_year': i.get('duration'),
'vod_remarks': i.get('catename'),
'style': {"type": "rect", "ratio": 1.33}
})
return vlist
def getat(self):
headers = {
'User-Agent': 'Mozilla/5.0 (Linux; Android 11; M2012K10C Build/RP1A.200720.011; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/87.0.4280.141 Mobile Safari/537.36',
'Accept': 'application/json, text/plain, */*',
'x-auth-uuid': self.random_str(32),
'x-system': 'Android',
'x-version': '5.0.5',
'x-channel': 'xj2',
'x-requested-with': 'com.uyvzkv.pnjzdv',
'sec-fetch-site': 'cross-site',
'sec-fetch-mode': 'cors',
'sec-fetch-dest': 'empty',
'accept-language': 'zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7',
}
host=f'https://{self.random_str(6)}.bjhpz.com'
data=self.fetch(f'{host}/init',headers=headers).json()
headers.update({'x-cookie-auth': data['data']['globalData'].get('xxx_api_auth')})
return host,headers
def random_str(self,length=16):
chars = string.ascii_lowercase + string.digits
return ''.join(random.choice(chars) for _ in range(length))
+260
View File
@@ -0,0 +1,260 @@
# coding=utf-8
# !/python
import sys
import requests
from bs4 import BeautifulSoup
import re
from base.spider import Spider
sys.path.append('..')
xurl = "https://kb11.adultporna-av1sim111.xyz"
headerx = {
'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.87 Safari/537.36'
}
class Spider(Spider):
global xurl
global headerx
def getName(self):
return "首页"
def init(self, extend):
pass
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def homeVideoContent(self):
videos = []
try:
res = requests.get(xurl + '/show/30/', headers=headerx)
res.encoding = "utf-8"
res = res.text
doc = BeautifulSoup(res, "html.parser")
vodss = doc.find('ul', class_='row row-space8 row-m-space8')
vods = vodss.find_all('li')
for vod in vods:
name = vod.select_one('section a')['title']
id = vod.select_one('section a')['href']
remarks = vod.select_one('section a span small').text
pic = vod.select_one('section a img')['src']
video = {
"vod_id": id,
"vod_name": name,
"vod_pic": pic,
"vod_remarks": remarks
}
videos.append(video)
except:
pass
result = {'list': videos}
return result
def homeContent(self, filter):
res = requests.get(xurl + '/zzzz', headers=headerx)
res.encoding = "utf-8"
res = res.text
doc = BeautifulSoup(res, "html.parser")
result = {}
result['class'] = []
vodss = doc.find_all('dd')
result['class'].append({'type_id': '/t/86', 'type_name': '女优'})
result['class'].append({'type_id': '/t/141', 'type_name': '日本番號'})
result['class'].append({'type_id': '/t/1', 'type_name': '日本有码'})
result['class'].append({'type_id': '/t/5', 'type_name': '日本无码'})
result['class'].append({'type_id': '/t/32', 'type_name': '日本巨乳无码'})
result['class'].append({'type_id': '/t/34', 'type_name': '日本人妻无码'})
result['class'].append({'type_id': '/t/35', 'type_name': '日本制服无码'})
result['class'].append({'type_id': '/t/223', 'type_name': '日本乱伦无码'})
result['class'].append({'type_id': '/t/224', 'type_name': '日本强奸无码'})
result['class'].append({'type_id': '/t/36', 'type_name': '日本丝袜美腿'})
result['class'].append({'type_id': '/t/13', 'type_name': '日本中文字幕'})
result['class'].append({'type_id': '/t/53', 'type_name': '日本绝美少女'})
result['class'].append({'type_id': '/t/6', 'type_name': '日本强奸乱伦'})
result['class'].append({'type_id': '/t/7', 'type_name': '日本巨乳'})
result['class'].append({'type_id': '/t/9', 'type_name': '日本制服诱惑'})
result['class'].append({'type_id': '/t/11', 'type_name': '日本调教'})
result['class'].append({'type_id': '/t/58', 'type_name': '日本口爆'})
result['class'].append({'type_id': '/t/30', 'type_name': '欧美'})
result['class'].append({'type_id': '/t/164', 'type_name': '成人动漫'})
result['class'].append({'type_id': '/t/85', 'type_name': '伦理电影'})
result['class'].append({'type_id': '/t/2', 'type_name': '国产传媒'})
result['class'].append({'type_id': '/t/163', 'type_name': '国产视频'})
result['class'].append({'type_id': '/t/67', 'type_name': '国产空姐模特'})
result['class'].append({'type_id': '/t/69', 'type_name': '国产学生'})
result['class'].append({'type_id': '/t/70', 'type_name': '国产人妻熟女'})
result['class'].append({'type_id': '/t/71', 'type_name': '国产乱伦'})
result['class'].append({'type_id': '/t/72', 'type_name': '国产自慰'})
result['class'].append({'type_id': '/t/73', 'type_name': '国产野合车震'})
result['class'].append({'type_id': '/t/75', 'type_name': '国产名人'})
result['class'].append({'type_id': '/t/74', 'type_name': '国产OL'})
result['class'].append({'type_id': '/t/18', 'type_name': '国产剧情'})
result['class'].append({'type_id': '/t/19', 'type_name': '国产偷怕'})
result['class'].append({'type_id': '/t/76', 'type_name': '国产网曝'})
result['class'].append({'type_id': '/t/227', 'type_name': '综合传媒'})
result['class'].append({'type_id': '/t/38', 'type_name': '麻豆合集'})
result['class'].append({'type_id': '/t/109', 'type_name': '葫芦影业'})
result['class'].append({'type_id': '/t/111', 'type_name': '天美传媒'})
result['class'].append({'type_id': '/t/112', 'type_name': '果冻传媒'})
result['class'].append({'type_id': '/t/131', 'type_name': '91制片厂'})
result['class'].append({'type_id': '/t/113', 'type_name': '蜜桃传媒'})
result['class'].append({'type_id': '/t/114', 'type_name': '精东影业'})
result['class'].append({'type_id': '/t/115', 'type_name': '皇家华人'})
result['class'].append({'type_id': '/t/116', 'type_name': 'SWAG'})
result['class'].append({'type_id': '/t/120', 'type_name': '兔子先生'})
result['class'].append({'type_id': '/t/122', 'type_name': 'PsychoPornTW'})
result['class'].append({'type_id': '/t/124', 'type_name': '微啪 & 陌丽影像传媒'})
result['class'].append({'type_id': '/t/125', 'type_name': '大象传媒'})
result['class'].append({'type_id': '/t/126', 'type_name': '乌鸦传媒'})
result['class'].append({'type_id': '/t/141', 'type_name': '日本番号'})
result['class'].append({'type_id': '/t/225', 'type_name': '综合番号'})
result['class'].append({'type_id': '/t/142', 'type_name': '200GANA'})
result['class'].append({'type_id': '/t/146', 'type_name': '259LUXU'})
result['class'].append({'type_id': '/t/147', 'type_name': '261ARA'})
result['class'].append({'type_id': '/t/148', 'type_name': '277DCV'})
result['class'].append({'type_id': '/t/143', 'type_name': '300MIUM'})
result['class'].append({'type_id': '/t/149', 'type_name': '300MAAN'})
result['class'].append({'type_id': '/t/150', 'type_name': '300NTK'})
result['class'].append({'type_id': '/t/152', 'type_name': '328HMDN'})
result['class'].append({'type_id': '/t/154', 'type_name': '336KNB'})
result['class'].append({'type_id': '/t/155', 'type_name': '348NTR'})
result['class'].append({'type_id': '/t/156', 'type_name': '390JAC'})
result['class'].append({'type_id': '/t/158', 'type_name': '428SUKE'})
result['class'].append({'type_id': '/t/181', 'type_name': 'AARM'})
result['class'].append({'type_id': '/t/180', 'type_name': 'ADN'})
result['class'].append({'type_id': '/t/185', 'type_name': 'ATID'})
result['class'].append({'type_id': '/t/192', 'type_name': 'DFDM'})
result['class'].append({'type_id': '/t/194', 'type_name': 'DLDSS'})
for vod in vodss:
id = vod.find('a')['href'].rstrip("/")
name = vod.find('a').text
if not any(d['type_name'] == name for d in result['class']):
result['class'].append({'type_id': id, 'type_name': name})
return result
def categoryContent(self, cid, pg, filter, ext):
result = {}
videos = []
if pg == "" or pg == 1:
url = xurl + cid
else:
url = xurl + cid + '-' + str(pg)
res = requests.get(url=url, headers=headerx)
res.encoding = "utf-8"
res = res.text
doc = BeautifulSoup(res, "html.parser")
vodss = doc.find('ul', class_='row row-space8 row-m-space8')
vods = vodss.find_all('li')
for vod in vods:
name = vod.select_one('section a')['title']
id = vod.select_one('section a')['href']
remarks = vod.select_one('section a span small').text
pic = vod.select_one('section a img')['src']
video = {
"vod_id": id,
"vod_name": name,
"vod_pic": pic,
"vod_remarks": remarks
}
videos.append(video)
result['list'] = videos
result['page'] = pg
result['pagecount'] = 9999
result['limit'] = 90
result['total'] = 999999
return result
def detailContent(self, ids):
did = ids[0].replace("voddetail", "v")
videos = []
result = {}
res = requests.get(url=xurl + did, headers=headerx)
res.encoding = "utf-8"
res = res.text
source_match = re.search(r'"","url":"(.*?)"', res)
if source_match:
purl = source_match.group(1).replace("\\", "")
videos.append({
"vod_id": did,
"vod_name": '',
"vod_pic": "",
"type_name": "",
"vod_year": "",
"vod_area": "",
"vod_remarks": "",
"vod_actor": "",
"vod_director": "",
"vod_content": "",
"vod_play_from": '直链播放',
"vod_play_url": purl
})
result['list'] = videos
return result
def playerContent(self, flag, id, vipFlags):
result = {}
result["parse"] = 0
result["playUrl"] = ''
result["url"] = id
result["header"] = headerx
return result
def searchContent(self, key, quick):
return self.searchContentPage(key, quick, '1')
def searchContentPage(self, key, quick, page):
result = {}
header2 = {
'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.87 Safari/537.36',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6'
}
res = requests.get(xurl + '/s/page/' + str(page) + '/wd/' + key, headers=header2)
res.encoding = "utf-8"
res = res.text
videos = []
doc = BeautifulSoup(res, "html.parser")
vodss = doc.find('ul', class_='row row-space8 row-m-space8')
vods = vodss.find_all('li')
for vod in vods:
name = vod.select_one('section a')['title']
id = vod.select_one('section a')['href']
remarks = vod.select_one('section a span small').text
pic = vod.select_one('section a img')['src']
video = {
"vod_id": id,
"vod_name": name,
"vod_pic": pic,
"vod_remarks": remarks
}
videos.append(video)
result['list'] = videos
result['page'] = page
result['pagecount'] = 9999
result['limit'] = 90
result['total'] = 999999
return result
def localProxy(self, params):
if params['type'] == "m3u8":
return self.proxyM3u8(params)
elif params['type'] == "media":
return self.proxyMedia(params)
elif params['type'] == "ts":
return self.proxyTs(params)
return None