mirror of
				https://github.com/qist/tvbox.git
				synced 2025-10-29 03:02:21 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			302 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			302 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # -*- coding: utf-8 -*-
 | |
| # by @嗷呜
 | |
| import sys
 | |
| from urllib.parse import urlparse
 | |
| sys.path.append("..")
 | |
| import re
 | |
| import hashlib
 | |
| import hmac
 | |
| import random
 | |
| import string
 | |
| from Crypto.Util.Padding import unpad
 | |
| from concurrent.futures import ThreadPoolExecutor
 | |
| from Crypto.PublicKey import RSA
 | |
| from Crypto.Cipher import PKCS1_v1_5, AES
 | |
| from base64 import b64encode, b64decode
 | |
| import json
 | |
| import time
 | |
| from base.spider import Spider
 | |
| 
 | |
| 
 | |
| class Spider(Spider):
 | |
| 
 | |
|     def init(self, extend=""):
 | |
|         self.device = self.device_id()
 | |
|         self.host = self.gethost()
 | |
|         pass
 | |
| 
 | |
|     def getName(self):
 | |
|         pass
 | |
| 
 | |
|     def isVideoFormat(self, url):
 | |
|         pass
 | |
| 
 | |
|     def manualVideoCheck(self):
 | |
|         pass
 | |
| 
 | |
|     def action(self, action):
 | |
|         pass
 | |
| 
 | |
|     def destroy(self):
 | |
|         pass
 | |
| 
 | |
|     def homeContent(self, filter):
 | |
|         result = {}
 | |
|         filters = {}
 | |
|         classes = []
 | |
|         bba = self.url()
 | |
|         data = self.fetch(f"{self.host}/api/v1/app/config?pack={bba[0]}&signature={bba[1]}", headers=self.header()).text
 | |
|         data1 = self.aes(data)
 | |
|         dy = {"class": "类型", "area": "地区", "lang": "语言", "year": "年份", "letter": "字母", "by": "排序",
 | |
|               "sort": "排序"}
 | |
|         data1['data']['movie_screen']['sort'].pop(0)
 | |
|         for item in data1['data']['movie_screen']['sort']:
 | |
|             item['n'] = item.pop('name')
 | |
|             item['v'] = item.pop('value')
 | |
|         for item in data1['data']['movie_screen']['filter']:
 | |
|             has_non_empty_field = False
 | |
|             classes.append({"type_name": item["name"], "type_id": str(item["id"])})
 | |
|             for key in dy:
 | |
|                 if key in item and item[key]:
 | |
|                     has_non_empty_field = True
 | |
|                     break
 | |
|             if has_non_empty_field:
 | |
|                 filters[str(item["id"])] = []
 | |
|                 filters[str(item["id"])].append(
 | |
|                     {"key": 'sort', "name": '排序', "value": data1['data']['movie_screen']['sort']})
 | |
|                 for dkey in item:
 | |
|                     if dkey in dy and item[dkey]:
 | |
|                         item[dkey].pop(0)
 | |
|                         value_array = [
 | |
|                             {"n": value.strip(), "v": value.strip()}
 | |
|                             for value in item[dkey]
 | |
|                             if value.strip() != ""
 | |
|                         ]
 | |
|                         filters[str(item["id"])].append(
 | |
|                             {"key": dkey, "name": dy[dkey], "value": value_array}
 | |
|                         )
 | |
|         result["class"] = classes
 | |
|         result["filters"] = filters
 | |
|         return result
 | |
| 
 | |
|     def homeVideoContent(self):
 | |
|         bba = self.url()
 | |
|         url = f'{self.host}/api/v1/movie/index_recommend?pack={bba[0]}&signature={bba[1]}'
 | |
|         data = self.fetch(url, headers=self.header()).json()
 | |
|         videos = []
 | |
|         for item in data['data']:
 | |
|             if len(item['list']) > 0:
 | |
|                 for it in item['list']:
 | |
|                     try:
 | |
|                         videos.append(self.voides(it))
 | |
|                     except Exception as e:
 | |
|                         continue
 | |
|         result = {"list": videos}
 | |
|         return result
 | |
| 
 | |
|     def categoryContent(self, tid, pg, filter, extend):
 | |
|         body = {"type_id": tid, "sort": extend.get("sort", "by_default"), "class": extend.get("class", "类型"),
 | |
|                 "area": extend.get("area", "地区"), "year": extend.get("year", "年份"), "page": str(pg),
 | |
|                 "pageSize": "21"}
 | |
|         result = {}
 | |
|         list = []
 | |
|         bba = self.url(body)
 | |
|         url = f"{self.host}/api/v1/movie/screen/list?pack={bba[0]}&signature={bba[1]}"
 | |
|         data = self.fetch(url, headers=self.header()).json()['data']['list']
 | |
|         for item in data:
 | |
|             list.append(self.voides(item))
 | |
|         result["list"] = list
 | |
|         result["page"] = pg
 | |
|         result["pagecount"] = 9999
 | |
|         result["limit"] = 90
 | |
|         result["total"] = 999999
 | |
|         return result
 | |
| 
 | |
|     def detailContent(self, ids):
 | |
|         body = {"id": ids[0]}
 | |
|         bba = self.url(body)
 | |
|         url = f'{self.host}/api/v1/movie/detail?pack={bba[0]}&signature={bba[1]}'
 | |
|         data = self.fetch(url, headers=self.header()).json()['data']
 | |
|         video = {'vod_name': data.get('name'), 'type_name': data.get('type_name'), 'vod_year': data.get('year'),
 | |
|                  'vod_area': data.get('area'), 'vod_remarks': data.get('dynami'), 'vod_content': data.get('content')}
 | |
|         play = []
 | |
|         names = []
 | |
|         tasks = []
 | |
|         for itt in data["play_from"]:
 | |
|             name = itt["name"]
 | |
|             a = []
 | |
|             if len(itt["list"]) > 0:
 | |
|                 names.append(name)
 | |
|                 play.append(self.playeach(itt['list']))
 | |
|             else:
 | |
|                 tasks.append({"movie_id": ids[0], "from_code": itt["code"]})
 | |
|                 names.append(name)
 | |
|         if tasks:
 | |
|             with ThreadPoolExecutor(max_workers=len(tasks)) as executor:
 | |
|                 results = executor.map(self.playlist, tasks)
 | |
|                 for result in results:
 | |
|                     if result:
 | |
|                         play.append(result)
 | |
|                     else:
 | |
|                         play.append("")
 | |
|         video["vod_play_from"] = "$$$".join(names)
 | |
|         video["vod_play_url"] = "$$$".join(play)
 | |
|         result = {"list": [video]}
 | |
|         return result
 | |
| 
 | |
|     def searchContent(self, key, quick, pg=1):
 | |
|         body = {"keyword": key, "sort": "", "type_id": "0", "page": str(pg), "pageSize": "10",
 | |
|                 "res_type": "by_movie_name"}
 | |
|         bba = self.url(body)
 | |
|         url = f"{self.host}/api/v1/movie/search?pack={bba[0]}&signature={bba[1]}"
 | |
|         data = self.fetch(url, headers=self.header()).json()['data'].get('list')
 | |
|         videos = []
 | |
|         for it in data:
 | |
|             try:
 | |
|                 videos.append(self.voides(it))
 | |
|             except Exception as e:
 | |
|                 continue
 | |
|         result = {"list": videos, "page": pg}
 | |
|         return result
 | |
| 
 | |
|     def playerContent(self, flag, id, vipFlags):
 | |
|         url = id
 | |
|         if not re.search(r"\.m3u8|\.mp4", url):
 | |
|             try:
 | |
|                 data = json.loads(b64decode(id.encode('utf-8')).decode('utf-8'))
 | |
|                 bba = self.url(data)
 | |
|                 data2 = self.fetch(f"{self.host}/api/v1/movie_addr/parse_url?pack={bba[0]}&signature={bba[1]}",
 | |
|                                    headers=self.header()).json()['data']
 | |
|                 url = data2.get('play_url') or data2.get('download_url')
 | |
|             except Exception as e:
 | |
|                 pass
 | |
|         if re.search(r'\.jpg|\.png|\.jpeg', url):
 | |
|             url = self.Mproxy(url)
 | |
|         result = {}
 | |
|         result["parse"] = 0
 | |
|         result["url"] = url
 | |
|         result["header"] = {'user-agent': 'okhttp/4.9.2'}
 | |
|         return result
 | |
| 
 | |
|     def localProxy(self, param):
 | |
|         return self.Mlocal(param)
 | |
| 
 | |
|     def Mproxy(self, url):
 | |
|         return self.getProxyUrl() + "&url=" + b64encode(url.encode('utf-8')).decode('utf-8') + "&type=m3u8"
 | |
| 
 | |
|     def Mlocal(self, param,header=None):
 | |
|         url = self.d64(param["url"])
 | |
|         ydata = self.fetch(url, headers=header, allow_redirects=False)
 | |
|         data = ydata.content.decode('utf-8')
 | |
|         if ydata.headers.get('Location'):
 | |
|             url = ydata.headers['Location']
 | |
|             data = self.fetch(url, headers=header).content.decode('utf-8')
 | |
|         parsed_url = urlparse(url)
 | |
|         durl = parsed_url.scheme + "://" + parsed_url.netloc
 | |
|         lines = data.strip().split('\n')
 | |
|         for index, string in enumerate(lines):
 | |
|             if '#EXT' not in string and 'http' not in string:
 | |
|                 last_slash_index = string.rfind('/')
 | |
|                 lpath = string[:last_slash_index + 1]
 | |
|                 lines[index] = durl + ('' if lpath.startswith('/') else '/') + lpath
 | |
|         data = '\n'.join(lines)
 | |
|         return [200, "application/vnd.apple.mpegur", data]
 | |
| 
 | |
|     def device_id(self):
 | |
|         characters = string.ascii_lowercase + string.digits
 | |
|         random_string = ''.join(random.choices(characters, k=32))
 | |
|         return random_string
 | |
| 
 | |
|     def gethost(self):
 | |
|         try:
 | |
|             url = 'https://dns.alidns.com/dns-query'
 | |
|             headers = {
 | |
|                 'User-Agent': 'okhttp/4.9.2',
 | |
|                 'Accept': 'application/dns-message'
 | |
|             }
 | |
|             params = {
 | |
|                 'dns': 'AAABAAABAAAAAAAACWJmbTExYXM5ZgdmdXFpeXVuAmNuAAAcAAE'
 | |
|             }
 | |
|             response = self.fetch(url, headers=headers, params=params)
 | |
|             host=self.parse_dns_name(response.content, 12)
 | |
|             return f"https://{host}"
 | |
|         except:
 | |
|             return "https://bfm11as9f.fuqiyun.cn"
 | |
| 
 | |
|     def parse_dns_name(self, data, offset):
 | |
|         parts = []
 | |
|         while True:
 | |
|             length = data[offset]
 | |
|             if length == 0:
 | |
|                 break
 | |
|             offset += 1
 | |
|             parts.append(data[offset:offset + length].decode('utf-8'))
 | |
|             offset += length
 | |
|         return '.'.join(parts)
 | |
| 
 | |
|     def header(self):
 | |
|         headers = {
 | |
|             'User-Agent': 'Android',
 | |
|             'Accept': 'application/prs.55App.v2+json',
 | |
|             'timestamp': str(int(time.time())),
 | |
|             'x-client-setting': '{"pure-mode":0}',
 | |
|             'x-client-uuid': '{"device_id":' + self.device + '}, "type":1,"brand":"Redmi", "model":"M2012K10C", "system_version":30, "sdk_version":"3.1.0.7"}',
 | |
|             'x-client-version': '3096 '
 | |
|         }
 | |
|         return headers
 | |
| 
 | |
|     def url(self, id=None):
 | |
|         if not id:
 | |
|             id = {}
 | |
|         id["timestamp"] = str(int(time.time()))
 | |
|         public_key = 'MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA02F/kPg5A2NX4qZ5JSns+bjhVMCC6JbTiTKpbgNgiXU+Kkorg6Dj76gS68gB8llhbUKCXjIdygnHPrxVHWfzmzisq9P9awmXBkCk74Skglx2LKHa/mNz9ivg6YzQ5pQFUEWS0DfomGBXVtqvBlOXMCRxp69oWaMsnfjnBV+0J7vHbXzUIkqBLdXSNfM9Ag5qdRDrJC3CqB65EJ3ARWVzZTTcXSdMW9i3qzEZPawPNPe5yPYbMZIoXLcrqvEZnRK1oak67/ihf7iwPJqdc+68ZYEmmdqwunOvRdjq89fQMVelmqcRD9RYe08v+xDxG9Co9z7hcXGTsUquMxkh29uNawIDAQAB'
 | |
|         encrypted_text = json.dumps(id)
 | |
|         public_key = RSA.import_key(b64decode(public_key))
 | |
|         cipher = PKCS1_v1_5.new(public_key)
 | |
|         encrypted_message = cipher.encrypt(encrypted_text.encode('utf-8'))
 | |
|         encrypted_message_base64 = b64encode(encrypted_message).decode('utf-8')
 | |
|         result = encrypted_message_base64.replace('+', '-').replace('/', '_').replace('=', '')
 | |
|         key = '635a580fcb5dc6e60caa39c31a7bde48'
 | |
|         sign = hmac.new(key.encode(), result.encode(), hashlib.md5).hexdigest()
 | |
|         return result, sign
 | |
| 
 | |
|     def playlist(self, body):
 | |
|         try:
 | |
|             bba = self.url(body)
 | |
|             url = f'{self.host}/api/v1/movie_addr/list?pack={bba[0]}&signature={bba[1]}'
 | |
|             data = self.fetch(url, headers=self.header()).json()['data']
 | |
|             return self.playeach(data)
 | |
|         except Exception:
 | |
|             return []
 | |
| 
 | |
|     def playeach(self, data):
 | |
|         play_urls = []
 | |
|         for it in data:
 | |
|             if re.search(r"mp4|m3u8", it["play_url"]):
 | |
|                 play_urls.append(f"{it['episode_name']}${it['play_url']}")
 | |
|             else:
 | |
|                 vd={"from_code": it['from_code'], "play_url": it['play_url'], "episode_id": it['episode_id'], "type": "play"}
 | |
|                 play_urls.append(
 | |
|                     f"{it['episode_name']}${b64encode(json.dumps(vd).encode('utf-8')).decode('utf-8')}"
 | |
|                 )
 | |
|         return '#'.join(play_urls)
 | |
| 
 | |
|     def voides(self, item):
 | |
|         if item['name'] or item['title']:
 | |
|             voide = {
 | |
|                 "vod_id": item.get('id') or item.get('click'),
 | |
|                 'vod_name': item.get('name') or item.get('title'),
 | |
|                 'vod_pic': item.get('cover') or item.get('image'),
 | |
|                 'vod_year': item.get('year') or item.get('label'),
 | |
|                 'vod_remarks': item.get('dynamic') or item.get('sub_title')
 | |
|             }
 | |
|             return voide
 | |
| 
 | |
|     def aes(self, text):
 | |
|         text = text.replace('-', '+').replace('_', '/') + '=='
 | |
|         key = b"e6d5de5fcc51f53d"
 | |
|         iv = b"2f13eef7dfc6c613"
 | |
|         cipher = AES.new(key, AES.MODE_CBC, iv)
 | |
|         pt = unpad(cipher.decrypt(b64decode(text)), AES.block_size).decode("utf-8")
 | |
|         return json.loads(pt)
 |