Compare commits

...

21 Commits

Author SHA1 Message Date
ikun0014
f207604b0e Update release.yml 2025-05-02 12:32:10 +08:00
ikun0014
74a74e5fa3 1.4.7 2025-05-02 12:16:30 +08:00
ikun0014
f7118f0224 日常维护 2025-05-02 12:16:18 +08:00
ikun0014
a475dcb6b8 1.4.6 2025-03-20 22:55:17 +08:00
ikun0014
2ea7c76004 1.4.6 2025-03-20 22:55:11 +08:00
ikun0014
14684cf1b7 fix: ManifestDownload 2025-03-20 22:54:43 +08:00
ikun0014
f560dab35f 1.4.5 2025-03-11 18:05:34 +08:00
ikun0014
8cdd9aa208 fix: SteamTools Import 2025-03-11 18:05:14 +08:00
ikun0014
37f862ba9e 1.4.4 2025-03-07 23:56:55 +08:00
ikun0014
8612fd0c94 VER 1.4.4 2025-03-07 23:56:51 +08:00
ikun0014
7fcbadabdf 1.4.3 2025-03-07 23:56:29 +08:00
ikun0014
6a21200ccc VER 1.4.3 2025-03-07 23:56:27 +08:00
ikun0014
0a384ce114 VER 1.4.3 2025-03-07 23:55:52 +08:00
ikun0014
041f8d6a00 1.4.2 2025-03-05 21:25:37 +08:00
ikun0014
4db910c8da 1.4.3 2025-03-05 21:25:35 +08:00
ikun0014
8bf15eda57 VER 1.4.2 2025-03-05 21:25:24 +08:00
ikun0014
628b92b86d 1.4.1 2025-03-05 16:35:06 +08:00
ikun0014
ee8c2242f2 VER 1.4.1 2025-03-05 16:35:04 +08:00
ikun0014
76340538b8 修复了一些已知问题。 2025-03-05 16:33:35 +08:00
ikun0014
c693220d73 更新 release.yml 2025-03-04 23:54:08 +08:00
ikun0014
324e537c60 fix? 2025-03-01 00:46:48 +08:00
8 changed files with 278 additions and 167 deletions

View File

@@ -70,7 +70,7 @@ jobs:
uses: softprops/action-gh-release@v2
with:
tag_name: v${{ env.PACKAGE_VERSION }}
files: build/Onekey---v${{ env.PACKAGE_VERSION }}.exe
files: build/Onekey_v${{ env.PACKAGE_VERSION }}.exe
prerelease: false
draft: false
env:
@@ -87,13 +87,14 @@ jobs:
gitee_release_body: I don't know
gitee_target_commitish: main
gitee_upload_retry_times: 3
gitee_file_name: Onekey---v${{ env.PACKAGE_VERSION }}.exe
gitee_file_path: build/Onekey---v${{ env.PACKAGE_VERSION }}.exe
gitee_file_name: Onekey_v${{ env.PACKAGE_VERSION }}.exe
gitee_file_path: build/Onekey_v${{ env.PACKAGE_VERSION }}.exe
- name: Upload to Telegram Channel
run: |
& curl -F "chat_id=${{ secrets.TELEGRAM_TO }}" `
-F "document=@build/Onekey---v${{ env.PACKAGE_VERSION }}.exe" `
-F "thread_id=${{ secrets.TELEGRAM_THREAD }}" `
-F "document=@build/Onekey_v${{ env.PACKAGE_VERSION }}.exe" `
-F "caption=Onekey's New Update ${{ env.PACKAGE_VERSION }}" `
-F "parse_mode=Markdown" `
"https://api.telegram.org/bot${{ secrets.TELEGRAM_TOKEN }}/sendDocument"

4
.vscode/settings.json vendored Normal file
View File

@@ -0,0 +1,4 @@
{
"python.analysis.autoImportCompletions": true,
"python.analysis.typeCheckingMode": "basic"
}

0
common/__init__.py Normal file
View File

45
common/log.py Normal file
View File

@@ -0,0 +1,45 @@
import os
import colorama
import logging
import logzero
from logzero import setup_logger, LogFormatter
from .variable import LOG_FILE, DEBUG_MODE
if not os.path.exists(f"./logs"):
os.makedirs(f"./logs")
def log(name: str) -> logging.Logger:
if DEBUG_MODE:
level = logzero.DEBUG
else:
level = logzero.INFO
colors = {
logzero.DEBUG: colorama.Fore.CYAN,
logzero.INFO: colorama.Fore.GREEN,
logzero.WARNING: colorama.Fore.YELLOW,
logzero.ERROR: colorama.Fore.RED,
logzero.CRITICAL: colorama.Fore.MAGENTA,
}
terminal_formatter = LogFormatter(
color=True,
fmt="%(color)s%(message)s%(end_color)s",
datefmt="%Y-%m-%d %H:%M:%S",
colors=colors,
)
logger = setup_logger(name, level=level, formatter=terminal_formatter)
if LOG_FILE:
logfile = f"./logs/{name}.log"
file_handler = logging.FileHandler(logfile)
file_formatter = logging.Formatter(
"[%(asctime)s] | [%(name)s:%(levelname)s] | [%(module)s.%(funcName)s:%(lineno)d] - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
file_handler.setFormatter(file_formatter)
logger.addHandler(file_handler)
return logger

73
common/variable.py Normal file
View File

@@ -0,0 +1,73 @@
import time
import httpx
import sys
import winreg
import ujson as json
from pathlib import Path
def get_steam_path(config: dict) -> Path:
"""获取Steam安装路径"""
try:
if custom_path := config.get("Custom_Steam_Path"):
return Path(custom_path)
with winreg.OpenKey(winreg.HKEY_CURRENT_USER, r"Software\Valve\Steam") as key:
return Path(winreg.QueryValueEx(key, "SteamPath")[0])
except Exception as e:
print(f"Steam路径获取失败: {str(e)}")
sys.exit(1)
DEFAULT_CONFIG = {
"Github_Personal_Token": "",
"Custom_Steam_Path": "",
"Debug_Mode": False,
"Logging_Files": True,
"Help": "Github Personal Token可在GitHub设置的Developer settings中生成",
}
def generate_config() -> None:
try:
with open(Path("./config.json"), "w", encoding="utf-8") as f:
f.write(json.dumps(DEFAULT_CONFIG, indent=2, ensure_ascii=False))
print("配置文件已生成")
except IOError as e:
print(f"配置文件创建失败: {str(e)}")
sys.exit(1)
def load_config() -> dict:
if not Path("./config.json").exists():
generate_config()
print("请填写配置文件后重新运行程序5秒后退出")
time.sleep(5)
sys.exit(1)
try:
with open(Path("./config.json"), "r", encoding="utf-8") as f:
return json.loads(f.read())
except json.JSONDecodeError:
print("配置文件损坏,正在重新生成...")
generate_config()
sys.exit(1)
except Exception as e:
print(f"配置加载失败: {str(e)}")
sys.exit(1)
CLIENT = httpx.AsyncClient(verify=False)
CONFIG = load_config()
DEBUG_MODE = CONFIG.get("Debug_Mode", False)
LOG_FILE = CONFIG.get("Logging_Files", True)
GITHUB_TOKEN = str(CONFIG.get("Github_Personal_Token", ""))
STEAM_PATH = get_steam_path(CONFIG)
IS_CN = True
HEADER = {"Authorization": f"Bearer {GITHUB_TOKEN}"} if GITHUB_TOKEN else None
REPO_LIST = [
"SteamAutoCracks/ManifestHub",
"ikun0014/ManifestHub",
"Auiowu/ManifestAutoUpdate",
"tymolu233/ManifestAutoUpdate-fix",
]

311
main.py
View File

@@ -1,36 +1,27 @@
import os
import sys
import traceback
import asyncio
import aiofiles
import httpx
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
import vdf
import time
from typing import Tuple, List, Dict
import httpx
import asyncio
import traceback
from typing import Any, Tuple, List, Dict
from pathlib import Path
from enum import Enum
from common import log
from common import log, variable
from common.variable import (
CLIENT,
HEADER,
STEAM_PATH,
REPO_LIST,
)
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
class RepoChoice(Enum):
STEAMTOOLS_DUMP = ("SteamAutoCracks/ManifestHub", "推荐使用")
IKUN = ("ikun0014/ManifestHub", "没号更不了")
AUIOWU = ("Auiowu/ManifestAutoUpdate", "未知维护状态的仓库")
LOCK = asyncio.Lock()
DEFAULT_REPO = RepoChoice.STEAMTOOLS_DUMP
logger = log.log("Onekey")
LOG = log.log("Onekey")
DEFAULT_REPO = REPO_LIST[0]
def init() -> None:
@@ -43,44 +34,45 @@ def init() -> None:
| |_| | | | \ | | |___ | | \ \ | |___ / /
\_____/ |_| \_| |_____| |_| \_\ |_____| /_/
"""
logger.info(banner)
logger.info("作者: ikun0014 | 版本: 1.4.0 | 官网: ikunshare.com")
logger.info("项目仓库: GitHub: https://github.com/ikunshare/Onekey")
logger.info("ikunshare.com | 严禁倒卖")
logger.info("提示: 请确保已安装Windows 10/11并正确配置Steam;SteamTools/GreenLuma")
logger.info("开梯子必须配置Token, 你的IP我不相信能干净到哪")
LOG.info(banner)
LOG.info("作者: ikun0014 | 版本: 1.4.7 | 官网: ikunshare.com")
LOG.info("项目仓库: GitHub: https://github.com/ikunshare/Onekey")
LOG.warning("ikunshare.com | 严禁倒卖")
LOG.warning("提示: 请确保已安装Windows 10/11并正确配置Steam;SteamTools/GreenLuma")
LOG.warning("开梯子必须配置Token, 你的IP我不相信能干净到哪")
async def checkcn() -> bool:
async def CheckCN() -> bool:
try:
req = await CLIENT.get("https://mips.kugou.com/check/iscn?&format=json")
body = req.json()
scn = bool(body["flag"])
if not scn:
logger.info(
LOG.info(
f"您在非中国大陆地区({body['country']})上使用了项目, 已自动切换回Github官方下载CDN"
)
os.environ["IS_CN"] = "no"
variable.IS_CN = False
return False
else:
os.environ["IS_CN"] = "yes"
variable.IS_CN = True
return True
except KeyboardInterrupt:
logger.info("程序已退出")
LOG.info("程序已退出")
return True
except httpx.ConnectError as e:
os.environ["IS_CN"] = "yes"
logger.warning("检查服务器位置失败,已忽略,自动认为你在中国大陆")
variable.IS_CN = True
LOG.warning("检查服务器位置失败,已忽略,自动认为你在中国大陆")
return False
def stack_error(exception: Exception) -> str:
def StackError(exception: Exception) -> str:
stack_trace = traceback.format_exception(
type(exception), exception, exception.__traceback__
)
return "".join(stack_trace)
async def check_github_api_rate_limit(headers):
async def CheckLimit(headers):
url = "https://api.github.com/rate_limit"
try:
r = await CLIENT.get(url, headers=headers)
@@ -92,87 +84,111 @@ async def check_github_api_rate_limit(headers):
reset_time_formatted = time.strftime(
"%Y-%m-%d %H:%M:%S", time.localtime(reset_time)
)
logger.info(f"剩余请求次数: {remaining_requests}")
LOG.info(f"剩余Github API请求次数: {remaining_requests}")
if remaining_requests == 0:
logger.warning(
f"GitHub API 请求数已用尽, 将在 {reset_time_formatted} 重置,建议生成一个填在配置文件里"
LOG.warning(
f"GitHub API 请求数已用尽, 将在 {reset_time_formatted} 重置, 建议生成一个填在配置文件里"
)
else:
logger.error("Github请求数检查失败, 网络错误")
LOG.error("Github请求数检查失败, 网络错误")
except KeyboardInterrupt:
logger.info("程序已退出")
LOG.info("程序已退出")
except httpx.ConnectError as e:
logger.error(f"检查Github API 请求数失败, {stack_error(e)}")
LOG.error(f"检查Github API 请求数失败, {StackError(e)}")
except httpx.ConnectTimeout as e:
logger.error(f"检查Github API 请求数超时: {stack_error(e)}")
LOG.error(f"检查Github API 请求数超时: {StackError(e)}")
except Exception as e:
logger.error(f"发生错误: {stack_error(e)}")
LOG.error(f"发生错误: {StackError(e)}")
async def handle_depot_files(
repo: str, app_id: str, steam_path: Path
async def GetLatestRepoInfo(repos: list, app_id: str, headers) -> Any | str | None:
latest_date = None
selected_repo = None
for repo in repos:
url = f"https://api.github.com/repos/{repo}/branches/{app_id}"
r = await CLIENT.get(url, headers=headers)
r_json = r.json()
if r_json and "commit" in r_json:
date = r_json["commit"]["commit"]["author"]["date"]
if (latest_date is None) or (date > latest_date):
latest_date = str(date)
selected_repo = str(repo)
return selected_repo, latest_date
async def HandleDepotFiles(
repos: List, app_id: str, steam_path: Path
) -> List[Tuple[str, str]]:
collected = []
depot_map = {}
try:
branch_url = f"https://api.github.com/repos/{repo}/branches/{app_id}"
branch_res = await CLIENT.get(branch_url, headers=HEADER)
branch_res.raise_for_status()
selected_repo, latest_date = await GetLatestRepoInfo(
repos, app_id, headers=HEADER
) # type: ignore
tree_url = branch_res.json()["commit"]["commit"]["tree"]["url"]
tree_res = await CLIENT.get(tree_url)
tree_res.raise_for_status()
if selected_repo:
branch_url = (
f"https://api.github.com/repos/{selected_repo}/branches/{app_id}"
)
branch_res = await CLIENT.get(branch_url, headers=HEADER)
branch_res.raise_for_status()
depot_cache = steam_path / "depotcache"
depot_cache.mkdir(exist_ok=True)
tree_url = branch_res.json()["commit"]["commit"]["tree"]["url"]
tree_res = await CLIENT.get(tree_url)
tree_res.raise_for_status()
for item in tree_res.json()["tree"]:
file_path = str(item["path"])
if file_path.endswith(".manifest"):
save_path = depot_cache / file_path
if save_path.exists():
logger.warning(f"已存在清单: {save_path}")
depot_cache = steam_path / "depotcache"
depot_cache.mkdir(exist_ok=True)
LOG.info(f"当前选择清单仓库: https://github.com/{selected_repo}")
LOG.info(f"此清单分支上次更新时间:{latest_date}")
for item in tree_res.json()["tree"]:
file_path = str(item["path"])
if file_path.endswith(".manifest"):
save_path = depot_cache / file_path
if save_path.exists():
LOG.warning(f"已存在清单: {save_path}")
continue
content = await FetchFiles(
branch_res.json()["commit"]["sha"], file_path, selected_repo
)
LOG.info(f"清单下载成功: {file_path}")
with open(save_path, "wb") as f:
f.write(content)
elif "key.vdf" in file_path.lower():
key_content = await FetchFiles(
branch_res.json()["commit"]["sha"], file_path, selected_repo
)
collected.extend(ParseKey(key_content))
for item in tree_res.json()["tree"]:
if not item["path"].endswith(".manifest"):
continue
content = await fetch_from_cdn(
branch_res.json()["commit"]["sha"], file_path, repo
)
logger.info(f"清单下载成功: {file_path}")
async with aiofiles.open(save_path, "wb") as f:
await f.write(content)
elif "key.vdf" in file_path.lower():
key_content = await fetch_from_cdn(
branch_res.json()["commit"]["sha"], file_path, repo
)
collected.extend(parse_key_vdf(key_content))
for item in tree_res.json()["tree"]:
if not item["path"].endswith(".manifest"):
continue
filename = Path(item["path"]).stem
if "_" not in filename:
continue
filename = Path(item["path"]).stem
if "_" not in filename:
continue
depot_id, manifest_id = filename.replace(".manifest", "").split("_", 1)
if not (depot_id.isdigit() and manifest_id.isdigit()):
continue
depot_id, manifest_id = filename.replace(".manifest", "").split("_", 1)
if not (depot_id.isdigit() and manifest_id.isdigit()):
continue
depot_map.setdefault(depot_id, []).append(manifest_id)
depot_map.setdefault(depot_id, []).append(manifest_id)
for depot_id in depot_map:
depot_map[depot_id].sort(key=lambda x: int(x), reverse=True)
for depot_id in depot_map:
depot_map[depot_id].sort(key=lambda x: int(x), reverse=True)
except httpx.HTTPStatusError as e:
logger.error(f"HTTP错误: {e.response.status_code}")
LOG.error(f"HTTP错误: {e.response.status_code}")
except Exception as e:
logger.error(f"文件处理失败: {str(e)}")
return collected, depot_map
LOG.error(f"文件处理失败: {str(e)}")
return collected, depot_map # type: ignore
async def fetch_from_cdn(sha: str, path: str, repo: str):
if os.environ.get("IS_CN") == "yes":
async def FetchFiles(sha: str, path: str, repo: str):
if variable.IS_CN:
url_list = [
f"https://jsdelivr.pai233.top/gh/{repo}@{sha}/{path}",
f"https://cdn.jsdmirror.com/gh/{repo}@{sha}/{path}",
f"https://raw.gitmirror.com/{repo}/{sha}/{path}",
f"https://raw.dgithub.xyz/{repo}/{sha}/{path}",
@@ -188,58 +204,54 @@ async def fetch_from_cdn(sha: str, path: str, repo: str):
if r.status_code == 200:
return r.read()
else:
logger.error(f"获取失败: {path} - 状态码: {r.status_code}")
LOG.error(f"获取失败: {path} - 状态码: {r.status_code}")
except KeyboardInterrupt:
logger.info("程序已退出")
LOG.info("程序已退出")
except httpx.ConnectError as e:
logger.error(f"获取失败: {path} - 连接错误: {str(e)}")
LOG.error(f"获取失败: {path} - 连接错误: {str(e)}")
except httpx.ConnectTimeout as e:
logger.error(f"连接超时: {url} - 错误: {str(e)}")
LOG.error(f"连接超时: {url} - 错误: {str(e)}")
retry -= 1
logger.warning(f"重试剩余次数: {retry} - {path}")
LOG.warning(f"重试剩余次数: {retry} - {path}")
LOG.error(f"超过最大重试次数: {path}")
logger.error(f"超过最大重试次数: {path}")
raise Exception(f"无法下载: {path}")
def parse_key_vdf(content: bytes) -> List[Tuple[str, str]]:
def ParseKey(content: bytes) -> List[Tuple[str, str]]:
try:
depots = vdf.loads(content.decode("utf-8"))["depots"]
return [(d_id, d_info["DecryptionKey"]) for d_id, d_info in depots.items()]
except Exception as e:
logger.error(f"密钥解析失败: {str(e)}")
LOG.error(f"密钥解析失败: {str(e)}")
return []
async def setup_unlock_tool(
def SetupUnlock(
depot_data: List[Tuple[str, str]],
app_id: str,
tool_choice: int,
depot_map: Dict,
) -> bool:
isGreenLuma = any(
(STEAM_PATH / dll).exists()
for dll in ["GreenLuma_2024_x86.dll", "GreenLuma_2024_x64.dll", "User32.dll"]
)
isSteamTools = (STEAM_PATH / "config" / "stUI").is_dir()
if (tool_choice == 1) and (isSteamTools):
return await setup_steamtools(depot_data, app_id, depot_map)
elif (tool_choice == 2) and (isGreenLuma):
return await setup_greenluma(depot_data)
if tool_choice == 1:
return SetupTools(depot_data, app_id, depot_map)
elif tool_choice == 2:
return SetupGreenLuma(depot_data)
else:
logger.error("你选的啥?")
LOG.error("你选的啥?")
return False
async def setup_steamtools(
depot_data: List[Tuple[str, str]], app_id: str, depot_map: Dict
) -> bool:
def SetupTools(depot_data: List[Tuple[str, str]], app_id: str, depot_map: Dict) -> bool:
st_path = STEAM_PATH / "config" / "stplug-in"
st_path.mkdir(exist_ok=True)
choice = input(f"是否锁定版本(推荐在选择仓库1时使用)?(y/n): \n").lower()
choice = input(
f"是否锁定版本(推荐在选择仓库SteamAutoCracks/ManifestHub时使用)?(y/n): "
).lower()
if choice == "y":
versionlock = True
@@ -255,29 +267,13 @@ async def setup_steamtools(
lua_content += f'addappid({d_id}, 1, "{d_key}")\n'
lua_file = st_path / f"{app_id}.lua"
async with aiofiles.open(lua_file, "w") as f:
await f.write(lua_content)
proc = await asyncio.create_subprocess_exec(
str(st_path / "luapacka.exe"),
str(lua_file),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
await proc.wait()
if proc.returncode != 0:
logger.error(f"Lua编译失败: {await proc.stderr.read()}")
return False
if lua_file.exists():
os.remove(lua_file)
logger.info(f"删除临时文件: {lua_file}")
with open(lua_file, "w") as f:
f.write(lua_content)
return True
async def setup_greenluma(depot_data: List[Tuple[str, str]]) -> bool:
def SetupGreenLuma(depot_data: List[Tuple[str, str]]) -> bool:
applist_dir = STEAM_PATH / "AppList"
applist_dir.mkdir(exist_ok=True)
@@ -288,68 +284,61 @@ async def setup_greenluma(depot_data: List[Tuple[str, str]]) -> bool:
(applist_dir / f"{idx}.txt").write_text(str(d_id))
config_path = STEAM_PATH / "config" / "config.vdf"
async with aiofiles.open(config_path, "r+") as f:
content = vdf.loads(await f.read())
with open(config_path, "r+") as f:
content = vdf.loads(f.read())
content.setdefault("depots", {}).update(
{d_id: {"DecryptionKey": d_key} for d_id, d_key in depot_data}
)
await f.seek(0)
await f.write(vdf.dumps(content))
f.seek(0)
f.write(vdf.dumps(content))
return True
async def main_flow(app_id: str):
async def Main(app_id: str):
app_id_list = list(filter(str.isdecimal, app_id.strip().split("-")))
if not app_id_list:
logger.error(f"App ID无效")
LOG.error(f"App ID无效")
os.system("pause")
return False
app_id = app_id_list[0]
try:
await checkcn()
await check_github_api_rate_limit(HEADER)
await CheckCN()
await CheckLimit(HEADER)
print(
"\n".join(
[f"{idx+1}. {item.value[1]}" for idx, item in enumerate(RepoChoice)]
)
)
repo_choice = int(input("请选择清单仓库 (默认1): \n") or 1)
selected_repo = list(RepoChoice)[repo_choice - 1].value[0]
depot_data, depot_map = await HandleDepotFiles(REPO_LIST, app_id, STEAM_PATH)
tool_choice = int(input("请选择解锁工具 (1.SteamTools 2.GreenLuma): \n"))
if (not depot_data) or (not depot_map):
LOG.error(f"未找到此游戏的清单")
return os.system("pause")
depot_data, depot_map = await handle_depot_files(
selected_repo, app_id, STEAM_PATH
)
tool_choice = int(input("请选择解锁工具 (1.SteamTools 2.GreenLuma): "))
if await setup_unlock_tool(depot_data, app_id, tool_choice, depot_map):
logger.info("游戏解锁配置成功!")
logger.info("重启Steam后生效")
if SetupUnlock(depot_data, app_id, tool_choice, depot_map): # type: ignore
LOG.info("游戏解锁配置成功!")
LOG.info("重启Steam后生效")
else:
logger.error("配置失败,请检查日志")
LOG.error("配置失败")
os.system("pause")
return True
except Exception as e:
logger.error(f"运行错误: {stack_error(e)}")
return False
LOG.error(f"运行错误: {StackError(e)}")
return os.system("pause")
except KeyboardInterrupt:
return False
return os.system("pause")
finally:
await CLIENT.aclose()
return True
if __name__ == "__main__":
try:
init()
app_id = input(f"请输入游戏AppID: \n").strip()
asyncio.run(main_flow(app_id))
app_id = input(f"请输入游戏AppID: ").strip()
asyncio.run(Main(app_id))
except (asyncio.CancelledError, KeyboardInterrupt):
os.system("pause")
except Exception as e:
logger.error(f"错误:{stack_error(e)}")
LOG.error(f"错误:{StackError(e)}")
os.system("pause")

View File

@@ -1,6 +1,6 @@
{
"name": "onekey",
"version": "1.4.0",
"version": "1.4.7",
"description": "一个Steam仓库清单下载器",
"main": "index.js",
"scripts": {

View File

@@ -1,4 +1,3 @@
aiofiles
httpx
logzero
ujson