更新线路

整体线路:v08.27
潇洒线路:v08.27(20)
This commit is contained in:
Liu
2025-08-28 10:27:36 +08:00
parent 459a67d55b
commit 1d042a8341
9 changed files with 1295 additions and 2140 deletions

View File

@@ -1,316 +0,0 @@
# coding = utf-8
# !/usr/bin/python
# 新时代青年 2025.06.25 getApp第三版
import re,sys,uuid,json,base64,urllib3
from Crypto.Cipher import AES
from base.spider import Spider
from Crypto.Util.Padding import pad,unpad
sys.path.append('..')
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class Spider(Spider):
xurl,key,iv,init_data,search_verify = '','','','',''
header = {'User-Agent': 'okhttp/3.14.9'}
def getName(self):
return "首页"
def init(self, extend=''):
ext = json.loads(extend.strip())
host = ext['host']
if not re.match(r'^https?:\/\/[a-zA-Z0-9-]+(\.[a-zA-Z0-9-]+)*(:\d+)?(\/)?$', host):
host = self.fetch(host, headers=self.header, timeout=10, verify=False).text.rstrip('/')
ua = ext.get('ua')
if ua:
self.header['User-Agent'] = ua
api = ext.get('api', '/api.php/getappapi')
if str(api) == '2':
api = '/api.php/qijiappapi'
self.xurl = host + api
self.key = ext['datakey']
self.iv = ext.get('dataiv', self.key)
res = self.fetch(self.xurl + '.index/initV119', headers=self.header, verify=False).json()
encrypted_data = res['data']
response = self.decrypt(encrypted_data)
init_data = json.loads(response)
self.init_data = init_data
self.search_verify = init_data['config'].get('system_search_verify_status', False)
def homeContent(self, filter):
kjson = self.init_data
result = {"class": [], "filters": {}}
for i in kjson['type_list']:
if not(i['type_name'] in {'全部', 'QQ', 'juo.one'} or '企鹅群' in i['type_name']):
result['class'].append({
"type_id": i['type_id'],
"type_name": i['type_name']
})
name_mapping = {'class': '类型', 'area': '地区', 'lang': '语言', 'year': '年份', 'sort': '排序'}
filter_items = []
for filter_type in i.get('filter_type_list', []):
filter_name = filter_type.get('name')
values = filter_type.get('list', [])
if not values:
continue
value_list = [{"n": value, "v": value} for value in values]
display_name = name_mapping.get(filter_name, filter_name)
key = 'by' if filter_name == 'sort' else filter_name
filter_items.append({
"key": key,
"name": display_name,
"value": value_list
})
type_id = i.get('type_id')
if filter_items:
result["filters"][str(type_id)] = filter_items
return result
def homeVideoContent(self):
videos = []
kjson = self.init_data
for i in kjson['type_list']:
for item in i['recommend_list']:
vod_id = item['vod_id']
name = item['vod_name']
pic = item['vod_pic']
remarks = item['vod_remarks']
video = {
"vod_id": vod_id,
"vod_name": name,
"vod_pic": pic,
"vod_remarks": remarks
}
videos.append(video)
return {'list': videos}
def categoryContent(self, cid, pg, filter, ext):
videos = []
payload = {
'area': ext.get('area','全部'),
'year': ext.get('year','全部'),
'type_id': cid,
'page': str(pg),
'sort': ext.get('sort','最新'),
'lang': ext.get('lang','全部'),
'class': ext.get('class','全部')
}
url = f'{self.xurl}.index/typeFilterVodList'
res = self.post(url=url, headers=self.header, data=payload, verify=False).json()
encrypted_data = res['data']
kjson = self.decrypt(encrypted_data)
kjson1 = json.loads(kjson)
for i in kjson1['recommend_list']:
id = i['vod_id']
name = i['vod_name']
pic = i['vod_pic']
remarks = i['vod_remarks']
video = {
"vod_id": id,
"vod_name": name,
"vod_pic": pic,
"vod_remarks": remarks
}
videos.append(video)
return {'list': videos, 'page': pg, 'pagecount': 9999, 'limit': 90, 'total': 999999}
def detailContent(self, ids):
did = ids[0]
payload = {
'vod_id': did,
}
api_endpoints = ['vodDetail', 'vodDetail2']
for endpoint in api_endpoints:
url = f'{self.xurl}.index/{endpoint}'
response = self.post(url=url, headers=self.header, data=payload, verify=False)
if response.status_code == 200:
response_data = response.json()
encrypted_data = response_data['data']
kjson1 = self.decrypt(encrypted_data)
kjson = json.loads(kjson1)
break
videos = []
play_form = ''
play_url = ''
lineid = 1
name_count = {}
for line in kjson['vod_play_list']:
keywords = {'防走丢', '', '防失群', '官网'}
player_show = line['player_info']['show']
if any(keyword in player_show for keyword in keywords):
player_show = f'{lineid}线'
line['player_info']['show'] = player_show
count = name_count.get(player_show, 0) + 1
name_count[player_show] = count
if count > 1:
line['player_info']['show'] = f"{player_show}{count}"
play_form += line['player_info']['show'] + '$$$'
parse = line['player_info']['parse']
parse_type = line['player_info']['parse_type']
player_parse_type = line['player_info']['player_parse_type']
kurls = ""
for vod in line['urls']:
token = 'token+' + vod['token']
kurls += f"{str(vod['name'])}${parse},{vod['url']},{token},{player_parse_type},{parse_type}#"
kurls = kurls.rstrip('#')
play_url += kurls + '$$$'
lineid += 1
play_form = play_form.rstrip('$$$')
play_url = play_url.rstrip('$$$')
videos.append({
"vod_id": did,
"vod_name": kjson['vod']['vod_name'],
"vod_actor": kjson['vod']['vod_actor'].replace('演员', ''),
"vod_director": kjson['vod'].get('vod_director', '').replace('导演', ''),
"vod_content": kjson['vod']['vod_content'],
"vod_remarks": kjson['vod']['vod_remarks'],
"vod_year": kjson['vod']['vod_year'] + '',
"vod_area": kjson['vod']['vod_area'],
"vod_play_from": play_form,
"vod_play_url": play_url
})
return {'list': videos}
def playerContent(self, flag, id, vipFlags):
url = ''
aid = id.split(',')
uid = aid[0]
kurl = aid[1]
token = aid[2].replace('token+', '')
player_parse_type = aid[3]
parse_type = aid[4]
if parse_type == '0':
res = {"parse": 0, "url": kurl, "header": {'User-Agent': 'Dalvik/2.1.0 (Linux; U; Android 14; 23113RK12C Build/SKQ1.231004.001)'}}
elif parse_type == '2':
res = {"parse": 1, "url": uid+kurl, "header": {'User-Agent': 'Dalvik/2.1.0 (Linux; U; Android 14; 23113RK12C Build/SKQ1.231004.001)'}}
elif player_parse_type == '2':
response = self.fetch(url=f'{uid}{kurl}',headers=self.header, verify=False)
if response.status_code == 200:
kjson1 = response.json()
res = {"parse": 0, "url": kjson1['url'], "header": {'User-Agent': 'Dalvik/2.1.0 (Linux; U; Android 14; 23113RK12C Build/SKQ1.231004.001)'}}
else:
id1 = self.encrypt(kurl)
payload = {
'parse_api': uid,
'url': id1,
'player_parse_type': player_parse_type,
'token': token
}
url1 = f"{self.xurl}.index/vodParse"
response = self.post(url=url1, headers=self.header, data=payload, verify=False)
if response.status_code == 200:
response_data = response.json()
encrypted_data = response_data['data']
kjson = self.decrypt(encrypted_data)
kjson1 = json.loads(kjson)
kjson2 = kjson1['json']
kjson3 = json.loads(kjson2)
url = kjson3['url']
res = {"parse": 0, "playUrl": '', "url": url, "header": {'User-Agent': 'Dalvik/2.1.0 (Linux; U; Android 14; 23113RK12C Build/SKQ1.231004.001)'}}
return res
def searchContent(self, key, quick, pg="1"):
videos = []
if 'xiaohys.com' in self.xurl:
host = self.xurl.split('api.php')[0]
data = self.fetch(f'{host}index.php/ajax/suggest?mid=1&wd={key}').json()
for i in data['list']:
videos.append({
"vod_id": i['id'],
"vod_name": i['name'],
"vod_pic": i.get('pic')
})
else:
payload = {
'keywords': key,
'type_id': "0",
'page': str(pg)
}
if self.search_verify:
verifi = self.verification()
if verifi is None:
return {'list':[]}
payload['code'] = verifi['code']
payload['key'] = verifi['uuid']
url = f'{self.xurl}.index/searchList'
res = self.post(url=url, data=payload, headers=self.header, verify=False).json()
if not res.get('data'):
return {'list':[] ,'msg': res.get('msg')}
encrypted_data = res['data']
kjson = self.decrypt(encrypted_data)
kjson1 = json.loads(kjson)
for i in kjson1['search_list']:
id = i['vod_id']
name = i['vod_name']
pic = i['vod_pic']
remarks = i['vod_year'] + ' ' + i['vod_class']
videos.append({
"vod_id": id,
"vod_name": name,
"vod_pic": pic,
"vod_remarks": remarks
})
return {'list': videos, 'page': pg, 'pagecount': 9999, 'limit': 90, 'total': 999999}
def localProxy(self, params):
if params['type'] == "m3u8":
return self.proxyM3u8(params)
elif params['type'] == "media":
return self.proxyMedia(params)
elif params['type'] == "ts":
return self.proxyTs(params)
return None
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def decrypt(self, encrypted_data_b64):
key_bytes = self.key.encode('utf-8')
iv_bytes = self.iv.encode('utf-8')
encrypted_data = base64.b64decode(encrypted_data_b64)
cipher = AES.new(key_bytes, AES.MODE_CBC, iv_bytes)
decrypted_padded = cipher.decrypt(encrypted_data)
decrypted = unpad(decrypted_padded, AES.block_size)
return decrypted.decode('utf-8')
def encrypt(self, sencrypted_data):
key_bytes = self.key.encode('utf-8')
iv_bytes = self.iv.encode('utf-8')
data_bytes = sencrypted_data.encode('utf-8')
padded_data = pad(data_bytes, AES.block_size)
cipher = AES.new(key_bytes, AES.MODE_CBC, iv_bytes)
encrypted_bytes = cipher.encrypt(padded_data)
encrypted_data_b64 = base64.b64encode(encrypted_bytes).decode('utf-8')
return encrypted_data_b64
def ocr(self, base64img):
dat2 = self.post("http://154.222.22.188:9898/ocr/b64/text", data=base64img, headers=self.header, verify=False).text
if dat2:
return dat2
else:
return None
def verification(self):
random_uuid = str(uuid.uuid4())
dat = self.fetch(f'{self.xurl}.verify/create?key={random_uuid}', headers=self.header, verify=False).content
base64_img = base64.b64encode(dat).decode('utf-8')
if not dat:
return None
code = self.ocr(base64_img)
if not code:
return None
code = self.replace_code(code)
if not (len(code) == 4 and code.isdigit()):
return None
return {'uuid': random_uuid, 'code': code}
def replace_code(self, text):
replacements = {'y': '9', '': '0', 'q': '0', 'u': '0', 'o': '0', '>': '1', 'd': '0', 'b': '8', '': '2','D': '0', '': '5'}
if len(text) == 3:
text = text.replace('566', '5066')
text = text.replace('066', '1666')
return ''.join(replacements.get(c, c) for c in text)