清单库新增+优化

This commit is contained in:
ikun0014
2025-02-27 19:09:20 +08:00
parent 485fca07f1
commit cd18a2f49d
3 changed files with 629 additions and 439 deletions

707
main.py
View File

@@ -1,494 +1,323 @@
DEFAULT_CONFIG = {
"Github_Personal_Token": "",
"Custom_Steam_Path": "",
"QA1": "Github Personal Token可在GitHub设置的Developer settings中生成",
"教程": "https://ikunshare.com/Onekey_tutorial",
}
import platform
import sys
import os
import traceback
import time
import logging
import subprocess
import logzero
import asyncio
import aiofiles
import colorlog
import httpx
import winreg
import ujson as json
import vdf
from typing import Any
from typing import Any, Tuple, List, Dict
from pathlib import Path
from colorama import init, Fore, Back, Style
from enum import Enum
init()
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
lock = asyncio.Lock()
client = httpx.AsyncClient(verify=False)
class RepoChoice(Enum):
IKUN = ("ikun0014/ManifestHub", "已断更的旧仓库")
AUIOWU = ("Auiowu/ManifestAutoUpdate", "未知维护状态的仓库")
STEAM_AUTO = ("SteamAutoCracks/ManifestHub", "官方推荐仓库")
DEFAULT_CONFIG = {
"Github_Personal_Token": "",
"Custom_Steam_Path": "",
"QA1": "温馨提示: Github_Personal_Token可在Github设置的最底下开发者选项找到, 详情看教程",
"教程": "https://ikunshare.com/Onekey_tutorial"
"QA1": "Github Personal Token可在GitHub设置的Developer settings中生成",
"教程": "https://ikunshare.com/Onekey_tutorial",
}
LOG_FORMAT = '%(log_color)s%(message)s'
LOG_COLORS = {
'INFO': 'cyan',
'WARNING': 'yellow',
'ERROR': 'red',
'CRITICAL': 'purple',
}
DEFAULT_REPO = RepoChoice.STEAM_AUTO
WINDOWS_VERSIONS = ["10", "11"]
STEAM_REG_PATH = r"Software\Valve\Steam"
CONFIG_PATH = Path("./config.json")
LOCK = asyncio.Lock()
CLIENT = httpx.AsyncClient(verify=False, timeout=30)
log = logzero.setup_logger("Onekey")
def init_log(level=logging.DEBUG) -> logging.Logger:
""" 初始化日志模块 """
logger = logging.getLogger('Onekey')
logger.setLevel(level)
stream_handler = logging.StreamHandler()
stream_handler.setLevel(level)
fmt = colorlog.ColoredFormatter(LOG_FORMAT, log_colors=LOG_COLORS)
stream_handler.setFormatter(fmt)
if not logger.handlers:
logger.addHandler(stream_handler)
return logger
def init() -> None:
"""初始化控制台输出"""
banner = r"""
_____ __ _ _____ _ _ _____ __ __
/ _ \ | \ | | | ____| | | / / | ____| \ \ / /
| | | | | \| | | |__ | |/ / | |__ \ \/ /
| | | | | |\ | | __| | |\ \ | __| \ /
| |_| | | | \ | | |___ | | \ \ | |___ / /
\_____/ |_| \_| |_____| |_| \_\ |_____| /_/
"""
print(banner)
print("作者: ikun0014 | 版本: 1.3.7 | 官网: ikunshare.com")
print("项目仓库: GitHub: https://github.com/ikunshare/Onekey")
print("提示: 请确保已安装最新版Windows 10/11并正确配置Steam")
log = init_log()
def validate_windows_version() -> None:
"""验证Windows版本"""
if platform.system() != "Windows":
log.error("仅支持Windows操作系统")
sys.exit(1)
release = platform.uname().release
if release not in WINDOWS_VERSIONS:
log.error(f"需要Windows 10/11当前版本: Windows {release}")
sys.exit(1)
def init():
""" 输出初始化信息 """
banner_lines = [
" _____ __ _ _____ _ _ _____ __ __ ",
" / _ \\ | \\ | | | ____| | | / / | ____| \\ \\ / /",
" | | | | | \\| | | |__ | |/ / | |__ \\ \\/ /",
" | | | | | |\\ | | __| | |\\ \\ | __| \\ / ",
" | |_| | | | \\ | | |___ | | \\ \\ | |___ / /",
" \\_____/ |_| \\_| |_____| |_| \\_\\ |_____| /_/",
]
for line in banner_lines:
log.info(line)
async def load_config() -> Dict[str, Any]:
"""异步加载配置文件"""
if not CONFIG_PATH.exists():
await generate_config()
log.info("请填写配置文件后重新运行程序")
sys.exit(0)
log.info('作者: ikun0014')
log.warning('本项目采用GNU General Public License v3开源许可证, 请勿用于商业用途')
log.info('版本: 1.3.6')
log.info(
'项目Github仓库: https://github.com/ikunshare/Onekey \n Gitee: https://gitee.com/ikun0014/Onekey'
)
log.info('官网: ikunshare.com')
log.warning(
'本项目完全开源免费, 如果你在淘宝, QQ群内通过购买方式获得, 赶紧回去骂商家死全家\n 交流群组:\n https://t.me/ikunshare_qun'
)
log.info('App ID可以在SteamDB, SteamUI或Steam商店链接页面查看')
def stack_error(exception: Exception) -> str:
""" 处理错误堆栈 """
stack_trace = traceback.format_exception(
type(exception), exception, exception.__traceback__)
return ''.join(stack_trace)
async def gen_config_file():
""" 生成配置文件 """
try:
async with aiofiles.open("./config.json", mode="w", encoding="utf-8") as f:
await f.write(json.dumps(DEFAULT_CONFIG, indent=2, ensure_ascii=False, escape_forward_slashes=False))
log.info('程序可能为第一次启动或配置重置,请填写配置文件后重新启动程序')
except KeyboardInterrupt:
log.info("程序已退出")
async with aiofiles.open(CONFIG_PATH, "r", encoding="utf-8") as f:
return json.loads(await f.read())
except json.JSONDecodeError:
log.error("配置文件损坏,正在重新生成...")
await generate_config()
sys.exit(1)
except Exception as e:
log.error(f'配置文件生成失败,{stack_error(e)}')
log.error(f"配置加载失败: {str(e)}")
sys.exit(1)
async def load_config() -> dict:
""" 加载配置文件 """
if not os.path.exists('./config.json'):
await gen_config_file()
os.system('pause')
sys.exit()
async def generate_config() -> None:
"""生成默认配置文件"""
try:
async with aiofiles.open("./config.json", mode="r", encoding="utf-8") as f:
config = json.loads(await f.read())
return config
except KeyboardInterrupt:
log.info("程序已退出")
async with aiofiles.open(CONFIG_PATH, "w", encoding="utf-8") as f:
await f.write(json.dumps(DEFAULT_CONFIG, indent=2, ensure_ascii=False))
log.info("配置文件已生成")
except IOError as e:
log.error(f"配置文件创建失败: {str(e)}")
sys.exit(1)
def get_steam_path(config: Dict) -> Path:
"""获取Steam安装路径"""
try:
if custom_path := config.get("Custom_Steam_Path"):
return Path(custom_path)
with winreg.OpenKey(winreg.HKEY_CURRENT_USER, STEAM_REG_PATH) as key:
return Path(winreg.QueryValueEx(key, "SteamPath")[0])
except Exception as e:
log.error(f"配置文件加载失败,原因: {stack_error(e)},重置配置文件中...")
os.remove("./config.json")
await gen_config_file()
os.system('pause')
sys.exit()
config = asyncio.run(load_config())
log.error(f"Steam路径获取失败: {str(e)}")
sys.exit(1)
async def check_github_api_rate_limit(headers):
""" 检查Github请求数 """
if headers != None:
log.info(f"您已配置Github Token")
url = 'https://api.github.com/rate_limit'
async def download_key(file_path: str, repo: str, sha: str) -> bytes:
"""下载密钥文件"""
try:
r = await client.get(url, headers=headers)
r_json = r.json()
if r.status_code == 200:
rate_limit = r_json.get('rate', {})
remaining_requests = rate_limit.get('remaining', 0)
reset_time = rate_limit.get('reset', 0)
reset_time_formatted = time.strftime(
'%Y-%m-%d %H:%M:%S', time.localtime(reset_time))
log.info(f'剩余请求次数: {remaining_requests}')
if remaining_requests == 0:
log.warning(f'GitHub API 请求数已用尽, 将在 {reset_time_formatted} 重置,建议生成一个填在配置文件里')
else:
log.error('Github请求数检查失败, 网络错误')
except KeyboardInterrupt:
log.info("程序已退出")
except httpx.ConnectError as e:
log.error(f'检查Github API 请求数失败, {stack_error(e)}')
except httpx.ConnectTimeout as e:
log.error(f'检查Github API 请求数超时: {stack_error(e)}')
return await fetch_from_cdn(file_path, repo, sha)
except Exception as e:
log.error(f'发生错误: {stack_error(e)}')
async def checkcn() -> bool:
try:
req = await client.get('https://mips.kugou.com/check/iscn?&format=json')
body = req.json()
scn = bool(body['flag'])
if (not scn):
log.info(
f"您在非中国大陆地区({body['country']})上使用了项目, 已自动切换回Github官方下载CDN")
os.environ['IS_CN'] = 'no'
return False
else:
os.environ['IS_CN'] = 'yes'
return True
except KeyboardInterrupt:
log.info("程序已退出")
except httpx.ConnectError as e:
os.environ['IS_CN'] = 'yes'
log.warning('检查服务器位置失败,已忽略,自动认为你在中国大陆')
log.warning(stack_error(e))
return False
async def depotkey_merge(config_path: Path, depots_config: dict) -> bool:
if not config_path.exists():
async with lock:
log.error('Steam默认配置不存在, 可能是没有登录账号')
return False
try:
async with aiofiles.open(config_path, encoding='utf-8') as f:
content = await f.read()
config = vdf.loads(content)
steam = config.get('InstallConfigStore', {}).get('Software', {}).get('Valve') or config.get('InstallConfigStore', {}).get('Software', {}).get('valve')
if steam is None:
log.error('找不到Steam配置, 请检查配置文件')
return False
depots = steam.setdefault('depots', {})
depots.update(depots_config.get('depots', {}))
async with aiofiles.open(config_path, mode='w', encoding='utf-8') as f:
new_context = vdf.dumps(config, pretty=True)
await f.write(new_context)
log.info('成功合并')
return True
except KeyboardInterrupt:
log.info("程序已退出")
except Exception as e:
async with lock:
log.error(f'合并失败, 原因: {e}')
return False
async def get(sha: str, path: str, repo: str):
if os.environ.get('IS_CN') == 'yes':
url_list = [
f'https://jsdelivr.pai233.top/gh/{repo}@{sha}/{path}',
f'https://cdn.jsdmirror.com/gh/{repo}@{sha}/{path}',
f'https://raw.gitmirror.com/{repo}/{sha}/{path}',
f'https://raw.dgithub.xyz/{repo}/{sha}/{path}',
f'https://gh.akass.cn/{repo}/{sha}/{path}'
]
else:
url_list = [
f'https://raw.githubusercontent.com/{repo}/{sha}/{path}'
]
retry = 3
while retry > 0:
for url in url_list:
try:
r = await client.get(url, timeout=30)
if r.status_code == 200:
return r.read()
else:
log.error(f'获取失败: {path} - 状态码: {r.status_code}')
except KeyboardInterrupt:
log.info("程序已退出")
except httpx.ConnectError as e:
log.error(f'获取失败: {path} - 连接错误: {str(e)}')
except httpx.ConnectTimeout as e:
log.error(f'连接超时: {url} - 错误: {str(e)}')
retry -= 1
log.warning(f'重试剩余次数: {retry} - {path}')
log.error(f'超过最大重试次数: {path}')
raise Exception(f'无法下载: {path}')
async def get_manifest(sha: str, path: str, steam_path: Path, repo: str) -> list:
collected_depots = []
depot_cache_path = steam_path / 'depotcache'
try:
depot_cache_path.mkdir(exist_ok=True)
if path.endswith('.manifest'):
save_path = depot_cache_path / path
if save_path.exists():
log.warning(f'已存在清单: {save_path}')
return collected_depots
content = await get(sha, path, repo)
log.info(f'清单下载成功: {path}')
async with aiofiles.open(save_path, 'wb') as f:
await f.write(content)
elif path == 'Key.vdf':
content = await get(sha, path, repo)
log.info(f'密钥下载成功: {path}')
depots_config = vdf.loads(content.decode('utf-8'))
depots = dict(depots_config.get('depots'))
collected_depots = [
(depot_id, depot_info['DecryptionKey'])
for depot_id, depot_info in depots.items()
]
except KeyboardInterrupt:
log.info("程序已退出")
except Exception as e:
log.error(f'处理失败: {path} - {stack_error(e)}')
log.error(f"密钥下载失败: {str(e)}")
raise
return collected_depots
def get_steam_path() -> Path:
async def handle_depot_files(
repo: str, app_id: str, steam_path: Path
) -> List[Tuple[str, str]]:
"""处理清单文件和密钥"""
collected = []
try:
key = winreg.OpenKey(winreg.HKEY_CURRENT_USER, r'Software\Valve\Steam')
steam_path = Path(winreg.QueryValueEx(key, 'SteamPath')[0])
async with httpx.AsyncClient() as client:
branch_url = f"https://api.github.com/repos/{repo}/branches/{app_id}"
branch_res = await client.get(branch_url)
branch_res.raise_for_status()
custom_steam_path = config.get("Custom_Steam_Path", "").strip()
return Path(custom_steam_path) if custom_steam_path else steam_path
except KeyboardInterrupt:
log.info("程序已退出")
tree_url = branch_res.json()["commit"]["commit"]["tree"]["url"]
tree_res = await client.get(tree_url)
tree_res.raise_for_status()
depot_cache = steam_path / "depotcache"
depot_cache.mkdir(exist_ok=True)
for item in tree_res.json()["tree"]:
file_path = item["path"]
if file_path.endswith(".manifest"):
await download_manifest(
file_path, depot_cache, repo, branch_res.json()["commit"]["sha"]
)
elif "key.vdf" in file_path.lower():
key_content = await download_key(
file_path, repo, branch_res.json()["commit"]["sha"]
)
collected.extend(parse_key_vdf(key_content))
except httpx.HTTPStatusError as e:
log.error(f"HTTP错误: {e.response.status_code}")
except Exception as e:
log.error(f'Steam路径获取失败, {stack_error(e)}, 请检查是否正确安装Steam')
os.system('pause')
return Path()
log.error(f"文件处理失败: {str(e)}")
return collected
SP = get_steam_path()
GL = any((SP / dll).exists() for dll in ['GreenLuma_2024_x86.dll', 'GreenLuma_2024_x64.dll', 'User32.dll'])
ST = (SP / 'config' / 'stUI').is_dir()
ST_PT = Path(SP) / "config" / "stplug-in"
TP = Path('./temp')
ST_STP_URL = 'https://steamtools.net/res/SteamtoolsSetup.exe'
STP_FILE = TP / 'SteamtoolsSetup.exe'
async def download_manifest(path: str, save_dir: Path, repo: str, sha: str) -> None:
"""下载清单文件"""
save_path = save_dir / path
if save_path.exists():
log.warning(f"清单已存在: {path}")
return
content = await fetch_from_cdn(path, repo, sha)
async with aiofiles.open(save_path, "wb") as f:
await f.write(content)
log.info(f"清单下载成功: {path}")
async def download_setup_file() -> None:
log.info('开始下载 SteamTools 安装程序...')
try:
r = await client.get(ST_STP_URL, timeout=30)
if r.status_code == 200:
async with aiofiles.open(STP_FILE, mode='wb') as f:
await f.write(r.read())
log.info('安装程序下载完成')
else:
log.error(f'网络错误,无法下载安装程序,状态码: {r.status_code}')
except KeyboardInterrupt:
log.info("程序已退出")
except httpx.ConnectTimeout:
log.error('下载时超时')
except Exception as e:
log.error(f'下载失败: {e}')
async def fetch_from_cdn(path: str, repo: str, sha: str) -> bytes:
"""从CDN获取资源"""
mirrors = (
[
f"https://jsdelivr.pai233.top/gh/{repo}@{sha}/{path}",
f"https://cdn.jsdmirror.com/gh/{repo}@{sha}/{path}",
f"https://raw.gitmirror.com/{repo}/{sha}/{path}",
]
if os.environ.get("IS_CN") == "yes"
else [f"https://raw.githubusercontent.com/{repo}/{sha}/{path}"]
)
async def migrate() -> None:
try:
log.info('检测到你正在使用 SteamTools,尝试迁移旧文件')
if SP.exists():
for file in SP.iterdir():
if file.is_file() and file.name.startswith("Onekey_unlock_"):
new_filename = file.name[len("Onekey_unlock_"):]
try:
file.rename(SP / new_filename)
log.info(f'Renamed: {file.name} -> {new_filename}')
except Exception as e:
log.error(
f'重命名失败 {file.name} -> {new_filename}: {e}')
else:
log.error('故障,正在重新安装 SteamTools')
TP.mkdir(parents=True, exist_ok=True)
await download_setup_file(client)
subprocess.run(str(STP_FILE), check=True)
for file in TP.iterdir():
file.unlink()
TP.rmdir()
except KeyboardInterrupt:
log.info("程序已退出")
async def stool_add(depot_data: list, app_id: str) -> bool:
lua_filename = f"{app_id}.lua"
lua_filepath = SP / "config" / "stplug-in" / lua_filename
async with lock:
log.info(f'SteamTools 解锁文件生成: {lua_filepath}')
for url in mirrors:
try:
async with aiofiles.open(lua_filepath, mode="w", encoding="utf-8") as lua_file:
await lua_file.write(f'addappid({app_id}, 1, "None")\n')
for depot_id, depot_key in depot_data:
await lua_file.write(f'addappid({depot_id}, 1, "{depot_key}")\n')
luapacka_path = SP / "config" / "stplug-in" / "luapacka.exe"
log.info(f'正在处理文件: {lua_filepath}')
result = subprocess.run(
[str(luapacka_path), str(lua_filepath)],
capture_output=True
)
if result.returncode != 0:
log.error(f'调用失败: {result.stderr.decode()}')
return False
log.info('处理完成')
except KeyboardInterrupt:
log.info("程序已退出")
except Exception as e:
log.error(f'处理过程出现错误: {e}')
return False
finally:
if lua_filepath.exists():
os.remove(lua_filepath)
log.info(f'删除临时文件: {lua_filepath}')
res = await CLIENT.get(url)
res.raise_for_status()
return res.content
except httpx.HTTPError:
continue
raise Exception("所有镜像源均不可用")
def parse_key_vdf(content: bytes) -> List[Tuple[str, str]]:
"""解析密钥文件"""
try:
depots = vdf.loads(content.decode("utf-8"))["depots"]
return [(d_id, d_info["DecryptionKey"]) for d_id, d_info in depots.items()]
except Exception as e:
log.error(f"密钥解析失败: {str(e)}")
return []
async def setup_unlock_tool(
config: Dict, depot_data: List[Tuple[str, str]], app_id: str, tool_choice: int
) -> bool:
"""配置解锁工具"""
if tool_choice == 1:
return await setup_steamtools(config, depot_data, app_id)
elif tool_choice == 2:
return await setup_greenluma(config, depot_data)
else:
log.error("无效的工具选择")
return False
async def setup_steamtools(
config: Dict, depot_data: List[Tuple[str, str]], app_id: str
) -> bool:
"""配置SteamTools"""
steam_path = (
Path(config["Custom_Steam_Path"])
if config.get("Custom_Steam_Path")
else get_steam_path(config)
)
st_path = steam_path / "config" / "stplug-in"
st_path.mkdir(exist_ok=True)
lua_content = f'addappid({app_id}, 1, "None")\n'
for d_id, d_key in depot_data:
lua_content += f'addappid({d_id}, 1, "{d_key}")\n'
lua_file = st_path / f"{app_id}.lua"
async with aiofiles.open(lua_file, "w") as f:
await f.write(lua_content)
proc = await asyncio.create_subprocess_exec(
str(st_path / "luapacka.exe"),
str(lua_file),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
await proc.wait()
if proc.returncode != 0:
log.error(f"Lua编译失败: {await proc.stderr.read()}")
return False
return True
async def greenluma_add(depot_id_list: list) -> bool:
app_list_path = SP / 'AppList'
try:
app_list_path.mkdir(parents=True, exist_ok=True)
for file in app_list_path.glob('*.txt'):
file.unlink(missing_ok=True)
depot_dict = {
int(i.stem): int(i.read_text(encoding='utf-8').strip())
for i in app_list_path.iterdir() if i.is_file() and i.stem.isdecimal() and i.suffix == '.txt'
}
for depot_id in map(int, depot_id_list):
if depot_id not in depot_dict.values():
index = max(depot_dict.keys(), default=-1) + 1
while index in depot_dict:
index += 1
(app_list_path /
f'{index}.txt').write_text(str(depot_id), encoding='utf-8')
depot_dict[index] = depot_id
return True
except Exception as e:
print(f'处理时出错: {e}')
return False
async def setup_greenluma(config: Dict, depot_data: List[Tuple[str, str]]) -> bool:
"""配置GreenLuma"""
steam_path = (
Path(config["Custom_Steam_Path"])
if config.get("Custom_Steam_Path")
else get_steam_path(config)
)
applist_dir = steam_path / "AppList"
applist_dir.mkdir(exist_ok=True)
for f in applist_dir.glob("*.txt"):
f.unlink()
for idx, (d_id, _) in enumerate(depot_data, 1):
(applist_dir / f"{idx}.txt").write_text(str(d_id))
config_path = steam_path / "config" / "config.vdf"
async with aiofiles.open(config_path, "r+") as f:
content = vdf.loads(await f.read())
content.setdefault("depots", {}).update(
{d_id: {"DecryptionKey": d_key} for d_id, d_key in depot_data}
)
await f.seek(0)
await f.write(vdf.dumps(content))
return True
async def fetch_branch_info(url, headers) -> str | None:
try:
r = await client.get(url, headers=headers)
return r.json()
except KeyboardInterrupt:
log.info("程序已退出")
except Exception as e:
log.error(f'获取信息失败: {stack_error(e)}')
return None
except httpx.ConnectTimeout as e:
log.error(f'获取信息时超时: {stack_error(e)}')
return None
async def get_latest_repo_info(repos: list, app_id: str, headers) -> Any | None:
latest_date = None
selected_repo = None
for repo in repos:
url = f'https://api.github.com/repos/{repo}/branches/{app_id}'
r_json = await fetch_branch_info(url, headers)
if r_json and 'commit' in r_json:
date = r_json['commit']['commit']['author']['date']
if (latest_date is None) or (date > latest_date):
latest_date = date
selected_repo = repo
return selected_repo, latest_date
async def main(app_id: str, repos: list) -> bool:
app_id_list = list(filter(str.isdecimal, app_id.strip().split('-')))
if not app_id_list:
log.error(f'App ID无效')
return False
app_id = app_id_list[0]
github_token = config.get("Github_Personal_Token", "")
headers = {'Authorization': f'Bearer {github_token}'} if github_token else None
await checkcn()
await check_github_api_rate_limit(headers)
selected_repo, latest_date = await get_latest_repo_info(repos, app_id, headers)
if (selected_repo):
log.info(f'选择清单仓库: {selected_repo}')
url = f'https://api.github.com/repos/{selected_repo}/branches/{app_id}'
r_json = await fetch_branch_info(url, headers)
if (r_json) and ('commit' in r_json):
sha = r_json['commit']['sha']
url = r_json['commit']['commit']['tree']['url']
r2_json = await fetch_branch_info(url, headers)
if (r2_json) and ('tree' in r2_json):
collected_depots = []
for item in r2_json['tree']:
result = await get_manifest(sha, item['path'], SP, selected_repo)
collected_depots.extend(result)
if collected_depots:
if ST:
await migrate()
await stool_add(collected_depots, app_id)
log.info('找到SteamTools, 已添加解锁文件')
elif GL:
await greenluma_add([app_id])
depot_config = {'depots': {depot_id: {'DecryptionKey': depot_key} for depot_id, depot_key in collected_depots}}
await depotkey_merge(SP / 'config' / 'config.vdf', depot_config)
if await greenluma_add([int(i) for i in depot_config['depots'] if i.isdecimal()]):
log.info('找到GreenLuma, 已添加解锁文件')
log.info(f'清单最后更新时间: {latest_date}')
log.info(f'入库成功: {app_id}')
await client.aclose()
os.system('pause')
return True
log.error(f'清单下载或生成失败: {app_id}')
await client.aclose()
os.system('pause')
return False
if __name__ == '__main__':
async def main_flow():
"""主流程控制"""
validate_windows_version()
init()
try:
repos = [
'ikun0014/ManifestHub',
'Auiowu/ManifestAutoUpdate',
]
app_id = input(f"{Fore.CYAN}{Back.BLACK}{Style.BRIGHT}请输入游戏AppID: {Style.RESET_ALL}").strip()
asyncio.run(main(app_id, repos))
except KeyboardInterrupt:
log.info("程序已退出")
except SystemExit:
sys.exit()
app_id = input("请输入游戏AppID: ").strip()
if not app_id.isdigit():
raise ValueError("无效的AppID")
print(
"\n".join(
[f"{idx+1}. {item.value[1]}" for idx, item in enumerate(RepoChoice)]
)
)
repo_choice = int(input("请选择清单仓库 (默认3): ") or 3)
selected_repo = list(RepoChoice)[repo_choice - 1].value[0]
tool_choice = int(input("请选择解锁工具 (1.SteamTools 2.GreenLuma): "))
config = await load_config()
steam_path = get_steam_path(config)
depot_data = await handle_depot_files(selected_repo, app_id, steam_path)
if await setup_unlock_tool(config, depot_data, app_id, tool_choice):
log.info("游戏解锁配置成功!")
if tool_choice == 1:
log.info("请重启SteamTools生效")
elif tool_choice == 2:
log.info("请重启GreenLuma生效")
else:
log.error("配置失败,请检查日志")
except Exception as e:
log.error(f"运行错误: {str(e)}")
log.debug(traceback.format_exc())
finally:
await CLIENT.aclose()
if __name__ == "__main__":
asyncio.run(main_flow())
os.system("pause")