This commit is contained in:
qist
2026-05-11 18:37:20 +08:00
parent 3e5ddc1a32
commit a49268bb5c

View File

@@ -3,7 +3,7 @@
"""
ITalkBB TV - 海外华人影视
"""
import re
import json
import requests
from base.spider import Spider
@@ -27,111 +27,103 @@ class Spider(Spider):
def __init__(self):
self.name = 'ITalkBB TV'
self.host = 'https://www.italkbbtv.com'
self.api = 'https://api.italkbbtv.com/classictv'
self.token = 'Bearer 9e370010ea624adfbc1c50a7622ec1ee'
self.header = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36 Edg/136.0.0.0',
'Referer': 'https://www.italkbbtv.com/'
'Referer': 'https://www.italkbbtv.com/',
'Origin': 'https://www.italkbbtv.com',
'Authorization': self.token,
}
self.timeout = 20
self.class_names = '电视剧&直播频道&短剧&综艺&电影&动画'.split('&')
self.class_urls = 'drama/62c670dc1dca2d424404499c&live/62ac4e2e4beefe535864769d&shorts/66b1d25cf2dde82c215f9b59&variety/62ce7417c7daaa4a5d3fea14&movie/62ac4ef36e0b5a13ed291544&cartoon/62ac4e6e4beefe53586478ca'.split('&')
def parse_list_page(self, html):
if not html:
return []
cards = re.findall(r'<a[^>]*href="(/(?:play|shortsPlay)/[a-f0-9]+)"[^>]*>(.*?)</a>', html, re.DOTALL)
vods = []
seen = set()
for href, content in cards:
sid = href.split('/')[-1]
if sid in seen:
continue
seen.add(sid)
name = ''
title_match = re.search(r'title="([^"]+)"', content)
if title_match:
name = title_match.group(1).strip()
if not name:
info_match = re.search(r'info-title[^>]*>([^<]+)', content)
if info_match:
name = info_match.group(1).strip()
if not name:
alt_match = re.search(r'alt="[^"]*[《]([^》]+)[》]', content)
if alt_match:
name = alt_match.group(1).strip()
img_match = re.search(r'<img[^>]*src="([^"]+)"', content)
pic = img_match.group(1) if img_match else ''
remarks = ''
ep_match = re.search(r'(全\d+集|更新至\d+集)', content)
if ep_match:
remarks = ep_match.group(1)
if name:
route = 'shortsPlay' if '/shortsPlay/' in href else 'play'
vods.append({
'vod_id': route + '$' + sid,
'vod_name': name,
'vod_pic': pic,
'vod_remarks': remarks
})
return vods
self.cats = {
'drama': ('62ac4df64beefe53586474ff', '62c670dc1dca2d424404499c'),
'movie': ('62ac4e644beefe535864785c', '62ac4ef36e0b5a13ed291544'),
'variety': ('62ac4e1f4beefe5358647642', '62ce7417c7daaa4a5d3fea14'),
'cartoon': ('62ac4e6a4beefe53586478a5', '62ac4e6e4beefe53586478ca'),
'shorts': ('66a9e3e49f7e8378f2152312', '66b1d25cf2dde82c215f9b59'),
}
self.class_names = '电视剧&电影&综艺&动画&短剧&直播频道'.split('&')
self.class_urls = 'drama&movie&variety&cartoon&shorts&live'.split('&')
def parse_live_page(self, html):
if not html:
return []
m = re.search(r'window\.__NUXT__=([\s\S]*?);</script>', html)
if not m:
return []
js = m.group(1)
channels = re.findall(r'\{[^{}]*id:"([a-f0-9]+)"[^{}]*name:"([^"]+)"', js)
vods = []
seen = set()
for ch_id, name in channels:
if ch_id in seen:
continue
seen.add(ch_id)
vods.append({
'vod_id': 'live@' + ch_id + '@' + name,
'vod_name': name,
'vod_pic': '',
'vod_remarks': '直播'
})
return vods
def _api_get(self, path, params=None):
try:
r = requests.get(self.api + path, headers=self.header, params=params, timeout=self.timeout)
if r.status_code == 200:
return r.json()
except:
pass
return None
def _get_live_name(self, ch_id):
"""从直播页获取频道名称"""
html = self.fetch(self.host + '/live/62ac4e2e4beefe535864769d')
if not html:
return ch_id
m = re.search(r'window\.__NUXT__=([\s\S]*?);</script>', html)
if not m:
return ch_id
js = m.group(1)
match = re.search(r'\{[^{}]*id:"' + ch_id + r'"[^{}]*name:"([^"]+)"', js)
return match.group(1) if match else ch_id
def _get_live_stream(self, ch_id):
try:
r = requests.get('https://api.italkbbtv.com/playauth/v1/live', headers=self.header, params={'series_id': ch_id, 'hl': 'zh_CN'}, timeout=self.timeout)
if r.status_code == 200:
return r.json().get('manifest', '')
except:
pass
return ''
def _make_vod(self, s):
sid = s.get('id', '')
name = s.get('name', '')
images = s.get('images', {}) or {}
poster = (images.get('poster') or [''])[0] if images.get('poster') else ''
landscape = (images.get('landscape') or [''])[0] if images.get('landscape') else ''
pic = poster or landscape
ep_count = s.get('episode_count', 0) or 0
latest = s.get('latest_episode_shortname', '') or s.get('latest_episode_name', '')
if ep_count and ep_count > 1:
remarks = f'更新至{latest}' if latest else f'{ep_count}'
elif ep_count == 1:
remarks = '全1集'
else:
remarks = latest
return {'vod_id': sid, 'vod_name': name, 'vod_pic': pic, 'vod_remarks': remarks}
def homeContent(self, filter):
result = {'class': [], 'list': []}
for name, cid in zip(self.class_names, self.class_urls):
result['class'].append({'type_name': name, 'type_id': cid})
html = self.fetch(self.host + '/drama/62c670dc1dca2d424404499c')
result['list'] = self.parse_list_page(html)
data = self._api_get('/vod/v1/series', {'root_id': self.cats['drama'][0], 'category_id': self.cats['drama'][1], 'page': 1, 'size': 24, 'hl': 'zh_CN'})
if data:
result['list'] = [self._make_vod(s) for s in data.get('series', [])]
return result
def homeVideoContent(self):
return {}
def categoryContent(self, tid, pg, filter, extend):
result = {'list': [], 'page': int(pg), 'pagecount': 999, 'limit': 24, 'total': 999999}
alias = tid.split('/')[0]
url = self.host + '/' + tid
if int(pg) > 1:
url += '?page=' + str(pg)
html = self.fetch(url)
if alias == 'live':
result['list'] = self.parse_live_page(html)
result['total'] = len(result['list'])
result['pagecount'] = 1
else:
result['list'] = self.parse_list_page(html)
pg = int(pg)
result = {'list': [], 'page': pg, 'pagecount': 999, 'limit': 24, 'total': 0}
# 直播频道
if tid == 'live':
data = self._api_get('/live/v1/lives', {'root_id': '62ac4e2e4beefe535864769d', 'category_id': '62ac4e314beefe53586476c2', 'page': 1, 'size': 100, 'hl': 'zh_CN'})
if data:
vods = []
for ch in data.get('lives', []):
ch_id = ch.get('id', '')
ch_name = ch.get('name', '')
images = ch.get('images', {}) or {}
icon = (images.get('icon') or [''])[0] if images.get('icon') else ''
vods.append({'vod_id': 'live@' + ch_id, 'vod_name': ch_name, 'vod_pic': icon, 'vod_remarks': '直播'})
result['list'] = vods
result['total'] = len(vods)
result['pagecount'] = 1
return result
# 点播
cat = self.cats.get(tid)
if not cat:
return result
root_id, cat_id = cat
data = self._api_get('/vod/v1/series', {'root_id': root_id, 'category_id': cat_id, 'page': pg, 'size': 24, 'hl': 'zh_CN'})
if data:
result['list'] = [self._make_vod(s) for s in data.get('series', [])]
result['total'] = data.get('total', 0)
result['pagecount'] = (result['total'] + 23) // 24
return result
def detailContent(self, ids):
@@ -139,125 +131,81 @@ class Spider(Spider):
return {'list': []}
vid = ids[0]
# 直播频道: live@ch_id 或 live@ch_id@name
# 直播频道
if vid.startswith('live@'):
parts = vid.split('@', 2)
ch_id = parts[1] if len(parts) > 1 else ''
ch_name = parts[2] if len(parts) > 2 else self._get_live_name(ch_id)
ch_id = vid.replace('live@', '')
stream = self._get_live_stream(ch_id)
return {'list': [{
'vod_id': vid,
'vod_name': ch_name,
'vod_id': vid, 'vod_name': ch_id, 'vod_pic': '',
'vod_play_from': 'ITalkBB直播',
'vod_play_url': '直播$' + ch_id
'vod_play_url': f'直播${stream}' if stream else '',
}]}
parts = vid.split('$')
route = parts[0] if len(parts) > 1 else 'play'
sid = parts[1] if len(parts) > 1 else vid
html = self.fetch(self.host + '/' + route + '/' + sid)
if not html:
return {'list': []}
# 点播
series_data = self._api_get(f'/vod/v1/series/{vid}', {'hl': 'zh_CN'})
s = (series_data or {}).get('series', {})
name = s.get('name', '')
desc = s.get('description', '')
images = s.get('images', {}) or {}
pic = (images.get('poster') or [''])[0] if images.get('poster') else ''
stars = s.get('stars', {}) or {}
actor = '/'.join([a.get('name', '') for a in (stars.get('actor') or [])[:5]])
director = '/'.join([d.get('name', '') for d in (stars.get('director') or [])])
# 从<title>提取名称
name = ''
title_match = re.search(r'<title>([^<|]+)', html)
if title_match:
name = title_match.group(1).strip()
eps_data = self._api_get(f'/vod/v1/series/{vid}/episodes', {'hl': 'zh_CN'})
eps = eps_data if isinstance(eps_data, list) else (eps_data or {}).get('episodes', [])
m = re.search(r'window\.__NUXT__=([\s\S]*?);</script>', html)
if not m:
return {'list': [{'vod_id': vid, 'vod_name': name}]}
js = m.group(1)
# 提取SeriesInfo (用花括号匹配)
desc = ''
pic = ''
si_start = js.find('SeriesInfo:{')
if si_start >= 0:
depth = 0
i = si_start + len('SeriesInfo:')
while i < len(js):
if js[i] == '{': depth += 1
elif js[i] == '}':
depth -= 1
if depth == 0: break
i += 1
info = js[si_start:i+1]
dm = re.search(r'description:"([^"]*)"', info)
if dm: desc = dm.group(1)
pm = re.search(r'poster:\["([^"]*)"', info)
if pm: pic = pm.group(1).replace('\\u002F', '/').replace('\\u002f', '/')
# name可能是变量引用只有当是字符串时才用
nm = re.search(r',name:"([^"]*)"', info)
if nm and nm.group(1):
name = nm.group(1)
actor = '/'.join(re.findall(r'actor:\[\{[^}]*name:"([^"]+)"', js)[:5])
director = '/'.join(re.findall(r'director:\[\{[^}]*name:"([^"]+)"', js))
# 提取EpisodeList
eps = []
ep_start = js.find('EpisodeList:[')
if ep_start >= 0:
depth = 0
i = ep_start + len('EpisodeList:')
while i < len(js):
if js[i] == '[': depth += 1
elif js[i] == ']':
depth -= 1
if depth == 0: break
i += 1
ep_section = js[ep_start:i+1]
eps = re.findall(r'id:"([a-f0-9]{24})".*?name:"([^"]*?)".*?shortname:"([^"]*?)"', ep_section)
# 如果没匹配到shortname尝试只匹配id和name
if not eps:
eps = re.findall(r'id:"([a-f0-9]{24})".*?name:"([^"]*)"', ep_section)
eps = [(eid, nm, '') for eid, nm in eps]
tabs = 'ITalkBB短剧' if route == 'shortsPlay' else 'ITalkBB'
play_urls = []
for ep_id, ep_name, ep_short in eps:
display = ep_short or ep_name or ep_id[-4:]
play_urls.append(display + '$' + route + '@' + sid + '@' + ep_id)
for ep in eps:
ep_name = ep.get('shortname', '') or ep.get('name', '') or ep.get('id', '')[-4:]
ep_id = ep.get('id', '')
play_urls.append(f'{ep_name}$play@{vid}@{ep_id}')
# 电影如果没有剧集,构造单集播放
if not play_urls:
play_urls.append('播放$' + route + '@' + sid + '@')
play_urls.append(f'播放$play@{vid}@')
return {'list': [{
'vod_id': vid,
'vod_name': name,
'vod_pic': pic,
'vod_remarks': '',
'vod_year': '',
'type_name': '',
'vod_content': desc,
'vod_actor': actor,
'vod_director': director,
'vod_play_from': tabs,
'vod_play_url': '#'.join(play_urls)
'vod_id': vid, 'vod_name': name, 'vod_pic': pic,
'vod_remarks': '', 'vod_year': '', 'type_name': '',
'vod_content': desc, 'vod_actor': actor, 'vod_director': director,
'vod_play_from': 'ITalkBB', 'vod_play_url': '#'.join(play_urls),
}]}
def searchContent(self, key, quick, pg="1"):
html = self.fetch(self.host + '/?keyword=' + key)
vods = self.parse_list_page(html)
filtered = [v for v in vods if key in (v.get('vod_name', '') + v.get('vod_remarks', ''))]
return {'list': filtered}
result = {'list': []}
for tid in self.cats:
cat = self.cats[tid]
data = self._api_get('/vod/v1/series', {'root_id': cat[0], 'category_id': cat[1], 'page': 1, 'size': 50, 'hl': 'zh_CN'})
if data:
for s in data.get('series', []):
if key in s.get('name', ''):
result['list'].append(self._make_vod(s))
seen = set()
unique = []
for v in result['list']:
if v['vod_id'] not in seen:
seen.add(v['vod_id'])
unique.append(v)
result['list'] = unique
return result
def playerContent(self, flag, id, vipFlags):
# 直播: 直接用m3u8地址
if id.startswith('live@'):
parts = id.split('@', 2)
ch_id = parts[1] if len(parts) > 1 else ''
return {'parse': 1, 'url': self.host + '/live/' + ch_id,
'header': self.header, 'playUrl': ''}
m3u8 = id.replace('live@', '')
if m3u8.startswith('http'):
return {'parse': 0, 'url': m3u8, 'header': {}, 'playUrl': ''}
stream = self._get_live_stream(m3u8)
return {'parse': 0, 'url': stream, 'header': {}, 'playUrl': ''}
# 点播
parts = id.split('@')
route = parts[0] if len(parts) > 1 else 'play'
sid = parts[1] if len(parts) > 1 else ''
eid = parts[2] if len(parts) > 2 else ''
url = self.host + '/' + route + '/' + sid
url = f'{self.host}/{route}/{sid}'
if eid:
url += '?ep=' + eid
url += f'?ep={eid}'
return {'parse': 1, 'url': url, 'header': self.header, 'playUrl': ''}
def fetch(self, url):
@@ -267,8 +215,7 @@ class Spider(Spider):
if resp.status_code == 200:
return resp.text
return None
except Exception as e:
print(f"fetch error: {e}")
except:
return None
def localProxy(self, param):