Initial project setup and source code import

Add project files including Python source code, web assets, configuration, and CI/CD workflows. Includes main application logic, web interface, supporting modules, and documentation for the Onekey Steam Depot Manifest Downloader.
This commit is contained in:
ikun0014
2025-08-04 17:48:35 +08:00
parent 343c86808d
commit 911f6f39e3
42 changed files with 7209 additions and 1 deletions

0
src/__init__.py Normal file
View File

104
src/config.py Normal file
View File

@@ -0,0 +1,104 @@
import os
import sys
import json
import winreg
from pathlib import Path
from typing import Dict, Optional
from .constants import CONFIG_FILE
from .models import AppConfig
DEFAULT_CONFIG = {
"KEY": "",
"Debug_Mode": False,
"Logging_Files": True,
"Show_Console": False,
"Custom_Steam_Path": "",
}
class ConfigManager:
"""配置管理器"""
def __init__(self):
self.config_path = CONFIG_FILE
self._config_data: Dict = {}
self.app_config: AppConfig = AppConfig()
self.steam_path: Optional[Path] = None
self._load_config()
def _generate_config(self) -> None:
"""生成默认配置文件"""
try:
with open(self.config_path, "w", encoding="utf-8") as f:
json.dump(DEFAULT_CONFIG, f, indent=2, ensure_ascii=False)
print("配置文件已生成")
os.system("pause")
sys.exit(1)
except IOError as e:
print(f"配置文件创建失败: {str(e)}")
os.system("pause")
sys.exit(1)
def _load_config(self) -> None:
"""加载配置文件"""
if not self.config_path.exists():
self._generate_config()
try:
with open(self.config_path, "r", encoding="utf-8") as f:
self._config_data = json.load(f)
self.app_config = AppConfig(
key=self._config_data.get("KEY", ""),
custom_steam_path=self._config_data.get("Custom_Steam_Path", ""),
debug_mode=self._config_data.get("Debug_Mode", False),
logging_files=self._config_data.get("Logging_Files", True),
show_console=self._config_data.get("Show_Console", True),
)
self.steam_path = self._get_steam_path()
except json.JSONDecodeError:
print("配置文件损坏,正在重新生成...")
self._generate_config()
print("配置文件已重新生成,使用默认配置继续运行")
self.app_config = AppConfig(
key=DEFAULT_CONFIG.get("KEY", ""),
custom_steam_path=DEFAULT_CONFIG.get("Custom_Steam_Path", ""),
debug_mode=DEFAULT_CONFIG.get("Debug_Mode", False),
logging_files=DEFAULT_CONFIG.get("Logging_Files", True),
show_console=DEFAULT_CONFIG.get("Show_Console", True),
)
try:
self.steam_path = self._get_steam_path()
except:
self.steam_path = None
except Exception as e:
print(f"配置加载失败: {str(e)}")
print("使用默认配置继续运行")
self.app_config = AppConfig(
key=DEFAULT_CONFIG.get("KEY", ""),
custom_steam_path=DEFAULT_CONFIG.get("Custom_Steam_Path", ""),
debug_mode=DEFAULT_CONFIG.get("Debug_Mode", False),
logging_files=DEFAULT_CONFIG.get("Logging_Files", True),
show_console=DEFAULT_CONFIG.get("Show_Console", True),
)
try:
self.steam_path = self._get_steam_path()
except:
self.steam_path = None
def _get_steam_path(self) -> Optional[Path]:
"""获取Steam安装路径"""
try:
if self.app_config.custom_steam_path:
return Path(self.app_config.custom_steam_path)
with winreg.OpenKey(
winreg.HKEY_CURRENT_USER, r"Software\Valve\Steam"
) as key:
return Path(winreg.QueryValueEx(key, "SteamPath")[0])
except Exception as e:
print(f"Steam路径获取失败: {str(e)}")
print("程序将继续运行,但部分功能可能不可用")
return None

20
src/constants.py Normal file
View File

@@ -0,0 +1,20 @@
"""常量定义"""
from pathlib import Path
LOG_DIR = Path("logs")
CONFIG_FILE = Path("config.json")
STEAM_API_BASE = "https://steam.ikunshare.com/api"
STEAM_CACHE_CDN_LIST = [
"https://cache1-hkg1.steamcontent.com",
"https://cache2-hkg1.steamcontent.com",
"https://cache3-hkg1.steamcontent.com",
"https://cache4-hkg1.steamcontent.com",
"https://cache5-hkg1.steamcontent.com",
"https://cache6-hkg1.steamcontent.com",
"https://cache7-hkg1.steamcontent.com",
"https://cache8-hkg1.steamcontent.com",
"https://cache9-hkg1.steamcontent.com",
"https://cache10-hkg1.steamcontent.com",
]

67
src/logger.py Normal file
View File

@@ -0,0 +1,67 @@
"""日志模块"""
import logging
import colorama
import logzero
from logzero import setup_logger, LogFormatter
from .constants import LOG_DIR
class Logger:
"""统一的日志管理器"""
def __init__(self, name: str, debug_mode: bool = False, log_file: bool = True):
self.name = name
self.debug_mode = debug_mode
self.log_file = log_file
self._logger = self._setup_logger()
def _setup_logger(self) -> logging.Logger:
"""设置日志器"""
level = logzero.DEBUG if self.debug_mode else logzero.INFO
colors = {
logzero.DEBUG: colorama.Fore.CYAN,
logzero.INFO: colorama.Fore.GREEN,
logzero.WARNING: colorama.Fore.YELLOW,
logzero.ERROR: colorama.Fore.RED,
logzero.CRITICAL: colorama.Fore.MAGENTA,
}
terminal_formatter = LogFormatter(
color=True,
fmt="%(color)s%(message)s%(end_color)s",
datefmt="%Y-%m-%d %H:%M:%S",
colors=colors,
)
logger = setup_logger(self.name, level=level, formatter=terminal_formatter)
if self.log_file:
LOG_DIR.mkdir(exist_ok=True)
logfile = LOG_DIR / f"{self.name}.log"
file_handler = logging.FileHandler(logfile, encoding="utf-8")
file_formatter = logging.Formatter(
"[%(asctime)s] | [%(name)s:%(levelname)s] | [%(module)s.%(funcName)s:%(lineno)d] - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
file_handler.setFormatter(file_formatter)
logger.addHandler(file_handler)
return logger
def debug(self, msg: str):
self._logger.debug(msg)
def info(self, msg: str):
self._logger.info(msg)
def warning(self, msg: str):
self._logger.warning(msg)
def error(self, msg: str):
self._logger.error(msg)
def critical(self, msg: str):
self._logger.critical(msg)

203
src/main.py Normal file
View File

@@ -0,0 +1,203 @@
from typing import List, Dict, Tuple
from .constants import STEAM_API_BASE
from .config import ConfigManager
from .logger import Logger
from .models import DepotInfo, ManifestInfo, SteamAppInfo, SteamAppManifestInfo
from .network.client import HttpClient
from .manifest_handler import ManifestHandler
class OnekeyApp:
"""Onekey主应用"""
def __init__(self):
self.config = ConfigManager()
self.logger = Logger(
"Onekey",
debug_mode=self.config.app_config.debug_mode,
log_file=self.config.app_config.logging_files,
)
self.client = HttpClient()
async def fetch_key(self):
trans = {
"week": "周卡",
"month": "月卡",
"year": "年卡",
"permanent": "永久卡",
}
try:
response = await self.client._client.post(
f"{STEAM_API_BASE}/getKeyInfo",
json={"key": self.config.app_config.key},
)
body = response.json()
if not body["info"]:
self.logger.error("卡密不存在")
return False
self.logger.info(f"卡密类型: {trans[body['info']['type']]}")
if trans[body["info"]["type"]] != "永久卡":
self.logger.info(f"卡密过期时间: {body['info']['expiresAt']}")
return True
except Exception as e:
self.logger.error(f"获取卡密信息失败: {str(e)}")
return True
async def fetch_app_data(
self, app_id: str, and_dlc: bool = True
) -> Tuple[SteamAppInfo, SteamAppManifestInfo]:
"""
从API获取应用数据
"""
main_app_manifests = []
dlc_manifests = []
try:
self.logger.info(f"正在获取游戏 {app_id} 的信息...")
response = await self.client._client.post(
f"{STEAM_API_BASE}/getGame",
json={"appId": int(app_id), "dlc": and_dlc},
headers={"X-Api-Key": self.config.app_config.key},
)
if response.status_code == 401:
self.logger.error("API密钥无效")
return []
if response.status_code != 200:
self.logger.error(f"API请求失败状态码: {response.status_code}")
return []
data = response.json()
if not data:
self.logger.error("未找到此游戏的清单信息")
return []
self.logger.info(f"游戏名称: {data.get('name', 'Unknown')}")
self.logger.info(f"Depot数量: {data.get('depotCount', 'Unknown')}")
if and_dlc:
for item in data["gameManifests"]:
manifest = ManifestInfo(
app_id=item["app_id"],
depot_id=item["depot_id"],
depot_key=item["depot_key"],
manifest_id=item["manifest_id"],
url=item["url"],
)
main_app_manifests.append(manifest)
for item in data["dlcManifests"]:
self.logger.info(f"DLC名称: {item.get('dlcName', 'Unknown')}")
self.logger.info(f"DLC AppId: {item.get('dlcId', 'Unknown')}")
for manifests in item["manifests"]:
manifest = ManifestInfo(
app_id=manifests["app_id"],
depot_id=manifests["depot_id"],
depot_key=manifests["depot_key"],
manifest_id=manifests["manifest_id"],
url=manifests["url"],
)
dlc_manifests.append(manifest)
else:
for item in data["manifests"]:
manifest = ManifestInfo(
app_id=item["app_id"],
depot_id=item["depot_id"],
depot_key=item["depot_key"],
manifest_id=item["manifest_id"],
url=item["url"],
)
main_app_manifests.append(manifest)
except Exception as e:
self.logger.error(f"获取主游戏信息失败: {str(e)}")
return SteamAppManifestInfo(mainapp=[], dlcs=[])
return SteamAppInfo(
app_id,
data["name"],
data.get("totalDLCCount", data.get("dlcCount", 0)),
data["depotCount"],
data.get("workshopDecryptionKey", "None"),
), SteamAppManifestInfo(mainapp=main_app_manifests, dlcs=dlc_manifests)
def prepare_depot_data(
self, manifests: List[ManifestInfo]
) -> tuple[List[DepotInfo], Dict[str, List[str]]]:
"""准备仓库数据"""
depot_data = []
depot_dict = {}
for manifest in manifests:
if manifest.depot_id not in depot_dict:
depot_dict[manifest.depot_id] = {
"key": manifest.depot_key,
"manifests": [],
}
depot_dict[manifest.depot_id]["manifests"].append(manifest.manifest_id)
for depot_id, info in depot_dict.items():
depot_info = DepotInfo(
depot_id=depot_id,
decryption_key=info["key"],
manifest_ids=info["manifests"],
)
depot_data.append(depot_info)
return depot_data, depot_dict
async def run(self, app_id: str, tool_type: str, dlc: bool):
"""
为Web版本提供的运行方法。
"""
try:
if not self.config.steam_path:
self.logger.error("Steam路径未配置或无效无法继续")
return False
await self.fetch_key()
manifests = []
app_info, manifests = await self.fetch_app_data(app_id, dlc)
if not manifests:
return False
manifest_handler = ManifestHandler(
self.client, self.logger, self.config.steam_path
)
processed_manifests = await manifest_handler.process_manifests(manifests)
if not processed_manifests:
self.logger.error("没有成功处理的清单")
return False
depot_data, _ = self.prepare_depot_data(processed_manifests)
self.logger.info(f"选择的解锁工具: {tool_type}")
if tool_type == "steamtools":
from .tools.steamtools import SteamTools
tool = SteamTools(self.config.steam_path)
success = await tool.setup(depot_data, app_info)
elif tool_type == "greenluma":
from .tools.greenluma import GreenLuma
tool = GreenLuma(self.config.steam_path)
success = await tool.setup(depot_data, app_id)
else:
self.logger.error("无效的工具选择")
return False
if success:
self.logger.info("游戏解锁配置成功!")
self.logger.info("重启Steam后生效")
return True
else:
self.logger.error("配置失败")
return False
except Exception as e:
self.logger.error(f"运行错误: {e}")
return False
finally:
await self.client.close()

117
src/manifest_handler.py Normal file
View File

@@ -0,0 +1,117 @@
import vdf
from pathlib import Path
from typing import List, Optional
from steam.client.cdn import DepotManifest
from .constants import STEAM_CACHE_CDN_LIST
from .models import ManifestInfo, SteamAppManifestInfo
from .logger import Logger
from .network.client import HttpClient
class ManifestHandler:
"""清单处理器"""
def __init__(self, client: HttpClient, logger: Logger, steam_path: Path):
self.client = client
self.logger = logger
self.steam_path = steam_path
self.depot_cache = steam_path / "depotcache"
self.depot_cache.mkdir(exist_ok=True)
async def download_manifest(self, manifest_info: ManifestInfo) -> Optional[bytes]:
"""下载清单文件"""
for _ in range(3):
for cdn in STEAM_CACHE_CDN_LIST:
url = cdn + manifest_info.url
try:
r = await self.client.get(url)
if r.status_code == 200:
return r.content
except Exception as e:
self.logger.debug(f"{url} 下载失败: {str(e)}")
def process_manifest(
self, manifest_data: bytes, manifest_info: ManifestInfo, remove_old: bool = True
) -> bool:
"""处理清单文件"""
try:
depot_id = manifest_info.depot_id
manifest_id = manifest_info.manifest_id
depot_key = bytes.fromhex(manifest_info.depot_key)
manifest = DepotManifest(manifest_data)
manifest_path = self.depot_cache / f"{depot_id}_{manifest_id}.manifest"
config_path = self.depot_cache / "config.vdf"
if config_path.exists():
with open(config_path, "r", encoding="utf-8") as f:
d = vdf.load(f)
else:
d = {"depots": {}}
d["depots"][depot_id] = {"DecryptionKey": depot_key.hex()}
d = {"depots": dict(sorted(d["depots"].items()))}
if remove_old:
for file in self.depot_cache.iterdir():
if file.suffix == ".manifest":
parts = file.stem.split("_")
if (
len(parts) == 2
and parts[0] == str(depot_id)
and parts[1] != str(manifest_id)
):
file.unlink(missing_ok=True)
self.logger.info(f"删除旧清单: {file.name}")
with open(manifest_path, "wb") as f:
f.write(manifest.serialize(compress=False))
with open(config_path, "w", encoding="utf-8") as f:
vdf.dump(d, f, pretty=True)
self.logger.info(f"清单处理成功: {depot_id}_{manifest_id}.manifest")
return True
except Exception as e:
self.logger.error(f"处理清单时出错: {str(e)}")
return False
async def process_manifests(
self, manifests: SteamAppManifestInfo
) -> List[ManifestInfo]:
"""批量处理清单"""
processed = []
app_manifest = manifests.mainapp
dlc_manifest = manifests.dlcs
for manifest_info in app_manifest + dlc_manifest:
manifest_path = (
self.depot_cache
/ f"{manifest_info.depot_id}_{manifest_info.manifest_id}.manifest"
)
if manifest_path.exists():
self.logger.warning(f"清单已存在: {manifest_path.name}")
processed.append(manifest_info)
continue
self.logger.info(
f"正在下载清单: {manifest_info.depot_id}_{manifest_info.manifest_id}"
)
manifest_data = await self.download_manifest(manifest_info)
if manifest_data:
if self.process_manifest(manifest_data, manifest_info):
processed.append(manifest_info)
else:
self.logger.error(
f"处理清单失败: {manifest_info.depot_id}_{manifest_info.manifest_id}"
)
else:
self.logger.error(
f"下载清单失败: {manifest_info.depot_id}_{manifest_info.manifest_id}"
)
return processed

52
src/models.py Normal file
View File

@@ -0,0 +1,52 @@
from typing import List
from dataclasses import dataclass
@dataclass
class DepotInfo:
"""仓库信息"""
depot_id: str
decryption_key: str
manifest_ids: List[str] = None
def __post_init__(self):
if self.manifest_ids is None:
self.manifest_ids = []
@dataclass
class ManifestInfo:
"""清单信息"""
app_id: int
depot_id: str
depot_key: str
manifest_id: str
url: str
@dataclass
class SteamAppInfo:
appId: int
name: str
dlcCount: int
depotCount: int
workshopDecryptionKey: str
@dataclass
class SteamAppManifestInfo:
mainapp: List[ManifestInfo]
dlcs: List[ManifestInfo]
@dataclass
class AppConfig:
"""应用配置"""
key: str = ""
custom_steam_path: str = ""
debug_mode: bool = False
logging_files: bool = True
show_console: bool = True

25
src/network/client.py Normal file
View File

@@ -0,0 +1,25 @@
"""HTTP客户端模块"""
import httpx
from typing import Optional, Dict
class HttpClient:
"""HTTP客户端封装"""
def __init__(self):
self._client = httpx.AsyncClient(verify=False, timeout=60, proxy=None)
async def get(self, url: str, headers: Optional[Dict] = None) -> httpx.Response:
"""GET请求"""
return await self._client.get(url, headers=headers)
async def close(self):
"""关闭客户端"""
await self._client.aclose()
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close()

17
src/tools/base.py Normal file
View File

@@ -0,0 +1,17 @@
from abc import ABC, abstractmethod
from typing import List
from pathlib import Path
from ..models import DepotInfo
class UnlockTool(ABC):
"""解锁工具基类"""
def __init__(self, steam_path: Path):
self.steam_path = steam_path
@abstractmethod
async def setup(self, depot_data: List[DepotInfo], app_id: str, **kwargs) -> bool:
"""设置解锁"""
pass

39
src/tools/greenluma.py Normal file
View File

@@ -0,0 +1,39 @@
import vdf
from typing import List
from .base import UnlockTool
from ..models import DepotInfo
class GreenLuma(UnlockTool):
"""GreenLuma解锁工具实现"""
async def setup(self, depot_data: List[DepotInfo], app_id: str, **kwargs) -> bool:
"""设置GreenLuma解锁"""
applist_dir = self.steam_path / "AppList"
applist_dir.mkdir(exist_ok=True)
for f in applist_dir.glob("*.txt"):
f.unlink()
for idx, depot in enumerate(depot_data, 1):
(applist_dir / f"{idx}.txt").write_text(depot.depot_id)
config_path = self.steam_path / "config" / "config.vdf"
try:
with open(config_path, "r", encoding="utf-8") as f:
content = vdf.loads(f.read())
content.setdefault("depots", {}).update(
{
depot.depot_id: {"DecryptionKey": depot.decryption_key}
for depot in depot_data
}
)
with open(config_path, "w", encoding="utf-8") as f:
f.write(vdf.dumps(content))
return True
except Exception:
return False

42
src/tools/steamtools.py Normal file
View File

@@ -0,0 +1,42 @@
from typing import List
from datetime import datetime
from .base import UnlockTool
from ..models import DepotInfo, SteamAppInfo
class SteamTools(UnlockTool):
"""SteamTools解锁工具实现"""
async def setup(
self,
depot_data: List[DepotInfo],
app_info: SteamAppInfo,
) -> bool:
"""设置SteamTools解锁"""
st_path = self.steam_path / "config" / "stplug-in"
st_path.mkdir(parents=True, exist_ok=True)
lua_content = f"""
-- Generated Lua Manifest by Onekey
-- Steam App {app_info.appId} Manifest
-- Name: {app_info.name}
-- Generated: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
-- Total Depots: {app_info.depotCount}
-- Total DLCs: {app_info.dlcCount}
-- MAIN APP
addappid({app_info.appId}, "0", "{app_info.workshopDecryptionKey}")
-- ALL Depots
"""
for depot in depot_data:
lua_content += (
f'addappid({depot.depot_id}, "1", "{depot.decryption_key}")\n'
)
lua_file = st_path / f"{app_info.appId}.lua"
lua_file.write_text(lua_content, encoding="utf-8")
return True

17
src/utils/steam.py Normal file
View File

@@ -0,0 +1,17 @@
from typing import Optional, Tuple
def parse_manifest_filename(filename: str) -> Tuple[Optional[str], Optional[str]]:
"""解析清单文件名"""
if not filename.endswith(".manifest"):
return None, None
name = filename.replace(".manifest", "")
if "_" not in name:
return None, None
parts = name.split("_", 1)
if len(parts) != 2 or not all(p.isdigit() for p in parts):
return None, None
return parts[0], parts[1]