Compare commits

...

31 Commits

Author SHA1 Message Date
ikun0014
37f862ba9e 1.4.4 2025-03-07 23:56:55 +08:00
ikun0014
8612fd0c94 VER 1.4.4 2025-03-07 23:56:51 +08:00
ikun0014
7fcbadabdf 1.4.3 2025-03-07 23:56:29 +08:00
ikun0014
6a21200ccc VER 1.4.3 2025-03-07 23:56:27 +08:00
ikun0014
0a384ce114 VER 1.4.3 2025-03-07 23:55:52 +08:00
ikun0014
041f8d6a00 1.4.2 2025-03-05 21:25:37 +08:00
ikun0014
4db910c8da 1.4.3 2025-03-05 21:25:35 +08:00
ikun0014
8bf15eda57 VER 1.4.2 2025-03-05 21:25:24 +08:00
ikun0014
628b92b86d 1.4.1 2025-03-05 16:35:06 +08:00
ikun0014
ee8c2242f2 VER 1.4.1 2025-03-05 16:35:04 +08:00
ikun0014
76340538b8 修复了一些已知问题。 2025-03-05 16:33:35 +08:00
ikun0014
c693220d73 更新 release.yml 2025-03-04 23:54:08 +08:00
ikun0014
324e537c60 fix? 2025-03-01 00:46:48 +08:00
ikun0014
50b9f1b724 1.4.0 2025-03-01 00:23:21 +08:00
ikun0014
7ba02c4e8f fix? 2025-03-01 00:23:10 +08:00
ikun0014
b2dada2018 . 2025-02-28 22:03:34 +08:00
ikun0014
5ca4f26242 1.3.9 2025-02-28 21:57:09 +08:00
ikun0014
da596964da 1.3.9 2025-02-28 21:56:59 +08:00
ikun0014
ed8fa1cd7f 1.3.8 2025-02-28 18:55:49 +08:00
ikun0014
485a9d85e2 1.3.8 2025-02-28 18:55:36 +08:00
ikun0014
41cfa244e3 fix 2025-02-28 18:55:13 +08:00
ikun0014
09a9e48f7e fix 2025-02-27 19:15:20 +08:00
ikun0014
7ef7297119 sth 2025-02-27 19:14:02 +08:00
ikun0014
ad26456d6c 1.3.7 2025-02-27 19:09:34 +08:00
ikun0014
cd18a2f49d 清单库新增+优化 2025-02-27 19:09:20 +08:00
ikun0014
485fca07f1 修复构建 2025-01-26 22:58:28 +08:00
ikun0014
452be816b1 构建修复 2025-01-26 22:39:44 +08:00
ikun0014
2ba17f1bac 1.3.6 2025-01-26 22:32:32 +08:00
ikun0014
17e1fea9cf 常规维护 2025-01-26 22:32:24 +08:00
ikun0014
2fd7a13bcc 1.3.5 2024-11-27 00:07:12 +08:00
ikun0014
4edcfa8c8e 更新 main.py 2024-11-27 00:06:53 +08:00
8 changed files with 502 additions and 552 deletions

View File

@@ -11,88 +11,89 @@ jobs:
contents: write
runs-on: windows-latest
steps:
- name: Check out git repository
uses: actions/checkout@v4
- name: Check out git repository
uses: actions/checkout@v4
- name: Get package version
shell: powershell
run: |
$version = (Get-Content package.json | ConvertFrom-Json).version
echo "PACKAGE_VERSION=$version" >> $env:GITHUB_ENV
- name: Set up Python 3.12
uses: actions/setup-python@v3
with:
python-version: 3.12
- name: Get package version
shell: powershell
run: |
$version = (Get-Content package.json | ConvertFrom-Json).version
echo "PACKAGE_VERSION=$version" >> $env:GITHUB_ENV
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install imageio
pip install -r requirements.txt
- name: Set up Python 3.13.1
uses: actions/setup-python@v3
with:
python-version: 3.13.1
- name: Build
uses: Nuitka/Nuitka-Action@v1.1
with:
nuitka-version: main
script-name: main.py
standalone: true
onefile: true
windows-uac-admin: true
windows-icon-from-ico: icon.jpg
company-name: ikunshare
product-name: Onekey
file-version: ${{ env.PACKAGE_VERSION }}
product-version: ${{ env.PACKAGE_VERSION }}
file-description: 一个Steam仓库清单下载器
copyright: Copyright © 2024 ikun0014
output-file: Onekey---v${{ env.PACKAGE_VERSION }}.exe
assume-yes-for-downloads: true
output-dir: build
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install imageio
pip install -r requirements.txt
- name: Create git tag
uses: pkgdeps/git-tag-action@v3
with:
github_token: ${{ secrets.GITHUB_TOKEN }}
github_repo: ${{ github.repository }}
version: ${{ env.PACKAGE_VERSION }}
git_commit_sha: ${{ github.sha }}
git_tag_prefix: "v"
- name: Build
uses: Nuitka/Nuitka-Action@main
with:
nuitka-version: main
script-name: main.py
mode: onefile
show-memory: true
windows-uac-admin: true
onefile-tempdir-spec: "%TEMP%\\onekey_%PID%_%TIME%"
windows-icon-from-ico: icon.jpg
company-name: ikunshare
product-name: Onekey
file-version: ${{ env.PACKAGE_VERSION }}
product-version: ${{ env.PACKAGE_VERSION }}
file-description: 一个Steam仓库清单下载器
copyright: Copyright © 2025 ikun0014
output-file: Onekey_v${{ env.PACKAGE_VERSION }}.exe
assume-yes-for-downloads: true
output-dir: build
- name: Upload Artifact
uses: actions/upload-artifact@v4
with:
name: Onekey_v${{ env.PACKAGE_VERSION }}.exe
path: build/Onekey_v${{ env.PACKAGE_VERSION }}.exe
- name: Create git tag
uses: pkgdeps/git-tag-action@v3
with:
github_token: ${{ secrets.GITHUB_TOKEN }}
github_repo: ${{ github.repository }}
version: ${{ env.PACKAGE_VERSION }}
git_commit_sha: ${{ github.sha }}
git_tag_prefix: "v"
- name: Release
uses: softprops/action-gh-release@v2
with:
tag_name: v${{ env.PACKAGE_VERSION }}
files: build/Onekey---v${{ env.PACKAGE_VERSION }}.exe
prerelease: false
draft: false
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- name: Upload Artifact
uses: actions/upload-artifact@v4
with:
name: Onekey_v${{ env.PACKAGE_VERSION }}.exe
path: build/Onekey_v${{ env.PACKAGE_VERSION }}.exe
- name: Gitee Release
uses: nicennnnnnnlee/action-gitee-release@v1.0.5
with:
gitee_owner: ikun0014
gitee_repo: Onekey
gitee_token: ${{ secrets.GITEE_TOKEN }}
gitee_tag_name: v${{ env.PACKAGE_VERSION }}
gitee_release_name: v${{ env.PACKAGE_VERSION }}
gitee_release_body: I don't know
gitee_target_commitish: main
gitee_upload_retry_times: 3
gitee_file_name: Onekey---v${{ env.PACKAGE_VERSION }}.exe
gitee_file_path: build/Onekey---v${{ env.PACKAGE_VERSION }}.exe
- name: Release
uses: softprops/action-gh-release@v2
with:
tag_name: v${{ env.PACKAGE_VERSION }}
files: build/Onekey_v${{ env.PACKAGE_VERSION }}.exe
prerelease: false
draft: false
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- name: Upload to Telegram Channel
run: |
& curl -F "chat_id=${{ secrets.TELEGRAM_TO }}" `
-F "document=@build/Onekey---v${{ env.PACKAGE_VERSION }}.exe" `
-F "caption=Onekey's New Update ${{ env.PACKAGE_VERSION }}" `
-F "parse_mode=Markdown" `
"https://api.telegram.org/bot${{ secrets.TELEGRAM_TOKEN }}/sendDocument"
- name: Gitee Release
uses: nicennnnnnnlee/action-gitee-release@v1.0.5
with:
gitee_owner: ikun0014
gitee_repo: Onekey
gitee_token: ${{ secrets.GITEE_TOKEN }}
gitee_tag_name: v${{ env.PACKAGE_VERSION }}
gitee_release_name: v${{ env.PACKAGE_VERSION }}
gitee_release_body: I don't know
gitee_target_commitish: main
gitee_upload_retry_times: 3
gitee_file_name: Onekey_v${{ env.PACKAGE_VERSION }}.exe
gitee_file_path: build/Onekey_v${{ env.PACKAGE_VERSION }}.exe
- name: Upload to Telegram Channel
run: |
& curl -F "chat_id=${{ secrets.TELEGRAM_TO }}" `
-F "document=@build/Onekey_v${{ env.PACKAGE_VERSION }}.exe" `
-F "caption=Onekey's New Update ${{ env.PACKAGE_VERSION }}" `
-F "parse_mode=Markdown" `
"https://api.telegram.org/bot${{ secrets.TELEGRAM_TOKEN }}/sendDocument"

4
.vscode/settings.json vendored Normal file
View File

@@ -0,0 +1,4 @@
{
"python.analysis.autoImportCompletions": true,
"python.analysis.typeCheckingMode": "basic"
}

0
common/__init__.py Normal file
View File

45
common/log.py Normal file
View File

@@ -0,0 +1,45 @@
import os
import colorama
import logging
import logzero
from logzero import setup_logger, LogFormatter
from .variable import LOG_FILE, DEBUG_MODE
if not os.path.exists(f"./logs"):
os.makedirs(f"./logs")
def log(name: str) -> logging.Logger:
if DEBUG_MODE:
level = logzero.DEBUG
else:
level = logzero.INFO
colors = {
logzero.DEBUG: colorama.Fore.CYAN,
logzero.INFO: colorama.Fore.GREEN,
logzero.WARNING: colorama.Fore.YELLOW,
logzero.ERROR: colorama.Fore.RED,
logzero.CRITICAL: colorama.Fore.MAGENTA,
}
terminal_formatter = LogFormatter(
color=True,
fmt="%(color)s%(message)s%(end_color)s",
datefmt="%Y-%m-%d %H:%M:%S",
colors=colors,
)
logger = setup_logger(name, level=level, formatter=terminal_formatter)
if LOG_FILE:
logfile = f"./logs/{name}.log"
file_handler = logging.FileHandler(logfile)
file_formatter = logging.Formatter(
"[%(asctime)s] | [%(name)s:%(levelname)s] | [%(module)s.%(funcName)s:%(lineno)d] - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
file_handler.setFormatter(file_formatter)
logger.addHandler(file_handler)
return logger

71
common/variable.py Normal file
View File

@@ -0,0 +1,71 @@
import os
import httpx
import sys
import winreg
import ujson as json
from pathlib import Path
def get_steam_path(config: dict) -> Path:
"""获取Steam安装路径"""
try:
if custom_path := config.get("Custom_Steam_Path"):
return Path(custom_path)
with winreg.OpenKey(winreg.HKEY_CURRENT_USER, r"Software\Valve\Steam") as key:
return Path(winreg.QueryValueEx(key, "SteamPath")[0])
except Exception as e:
print(f"Steam路径获取失败: {str(e)}")
sys.exit(1)
DEFAULT_CONFIG = {
"Github_Personal_Token": "",
"Custom_Steam_Path": "",
"Debug_Mode": False,
"Logging_Files": True,
"Help": "Github Personal Token可在GitHub设置的Developer settings中生成",
}
def generate_config() -> None:
try:
with open(Path("./config.json"), "w", encoding="utf-8") as f:
f.write(json.dumps(DEFAULT_CONFIG, indent=2, ensure_ascii=False))
print("配置文件已生成")
except IOError as e:
print(f"配置文件创建失败: {str(e)}")
sys.exit(1)
def load_config() -> dict:
if not Path("./config.json").exists():
generate_config()
print("请填写配置文件后重新运行程序")
os.system("pause")
try:
with open(Path("./config.json"), "r", encoding="utf-8") as f:
return json.loads(f.read())
except json.JSONDecodeError:
print("配置文件损坏,正在重新生成...")
generate_config()
sys.exit(1)
except Exception as e:
print(f"配置加载失败: {str(e)}")
sys.exit(1)
CLIENT = httpx.AsyncClient(verify=False)
CONFIG = load_config()
DEBUG_MODE = CONFIG.get("Debug_Mode", False)
LOG_FILE = CONFIG.get("Logging_Files", True)
GITHUB_TOKEN = str(CONFIG.get("Github_Personal_Token", ""))
STEAM_PATH = get_steam_path(CONFIG)
IS_CN = True
HEADER = {"Authorization": f"Bearer {GITHUB_TOKEN}"} if GITHUB_TOKEN else None
REPO_LIST = [
"SteamAutoCracks/ManifestHub",
"ikun0014/ManifestHub",
"Auiowu/ManifestAutoUpdate",
]

765
main.py
View File

@@ -1,532 +1,363 @@
import sys
import os
import sys
import traceback
import time
import logging
import subprocess
import asyncio
import aiofiles
import colorlog
import httpx
import winreg
import ujson as json
import vdf
from typing import Any
import time
from typing import Any, Tuple, List, Dict
from pathlib import Path
from colorama import init, Fore, Back, Style
from rich.progress import Progress, BarColumn, TextColumn, TimeElapsedColumn
from common import log, variable
from common.variable import (
CLIENT,
HEADER,
STEAM_PATH,
REPO_LIST,
)
init()
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
lock = asyncio.Lock()
client = httpx.AsyncClient(trust_env=True)
LOCK = asyncio.Lock()
LOG = log.log("Onekey")
DEFAULT_REPO = REPO_LIST[0]
DEFAULT_CONFIG = {
"Github_Personal_Token": "",
"Custom_Steam_Path": "",
"QA1": "温馨提示: Github_Personal_Token可在Github设置的最底下开发者选项找到, 详情看教程",
"教程": "https://ikunshare.com/Onekey_tutorial"
}
LOG_FORMAT = '%(log_color)s%(message)s'
LOG_COLORS = {
'INFO': 'cyan',
'WARNING': 'yellow',
'ERROR': 'red',
'CRITICAL': 'purple',
}
def init_log(level=logging.DEBUG) -> logging.Logger:
""" 初始化日志模块 """
logger = logging.getLogger('Onekey')
logger.setLevel(level)
stream_handler = logging.StreamHandler()
stream_handler.setLevel(level)
fmt = colorlog.ColoredFormatter(LOG_FORMAT, log_colors=LOG_COLORS)
stream_handler.setFormatter(fmt)
if not logger.handlers:
logger.addHandler(stream_handler)
return logger
log = init_log()
def init():
""" 输出初始化信息 """
banner_lines = [
f" _____ __ _ _____ _ _ _____ __ __ ",
f" / _ \\ | \\ | | | ____| | | / / | ____| \\ \\ / /",
f" | | | | | \\| | | |__ | |/ / | |__ \\ \\/ /",
f" | | | | | |\\ | | __| | |\\ \\ | __| \\ / ",
f" | |_| | | | \\ | | |___ | | \\ \\ | |___ / /",
f" \\_____/ |_| \\_| |_____| |_| \\_\\ |_____| /_/",
]
for line in banner_lines:
log.info(line)
log.info(f'作者: ikun0014')
log.warning(f'本项目采用GNU General Public License v3开源许可证, 请勿用于商业用途')
log.info(f'版本: 1.3.4')
log.info(f'项目Github仓库: https://github.com/ikunshare/Onekey \n Gitee: https://gitee.com/ikun0014/Onekey')
log.info(f'官网: ikunshare.com')
log.warning(
f'本项目完全开源免费, 如果你在淘宝, QQ群内通过购买方式获得, 赶紧回去骂商家死全家\n 交流群组:\n https://t.me/ikunshare_qun')
log.info('App ID可以在SteamDB, SteamUI或Steam商店链接页面查看')
def stack_error(exception: Exception) -> str:
""" 处理错误堆栈 """
stack_trace = traceback.format_exception(
type(exception), exception, exception.__traceback__)
return ''.join(stack_trace)
async def gen_config_file():
""" 生成配置文件 """
try:
async with aiofiles.open("./config.json", mode="w", encoding="utf-8") as f:
await f.write(json.dumps(DEFAULT_CONFIG, indent=2, ensure_ascii=False, escape_forward_slashes=False))
log.info('程序可能为第一次启动或配置重置,请填写配置文件后重新启动程序')
except KeyboardInterrupt:
log.info("程序已退出")
except Exception as e:
log.error(f'配置文件生成失败,{stack_error(e)}')
async def load_config():
""" 加载配置文件 """
if not os.path.exists('./config.json'):
await gen_config_file()
os.system('pause')
sys.exit()
try:
async with aiofiles.open("./config.json", mode="r", encoding="utf-8") as f:
config = json.loads(await f.read())
return config
except KeyboardInterrupt:
log.info("程序已退出")
except Exception as e:
log.error(f"配置文件加载失败,原因: {stack_error(e)},重置配置文件中...")
os.remove("./config.json")
await gen_config_file()
os.system('pause')
sys.exit()
config = asyncio.run(load_config())
async def check_github_api_rate_limit(headers):
url = 'https://api.github.com/rate_limit'
try:
r = await client.get(url, headers=headers)
r_json = r.json()
if r.status_code == 200:
rate_limit = r_json.get('rate', {})
remaining_requests = rate_limit.get('remaining', 0)
reset_time = rate_limit.get('reset', 0)
reset_time_formatted = time.strftime(
'%Y-%m-%d %H:%M:%S', time.localtime(reset_time))
log.info(f'剩余请求次数: {remaining_requests}')
if remaining_requests == 0:
log.warning(f'GitHub API 请求数已用尽, 将在 {
reset_time_formatted} 重置,建议生成一个填在配置文件里')
else:
log.error('Github请求数检查失败, 网络错误')
except KeyboardInterrupt:
log.info("程序已退出")
except httpx.ConnectError as e:
log.error(f'检查Github API 请求数失败, {stack_error(e)}')
except httpx.ConnectTimeout as e:
log.error(f'检查Github API 请求数超时: {stack_error(e)}')
except Exception as e:
log.error(f'发生错误: {stack_error(e)}')
def init() -> None:
"""初始化控制台输出"""
banner = r"""
_____ __ _ _____ _ _ _____ __ __
/ _ \ | \ | | | ____| | | / / | ____| \ \ / /
| | | | | \| | | |__ | |/ / | |__ \ \/ /
| | | | | |\ | | __| | |\ \ | __| \ /
| |_| | | | \ | | |___ | | \ \ | |___ / /
\_____/ |_| \_| |_____| |_| \_\ |_____| /_/
"""
LOG.info(banner)
LOG.info("作者: ikun0014 | 版本: 1.4.4 | 官网: ikunshare.com")
LOG.info("项目仓库: GitHub: https://github.com/ikunshare/Onekey")
LOG.warning("ikunshare.com | 严禁倒卖")
LOG.warning("提示: 请确保已安装Windows 10/11并正确配置Steam;SteamTools/GreenLuma")
LOG.warning("开梯子必须配置Token, 你的IP我不相信能干净到哪")
async def checkcn() -> bool:
try:
req = await client.get('https://mips.kugou.com/check/iscn?&format=json')
req = await CLIENT.get("https://mips.kugou.com/check/iscn?&format=json")
body = req.json()
scn = bool(body['flag'])
if (not scn):
log.info(
f"您在非中国大陆地区({body['country']})上使用了项目, 已自动切换回Github官方下载CDN")
os.environ['IS_CN'] = 'no'
scn = bool(body["flag"])
if not scn:
LOG.info(
f"您在非中国大陆地区({body['country']})上使用了项目, 已自动切换回Github官方下载CDN"
)
variable.IS_CN = False
return False
else:
os.environ['IS_CN'] = 'yes'
variable.IS_CN = True
return True
except KeyboardInterrupt:
log.info("程序已退出")
except httpx.ConnectError as e:
os.environ['IS_CN'] = 'yes'
log.warning('检查服务器位置失败,已忽略,自动认为你在中国大陆')
log.warning(stack_error(e))
return False
async def depotkey_merge(config_path: Path, depots_config: dict) -> bool:
if not config_path.exists():
async with lock:
log.error('Steam默认配置不存在, 可能是没有登录账号')
return False
try:
async with aiofiles.open(config_path, encoding='utf-8') as f:
content = await f.read()
config = vdf.loads(content)
steam = config.get('InstallConfigStore', {}).get('Software', {}).get('Valve') or \
config.get('InstallConfigStore', {}).get(
'Software', {}).get('valve')
if steam is None:
log.error('找不到Steam配置, 请检查配置文件')
return False
depots = steam.setdefault('depots', {})
depots.update(depots_config.get('depots', {}))
async with aiofiles.open(config_path, mode='w', encoding='utf-8') as f:
new_context = vdf.dumps(config, pretty=True)
await f.write(new_context)
log.info('成功合并')
LOG.info("程序已退出")
return True
except KeyboardInterrupt:
log.info("程序已退出")
except Exception as e:
async with lock:
log.error(f'合并失败, 原因: {e}')
except httpx.ConnectError as e:
variable.IS_CN = True
LOG.warning("检查服务器位置失败,已忽略,自动认为你在中国大陆")
return False
async def get(sha: str, path: str, repo: str, chunk_size: int = 1024) -> bytearray:
if os.environ.get('IS_CN') == 'yes':
def stack_error(exception: Exception) -> str:
stack_trace = traceback.format_exception(
type(exception), exception, exception.__traceback__
)
return "".join(stack_trace)
async def check_github_api_rate_limit(headers):
url = "https://api.github.com/rate_limit"
try:
r = await CLIENT.get(url, headers=headers)
r_json = r.json()
if r.status_code == 200:
rate_limit = r_json.get("rate", {})
remaining_requests = rate_limit.get("remaining", 0)
reset_time = rate_limit.get("reset", 0)
reset_time_formatted = time.strftime(
"%Y-%m-%d %H:%M:%S", time.localtime(reset_time)
)
LOG.info(f"剩余请求次数: {remaining_requests}")
if remaining_requests == 0:
LOG.warning(
f"GitHub API 请求数已用尽, 将在 {reset_time_formatted} 重置,建议生成一个填在配置文件里"
)
else:
LOG.error("Github请求数检查失败, 网络错误")
except KeyboardInterrupt:
LOG.info("程序已退出")
except httpx.ConnectError as e:
LOG.error(f"检查Github API 请求数失败, {stack_error(e)}")
except httpx.ConnectTimeout as e:
LOG.error(f"检查Github API 请求数超时: {stack_error(e)}")
except Exception as e:
LOG.error(f"发生错误: {stack_error(e)}")
async def get_latest_repo_info(repos: list, app_id: str, headers) -> Any | str | None:
latest_date = None
selected_repo = None
for repo in repos:
url = f"https://api.github.com/repos/{repo}/branches/{app_id}"
r = await CLIENT.get(url, headers=headers)
r_json = r.json()
if r_json and "commit" in r_json:
date = r_json["commit"]["commit"]["author"]["date"]
if (latest_date is None) or (date > latest_date):
latest_date = str(date)
selected_repo = str(repo)
return selected_repo, latest_date
async def handle_depot_files(
repos: List, app_id: str, steam_path: Path
) -> List[Tuple[str, str]]:
collected = []
depot_map = {}
try:
selected_repo, latest_date = await get_latest_repo_info(
repos, app_id, headers=HEADER
) # type: ignore
if selected_repo:
branch_url = (
f"https://api.github.com/repos/{selected_repo}/branches/{app_id}"
)
branch_res = await CLIENT.get(branch_url, headers=HEADER)
branch_res.raise_for_status()
tree_url = branch_res.json()["commit"]["commit"]["tree"]["url"]
tree_res = await CLIENT.get(tree_url)
tree_res.raise_for_status()
depot_cache = steam_path / "depotcache"
depot_cache.mkdir(exist_ok=True)
LOG.info(f"当前选择清单仓库: https://github.com/{selected_repo}")
LOG.info(f"此清单分支上次更新时间:{latest_date}")
for item in tree_res.json()["tree"]:
file_path = str(item["path"])
if file_path.endswith(".manifest"):
save_path = depot_cache / file_path
if save_path.exists():
LOG.warning(f"已存在清单: {save_path}")
continue
content = await fetch_from_cdn(
branch_res.json()["commit"]["sha"], file_path, selected_repo
)
LOG.info(f"清单下载成功: {file_path}")
async with aiofiles.open(save_path, "wb") as f:
await f.write(content)
elif "key.vdf" in file_path.lower():
key_content = await fetch_from_cdn(
branch_res.json()["commit"]["sha"], file_path, selected_repo
)
collected.extend(parse_key_vdf(key_content))
for item in tree_res.json()["tree"]:
if not item["path"].endswith(".manifest"):
continue
filename = Path(item["path"]).stem
if "_" not in filename:
continue
depot_id, manifest_id = filename.replace(".manifest", "").split("_", 1)
if not (depot_id.isdigit() and manifest_id.isdigit()):
continue
depot_map.setdefault(depot_id, []).append(manifest_id)
for depot_id in depot_map:
depot_map[depot_id].sort(key=lambda x: int(x), reverse=True)
except httpx.HTTPStatusError as e:
LOG.error(f"HTTP错误: {e.response.status_code}")
except Exception as e:
LOG.error(f"文件处理失败: {str(e)}")
return collected, depot_map # type: ignore
async def fetch_from_cdn(sha: str, path: str, repo: str):
if variable.IS_CN:
url_list = [
f'https://jsdelivr.pai233.top/gh/{repo}@{sha}/{path}',
f'https://cdn.jsdmirror.com/gh/{repo}@{sha}/{path}',
f'https://raw.gitmirror.com/{repo}/{sha}/{path}',
f'https://raw.dgithub.xyz/{repo}/{sha}/{path}',
f'https://gh.akass.cn/{repo}/{sha}/{path}'
f"https://jsdelivr.pai233.top/gh/{repo}@{sha}/{path}",
f"https://cdn.jsdmirror.com/gh/{repo}@{sha}/{path}",
f"https://raw.gitmirror.com/{repo}/{sha}/{path}",
f"https://raw.dgithub.xyz/{repo}/{sha}/{path}",
f"https://gh.akass.cn/{repo}/{sha}/{path}",
]
else:
url_list = [
f'https://raw.githubusercontent.com/{repo}/{sha}/{path}'
]
url_list = [f"https://raw.githubusercontent.com/{repo}/{sha}/{path}"]
retry = 3
while retry > 0:
for url in url_list:
try:
r = await client.get(url, timeout=30)
r = await CLIENT.get(url, headers=HEADER, timeout=30)
if r.status_code == 200:
total_size = int(
r.headers.get('Content-Length', 0))
content = bytearray()
with Progress(
TextColumn(
"[progress.description]{task.description}", style="#66CCFF"),
BarColumn(
style="#66CCFF", complete_style="#4CE49F", finished_style="#2FE9D9"),
TextColumn(
"[progress.percentage]{task.percentage:>3.0f}%", style="#66CCFF"),
TimeElapsedColumn(),
) as progress:
task = progress.add_task(
f"下载{path}中...", total=total_size)
async for chunk in r.aiter_bytes(chunk_size):
content.extend(chunk)
progress.update(task, advance=len(chunk))
return content
return r.read()
else:
log.error(
f'获取失败: {path} - 状态码: {r.status_code}')
LOG.error(f"获取失败: {path} - 状态码: {r.status_code}")
except KeyboardInterrupt:
log.info("程序已退出")
LOG.info("程序已退出")
except httpx.ConnectError as e:
log.error(f'获取失败: {path} - 连接错误: {str(e)}')
LOG.error(f"获取失败: {path} - 连接错误: {str(e)}")
except httpx.ConnectTimeout as e:
log.error(f'连接超时: {url} - 错误: {str(e)}')
LOG.error(f"连接超时: {url} - 错误: {str(e)}")
retry -= 1
log.warning(f'重试剩余次数: {retry} - {path}')
LOG.warning(f"重试剩余次数: {retry} - {path}")
log.error(f'超过最大重试次数: {path}')
raise Exception(f'无法下载: {path}')
LOG.error(f"超过最大重试次数: {path}")
raise Exception(f"无法下载: {path}")
async def get_manifest(sha: str, path: str, steam_path: Path, repo: str) -> list:
collected_depots = []
depot_cache_path = steam_path / 'depotcache'
def parse_key_vdf(content: bytes) -> List[Tuple[str, str]]:
try:
depot_cache_path.mkdir(exist_ok=True)
if path.endswith('.manifest'):
save_path = depot_cache_path / path
if save_path.exists():
log.warning(f'已存在清单: {save_path}')
return collected_depots
content = await get(sha, path, repo)
log.info(f'清单下载成功: {path}')
async with aiofiles.open(save_path, 'wb') as f:
await f.write(content)
elif path == 'Key.vdf':
content = await get(sha, path, repo)
log.info(f'密钥下载成功: {path}')
depots_config = vdf.loads(content.decode('utf-8'))
collected_depots = [
(depot_id, depot_info['DecryptionKey'])
for depot_id, depot_info in depots_config['depots'].items()
]
except KeyboardInterrupt:
log.info("程序已退出")
depots = vdf.loads(content.decode("utf-8"))["depots"]
return [(d_id, d_info["DecryptionKey"]) for d_id, d_info in depots.items()]
except Exception as e:
log.error(f'处理失败: {path} - {stack_error(e)}')
raise
return collected_depots
LOG.error(f"密钥解析失败: {str(e)}")
return []
def get_steam_path() -> Path:
try:
key = winreg.OpenKey(winreg.HKEY_CURRENT_USER, r'Software\Valve\Steam')
steam_path = Path(winreg.QueryValueEx(key, 'SteamPath')[0])
async def setup_unlock_tool(
depot_data: List[Tuple[str, str]],
app_id: str,
tool_choice: int,
depot_map: Dict,
) -> bool:
isGreenLuma = any(
(STEAM_PATH / dll).exists()
for dll in ["GreenLuma_2024_x86.dll", "GreenLuma_2024_x64.dll", "User32.dll"]
)
isSteamTools = (STEAM_PATH / "config" / "stUI").is_dir()
custom_steam_path = config.get("Custom_Steam_Path", "").strip()
return Path(custom_steam_path) if custom_steam_path else steam_path
except KeyboardInterrupt:
log.info("程序已退出")
except Exception as e:
log.error(f'Steam路径获取失败, {stack_error(e)}, 请检查是否正确安装Steam')
os.system('pause')
return Path()
if (tool_choice == 1) and (isSteamTools):
return await setup_steamtools(depot_data, app_id, depot_map)
elif (tool_choice == 2) and (isGreenLuma):
return await setup_greenluma(depot_data)
else:
LOG.error("你选的啥?")
return False
steam_path = get_steam_path()
isGreenLuma = any((steam_path / dll).exists()
for dll in ['GreenLuma_2024_x86.dll', 'GreenLuma_2024_x64.dll', 'User32.dll'])
isSteamTools = (steam_path / 'config' / 'stUI').is_dir()
directory = Path(steam_path) / "config" / "stplug-in"
temp_path = Path('./temp')
setup_url = 'https://steamtools.net/res/SteamtoolsSetup.exe'
setup_file = temp_path / 'SteamtoolsSetup.exe'
async def setup_steamtools(
depot_data: List[Tuple[str, str]], app_id: str, depot_map: Dict
) -> bool:
st_path = STEAM_PATH / "config" / "stplug-in"
st_path.mkdir(exist_ok=True)
choice = input(f"是否锁定版本(推荐在选择仓库1时使用)?(y/n): ").lower()
async def download_setup_file() -> None:
log.info('开始下载 SteamTools 安装程序...')
try:
r = await client.get(setup_url, timeout=30)
if r.status_code == 200:
total_size = int(r.headers.get('Content-Length', 0))
chunk_size = 8192
with Progress(
TextColumn(
"[progress.description]{task.description}", style="#66CCFF"),
BarColumn(
style="#66CCFF", complete_style="#4CE49F", finished_style="#2FE9D9"),
TextColumn(
"[progress.percentage]{task.percentage:>3.0f}%", style="#66CCFF"),
TimeElapsedColumn(),
) as progress:
task = progress.add_task(
f"下载安装程序中...", total=total_size)
async with aiofiles.open(setup_file, mode='wb') as f:
async for chunk in r.aiter_bytes(chunk_size):
await f.write(chunk)
progress.update(task, advance=len(chunk))
log.info('安装程序下载完成')
if choice == "y":
versionlock = True
else:
versionlock = False
lua_content = f'addappid({app_id}, 1, "None")\n'
for d_id, d_key in depot_data:
if versionlock:
for manifest_id in depot_map[d_id]:
lua_content += f'addappid({d_id}, 1, "{d_key}")\nsetManifestid({d_id},"{manifest_id}")\n'
else:
log.error(f'网络错误,无法下载安装程序,状态码: {r.status_code}')
except KeyboardInterrupt:
log.info("程序已退出")
except httpx.ConnectTimeout:
log.error('下载时超时')
except Exception as e:
log.error(f'下载失败: {e}')
lua_content += f'addappid({d_id}, 1, "{d_key}")\n'
lua_file = st_path / f"{app_id}.lua"
async with aiofiles.open(lua_file, "w") as f:
await f.write(lua_content)
async def migrate(st_use: bool) -> None:
try:
if st_use:
log.info('检测到你正在使用 SteamTools,尝试迁移旧文件')
if directory.exists():
for file in directory.iterdir():
if file.is_file() and file.name.startswith("Onekey_unlock_"):
new_filename = file.name[len("Onekey_unlock_"):]
try:
file.rename(directory / new_filename)
log.info(f'Renamed: {file.name} -> {new_filename}')
except Exception as e:
log.error(
f'重命名失败 {file.name} -> {new_filename}: {e}')
else:
log.error('故障,正在重新安装 SteamTools')
temp_path.mkdir(parents=True, exist_ok=True)
await download_setup_file(client)
subprocess.run(str(setup_file), check=True)
for file in temp_path.iterdir():
file.unlink()
temp_path.rmdir()
else:
log.info('未使用 SteamTools,停止迁移')
except KeyboardInterrupt:
log.info("程序已退出")
proc = await asyncio.create_subprocess_exec(
str(st_path / "luapacka.exe"),
str(lua_file),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
await proc.wait()
if proc.returncode != 0:
LOG.error(f"Lua编译失败: {await proc.stderr.read()}") # type: ignore
return False
if lua_file.exists():
os.remove(lua_file)
LOG.info(f"删除临时文件: {lua_file}")
async def stool_add(depot_data: list, app_id: str) -> bool:
lua_filename = f"{app_id}.lua"
lua_filepath = steam_path / "config" / "stplug-in" / lua_filename
async with lock:
log.info(f'SteamTools 解锁文件生成: {lua_filepath}')
try:
async with aiofiles.open(lua_filepath, mode="w", encoding="utf-8") as lua_file:
await lua_file.write(f'addappid({app_id}, 1, "None")\n')
for depot_id, depot_key in depot_data:
await lua_file.write(f'addappid({depot_id}, 1, "{depot_key}")\n')
luapacka_path = steam_path / "config" / "stplug-in" / "luapacka.exe"
log.info(f'正在处理文件: {lua_filepath}')
result = subprocess.run(
[str(luapacka_path), str(lua_filepath)],
capture_output=True
)
if result.returncode != 0:
log.error(f'调用失败: {result.stderr.decode()}')
return False
log.info('处理完成')
except KeyboardInterrupt:
log.info("程序已退出")
except Exception as e:
log.error(f'处理过程出现错误: {e}')
return False
finally:
if lua_filepath.exists():
os.remove(lua_filepath)
log.info(f'删除临时文件: {lua_filepath}')
return True
async def greenluma_add(depot_id_list: list) -> bool:
app_list_path = steam_path / 'AppList'
async def setup_greenluma(depot_data: List[Tuple[str, str]]) -> bool:
applist_dir = STEAM_PATH / "AppList"
applist_dir.mkdir(exist_ok=True)
for f in applist_dir.glob("*.txt"):
f.unlink()
for idx, (d_id, _) in enumerate(depot_data, 1):
(applist_dir / f"{idx}.txt").write_text(str(d_id))
config_path = STEAM_PATH / "config" / "config.vdf"
async with aiofiles.open(config_path, "r+") as f:
content = vdf.loads(await f.read())
content.setdefault("depots", {}).update(
{d_id: {"DecryptionKey": d_key} for d_id, d_key in depot_data}
)
await f.seek(0)
await f.write(vdf.dumps(content))
return True
async def main_flow(app_id: str):
app_id_list = list(filter(str.isdecimal, app_id.strip().split("-")))
if not app_id_list:
LOG.error(f"App ID无效")
os.system("pause")
return False
app_id = app_id_list[0]
try:
app_list_path.mkdir(parents=True, exist_ok=True)
for file in app_list_path.glob('*.txt'):
file.unlink(missing_ok=True)
depot_dict = {
int(i.stem): int(i.read_text(encoding='utf-8').strip())
for i in app_list_path.iterdir() if i.is_file() and i.stem.isdecimal() and i.suffix == '.txt'
}
for depot_id in map(int, depot_id_list):
if depot_id not in depot_dict.values():
index = max(depot_dict.keys(), default=-1) + 1
while index in depot_dict:
index += 1
(app_list_path /
f'{index}.txt').write_text(str(depot_id), encoding='utf-8')
depot_dict[index] = depot_id
await checkcn()
await check_github_api_rate_limit(HEADER)
tool_choice = int(input("请选择解锁工具 (1.SteamTools 2.GreenLuma): "))
depot_data, depot_map = await handle_depot_files(REPO_LIST, app_id, STEAM_PATH)
if await setup_unlock_tool(depot_data, app_id, tool_choice, depot_map): # type: ignore
LOG.info("游戏解锁配置成功!")
LOG.info("重启Steam后生效")
else:
LOG.error("配置失败")
os.system("pause")
return True
except Exception as e:
print(f'处理时出错: {e}')
LOG.error(f"运行错误: {stack_error(e)}")
return False
async def fetch_branch_info(url, headers) -> str | None:
try:
r = await client.get(url, headers=headers)
return r.json()
except KeyboardInterrupt:
log.info("程序已退出")
return False
finally:
await CLIENT.aclose()
return True
if __name__ == "__main__":
try:
init()
app_id = input(f"请输入游戏AppID: ").strip()
asyncio.run(main_flow(app_id))
except (asyncio.CancelledError, KeyboardInterrupt):
os.system("pause")
except Exception as e:
log.error(f'获取信息失败: {stack_error(e)}')
return None
except httpx.ConnectTimeout as e:
log.error(f'获取信息时超时: {stack_error(e)}')
return None
async def get_latest_repo_info(repos: list, app_id: str, headers) -> Any | None:
latest_date = None
selected_repo = None
for repo in repos:
url = f'https://api.github.com/repos/{repo}/branches/{app_id}'
r_json = await fetch_branch_info(url, headers)
if r_json and 'commit' in r_json:
date = r_json['commit']['commit']['author']['date']
if (latest_date is None) or (date > latest_date):
latest_date = date
selected_repo = repo
return selected_repo, latest_date
async def main(app_id: str, repos: list) -> bool:
app_id_list = list(filter(str.isdecimal, app_id.strip().split('-')))
if not app_id_list:
log.error(f'App ID无效')
return False
app_id = app_id_list[0]
github_token = config.get("Github_Personal_Token", "")
headers = {'Authorization': f'Bearer {
github_token}'} if github_token else None
await checkcn()
await check_github_api_rate_limit(headers)
selected_repo, latest_date = await get_latest_repo_info(repos, app_id, headers)
if (selected_repo):
log.info(f'选择清单仓库: {selected_repo}')
url = f'https://api.github.com/repos/{selected_repo}/branches/{app_id}'
r_json = await fetch_branch_info(url, headers)
if (r_json) and ('commit' in r_json):
sha = r_json['commit']['sha']
url = r_json['commit']['commit']['tree']['url']
r2_json = await fetch_branch_info(url, headers)
if (r2_json) and ('tree' in r2_json):
collected_depots = []
for item in r2_json['tree']:
result = await get_manifest(sha, item['path'], steam_path, selected_repo)
collected_depots.extend(result)
if collected_depots:
if isSteamTools:
await migrate(st_use=True)
await stool_add(collected_depots, app_id)
log.info('找到SteamTools, 已添加解锁文件')
elif isGreenLuma:
await migrate(st_use=False)
await greenluma_add([app_id])
depot_config = {'depots': {depot_id: {
'DecryptionKey': depot_key} for depot_id, depot_key in collected_depots}}
await depotkey_merge(steam_path / 'config' / 'config.vdf', depot_config)
if await greenluma_add([int(i) for i in depot_config['depots'] if i.isdecimal()]):
log.info('找到GreenLuma, 已添加解锁文件')
log.info(f'清单最后更新时间: {latest_date}')
log.info(f'入库成功: {app_id}')
await client.aclose()
os.system('pause')
return True
log.error(f'清单下载或生成失败: {app_id}')
await client.aclose()
os.system('pause')
return False
if __name__ == '__main__':
init()
try:
while True:
repos = [
'ikun0014/ManifestHub',
'Auiowu/ManifestAutoUpdate',
'tymolu233/ManifestAutoUpdate',
]
app_id = input(f"{Fore.CYAN}{Back.BLACK}{
Style.BRIGHT}请输入游戏AppID: {Style.RESET_ALL}").strip()
asyncio.run(main(app_id, repos))
except KeyboardInterrupt:
log.info("程序已退出")
except SystemExit:
sys.exit()
LOG.error(f"错误:{stack_error(e)}")
os.system("pause")

View File

@@ -1,6 +1,6 @@
{
"name": "onekey",
"version": "1.3.4",
"version": "1.4.4",
"description": "一个Steam仓库清单下载器",
"main": "index.js",
"scripts": {

View File

@@ -1,7 +1,5 @@
aiofiles==24.1.0
colorama==0.4.6
colorlog==6.8.2
httpx==0.27.2
rich==13.9.4
ujson==5.10.0
vdf==3.4
aiofiles
httpx
logzero
ujson
vdf