VER 1.4.2

This commit is contained in:
ikun0014
2025-03-05 21:25:24 +08:00
parent 628b92b86d
commit 8bf15eda57
2 changed files with 118 additions and 103 deletions

View File

@@ -1,3 +1,4 @@
import os
import httpx
import sys
import winreg
@@ -42,7 +43,7 @@ def load_config() -> dict:
if not Path("./config.json").exists():
generate_config()
print("请填写配置文件后重新运行程序")
sys.exit(0)
os.system("pause")
try:
with open(Path("./config.json"), "r", encoding="utf-8") as f:
@@ -62,4 +63,10 @@ DEBUG_MODE = CONFIG.get("Debug_Mode", False)
LOG_FILE = CONFIG.get("Logging_Files", True)
GITHUB_TOKEN = str(CONFIG.get("Github_Personal_Token", ""))
STEAM_PATH = get_steam_path(CONFIG)
IS_CN = True
HEADER = {"Authorization": f"Bearer {GITHUB_TOKEN}"} if GITHUB_TOKEN else None
REPO_LIST = [
"SteamAutoCracks/ManifestHub",
"ikun0014/ManifestHub",
"Auiowu/ManifestAutoUpdate",
]

212
main.py
View File

@@ -6,31 +6,23 @@ import aiofiles
import httpx
import vdf
import time
from typing import Tuple, List, Dict
from typing import Any, Tuple, List, Dict
from pathlib import Path
from enum import Enum
from common import log
from common import log, variable
from common.variable import (
CLIENT,
HEADER,
STEAM_PATH,
REPO_LIST,
)
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
class RepoChoice(Enum):
STEAMTOOLS_DUMP = ("SteamAutoCracks/ManifestHub", "推荐使用")
IKUN = ("ikun0014/ManifestHub", "没号更不了")
AUIOWU = ("Auiowu/ManifestAutoUpdate", "未知维护状态的仓库")
LOCK = asyncio.Lock()
DEFAULT_REPO = RepoChoice.STEAMTOOLS_DUMP
logger = log.log("Onekey")
LOG = log.log("Onekey")
DEFAULT_REPO = REPO_LIST[0]
def init() -> None:
@@ -43,12 +35,12 @@ def init() -> None:
| |_| | | | \ | | |___ | | \ \ | |___ / /
\_____/ |_| \_| |_____| |_| \_\ |_____| /_/
"""
logger.info(banner)
logger.info("作者: ikun0014 | 版本: 1.4.1 | 官网: ikunshare.com")
logger.info("项目仓库: GitHub: https://github.com/ikunshare/Onekey")
logger.info("ikunshare.com | 严禁倒卖")
logger.info("提示: 请确保已安装Windows 10/11并正确配置Steam;SteamTools/GreenLuma")
logger.info("开梯子必须配置Token, 你的IP我不相信能干净到哪")
LOG.info(banner)
LOG.info("作者: ikun0014 | 版本: 1.4.1 | 官网: ikunshare.com")
LOG.info("项目仓库: GitHub: https://github.com/ikunshare/Onekey")
LOG.info("ikunshare.com | 严禁倒卖")
LOG.info("提示: 请确保已安装Windows 10/11并正确配置Steam;SteamTools/GreenLuma")
LOG.info("开梯子必须配置Token, 你的IP我不相信能干净到哪")
async def checkcn() -> bool:
@@ -57,19 +49,19 @@ async def checkcn() -> bool:
body = req.json()
scn = bool(body["flag"])
if not scn:
logger.info(
LOG.info(
f"您在非中国大陆地区({body['country']})上使用了项目, 已自动切换回Github官方下载CDN"
)
os.environ["IS_CN"] = "no"
variable.IS_CN = False
return False
else:
os.environ["IS_CN"] = "yes"
variable.IS_CN = True
return True
except KeyboardInterrupt:
logger.info("程序已退出")
LOG.info("程序已退出")
except httpx.ConnectError as e:
os.environ["IS_CN"] = "yes"
logger.warning("检查服务器位置失败,已忽略,自动认为你在中国大陆")
variable.IS_CN = True
LOG.warning("检查服务器位置失败,已忽略,自动认为你在中国大陆")
return False
@@ -92,85 +84,111 @@ async def check_github_api_rate_limit(headers):
reset_time_formatted = time.strftime(
"%Y-%m-%d %H:%M:%S", time.localtime(reset_time)
)
logger.info(f"剩余请求次数: {remaining_requests}")
LOG.info(f"剩余请求次数: {remaining_requests}")
if remaining_requests == 0:
logger.warning(
LOG.warning(
f"GitHub API 请求数已用尽, 将在 {reset_time_formatted} 重置,建议生成一个填在配置文件里"
)
else:
logger.error("Github请求数检查失败, 网络错误")
LOG.error("Github请求数检查失败, 网络错误")
except KeyboardInterrupt:
logger.info("程序已退出")
LOG.info("程序已退出")
except httpx.ConnectError as e:
logger.error(f"检查Github API 请求数失败, {stack_error(e)}")
LOG.error(f"检查Github API 请求数失败, {stack_error(e)}")
except httpx.ConnectTimeout as e:
logger.error(f"检查Github API 请求数超时: {stack_error(e)}")
LOG.error(f"检查Github API 请求数超时: {stack_error(e)}")
except Exception as e:
logger.error(f"发生错误: {stack_error(e)}")
LOG.error(f"发生错误: {stack_error(e)}")
async def get_latest_repo_info(repos: list, app_id: str, headers) -> Any | None:
latest_date = None
selected_repo = None
for repo in repos:
url = f"https://api.github.com/repos/{repo}/branches/{app_id}"
r = await CLIENT.get(url, headers=headers)
r_json = r.json()
if r_json and "commit" in r_json:
date = r_json["commit"]["commit"]["author"]["date"]
if (latest_date is None) or (date > latest_date):
latest_date = date
selected_repo = repo
return selected_repo, latest_date
async def handle_depot_files(
repo: str, app_id: str, steam_path: Path
repos: List, app_id: str, steam_path: Path
) -> List[Tuple[str, str]]:
collected = []
depot_map = {}
try:
branch_url = f"https://api.github.com/repos/{repo}/branches/{app_id}"
branch_res = await CLIENT.get(branch_url, headers=HEADER)
branch_res.raise_for_status()
selected_repo, latest_date = await get_latest_repo_info(
repos, app_id, headers=HEADER
)
tree_url = branch_res.json()["commit"]["commit"]["tree"]["url"]
tree_res = await CLIENT.get(tree_url)
tree_res.raise_for_status()
if selected_repo:
branch_url = (
f"https://api.github.com/repos/{selected_repo}/branches/{app_id}"
)
branch_res = await CLIENT.get(branch_url, headers=HEADER)
branch_res.raise_for_status()
depot_cache = steam_path / "depotcache"
depot_cache.mkdir(exist_ok=True)
tree_url = branch_res.json()["commit"]["commit"]["tree"]["url"]
tree_res = await CLIENT.get(tree_url)
tree_res.raise_for_status()
for item in tree_res.json()["tree"]:
file_path = str(item["path"])
if file_path.endswith(".manifest"):
save_path = depot_cache / file_path
if save_path.exists():
logger.warning(f"已存在清单: {save_path}")
depot_cache = steam_path / "depotcache"
depot_cache.mkdir(exist_ok=True)
LOG.info(f"当前选择清单仓库: https://github.com/{selected_repo}")
LOG.info(f"此清单分支上次更新时间:{latest_date}")
for item in tree_res.json()["tree"]:
file_path = str(item["path"])
if file_path.endswith(".manifest"):
save_path = depot_cache / file_path
if save_path.exists():
LOG.warning(f"已存在清单: {save_path}")
continue
content = await fetch_from_cdn(
branch_res.json()["commit"]["sha"], file_path, selected_repo
)
LOG.info(f"清单下载成功: {file_path}")
async with aiofiles.open(save_path, "wb") as f:
await f.write(content)
elif "key.vdf" in file_path.lower():
key_content = await fetch_from_cdn(
branch_res.json()["commit"]["sha"], file_path, selected_repo
)
collected.extend(parse_key_vdf(key_content))
for item in tree_res.json()["tree"]:
if not item["path"].endswith(".manifest"):
continue
content = await fetch_from_cdn(
branch_res.json()["commit"]["sha"], file_path, repo
)
logger.info(f"清单下载成功: {file_path}")
async with aiofiles.open(save_path, "wb") as f:
await f.write(content)
elif "key.vdf" in file_path.lower():
key_content = await fetch_from_cdn(
branch_res.json()["commit"]["sha"], file_path, repo
)
collected.extend(parse_key_vdf(key_content))
for item in tree_res.json()["tree"]:
if not item["path"].endswith(".manifest"):
continue
filename = Path(item["path"]).stem
if "_" not in filename:
continue
filename = Path(item["path"]).stem
if "_" not in filename:
continue
depot_id, manifest_id = filename.replace(".manifest", "").split("_", 1)
if not (depot_id.isdigit() and manifest_id.isdigit()):
continue
depot_id, manifest_id = filename.replace(".manifest", "").split("_", 1)
if not (depot_id.isdigit() and manifest_id.isdigit()):
continue
depot_map.setdefault(depot_id, []).append(manifest_id)
depot_map.setdefault(depot_id, []).append(manifest_id)
for depot_id in depot_map:
depot_map[depot_id].sort(key=lambda x: int(x), reverse=True)
for depot_id in depot_map:
depot_map[depot_id].sort(key=lambda x: int(x), reverse=True)
except httpx.HTTPStatusError as e:
logger.error(f"HTTP错误: {e.response.status_code}")
LOG.error(f"HTTP错误: {e.response.status_code}")
except Exception as e:
logger.error(f"文件处理失败: {str(e)}")
LOG.error(f"文件处理失败: {str(e)}")
return collected, depot_map
async def fetch_from_cdn(sha: str, path: str, repo: str):
if os.environ.get("IS_CN") == "yes":
if variable.IS_CN:
url_list = [
f"https://jsdelivr.pai233.top/gh/{repo}@{sha}/{path}",
f"https://cdn.jsdmirror.com/gh/{repo}@{sha}/{path}",
@@ -188,18 +206,19 @@ async def fetch_from_cdn(sha: str, path: str, repo: str):
if r.status_code == 200:
return r.read()
else:
logger.error(f"获取失败: {path} - 状态码: {r.status_code}")
LOG.error(f"获取失败: {path} - 状态码: {r.status_code}")
except KeyboardInterrupt:
logger.info("程序已退出")
LOG.info("程序已退出")
except httpx.ConnectError as e:
logger.error(f"获取失败: {path} - 连接错误: {str(e)}")
LOG.error(f"获取失败: {path} - 连接错误: {str(e)}")
except httpx.ConnectTimeout as e:
logger.error(f"连接超时: {url} - 错误: {str(e)}")
LOG.error(f"连接超时: {url} - 错误: {str(e)}")
retry -= 1
logger.warning(f"重试剩余次数: {retry} - {path}")
LOG.warning(f"重试剩余次数: {retry} - {path}")
LOG.error(f"超过最大重试次数: {path}")
logger.error(f"超过最大重试次数: {path}")
raise Exception(f"无法下载: {path}")
@@ -208,7 +227,7 @@ def parse_key_vdf(content: bytes) -> List[Tuple[str, str]]:
depots = vdf.loads(content.decode("utf-8"))["depots"]
return [(d_id, d_info["DecryptionKey"]) for d_id, d_info in depots.items()]
except Exception as e:
logger.error(f"密钥解析失败: {str(e)}")
LOG.error(f"密钥解析失败: {str(e)}")
return []
@@ -229,7 +248,7 @@ async def setup_unlock_tool(
elif (tool_choice == 2) and (isGreenLuma):
return await setup_greenluma(depot_data)
else:
logger.error("你选的啥?")
LOG.error("你选的啥?")
return False
@@ -267,12 +286,12 @@ async def setup_steamtools(
await proc.wait()
if proc.returncode != 0:
logger.error(f"Lua编译失败: {await proc.stderr.read()}")
LOG.error(f"Lua编译失败: {await proc.stderr.read()}")
return False
if lua_file.exists():
os.remove(lua_file)
logger.info(f"删除临时文件: {lua_file}")
LOG.info(f"删除临时文件: {lua_file}")
return True
@@ -301,7 +320,7 @@ async def setup_greenluma(depot_data: List[Tuple[str, str]]) -> bool:
async def main_flow(app_id: str):
app_id_list = list(filter(str.isdecimal, app_id.strip().split("-")))
if not app_id_list:
logger.error(f"App ID无效")
LOG.error(f"App ID无效")
os.system("pause")
return False
@@ -311,30 +330,19 @@ async def main_flow(app_id: str):
await checkcn()
await check_github_api_rate_limit(HEADER)
print(
"\n".join(
[f"{idx+1}. {item.value[1]}" for idx, item in enumerate(RepoChoice)]
)
)
repo_choice = int(input("请选择清单仓库 (默认1): \n") or 1)
selected_repo = list(RepoChoice)[repo_choice - 1].value[0]
tool_choice = int(input("请选择解锁工具 (1.SteamTools 2.GreenLuma): \n"))
depot_data, depot_map = await handle_depot_files(
selected_repo, app_id, STEAM_PATH
)
depot_data, depot_map = await handle_depot_files(REPO_LIST, app_id, STEAM_PATH)
if await setup_unlock_tool(depot_data, app_id, tool_choice, depot_map):
logger.info("游戏解锁配置成功!")
logger.info("重启Steam后生效")
LOG.info("游戏解锁配置成功!")
LOG.info("重启Steam后生效")
else:
logger.error("配置失败,请检查日志")
LOG.error("配置失败,请检查日志")
os.system("pause")
return True
except Exception as e:
logger.error(f"运行错误: {stack_error(e)}")
LOG.error(f"运行错误: {stack_error(e)}")
return False
except KeyboardInterrupt:
return False
@@ -351,5 +359,5 @@ if __name__ == "__main__":
except (asyncio.CancelledError, KeyboardInterrupt):
os.system("pause")
except Exception as e:
logger.error(f"错误:{stack_error(e)}")
LOG.error(f"错误:{stack_error(e)}")
os.system("pause")