Compare commits

...

4 Commits

Author SHA1 Message Date
wei 8a3b1b1638 Add .gitignore. 2026-05-17 21:22:25 +08:00
wei 07276f6dd7 Some changes. 2026-05-17 21:14:40 +08:00
wei 2d751f6f19 Some changes. 2026-05-17 19:16:00 +08:00
wei d1663c96e9 Update files. 2026-05-17 17:16:58 +08:00
6 changed files with 2469 additions and 1 deletions
+3
View File
@@ -0,0 +1,3 @@
/servers/
/data/
/config/
+1 -1
View File
@@ -1,3 +1,3 @@
# ServerJar
Create/Manage multiple Mineraft server all in one tool.
Create/Manage multiple Minecraft server all in one tool.
+720
View File
@@ -0,0 +1,720 @@
import argparse
import asyncio
import base64
import binascii
import queue
import shutil
import ssl
import sys
import threading
import socket
import time
import traceback
from pathlib import Path
from prompt_toolkit import Application
from prompt_toolkit.layout import Layout, HSplit
from prompt_toolkit.widgets import TextArea
from prompt_toolkit.key_binding import KeyBindings
from prompt_toolkit.styles import Style
from prompt_toolkit.filters import has_focus
from prompt_toolkit.shortcuts import clear as ptk_clear
version = "Beta-1"
SERVER_JAR_DIR = Path.home() / ".serverjar"
CLIENT_CERT_DIR = Path.home() / ".serverjar" / "client" / "cert"
CLIENT_CERT_SUFFIXES = {".pem", ".crt", ".cer"}
CLIENT_HISTORY_FILE = Path.home() / ".serverjar" / "history" / "history.txt"
CLIENT_LOG_DIR = Path.home() / ".serverjar" / "client" / "logs"
def add_client_cert(cert_path):
source = Path(cert_path).expanduser()
if not source.exists():
raise FileNotFoundError(f"{source} does not exist")
if not source.is_file():
raise IsADirectoryError(f"{source} is not a file")
if source.suffix.lower() not in CLIENT_CERT_SUFFIXES:
allowed = ", ".join(sorted(CLIENT_CERT_SUFFIXES))
raise ValueError(f"Unsupported certificate suffix '{source.suffix}'. Allowed: {allowed}")
CLIENT_CERT_DIR.mkdir(parents=True, exist_ok=True)
target = CLIENT_CERT_DIR / source.name
if source.resolve() == target.resolve():
return target
shutil.copy2(source, target)
return target
def run_cli_action(argv=None):
argv = list(sys.argv[1:] if argv is None else argv)
if not argv or argv[0] != "--add-cert":
return False
parser = argparse.ArgumentParser(prog=f"{Path(sys.argv[0]).name} --add-cert")
parser.add_argument("cert_path", help="Path to a PEM/CRT/CER certificate file")
args = parser.parse_args(argv[1:])
try:
target = add_client_cert(args.cert_path)
except Exception as e:
print(f"Unable to add certificate: {e}", file=sys.stderr)
sys.exit(1)
print(f"Certificate added: {target}")
return True
def create_client_tls_context(log=None, warn=None):
CLIENT_CERT_DIR.mkdir(parents=True, exist_ok=True)
context = ssl.create_default_context()
loaded_certs = []
for cert_path in sorted(CLIENT_CERT_DIR.iterdir()):
if not cert_path.is_file() or cert_path.suffix.lower() not in CLIENT_CERT_SUFFIXES:
continue
try:
context.load_verify_locations(cafile=cert_path)
except ssl.SSLError as e:
if callable(warn):
warn(f"Unable to load TLS certificate {cert_path}: {e}")
continue
loaded_certs.append(cert_path.name)
if callable(log):
if loaded_certs:
log("Loaded TLS certificate(s): {}".format(", ".join(loaded_certs)))
else:
log(f"No custom TLS certificates found in {CLIENT_CERT_DIR}")
return context
def get_history(log=None, warn=None):
if not CLIENT_HISTORY_FILE.exists():
return []
CLIENT_HISTORY_FILE.parent.mkdir(parents=True, exist_ok=True)
if callable(log):
log("Restoring command history...")
try:
with CLIENT_HISTORY_FILE.open("r", encoding="utf-8") as f:
return [line for line in f.read().splitlines() if line]
except Exception as e:
if callable(warn):
warn(f"Unable to restore command history: {e}")
return []
def save_history(history, log=None, warn=None):
CLIENT_HISTORY_FILE.parent.mkdir(parents=True, exist_ok=True)
if callable(log):
log("Saving command history...")
try:
with CLIENT_HISTORY_FILE.open("w", encoding="utf-8") as f:
f.write("\n".join(history))
except Exception as e:
if callable(warn):
warn(f"Unable to save command history: {e}")
class ServerJarClient(Application):
def __init__(self, **kwargs):
super().__init__(**kwargs, mouse_support=True)
# Text style
self.style = Style.from_dict({
"log": "bg:#000000 #ffffff",
"input": "bg:#222222 #ffffff",
"separator-area": "bg:#000000 #ffffff",
"message-area": "bg:#111111 #ffffff"
})
# Socket
self.sock = None
# Args
self.args = None
# event
self.kb = KeyBindings()
# Areas
# self.log_lines = []
# self.log_control = FormattedTextControl(
# text=lambda: ANSI("".join(self.log_lines))
# )
self.log_area = TextArea(
style="class:log",
wrap_lines=True,
)
self.separator_area = TextArea(text="=" * 10 + " Enter Command Here " + "=" * 10, height=1,
style="class:separator-area")
self.message_area = TextArea(height=1, multiline=False, style="class:message-area")
self.input_area = TextArea(height=1, prompt="> ", style="class:input", multiline=False)
self.layout = Layout(HSplit([
self.log_area,
self.separator_area,
self.message_area,
self.input_area,
]))
# Thread
self.sock_lock = threading.Lock()
self.closing_event = threading.Event()
self.client_thread = threading.Thread(target=self.client, daemon=True)
self.connect_event = threading.Event()
self.disconnect_event = threading.Event()
self.auth_required = False
# Queue
self.incoming = queue.Queue()
# Command History
self.cmds = []
self.current_index = None
self.history_draft = ""
@self.kb.add("c-c")
def closing_kb(event):
self.shutdown("Ctrl-C (Stopped by user)")
@self.kb.add("up", filter=has_focus(self.input_area))
def get_old_cmd(event):
# check if there's no old command available in the command history list
if not self.cmds:
return
# Save the current input so Down can restore it after browsing history.
if self.current_index is None:
self.history_draft = self.get_input_area_output()
self.current_index = 0
elif self.current_index < len(self.cmds) - 1:
self.current_index += 1
old_cmd = self.cmds[self.current_index]
self.set_input_area_text(old_cmd)
@self.kb.add("down", filter=has_focus(self.input_area))
def get_new_cmd(event):
# Check if there's no old command available in the command history list
if len(self.cmds) < 2:
return
# Get new command only working when current_index greater then 0
if self.current_index is None:
return
if self.current_index == 0:
self.current_index = None
self.set_input_area_text(self.history_draft)
return
self.current_index -= 1
new_cmd = self.cmds[self.current_index]
self.set_input_area_text(new_cmd)
@self.kb.add("enter", filter=has_focus(self.input_area))
def enter_kb(event):
cmd = self.input_area.text
self.input_area.text = ""
self.current_index = None
self.history_draft = ""
if cmd == "_exit":
self.shutdown("_exit command detected")
return
if self.auth_required:
if cmd == "_d":
self.command_parser(cmd)
return
with self.sock_lock:
s = self.sock
if s:
try:
s.sendall(("__auth " + cmd + "\n").encode("utf-8"))
self.display_message("Password sent, waiting for server...")
except OSError as e:
self._err(f"Send failed: {e}\n")
else:
self._err("The remote server is not connected yet.")
return
# Save command
self.insert_new_cmd_to_history(cmd)
# if re.match(r"^_[A-Za-z0-9]+(?:$|_.*)", cmd):
exit_flag = self.command_parser(cmd)
if exit_flag:
return
with self.sock_lock:
s = self.sock
if s:
try:
s.sendall((cmd + "\n").encode("utf-8"))
except OSError as e:
self._err(f"Send failed: {e}\n")
else:
self._err("The remote server is not connected yet.")
@self.kb.add("c-w", filter=has_focus(self.input_area))
def focus_log_area(event):
self.layout.focus(self.log_area)
self.display_message("Now focus at log area.")
@self.kb.add("c-w", filter=has_focus(self.log_area))
def focus_log_area(event):
self.layout.focus(self.input_area)
self.display_message("Now focus at input area.")
self.key_bindings = self.kb
self.full_screen = True
# Host and Port
self.host = None
self.port = None
def command_parser(self, command):
def connect_to_server(host, port):
# update target
self.host = host
self.port = port
# trigger connection
self.disconnect_event.clear()
self.connect_event.set()
return True
def disconnect_from_server(cmd):
self._log("Disconnecting...")
self.disconnect_event.set()
self.connect_event.clear()
with self.sock_lock:
s = self.sock
if s:
try:
s.shutdown(socket.SHUT_RDWR)
except Exception as e:
self._err("An error occurred while shutting down sock: " + str(e))
try:
s.close()
except Exception as e:
self._err("An error occurred while closing the socket: {}".format(e))
elif self.connect_event.is_set():
self.host = None
self.port = None
self._log("Auto-reconnect stopped.")
else:
self._err("The remote server is not connected yet.")
return True
def connect_to_server_parser(cmd):
target = cmd[3:].strip()
try:
host, port_str = target.split(":", 1)
host = host.strip()
port = int(port_str.strip())
if not host:
raise ValueError("empty host")
except Exception as _:
self._err("Usage: _c host:port")
return True
connect_to_server(host, port)
return True
def _shutdown(cmd):
self.shutdown("_exit command detected")
return True
def _top(cmd):
self.log_area.buffer.cursor_position = 0
self.invalidate()
return True
def _bottom(cmd):
self.log_area.buffer.cursor_position = len(self.log_area.buffer.text)
self.invalidate()
return True
def _version(cmd):
self._log("ServerJar Client Version {}".format(version))
return True
def _help(cmd):
for key, value in cmd_map.items():
self._log(f"{key}: {value.get("description")}")
return True
def _clear(cmd):
self.log_area.text = ""
self._log("Log cleared")
return True
def _clear_history(cmd):
self.cmds = []
self.current_index = None
self.history_draft = ""
self._log("History cleared")
return True
cmd_map = {
"_exit": {
"func": _shutdown,
"description": "Exit the shell",
},
"_c": {
"func": connect_to_server_parser,
"description": "Connect to the remote server (Usage: _c host:port)",
},
"_d": {
"func": disconnect_from_server,
"description": "Disconnect from the remote server",
},
"_top": {
"func": _top,
"description": "Go to the top of the log area",
},
"_bottom": {
"func": _bottom,
"description": "Go to the bottom of the log area",
},
"_version": {
"func": _version,
"description": "Display the version of the client",
},
"_clear_history": {
"func": _clear_history,
"description": "Clear the command history",
},
"_clear": {
"func": _clear,
"description": "Clear the log area",
},
"_help": {
"func": _help,
"description": "Display the help message",
},
}
if not command.strip():
return True
header = command.split()[0]
for cmd in cmd_map.keys():
if cmd == header:
func = cmd_map.get(cmd).get("func")
return_flag = func(command)
return return_flag
# self._err("Unknown command '%s'" % command)
return False
def display_message(self, message):
self.message_area.text = message
def get_input_area_output(self):
return self.input_area.text
def set_input_area_text(self, text):
self.input_area.text = text
self.input_area.buffer.cursor_position = len(text)
def insert_new_cmd_to_history(self, cmd):
if not cmd:
return
if self.cmds and self.cmds[0] == cmd:
return
self.cmds.insert(0, cmd)
class ServerInfoInvalidException(Exception):
def __init__(self, message, **kwargs):
super().__init__()
self.msg = message
def __str__(self):
return self.msg
def arguments_parser(self):
parser = argparse.ArgumentParser()
parser.add_argument("-p", "--port", type=int, help="Port number", required=False)
parser.add_argument('-host', '--host', type=str, help="Hostname", required=False)
parser.add_argument('-no-tls', '--no-tls', help="Enable TLS support", action='store_true', default=False,
required=False)
parser.add_argument('-r', '--retry', help="Retry when disconnect", action='store_true', default=False,
required=False)
parser.add_argument('--add-cert', help="Add server certificate", type=str, default=None, required=False)
args = parser.parse_args()
return args
def get_tls_context(self):
return create_client_tls_context(log=self._log, warn=self._warn)
def shutdown(self, reason=""):
if self.closing_event.is_set():
return
self.closing_event.set()
self._log(f"Shutting down for reason: {reason}")
with self.sock_lock:
s = self.sock
if s:
try:
s.close()
except Exception as e:
self._err(f"Unable to close socket: {e}")
pass
# Save history
save_history(self.cmds, self._log, self._warn)
# Exit ui event loop
self.full_exit()
# @staticmethod
# def clear_screen():
# os.system("cls" if os.name == "nt" else "clear")
def log(self, message):
self.incoming.put(f"{message}")
def _log(self, message):
# Nothing change
self.incoming.put(f"[client] {message}")
# self.display_message(f"{message}")
def _err(self, message):
# WIP... (display text as red color if the log is an error message)
self.incoming.put(f"[client|err] {message}")
# self.display_message(f"ERROR: {message}")
def _warn(self, message):
# WIP... (Display text as yellow color if the log is a warning message)
self.incoming.put(f"[client|warn] {message}")
# self.display_message(f"WARNING: {message}")
def full_exit(self):
self.exit()
sys.exit()
async def consume_incoming(self):
loop = asyncio.get_running_loop()
while True:
msg = await loop.run_in_executor(None, self.incoming.get)
# self.log_lines.append(msg)
#
# if len(self.log_lines) > 2000:
# self.log_lines = self.log_lines[-2000:]
if len(self.log_area.text) > 0:
self.log_area.text += "\n" + msg
else:
self.log_area.text += msg
if len(self.log_area.text) > 300_000:
self.log_area.text = "New log start here.\n" + self.log_area.text[-250_000:]
self.log_area.buffer.cursor_position = len(self.log_area.buffer.text)
self.invalidate()
def client(self):
ptk_clear()
while not self.closing_event.is_set():
if self.closing_event.is_set():
break
if not self.connect_event.is_set() and (self.args.port is None or self.args.host is None):
self._log("Type _c host:port to connect. (_d to disconnect)")
self.connect_event.wait()
elif self.connect_event.is_set():
self._log(f"Reconnecting...")
else:
self._log(f"Connecting to remote server from {self.args.host}:{self.args.port} (Value from sys.argv)...")
self.host, self.port = self.args.host, self.args.port
if not self.host or not self.port:
self._err("No host/port set. Usage: _c host:port")
self.connect_event.clear()
continue
try:
self._log(f"Connecting to {self.host}:{self.port} ...")
# Create connect
if self.args.no_tls:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((self.host, self.port))
else:
raw = socket.create_connection((self.host, self.port))
context = self.get_tls_context()
s = context.wrap_socket(raw, server_hostname=self.host)
with self.sock_lock:
self.sock = s
self.auth_required = False
self._log("Remote socket server connected [HOST: {}, PORT: {}]".format(self.host, self.port))
buffer = ""
downloading_log = False
download_path = None
download_file = None
while True:
# Receive remote server broadcast message and display it on log area
data = s.recv(4096)
if not data:
raise ConnectionError("Server closed")
buffer += data.decode("utf-8", errors="replace")
while "\n" in buffer:
line, buffer = buffer.split("\n", 1)
if line.startswith("[AUTH_REQUIRED]"):
self.auth_required = True
self.display_message("Password required. Type the server password to continue.")
self._warn("Server requires a password before continuing.")
if self.args.no_tls:
self._warn("TLS is disabled. The password will be sent without encryption.")
continue
if line.startswith("[AUTH_OK]"):
self.auth_required = False
self.display_message("Authenticated.")
self._log("Server password accepted.")
continue
if line.startswith("[AUTH_ERR]"):
self.auth_required = True
self.display_message("Invalid password. Try again, or use _d to disconnect.")
self._err(line)
continue
if line.startswith("[DOWNLOAD_LOG_BEGIN]"):
if download_file:
download_file.close()
file_name = line[len("[DOWNLOAD_LOG_BEGIN]"):].strip()
file_name = Path(file_name).name or "serverjar.log"
CLIENT_LOG_DIR.mkdir(parents=True, exist_ok=True)
download_path = CLIENT_LOG_DIR / file_name
download_file = download_path.open("wb")
downloading_log = True
self._log(f"Downloading log to {download_path}")
continue
if line == "[DOWNLOAD_LOG_END]":
if download_file:
download_file.close()
download_file = None
downloading_log = False
self._log(f"Log downloaded: {download_path}")
download_path = None
continue
if downloading_log:
if line.startswith("["):
if line.startswith("[SYS:ERR]"):
if download_file:
download_file.close()
download_file = None
downloading_log = False
download_path = None
self.log(line)
continue
try:
if download_file:
download_file.write(base64.b64decode(line.encode("ascii")))
except (binascii.Error, OSError) as e:
self._err(f"Unable to write downloaded log: {e}")
if download_file:
download_file.close()
download_file = None
downloading_log = False
continue
# ### Use normal log method ###
self.log(line)
except (ConnectionError, OSError) as e:
if not self.closing_event.is_set():
if self.args.retry:
self._warn(f"Disconnected: {e}, retrying...")
else:
self._err(f"Disconnected: {e}")
time.sleep(1)
except KeyboardInterrupt:
self._log("Exiting...")
break
except Exception as e:
self._err(f"Unhandled exception: {e}")
self._err(f"{traceback.format_exc()}")
finally:
if "download_file" in locals() and download_file:
download_file.close()
with self.sock_lock:
try:
if self.sock:
self._log("Closing remote connection (From {}:{})...".format(self.host, self.port))
self.sock.close()
except Exception as e:
self._err(f"Unable to close socket: {e}")
pass
self.sock = None
self.auth_required = False
# reset flags
self.disconnect_event.clear()
if not self.args.retry:
self.connect_event.clear()
def startup(self):
self.args = self.arguments_parser()
self.layout.focus(self.input_area)
self.cmds = get_history(self._log, self._warn)
asyncio.create_task(self.consume_incoming())
self.client_thread.start()
if __name__ == "__main__":
if run_cli_action():
sys.exit(0)
app = ServerJarClient()
app.run(pre_run=app.startup)
+1149
View File
File diff suppressed because it is too large Load Diff
+230
View File
@@ -0,0 +1,230 @@
from pathlib import Path
import requests
import click
import os
PAPER_VERSION_API = "https://api.papermc.io/v2/projects/paper/versions/{}"
PAPER_SERVER_JAR_API = "https://api.papermc.io/v2/projects/paper/versions/{}/builds/{}/downloads/paper-{}-{}.jar"
MOJANG_VERSION_MANIFEST_V2 = "https://piston-meta.mojang.com/mc/game/version_manifest_v2.json"
def jar_filename(filename: str | None, default: str) -> str:
if filename:
name = filename
if not name.endswith(".jar"):
name += ".jar"
return name
return default
def download_file(url: str, destination: Path, chunk_size: int = 1024 * 512):
destination.parent.mkdir(parents=True, exist_ok=True)
with requests.get(url, stream=True, timeout=30) as r:
if r.status_code != 200:
raise Exception(f"Download failed: {r.status_code}\nResponse: {r.text}")
total = int(r.headers.get("content-length", 0))
with destination.open(mode="wb", buffering=chunk_size) as f:
if total > 0:
with click.progressbar(length=total, label=f"Downloading {os.path.basename(destination)}") as bar:
for chunk in r.iter_content(chunk_size=chunk_size):
if chunk:
f.write(chunk)
bar.update(len(chunk))
else:
click.echo(f"Downloading {os.path.basename(destination)} (unknown size)")
for chunk in r.iter_content(chunk_size=chunk_size):
if chunk:
f.write(chunk)
def get_specific_version_paper_builds(minecraft_version: str) -> list[dict[str, str]]:
"""
Get specific version of Paper builds
:param minecraft_version:
:return:
"""
url = PAPER_VERSION_API.format(minecraft_version)
try:
r = requests.get(url)
if r.status_code == 200:
return r.json().get("builds", [])
else:
raise Exception("Unable to fetch build version for {}\n"
"Response: {}".format(minecraft_version, r.text))
except requests.exceptions.RequestException as e:
raise Exception("Unable to get paper version from server.\n"
"URL: {}\n"
"Error: {}".format(url, e))
def get_version_list(release=True):
try:
r = requests.get(MOJANG_VERSION_MANIFEST_V2)
if r.status_code == 200:
if release:
return [version.get('id') for version in r.json().get("versions", [])
if version.get("type") == "release" if version.get('id') is not None]
return r.json()["versions"]
else:
raise Exception("Unable to fetch version list.\n"
"Response: {}".format(r.text))
except requests.exceptions.RequestException as e:
raise Exception("Unable to get version list from server.\n"
"URL: {}\n"
"Error: {}".format(MOJANG_VERSION_MANIFEST_V2, e))
def get_version_manifest():
try:
r = requests.get(MOJANG_VERSION_MANIFEST_V2)
if r.status_code == 200:
return r.json()
raise Exception("Unable to fetch version manifest.\n"
"Response: {}".format(r.text))
except requests.exceptions.RequestException as e:
raise Exception("Unable to get version manifest from server.\n"
"URL: {}\n"
"Error: {}".format(MOJANG_VERSION_MANIFEST_V2, e))
def get_minecraft_version_metadata(minecraft_version: str):
manifest = get_version_manifest()
version_info = next(
(version for version in manifest.get("versions", []) if version.get("id") == minecraft_version),
None,
)
if version_info is None:
raise Exception(f"Minecraft version {minecraft_version} was not found in Mojang version manifest.")
try:
r = requests.get(version_info["url"])
if r.status_code == 200:
return r.json()
raise Exception("Unable to fetch Minecraft version metadata for {}.\n"
"Response: {}".format(minecraft_version, r.text))
except requests.exceptions.RequestException as e:
raise Exception("Unable to get Minecraft version metadata from server.\n"
"URL: {}\n"
"Error: {}".format(version_info.get("url"), e))
def get_latest_version_minecraft(release=True):
version_list = get_version_list(release=release)
if not version_list:
ver = None
elif release:
ver = version_list[0]
else:
ver = version_list[0].get("id")
if ver is None:
raise Exception("Unable to find latest version in version list.\n")
return ver
def download_server_jar(minecraft_version: str, build_version: str, destination: Path, filename: str | None = None):
"""
Download server jar (paper server only)
"""
url = PAPER_SERVER_JAR_API.format(minecraft_version, build_version, minecraft_version, build_version)
jar_name = jar_filename(filename, os.path.basename(url))
destination.parent.mkdir(parents=True, exist_ok=True)
destination = Path(destination, jar_name)
try:
download_file(url, destination)
return destination
except Exception as e:
raise Exception("Unable to download server jar for version {}\nURL: {}\nError: {}".format(minecraft_version, url, e))
def get_latest_build_of_version(minecraft_version: str) -> str:
builds = get_specific_version_paper_builds(minecraft_version)
if not builds:
raise Exception(f"No builds found for Paper {minecraft_version}")
# Paper API usually lists builds ascending; latest is the last one
return str(builds[-1])
def download_latest_build_paper_jar(minecraft_version: str, destination_dir: Path, filename: str | None = None):
build = get_latest_build_of_version(minecraft_version)
return download_server_jar(minecraft_version, build, destination_dir, filename=filename)
def version_exist_from_paper(minecraft_version: str) -> bool:
try:
get_specific_version_paper_builds(minecraft_version)
return True
except Exception:
return False
def get_latest_paper_version(release) -> str:
vers = get_version_list(release=release)
index = 0
latest_paper_support_ver = None
while latest_paper_support_ver is None:
if len(vers) < index+1:
raise Exception("No supported Minecraft version available for Paper support.")
if version_exist_from_paper(minecraft_version=vers[index]):
latest_paper_support_ver = vers[index]
break
index+=1
return latest_paper_support_ver
def download_latest_paper_jar(destination_dir: Path, filename: str | None = None, release: bool = True):
"""
Download latest Minecraft version (release) paper jar
"""
vers = get_version_list(release=release)
if len(vers) == 0:
raise Exception("No versions available for Minecraft (Did the server return wrong response ?)")
latest_mc = vers[0]
return download_latest_build_paper_jar(latest_mc, destination_dir, filename=filename)
def download_vanilla_server_jar(minecraft_version: str, destination: Path, filename: str | None = None):
"""
Download a vanilla Minecraft server jar from Mojang's version manifest.
"""
metadata = get_minecraft_version_metadata(minecraft_version)
server_download = metadata.get("downloads", {}).get("server")
if not server_download or not server_download.get("url"):
raise Exception(f"Minecraft version {minecraft_version} does not provide a vanilla server jar.")
url = server_download["url"]
jar_name = jar_filename(filename, f"minecraft_server.{minecraft_version}.jar")
destination.parent.mkdir(parents=True, exist_ok=True)
destination = Path(destination, jar_name)
try:
download_file(url, destination)
return destination
except Exception as e:
raise Exception("Unable to download vanilla server jar for version {}\nURL: {}\nError: {}".format(
minecraft_version,
url,
e,
))
+366
View File
@@ -0,0 +1,366 @@
"""
FileSettings
Original author: Kitee Contributors
"""
import contextlib
import copy
import json
from pathlib import Path
def validation_rule(
default=None,
*,
children=None,
write_back_if_not_exist=False,
recover_missing_items=False,
use_same_form=False
):
rule = {
"writeBackIfNotExist": write_back_if_not_exist,
"recoverMissingItems": recover_missing_items,
"useSameForm": use_same_form,
}
if children is not None:
rule["children"] = children
else:
rule["default"] = default
return rule
def required_value(default, *, recover_missing_items=False):
return validation_rule(
default,
write_back_if_not_exist=True,
recover_missing_items=recover_missing_items,
)
def required_section(children):
return validation_rule(children=children, write_back_if_not_exist=True)
def required_list(children, use_same_form=False):
return validation_rule(children=children,
write_back_if_not_exist=True,
use_same_form=use_same_form)
class FileSettings:
"""
Simple Settings Object
IMPORTANT: Settings always use self.data as the main operate data, NOT FROM disk settings file!!!
Usage:
FileSettings.create() -> Create settings file (path is self.path)
FileSettings.load() - > Load settings file
FileSettings.save() -> Save settings file
FileSettings.edit() -> Edit settings file (Auto save when with block completes)
Example:
with FileSettings.edit() as settings:
settings["hello"] = "world"
"""
def __init__(self, path, default, validation_rules=None,
dumps_func=json.dumps, load_func=json.load,
settings_change_callback=None, loader=None):
self.data = copy.deepcopy(default)
self.default = copy.deepcopy(default)
self.path: Path = Path(path)
self.validation_rules = validation_rules
self.dumps_func = dumps_func
self.load_func = load_func
self.loader = loader
self.settings_change_callback = settings_change_callback
def reset(self):
"""Replace self.data with self.default and save"""
self.data = copy.deepcopy(self.default)
self.save()
if callable(self.settings_change_callback):
self.settings_change_callback(self.data)
def __repr__(self):
return f"<FileSettings At {self.path.as_posix()}>"
def create(self, exist_ok=False):
"""
Create settings file (path is self.path, data use default value)
:param exist_ok: If not True, raise exception if file already exists
:return:
"""
self.path.parent.mkdir(parents=True, exist_ok=True)
if not exist_ok and self.path.exists():
raise FileExistsError(f'{self.path} already exists')
self.path.write_text(self._dumps(self.default), encoding="utf-8")
def read_from_exist(self):
self.data = self.load()
def mload(self):
"""Get data from memory"""
return copy.deepcopy(self.data)
def load(self):
"""
Load settings file data into self.data (path is self.path)
:return:
"""
if not self.path.exists():
raise FileNotFoundError(f'{self.path} does not exist')
with self.path.open("r", encoding="utf-8") as settings_file:
if self.loader:
data = self.load_func(settings_file, Loader=self.loader)
else:
data = self.load_func(settings_file)
if isinstance(self.validation_rules, dict):
data = self.validate_data(data, self.default, self.validation_rules)
self.data = copy.deepcopy(data)
return data
def save(self):
"""
Save self.data (data in memory) into settings file
:return:
"""
if not self.path.exists():
raise FileNotFoundError(f'{self.path} does not exist. Create it before saving.')
self.path.write_text(self._dumps(self.data), encoding="utf-8")
def get(self, key, default=None):
return self.data.get(key, default)
def dget(self, key, default=None): # dget -> get_default
return self.default.get(key, default)
def exists(self):
return self.path.exists()
@contextlib.contextmanager
def edit(self):
"""
Edit settings (With auto save)
:return:
"""
if not self.exists():
self.create()
yield self
self.save()
if callable(self.settings_change_callback):
self.settings_change_callback(self)
def validate_data(self, data, default, rules):
"""
Validates data by validating rules.
:param data: dict data
:param default: default sample
:param rules: rules of all keys
:return:
"""
if not isinstance(data, dict):
data = {}
if not isinstance(default, dict):
default = {}
if not isinstance(rules, dict):
return copy.deepcopy(data)
validated = copy.deepcopy(data)
for key, rule in rules.items():
rule_default, options = self._parse_validation_rule(rule)
default_value = copy.deepcopy(default.get(key, rule_default))
if key not in validated:
if options.get("writeBackIfNotExist"):
validated[key] = self._validate_value(
default_value,
default_value,
rule_default,
options,
)
continue
validated[key] = self._validate_value(
validated[key],
default_value,
rule_default,
options,
)
return validated
def update(self, settings):
"""Update self.data with new settings values."""
if not isinstance(settings, dict):
raise TypeError("settings must be a dict")
def update_inner(new, old):
if not isinstance(old, dict):
return
for n_k, n_v in new.items():
if isinstance(n_v, dict) and (n_k in old and isinstance(old[n_k], dict)):
update_inner(n_v, old[n_k])
else:
old[n_k] = copy.deepcopy(n_v)
update_inner(settings, self.data)
if callable(self.settings_change_callback):
self.settings_change_callback(self)
def update_new_settings(self, new_default_settings):
"""Add missing settings to self.default and self.data."""
if not isinstance(new_default_settings, dict):
raise TypeError("new_default_settings must be a dict")
def add_missing(new, old):
if not isinstance(old, dict):
return
for n_k, n_v in new.items():
if isinstance(n_v, dict) and (n_k in old and isinstance(old[n_k], dict)):
add_missing(n_v, old[n_k])
if n_k not in old:
old[n_k] = copy.deepcopy(n_v)
add_missing(new_default_settings, self.default)
add_missing(new_default_settings, self.data)
if callable(self.settings_change_callback):
self.settings_change_callback(self)
@staticmethod
def _parse_validation_rule(rule):
if isinstance(rule, dict) and ("default" in rule or "children" in rule):
options = {
"writeBackIfNotExist": rule.get("writeBackIfNotExist", False),
"recoverMissingItems": rule.get("recoverMissingItems", False), # Recover missing item (only for list)
"useSameForm": rule.get("useSameForm", False), # For list that contains dict item (all keys are same)
}
if "children" in rule:
return rule["children"], options
return rule.get("default"), options
if (
isinstance(rule, (list, tuple))
and len(rule) == 2
and isinstance(rule[1], dict)
):
return rule[0], rule[1]
return rule, {}
def _validate_dict_form(self, sample, target):
for k, v in sample.items():
if target.get(k, None) is None:
target[k] = copy.deepcopy(v)
elif isinstance(v, dict) and isinstance(target.get(k, None), dict):
target[k] = self._validate_dict_form(v, target.get(k, {}))
elif type(target.get(k, None)) is not type(v):
candidate = target[k].get("default") if isinstance(target[k], dict) else None
if type(candidate) is type(v):
target[k] = copy.deepcopy(candidate)
else:
target[k] = copy.deepcopy(v)
return target
def _validate_value(self, value, default_value, rule_default, options):
if options.get("useSameForm"):
if not isinstance(value, list):
if isinstance(default_value, list):
return copy.deepcopy(default_value)
return []
validated = copy.deepcopy(value)
if options.get("recoverMissingItems") and isinstance(default_value, list):
for item in default_value:
if item not in validated:
validated.append(copy.deepcopy(item))
form = rule_default
if isinstance(form, list):
form = next((item for item in form if isinstance(item, dict)), None)
if not isinstance(form, dict):
return validated
for index, item in enumerate(validated):
if not isinstance(item, dict):
continue
validated[index] = self._validate_dict_form(copy.deepcopy(form), item)
return validated
if isinstance(rule_default, dict):
if not isinstance(value, dict):
value = {}
if not isinstance(default_value, dict):
default_value = {}
return self.validate_data(value, default_value, rule_default)
if isinstance(default_value, list):
if not isinstance(value, list):
return copy.deepcopy(default_value)
validated = copy.deepcopy(value)
if options.get("recoverMissingItems"):
for item in default_value:
if item not in validated:
validated.append(copy.deepcopy(item))
return validated
if default_value is None:
return value
if type(value) is not type(default_value):
return copy.deepcopy(default_value)
return value
def _dumps(self, data):
if not callable(self.dumps_func):
raise TypeError(f"dumps_func must be callable")
return self.dumps_func(data)
def __getitem__(self, key):
return self.data[key]
def __setitem__(self, key, value):
self.data[key] = value
if callable(self.settings_change_callback):
self.settings_change_callback(self)
def __eq__(self, other):
if isinstance(other, FileSettings):
return self.path == other.path and self.data == other.data
if isinstance(other, dict):
return self.data == other
return NotImplemented