Refactor to use aiohttp and ujson, update dependencies

Replaced httpx and json with aiohttp and ujson throughout the codebase for improved async performance and faster JSON handling. Updated requirements.txt to reflect new dependencies and removed unused ones. Refactored manifest handling to remove steam.client.cdn dependency and implemented custom manifest serialization. Updated logger to use loguru instead of logzero. Adjusted i18n keys and tray menu logic to match new window-based UI. Updated about.html to reflect backend technology change from HTTPX to AIOHTTP.
This commit is contained in:
ikun0014
2026-01-11 03:45:59 +08:00
parent b91bf42981
commit 2cf9af811a
13 changed files with 170 additions and 226 deletions

49
main.py
View File

@@ -1,8 +1,7 @@
import os import os
import sys import sys
import time
import threading import threading
import webbrowser import webview
from pathlib import Path from pathlib import Path
from PIL import Image from PIL import Image
@@ -13,6 +12,12 @@ from src.utils.i18n import t
project_root = Path(__file__) project_root = Path(__file__)
config_manager = ConfigManager() config_manager = ConfigManager()
sys.path.insert(0, str(project_root)) sys.path.insert(0, str(project_root))
window = webview.create_window(
title="Onekey",
url=f"http://localhost:{config_manager.app_config.port}",
width=1600,
height=900,
)
def hide_console() -> None: def hide_console() -> None:
@@ -33,7 +38,7 @@ def hide_console() -> None:
def create_icon() -> Image.Image: def create_icon() -> Image.Image:
"""创建托盘图标""" """创建托盘图标"""
try: try:
return Image.open("./icon.jpg") return Image.open(project_root.parent / "icon.jpg")
except Exception as e: except Exception as e:
if config_manager.app_config.show_console: if config_manager.app_config.show_console:
print(t("error.load_icon", error=str(e))) print(t("error.load_icon", error=str(e)))
@@ -50,11 +55,8 @@ def create_system_tray() -> bool:
icon.stop() icon.stop()
os._exit(0) os._exit(0)
def on_open_browser(icon, item): def on_show_window(icon, item):
try: window.show()
webbrowser.open(f"http://localhost:{config_manager.app_config.port}")
except Exception:
pass
def on_show_console(icon, item): def on_show_console(icon, item):
try: try:
@@ -70,7 +72,7 @@ def create_system_tray() -> bool:
# 创建托盘菜单 # 创建托盘菜单
menu = pystray.Menu( menu = pystray.Menu(
pystray.MenuItem(t("tray.open_browser"), on_open_browser), pystray.MenuItem(t("tray.show_window"), on_show_window),
pystray.MenuItem(t("tray.show_console"), on_show_console), pystray.MenuItem(t("tray.show_console"), on_show_console),
pystray.MenuItem(t("tray.exit"), on_quit), pystray.MenuItem(t("tray.exit"), on_quit),
) )
@@ -91,18 +93,6 @@ def create_system_tray() -> bool:
return False return False
def open_browser_delayed(port: int) -> None:
"""延迟打开浏览器"""
time.sleep(2)
try:
webbrowser.open(f"http://localhost:{port}")
if config_manager.app_config.show_console:
print(t("main.browser_opened", port=port))
except Exception:
if config_manager.app_config.show_console:
print(t("main.browser_open_failed", port=port))
def start_web_server() -> None: def start_web_server() -> None:
"""启动Web服务器""" """启动Web服务器"""
from web.app import app from web.app import app
@@ -136,16 +126,15 @@ def main() -> None:
if tray_created: if tray_created:
print(t("main.tray_created")) print(t("main.tray_created"))
def on_closing():
if window.create_confirmation_dialog("Onekey", "是否关闭Onekey"):
os._exit(0)
return False
window.events.closing += on_closing
# 启动浏览器 # 启动浏览器
browser_thread = threading.Thread( webview.start(func=start_web_server)
target=open_browser_delayed, args=(config.port,)
)
browser_thread.daemon = True
browser_thread.start()
# 启动Web应用
start_web_server()
except KeyboardInterrupt: except KeyboardInterrupt:
if config_manager.app_config.show_console: if config_manager.app_config.show_console:
print(f"\n{t('main.exit')}") print(f"\n{t('main.exit')}")

View File

@@ -1,9 +1,15 @@
vdf # Server
httpx loguru
Pillow fastapi
pystray
uvicorn uvicorn
logzero # Encode
colorama vdf
fastapi[all] ujson
steam[client] # Web
jinja2
aiohttp
pywebview
websockets
# Any things
Pillow
pystray

View File

@@ -1,6 +1,6 @@
import os import os
import sys import sys
import json import ujson
import winreg import winreg
from pathlib import Path from pathlib import Path
from typing import Dict, Optional from typing import Dict, Optional
@@ -34,7 +34,7 @@ class ConfigManager:
"""生成默认配置文件""" """生成默认配置文件"""
try: try:
with open(self.config_path, "w", encoding="utf-8") as f: with open(self.config_path, "w", encoding="utf-8") as f:
json.dump(DEFAULT_CONFIG, f, indent=2, ensure_ascii=False) ujson.dump(DEFAULT_CONFIG, f, indent=2, ensure_ascii=False)
print(t("config.generated")) print(t("config.generated"))
os.system("pause") os.system("pause")
sys.exit(1) sys.exit(1)
@@ -50,7 +50,7 @@ class ConfigManager:
try: try:
with open(self.config_path, "r", encoding="utf-8") as f: with open(self.config_path, "r", encoding="utf-8") as f:
self._config_data = json.load(f) self._config_data = ujson.load(f)
self.app_config = AppConfig( self.app_config = AppConfig(
key=self._config_data.get("KEY", ""), key=self._config_data.get("KEY", ""),
@@ -63,7 +63,7 @@ class ConfigManager:
) )
self.steam_path = self._get_steam_path() self.steam_path = self._get_steam_path()
except json.JSONDecodeError: except ujson.JSONDecodeError:
print(t("config.corrupted")) print(t("config.corrupted"))
self._generate_config() self._generate_config()
print(t("config.regenerated")) print(t("config.regenerated"))

View File

@@ -1,50 +1,24 @@
"""常量定义""" """常量定义"""
from pathlib import Path from pathlib import Path
from httpx import Client
LOG_DIR: Path = Path("logs") LOG_DIR: Path = Path("logs")
CONFIG_FILE: Path = Path("config.json") CONFIG_FILE: Path = Path("config.json")
def check_ip():
try:
with Client(timeout=5.0) as client:
req = client.get(
"https://mips.kugou.com/check/iscn",
)
req.raise_for_status()
body = req.json()
print("已获取IP属地")
return bool(body["flag"])
except BaseException:
print("获取IP属地失败, 默认您位于中国大陆境内")
return True
IS_CN: bool = check_ip()
STEAM_API_BASE: str = "https://steam.ikunshare.com/api" STEAM_API_BASE: str = "https://steam.ikunshare.com/api"
STEAM_CACHE_CDN_LIST: list = ( STEAM_CACHE_CDN_LIST: list = [
[ "http://alibaba.cdn.steampipe.steamcontent.com",
"http://alibaba.cdn.steampipe.steamcontent.com", "http://steampipe.steamcontent.tnkjmec.com",
"http://steampipe.steamcontent.tnkjmec.com", "http://fastly.cdn.steampipe.steamcontent.com",
] "http://akamai.cdn.steampipe.steamcontent.com",
if IS_CN "http://telus.cdn.steampipe.steamcontent.com",
else [ "https://cache1-hkg1.steamcontent.com",
"http://fastly.cdn.steampipe.steamcontent.com", "https://cache2-hkg1.steamcontent.com",
"http://akamai.cdn.steampipe.steamcontent.com", "https://cache3-hkg1.steamcontent.com",
"http://telus.cdn.steampipe.steamcontent.com", "https://cache4-hkg1.steamcontent.com",
"https://cache1-hkg1.steamcontent.com", "https://cache5-hkg1.steamcontent.com",
"https://cache2-hkg1.steamcontent.com", "https://cache6-hkg1.steamcontent.com",
"https://cache3-hkg1.steamcontent.com", "https://cache7-hkg1.steamcontent.com",
"https://cache4-hkg1.steamcontent.com", "https://cache8-hkg1.steamcontent.com",
"https://cache5-hkg1.steamcontent.com", "https://cache9-hkg1.steamcontent.com",
"https://cache6-hkg1.steamcontent.com", "https://cache10-hkg1.steamcontent.com",
"https://cache7-hkg1.steamcontent.com", ]
"https://cache8-hkg1.steamcontent.com",
"https://cache9-hkg1.steamcontent.com",
"https://cache10-hkg1.steamcontent.com",
]
)

View File

@@ -1,9 +1,7 @@
"""日志模块""" """日志模块"""
import logging import sys
import colorama from loguru import logger
import logzero
from logzero import setup_logger, LogFormatter
from .constants import LOG_DIR from .constants import LOG_DIR
@@ -15,53 +13,51 @@ class Logger:
self.name = name self.name = name
self.debug_mode = debug_mode self.debug_mode = debug_mode
self.log_file = log_file self.log_file = log_file
self._logger = self._setup_logger() self._setup_logger()
self._logger = logger.bind(name=name)
def _setup_logger(self) -> logging.Logger: def _setup_logger(self):
"""设置日志器""" """设置日志器"""
level = logzero.DEBUG if self.debug_mode else logzero.INFO # 移除默认的 handler
logger.remove()
colors = { level = "DEBUG" if self.debug_mode else "INFO"
logzero.DEBUG: colorama.Fore.CYAN,
logzero.INFO: colorama.Fore.GREEN,
logzero.WARNING: colorama.Fore.YELLOW,
logzero.ERROR: colorama.Fore.RED,
logzero.CRITICAL: colorama.Fore.MAGENTA,
}
terminal_formatter = LogFormatter( # 控制台输出
color=True, logger.add(
fmt="%(color)s%(message)s%(end_color)s", sys.stderr, format="<level>{message}</level>", level=level, colorize=True
datefmt="%Y-%m-%d %H:%M:%S",
colors=colors,
) )
logger = setup_logger(self.name, level=level, formatter=terminal_formatter)
if self.log_file: if self.log_file:
LOG_DIR.mkdir(exist_ok=True) LOG_DIR.mkdir(exist_ok=True)
logfile = LOG_DIR / f"{self.name}.log" logfile = LOG_DIR / f"{self.name}.log"
file_handler = logging.FileHandler(logfile, encoding="utf-8")
file_formatter = logging.Formatter(
"[%(asctime)s] | [%(name)s:%(levelname)s] | [%(module)s.%(funcName)s:%(lineno)d] - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
file_handler.setFormatter(file_formatter)
logger.addHandler(file_handler)
return logger # 文件输出
file_format = (
"[{time:YYYY-MM-DD HH:mm:ss}] | "
"[{extra[name]}:{level}] | "
"[{module}.{function}:{line}] - {message}"
)
logger.add(
logfile,
format=file_format,
level=level,
encoding="utf-8",
filter=lambda record: record["extra"].get("name") == self.name,
)
def debug(self, msg: str): def debug(self, msg: str):
self._logger.debug(msg) self._logger.opt(depth=1).debug(msg)
def info(self, msg: str): def info(self, msg: str):
self._logger.info(msg) self._logger.opt(depth=1).info(msg)
def warning(self, msg: str): def warning(self, msg: str):
self._logger.warning(msg) self._logger.opt(depth=1).warning(msg)
def error(self, msg: str): def error(self, msg: str):
self._logger.error(msg) self._logger.opt(depth=1).error(msg)
def critical(self, msg: str): def critical(self, msg: str):
self._logger.critical(msg) self._logger.opt(depth=1).critical(msg)

View File

@@ -1,4 +1,6 @@
from typing import List, Dict, Tuple from typing import List, Dict, Tuple
import ujson
from .constants import STEAM_API_BASE from .constants import STEAM_API_BASE
from .config import ConfigManager from .config import ConfigManager
from .logger import Logger from .logger import Logger
@@ -27,7 +29,7 @@ class OnekeyApp:
f"{STEAM_API_BASE}/getKeyInfo", f"{STEAM_API_BASE}/getKeyInfo",
json={"key": self.config.app_config.key}, json={"key": self.config.app_config.key},
) )
body = response.json() body = ujson.loads(await response.content.read())
if not body["info"]: if not body["info"]:
self.logger.error(t("api.key_not_exist")) self.logger.error(t("api.key_not_exist"))
@@ -61,15 +63,15 @@ class OnekeyApp:
headers={"X-Api-Key": self.config.app_config.key}, headers={"X-Api-Key": self.config.app_config.key},
) )
if response.status_code == 401: if response.status == 401:
self.logger.error(t("api.invalid_key")) self.logger.error(t("api.invalid_key"))
return SteamAppInfo(), SteamAppManifestInfo(mainapp=[], dlcs=[]) return SteamAppInfo(), SteamAppManifestInfo(mainapp=[], dlcs=[])
if response.status_code != 200: if response.status != 200:
self.logger.error(t("api.request_failed", code=response.status_code)) self.logger.error(t("api.request_failed", code=response.status))
return SteamAppInfo(), SteamAppManifestInfo(mainapp=[], dlcs=[]) return SteamAppInfo(), SteamAppManifestInfo(mainapp=[], dlcs=[])
data = response.json() data = ujson.loads(await response.content.read())
if not data: if not data:
self.logger.error(t("api.no_manifest")) self.logger.error(t("api.no_manifest"))

View File

@@ -1,7 +1,8 @@
import vdf import io
import struct
import zipfile
from pathlib import Path from pathlib import Path
from typing import List, Optional from typing import List, Optional
from steam.client.cdn import DepotManifest
from .constants import STEAM_CACHE_CDN_LIST from .constants import STEAM_CACHE_CDN_LIST
from .models import ManifestInfo, SteamAppManifestInfo from .models import ManifestInfo, SteamAppManifestInfo
from .logger import Logger from .logger import Logger
@@ -26,33 +27,40 @@ class ManifestHandler:
url = cdn + manifest_info.url url = cdn + manifest_info.url
try: try:
r = await self.client.get(url) r = await self.client.get(url)
if r.status_code == 200: if r.status == 200:
return r.content return await r.content.read()
except Exception as e: except Exception as e:
self.logger.debug(t("manifest.download.failed", url=url, error=e)) self.logger.debug(t("manifest.download.failed", url=url, error=e))
@staticmethod
def _serialize_manifest_data(content: bytes) -> bytes:
magic_signature = struct.pack("<I", 0x71F617D0)
payload = content
if len(content) >= 4 and content[:4] == magic_signature:
payload = content[8:]
else:
try:
with zipfile.ZipFile(io.BytesIO(content)) as zf:
payload = zf.read("z")
except (zipfile.BadZipFile, KeyError):
pass
return magic_signature + struct.pack("<I", len(payload)) + payload
def process_manifest( def process_manifest(
self, manifest_data: bytes, manifest_info: ManifestInfo, remove_old: bool = True self, manifest_data: bytes, manifest_info: ManifestInfo, remove_old: bool = True
) -> bool: ) -> bool:
"""处理清单文件"""
try: try:
depot_id = manifest_info.depot_id depot_id = manifest_info.depot_id
manifest_id = manifest_info.manifest_id manifest_id = manifest_info.manifest_id
depot_key = bytes.fromhex(manifest_info.depot_key)
manifest = DepotManifest(manifest_data) _ = bytes.fromhex(manifest_info.depot_key)
serialized_data = self._serialize_manifest_data(manifest_data)
manifest_path = self.depot_cache / f"{depot_id}_{manifest_id}.manifest" manifest_path = self.depot_cache / f"{depot_id}_{manifest_id}.manifest"
config_path = self.depot_cache / "config.vdf"
if config_path.exists():
with open(config_path, "r", encoding="utf-8") as f:
d = vdf.load(f)
else:
d = {"depots": {}}
d["depots"][depot_id] = {"DecryptionKey": depot_key.hex()}
d = {"depots": dict(sorted(d["depots"].items()))}
if remove_old: if remove_old:
for file in self.depot_cache.iterdir(): for file in self.depot_cache.iterdir():
if file.suffix == ".manifest": if file.suffix == ".manifest":
@@ -66,10 +74,7 @@ class ManifestHandler:
self.logger.info(t("manifest.delete_old", name=file.name)) self.logger.info(t("manifest.delete_old", name=file.name))
with open(manifest_path, "wb") as f: with open(manifest_path, "wb") as f:
f.write(manifest.serialize(compress=False)) f.write(serialized_data)
with open(config_path, "w", encoding="utf-8") as f:
vdf.dump(d, f, pretty=True)
self.logger.info( self.logger.info(
t( t(
@@ -89,11 +94,9 @@ class ManifestHandler:
) -> List[ManifestInfo]: ) -> List[ManifestInfo]:
"""批量处理清单""" """批量处理清单"""
processed = [] processed = []
all_manifests = manifests.mainapp + manifests.dlcs
app_manifest = manifests.mainapp for manifest_info in all_manifests:
dlc_manifest = manifests.dlcs
for manifest_info in app_manifest + dlc_manifest:
manifest_path = ( manifest_path = (
self.depot_cache self.depot_cache
/ f"{manifest_info.depot_id}_{manifest_info.manifest_id}.manifest" / f"{manifest_info.depot_id}_{manifest_info.manifest_id}.manifest"

View File

@@ -1,6 +1,6 @@
"""HTTP客户端模块""" """HTTP客户端模块"""
import httpx import aiohttp
from typing import Optional, Dict from typing import Optional, Dict
@@ -8,15 +8,15 @@ class HttpClient:
"""HTTP客户端封装""" """HTTP客户端封装"""
def __init__(self): def __init__(self):
self._client = httpx.AsyncClient(timeout=60.0) self._client = aiohttp.ClientSession(conn_timeout=60.0)
async def get(self, url: str, headers: Optional[Dict] = None) -> httpx.Response: async def get(self, url: str, headers: Optional[Dict] = None) -> aiohttp.ClientResponse:
"""GET请求""" """GET请求"""
return await self._client.get(url, headers=headers) return await self._client.get(url, headers=headers)
async def close(self): async def close(self):
"""关闭客户端""" """关闭客户端"""
await self._client.aclose() await self._client.close()
async def __aenter__(self): async def __aenter__(self):
return self return self

View File

@@ -4,23 +4,27 @@ from typing import List
from .base import UnlockTool from .base import UnlockTool
from ..models import DepotInfo from ..models import DepotInfo
class GreenLuma(UnlockTool): class GreenLuma(UnlockTool):
"""GreenLuma解锁工具实现""" """GreenLuma解锁工具实现"""
async def setup(self, depot_data: List[DepotInfo], app_id: str, **kwargs) -> bool: async def setup(self, depot_data: List[DepotInfo], app_id: str, **kwargs) -> bool:
applist_dir = self.steam_path / "AppList" applist_dir = self.steam_path / "AppList"
if applist_dir.is_file(): if applist_dir.is_file():
applist_dir.unlink(missing_ok=True) applist_dir.unlink(missing_ok=True)
if not applist_dir.is_dir(): if not applist_dir.is_dir():
applist_dir.mkdir(parents=True, exist_ok=True) applist_dir.mkdir(parents=True, exist_ok=True)
depot_dict = {} depot_dict = {}
for i in applist_dir.iterdir(): for i in applist_dir.iterdir():
if i.stem.isdecimal() and i.suffix == '.txt': if i.stem.isdecimal() and i.suffix == ".txt":
with i.open('r', encoding='utf-8') as f: with i.open("r", encoding="utf-8") as f:
app_id_ = f.read().strip() app_id_ = f.read().strip()
depot_dict[int(i.stem)] = int(app_id_) if app_id_.isdecimal() else None depot_dict[int(i.stem)] = (
int(app_id_) if app_id_.isdecimal() else None
)
def find_next_index(): def find_next_index():
if not depot_dict: if not depot_dict:
return 0 return 0
@@ -28,23 +32,23 @@ class GreenLuma(UnlockTool):
if i not in depot_dict: if i not in depot_dict:
return i return i
return max(depot_dict.keys()) + 1 return max(depot_dict.keys()) + 1
if app_id and app_id.isdecimal(): if app_id and app_id.isdecimal():
app_id_int = int(app_id) app_id_int = int(app_id)
if app_id_int not in depot_dict.values(): if app_id_int not in depot_dict.values():
index = find_next_index() index = find_next_index()
with (applist_dir / f'{index}.txt').open('w', encoding='utf-8') as f: with (applist_dir / f"{index}.txt").open("w", encoding="utf-8") as f:
f.write(str(app_id)) f.write(str(app_id))
depot_dict[index] = app_id_int depot_dict[index] = app_id_int
for depot in depot_data: for depot in depot_data:
depot_id = int(depot.depot_id) depot_id = int(depot.depot_id)
if depot_id not in depot_dict.values(): if depot_id not in depot_dict.values():
index = find_next_index() index = find_next_index()
with (applist_dir / f'{index}.txt').open('w', encoding='utf-8') as f: with (applist_dir / f"{index}.txt").open("w", encoding="utf-8") as f:
f.write(str(depot_id)) f.write(str(depot_id))
depot_dict[index] = depot_id depot_dict[index] = depot_id
config_path = self.steam_path / "config" / "config.vdf" config_path = self.steam_path / "config" / "config.vdf"
try: try:
if config_path.is_file(): if config_path.is_file():
@@ -53,16 +57,18 @@ class GreenLuma(UnlockTool):
else: else:
content = {} content = {}
config_path.parent.mkdir(parents=True, exist_ok=True) config_path.parent.mkdir(parents=True, exist_ok=True)
if "depots" not in content: if "depots" not in content:
content["depots"] = {} content["depots"] = {}
for depot in depot_data: for depot in depot_data:
content["depots"][depot.depot_id] = {"DecryptionKey": depot.decryption_key} content["depots"][depot.depot_id] = {
"DecryptionKey": depot.decryption_key
}
with open(config_path, "w", encoding="utf-8") as f: with open(config_path, "w", encoding="utf-8") as f:
f.write(vdf.dumps(content)) f.write(vdf.dumps(content))
return True return True
except Exception as e: except Exception as e:
print(f"GreenLuma配置失败: {e}") print(f"GreenLuma配置失败: {e}")
return False return False

View File

@@ -14,14 +14,12 @@ class I18n:
# 中文翻译 # 中文翻译
self.translations["zh"] = { self.translations["zh"] = {
# 系统托盘 # 系统托盘
"tray.open_browser": "打开浏览器", "tray.show_window": "显示程序",
"tray.show_console": "显示控制台", "tray.show_console": "显示控制台",
"tray.exit": "退出程序", "tray.exit": "退出程序",
# 主程序 # 主程序
"main.starting": "正在启动Onekey...", "main.starting": "正在启动Onekey...",
"main.tray_created": "系统托盘已创建", "main.tray_created": "系统托盘已创建",
"main.browser_opened": "浏览器已自动打开",
"main.browser_open_failed": "无法自动打开浏览器,请手动访问: http://localhost:{port}",
"main.exit": "程序已退出", "main.exit": "程序已退出",
"main.start_error": "启动错误: {error}", "main.start_error": "启动错误: {error}",
"main.press_enter": "按回车键退出...", "main.press_enter": "按回车键退出...",
@@ -72,7 +70,6 @@ class I18n:
"error.ensure_root": "请确保在项目根目录中运行此程序", "error.ensure_root": "请确保在项目根目录中运行此程序",
# Web相关 # Web相关
"web.starting": "启动Onekey Web GUI...", "web.starting": "启动Onekey Web GUI...",
"web.visit": "请在浏览器中访问: http://localhost:{port}",
"web.task_running": "已有任务正在运行", "web.task_running": "已有任务正在运行",
"web.invalid_appid": "请输入有效的App ID", "web.invalid_appid": "请输入有效的App ID",
"web.invalid_format": "App ID格式无效", "web.invalid_format": "App ID格式无效",
@@ -104,14 +101,12 @@ class I18n:
# 英文翻译 # 英文翻译
self.translations["en"] = { self.translations["en"] = {
# System tray # System tray
"tray.open_browser": "Open Browser", "tray.show_window": "Show Window",
"tray.show_console": "Show Console", "tray.show_console": "Show Console",
"tray.exit": "Exit", "tray.exit": "Exit",
# Main program # Main program
"main.starting": "Starting Onekey...", "main.starting": "Starting Onekey...",
"main.tray_created": "System tray created", "main.tray_created": "System tray created",
"main.browser_opened": "Browser opened automatically",
"main.browser_open_failed": "Failed to open browser automatically, please visit: http://localhost:{port}",
"main.exit": "Program exited", "main.exit": "Program exited",
"main.start_error": "Startup error: {error}", "main.start_error": "Startup error: {error}",
"main.press_enter": "Press Enter to exit...", "main.press_enter": "Press Enter to exit...",
@@ -162,7 +157,6 @@ class I18n:
"error.ensure_root": "Please ensure running this program from project root", "error.ensure_root": "Please ensure running this program from project root",
# Web related # Web related
"web.starting": "Starting Onekey Web GUI...", "web.starting": "Starting Onekey Web GUI...",
"web.visit": "Please visit: http://localhost:{port}",
"web.task_running": "A task is already running", "web.task_running": "A task is already running",
"web.invalid_appid": "Please enter a valid App ID", "web.invalid_appid": "Please enter a valid App ID",
"web.invalid_format": "Invalid App ID format", "web.invalid_format": "Invalid App ID format",

View File

@@ -1,8 +1,8 @@
import os import os
import sys import sys
import time import time
import json import ujson
import httpx import aiohttp
import asyncio import asyncio
from pathlib import Path from pathlib import Path
@@ -16,14 +16,11 @@ from fastapi.templating import Jinja2Templates
from src.constants import STEAM_API_BASE from src.constants import STEAM_API_BASE
from src.utils.i18n import t from src.utils.i18n import t
# 添加项目根目录到Python路径
project_root = Path(__file__) project_root = Path(__file__)
sys.path.insert(0, str(project_root)) sys.path.insert(0, str(project_root))
def get_base_path(): def get_base_path():
"""获取程序基础路径"""
if hasattr(sys, "_MEIPASS"): if hasattr(sys, "_MEIPASS"):
return Path(sys._MEIPASS) return Path(sys._MEIPASS)
elif getattr(sys, "frozen", False): elif getattr(sys, "frozen", False):
@@ -44,7 +41,6 @@ except ImportError as e:
class ConnectionManager: class ConnectionManager:
"""WebSocket 连接管理器"""
def __init__(self): def __init__(self):
self.active_connections: List[WebSocket] = [] self.active_connections: List[WebSocket] = []
@@ -64,23 +60,19 @@ class ConnectionManager:
try: try:
await connection.send_text(message) await connection.send_text(message)
except BaseException: except BaseException:
# 连接可能已关闭
pass pass
class WebOnekeyApp: class WebOnekeyApp:
"""Web版本的Onekey应用"""
def __init__(self, manager: ConnectionManager): def __init__(self, manager: ConnectionManager):
self.onekey_app = None self.onekey_app = None
self.current_task = None self.current_task = None
self.task_status = "idle" # idle, running, completed, error self.task_status = "idle"
self.task_progress = [] self.task_progress = []
self.task_result = None self.task_result = None
self.manager = manager self.manager = manager
def init_app(self): def init_app(self):
"""初始化Onekey应用"""
try: try:
self.onekey_app = OnekeyApp() self.onekey_app = OnekeyApp()
return True return True
@@ -88,18 +80,14 @@ class WebOnekeyApp:
return False, str(e) return False, str(e)
async def run_unlock_task(self, app_id: str, tool_type: str, dlc: bool): async def run_unlock_task(self, app_id: str, tool_type: str, dlc: bool):
"""运行解锁任务"""
try: try:
self.task_status = "running" self.task_status = "running"
self.task_progress = [] self.task_progress = []
# 重新初始化应用以确保新的任务状态
self.onekey_app = OnekeyApp() self.onekey_app = OnekeyApp()
# 添加自定义日志处理器来捕获进度
self._add_progress_handler() self._add_progress_handler()
# 执行解锁任务
result = await self.onekey_app.run(app_id, tool_type, dlc) result = await self.onekey_app.run(app_id, tool_type, dlc)
if result: if result:
@@ -116,7 +104,6 @@ class WebOnekeyApp:
self.task_status = "error" self.task_status = "error"
self.task_result = {"success": False, "message": f"配置失败: {str(e)}"} self.task_result = {"success": False, "message": f"配置失败: {str(e)}"}
finally: finally:
# 确保应用资源被清理
if hasattr(self, "onekey_app") and self.onekey_app: if hasattr(self, "onekey_app") and self.onekey_app:
try: try:
if hasattr(self.onekey_app, "client"): if hasattr(self.onekey_app, "client"):
@@ -126,7 +113,6 @@ class WebOnekeyApp:
self.onekey_app = None self.onekey_app = None
def _add_progress_handler(self): def _add_progress_handler(self):
"""添加进度处理器"""
if self.onekey_app and self.onekey_app.logger: if self.onekey_app and self.onekey_app.logger:
original_info = self.onekey_app.logger.info original_info = self.onekey_app.logger.info
original_warning = self.onekey_app.logger.warning original_warning = self.onekey_app.logger.warning
@@ -136,10 +122,9 @@ class WebOnekeyApp:
self.task_progress.append( self.task_progress.append(
{"type": "info", "message": str(msg), "timestamp": time.time()} {"type": "info", "message": str(msg), "timestamp": time.time()}
) )
# 广播进度消息
asyncio.create_task( asyncio.create_task(
self.manager.broadcast( self.manager.broadcast(
json.dumps( ujson.dumps(
{ {
"type": "task_progress", "type": "task_progress",
"data": {"type": "info", "message": str(msg)}, "data": {"type": "info", "message": str(msg)},
@@ -155,7 +140,7 @@ class WebOnekeyApp:
) )
asyncio.create_task( asyncio.create_task(
self.manager.broadcast( self.manager.broadcast(
json.dumps( ujson.dumps(
{ {
"type": "task_progress", "type": "task_progress",
"data": {"type": "warning", "message": str(msg)}, "data": {"type": "warning", "message": str(msg)},
@@ -171,7 +156,7 @@ class WebOnekeyApp:
) )
asyncio.create_task( asyncio.create_task(
self.manager.broadcast( self.manager.broadcast(
json.dumps( ujson.dumps(
{ {
"type": "task_progress", "type": "task_progress",
"data": {"type": "error", "message": str(msg)}, "data": {"type": "error", "message": str(msg)},
@@ -186,7 +171,6 @@ class WebOnekeyApp:
self.onekey_app.logger.error = error_with_progress self.onekey_app.logger.error = error_with_progress
# 创建FastAPI应用
app = FastAPI(title="Onekey") app = FastAPI(title="Onekey")
app.add_middleware( app.add_middleware(
CORSMiddleware, CORSMiddleware,
@@ -198,7 +182,6 @@ app.add_middleware(
manager = ConnectionManager() manager = ConnectionManager()
# 修复为静态文件路由添加name参数
config = ConfigManager() config = ConfigManager()
app.mount( app.mount(
"/static", "/static",
@@ -209,7 +192,6 @@ templates = Jinja2Templates(
directory=f"{base_path}/{config.app_config.language}/templates" directory=f"{base_path}/{config.app_config.language}/templates"
) )
# 创建Web应用实例
web_app = WebOnekeyApp(manager) web_app = WebOnekeyApp(manager)
@@ -267,7 +249,6 @@ async def start_unlock(request: Request):
if not app_id: if not app_id:
return JSONResponse({"success": False, "message": "请输入有效的App ID"}) return JSONResponse({"success": False, "message": "请输入有效的App ID"})
# 验证App ID格式
app_id_list = [id for id in app_id.split("-") if id.isdigit()] app_id_list = [id for id in app_id.split("-") if id.isdigit()]
if not app_id_list: if not app_id_list:
return JSONResponse({"success": False, "message": "App ID格式无效"}) return JSONResponse({"success": False, "message": "App ID格式无效"})
@@ -293,9 +274,7 @@ async def get_task_status():
return JSONResponse( return JSONResponse(
{ {
"status": web_app.task_status, "status": web_app.task_status,
"progress": ( "progress": (web_app.task_progress[-10:] if web_app.task_progress else []),
web_app.task_progress[-10:] if web_app.task_progress else []
), # 只返回最近10条
"result": web_app.task_result, "result": web_app.task_result,
} }
) )
@@ -319,14 +298,11 @@ async def update_config(request: Request):
try: try:
data = await request.json() data = await request.json()
# 验证必需的字段
if not isinstance(data, dict): if not isinstance(data, dict):
return {"success": False, "message": "无效的配置数据"} return {"success": False, "message": "无效的配置数据"}
# 加载当前配置
config_manager = ConfigManager() config_manager = ConfigManager()
# 准备新的配置数据
new_config = { new_config = {
"KEY": data.get("key", ""), "KEY": data.get("key", ""),
"Port": config_manager.app_config.port, "Port": config_manager.app_config.port,
@@ -337,12 +313,11 @@ async def update_config(request: Request):
"Language": data.get("language", "zh"), "Language": data.get("language", "zh"),
} }
# 保存配置 import ujson
import json
config_path = config_manager.config_path config_path = config_manager.config_path
with open(config_path, "w", encoding="utf-8") as f: with open(config_path, "w", encoding="utf-8") as f:
json.dump(new_config, f, indent=2, ensure_ascii=False) ujson.dump(new_config, f, indent=2, ensure_ascii=False)
return {"success": True, "message": "配置已保存"} return {"success": True, "message": "配置已保存"}
@@ -355,13 +330,13 @@ async def reset_config():
"""重置配置为默认值""" """重置配置为默认值"""
try: try:
from src.config import DEFAULT_CONFIG from src.config import DEFAULT_CONFIG
import json import ujson
config_manager = ConfigManager() config_manager = ConfigManager()
config_path = config_manager.config_path config_path = config_manager.config_path
with open(config_path, "w", encoding="utf-8") as f: with open(config_path, "w", encoding="utf-8") as f:
json.dump(DEFAULT_CONFIG, f, indent=2, ensure_ascii=False) ujson.dump(DEFAULT_CONFIG, f, indent=2, ensure_ascii=False)
return {"success": True, "message": "配置已重置为默认值"} return {"success": True, "message": "配置已重置为默认值"}
@@ -402,19 +377,19 @@ async def get_key_info(request: Request):
if not key: if not key:
return JSONResponse({"success": False, "message": "卡密不能为空"}) return JSONResponse({"success": False, "message": "卡密不能为空"})
async with httpx.AsyncClient(timeout=10.0) as client: async with aiohttp.ClientSession(conn_timeout=10.0) as client:
response = await client.post( response = await client.post(
f"{STEAM_API_BASE}/getKeyInfo", url=f"{STEAM_API_BASE}/getKeyInfo",
json={"key": key}, json={"key": key},
headers={"Content-Type": "application/json"}, headers={"Content-Type": "application/json"},
) )
if response.status_code == 200: if response.status == 200:
result = response.json() result = ujson.loads(await response.content.read())
return JSONResponse(result) return JSONResponse(result)
else: else:
return JSONResponse({"success": False, "message": "卡密验证服务不可用"}) return JSONResponse({"success": False, "message": "卡密验证服务不可用"})
except httpx.TimeoutException: except aiohttp.ConnectionTimeoutError:
return JSONResponse({"success": False, "message": "验证超时,请检查网络连接"}) return JSONResponse({"success": False, "message": "验证超时,请检查网络连接"})
except Exception as e: except Exception as e:
return JSONResponse({"success": False, "message": f"验证失败: {str(e)}"}) return JSONResponse({"success": False, "message": f"验证失败: {str(e)}"})
@@ -426,15 +401,15 @@ async def websocket_endpoint(websocket: WebSocket):
await manager.connect(websocket) await manager.connect(websocket)
try: try:
await websocket.send_text( await websocket.send_text(
json.dumps({"type": "connected", "data": {"message": "已连接到服务器"}}) ujson.dumps({"type": "connected", "data": {"message": "已连接到服务器"}})
) )
while True: while True:
data = await websocket.receive_text() data = await websocket.receive_text()
message = json.loads(data) message = ujson.loads(data)
if message.get("type") == "ping": if message.get("type") == "ping":
await websocket.send_text( await websocket.send_text(
json.dumps({"type": "pong", "data": {"timestamp": time.time()}}) ujson.dumps({"type": "pong", "data": {"timestamp": time.time()}})
) )
except WebSocketDisconnect: except WebSocketDisconnect:
manager.disconnect(websocket) manager.disconnect(websocket)
@@ -445,4 +420,3 @@ async def websocket_endpoint(websocket: WebSocket):
print(t("web.starting")) print(t("web.starting"))
print(t("web.visit", port=config.app_config.port))

View File

@@ -33,7 +33,7 @@
</p> </p>
</div> </div>
<div class="project-version"> <div class="project-version">
<span class="version-label">v2.1.1</span> <span class="version-label">v2.1.2</span>
<span class="version-type">Web UI</span> <span class="version-type">Web UI</span>
</div> </div>
</div> </div>
@@ -117,7 +117,7 @@
<div class="tech-content"> <div class="tech-content">
<div class="tech-item"> <div class="tech-item">
<strong>🐍 Backend Technology</strong> <strong>🐍 Backend Technology</strong>
<span>Python 3.8+ • FastAPI • AsyncIO • HTTPX</span> <span>Python 3.8+ • FastAPI • AsyncIO • AIOHTTP</span>
</div> </div>
<div class="tech-item"> <div class="tech-item">
<strong>🌐 Frontend Technology</strong> <strong>🌐 Frontend Technology</strong>

View File

@@ -31,7 +31,7 @@
<p class="project-subtitle">直观,优雅的游戏解锁解决方案</p> <p class="project-subtitle">直观,优雅的游戏解锁解决方案</p>
</div> </div>
<div class="project-version"> <div class="project-version">
<span class="version-label">v2.1.1</span> <span class="version-label">v2.1.2</span>
<span class="version-type">Web UI</span> <span class="version-type">Web UI</span>
</div> </div>
</div> </div>
@@ -115,7 +115,7 @@
<div class="tech-content"> <div class="tech-content">
<div class="tech-item"> <div class="tech-item">
<strong>🐍 后端技术</strong> <strong>🐍 后端技术</strong>
<span>Python 3.8+ • FastAPI • AsyncIO • HTTPX</span> <span>Python 3.8+ • FastAPI • AsyncIO • AIOHTTP</span>
</div> </div>
<div class="tech-item"> <div class="tech-item">
<strong>🌐 前端技术</strong> <strong>🌐 前端技术</strong>