Files
tvbox-qist/py/ITalkBBTV.py
qist 23a127de2c 1
2026-05-11 18:47:39 +08:00

225 lines
8.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
# by @Qist
"""
ITalkBB TV - 海外华人影视
"""
import json
import requests
from base.spider import Spider
class Spider(Spider):
    """Spider for ITalkBB TV: on-demand series plus live channels.

    Category listings, series details and episode lists are read from the
    ``classictv`` JSON API; live stream manifests are read from the
    ``playauth`` endpoint. Every request sends the headers built in
    ``__init__``. All network helpers are best-effort: on a transport error,
    a non-200 status or an undecodable body they return an empty value
    instead of raising.
    """

    # Number of series requested per listing page.
    PAGE_SIZE = 24
    # Prefix marking a vod id / play id as a live channel rather than a series.
    LIVE_PREFIX = 'live@'
    # Endpoint that returns the stream manifest for one live channel.
    LIVE_AUTH_URL = 'https://api.italkbbtv.com/playauth/v1/live'
    # Root / category ids sent when listing the live channels.
    LIVE_ROOT_ID = '62ac4e2e4beefe535864769d'
    LIVE_CATEGORY_ID = '62ac4e314beefe53586476c2'

    def __init__(self):
        """Set up hosts, request headers and the category tables."""
        self.name = 'ITalkBB TV'
        self.host = 'https://www.italkbbtv.com'
        self.api = 'https://api.italkbbtv.com/classictv'
        # NOTE(review): this credential is hard-coded in source. Presumably it
        # is a shared web-client token -- confirm, and consider loading it
        # from configuration instead.
        self.token = 'Bearer 9e370010ea624adfbc1c50a7622ec1ee'
        self.header = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36 Edg/136.0.0.0',
            'Referer': 'https://www.italkbbtv.com/',
            'Origin': 'https://www.italkbbtv.com',
            'Authorization': self.token,
        }
        self.timeout = 20
        # type_id -> (root_id, category_id) used by the series listing API.
        self.cats = {
            'drama': ('62ac4df64beefe53586474ff', '62c670dc1dca2d424404499c'),
            'movie': ('62ac4e644beefe535864785c', '62ac4ef36e0b5a13ed291544'),
            'variety': ('62ac4e1f4beefe5358647642', '62ce7417c7daaa4a5d3fea14'),
            'cartoon': ('62ac4e6a4beefe53586478a5', '62ac4e6e4beefe53586478ca'),
            'shorts': ('66a9e3e49f7e8378f2152312', '66b1d25cf2dde82c215f9b59'),
        }
        # Display names and type ids, paired by position in homeContent().
        self.class_names = '电视剧&电影&综艺&动画&短剧&直播频道'.split('&')
        self.class_urls = 'drama&movie&variety&cartoon&shorts&live'.split('&')

    def getName(self):
        """Return the display name of this spider."""
        return 'ITalkBB TV'

    def init(self, extend=""):
        """Accept the extension argument; this spider needs no extra setup."""

    def isVideoFormat(self, url):
        """Not implemented; always returns None."""

    def manualVideoCheck(self):
        """Not implemented; always returns None."""

    def destroy(self):
        """Nothing to release."""

    # ------------------------------------------------------------------
    # Network helpers
    # ------------------------------------------------------------------

    def _request_json(self, url, params=None):
        """GET ``url`` and return the decoded JSON body.

        Returns None on a transport error, a non-200 status, or a body that
        is not valid JSON.
        """
        try:
            resp = requests.get(url, headers=self.header, params=params,
                                timeout=self.timeout)
            if resp.status_code != 200:
                return None
            return resp.json()
        except (requests.RequestException, ValueError):
            # ValueError covers JSON decoding failures on every requests
            # version; anything else (e.g. KeyboardInterrupt) propagates.
            return None

    def _api_get(self, path, params=None):
        """GET ``path`` below ``self.api``; return parsed JSON or None."""
        return self._request_json(self.api + path, params)

    def _get_live_stream(self, ch_id):
        """Return the ``manifest`` value for live channel ``ch_id``.

        Returns '' when the request fails or the response has no manifest.
        """
        data = self._request_json(self.LIVE_AUTH_URL,
                                  {'series_id': ch_id, 'hl': 'zh_CN'})
        if not isinstance(data, dict):
            return ''
        return data.get('manifest') or ''

    def _series_page(self, root_id, cat_id, page, size):
        """Fetch one page of the series listing for a category."""
        return self._api_get('/vod/v1/series', {
            'root_id': root_id, 'category_id': cat_id,
            'page': page, 'size': size, 'hl': 'zh_CN',
        })

    # ------------------------------------------------------------------
    # Conversion helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _first_image(images, kind):
        """Return the first entry of ``images[kind]``, or '' if absent/empty."""
        if not isinstance(images, dict):
            return ''
        urls = images.get(kind)
        return urls[0] if urls else ''

    def _make_vod(self, s):
        """Convert one series record from the API into a vod list entry.

        The picture prefers the poster and falls back to the landscape image.
        The remark depends on ``episode_count``: more than one episode shows
        the latest episode label (or the bare count), exactly one shows a
        fixed single-episode label, otherwise the latest label alone.
        """
        images = s.get('images')
        pic = (self._first_image(images, 'poster')
               or self._first_image(images, 'landscape'))
        ep_count = s.get('episode_count', 0) or 0
        latest = (s.get('latest_episode_shortname')
                  or s.get('latest_episode_name')
                  or '')
        if ep_count > 1:
            remarks = f'更新至{latest}' if latest else f'{ep_count}'
        elif ep_count == 1:
            remarks = '全1集'
        else:
            remarks = latest
        return {
            'vod_id': s.get('id', ''),
            'vod_name': s.get('name', ''),
            'vod_pic': pic,
            'vod_remarks': remarks,
        }

    def _vods_from(self, data):
        """Build vod entries from a series listing response (may be None)."""
        if not isinstance(data, dict):
            return []
        return [self._make_vod(s) for s in (data.get('series') or [])]

    @staticmethod
    def _direct_play(url):
        """Return a player result for a URL that needs no page parsing."""
        return {'parse': 0, 'url': url, 'header': {}, 'playUrl': ''}

    # ------------------------------------------------------------------
    # Spider interface
    # ------------------------------------------------------------------

    def homeContent(self, filter):
        """Return the category list plus the first page of the drama category.

        Args:
            filter: unused.

        Returns:
            dict with ``class`` (list of type_name/type_id pairs) and ``list``
            (vod entries; empty when the request fails).
        """
        classes = [
            {'type_name': name, 'type_id': cid}
            for name, cid in zip(self.class_names, self.class_urls)
        ]
        root_id, cat_id = self.cats['drama']
        data = self._series_page(root_id, cat_id, 1, self.PAGE_SIZE)
        return {'class': classes, 'list': self._vods_from(data)}

    def homeVideoContent(self):
        """Return an empty result; home items are served by homeContent()."""
        return {}

    def categoryContent(self, tid, pg, filter, extend):
        """Return one page of a category.

        Args:
            tid: category type id (a key of ``self.cats``, or ``'live'``).
            pg: page number, as int or numeric string.
            filter: unused.
            extend: unused.

        Returns:
            dict with ``list``, ``page``, ``pagecount``, ``limit``, ``total``.
            An unknown ``tid`` or a failed request yields an empty ``list``.
        """
        page = int(pg)
        size = self.PAGE_SIZE
        result = {'list': [], 'page': page, 'pagecount': 999,
                  'limit': size, 'total': 0}

        # Live channels
        if tid == 'live':
            return self._live_category(result)

        # Video on demand
        cat = self.cats.get(tid)
        if not cat:
            return result
        root_id, cat_id = cat
        data = self._series_page(root_id, cat_id, page, size)
        if not isinstance(data, dict):
            return result

        vods = self._vods_from(data)
        total = data.get('total') or 0
        result['list'] = vods
        result['total'] = total
        if isinstance(total, int) and total > 0:
            result['pagecount'] = (total + size - 1) // size
        else:
            # No usable total in the response: a full page suggests there may
            # be another one, a short page is taken to be the last. This
            # avoids reporting zero pages alongside a non-empty list.
            result['pagecount'] = page + 1 if len(vods) >= size else page
        return result

    def _live_category(self, result):
        """Fill ``result`` with the live channel list (always a single page)."""
        data = self._api_get('/live/v1/lives', {
            'root_id': self.LIVE_ROOT_ID,
            'category_id': self.LIVE_CATEGORY_ID,
            'page': 1, 'size': 100, 'hl': 'zh_CN',
        })
        channels = data.get('lives') if isinstance(data, dict) else None
        vods = []
        for ch in channels or []:
            vods.append({
                'vod_id': self.LIVE_PREFIX + (ch.get('id') or ''),
                'vod_name': ch.get('name', ''),
                'vod_pic': self._first_image(ch.get('images'), 'icon'),
                'vod_remarks': '直播',
            })
        result['list'] = vods
        result['total'] = len(vods)
        # The listing is requested as one page of 100, so never advertise
        # further pages -- not even when the request failed.
        result['pagecount'] = 1
        return result

    def detailContent(self, ids):
        """Return the detail record for the first id in ``ids``.

        An id starting with ``live@`` is a live channel: its stream manifest
        is resolved immediately and becomes the only play URL. Any other id
        is a series: metadata and the episode list are fetched and each
        episode becomes a ``play@<series>@<episode>`` play id.

        Returns:
            dict with a one-element ``list``, or an empty ``list`` when no id
            was given.
        """
        if not ids or not ids[0]:
            return {'list': []}
        vid = ids[0]

        # Live channel
        if vid.startswith(self.LIVE_PREFIX):
            # Strip only the leading marker; str.replace would also remove
            # any later occurrence inside the id.
            ch_id = vid[len(self.LIVE_PREFIX):]
            stream = self._get_live_stream(ch_id)
            return {'list': [{
                'vod_id': vid, 'vod_name': ch_id, 'vod_pic': '',
                'vod_play_from': 'ITalkBB直播',
                'vod_play_url': f'直播${stream}' if stream else '',
            }]}

        # Video on demand
        series_data = self._api_get(f'/vod/v1/series/{vid}', {'hl': 'zh_CN'})
        s = series_data.get('series') if isinstance(series_data, dict) else None
        if not isinstance(s, dict):
            s = {}
        stars = s.get('stars') or {}
        # Only the first five actors are listed; directors are not capped.
        actor = '/'.join(
            a.get('name') or '' for a in (stars.get('actor') or [])[:5])
        director = '/'.join(
            d.get('name') or '' for d in (stars.get('director') or []))

        eps_data = self._api_get(f'/vod/v1/series/{vid}/episodes',
                                 {'hl': 'zh_CN'})
        # The code accepts either a bare list or a dict with an 'episodes' key.
        if isinstance(eps_data, list):
            eps = eps_data
        elif isinstance(eps_data, dict):
            eps = eps_data.get('episodes') or []
        else:
            eps = []

        play_urls = []
        for ep in eps:
            ep_id = ep.get('id') or ''
            # Label falls back to the last four characters of the episode id.
            ep_name = ep.get('shortname') or ep.get('name') or ep_id[-4:]
            play_urls.append(f'{ep_name}$play@{vid}@{ep_id}')
        if not play_urls:
            # No episode list: offer a single entry pointing at the series.
            play_urls.append(f'播放$play@{vid}@')

        return {'list': [{
            'vod_id': vid,
            'vod_name': s.get('name', ''),
            'vod_pic': self._first_image(s.get('images'), 'poster'),
            'vod_remarks': '', 'vod_year': '', 'type_name': '',
            'vod_content': s.get('description', ''),
            'vod_actor': actor, 'vod_director': director,
            'vod_play_from': 'ITalkBB', 'vod_play_url': '#'.join(play_urls),
        }]}

    def searchContent(self, key, quick, pg="1"):
        """Search series names for ``key``.

        No search endpoint is called: the first 50 series of every category
        are fetched and filtered locally by case-insensitive substring match
        on the name. The result set therefore has a single page, and any
        page after the first is empty.

        Args:
            key: text to look for in the series name.
            quick: unused.
            pg: page number, as int or numeric string.

        Returns:
            dict with ``list`` of unique vod entries.
        """
        result = {'list': []}
        try:
            page = int(pg)
        except (TypeError, ValueError):
            page = 1
        if page > 1:
            # Everything was already returned on page 1; repeating it would
            # hand the caller duplicate rows.
            return result

        needle = key.casefold()
        seen = set()
        for root_id, cat_id in self.cats.values():
            data = self._series_page(root_id, cat_id, 1, 50)
            if not isinstance(data, dict):
                continue
            for s in data.get('series') or []:
                if needle not in (s.get('name') or '').casefold():
                    continue
                vod = self._make_vod(s)
                # The same series may be listed under several categories.
                if vod['vod_id'] in seen:
                    continue
                seen.add(vod['vod_id'])
                result['list'].append(vod)
        return result

    def playerContent(self, flag, id, vipFlags):
        """Turn a play id into a player result.

        Accepted forms of ``id``:
          * ``live@<url-or-channel-id>`` -- played directly; a channel id is
            first resolved to its stream manifest.
          * an ``http...`` URL -- played directly (the direct link that
            detailContent() emits for live channels).
          * ``<route>@<series_id>[@<episode_id>]`` -- mapped to a page on
            ``self.host`` and returned with ``parse`` set to 1.
          * a bare id without ``@`` -- treated as a series id on the
            ``play`` route.

        Args:
            flag: unused.
            id: play id as described above.
            vipFlags: unused.
        """
        # Live: use the m3u8 address directly
        if id.startswith(self.LIVE_PREFIX):
            target = id[len(self.LIVE_PREFIX):]
            if not target.startswith('http'):
                target = self._get_live_stream(target)
            return self._direct_play(target)

        # Direct m3u8 link passed through from detailContent
        if id.startswith('http'):
            return self._direct_play(id)

        # Video on demand
        parts = id.split('@')
        if len(parts) > 1:
            route, sid = parts[0], parts[1]
        else:
            # No separator: keep the value as the series id rather than
            # building a URL with an empty one.
            route, sid = 'play', id
        eid = parts[2] if len(parts) > 2 else ''
        url = f'{self.host}/{route}/{sid}'
        if eid:
            url += f'?ep={eid}'
        return {'parse': 1, 'url': url, 'header': self.header, 'playUrl': ''}

    def fetch(self, url):
        """GET ``url`` and return the body decoded as UTF-8.

        Returns None on a transport error or a non-200 status.
        """
        try:
            resp = requests.get(url, headers=self.header, timeout=self.timeout)
        except requests.RequestException:
            return None
        if resp.status_code != 200:
            return None
        resp.encoding = 'utf-8'
        return resp.text

    def localProxy(self, param):
        """No local proxy support; always returns None."""
        return None