Files
Onekey/web/app.py
ikun0014 2cf9af811a Refactor to use aiohttp and ujson, update dependencies
Replaced httpx and json with aiohttp and ujson throughout the codebase for improved async performance and faster JSON handling. Updated requirements.txt to reflect new dependencies and removed unused ones. Refactored manifest handling to remove steam.client.cdn dependency and implemented custom manifest serialization. Updated logger to use loguru instead of logzero. Adjusted i18n keys and tray menu logic to match new window-based UI. Updated about.html to reflect backend technology change from HTTPX to AIOHTTP.
2026-01-11 03:46:45 +08:00

423 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import sys
import time
import ujson
import aiohttp
import asyncio
from pathlib import Path
from typing import List
from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, RedirectResponse
from fastapi.templating import Jinja2Templates
from src.constants import STEAM_API_BASE
from src.utils.i18n import t
project_root = Path(__file__)
sys.path.insert(0, str(project_root))
def get_base_path():
if hasattr(sys, "_MEIPASS"):
return Path(sys._MEIPASS)
elif getattr(sys, "frozen", False):
return Path(os.path.dirname(os.path.abspath(sys.executable)))
else:
return Path(os.path.dirname(os.path.abspath(__file__)))
base_path = get_base_path()
try:
from src.main import OnekeyApp
from src.config import ConfigManager
except ImportError as e:
print(t("error.import", error=str(e)))
print(t("error.ensure_root"))
sys.exit(1)
class ConnectionManager:
def __init__(self):
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def send_personal_message(self, message: str, websocket: WebSocket):
await websocket.send_text(message)
async def broadcast(self, message: str):
for connection in self.active_connections:
try:
await connection.send_text(message)
except BaseException:
pass
class WebOnekeyApp:
def __init__(self, manager: ConnectionManager):
self.onekey_app = None
self.current_task = None
self.task_status = "idle"
self.task_progress = []
self.task_result = None
self.manager = manager
def init_app(self):
try:
self.onekey_app = OnekeyApp()
return True
except Exception as e:
return False, str(e)
async def run_unlock_task(self, app_id: str, tool_type: str, dlc: bool):
try:
self.task_status = "running"
self.task_progress = []
self.onekey_app = OnekeyApp()
self._add_progress_handler()
result = await self.onekey_app.run(app_id, tool_type, dlc)
if result:
self.task_status = "completed"
self.task_result = {
"success": True,
"message": "游戏解锁配置成功重启Steam后生效",
}
else:
self.task_status = "error"
self.task_result = {"success": False, "message": "配置失败"}
except Exception as e:
self.task_status = "error"
self.task_result = {"success": False, "message": f"配置失败: {str(e)}"}
finally:
if hasattr(self, "onekey_app") and self.onekey_app:
try:
if hasattr(self.onekey_app, "client"):
await self.onekey_app.client.close()
except BaseException:
pass
self.onekey_app = None
def _add_progress_handler(self):
if self.onekey_app and self.onekey_app.logger:
original_info = self.onekey_app.logger.info
original_warning = self.onekey_app.logger.warning
original_error = self.onekey_app.logger.error
def info_with_progress(msg):
self.task_progress.append(
{"type": "info", "message": str(msg), "timestamp": time.time()}
)
asyncio.create_task(
self.manager.broadcast(
ujson.dumps(
{
"type": "task_progress",
"data": {"type": "info", "message": str(msg)},
}
)
)
)
return original_info(msg)
def warning_with_progress(msg):
self.task_progress.append(
{"type": "warning", "message": str(msg), "timestamp": time.time()}
)
asyncio.create_task(
self.manager.broadcast(
ujson.dumps(
{
"type": "task_progress",
"data": {"type": "warning", "message": str(msg)},
}
)
)
)
return original_warning(msg)
def error_with_progress(msg):
self.task_progress.append(
{"type": "error", "message": str(msg), "timestamp": time.time()}
)
asyncio.create_task(
self.manager.broadcast(
ujson.dumps(
{
"type": "task_progress",
"data": {"type": "error", "message": str(msg)},
}
)
)
)
return original_error(msg)
self.onekey_app.logger.info = info_with_progress
self.onekey_app.logger.warning = warning_with_progress
self.onekey_app.logger.error = error_with_progress
app = FastAPI(title="Onekey")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
manager = ConnectionManager()
config = ConfigManager()
app.mount(
"/static",
StaticFiles(directory=f"{base_path}/{config.app_config.language}/static"),
name="static",
)
templates = Jinja2Templates(
directory=f"{base_path}/{config.app_config.language}/templates"
)
web_app = WebOnekeyApp(manager)
@app.get("/")
async def index(request: Request):
"""主页"""
config = ConfigManager()
if not config.app_config.key:
return RedirectResponse(request.url_for("oobe"))
else:
return templates.TemplateResponse("index.html", {"request": request})
@app.get("/oobe")
async def oobe(request: Request):
"""OOBE页面"""
return templates.TemplateResponse("oobe.html", {"request": request})
@app.post("/api/init")
async def init_app():
"""初始化应用"""
result = web_app.init_app()
if isinstance(result, tuple):
return JSONResponse({"success": False, "message": result[1]})
return JSONResponse({"success": True})
@app.get("/api/config")
async def get_config():
"""获取配置信息"""
try:
config = ConfigManager()
return JSONResponse(
{
"success": True,
"config": {
"steam_path": str(config.steam_path) if config.steam_path else "",
"debug_mode": config.app_config.debug_mode,
},
}
)
except Exception as e:
return JSONResponse({"success": False, "message": str(e)})
@app.post("/api/start_unlock")
async def start_unlock(request: Request):
"""开始解锁任务"""
data = await request.json()
app_id = data.get("app_id", "").strip()
tool_type = data.get("tool_type", "steamtools")
dlc = data.get("dlc")
if not app_id:
return JSONResponse({"success": False, "message": "请输入有效的App ID"})
app_id_list = [id for id in app_id.split("-") if id.isdigit()]
if not app_id_list:
return JSONResponse({"success": False, "message": "App ID格式无效"})
if web_app.task_status == "running":
return JSONResponse({"success": False, "message": "已有任务正在运行"})
try:
await web_app.run_unlock_task(app_id_list[0], tool_type, dlc)
except Exception as e:
web_app.task_status = "error"
web_app.task_result = {
"success": False,
"message": f"任务执行失败: {str(e)}",
}
return JSONResponse({"success": True, "message": "任务已开始"})
@app.get("/api/task_status")
async def get_task_status():
"""获取任务状态"""
return JSONResponse(
{
"status": web_app.task_status,
"progress": (web_app.task_progress[-10:] if web_app.task_progress else []),
"result": web_app.task_result,
}
)
@app.get("/about")
async def about_page(request: Request):
"""关于页面"""
return templates.TemplateResponse("about.html", {"request": request})
@app.get("/settings")
async def settings_page(request: Request):
"""设置页面"""
return templates.TemplateResponse("settings.html", {"request": request})
@app.post("/api/config/update")
async def update_config(request: Request):
"""更新配置"""
try:
data = await request.json()
if not isinstance(data, dict):
return {"success": False, "message": "无效的配置数据"}
config_manager = ConfigManager()
new_config = {
"KEY": data.get("key", ""),
"Port": config_manager.app_config.port,
"Custom_Steam_Path": data.get("steam_path", ""),
"Debug_Mode": data.get("debug_mode", False),
"Logging_Files": data.get("logging_files", True),
"Show_Console": data.get("show_console", True),
"Language": data.get("language", "zh"),
}
import ujson
config_path = config_manager.config_path
with open(config_path, "w", encoding="utf-8") as f:
ujson.dump(new_config, f, indent=2, ensure_ascii=False)
return {"success": True, "message": "配置已保存"}
except Exception as e:
return {"success": False, "message": f"保存配置失败: {str(e)}"}
@app.post("/api/config/reset")
async def reset_config():
"""重置配置为默认值"""
try:
from src.config import DEFAULT_CONFIG
import ujson
config_manager = ConfigManager()
config_path = config_manager.config_path
with open(config_path, "w", encoding="utf-8") as f:
ujson.dump(DEFAULT_CONFIG, f, indent=2, ensure_ascii=False)
return {"success": True, "message": "配置已重置为默认值"}
except Exception as e:
return {"success": False, "message": f"重置配置失败: {str(e)}"}
@app.get("/api/config/detailed")
async def get_detailed_config():
"""获取详细配置信息"""
try:
config = ConfigManager()
return {
"success": True,
"config": {
"steam_path": str(config.steam_path) if config.steam_path else "",
"debug_mode": config.app_config.debug_mode,
"logging_files": config.app_config.logging_files,
"show_console": config.app_config.show_console,
"steam_path_exists": (
config.steam_path.exists() if config.steam_path else False
),
"key": getattr(config.app_config, "key", ""),
"language": config.app_config.language,
},
}
except Exception as e:
return {"success": False, "message": str(e)}
@app.post("/api/getKeyInfo")
async def get_key_info(request: Request):
"""获取卡密信息"""
try:
data = await request.json()
key = data.get("key", "").strip()
if not key:
return JSONResponse({"success": False, "message": "卡密不能为空"})
async with aiohttp.ClientSession(conn_timeout=10.0) as client:
response = await client.post(
url=f"{STEAM_API_BASE}/getKeyInfo",
json={"key": key},
headers={"Content-Type": "application/json"},
)
if response.status == 200:
result = ujson.loads(await response.content.read())
return JSONResponse(result)
else:
return JSONResponse({"success": False, "message": "卡密验证服务不可用"})
except aiohttp.ConnectionTimeoutError:
return JSONResponse({"success": False, "message": "验证超时,请检查网络连接"})
except Exception as e:
return JSONResponse({"success": False, "message": f"验证失败: {str(e)}"})
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
"""WebSocket 端点"""
await manager.connect(websocket)
try:
await websocket.send_text(
ujson.dumps({"type": "connected", "data": {"message": "已连接到服务器"}})
)
while True:
data = await websocket.receive_text()
message = ujson.loads(data)
if message.get("type") == "ping":
await websocket.send_text(
ujson.dumps({"type": "pong", "data": {"timestamp": time.time()}})
)
except WebSocketDisconnect:
manager.disconnect(websocket)
print(t("web.client_disconnected"))
except Exception as e:
print(t("web.websocket_error", error=str(e)))
manager.disconnect(websocket)
print(t("web.starting"))