Compare commits

..

5 Commits

Author SHA1 Message Date
wei 8bd09688bb Add color logs and some fix. 2026-05-18 23:29:58 +08:00
wei 8a3b1b1638 Add .gitignore. 2026-05-17 21:22:25 +08:00
wei 07276f6dd7 Some changes. 2026-05-17 21:14:40 +08:00
wei 2d751f6f19 Some changes. 2026-05-17 19:16:00 +08:00
wei d1663c96e9 Update files. 2026-05-17 17:16:58 +08:00
7 changed files with 2567 additions and 1 deletions
+3
View File
@@ -0,0 +1,3 @@
/servers/
/data/
/config/
+1 -1
View File
@@ -1,3 +1,3 @@
# ServerJar
Create/Manage multiple Mineraft server all in one tool.
Create/Manage multiple Minecraft server all in one tool.
+770
View File
@@ -0,0 +1,770 @@
import argparse
import asyncio
import base64
import binascii
import queue
import re
import shutil
import ssl
import sys
import threading
import socket
import time
import traceback
from pathlib import Path
from prompt_toolkit import Application
from prompt_toolkit.layout import Layout, HSplit
from prompt_toolkit.lexers import Lexer
from prompt_toolkit.widgets import TextArea
from prompt_toolkit.key_binding import KeyBindings
from prompt_toolkit.styles import Style
from prompt_toolkit.filters import has_focus
from prompt_toolkit.shortcuts import clear as ptk_clear
version = "Beta-1"
SERVER_JAR_DIR = Path.home() / ".serverjar"
CLIENT_CERT_DIR = Path.home() / ".serverjar" / "client" / "cert"
CLIENT_CERT_SUFFIXES = {".pem", ".crt", ".cer"}
CLIENT_HISTORY_FILE = Path.home() / ".serverjar" / "history" / "history.txt"
CLIENT_LOG_DIR = Path.home() / ".serverjar" / "client" / "logs"
def add_client_cert(cert_path):
source = Path(cert_path).expanduser()
if not source.exists():
raise FileNotFoundError(f"{source} does not exist")
if not source.is_file():
raise IsADirectoryError(f"{source} is not a file")
if source.suffix.lower() not in CLIENT_CERT_SUFFIXES:
allowed = ", ".join(sorted(CLIENT_CERT_SUFFIXES))
raise ValueError(f"Unsupported certificate suffix '{source.suffix}'. Allowed: {allowed}")
CLIENT_CERT_DIR.mkdir(parents=True, exist_ok=True)
target = CLIENT_CERT_DIR / source.name
if source.resolve() == target.resolve():
return target
shutil.copy2(source, target)
return target
def run_cli_action(argv=None):
argv = list(sys.argv[1:] if argv is None else argv)
if not argv or argv[0] != "--add-cert":
return False
parser = argparse.ArgumentParser(prog=f"{Path(sys.argv[0]).name} --add-cert")
parser.add_argument("cert_path", help="Path to a PEM/CRT/CER certificate file")
args = parser.parse_args(argv[1:])
try:
target = add_client_cert(args.cert_path)
except Exception as e:
print(f"Unable to add certificate: {e}", file=sys.stderr)
sys.exit(1)
print(f"Certificate added: {target}")
return True
def create_client_tls_context(log=None, warn=None):
CLIENT_CERT_DIR.mkdir(parents=True, exist_ok=True)
context = ssl.create_default_context()
loaded_certs = []
for cert_path in sorted(CLIENT_CERT_DIR.iterdir()):
if not cert_path.is_file() or cert_path.suffix.lower() not in CLIENT_CERT_SUFFIXES:
continue
try:
context.load_verify_locations(cafile=cert_path)
except ssl.SSLError as e:
if callable(warn):
warn(f"Unable to load TLS certificate {cert_path}: {e}")
continue
loaded_certs.append(cert_path.name)
if callable(log):
if loaded_certs:
log("Loaded TLS certificate(s): {}".format(", ".join(loaded_certs)))
else:
log(f"No custom TLS certificates found in {CLIENT_CERT_DIR}")
return context
def get_history(log=None, warn=None):
if not CLIENT_HISTORY_FILE.exists():
return []
CLIENT_HISTORY_FILE.parent.mkdir(parents=True, exist_ok=True)
if callable(log):
log("Restoring command history...")
try:
with CLIENT_HISTORY_FILE.open("r", encoding="utf-8") as f:
return [line for line in f.read().splitlines() if line]
except Exception as e:
if callable(warn):
warn(f"Unable to restore command history: {e}")
return []
def save_history(history, log=None, warn=None):
CLIENT_HISTORY_FILE.parent.mkdir(parents=True, exist_ok=True)
if callable(log):
log("Saving command history...")
try:
with CLIENT_HISTORY_FILE.open("w", encoding="utf-8") as f:
f.write("\n".join(history))
except Exception as e:
if callable(warn):
warn(f"Unable to save command history: {e}")
class ServerJarClient(Application):
def __init__(self, **kwargs):
super().__init__(**kwargs, mouse_support=True)
# Text style
self.style = Style.from_dict({
"input": "bg:#222222 #ffffff",
"separator-area": "bg:#000000 #ffffff",
"message-area": "bg:#111111 #ffffff",
"log": "bg:#000000 #ffffff",
"warning": "bg:#000000 ansiyellow",
"error": "bg:#000000 ansibrightred bold",
"system": "bg:#000000 ansicyan",
"process-log": "bg:#000000 ansigreen",
"process-error": "bg:#000000 ansired",
"unknown": "bg:#000000 ansiwhite bold",
})
class LogLexer(Lexer):
tag_pattern = re.compile(r"^\[([A-Za-z0-9_. -]+)([:|][A-Za-z0-9_. -]+)?\]")
line_style = {
"auth_err": "class:error",
"auth_ok": "class:process-log",
"auth_required": "class:warning",
"client": "class:log",
"client|err": "class:error",
"client|warn": "class:warning",
"download_log_begin": "class:system",
"download_log_end": "class:system",
"err": "class:process-error",
"log": "class:process-log",
"ok": "class:process-log",
"sys": "class:system",
"sys:err": "class:error",
"unknown": "class:unknown",
}
def lex_document(self, document):
def get_line(lineno):
line = document.lines[lineno]
match = self.tag_pattern.match(line)
tag = "unknown"
if match:
tag = (match.group(1) + (match.group(2) or "")).lower()
style = self.line_style.get(tag, self.line_style["unknown"])
return [(style, line)]
return get_line
# Socket
self.sock = None
# Args
self.args = None
# event
self.kb = KeyBindings()
# Areas
# self.log_lines = []
# self.log_control = FormattedTextControl(
# text=lambda: ANSI("".join(self.log_lines))
# )
self.log_area = TextArea(
style="class:log",
wrap_lines=True,
lexer=LogLexer(),
)
self.separator_area = TextArea(text="=" * 10 + " Enter Command Here " + "=" * 10, height=1,
style="class:separator-area")
self.message_area = TextArea(height=1, multiline=False, style="class:message-area")
self.input_area = TextArea(height=1, prompt="> ", style="class:input", multiline=False)
self.layout = Layout(HSplit([
self.log_area,
self.separator_area,
self.message_area,
self.input_area,
]))
# Thread
self.sock_lock = threading.Lock()
self.closing_event = threading.Event()
self.client_thread = threading.Thread(target=self.client, daemon=True)
self.connect_event = threading.Event()
self.disconnect_event = threading.Event()
self.auth_required = False
# Queue
self.incoming = queue.Queue()
# Command History
self.cmds = []
self.current_index = None
self.history_draft = ""
@self.kb.add("c-c")
def closing_kb(event):
self.shutdown("Ctrl-C (Stopped by user)")
@self.kb.add("up", filter=has_focus(self.input_area))
def get_old_cmd(event):
# check if there's no old command available in the command history list
if not self.cmds:
return
# Save the current input so Down can restore it after browsing history.
if self.current_index is None:
self.history_draft = self.get_input_area_output()
self.current_index = 0
elif self.current_index < len(self.cmds) - 1:
self.current_index += 1
old_cmd = self.cmds[self.current_index]
self.set_input_area_text(old_cmd)
@self.kb.add("down", filter=has_focus(self.input_area))
def get_new_cmd(event):
# Check if there's no old command available in the command history list
if len(self.cmds) < 2:
return
# Get new command only working when current_index greater then 0
if self.current_index is None:
return
if self.current_index == 0:
self.current_index = None
self.set_input_area_text(self.history_draft)
return
self.current_index -= 1
new_cmd = self.cmds[self.current_index]
self.set_input_area_text(new_cmd)
@self.kb.add("enter", filter=has_focus(self.input_area))
def enter_kb(event):
cmd = self.input_area.text
self.input_area.text = ""
self.current_index = None
self.history_draft = ""
if cmd == "_exit":
self.shutdown("_exit command detected")
return
if self.auth_required:
if cmd == "_d":
self.command_parser(cmd)
return
with self.sock_lock:
s = self.sock
if s:
try:
s.sendall(("__auth " + cmd + "\n").encode("utf-8"))
self.display_message("Password sent, waiting for server...")
except OSError as e:
self._err(f"Send failed: {e}\n")
else:
self._err("The remote server is not connected yet.")
return
# Save command
self.insert_new_cmd_to_history(cmd)
# if re.match(r"^_[A-Za-z0-9]+(?:$|_.*)", cmd):
exit_flag = self.command_parser(cmd)
if exit_flag:
return
with self.sock_lock:
s = self.sock
if s:
try:
s.sendall((cmd + "\n").encode("utf-8"))
except OSError as e:
self._err(f"Send failed: {e}\n")
else:
self._err("The remote server is not connected yet.")
@self.kb.add("c-w", filter=has_focus(self.input_area))
def focus_log_area(event):
self.layout.focus(self.log_area)
self.display_message("Now focus at log area.")
@self.kb.add("c-w", filter=has_focus(self.log_area))
def focus_log_area(event):
self.layout.focus(self.input_area)
self.display_message("Now focus at input area.")
self.key_bindings = self.kb
self.full_screen = True
# Host and Port
self.host = None
self.port = None
def command_parser(self, command):
def connect_to_server(host, port):
# update target
self.host = host
self.port = port
# trigger connection
self.disconnect_event.clear()
self.connect_event.set()
return True
def disconnect_from_server(cmd):
self._log("Disconnecting...")
self.disconnect_event.set()
self.connect_event.clear()
with self.sock_lock:
s = self.sock
if s:
try:
s.shutdown(socket.SHUT_RDWR)
except Exception as e:
self._err("An error occurred while shutting down sock: " + str(e))
try:
s.close()
except Exception as e:
self._err("An error occurred while closing the socket: {}".format(e))
elif self.connect_event.is_set():
self.host = None
self.port = None
self._log("Auto-reconnect stopped.")
else:
self._err("The remote server is not connected yet.")
return True
def connect_to_server_parser(cmd):
target = cmd[3:].strip()
try:
host, port_str = target.split(":", 1)
host = host.strip()
port = int(port_str.strip())
if not host:
raise ValueError("empty host")
except Exception as _:
self._err("Usage: _c host:port")
return True
connect_to_server(host, port)
return True
def _shutdown(cmd):
self.shutdown("_exit command detected")
return True
def _top(cmd):
self.log_area.buffer.cursor_position = 0
self.invalidate()
return True
def _bottom(cmd):
self.log_area.buffer.cursor_position = len(self.log_area.buffer.text)
self.invalidate()
return True
def _version(cmd):
self._log("ServerJar Client Version {}".format(version))
return True
def _help(cmd):
for key, value in cmd_map.items():
self._log(f"{key}: {value.get('description')}")
return True
def _clear(cmd):
self.log_area.text = ""
self._log("Log cleared")
return True
def _clear_history(cmd):
self.cmds = []
self.current_index = None
self.history_draft = ""
self._log("History cleared")
return True
cmd_map = {
"_exit": {
"func": _shutdown,
"description": "Exit the shell",
},
"_c": {
"func": connect_to_server_parser,
"description": "Connect to the remote server (Usage: _c host:port)",
},
"_d": {
"func": disconnect_from_server,
"description": "Disconnect from the remote server",
},
"_top": {
"func": _top,
"description": "Go to the top of the log area",
},
"_bottom": {
"func": _bottom,
"description": "Go to the bottom of the log area",
},
"_version": {
"func": _version,
"description": "Display the version of the client",
},
"_clear_history": {
"func": _clear_history,
"description": "Clear the command history",
},
"_clear": {
"func": _clear,
"description": "Clear the log area",
},
"_help": {
"func": _help,
"description": "Display the help message",
},
}
if not command.strip():
return True
header = command.split()[0]
for cmd in cmd_map.keys():
if cmd == header:
func = cmd_map.get(cmd).get("func")
return_flag = func(command)
return return_flag
# self._err("Unknown command '%s'" % command)
return False
def display_message(self, message):
self.message_area.text = message
def get_input_area_output(self):
return self.input_area.text
def set_input_area_text(self, text):
self.input_area.text = text
self.input_area.buffer.cursor_position = len(text)
def insert_new_cmd_to_history(self, cmd):
if not cmd:
return
if self.cmds and self.cmds[0] == cmd:
return
self.cmds.insert(0, cmd)
class ServerInfoInvalidException(Exception):
def __init__(self, message, **kwargs):
super().__init__()
self.msg = message
def __str__(self):
return self.msg
def arguments_parser(self):
parser = argparse.ArgumentParser()
parser.add_argument("-p", "--port", type=int, help="Port number", required=False)
parser.add_argument('-host', '--host', type=str, help="Hostname", required=False)
parser.add_argument('-no-tls', '--no-tls', help="Enable TLS support", action='store_true', default=False,
required=False)
parser.add_argument('-r', '--retry', help="Retry when disconnect", action='store_true', default=False,
required=False)
parser.add_argument('--add-cert', help="Add server certificate", type=str, default=None, required=False)
args = parser.parse_args()
return args
def get_tls_context(self):
return create_client_tls_context(log=self._log, warn=self._warn)
def shutdown(self, reason=""):
if self.closing_event.is_set():
return
self.closing_event.set()
self._log(f"Shutting down for reason: {reason}")
with self.sock_lock:
s = self.sock
if s:
try:
s.close()
except Exception as e:
self._err(f"Unable to close socket: {e}")
pass
# Save history
save_history(self.cmds, self._log, self._warn)
# Exit ui event loop
self.full_exit()
# @staticmethod
# def clear_screen():
# os.system("cls" if os.name == "nt" else "clear")
@staticmethod
def _ensure_log_tag(message):
message = f"{message}"
if re.match(r"^\[[A-Za-z0-9_. -]+(?:[:|][A-Za-z0-9_. -]+)?\]", message):
return message
return f"[unknown] {message}"
def log(self, message):
self.incoming.put(self._ensure_log_tag(message))
def _log(self, message):
# Nothing change
self.incoming.put(f"[client] {message}")
# self.display_message(f"{message}")
def _err(self, message):
# WIP... (display text as red color if the log is an error message)
self.incoming.put(f"[client|err] {message}")
# self.display_message(f"ERROR: {message}")
def _warn(self, message):
# WIP... (Display text as yellow color if the log is a warning message)
self.incoming.put(f"[client|warn] {message}")
# self.display_message(f"WARNING: {message}")
def full_exit(self):
self.exit()
sys.exit()
async def consume_incoming(self):
loop = asyncio.get_running_loop()
while True:
msg = await loop.run_in_executor(None, self.incoming.get)
# self.log_lines.append(msg)
#
# if len(self.log_lines) > 2000:
# self.log_lines = self.log_lines[-2000:]
if len(self.log_area.text) > 0:
self.log_area.text += "\n" + msg
else:
self.log_area.text += msg
if len(self.log_area.text) > 300_000:
self.log_area.text = "New log start here.\n" + self.log_area.text[-250_000:]
self.log_area.buffer.cursor_position = len(self.log_area.buffer.text)
self.invalidate()
def client(self):
ptk_clear()
while not self.closing_event.is_set():
if self.closing_event.is_set():
break
if not self.connect_event.is_set() and (self.args.port is None or self.args.host is None):
self._log("Type _c host:port to connect. (_d to disconnect)")
self.connect_event.wait()
elif self.connect_event.is_set():
self._log(f"Reconnecting...")
else:
self._log(f"Connecting to remote server from {self.args.host}:{self.args.port} (Value from sys.argv)...")
self.host, self.port = self.args.host, self.args.port
if not self.host or not self.port:
self._err("No host/port set. Usage: _c host:port")
self.connect_event.clear()
continue
try:
self._log(f"Connecting to {self.host}:{self.port} ...")
# Create connect
if self.args.no_tls:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((self.host, self.port))
else:
raw = socket.create_connection((self.host, self.port))
context = self.get_tls_context()
s = context.wrap_socket(raw, server_hostname=self.host)
with self.sock_lock:
self.sock = s
self.auth_required = False
self._log("Remote socket server connected [HOST: {}, PORT: {}]".format(self.host, self.port))
buffer = ""
downloading_log = False
download_path = None
download_file = None
while True:
# Receive remote server broadcast message and display it on log area
data = s.recv(4096)
if not data:
raise ConnectionError("Server closed")
buffer += data.decode("utf-8", errors="replace")
while "\n" in buffer:
line, buffer = buffer.split("\n", 1)
if line.startswith("[AUTH_REQUIRED]"):
self.auth_required = True
self.display_message("Password required. Type the server password to continue.")
self._warn("Server requires a password before continuing.")
if self.args.no_tls:
self._warn("TLS is disabled. The password will be sent without encryption.")
continue
if line.startswith("[AUTH_OK]"):
self.auth_required = False
self.display_message("Authenticated.")
self._log("Server password accepted.")
continue
if line.startswith("[AUTH_ERR]"):
self.auth_required = True
self.display_message("Invalid password. Try again, or use _d to disconnect.")
self._err(line)
continue
if line.startswith("[DOWNLOAD_LOG_BEGIN]"):
if download_file:
download_file.close()
file_name = line[len("[DOWNLOAD_LOG_BEGIN]"):].strip()
file_name = Path(file_name).name or "serverjar.log"
CLIENT_LOG_DIR.mkdir(parents=True, exist_ok=True)
download_path = CLIENT_LOG_DIR / file_name
download_file = download_path.open("wb")
downloading_log = True
self._log(f"Downloading log to {download_path}")
continue
if line == "[DOWNLOAD_LOG_END]":
if download_file:
download_file.close()
download_file = None
downloading_log = False
self._log(f"Log downloaded: {download_path}")
download_path = None
continue
if downloading_log:
if line.startswith("["):
if line.startswith("[SYS:ERR]"):
if download_file:
download_file.close()
download_file = None
downloading_log = False
download_path = None
self.log(line)
continue
try:
if download_file:
download_file.write(base64.b64decode(line.encode("ascii")))
except (binascii.Error, OSError) as e:
self._err(f"Unable to write downloaded log: {e}")
if download_file:
download_file.close()
download_file = None
downloading_log = False
continue
# ### Use normal log method ###
self.log(line)
except (ConnectionError, OSError) as e:
if not self.closing_event.is_set():
if self.args.retry:
self._warn(f"Disconnected: {e}, retrying...")
else:
self._err(f"Disconnected: {e}")
time.sleep(1)
except KeyboardInterrupt:
self._log("Exiting...")
break
except Exception as e:
self._err(f"Unhandled exception: {e}")
self._err(f"{traceback.format_exc()}")
finally:
if "download_file" in locals() and download_file:
download_file.close()
with self.sock_lock:
try:
if self.sock:
self._log("Closing remote connection (From {}:{})...".format(self.host, self.port))
self.sock.close()
except Exception as e:
self._err(f"Unable to close socket: {e}")
pass
self.sock = None
self.auth_required = False
# reset flags
self.disconnect_event.clear()
if not self.args.retry:
self.connect_event.clear()
def startup(self):
self.args = self.arguments_parser()
self.layout.focus(self.input_area)
self.cmds = get_history(self._log, self._warn)
asyncio.create_task(self.consume_incoming())
self.client_thread.start()
if __name__ == "__main__":
if run_cli_action():
sys.exit(0)
app = ServerJarClient()
app.run(pre_run=app.startup)
+1195
View File
File diff suppressed because it is too large Load Diff
+2
View File
@@ -0,0 +1,2 @@
prompt-toolkit==3.0.52
cryptography==48.0.0
+230
View File
@@ -0,0 +1,230 @@
from pathlib import Path
import requests
import click
import os
PAPER_VERSION_API = "https://api.papermc.io/v2/projects/paper/versions/{}"
PAPER_SERVER_JAR_API = "https://api.papermc.io/v2/projects/paper/versions/{}/builds/{}/downloads/paper-{}-{}.jar"
MOJANG_VERSION_MANIFEST_V2 = "https://piston-meta.mojang.com/mc/game/version_manifest_v2.json"
def jar_filename(filename: str | None, default: str) -> str:
if filename:
name = filename
if not name.endswith(".jar"):
name += ".jar"
return name
return default
def download_file(url: str, destination: Path, chunk_size: int = 1024 * 512):
destination.parent.mkdir(parents=True, exist_ok=True)
with requests.get(url, stream=True, timeout=30) as r:
if r.status_code != 200:
raise Exception(f"Download failed: {r.status_code}\nResponse: {r.text}")
total = int(r.headers.get("content-length", 0))
with destination.open(mode="wb", buffering=chunk_size) as f:
if total > 0:
with click.progressbar(length=total, label=f"Downloading {os.path.basename(destination)}") as bar:
for chunk in r.iter_content(chunk_size=chunk_size):
if chunk:
f.write(chunk)
bar.update(len(chunk))
else:
click.echo(f"Downloading {os.path.basename(destination)} (unknown size)")
for chunk in r.iter_content(chunk_size=chunk_size):
if chunk:
f.write(chunk)
def get_specific_version_paper_builds(minecraft_version: str) -> list[dict[str, str]]:
"""
Get specific version of Paper builds
:param minecraft_version:
:return:
"""
url = PAPER_VERSION_API.format(minecraft_version)
try:
r = requests.get(url)
if r.status_code == 200:
return r.json().get("builds", [])
else:
raise Exception("Unable to fetch build version for {}\n"
"Response: {}".format(minecraft_version, r.text))
except requests.exceptions.RequestException as e:
raise Exception("Unable to get paper version from server.\n"
"URL: {}\n"
"Error: {}".format(url, e))
def get_version_list(release=True):
try:
r = requests.get(MOJANG_VERSION_MANIFEST_V2)
if r.status_code == 200:
if release:
return [version.get('id') for version in r.json().get("versions", [])
if version.get("type") == "release" if version.get('id') is not None]
return r.json()["versions"]
else:
raise Exception("Unable to fetch version list.\n"
"Response: {}".format(r.text))
except requests.exceptions.RequestException as e:
raise Exception("Unable to get version list from server.\n"
"URL: {}\n"
"Error: {}".format(MOJANG_VERSION_MANIFEST_V2, e))
def get_version_manifest():
try:
r = requests.get(MOJANG_VERSION_MANIFEST_V2)
if r.status_code == 200:
return r.json()
raise Exception("Unable to fetch version manifest.\n"
"Response: {}".format(r.text))
except requests.exceptions.RequestException as e:
raise Exception("Unable to get version manifest from server.\n"
"URL: {}\n"
"Error: {}".format(MOJANG_VERSION_MANIFEST_V2, e))
def get_minecraft_version_metadata(minecraft_version: str):
manifest = get_version_manifest()
version_info = next(
(version for version in manifest.get("versions", []) if version.get("id") == minecraft_version),
None,
)
if version_info is None:
raise Exception(f"Minecraft version {minecraft_version} was not found in Mojang version manifest.")
try:
r = requests.get(version_info["url"])
if r.status_code == 200:
return r.json()
raise Exception("Unable to fetch Minecraft version metadata for {}.\n"
"Response: {}".format(minecraft_version, r.text))
except requests.exceptions.RequestException as e:
raise Exception("Unable to get Minecraft version metadata from server.\n"
"URL: {}\n"
"Error: {}".format(version_info.get("url"), e))
def get_latest_version_minecraft(release=True):
version_list = get_version_list(release=release)
if not version_list:
ver = None
elif release:
ver = version_list[0]
else:
ver = version_list[0].get("id")
if ver is None:
raise Exception("Unable to find latest version in version list.\n")
return ver
def download_server_jar(minecraft_version: str, build_version: str, destination: Path, filename: str | None = None):
"""
Download server jar (paper server only)
"""
url = PAPER_SERVER_JAR_API.format(minecraft_version, build_version, minecraft_version, build_version)
jar_name = jar_filename(filename, os.path.basename(url))
destination.parent.mkdir(parents=True, exist_ok=True)
destination = Path(destination, jar_name)
try:
download_file(url, destination)
return destination
except Exception as e:
raise Exception("Unable to download server jar for version {}\nURL: {}\nError: {}".format(minecraft_version, url, e))
def get_latest_build_of_version(minecraft_version: str) -> str:
builds = get_specific_version_paper_builds(minecraft_version)
if not builds:
raise Exception(f"No builds found for Paper {minecraft_version}")
# Paper API usually lists builds ascending; latest is the last one
return str(builds[-1])
def download_latest_build_paper_jar(minecraft_version: str, destination_dir: Path, filename: str | None = None):
build = get_latest_build_of_version(minecraft_version)
return download_server_jar(minecraft_version, build, destination_dir, filename=filename)
def version_exist_from_paper(minecraft_version: str) -> bool:
try:
get_specific_version_paper_builds(minecraft_version)
return True
except Exception:
return False
def get_latest_paper_version(release) -> str:
vers = get_version_list(release=release)
index = 0
latest_paper_support_ver = None
while latest_paper_support_ver is None:
if len(vers) < index+1:
raise Exception("No supported Minecraft version available for Paper support.")
if version_exist_from_paper(minecraft_version=vers[index]):
latest_paper_support_ver = vers[index]
break
index+=1
return latest_paper_support_ver
def download_latest_paper_jar(destination_dir: Path, filename: str | None = None, release: bool = True):
"""
Download latest Minecraft version (release) paper jar
"""
vers = get_version_list(release=release)
if len(vers) == 0:
raise Exception("No versions available for Minecraft (Did the server return wrong response ?)")
latest_mc = vers[0]
return download_latest_build_paper_jar(latest_mc, destination_dir, filename=filename)
def download_vanilla_server_jar(minecraft_version: str, destination: Path, filename: str | None = None):
"""
Download a vanilla Minecraft server jar from Mojang's version manifest.
"""
metadata = get_minecraft_version_metadata(minecraft_version)
server_download = metadata.get("downloads", {}).get("server")
if not server_download or not server_download.get("url"):
raise Exception(f"Minecraft version {minecraft_version} does not provide a vanilla server jar.")
url = server_download["url"]
jar_name = jar_filename(filename, f"minecraft_server.{minecraft_version}.jar")
destination.parent.mkdir(parents=True, exist_ok=True)
destination = Path(destination, jar_name)
try:
download_file(url, destination)
return destination
except Exception as e:
raise Exception("Unable to download vanilla server jar for version {}\nURL: {}\nError: {}".format(
minecraft_version,
url,
e,
))
+366
View File
@@ -0,0 +1,366 @@
"""
FileSettings
Original author: Kitee Contributors
"""
import contextlib
import copy
import json
from pathlib import Path
def validation_rule(
default=None,
*,
children=None,
write_back_if_not_exist=False,
recover_missing_items=False,
use_same_form=False
):
rule = {
"writeBackIfNotExist": write_back_if_not_exist,
"recoverMissingItems": recover_missing_items,
"useSameForm": use_same_form,
}
if children is not None:
rule["children"] = children
else:
rule["default"] = default
return rule
def required_value(default, *, recover_missing_items=False):
return validation_rule(
default,
write_back_if_not_exist=True,
recover_missing_items=recover_missing_items,
)
def required_section(children):
return validation_rule(children=children, write_back_if_not_exist=True)
def required_list(children, use_same_form=False):
return validation_rule(children=children,
write_back_if_not_exist=True,
use_same_form=use_same_form)
class FileSettings:
"""
Simple Settings Object
IMPORTANT: Settings always use self.data as the main operate data, NOT FROM disk settings file!!!
Usage:
FileSettings.create() -> Create settings file (path is self.path)
FileSettings.load() - > Load settings file
FileSettings.save() -> Save settings file
FileSettings.edit() -> Edit settings file (Auto save when with block completes)
Example:
with FileSettings.edit() as settings:
settings["hello"] = "world"
"""
def __init__(self, path, default, validation_rules=None,
dumps_func=json.dumps, load_func=json.load,
settings_change_callback=None, loader=None):
self.data = copy.deepcopy(default)
self.default = copy.deepcopy(default)
self.path: Path = Path(path)
self.validation_rules = validation_rules
self.dumps_func = dumps_func
self.load_func = load_func
self.loader = loader
self.settings_change_callback = settings_change_callback
def reset(self):
"""Replace self.data with self.default and save"""
self.data = copy.deepcopy(self.default)
self.save()
if callable(self.settings_change_callback):
self.settings_change_callback(self.data)
def __repr__(self):
return f"<FileSettings At {self.path.as_posix()}>"
def create(self, exist_ok=False):
"""
Create settings file (path is self.path, data use default value)
:param exist_ok: If not True, raise exception if file already exists
:return:
"""
self.path.parent.mkdir(parents=True, exist_ok=True)
if not exist_ok and self.path.exists():
raise FileExistsError(f'{self.path} already exists')
self.path.write_text(self._dumps(self.default), encoding="utf-8")
def read_from_exist(self):
self.data = self.load()
def mload(self):
"""Get data from memory"""
return copy.deepcopy(self.data)
def load(self):
"""
Load settings file data into self.data (path is self.path)
:return:
"""
if not self.path.exists():
raise FileNotFoundError(f'{self.path} does not exist')
with self.path.open("r", encoding="utf-8") as settings_file:
if self.loader:
data = self.load_func(settings_file, Loader=self.loader)
else:
data = self.load_func(settings_file)
if isinstance(self.validation_rules, dict):
data = self.validate_data(data, self.default, self.validation_rules)
self.data = copy.deepcopy(data)
return data
def save(self):
"""
Save self.data (data in memory) into settings file
:return:
"""
if not self.path.exists():
raise FileNotFoundError(f'{self.path} does not exist. Create it before saving.')
self.path.write_text(self._dumps(self.data), encoding="utf-8")
def get(self, key, default=None):
return self.data.get(key, default)
def dget(self, key, default=None): # dget -> get_default
return self.default.get(key, default)
def exists(self):
return self.path.exists()
@contextlib.contextmanager
def edit(self):
"""
Edit settings (With auto save)
:return:
"""
if not self.exists():
self.create()
yield self
self.save()
if callable(self.settings_change_callback):
self.settings_change_callback(self)
def validate_data(self, data, default, rules):
"""
Validates data by validating rules.
:param data: dict data
:param default: default sample
:param rules: rules of all keys
:return:
"""
if not isinstance(data, dict):
data = {}
if not isinstance(default, dict):
default = {}
if not isinstance(rules, dict):
return copy.deepcopy(data)
validated = copy.deepcopy(data)
for key, rule in rules.items():
rule_default, options = self._parse_validation_rule(rule)
default_value = copy.deepcopy(default.get(key, rule_default))
if key not in validated:
if options.get("writeBackIfNotExist"):
validated[key] = self._validate_value(
default_value,
default_value,
rule_default,
options,
)
continue
validated[key] = self._validate_value(
validated[key],
default_value,
rule_default,
options,
)
return validated
def update(self, settings):
"""Update self.data with new settings values."""
if not isinstance(settings, dict):
raise TypeError("settings must be a dict")
def update_inner(new, old):
if not isinstance(old, dict):
return
for n_k, n_v in new.items():
if isinstance(n_v, dict) and (n_k in old and isinstance(old[n_k], dict)):
update_inner(n_v, old[n_k])
else:
old[n_k] = copy.deepcopy(n_v)
update_inner(settings, self.data)
if callable(self.settings_change_callback):
self.settings_change_callback(self)
def update_new_settings(self, new_default_settings):
"""Add missing settings to self.default and self.data."""
if not isinstance(new_default_settings, dict):
raise TypeError("new_default_settings must be a dict")
def add_missing(new, old):
if not isinstance(old, dict):
return
for n_k, n_v in new.items():
if isinstance(n_v, dict) and (n_k in old and isinstance(old[n_k], dict)):
add_missing(n_v, old[n_k])
if n_k not in old:
old[n_k] = copy.deepcopy(n_v)
add_missing(new_default_settings, self.default)
add_missing(new_default_settings, self.data)
if callable(self.settings_change_callback):
self.settings_change_callback(self)
@staticmethod
def _parse_validation_rule(rule):
if isinstance(rule, dict) and ("default" in rule or "children" in rule):
options = {
"writeBackIfNotExist": rule.get("writeBackIfNotExist", False),
"recoverMissingItems": rule.get("recoverMissingItems", False), # Recover missing item (only for list)
"useSameForm": rule.get("useSameForm", False), # For list that contains dict item (all keys are same)
}
if "children" in rule:
return rule["children"], options
return rule.get("default"), options
if (
isinstance(rule, (list, tuple))
and len(rule) == 2
and isinstance(rule[1], dict)
):
return rule[0], rule[1]
return rule, {}
def _validate_dict_form(self, sample, target):
for k, v in sample.items():
if target.get(k, None) is None:
target[k] = copy.deepcopy(v)
elif isinstance(v, dict) and isinstance(target.get(k, None), dict):
target[k] = self._validate_dict_form(v, target.get(k, {}))
elif type(target.get(k, None)) is not type(v):
candidate = target[k].get("default") if isinstance(target[k], dict) else None
if type(candidate) is type(v):
target[k] = copy.deepcopy(candidate)
else:
target[k] = copy.deepcopy(v)
return target
def _validate_value(self, value, default_value, rule_default, options):
if options.get("useSameForm"):
if not isinstance(value, list):
if isinstance(default_value, list):
return copy.deepcopy(default_value)
return []
validated = copy.deepcopy(value)
if options.get("recoverMissingItems") and isinstance(default_value, list):
for item in default_value:
if item not in validated:
validated.append(copy.deepcopy(item))
form = rule_default
if isinstance(form, list):
form = next((item for item in form if isinstance(item, dict)), None)
if not isinstance(form, dict):
return validated
for index, item in enumerate(validated):
if not isinstance(item, dict):
continue
validated[index] = self._validate_dict_form(copy.deepcopy(form), item)
return validated
if isinstance(rule_default, dict):
if not isinstance(value, dict):
value = {}
if not isinstance(default_value, dict):
default_value = {}
return self.validate_data(value, default_value, rule_default)
if isinstance(default_value, list):
if not isinstance(value, list):
return copy.deepcopy(default_value)
validated = copy.deepcopy(value)
if options.get("recoverMissingItems"):
for item in default_value:
if item not in validated:
validated.append(copy.deepcopy(item))
return validated
if default_value is None:
return value
if type(value) is not type(default_value):
return copy.deepcopy(default_value)
return value
def _dumps(self, data):
if not callable(self.dumps_func):
raise TypeError(f"dumps_func must be callable")
return self.dumps_func(data)
def __getitem__(self, key):
return self.data[key]
def __setitem__(self, key, value):
self.data[key] = value
if callable(self.settings_change_callback):
self.settings_change_callback(self)
def __eq__(self, other):
if isinstance(other, FileSettings):
return self.path == other.path and self.data == other.data
if isinstance(other, dict):
return self.data == other
return NotImplemented