Compare commits

...

3 Commits
1.1.3 ... 1.1.5

Author SHA1 Message Date
ikun
b4ea1edb53 chore: 优化代码工整性 2024-09-01 14:00:20 +08:00
ikun
f555f0273d rm: 搜索游戏自动获取App ID 2024-08-29 17:22:04 +08:00
ikun
3ba8b67f0e fix: bug 2024-08-27 09:48:20 +08:00
9 changed files with 177 additions and 199 deletions

0
common/__init__.py Normal file
View File

36
common/config.py Normal file
View File

@@ -0,0 +1,36 @@
import ujson as json
import aiofiles
from . import log
import os
import sys
import asyncio
log = log.log
# 生成配置文件
async def gen_config_file():
default_config ={
"Github_Personal_Token": "",
"Custom_Steam_Path": "",
"QA1": "温馨提示Github_Personal_Token可在Github设置的最底下开发者选项找到详情看教程",
"教程": "https://lyvx-my.sharepoint.com/:w:/g/personal/ikun_ikunshare_com/EWqIqyCElLNLo_CKfLbqix0BWU_O03HLzEHQKHdJYrUz-Q?e=79MZjw"
}
async with aiofiles.open("./config.json", mode="w", encoding="utf-8") as f:
await f.write(json.dumps(default_config, indent=2, ensure_ascii=False,
escape_forward_slashes=False))
await f.close()
log.info(' 🖱️ 程序可能为第一次启动,请填写配置文件后重新启动程序')
# 加载配置文件
async def load_config():
if not os.path.exists('./config.json'):
await gen_config_file()
os.system('pause')
sys.exit()
else:
async with aiofiles.open("./config.json", mode="r", encoding="utf-8") as f:
config = json.loads(await f.read())
return config
config = asyncio.run(load_config())

17
common/getsteampath.py Normal file
View File

@@ -0,0 +1,17 @@
import winreg
from pathlib import Path
from common import config
config = config.config
# 通过注册表获取Steam安装路径
def get_steam_path():
key = winreg.OpenKey(winreg.HKEY_CURRENT_USER, r'Software\Valve\Steam')
steam_path = Path(winreg.QueryValueEx(key, 'SteamPath')[0])
custom_steam_path = config["Custom_Steam_Path"]
if not custom_steam_path == '':
return Path(custom_steam_path)
else:
return steam_path
steam_path = get_steam_path()

33
common/glunlock.py Normal file
View File

@@ -0,0 +1,33 @@
from common import getsteampath
steam_path = getsteampath.steam_path
# 增加GreenLuma解锁相关文件
async def greenluma_add(depot_id_list):
app_list_path = steam_path / 'AppList'
if app_list_path.exists() and app_list_path.is_file():
app_list_path.unlink(missing_ok=True)
if not app_list_path.is_dir():
app_list_path.mkdir(parents=True, exist_ok=True)
depot_dict = {}
for i in app_list_path.iterdir():
if i.stem.isdecimal() and i.suffix == '.txt':
with i.open('r', encoding='utf-8') as f:
app_id_ = f.read().strip()
depot_dict[int(i.stem)] = None
if app_id_.isdecimal():
depot_dict[int(i.stem)] = int(app_id_)
for depot_id in depot_id_list:
if int(depot_id) not in depot_dict.values():
index = max(depot_dict.keys()) + 1 if depot_dict.keys() else 0
if index != 0:
for i in range(max(depot_dict.keys())):
if i not in depot_dict.keys():
index = i
break
with (app_list_path / f'{index}.txt').open('w', encoding='utf-8') as f:
f.write(str(depot_id))
depot_dict[index] = int(depot_id)
return True
glunlock = greenluma_add

23
common/log.py Normal file
View File

@@ -0,0 +1,23 @@
import colorlog
import logging
# 初始化日志记录器
def init_log():
logger = logging.getLogger('Onekey')
logger.setLevel(logging.DEBUG)
stream_handler = logging.StreamHandler()
stream_handler.setLevel(logging.DEBUG)
fmt_string = '%(log_color)s[%(name)s][%(levelname)s]%(message)s'
log_colors = {
'INFO': 'cyan',
'WARNING': 'yellow',
'ERROR': 'red',
'CRITICAL': 'purple'
}
fmt = colorlog.ColoredFormatter(fmt_string, log_colors=log_colors)
stream_handler.setFormatter(fmt)
logger.addHandler(stream_handler)
return logger
log = init_log()

8
common/stack_error.py Normal file
View File

@@ -0,0 +1,8 @@
import traceback
# 错误堆栈处理
def stack_error(exception):
stack_trace = traceback.format_exception(type(exception), exception, exception.__traceback__)
return ''.join(stack_trace)
stack_error = stack_error

29
common/stunlock.py Normal file
View File

@@ -0,0 +1,29 @@
from common import log, getsteampath
import asyncio
import aiofiles
import os
import subprocess
lock = asyncio.Lock()
log = log.log
steam_path = getsteampath.steam_path
# 增加SteamTools解锁相关文件
async def stool_add(depot_data, app_id):
lua_filename = f"Onekey_unlock_{app_id}.lua"
lua_filepath = steam_path / "config" / "stplug-in" / lua_filename
async with lock:
log.info(f' ✅ SteamTools解锁文件生成: {lua_filepath}')
async with aiofiles.open(lua_filepath, mode="w", encoding="utf-8") as lua_file:
await lua_file.write(f'addappid({app_id}, 1, "None")\n')
for depot_id, depot_key in depot_data:
await lua_file.write(f'addappid({depot_id}, 1, "{depot_key}")\n')
luapacka_path = steam_path / "config" / "stplug-in" / "luapacka.exe"
subprocess.run([str(luapacka_path), str(lua_filepath)])
os.remove(lua_filepath)
return True
stunlock = stool_add

226
main.py
View File

@@ -1,70 +1,22 @@
import os
import vdf
import winreg
import aiofiles
import traceback
import subprocess
import colorlog
import logging
import ujson as json
import time
import sys
import psutil
import asyncio
from common import log, config, getsteampath, stunlock, glunlock, stack_error
from aiohttp import ClientSession, ClientError
from pathlib import Path
# 初始化日志记录器
def init_log():
logger = logging.getLogger('Onekey')
logger.setLevel(logging.DEBUG)
stream_handler = logging.StreamHandler()
stream_handler.setLevel(logging.DEBUG)
fmt_string = '%(log_color)s[%(name)s][%(levelname)s]%(message)s'
log_colors = {
'DEBUG': 'cyan',
'INFO': 'green',
'WARNING': 'yellow',
'ERROR': 'red',
'CRITICAL': 'purple'
}
fmt = colorlog.ColoredFormatter(fmt_string, log_colors=log_colors)
stream_handler.setFormatter(fmt)
logger.addHandler(stream_handler)
return logger
# 生成配置文件
def gen_config_file():
default_config ={
"Github_Personal_Token": "",
"Custom_Steam_Path": "",
"QA1": "温馨提示Github_Personal_Token可在Github设置的最底下开发者选项找到详情看教程",
"教程": "https://lyvx-my.sharepoint.com/:w:/g/personal/ikun_ikunshare_com/EWqIqyCElLNLo_CKfLbqix0BWU_O03HLzEHQKHdJYrUz-Q?e=79MZjw"
}
with open("./config.json", "w", encoding="utf-8") as f:
f.write(json.dumps(default_config, indent=2, ensure_ascii=False,
escape_forward_slashes=False))
f.close()
log.info(' 🖱️ 程序可能为第一次启动,请填写配置文件后重新启动程序')
# 加载配置文件
def load_config():
if not os.path.exists('./config.json'):
gen_config_file()
os.system('pause')
sys.exit()
else:
with open("./config.json", "r", encoding="utf-8") as f:
config = json.loads(f.read())
return config
log = init_log()
config = load_config()
log = log.log
config = config.config
lock = asyncio.Lock()
steam_path = getsteampath.steam_path
isGreenLuma = any((steam_path / dll).exists() for dll in ['GreenLuma_2024_x86.dll', 'GreenLuma_2024_x64.dll', 'User32.dll'])
isSteamTools = (steam_path / 'config' / 'stplug-in').is_dir()
stunlock = stunlock.stunlock
glunlock = glunlock.glunlock
stack_error = stack_error.stack_error
print('\033[1;32;40m _____ __ _ _____ _ _ _____ __ __ ' + '\033[0m')
print('\033[1;32;40m / _ \\ | \\ | | | ____| | | / / | ____| \\ \\ / /' + '\033[0m')
@@ -73,71 +25,12 @@ print('\033[1;32;40m | | | | | |\\ | | __| | |\\ \\ | __| \\ / ' + '\
print('\033[1;32;40m | |_| | | | \\ | | |___ | | \\ \\ | |___ / /' + '\033[0m')
print('\033[1;32;40m \\_____/ |_| \\_| |_____| |_| \\_\\ |_____| /_/' + '\033[0m')
log.info('作者ikun0014')
log.info('本项目基于wxy1343/ManifestAutoUpdate进行修改采用GPL V3许可证')
log.info('版本1.1.3')
log.info('本项目基于wxy1343/ManifestAutoUpdate进行修改采用ACSL许可证')
log.info('版本1.1.6')
log.info('项目仓库https://github.com/ikunshare/Onekey')
log.debug('官网ikunshare.com')
log.info('官网ikunshare.com')
log.warning('本项目完全开源免费如果你在淘宝QQ群内通过购买方式获得赶紧回去骂商家死全家\n交流群组:\n点击链接加入群聊【𝗶𝗸𝘂𝗻分享】https://qm.qq.com/q/d7sWovfAGI\nhttps://t.me/ikunshare_group')
# 通过注册表获取Steam安装路径
def get_steam_path():
key = winreg.OpenKey(winreg.HKEY_CURRENT_USER, r'Software\Valve\Steam')
steam_path = Path(winreg.QueryValueEx(key, 'SteamPath')[0])
custom_steam_path = config.get("Custom_Steam_Path", "")
if not custom_steam_path == '':
return Path(custom_steam_path)
else:
return steam_path
steam_path = get_steam_path()
isGreenLuma = any((steam_path / dll).exists() for dll in ['GreenLuma_2024_x86.dll', 'GreenLuma_2024_x64.dll', 'User32.dll'])
isSteamTools = (steam_path / 'config' / 'stplug-in').is_dir()
# 错误堆栈处理
def stack_error(exception):
stack_trace = traceback.format_exception(type(exception), exception, exception.__traceback__)
return ''.join(stack_trace)
# 从Steam API直接搜索游戏信息
async def search_game_info(search_term):
async with ClientSession() as session:
url = f'https://steamui.com/loadGames.php?search={search_term}'
async with session.get(url) as r:
if r.status == 200:
data = await r.json()
games = data.get('games', [])
return games
else:
log.error("⚠ 获取游戏信息失败")
return []
# 通过游戏名查找appid
async def find_appid_by_name(game_name):
games = await search_game_info(game_name)
if games:
log.info("🔍 找到以下匹配的游戏:")
for idx, game in enumerate(games, 1):
if game['schinese_name'] == '':
gamename = game['name']
else:
gamename = game['schinese_name']
log.info(f"{idx}. {gamename} (AppID: {game['appid']})")
choice = input("请选择游戏编号:")
if choice.isdigit() and 1 <= int(choice) <= len(games):
selected_game = games[int(choice) - 1]
log.info(f"✅ 选择的游戏: {selected_game['schinese_name']} (AppID: {selected_game['appid']})")
return selected_game['appid'], selected_game['schinese_name']
log.error("⚠ 未找到匹配的游戏")
return None, None
# 下载清单
async def get(sha, path, repo, session):
url_list = [
@@ -164,7 +57,6 @@ async def get(sha, path, repo, session):
log.error(f' 🔄 超过最大重试次数: {path}')
raise Exception(f' 🔄 无法下载: {path}')
# 获取清单信息
async def get_manifest(sha, path, steam_path: Path, repo, session):
collected_depots = []
@@ -193,14 +85,13 @@ async def get_manifest(sha, path, steam_path: Path, repo, session):
raise
return collected_depots
# 合并DecryptionKey
async def depotkey_merge(config_path, depots_config):
if not config_path.exists():
async with lock:
log.error(' 👋 Steam默认配置不存在可能是没有登录账号')
return
with open(config_path, encoding='utf-8') as f:
async with aiofiles.open(config_path, encoding='utf-8') as f:
config = vdf.load(f)
software = config['InstallConfigStore']['Software']
valve = software.get('Valve') or software.get('valve')
@@ -208,66 +99,10 @@ async def depotkey_merge(config_path, depots_config):
if 'depots' not in steam:
steam['depots'] = {}
steam['depots'].update(depots_config['depots'])
with open(config_path, 'w', encoding='utf-8') as f:
async with aiofiles.open(config_path, mode='w', encoding='utf-8') as f:
vdf.dump(config, f, pretty=True)
return True
# 增加SteamTools解锁相关文件
async def stool_add(depot_data, app_id):
lua_filename = f"Onekey_unlock_{app_id}.lua"
lua_filepath = steam_path / "config" / "stplug-in" / lua_filename
async with lock:
log.info(f' ✅ SteamTools解锁文件生成: {lua_filepath}')
with open(lua_filepath, "w", encoding="utf-8") as lua_file:
lua_file.write(f'addappid({app_id}, 1, "None")\n')
for depot_id, depot_key in depot_data:
lua_file.write(f'addappid({depot_id}, 1, "{depot_key}")\n')
luapacka_path = steam_path / "config" / "stplug-in" / "luapacka.exe"
subprocess.run([str(luapacka_path), str(lua_filepath)])
os.remove(lua_filepath)
return True
# 增加GreenLuma解锁相关文件
async def greenluma_add(depot_id_list):
app_list_path = steam_path / 'AppList'
if app_list_path.exists() and app_list_path.is_file():
app_list_path.unlink(missing_ok=True)
if not app_list_path.is_dir():
app_list_path.mkdir(parents=True, exist_ok=True)
depot_dict = {}
for i in app_list_path.iterdir():
if i.stem.isdecimal() and i.suffix == '.txt':
with i.open('r', encoding='utf-8') as f:
app_id_ = f.read().strip()
depot_dict[int(i.stem)] = None
if app_id_.isdecimal():
depot_dict[int(i.stem)] = int(app_id_)
for depot_id in depot_id_list:
if int(depot_id) not in depot_dict.values():
index = max(depot_dict.keys()) + 1 if depot_dict.keys() else 0
if index != 0:
for i in range(max(depot_dict.keys())):
if i not in depot_dict.keys():
index = i
break
with (app_list_path / f'{index}.txt').open('w', encoding='utf-8') as f:
f.write(str(depot_id))
depot_dict[index] = int(depot_id)
return True
# 检查进程是否运行
def check_process_running(process_name):
for process in psutil.process_iter(['name']):
if process.info['name'] == process_name:
return True
return False
async def check_github_api_rate_limit(headers, session):
url = 'https://api.github.com/rate_limit'
@@ -284,18 +119,19 @@ async def check_github_api_rate_limit(headers, session):
reset_time = rate_limit['reset']
reset_time_formatted = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(reset_time))
log.info(f' 🔄 剩余请求次数: {remaining_requests}')
else:
log.error('Github请求数检查失败')
if remaining_requests == 0:
log.warning(f' ⚠ GitHub API 请求数已用尽,将在 {reset_time_formatted} 重置, 不想等生成一个填配置文件里')
# 主函数
async def main(app_id, game_name):
async def main(app_id):
app_id_list = list(filter(str.isdecimal, app_id.strip().split('-')))
app_id = app_id_list[0]
async with ClientSession() as session:
github_token = config.get("Github_Personal_Token", "")
github_token = config["Github_Personal_Token"]
headers = {'Authorization': f'Bearer {github_token}'} if github_token else None
latest_date = None
selected_repo = None
@@ -333,21 +169,22 @@ async def main(app_id, game_name):
collected_depots.extend(result)
if collected_depots:
if isSteamTools:
await stool_add(collected_depots, app_id)
await stunlock(collected_depots, app_id)
log.info(' ✅ 找到SteamTools已添加解锁文件')
if isGreenLuma:
await greenluma_add([app_id])
await glunlock([app_id])
depot_config = {'depots': {depot_id: {'DecryptionKey': depot_key} for depot_id, depot_key in collected_depots}}
await depotkey_merge(steam_path / 'config' / 'config.vdf', depot_config)
if await greenluma_add([int(i) for i in depot_config['depots'] if i.isdecimal()]):
if await glunlock([int(i) for i in depot_config['depots'] if i.isdecimal()]):
log.info(' ✅ 找到GreenLuma已添加解锁文件')
log.info(f' ✅ 清单最后更新时间:{date}')
log.info(f' ✅ 入库成功: {app_id}{game_name}')
log.info(f' ✅ 入库成功: {app_id}')
os.system('pause')
return True
log.error(f' ⚠ 清单下载或生成失败: {app_id}{game_name}')
log.error(f' ⚠ 清单下载或生成失败: {app_id}')
os.system('pause')
return False
repos = [
'ManifestHub/ManifestHub',
'ikun0014/ManifestHub',
@@ -356,16 +193,11 @@ repos = [
]
if __name__ == '__main__':
try:
log.debug('App ID可以在SteamDB或Steam商店链接页面查看')
user_input = input("请输入游戏AppID或名称").strip()
appid, game_name = asyncio.run(find_appid_by_name(user_input))
if not appid:
log.error(' ⚠ 未找到匹配的游戏,请尝试其他名称。')
asyncio.run(main(appid, game_name))
log.info('App ID可以在SteamDB或Steam商店链接页面查看')
app_id = input("请输入游戏AppID").strip()
asyncio.run(main(app_id))
except KeyboardInterrupt:
exit()
except Exception as e:
log.error(f' ⚠ 发生错误: {stack_error(e)}')
traceback.print_exc()
if not user_input:
os.system('pause')
traceback.print_exc()

View File

@@ -1,5 +1,5 @@
aiofiles==24.1.0
aiohttp==3.9.5
colorlog==6.8.2
psutil==6.0.0
ujson==5.10.0
vdf==3.4
nuitka==2.4.8