This commit is contained in:
ikun0014
2025-07-24 20:54:36 +08:00
parent bb160f9f6d
commit 611edaec04
26 changed files with 0 additions and 1494 deletions

View File

@@ -1,3 +0,0 @@
__version__ = "1.5.1"
__author__ = "ikun0014"
__website__ = "ikunshare.top"

View File

@@ -1,88 +0,0 @@
import json
import sys
import time
import winreg
from pathlib import Path
from typing import Dict, Optional
from .constants import CONFIG_FILE
from .models import AppConfig
DEFAULT_CONFIG = {
"Github_Personal_Token": "",
"Custom_Steam_Path": "",
"Debug_Mode": False,
"Logging_Files": True,
"Help": "Github Personal Token可在GitHub设置的Developer settings中生成",
}
class ConfigManager:
"""配置管理器"""
def __init__(self):
self.config_path = CONFIG_FILE
self._config_data: Dict = {}
self.app_config: AppConfig = AppConfig()
self.steam_path: Optional[Path] = None
self._load_config()
def _generate_config(self) -> None:
"""生成默认配置文件"""
try:
with open(self.config_path, "w", encoding="utf-8") as f:
json.dump(DEFAULT_CONFIG, f, indent=2, ensure_ascii=False)
print("配置文件已生成")
except IOError as e:
print(f"配置文件创建失败: {str(e)}")
sys.exit(1)
def _load_config(self) -> None:
"""加载配置文件"""
if not self.config_path.exists():
self._generate_config()
print("请填写配置文件后重新运行程序5秒后退出")
time.sleep(5)
sys.exit(1)
try:
with open(self.config_path, "r", encoding="utf-8") as f:
self._config_data = json.load(f)
self.app_config = AppConfig(
github_token=self._config_data.get("Github_Personal_Token", ""),
custom_steam_path=self._config_data.get("Custom_Steam_Path", ""),
debug_mode=self._config_data.get("Debug_Mode", False),
logging_files=self._config_data.get("Logging_Files", True),
)
self.steam_path = self._get_steam_path()
except json.JSONDecodeError:
print("配置文件损坏,正在重新生成...")
self._generate_config()
sys.exit(1)
except Exception as e:
print(f"配置加载失败: {str(e)}")
sys.exit(1)
def _get_steam_path(self) -> Path:
"""获取Steam安装路径"""
try:
if self.app_config.custom_steam_path:
return Path(self.app_config.custom_steam_path)
with winreg.OpenKey(
winreg.HKEY_CURRENT_USER, r"Software\Valve\Steam"
) as key:
return Path(winreg.QueryValueEx(key, "SteamPath")[0])
except Exception as e:
print(f"Steam路径获取失败: {str(e)}")
sys.exit(1)
@property
def github_headers(self) -> Optional[Dict[str, str]]:
"""获取GitHub请求头"""
if self.app_config.github_token:
return {"Authorization": f"Bearer {self.app_config.github_token}"}
return None

View File

@@ -1,35 +0,0 @@
"""常量定义"""
from pathlib import Path
APP_NAME = "Onekey"
BANNER = r"""
_____ __ _ _____ _ _ _____ __ __
/ _ \ | \ | | | ____| | | / / | ____| \ \ / /
| | | | | \| | | |__ | |/ / | |__ \ \/ /
| | | | | |\ | | __| | |\ \ | __| \ /
| |_| | | | \ | | |___ | | \ \ | |___ / /
\_____/ |_| \_| |_____| |_| \_\ |_____| /_/
"""
REPO_LIST = [
"SteamAutoCracks/ManifestHub",
"ikun0014/ManifestHub",
"Auiowu/ManifestAutoUpdate",
"tymolu233/ManifestAutoUpdate-fix",
]
CN_CDN_LIST = [
"https://cdn.jsdmirror.com/gh/{repo}@{sha}/{path}",
"https://raw.gitmirror.com/{repo}/{sha}/{path}",
"https://raw.dgithub.xyz/{repo}/{sha}/{path}",
"https://gh.akass.cn/{repo}/{sha}/{path}",
]
GLOBAL_CDN_LIST = ["https://raw.githubusercontent.com/{repo}/{sha}/{path}"]
GITHUB_API_BASE = "https://api.github.com"
REGION_CHECK_URL = "https://mips.kugou.com/check/iscn?&format=json"
LOG_DIR = Path("logs")
CONFIG_FILE = Path("config.json")

View File

@@ -1,67 +0,0 @@
"""日志模块"""
import logging
import colorama
import logzero
from logzero import setup_logger, LogFormatter
from .constants import LOG_DIR
class Logger:
"""统一的日志管理器"""
def __init__(self, name: str, debug_mode: bool = False, log_file: bool = True):
self.name = name
self.debug_mode = debug_mode
self.log_file = log_file
self._logger = self._setup_logger()
def _setup_logger(self) -> logging.Logger:
"""设置日志器"""
level = logzero.DEBUG if self.debug_mode else logzero.INFO
colors = {
logzero.DEBUG: colorama.Fore.CYAN,
logzero.INFO: colorama.Fore.GREEN,
logzero.WARNING: colorama.Fore.YELLOW,
logzero.ERROR: colorama.Fore.RED,
logzero.CRITICAL: colorama.Fore.MAGENTA,
}
terminal_formatter = LogFormatter(
color=True,
fmt="%(color)s%(message)s%(end_color)s",
datefmt="%Y-%m-%d %H:%M:%S",
colors=colors,
)
logger = setup_logger(self.name, level=level, formatter=terminal_formatter)
if self.log_file:
LOG_DIR.mkdir(exist_ok=True)
logfile = LOG_DIR / f"{self.name}.log"
file_handler = logging.FileHandler(logfile, encoding="utf-8")
file_formatter = logging.Formatter(
"[%(asctime)s] | [%(name)s:%(levelname)s] | [%(module)s.%(funcName)s:%(lineno)d] - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
file_handler.setFormatter(file_formatter)
logger.addHandler(file_handler)
return logger
def debug(self, msg: str):
self._logger.debug(msg)
def info(self, msg: str):
self._logger.info(msg)
def warning(self, msg: str):
self._logger.warning(msg)
def error(self, msg: str):
self._logger.error(msg)
def critical(self, msg: str):
self._logger.critical(msg)

View File

@@ -1,170 +0,0 @@
import traceback
from typing import List, Dict, Tuple
from . import __version__, __author__, __website__
from .constants import BANNER, REPO_LIST
from .config import ConfigManager
from .logger import Logger
from .models import DepotInfo
from .network.client import HttpClient
from .network.github import GitHubAPI
from .utils.region import RegionDetector
from .utils.steam import parse_key_file, parse_manifest_filename
from .tools.steamtools import SteamTools
from .tools.greenluma import GreenLuma
class OnekeyApp:
"""Onekey主应用"""
def __init__(self):
self.config = ConfigManager()
self.logger = Logger(
"Onekey",
debug_mode=self.config.app_config.debug_mode,
log_file=self.config.app_config.logging_files,
)
self.client = HttpClient()
self.github = GitHubAPI(self.client, self.config.github_headers, self.logger)
def show_banner(self):
"""显示横幅"""
self.logger.info(BANNER)
self.logger.info(
f"本程序源代码基于GPL 2.0许可证开放于Github"
)
self.logger.info(
f"作者: {__author__} | 版本: {__version__} | 官网: {__website__}"
)
self.logger.info("项目仓库: GitHub: https://github.com/ikunshare/Onekey")
self.logger.warning("ikunshare.top | 严禁倒卖")
self.logger.warning(
"提示: 请确保已安装Windows 10/11并正确配置Steam;SteamTools/GreenLuma"
)
if not self.config.app_config.github_token:
self.logger.warning("开梯子必须配置Token, 你的IP我不相信能干净到哪")
async def handle_depot_files(
self, app_id: str
) -> Tuple[List[DepotInfo], Dict[str, List[str]]]:
"""处理仓库文件"""
depot_list = []
depot_map = {}
repo_info = await self.github.get_latest_repo_info(REPO_LIST, app_id)
if not repo_info:
return depot_list, depot_map
self.logger.info(f"当前选择清单仓库: https://github.com/{repo_info.name}")
self.logger.info(f"此清单分支上次更新时间:{repo_info.last_update}")
branch_url = f"https://api.github.com/repos/{repo_info.name}/branches/{app_id}"
branch_res = await self.client.get(
branch_url, headers=self.config.github_headers
)
branch_res.raise_for_status()
tree_url = branch_res.json()["commit"]["commit"]["tree"]["url"]
tree_res = await self.client.get(tree_url)
tree_res.raise_for_status()
depot_cache = self.config.steam_path / "depotcache"
depot_cache.mkdir(exist_ok=True)
for item in tree_res.json()["tree"]:
file_path = item["path"]
if file_path.endswith(".manifest"):
save_path = depot_cache / file_path
if save_path.exists():
self.logger.warning(f"已存在清单: {save_path}")
continue
content = await self.github.fetch_file(
repo_info.name, repo_info.sha, file_path
)
save_path.write_bytes(content)
self.logger.info(f"清单下载成功: {file_path}")
depot_id, manifest_id = parse_manifest_filename(file_path)
if depot_id and manifest_id:
depot_map.setdefault(depot_id, []).append(manifest_id)
elif "key.vdf" in file_path.lower():
key_content = await self.github.fetch_file(
repo_info.name, repo_info.sha, file_path
)
depot_list.extend(parse_key_file(key_content))
for depot_id in depot_map:
depot_map[depot_id].sort(key=lambda x: int(x), reverse=True)
return depot_list, depot_map
async def run(self, app_id: str):
"""运行主程序"""
try:
detector = RegionDetector(self.client, self.logger)
is_cn, country = await detector.check_cn()
self.github.is_cn = is_cn
await self.github.check_rate_limit()
self.logger.info(f"正在处理游戏 {app_id}...")
depot_data, depot_map = await self.handle_depot_files(app_id)
if not depot_data:
self.logger.error("未找到此游戏的清单")
return
print("\n请选择解锁工具:")
print("1. SteamTools")
print("2. GreenLuma")
choice = input("请输入选择 (1/2): ").strip()
if choice == "1":
tool = SteamTools(self.config.steam_path)
version_lock = False
lock_choice = input(
"是否锁定版本(推荐在选择仓库SteamAutoCracks/ManifestHub时使用)?(y/n): "
).lower()
if lock_choice == "y":
version_lock = True
success = await tool.setup(
depot_data, app_id, depot_map=depot_map, version_lock=version_lock
)
elif choice == "2":
tool = GreenLuma(self.config.steam_path)
success = await tool.setup(depot_data, app_id)
else:
self.logger.error("无效的选择")
return
if success:
self.logger.info("游戏解锁配置成功!")
self.logger.info("重启Steam后生效")
else:
self.logger.error("配置失败")
except Exception as e:
self.logger.error(f"运行错误: {traceback.format_exc()}")
finally:
await self.client.close()
async def main():
"""程序入口"""
app = OnekeyApp()
app.show_banner()
app_id = input("\n请输入游戏AppID: ").strip()
app_id_list = [id for id in app_id.split("-") if id.isdigit()]
if not app_id_list:
app.logger.error("App ID无效")
return
await app.run(app_id_list[0])

View File

@@ -1,35 +0,0 @@
from typing import List
from dataclasses import dataclass
from datetime import datetime
@dataclass
class DepotInfo:
"""仓库信息"""
depot_id: str
decryption_key: str
manifest_ids: List[str] = None
def __post_init__(self):
if self.manifest_ids is None:
self.manifest_ids = []
@dataclass
class RepoInfo:
"""GitHub仓库信息"""
name: str
last_update: datetime
sha: str
@dataclass
class AppConfig:
"""应用配置"""
github_token: str = ""
custom_steam_path: str = ""
debug_mode: bool = False
logging_files: bool = True

View File

@@ -1,25 +0,0 @@
"""HTTP客户端模块"""
import httpx
from typing import Optional, Dict
class HttpClient:
"""HTTP客户端封装"""
def __init__(self):
self._client = httpx.AsyncClient(verify=False, timeout=30.0)
async def get(self, url: str, headers: Optional[Dict] = None) -> httpx.Response:
"""GET请求"""
return await self._client.get(url, headers=headers)
async def close(self):
"""关闭客户端"""
await self._client.aclose()
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close()

View File

@@ -1,92 +0,0 @@
import time
from typing import List, Dict, Optional
from datetime import datetime
from ..constants import GITHUB_API_BASE, CN_CDN_LIST, GLOBAL_CDN_LIST
from ..models import RepoInfo
from ..logger import Logger
from .client import HttpClient
class GitHubAPI:
"""GitHub API封装"""
def __init__(
self,
client: HttpClient,
headers: Optional[Dict] = None,
logger: Optional[Logger] = None,
):
self.client = client
self.headers = headers or {}
self.logger = logger or Logger("GitHubAPI")
self.is_cn = True
async def check_rate_limit(self) -> None:
"""检查API请求限制"""
url = f"{GITHUB_API_BASE}/rate_limit"
try:
r = await self.client.get(url, headers=self.headers)
if r.status_code == 200:
r_json = r.json()
rate_limit = r_json.get("rate", {})
remaining = rate_limit.get("remaining", 0)
reset_time = rate_limit.get("reset", 0)
reset_formatted = time.strftime(
"%Y-%m-%d %H:%M:%S", time.localtime(reset_time)
)
self.logger.info(f"剩余Github API请求次数: {remaining}")
if remaining == 0:
self.logger.warning(
f"GitHub API 请求数已用尽, 将在 {reset_formatted} 重置"
)
else:
self.logger.error("Github请求数检查失败, 网络错误")
except Exception as e:
self.logger.error(f"检查Github API 请求数失败: {str(e)}")
async def get_latest_repo_info(
self, repos: List[str], app_id: str
) -> Optional[RepoInfo]:
"""获取最新的仓库信息"""
latest_date = None
selected_repo = None
selected_sha = None
for repo in repos:
url = f"{GITHUB_API_BASE}/repos/{repo}/branches/{app_id}"
try:
r = await self.client.get(url, headers=self.headers)
if r.status_code == 200:
r_json = r.json()
if "commit" in r_json:
date_str = r_json["commit"]["commit"]["author"]["date"]
date = datetime.fromisoformat(date_str.replace("Z", "+00:00"))
if latest_date is None or date > latest_date:
latest_date = date
selected_repo = repo
selected_sha = r_json["commit"]["sha"]
except Exception as e:
self.logger.warning(f"检查仓库 {repo} 失败: {str(e)}")
if selected_repo:
return RepoInfo(
name=selected_repo, last_update=latest_date, sha=selected_sha
)
return None
async def fetch_file(self, repo: str, sha: str, path: str) -> bytes:
"""获取文件内容"""
cdn_list = CN_CDN_LIST if self.is_cn else GLOBAL_CDN_LIST
for _ in range(3):
for cdn_template in cdn_list:
url = cdn_template.format(repo=repo, sha=sha, path=path)
try:
r = await self.client.get(url, headers=self.headers)
if r.status_code == 200:
return r.content
except Exception as e:
self.logger.debug(f"{url} 下载失败: {str(e)}")
raise Exception(f"无法下载文件: {path}")

View File

@@ -1,17 +0,0 @@
from abc import ABC, abstractmethod
from typing import List
from pathlib import Path
from ..models import DepotInfo
class UnlockTool(ABC):
"""解锁工具基类"""
def __init__(self, steam_path: Path):
self.steam_path = steam_path
@abstractmethod
async def setup(self, depot_data: List[DepotInfo], app_id: str, **kwargs) -> bool:
"""设置解锁"""
pass

View File

@@ -1,39 +0,0 @@
import vdf
from typing import List
from .base import UnlockTool
from ..models import DepotInfo
class GreenLuma(UnlockTool):
"""GreenLuma解锁工具实现"""
async def setup(self, depot_data: List[DepotInfo], app_id: str, **kwargs) -> bool:
"""设置GreenLuma解锁"""
applist_dir = self.steam_path / "AppList"
applist_dir.mkdir(exist_ok=True)
for f in applist_dir.glob("*.txt"):
f.unlink()
for idx, depot in enumerate(depot_data, 1):
(applist_dir / f"{idx}.txt").write_text(depot.depot_id)
config_path = self.steam_path / "config" / "config.vdf"
try:
with open(config_path, "r", encoding="utf-8") as f:
content = vdf.loads(f.read())
content.setdefault("depots", {}).update(
{
depot.depot_id: {"DecryptionKey": depot.decryption_key}
for depot in depot_data
}
)
with open(config_path, "w", encoding="utf-8") as f:
f.write(vdf.dumps(content))
return True
except Exception:
return False

View File

@@ -1,38 +0,0 @@
from typing import List, Dict
from .base import UnlockTool
from ..models import DepotInfo
class SteamTools(UnlockTool):
"""SteamTools解锁工具实现"""
async def setup(
self,
depot_data: List[DepotInfo],
app_id: str,
depot_map: Dict[str, List[str]] = None,
version_lock: bool = False,
) -> bool:
"""设置SteamTools解锁"""
st_path = self.steam_path / "config" / "stplug-in"
st_path.mkdir(parents=True, exist_ok=True)
lua_content = f'addappid({app_id}, 1, "None")\n'
for depot in depot_data:
if version_lock and depot_map and depot.depot_id in depot_map:
for manifest_id in depot_map[depot.depot_id]:
lua_content += (
f'addappid({depot.depot_id}, 1, "{depot.decryption_key}")\n'
f'setManifestid({depot.depot_id},"{manifest_id}")\n'
)
else:
lua_content += (
f'addappid({depot.depot_id}, 1, "{depot.decryption_key}")\n'
)
lua_file = st_path / f"{app_id}.lua"
lua_file.write_text(lua_content, encoding="utf-8")
return True

View File

@@ -1,32 +0,0 @@
from typing import Tuple
from ..constants import REGION_CHECK_URL
from ..network.client import HttpClient
from ..logger import Logger
class RegionDetector:
"""地区检测器"""
def __init__(self, client: HttpClient, logger: Logger):
self.client = client
self.logger = logger
async def check_cn(self) -> Tuple[bool, str]:
"""检查是否在中国大陆"""
try:
req = await self.client.get(REGION_CHECK_URL)
body = req.json()
is_cn = bool(body.get("flag", True))
country = body.get("country", "Unknown")
if not is_cn:
self.logger.info(
f"您在非中国大陆地区({country})上使用了项目, "
"已自动切换回Github官方下载CDN"
)
return is_cn, country
except Exception as e:
self.logger.warning("检查服务器位置失败,自动认为你在中国大陆")
return True, "CN"

View File

@@ -1,32 +0,0 @@
import vdf
from typing import List, Optional, Tuple
from ..models import DepotInfo
def parse_key_file(content: bytes) -> List[DepotInfo]:
"""解析密钥文件"""
try:
depots = vdf.loads(content.decode("utf-8"))["depots"]
return [
DepotInfo(depot_id=d_id, decryption_key=d_info["DecryptionKey"])
for d_id, d_info in depots.items()
]
except Exception:
return []
def parse_manifest_filename(filename: str) -> Tuple[Optional[str], Optional[str]]:
"""解析清单文件名"""
if not filename.endswith(".manifest"):
return None, None
name = filename.replace(".manifest", "")
if "_" not in name:
return None, None
parts = name.split("_", 1)
if len(parts) != 2 or not all(p.isdigit() for p in parts):
return None, None
return parts[0], parts[1]