mirror of
https://github.com/qist/tvbox.git
synced 2026-06-21 09:03:22 +00:00
修改自动更新脚本
This commit is contained in:
14
.github/workflows/run.yml
vendored
14
.github/workflows/run.yml
vendored
@@ -30,14 +30,17 @@ jobs:
|
||||
run: |
|
||||
pwd
|
||||
cd tools/
|
||||
pip install demjson3 --break-system-packages
|
||||
pip install demjson3 pycryptodome requests --break-system-packages
|
||||
python tvbox.py https://9877.kstore.space/ONE/one.json xs.json dec
|
||||
python build_local.py xs.json output
|
||||
\cp -pdr output/* ../xiaosa/
|
||||
python xiao.py ../xiaosa/api.json dianshi.json
|
||||
python xiao.py ../xiaosa/api.json jsm.json
|
||||
python copy_xbpq.py dianshi_with_app_sites.json
|
||||
\cp -pdr dianshi_with_app_sites.json ../dianshi.json
|
||||
\cp -pdr jsm_with_app_sites.json ../jsm.json
|
||||
\cp -pdr ../xiaosa/spider.jar ../jar/spider.jar
|
||||
rm -rf dianshi_with_app_sites.json jsm_with_app_sites.json
|
||||
rm -rf dianshi_with_app_sites.json jsm_with_app_sites.json output xs.json
|
||||
cd ../
|
||||
shell: bash
|
||||
- name: Upload xiaosa artifacts
|
||||
@@ -65,13 +68,14 @@ jobs:
|
||||
run: |
|
||||
pwd
|
||||
cd tools/
|
||||
pip install demjson3 --break-system-packages
|
||||
python fty.py
|
||||
pip install demjson3 pycryptodome requests --break-system-packages
|
||||
python tvbox.py http://www.饭太硬.art/tv ty.json dec
|
||||
python fty.py ty.json
|
||||
\cp -pdr tvbox_cleaned.json ../fty.json
|
||||
\cp -pdr fan.txt ../jar/fan.txt
|
||||
git clone --depth=1 --recursive https://github.com/fantaiying7/EXT.git
|
||||
\cp -pdr EXT/* ../FTY/
|
||||
rm -rf tvbox_cleaned.json fan.txt EXT
|
||||
rm -rf tvbox_cleaned.json fan.txt EXT ty.json
|
||||
cd ../
|
||||
shell: bash
|
||||
- name: Upload fty artifacts
|
||||
|
||||
312
tools/build_local.py
Normal file
312
tools/build_local.py
Normal file
@@ -0,0 +1,312 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
TVBox 本地构建脚本
|
||||
将解密的 JSON 配置文件转换为本地资源版本
|
||||
|
||||
功能:
|
||||
1. 下载 spider 并命名为 spider.jar
|
||||
2. 下载 ext 中的 JSON 文件到 json 目录
|
||||
3. 下载 JS 文件到 js 目录
|
||||
4. 下载 api 中的 PY 文件到 py 目录
|
||||
5. 生成本地版本的 JSON 配置文件
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
import hashlib
|
||||
import requests
|
||||
from pathlib import Path
|
||||
from urllib.parse import urlparse
|
||||
|
||||
|
||||
class TVBox本地构建器:
|
||||
def __init__(self, 输入文件, 输出目录="output"):
|
||||
self.输入文件 = 输入文件
|
||||
self.输出目录 = Path(输出目录)
|
||||
self.数据 = None
|
||||
self.session = requests.Session()
|
||||
self.session.headers.update({
|
||||
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
|
||||
})
|
||||
|
||||
def 加载数据(self):
|
||||
"""加载 JSON 数据"""
|
||||
with open(self.输入文件, 'r', encoding='utf-8') as f:
|
||||
self.数据 = json.load(f)
|
||||
print(f"✓ 加载配置文件: {self.输入文件}")
|
||||
return True
|
||||
|
||||
def 创建目录结构(self):
|
||||
"""创建输出目录结构"""
|
||||
dirs = ['json', 'js', 'py']
|
||||
for d in dirs:
|
||||
(self.输出目录 / d).mkdir(parents=True, exist_ok=True)
|
||||
print(f"✓ 创建目录结构: {', '.join(dirs)}")
|
||||
|
||||
def 计算MD5(self, 文件路径):
|
||||
"""计算文件的 MD5"""
|
||||
md5 = hashlib.md5()
|
||||
with open(文件路径, 'rb') as f:
|
||||
while chunk := f.read(8192):
|
||||
md5.update(chunk)
|
||||
return md5.hexdigest()
|
||||
|
||||
def 替换代理域名(self, url):
|
||||
"""将代理域名替换为直接访问 raw.githubusercontent.com"""
|
||||
# 匹配各种 GitHub 代理域名
|
||||
proxy_patterns = [
|
||||
(r'https?://git\.yylx\.win/raw\.githubusercontent\.com/', 'https://raw.githubusercontent.com/'),
|
||||
(r'https?://gh-proxy\.com/https://raw\.githubusercontent\.com/', 'https://raw.githubusercontent.com/'),
|
||||
(r'https?://[^/]+/https://raw\.githubusercontent\.com/', 'https://raw.githubusercontent.com/'),
|
||||
]
|
||||
|
||||
for pattern, replacement in proxy_patterns:
|
||||
if re.match(pattern, url):
|
||||
new_url = re.sub(pattern, replacement, url)
|
||||
if new_url != url:
|
||||
print(f" 替换代理: {url[:60]}...")
|
||||
print(f" ->: {new_url[:60]}...")
|
||||
return new_url
|
||||
|
||||
return url
|
||||
|
||||
def 下载文件(self, url, 保存路径):
|
||||
"""下载文件"""
|
||||
try:
|
||||
# 替换代理域名为直接访问
|
||||
url = self.替换代理域名(url)
|
||||
|
||||
print(f" 下载: {url}")
|
||||
resp = self.session.get(url, timeout=30, allow_redirects=True)
|
||||
resp.raise_for_status()
|
||||
with open(保存路径, 'wb') as f:
|
||||
f.write(resp.content)
|
||||
print(f" ✓ 保存到: {保存路径}")
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f" ✗ 下载失败: {e}")
|
||||
return False
|
||||
|
||||
def 下载spider(self):
|
||||
"""下载 spider 文件"""
|
||||
spider_url = self.数据.get('spider', '')
|
||||
if not spider_url:
|
||||
print("✗ 未找到 spider 配置")
|
||||
return False
|
||||
|
||||
# 提取 URL(去掉 ;md5;xxx 部分)
|
||||
url = spider_url.split(';')[0]
|
||||
spider_path = self.输出目录 / 'spider.jar'
|
||||
|
||||
print(f"\n=== 下载 Spider ===")
|
||||
if self.下载文件(url, spider_path):
|
||||
md5 = self.计算MD5(spider_path)
|
||||
print(f" MD5: {md5}")
|
||||
self.数据['spider'] = f"./spider.jar;md5;{md5}"
|
||||
return True
|
||||
return False
|
||||
|
||||
def 是URL(self, 路径):
|
||||
"""判断是否为URL"""
|
||||
try:
|
||||
result = urlparse(路径)
|
||||
return all([result.scheme, result.netloc])
|
||||
except:
|
||||
return False
|
||||
|
||||
def 提取文件名(self, url):
|
||||
"""从 URL 提取文件名"""
|
||||
path = urlparse(url).path
|
||||
name = Path(path).name
|
||||
# 如果没有扩展名,根据内容添加
|
||||
if '.' not in name:
|
||||
name += '.txt'
|
||||
return name
|
||||
|
||||
def 下载ext文件(self):
|
||||
"""下载 ext 中的 JSON 文件"""
|
||||
print(f"\n=== 下载 ext 文件 ===")
|
||||
下载计数 = 0
|
||||
|
||||
for site in self.数据.get('sites', []):
|
||||
ext = site.get('ext', '')
|
||||
|
||||
# 处理字符串类型的 ext(可能是 URL)
|
||||
if isinstance(ext, str) and self.是URL(ext):
|
||||
文件名 = self.提取文件名(ext)
|
||||
# 根据扩展名决定保存目录
|
||||
if 文件名.endswith('.json'):
|
||||
保存路径 = self.输出目录 / 'json' / 文件名
|
||||
if self.下载文件(ext, 保存路径):
|
||||
site['ext'] = f"./json/{文件名}"
|
||||
下载计数 += 1
|
||||
elif 文件名.endswith('.js'):
|
||||
保存路径 = self.输出目录 / 'js' / 文件名
|
||||
if self.下载文件(ext, 保存路径):
|
||||
site['ext'] = f"./js/{文件名}"
|
||||
下载计数 += 1
|
||||
|
||||
# 处理字典类型的 ext
|
||||
elif isinstance(ext, dict):
|
||||
for key, value in ext.items():
|
||||
if isinstance(value, str) and self.是URL(value):
|
||||
文件名 = self.提取文件名(value)
|
||||
if 文件名.endswith('.json'):
|
||||
保存路径 = self.输出目录 / 'json' / 文件名
|
||||
if self.下载文件(value, 保存路径):
|
||||
ext[key] = f"./json/{文件名}"
|
||||
下载计数 += 1
|
||||
|
||||
print(f" 共下载 {下载计数} 个 ext 文件")
|
||||
return 下载计数
|
||||
|
||||
def 下载api文件(self):
|
||||
"""下载 api 中的 PY 文件(JS 文件保持原链接)"""
|
||||
print(f"\n=== 下载 api 文件 ===")
|
||||
下载计数 = 0
|
||||
|
||||
for site in self.数据.get('sites', []):
|
||||
api = site.get('api', '')
|
||||
|
||||
# 处理以 http 开头的 api(仅下载 PY 文件,JS 保持原链接)
|
||||
if isinstance(api, str) and api.startswith('http'):
|
||||
文件名 = self.提取文件名(api)
|
||||
if 文件名.endswith('.py'):
|
||||
保存路径 = self.输出目录 / 'py' / 文件名
|
||||
if self.下载文件(api, 保存路径):
|
||||
site['api'] = f"./py/{文件名}"
|
||||
下载计数 += 1
|
||||
elif 文件名.endswith('.js'):
|
||||
# JS 文件不下载,保持原链接
|
||||
print(f" 跳过 JS (保持原链接): {api}")
|
||||
|
||||
print(f" 共下载 {下载计数} 个 api 文件")
|
||||
return 下载计数
|
||||
|
||||
def 下载lives文件(self):
|
||||
"""下载 lives 中的文件"""
|
||||
print(f"\n=== 下载 lives 文件 ===")
|
||||
下载计数 = 0
|
||||
|
||||
for live in self.数据.get('lives', []):
|
||||
url = live.get('url', '')
|
||||
if isinstance(url, str) and self.是URL(url):
|
||||
文件名 = self.提取文件名(url)
|
||||
保存路径 = self.输出目录 / 'json' / 文件名
|
||||
if self.下载文件(url, 保存路径):
|
||||
live['url'] = f"./json/{文件名}"
|
||||
下载计数 += 1
|
||||
|
||||
print(f" 共下载 {下载计数} 个 lives 文件")
|
||||
return 下载计数
|
||||
|
||||
def 压缩flags(self, flags):
|
||||
"""压缩 flags 列表,去除带空格的重复项"""
|
||||
if not isinstance(flags, list):
|
||||
return flags
|
||||
|
||||
# 去除带空格的重复项(如 "优 酷" -> "优酷")
|
||||
compressed = []
|
||||
seen = set()
|
||||
|
||||
for flag in flags:
|
||||
# 去除空格
|
||||
no_space = flag.replace(' ', '')
|
||||
if no_space not in seen:
|
||||
seen.add(no_space)
|
||||
compressed.append(flag)
|
||||
|
||||
print(f"\n=== 压缩 flags ===")
|
||||
print(f" 原始: {len(flags)} 项")
|
||||
print(f" 压缩后: {len(compressed)} 项")
|
||||
|
||||
return compressed
|
||||
|
||||
def 保存配置(self):
|
||||
"""保存本地版本的配置文件"""
|
||||
输出文件 = self.输出目录 / 'api.json'
|
||||
|
||||
# 压缩 flags(去除带空格的重复项)
|
||||
if 'flags' in self.数据:
|
||||
self.数据['flags'] = self.压缩flags(self.数据['flags'])
|
||||
|
||||
# 使用 CompactJSONEncoder 格式化输出
|
||||
class CompactJSONEncoder(json.JSONEncoder):
|
||||
def iterencode(self, o, _one_shot=False):
|
||||
def _compact_list(lst, indent_level):
|
||||
pad = ' ' * indent_level
|
||||
if all(isinstance(i, dict) for i in lst):
|
||||
return '[\n' + ',\n'.join([pad + ' ' + json.dumps(i, ensure_ascii=False, separators=(',', ': ')) for i in lst]) + '\n' + pad + ']'
|
||||
return json.dumps(lst, ensure_ascii=False, indent=2)
|
||||
def _encode(obj, indent_level=0):
|
||||
pad = ' ' * indent_level
|
||||
if isinstance(obj, dict):
|
||||
lines = [f'"{k}": {_encode(v, indent_level+1)}' for k, v in obj.items()]
|
||||
return '{\n' + pad + ' ' + (',\n' + pad + ' ').join(lines) + '\n' + pad + '}'
|
||||
elif isinstance(obj, list):
|
||||
return _compact_list(obj, indent_level)
|
||||
return json.dumps(obj, ensure_ascii=False)
|
||||
return iter([_encode(o)])
|
||||
|
||||
with open(输出文件, 'w', encoding='utf-8') as f:
|
||||
json.dump(self.数据, f, ensure_ascii=False, indent=2, cls=CompactJSONEncoder)
|
||||
|
||||
print(f"\n✓ 保存配置文件: {输出文件}")
|
||||
return True
|
||||
|
||||
def 构建(self):
|
||||
"""执行构建流程"""
|
||||
print("=" * 50)
|
||||
print("TVBox 本地构建工具")
|
||||
print("=" * 50)
|
||||
|
||||
# 加载数据
|
||||
if not self.加载数据():
|
||||
return False
|
||||
|
||||
# 创建目录结构
|
||||
self.创建目录结构()
|
||||
|
||||
# 下载各类资源
|
||||
self.下载spider()
|
||||
self.下载ext文件()
|
||||
self.下载api文件()
|
||||
self.下载lives文件()
|
||||
|
||||
# 保存配置
|
||||
self.保存配置()
|
||||
|
||||
print("\n" + "=" * 50)
|
||||
print("✓ 构建完成!")
|
||||
print(f"输出目录: {self.输出目录}")
|
||||
print("=" * 50)
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def main():
|
||||
if len(sys.argv) < 2:
|
||||
print("用法:")
|
||||
print(" python build_local.py 输入文件 [输出目录]")
|
||||
print("")
|
||||
print("示例:")
|
||||
print(" python build_local.py api.json")
|
||||
print(" python build_local.py api.json output")
|
||||
print(" python build_local.py https://example.com/api.json output")
|
||||
print("")
|
||||
print("说明:")
|
||||
print(" 将解密的 JSON 配置文件转换为本地资源版本")
|
||||
print(" 自动下载 spider、ext、api、lives 等资源到本地")
|
||||
sys.exit(1)
|
||||
|
||||
输入文件 = sys.argv[1]
|
||||
输出目录 = sys.argv[2] if len(sys.argv) > 2 else "output"
|
||||
|
||||
构建器 = TVBox本地构建器(输入文件, 输出目录)
|
||||
构建器.构建()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
69
tools/fty.py
69
tools/fty.py
@@ -3,6 +3,9 @@ import re
|
||||
import demjson3 as demjson
|
||||
import json
|
||||
import hashlib
|
||||
import sys
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
# 创建全局 session 并设置浏览器 UA
|
||||
session = requests.Session()
|
||||
@@ -19,6 +22,53 @@ def fetch_raw_json():
|
||||
resp.encoding = 'utf-8'
|
||||
return resp.text
|
||||
|
||||
# 读取本地 JSON 文件
|
||||
def read_local_json(file_path):
|
||||
"""读取本地JSON文件,支持带注释的JSON和图片中的base64数据"""
|
||||
with open(file_path, 'rb') as f:
|
||||
raw_content = f.read()
|
||||
|
||||
# 检查是否为图片文件
|
||||
image_headers = [
|
||||
b'\xff\xd8\xff\xe0', # JPEG
|
||||
b'\xff\xd8\xff\xe1', # JPEG
|
||||
b'\x89PNG', # PNG
|
||||
b'GIF87a', # GIF
|
||||
b'GIF89a', # GIF
|
||||
b'BM', # BMP
|
||||
]
|
||||
is_image = False
|
||||
for header in image_headers:
|
||||
if raw_content.startswith(header):
|
||||
is_image = True
|
||||
break
|
||||
|
||||
# 如果是图片,尝试提取嵌入的base64数据
|
||||
if is_image:
|
||||
import base64
|
||||
print(f" 检测到图片文件,尝试提取嵌入的base64数据...")
|
||||
try:
|
||||
text_content = raw_content.decode('latin-1')
|
||||
# 查找长base64字符串(至少50个字符)
|
||||
base64_pattern = r'[A-Za-z0-9+/=]{50,}'
|
||||
match = re.search(base64_pattern, text_content)
|
||||
if match:
|
||||
base64_str = match.group(0)
|
||||
print(f" 找到base64数据,长度: {len(base64_str)}")
|
||||
decoded = base64.b64decode(base64_str)
|
||||
content = decoded.decode('utf-8')
|
||||
# 移除JavaScript风格的注释
|
||||
content = re.sub(r'^//.*$', '', content, flags=re.MULTILINE).strip()
|
||||
return content
|
||||
except Exception as e:
|
||||
print(f" 提取图片数据失败: {e}")
|
||||
|
||||
# 普通文本文件处理
|
||||
content = raw_content.decode('utf-8')
|
||||
# 移除JavaScript风格的注释
|
||||
content = re.sub(r'^//.*$', '', content, flags=re.MULTILINE).strip()
|
||||
return content
|
||||
|
||||
# 下载 spider 文件
|
||||
def extract_and_save_spider(json_text):
|
||||
match = re.search(r'"spider"\s*:\s*"([^"]+)"', json_text)
|
||||
@@ -91,7 +141,26 @@ def save_json(data, filename="tvbox_cleaned.json"):
|
||||
# 主流程
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
# 判断输入源
|
||||
if len(sys.argv) > 1:
|
||||
input_path = sys.argv[1]
|
||||
if input_path in ['-h', '--help']:
|
||||
print("用法:")
|
||||
print(" python fty.py [输入文件]")
|
||||
print(" python fty.py # 从URL获取数据")
|
||||
print(" python fty.py input.json # 从本地JSON文件读取")
|
||||
print(" python fty.py fff.json # 从本地图片文件读取(自动提取base64数据)")
|
||||
sys.exit(0)
|
||||
if os.path.isfile(input_path):
|
||||
print(f"📂 读取本地文件: {input_path}")
|
||||
raw_text = read_local_json(input_path)
|
||||
else:
|
||||
print(f"❌ 文件不存在: {input_path}")
|
||||
sys.exit(1)
|
||||
else:
|
||||
print("🌐 从URL获取数据...")
|
||||
raw_text = fetch_raw_json()
|
||||
|
||||
extract_and_save_spider(raw_text)
|
||||
data = clean_data(raw_text)
|
||||
# 更新 spider 为本地 fan.txt + 最新 MD5
|
||||
|
||||
385
tools/tvbox.py
Normal file
385
tools/tvbox.py
Normal file
@@ -0,0 +1,385 @@
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import gzip
|
||||
import base64
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from urllib.parse import urlparse, quote
|
||||
from urllib.request import urlopen, Request
|
||||
from Crypto.Cipher import AES
|
||||
from Crypto.Util.Padding import pad, unpad
|
||||
|
||||
class 文件加解密器:
|
||||
def __init__(self, key="1234567890123", iv="1234567890123"):
|
||||
self.key = key
|
||||
self.iv = iv
|
||||
|
||||
def 是URL(self, 路径):
|
||||
"""判断是否为URL"""
|
||||
try:
|
||||
result = urlparse(路径)
|
||||
return all([result.scheme, result.netloc])
|
||||
except:
|
||||
return False
|
||||
|
||||
def 从URL获取内容(self, url, 处理gzip=True, 处理base64=True):
|
||||
"""从URL获取内容,支持gzip和base64解码"""
|
||||
import re
|
||||
try:
|
||||
print(f"正在从URL获取: {url}")
|
||||
# 处理中文域名:对域名部分进行IDNA编码
|
||||
parsed = urlparse(url)
|
||||
if parsed.netloc:
|
||||
# 尝试将域名转换为IDNA编码
|
||||
try:
|
||||
idna_domain = parsed.netloc.encode('idna').decode('ascii')
|
||||
url = parsed._replace(netloc=idna_domain).geturl()
|
||||
except:
|
||||
# 如果IDNA编码失败,尝试对非ASCII字符进行percent编码
|
||||
encoded_netloc = quote(parsed.netloc, safe='')
|
||||
url = parsed._replace(netloc=encoded_netloc).geturl()
|
||||
|
||||
req = Request(url, headers={
|
||||
'User-Agent': 'okhttp/3.12.0',
|
||||
'Accept-Encoding': 'gzip, deflate'
|
||||
})
|
||||
with urlopen(req, timeout=30) as response:
|
||||
raw_content = response.read()
|
||||
|
||||
# 检查是否为图片或二进制文件
|
||||
content_type = response.headers.get('Content-Type', '')
|
||||
if content_type.startswith('image/') or content_type.startswith('video/') or content_type.startswith('audio/'):
|
||||
print(f" 检测到{content_type}类型,尝试提取嵌入的数据...")
|
||||
|
||||
# 检查文件头是否为常见图片格式
|
||||
image_headers = [
|
||||
b'\xff\xd8\xff\xe0', # JPEG
|
||||
b'\xff\xd8\xff\xe1', # JPEG
|
||||
b'\x89PNG', # PNG
|
||||
b'GIF87a', # GIF
|
||||
b'GIF89a', # GIF
|
||||
b'BM', # BMP
|
||||
]
|
||||
is_image = False
|
||||
for header in image_headers:
|
||||
if raw_content.startswith(header):
|
||||
is_image = True
|
||||
break
|
||||
|
||||
# 如果是图片,尝试提取嵌入的base64数据
|
||||
if is_image:
|
||||
print(f" 检测到图片文件,尝试提取嵌入的base64数据...")
|
||||
try:
|
||||
# 转为文本查找base64数据
|
||||
text_content = raw_content.decode('latin-1')
|
||||
# 查找长base64字符串(至少50个字符)
|
||||
base64_pattern = r'[A-Za-z0-9+/=]{50,}'
|
||||
match = re.search(base64_pattern, text_content)
|
||||
if match:
|
||||
base64_str = match.group(0)
|
||||
print(f" 找到base64数据,长度: {len(base64_str)}")
|
||||
decoded = base64.b64decode(base64_str)
|
||||
# 将解码后的内容转为字符串
|
||||
for encoding in ['utf-8', 'gbk', 'gb2312', 'latin-1']:
|
||||
try:
|
||||
content = decoded.decode(encoding)
|
||||
print(f"✓ 成功提取图片中的base64数据 ({len(content)} 字节)")
|
||||
return content
|
||||
except UnicodeDecodeError:
|
||||
continue
|
||||
except Exception as e:
|
||||
print(f" 提取图片数据失败: {e}")
|
||||
|
||||
# 处理gzip解压
|
||||
if 处理gzip:
|
||||
try:
|
||||
raw_content = gzip.decompress(raw_content)
|
||||
print(f" ✓ gzip解压成功")
|
||||
except:
|
||||
# 不是gzip格式,继续处理
|
||||
pass
|
||||
|
||||
# 尝试多种编码解码
|
||||
content = None
|
||||
for encoding in ['utf-8', 'gbk', 'gb2312', 'latin-1']:
|
||||
try:
|
||||
content = raw_content.decode(encoding)
|
||||
break
|
||||
except UnicodeDecodeError:
|
||||
continue
|
||||
|
||||
if content is None:
|
||||
content = raw_content.decode('latin-1')
|
||||
|
||||
# 处理base64解码(仅当内容看起来像base64且不是hex数据时)
|
||||
if 处理base64:
|
||||
# 检查是否为hex数据(只包含0-9, a-f, A-F)
|
||||
stripped = content.strip()
|
||||
is_hex = re.match(r'^[0-9a-fA-F]+$', stripped) is not None
|
||||
|
||||
if not is_hex:
|
||||
try:
|
||||
# 尝试base64解码
|
||||
decoded = base64.b64decode(content)
|
||||
# 解码后再次尝试gzip解压
|
||||
try:
|
||||
decoded = gzip.decompress(decoded)
|
||||
print(f" ✓ base64+gzip解码成功")
|
||||
except:
|
||||
print(f" ✓ base64解码成功")
|
||||
# 将解码后的内容转为字符串
|
||||
for encoding in ['utf-8', 'gbk', 'gb2312', 'latin-1']:
|
||||
try:
|
||||
content = decoded.decode(encoding)
|
||||
break
|
||||
except UnicodeDecodeError:
|
||||
continue
|
||||
except:
|
||||
# 不是base64格式,继续使用原内容
|
||||
pass
|
||||
|
||||
print(f"✓ 成功获取URL内容 ({len(content)} 字节)")
|
||||
return content
|
||||
except Exception as e:
|
||||
print(f"✗ 获取URL内容失败: {e}")
|
||||
return None
|
||||
|
||||
def 保存到临时文件(self, 内容, 后缀=".json"):
|
||||
"""将内容保存到临时文件"""
|
||||
with tempfile.NamedTemporaryFile(mode='w', suffix=后缀, delete=False, encoding='utf-8') as f:
|
||||
f.write(内容)
|
||||
return f.name
|
||||
|
||||
def 字符串转hex(self, 文本):
|
||||
return 文本.encode().hex()
|
||||
|
||||
def hex转字符串(self, hex文本):
|
||||
return bytes.fromhex(hex文本).decode()
|
||||
|
||||
def 加密文件(self, 输入文件, 输出文件):
|
||||
"""加密单个文件(支持本地文件或URL)"""
|
||||
try:
|
||||
print(f"加密: {输入文件} -> {输出文件}")
|
||||
|
||||
# 判断输入是URL还是本地文件
|
||||
if self.是URL(输入文件):
|
||||
内容 = self.从URL获取内容(输入文件)
|
||||
if 内容 is None:
|
||||
return False
|
||||
data = json.loads(内容)
|
||||
else:
|
||||
with open(输入文件, 'r', encoding='utf-8') as f:
|
||||
data = json.load(f)
|
||||
|
||||
# AES加密
|
||||
填充key = self.key.ljust(16, '0').encode()
|
||||
填充iv = self.iv.ljust(16, '0').encode()
|
||||
cipher = AES.new(填充key, AES.MODE_CBC, 填充iv)
|
||||
encrypted = cipher.encrypt(pad(json.dumps(data, ensure_ascii=False).encode(), 16))
|
||||
|
||||
# 数据包装并转Hex
|
||||
header_hex = self.字符串转hex(f"$#{self.key}#$")
|
||||
cipher_hex = encrypted.hex()
|
||||
iv_hex = self.字符串转hex(self.iv)
|
||||
final_hex = header_hex + cipher_hex + iv_hex
|
||||
|
||||
with open(输出文件, 'w') as f:
|
||||
f.write(final_hex)
|
||||
|
||||
print(f"✓ 加密成功: {输入文件}")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
print(f"✗ 加密失败 {输入文件}: {e}")
|
||||
return False
|
||||
|
||||
def 解密文件(self, 输入文件, 输出文件):
|
||||
"""解密单个文件(支持本地文件或URL)"""
|
||||
try:
|
||||
print(f"解密: {输入文件} -> {输出文件}")
|
||||
|
||||
# 判断输入是URL还是本地文件
|
||||
if self.是URL(输入文件):
|
||||
内容 = self.从URL获取内容(输入文件)
|
||||
if 内容 is None:
|
||||
return False
|
||||
hex数据 = 内容.strip()
|
||||
else:
|
||||
with open(输入文件, 'r', encoding='utf-8') as f:
|
||||
hex数据 = f.read().strip()
|
||||
|
||||
# 移除JavaScript风格的注释(// 开头的行)
|
||||
import re
|
||||
hex数据 = re.sub(r'^//.*$', '', hex数据, flags=re.MULTILINE).strip()
|
||||
|
||||
# 尝试直接解析为JSON(如果已经是JSON格式)
|
||||
try:
|
||||
data = json.loads(hex数据)
|
||||
with open(输出文件, 'w', encoding='utf-8') as f:
|
||||
json.dump(data, f, ensure_ascii=False, indent=2)
|
||||
print(f"✓ 直接保存JSON成功: {输入文件}")
|
||||
return True
|
||||
except json.JSONDecodeError:
|
||||
# 不是JSON格式,尝试解密
|
||||
pass
|
||||
|
||||
# 检查是否为有效的hex数据
|
||||
import re
|
||||
if not re.match(r'^[0-9a-fA-F]+$', hex数据):
|
||||
raise ValueError("内容既不是有效的JSON也不是有效的hex加密数据")
|
||||
|
||||
# 解析Hex数据
|
||||
header_marker = self.字符串转hex("#$")
|
||||
header_pos = hex数据.find(header_marker)
|
||||
if header_pos == -1:
|
||||
raise ValueError("文件格式错误,未找到有效的头部标记")
|
||||
|
||||
header_end = header_pos + 4
|
||||
header_hex = hex数据[:header_end]
|
||||
iv_hex = hex数据[-26:]
|
||||
cipher_hex = hex数据[header_end:-26]
|
||||
|
||||
# 提取key和iv
|
||||
real_key = self.hex转字符串(header_hex)[2:-2]
|
||||
real_iv = self.hex转字符串(iv_hex)
|
||||
|
||||
# AES解密
|
||||
填充key = real_key.ljust(16, '0').encode()
|
||||
填充iv = real_iv.ljust(16, '0').encode()
|
||||
cipher = AES.new(填充key, AES.MODE_CBC, 填充iv)
|
||||
decrypted = unpad(cipher.decrypt(bytes.fromhex(cipher_hex)), 16)
|
||||
|
||||
with open(输出文件, 'w', encoding='utf-8') as f:
|
||||
json.dump(json.loads(decrypted.decode()), f, ensure_ascii=False, indent=2)
|
||||
|
||||
print(f"✓ 解密成功: {输入文件}")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
print(f"✗ 解密失败 {输入文件}: {e}")
|
||||
return False
|
||||
|
||||
def 获取URL内容(self, url, 输出文件):
|
||||
"""直接获取URL内容并保存(不进行加密/解密处理)"""
|
||||
try:
|
||||
print(f"获取URL内容: {url} -> {输出文件}")
|
||||
|
||||
内容 = self.从URL获取内容(url)
|
||||
if 内容 is None:
|
||||
return False
|
||||
|
||||
# 移除JavaScript风格的注释(// 开头的行)
|
||||
import re
|
||||
内容 = re.sub(r'^//.*$', '', 内容, flags=re.MULTILINE).strip()
|
||||
|
||||
# 尝试解析为JSON并格式化保存
|
||||
try:
|
||||
data = json.loads(内容)
|
||||
with open(输出文件, 'w', encoding='utf-8') as f:
|
||||
json.dump(data, f, ensure_ascii=False, indent=2)
|
||||
print(f"✓ 获取并保存JSON成功: {输出文件}")
|
||||
return True
|
||||
except json.JSONDecodeError:
|
||||
# 不是JSON格式,直接保存原始内容
|
||||
with open(输出文件, 'w', encoding='utf-8') as f:
|
||||
f.write(内容)
|
||||
print(f"✓ 获取并保存内容成功: {输出文件}")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
print(f"✗ 获取URL内容失败 {url}: {e}")
|
||||
return False
|
||||
|
||||
def 批量处理(self, 输入目录, 输出目录, 模式="enc"):
|
||||
"""批量处理目录中的文件或URL,保持原文件名"""
|
||||
输出路径 = Path(输出目录)
|
||||
|
||||
文件列表 = []
|
||||
|
||||
# 判断输入是否为URL
|
||||
if self.是URL(输入目录):
|
||||
文件列表 = [输入目录]
|
||||
else:
|
||||
输入路径 = Path(输入目录)
|
||||
if 输入路径.is_file():
|
||||
文件列表 = [输入路径]
|
||||
else:
|
||||
# 只有当输入是目录时,才创建输出目录
|
||||
输出路径.mkdir(parents=True, exist_ok=True)
|
||||
文件列表 = [f for f in 输入路径.iterdir() if f.is_file()]
|
||||
|
||||
if not 文件列表:
|
||||
print(f"在 {输入目录} 中未找到文件")
|
||||
return
|
||||
|
||||
print(f"找到 {len(文件列表)} 个文件进行处理...")
|
||||
|
||||
成功计数 = 0
|
||||
for 输入文件 in 文件列表:
|
||||
# 处理输出文件名
|
||||
if isinstance(输入文件, str) and self.是URL(输入文件):
|
||||
# 如果输出路径是文件(不是目录),直接使用它
|
||||
if 输出路径.suffix:
|
||||
输出文件 = 输出路径
|
||||
else:
|
||||
# 从URL提取文件名,如果没有则使用默认名称
|
||||
url_path = urlparse(输入文件).path
|
||||
文件名 = Path(url_path).name if url_path else "output.json"
|
||||
输出文件 = 输出路径 / 文件名
|
||||
输出文件.parent.mkdir(parents=True, exist_ok=True)
|
||||
else:
|
||||
输出文件 = 输出路径 / 输入文件.name
|
||||
|
||||
if 模式 == "get":
|
||||
if self.获取URL内容(str(输入文件), str(输出文件)):
|
||||
成功计数 += 1
|
||||
elif 模式 == "enc":
|
||||
if self.加密文件(str(输入文件), str(输出文件)):
|
||||
成功计数 += 1
|
||||
else:
|
||||
if self.解密文件(str(输入文件), str(输出文件)):
|
||||
成功计数 += 1
|
||||
|
||||
print(f"\n处理完成: 成功 {成功计数}/{len(文件列表)} 个文件")
|
||||
|
||||
def main():
|
||||
if len(sys.argv) < 3:
|
||||
print("用法:")
|
||||
print(" 单个文件: python tvbox.py 输入文件/URL 输出文件 [模式]")
|
||||
print(" 批量处理: python tvbox.py 输入目录/URL 输出目录 [模式] [--batch]")
|
||||
print("模式: enc-加密(默认) / dec-解密 / get-获取URL内容")
|
||||
print("说明: 支持本地文件和URL作为输入源")
|
||||
print("示例:")
|
||||
print(" 本地文件加密: python tvbox.py api.json api.json")
|
||||
print(" 本地文件解密: python tvbox.py api.json api.json dec")
|
||||
print(" URL获取: python tvbox.py https://example.com/data.txt output.json get")
|
||||
print(" URL解密: python tvbox.py https://example.com/encrypted.txt output.json dec")
|
||||
print(" 批量加密: python tvbox.py input_dir output_dir enc --batch")
|
||||
print(" 批量解密: python tvbox.py input_dir output_dir dec --batch")
|
||||
sys.exit(1)
|
||||
|
||||
输入路径, 输出路径 = sys.argv[1], sys.argv[2]
|
||||
|
||||
# 判断模式
|
||||
模式 = sys.argv[3] if len(sys.argv) > 3 else "enc"
|
||||
|
||||
# 判断是否批量模式
|
||||
批量模式 = len(sys.argv) > 4 and sys.argv[4] == "--batch"
|
||||
|
||||
加解密器 = 文件加解密器()
|
||||
|
||||
# 自动判断是否为目录或URL
|
||||
输入是URL = 加解密器.是URL(输入路径)
|
||||
输入路径是目录 = os.path.isdir(输入路径) if not 输入是URL else False
|
||||
|
||||
if 批量模式 or 输入路径是目录 or 输入是URL:
|
||||
加解密器.批量处理(输入路径, 输出路径, 模式)
|
||||
else:
|
||||
if 模式 == "enc":
|
||||
加解密器.加密文件(输入路径, 输出路径)
|
||||
else:
|
||||
加解密器.解密文件(输入路径, 输出路径)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
BIN
xiaosa/wex.jar
BIN
xiaosa/wex.jar
Binary file not shown.
Reference in New Issue
Block a user