From d1663c96e9e714eb872c54df7ae1679d688e1f71 Mon Sep 17 00:00:00 2001 From: Wei Hong Date: Sun, 17 May 2026 17:16:58 +0800 Subject: [PATCH] Update files. --- README.md | 2 +- client.py | 433 ++++++++++++++++++++++++ main.py | 739 +++++++++++++++++++++++++++++++++++++++++ utils/common.py | 129 +++++++ utils/file_settings.py | 360 ++++++++++++++++++++ 5 files changed, 1662 insertions(+), 1 deletion(-) create mode 100644 client.py create mode 100644 main.py create mode 100644 utils/common.py create mode 100644 utils/file_settings.py diff --git a/README.md b/README.md index e7864d2..93030cf 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,3 @@ # ServerJar -Create/Manage multiple Mineraft server all in one tool. \ No newline at end of file +Create/Manage multiple Minecraft server all in one tool. \ No newline at end of file diff --git a/client.py b/client.py new file mode 100644 index 0000000..7697604 --- /dev/null +++ b/client.py @@ -0,0 +1,433 @@ +import argparse +import asyncio +import queue +import sys +import threading +import socket +import time +import traceback +from prompt_toolkit import Application +from prompt_toolkit.layout import Layout, HSplit +from prompt_toolkit.widgets import TextArea +from prompt_toolkit.key_binding import KeyBindings +from prompt_toolkit.styles import Style +from prompt_toolkit.filters import has_focus +from prompt_toolkit.shortcuts import clear as ptk_clear + +version = "Beta-1" + + +class ServerJarClient(Application): + def __init__(self, **kwargs): + super().__init__(**kwargs, mouse_support=True) + + # Text style + self.style = Style.from_dict({ + "log": "bg:#000000 #ffffff", + "input": "bg:#222222 #ffffff", + "separator-area": "bg:#000000 #ffffff", + "message-area": "bg:#111111 #ffffff" + }) + # Socket + self.sock = None + + # event + self.kb = KeyBindings() + + # Areas + # self.log_lines = [] + + # self.log_control = FormattedTextControl( + # text=lambda: ANSI("".join(self.log_lines)) + # ) + + self.log_area = TextArea( + style="class:log", + wrap_lines=True, + ) + + self.separator_area = TextArea(text="=" * 10 + " Enter Command Here " + "=" * 10, height=1, + style="class:separator-area") + self.message_area = TextArea(height=1, multiline=False, style="class:message-area") + self.input_area = TextArea(height=1, prompt="> ", style="class:input", multiline=False) + + self.layout = Layout(HSplit([ + self.log_area, + self.separator_area, + self.message_area, + self.input_area, + ])) + + # Thread + self.sock_lock = threading.Lock() + self.closing_event = threading.Event() + self.client_thread = threading.Thread(target=self.client, daemon=True) + self.connect_event = threading.Event() + self.disconnect_event = threading.Event() + + # Queue + self.incoming = queue.Queue() + + # Command History + self.cmds = [] + self.current_index = None + self.start_history_flag = False + + @self.kb.add("c-c") + def closing_kb(event): + self.shutdown("Ctrl-C (Stopped by user)") + + @self.kb.add("up", filter=has_focus(self.input_area)) + def get_old_cmd(event): + # check if there's no old command available in the command history list + if len(self.cmds) < 2: + return + + # Save entered command to history list + if not self.start_history_flag: + self.insert_new_cmd_to_history(self.get_input_area_output()) + self.start_history_flag = True + + # Set current_index to 1 if it's not initiated yet + if self.current_index is None: + self.current_index = 0 + + # Avoid IndexError when it's the last one command + if self.current_index >= len(self.cmds)-1: + return + + self.current_index += 1 + + old_cmd = self.cmds[self.current_index] + + self.set_input_area_text(old_cmd) + + @self.kb.add("down", filter=has_focus(self.input_area)) + def get_new_cmd(event): + # Check if there's no old command available in the command history list + if len(self.cmds) < 2: + return + + # Get new command only working when current_index greater then 0 + if self.current_index is None: + return + + # Avoid IndexError when it's the last one command + if self.current_index-1 < 0: + return + + self.current_index -= 1 + + new_cmd = self.cmds[self.current_index] + + self.set_input_area_text(new_cmd) + + @self.kb.add("enter", filter=has_focus(self.input_area)) + def enter_kb(event): + # Disable history flag + self.start_history_flag = False + + cmd = self.input_area.text + self.input_area.text = "" + + # Save command + self.insert_new_cmd_to_history(cmd) + + if cmd == "_exit": + self.shutdown("_exit command detected") + return + + # if re.match(r"^_[A-Za-z0-9]+(?:$|_.*)", cmd): + exit_flag = self.command_parser(cmd) + + if exit_flag: + return + + with self.sock_lock: + s = self.sock + + if s: + try: + s.sendall((cmd + "\n").encode("utf-8")) + except OSError as e: + self._err(f"Send failed: {e}\n") + else: + self._err("The remote server is not connected yet.") + + @self.kb.add("c-w", filter=has_focus(self.input_area)) + def focus_log_area(event): + self.layout.focus(self.log_area) + self.display_message("Now focus at log area.") + + @self.kb.add("c-w", filter=has_focus(self.log_area)) + def focus_log_area(event): + self.layout.focus(self.input_area) + self.display_message("Now focus at input area.") + + self.key_bindings = self.kb + self.full_screen = True + + # Host and Port + self.host = None + self.port = None + + def command_parser(self, command): + def connect_to_server(host, port): + # update target + self.host = host + self.port = port + + # trigger connection + self.disconnect_event.clear() + self.connect_event.set() + + return True + + def disconnect_from_server(cmd): + self._log("Disconnecting...") + self.disconnect_event.set() + self.connect_event.clear() + + with self.sock_lock: + s = self.sock + + if s: + try: + s.shutdown(socket.SHUT_RDWR) + except Exception as e: + self._err("An error occurred while shutting down sock: " + str(e)) + try: + s.close() + except Exception as e: + self._err("An error occurred while closing the socket: {}".format(e)) + else: + self._err("The remote server is not connected yet.") + + return True + + def connect_to_server_parser(cmd): + target = cmd[3:].strip() + try: + host, port_str = target.split(":", 1) + host = host.strip() + port = int(port_str.strip()) + + if not host: + raise ValueError("empty host") + except Exception as _: + self._err("Usage: _c host:port") + return True + + connect_to_server(host, port) + + return True + + def _shutdown(cmd): + self.shutdown("_exit command detected") + return True + + def _top(cmd): + self.log_area.buffer.cursor_position = 0 + self.invalidate() + return True + + def _bottom(cmd): + self.log_area.buffer.cursor_position = len(self.log_area.buffer.text) + self.invalidate() + return True + + def _version(cmd): + self._log("ServerJar Client Version {}".format(version)) + return True + + cmd_map = { + "_exit": _shutdown, + "_c": connect_to_server_parser, + "_d": disconnect_from_server, + "_top": _top, + "_bottom": _bottom, + "_version": _version, + } + + for cmd in cmd_map.keys(): + if command.startswith(cmd): + return_flag = cmd_map[cmd](command) + return return_flag + + # self._err("Unknown command '%s'" % command) + + return False + + def display_message(self, message): + self.message_area.text = message + + def get_input_area_output(self): + return self.input_area.text + + def set_input_area_text(self, text): + self.input_area.text = text + + def insert_new_cmd_to_history(self, cmd): + self.cmds.insert(0, cmd) + + class ServerInfoInvalidException(Exception): + def __init__(self, message, **kwargs): + super().__init__() + self.msg = message + + def __str__(self): + return self.msg + + def arguments_parser(self): + parser = argparse.ArgumentParser() + + # parser.add_argument("-p", "--port", type=int, help="Port number", required=True) + # parser.add_argument('-host', '--host', type=str, help="Hostname", required=True) + + args = parser.parse_args() + + return args + + def shutdown(self, reason=""): + if self.closing_event.is_set(): + return + + self.closing_event.set() + + self._log(f"Shutting down for reason: {reason}") + + with self.sock_lock: + s = self.sock + if s: + try: + s.close() + except Exception as e: + self._err(f"Unable to close socket: {e}") + pass + + # Exit ui event loop + self.full_exit() + + # @staticmethod + # def clear_screen(): + # os.system("cls" if os.name == "nt" else "clear") + + def log(self, message): + self.incoming.put(f"{message}") + + def _log(self, message): + # Nothing change + self.incoming.put(f"[client] {message}") + # self.display_message(f"{message}") + + def _err(self, message): + # WIP... (display text as red color if the log is an error message) + self.incoming.put(f"[client|err] {message}") + # self.display_message(f"ERROR: {message}") + + def _warn(self, message): + # WIP... (Display text as yellow color if the log is a warning message) + self.incoming.put(f"[client|warn] {message}") + # self.display_message(f"WARNING: {message}") + + def full_exit(self): + self.exit() + sys.exit() + + async def consume_incoming(self): + loop = asyncio.get_running_loop() + while True: + msg = await loop.run_in_executor(None, self.incoming.get) + + # self.log_lines.append(msg) + # + # if len(self.log_lines) > 2000: + # self.log_lines = self.log_lines[-2000:] + + if len(self.log_area.text) > 0: + self.log_area.text += "\n" + msg + else: + self.log_area.text += msg + + if len(self.log_area.text) > 300_000: + self.log_area.text = "New log start here.\n" + self.log_area.text[-250_000:] + + self.log_area.buffer.cursor_position = len(self.log_area.buffer.text) + + self.invalidate() + + def client(self): + ptk_clear() + + + while not self.closing_event.is_set(): + self._log("Type _c host:port to connect. (_d to disconnect)") + self.connect_event.wait() + + if self.closing_event.is_set(): + break + + if not self.host or not self.port: + self._err("No host/port set. Usage: _c host:port") + self.connect_event.clear() + continue + + try: + self._log(f"Connecting to {self.host}:{self.port} ...") + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.connect((self.host, self.port)) + + with self.sock_lock: + self.sock = s + + self._log("Remote socket server connected [HOST: {}, PORT: {}]".format(self.host, self.port)) + + buffer = "" + while True: + # Receive remote server broadcast message and display it on log area + data = s.recv(4096) + if not data: + raise ConnectionError("Server closed") + buffer += data.decode("utf-8", errors="replace") + while "\n" in buffer: + line, buffer = buffer.split("\n", 1) + # ### Use normal log method ### + self.log(line) + + except (ConnectionError, OSError) as e: + if not self.closing_event.is_set(): + self._warn(f"Disconnected: {e}, retrying...") + time.sleep(1) + except KeyboardInterrupt: + self._log("Exiting...") + break + except Exception as e: + self._err(f"Unhandled exception: {e}") + self._err(f"{traceback.format_exc()}") + finally: + with self.sock_lock: + try: + if self.sock: + self._log("Closing remote connection (From {}:{})...".format(self.host, self.port)) + self.sock.close() + except Exception as e: + self._err(f"Unable to close socket: {e}") + pass + self.sock = None + + # reset flags + self.disconnect_event.clear() + self.connect_event.clear() + + + def startup(self): + self.arguments_parser() + self.layout.focus(self.input_area) + asyncio.create_task(self.consume_incoming()) + + self.client_thread.start() + + +if __name__ == "__main__": + app = ServerJarClient() + app.run(pre_run=app.startup) diff --git a/main.py b/main.py new file mode 100644 index 0000000..d15e984 --- /dev/null +++ b/main.py @@ -0,0 +1,739 @@ +""" +ServerJar + +Wei - 2026 +""" +import re +import shlex +import signal +import socketserver +import logging +import os +import queue +import sys +import subprocess +import threading +import time +from pathlib import Path +import click +import yaml +from utils.common import download_latest_paper_jar, get_latest_version_minecraft, get_specific_version_paper_builds, \ + download_server_jar, download_latest_build_paper_jar +from utils.file_settings import FileSettings +from utils.file_settings import required_list, required_value + +ROOT_DIR = Path(os.getcwd()) +SERVER_CONFIG_PATH = ROOT_DIR / "config" / "server.yml" + + +def exit(message): + click.echo(click.style(message, fg='green')) + + +@click.group() +def main(): + print("ServerJar\n" + "WorkDir: {}".format(ROOT_DIR)) + + +@main.command() +@click.option("--name", "-d", default="Unnamed Server", show_default=True, help="Server name") +@click.option("--mc-version", "-m", + default=None, + help="Specify Minecraft version to download (If not specified, download latest Minecraft version)", + required=False) +@click.option("--build", "-b", default=None, + help="Specify paper build to download (Use latest Minecraft version if not specified)") +@click.option("--snapshot", is_flag=True, + help="Download snapshot version Minecraft (Use it if the current mc-version type is snapshot)") +@click.option("--latest", is_flag=True, help="Download latest Minecraft version (With latest build paper)") +@click.option("--list-builds", is_flag=True, help="List available paper build versions") +@click.option("--filename", default=None, help="Custom SERVER.jar file name") +def create_server(name, mc_version, build, snapshot, latest, list_builds, filename): + server_dir = Path("servers", name) + + if server_dir.exists(): + result = str(input("Found existing server dir. Do you want to overwrite it and continue? [y/N] ")) + + if not result.lower() == "y": + exit("User aborted.") + + server_dir.mkdir(parents=True, exist_ok=True) + + try: + release = True if not snapshot else False + if latest: + click.echo("Fetching latest Mojang release version...") + out = download_latest_paper_jar(server_dir, filename=filename, release=release) + click.echo(f"Done: {out}") + return + + if mc_version is None: + click.echo("The mc-version is not specified. Fetching latest Minecraft release version...") + mc_version = get_latest_version_minecraft(release=release) + + if list_builds: + builds = get_specific_version_paper_builds(mc_version) + if not builds: + click.echo(f"No builds found for Paper {mc_version}") + return + click.echo(f"Paper {mc_version} builds:") + click.echo(", ".join(map(str, builds[-20:]))) + click.echo("(Only list latest 20 builds)") + return + + if build: + click.echo(f"Downloading Paper {mc_version} build {build} ...") + out = download_server_jar(mc_version, str(build), server_dir, filename=filename) + click.echo(f"Done: {out}") + else: + click.echo(f"Downloading latest Paper build for {mc_version} ...") + out = download_latest_build_paper_jar(mc_version, server_dir, filename=filename) + click.echo(f"Done: {out}") + + except Exception as e: + raise click.ClickException(str(e)) + + +def load_settings(): + s = FileSettings( + SERVER_CONFIG_PATH, + { + "servers": [], + "socketServerHostname": "127.0.0.1", + "socketServerPort": 25560 + }, + { + "socketServerHostname": required_value("127.0.0.1"), + "socketServerPort": required_value(25560), + "servers": required_list( + { + "name": "Unnamed Server", + "version": "unknown", + "description": "", + "command": "", + "workDir": "", + "port": 25565, + "host": "127.0.0.1", + "enable": True + }, + use_same_form=True, + ) + }, + dumps_func=yaml.safe_dump, + load_func=yaml.safe_load, + ) + + if not s.exists(): + s.create() + + s.read_from_exist() + + return s + + +@main.command() +@click.option("--server-folder-path", "-sf", + help="The destination of the folder", required=True) +@click.option("--server-jar-path", "-sp", + help="The destination of the SERVER.jar", required=True) +@click.option("--socket-server-host", "-srh", + help="Hostname of the socket server", required=True) +@click.option("--socket-server-port", "-srp", + help="Port of the socket server", required=True) +@click.option("--java-exec-path", "-p", show_default=True, + help="The destination of the java executable", default="java") +@click.option("--x-memory-initial", "-xms", show_default=True, + help="Initial allocation size of the memory for server", + type=str, default="1G") +@click.option("--x-memory-maximum", "-xmx", show_default=True, + help="Maximum allocation size of the memory for server", + type=str, default="4G") +@click.option("--nogui", "-ng", + help="Disable server window", + is_flag=True) +@click.option("--extra-args", "-e", + help="Extra java arguments", type=str, default="") +@click.option("--custom-commands", "-cd", + help="Custom run commands", type=str, default="") +def create_bootstrap(server_folder_path, server_jar_path, socket_server_host, socket_server_port, + java_exec_path, x_memory_initial, x_memory_maximum, nogui, extra_args, custom_commands): + + settings = load_settings() + + print("There's some information you need to fill for server config.") + name = str(input("New server name: ")) + version = str(input("Server version: ")) + desc = str(input("Server description: ")) + + found_exist = False + for srv in settings["servers"]: + if name == srv["name"]: + found_exist = True + + if found_exist: + result = str(input("WARNING: Found duplicate server name. Would you like to continue? [y/N] ")) + if not result.lower() == "y": + exit("User aborted.") + + extra_args += " nogui" if nogui else "" + cmd = f"{java_exec_path} -Xms{x_memory_initial} -Xmx{x_memory_maximum} -jar {server_jar_path} {extra_args}" + + if custom_commands: + print("Will use custom commands as replacement.") + cmd = custom_commands + + print(f"Server command: {cmd}") + + with settings.edit() as s: + print("Saving...") + s["servers"].append({ + "name": name, + "version": version, + "description": desc, + "command": cmd, + "workDir": server_folder_path, + "port": socket_server_port, + "host": socket_server_host, + "enable": True, + }) + + print("Done") + +class SocketServer: + def __init__(self, host, port): + self.logger = logging.getLogger("SocketServer") + self.stdout_handler = logging.StreamHandler(sys.stdout) + self.stdout_handler.setFormatter(logging.Formatter("%(level)s:%(message)s")) + + # flags + self.stop_event = threading.Event() + + # Server + self.host = host + self.port = port + + self._tcp_server: socketserver.ThreadingTCPServer | None = None + self._tcp_thread: threading.Thread | None = None + + self._log_subscribers: set[queue.Queue] = set() + self._sub_lock = threading.Lock() + + self.command_receivers = {} + + # ------------------------- + # Socket Server + # ------------------------- + def publish_log(self, server_name: str, line: str | None = None): + if line is None: + message = server_name + else: + message = f"[{server_name}] {line}" + + with self._sub_lock: + for q in list(self._log_subscribers): + try: + q.put_nowait(message) + except queue.Full: + pass + + def subscribe_logs(self) -> queue.Queue: + q = queue.Queue(maxsize=2000) + with self._sub_lock: + self._log_subscribers.add(q) + return q + + def unsubscribe_logs(self, q: queue.Queue): + with self._sub_lock: + self._log_subscribers.discard(q) + + def handler_command(self, command: str): + self.logger.info("On...no co", command) + + def _build_tcp_server(self): + manager = self + + class TCPServer(socketserver.ThreadingTCPServer): + allow_reuse_address = True + daemon_threads = True + + def __init__(self, server_address, RequestHandlerClass): + super().__init__(server_address, RequestHandlerClass) + self.manager = manager + + class Handler(socketserver.BaseRequestHandler): + current_server_record = { + } + def setup(self): + mgr: Server = self.server.manager + + mgr.logger.info(f"[SYS] Client from {self.client_address[0]}:{self.client_address[1]} connected,") + + def handle(self): + mgr: Server = self.server.manager + + log_q = mgr.subscribe_logs() + stop_evt = threading.Event() + + def push_logs(): + while not stop_evt.is_set(): + try: + line = log_q.get(timeout=0.5) + except Exception: + continue + try: + self.request.sendall(f"[LOG] {line}\n".encode("utf-8")) + except OSError: + break + + t = threading.Thread(target=push_logs, daemon=True) + t.start() + + try: + self.request.sendall(b"[SYS] connected\n") + buf = b"" + + while True: + data = self.request.recv(4096) + if not data: + break + + buf += data + while b"\n" in buf: + raw, buf = buf.split(b"\n", 1) + cmd = raw.decode("utf-8", errors="replace").strip() + + if not cmd: + continue + + current_server = self.current_server_record.get( + f"{self.client_address[0]}:{self.client_address[1]}", + None) + + mgr.logger.info(f"[SYS] Client from {self.client_address[0]}:{self.client_address[1]} send command \"{cmd}\".") + + ok = None + message = None + + if cmd.startswith("__"): + if cmd == "__exit": + # Exit socket + self.request.sendall(b"[SYS] bye\n") + return + if cmd == "__stop_all": + self.request.sendall( + f"[SYS] Stopping all servers...bye\n".encode("utf-8") + ) + mgr.stop_event.set() + return + elif cmd.startswith("__c"): + match = re.match(r"^__c\s+(.+)$", cmd) + if match: + server_name = match.group(1).strip() + receiver = mgr.get_command_receiver(server_name) + else: + server_name = None + receiver = None + + if receiver is not None: + self.current_server_record[f"{self.client_address[0]}:{self.client_address[1]}"] = server_name + self.request.sendall( + f"[SYS] Connected to server \"{server_name}\" shell.\n".encode( + "utf-8") + ) + else: + self.request.sendall( + f"[SYS:ERR] Target server \"{server_name}\" does not exist.\n".encode( + "utf-8") + ) + elif cmd == "__d": + self.current_server_record[f"{self.client_address[0]}:{self.client_address[1]}"] = None + self.request.sendall( + f"[SYS] Disconnected from current server \"{current_server}\"'s shell.\n".encode( + "utf-8") + ) + else: + if current_server is not None: + receiver = mgr.get_command_receiver(current_server) + func = receiver.get("receiver") if receiver else None + + if callable(func): + ok, message = func(cmd) + else: + self.request.sendall( + "[SYS:ERR] Target server's receiver are not callable.\n".encode( + "utf-8") + ) + else: + # "target server" is Minecraft server + self.request.sendall( + "[SYS:ERR] You are not connected to any target server.\n".encode("utf-8") + ) + else: + if current_server is not None: + receiver = mgr.get_command_receiver(current_server) + func = receiver.get("processReceiver") if receiver else None + + if callable(func): + ok, message = func(cmd) + else: + self.request.sendall( + "[SYS:ERR] Target server's processReceiver are not callable.\n".encode( + "utf-8") + ) + else: + # "target server" is Minecraft server + self.request.sendall( + "[SYS:ERR] You are not connected to any target server.\n".encode("utf-8") + ) + + if ok is not None: + msg = f"[OK] Command received. {cmd}\n" if ok else "[ERR] An error occurred\n" + if message is not None: + msg = msg + message + "\n" + self.request.sendall(msg.encode("utf-8")) + except ConnectionResetError: + mgr.logger.info( + "[SYS] Client disconnected. From {}:{}".format(self.client_address[0], self.client_address[1])) + finally: + stop_evt.set() + mgr.unsubscribe_logs(log_q) + + return TCPServer((self.host, self.port), Handler) + + def start_socket_server(self): + if self._tcp_server: + print("[SOCK] already running") + return + + self._tcp_server = self._build_tcp_server() + + def loop(): + self.logger.info(f"[SOCK] listening on {self.host}:{self.port}") + self._tcp_server.serve_forever(poll_interval=0.5) + + self._tcp_thread = threading.Thread(target=loop, daemon=True) + self._tcp_thread.start() + + def stop_socket_server(self): + if not self._tcp_server: + return + self.logger.info("[SOCK] shutting down") + self._tcp_server.shutdown() + self._tcp_server.server_close() + self._tcp_server = None + if self._tcp_thread and self._tcp_thread.is_alive(): + self._tcp_thread.join(timeout=2) + self._tcp_thread = None + + def register_command_receiver(self, server_name, receiver, process_receiver): + if server_name in self.command_receivers.keys(): + self.logger.warning(f"[SYS] Command receiver name \"{server_name}\" already registered") + else: + self.command_receivers[server_name] = { + "receiver": receiver, + "processReceiver": process_receiver, + } + + def get_command_receiver(self, server_name): + if server_name in self.command_receivers.keys(): + return self.command_receivers[server_name] + + return None + +class Server: + def __init__(self, name, version, description, command, work_dir, port, host, enable): + self._stdout_thread = None + + # Process + self.proc: subprocess.Popen | None = None + self.proc_lock = threading.Lock() + + self.running = False + self.stopping = False + + # logger + self.logger = None + # Ensure all servers name are not duplicated + if f"Server.{name}" in logging.root.manager.loggerDict: + index = 1 + name = f"Server.{name}_1" + while name not in logging.root.manager.loggerDict: + name = f"Server.{name}_{index}" + + self.logger = logging.getLogger(name) + self.logger.setLevel(logging.INFO) + self.stdout_handler = logging.StreamHandler(sys.stdout) + self.stdout_handler.setFormatter(logging.Formatter("[%(asctime)s:%(level)s]: %(message)s")) + self.logger.addHandler(self.stdout_handler) + + # Values from config + self.name = name + self.version = version + self.description = description + self.command = command + self.work_dir = work_dir + self.port = port + self.host = host + self.enable = enable + + self.log_queue = queue.Queue() # stdout lines + self._threads: list[threading.Thread] = [] + self.broadcaster = None + + def start_process(self): + self.logger.info("Starting process...") + + with self.proc_lock: + if self.proc and self.proc.poll() is None: + self.logger.warning("[PROC] already running, skip") + return + + args = shlex.split(self.command) + if not args: + raise ValueError(f"Server \"{self.name}\" command is empty.") + + self.logger.info("[PROC] spawning: %s", self.command) + + self.proc = subprocess.Popen( + args, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + bufsize=1, + cwd=self.work_dir, + encoding="utf-8", + errors="replace" + ) + + self.logger.info(f"[PROC] Process spawned. PID = {self.proc.pid}") + + self._stdout_thread = threading.Thread(target=self._stdout_reader_loop, daemon=True) + self._stdout_thread.start() + + def publish_log(self, line): + if self.broadcaster is None: + return + + self.broadcaster(self.name, line) + + def register_broadcaster(self, broadcaster): + self.broadcaster = broadcaster + + def _stdout_reader_loop(self): + self.logger.info("[PROC] stdout reader started") + while self.running: + with self.proc_lock: + proc = self.proc + out = proc.stdout if proc else None + + if not proc or proc.poll() is not None or not out: + self.logger.info("[PROC] process ended / stdout closed") + break + + line = out.readline() + if not line: + break + + line = line.rstrip("\n") + self.logger.info("[PROC] %s", line) + self.publish_log(line) + + def send_command(self, command: str) -> bool: + with self.proc_lock: + if not self.proc or self.proc.poll() is not None: + return False + if not self.proc.stdin: + return False + + self.proc.stdin.write(command + "\n") + self.proc.stdin.flush() + return True + + def stop_process(self, timeout: float = 10.0): + with self.proc_lock: + proc = self.proc + + if not proc: + return + + # Minecraft only + self.send_command("stop") + + try: + proc.wait(timeout=timeout) + except subprocess.TimeoutExpired: + proc.kill() + proc.wait() + + with self.proc_lock: + self.proc = None + + # ------------------------- + # Manager lifecycle + # ------------------------- + def start(self): + if self.running: + return + self.running = True + self.stopping = False + + try: + self.start_process() + except Exception: + self.running = False + self.stopping = False + raise + + def stop(self): + if not self.running: + return + + self.stopping = True + self.running = False + + self.stop_process() + + def is_process_alive(self) -> bool: + with self.proc_lock: + return self.proc is not None and self.proc.poll() is None + + def command_receiver(self, command): + if command == "__status": + with self.proc_lock: + pid = self.proc.pid if self.proc else None + return True, (f"processAlive: {self.is_process_alive()}\n" + f"processPID: {pid}\n" + f"workdir: {self.work_dir}") + elif command == "__stop": + self.stop() + return True, f"Server \"{self.name}\" process stopped" + elif command == "__start": + self.start() + return True, f"Server \"{self.name}\" process started" + else: + return False, f"Unknown command: {command}" + + def process_command_receiver(self, command): + with self.proc_lock: + if not self.proc or self.proc.poll() is not None: + return False, "Process is not running." + if not self.proc.stdin: + return False, "Process standard input are not available." + + self.proc.stdin.write(command + "\n") + self.proc.stdin.flush() + return True, "Command sent." + +def load_all_server_from_settings(settings: FileSettings): + servers = [] + with settings.edit() as s: + for server_conf in s.get("servers", []): + servers.append(Server( + name=server_conf.get("name"), + version=server_conf.get("version"), + description=server_conf.get("description"), + command=server_conf.get("command"), + work_dir=server_conf.get("workDir"), + port=server_conf.get("port"), + host=server_conf.get("host"), + enable=server_conf.get("enable") + )) + + return servers + + +@main.command() +def runserver(): + logger = logging.getLogger(__name__) + formatter = logging.Formatter('%(asctime)s:%(levelname)s: %(message)s') + logger.setLevel(logging.INFO) + stdout_handler = logging.StreamHandler(sys.stdout) + stdout_handler.setFormatter(formatter) + logger.addHandler(stdout_handler) + + # Server + settings = load_settings() + servers = load_all_server_from_settings(settings) + logger.info("{} servers available".format(len(servers))) + + # Socket + logger.info("Starting socket server") + socket_server = SocketServer(settings.get("socketServerHostname", "127.0.0.1"), + settings.get("socketServerPort", 25560)) + socket_server.start_socket_server() + + # Flags + stop_once = False + + def cleanup(): + nonlocal stop_once + if stop_once: + return + + stop_once = True + + for s in servers: + logger.info("Stopping server {}...".format(s.name)) + s.stop() + + logger.info("Closing socket server...") + socket_server.stop_socket_server() + logger.info("Done") + + def sigint_handler(signum, frame): + logger.info("Caught SIGINT, exiting...") + cleanup() + sys.exit(0) + + signal.signal( + signal.SIGINT, + sigint_handler + ) + + # Boot server + logger.info("Starting server") + for server in servers: + if server.enable: + server.register_broadcaster(socket_server.publish_log) + socket_server.register_command_receiver(server.name, server.command_receiver, + server.process_command_receiver) + try: + server.start() + except Exception as e: + logger.error(f"Server {server.name} failed to start: {e}") + else: + logger.info(f"Server {server.name} started.") + else: + logger.info(f"Server {server.name} is disabled.") + try: + stop = False + + while not stop: + if socket_server.stop_event.is_set(): + logger.info("Remote stop event triggered. Stopping...") + cleanup() + stop = True + continue + + for server in list(servers): + if server.running and not server.is_process_alive(): + logger.info(f"Server {server.name} stopped.") + server.running = False + + if servers and not any(server.running for server in servers): + cleanup() + stop = True + continue + + time.sleep(0.5) + except KeyboardInterrupt: + logger.info("Stopping server...") + cleanup() + + logger.info("Stopped!") + + +if __name__ == "__main__": + main() diff --git a/utils/common.py b/utils/common.py new file mode 100644 index 0000000..8b7d5d6 --- /dev/null +++ b/utils/common.py @@ -0,0 +1,129 @@ +from pathlib import Path +import requests +import click +import os + +PAPER_VERSION_API = "https://api.papermc.io/v2/projects/paper/versions/{}" +PAPER_SERVER_JAR_API = "https://api.papermc.io/v2/projects/paper/versions/{}/builds/{}/downloads/paper-{}-{}.jar" +MOJANG_VERSION_MANIFEST_V2 = "https://piston-meta.mojang.com/mc/game/version_manifest_v2.json" + + +def download_file(url: str, destination: Path, chunk_size: int = 1024 * 512): + destination.mkdir(parents=True, exist_ok=True) + + with requests.get(url, stream=True, timeout=30) as r: + if r.status_code != 200: + raise Exception(f"Download failed: {r.status_code}\nResponse: {r.text}") + + total = int(r.headers.get("content-length", 0)) + + with destination.open(mode="wb", buffering=chunk_size).write(r.content) as f: + if total > 0: + with click.progressbar(length=total, label=f"Downloading {os.path.basename(destination)}") as bar: + for chunk in r.iter_content(chunk_size=chunk_size): + if chunk: + f.write(chunk) + bar.update(len(chunk)) + else: + click.echo(f"Downloading {os.path.basename(destination)} (unknown size)") + for chunk in r.iter_content(chunk_size=chunk_size): + if chunk: + f.write(chunk) + + +def get_specific_version_paper_builds(minecraft_version: str) -> list[dict[str, str]]: + """ + Get specific version of Paper builds + :param minecraft_version: + :return: + """ + url = PAPER_VERSION_API.format(minecraft_version) + try: + r = requests.get(url) + + if r.status_code == 200: + return r.json().get("builds", []) + else: + raise Exception("Unable to fetch build version for {}\n" + "Response: {}".format(minecraft_version, r.text)) + except requests.exceptions.RequestException as e: + raise Exception("Unable to get paper version from server.\n" + "URL: {}\n" + "Error: {}".format(url, e)) + + +def get_version_list(release=True): + try: + r = requests.get(MOJANG_VERSION_MANIFEST_V2) + + if r.status_code == 200: + if release: + return [version.get('id') for version in r.json().get("versions", []) + if version.get("type") == "release" if version.get('id') is not None] + return r.json()["versions"] + else: + raise Exception("Unable to fetch version list.\n" + "Response: {}".format(r.text)) + except requests.exceptions.RequestException as e: + raise Exception("Unable to get version list from server.\n" + "URL: {}\n" + "Error: {}".format(MOJANG_VERSION_MANIFEST_V2, e)) + + +def get_latest_version_minecraft(release=True): + version_list = get_version_list(release=release) + ver = version_list[0].get("id") if version_list else None + + if ver is None: + raise Exception("Unable to find latest version in version list.\n") + + return ver + + +def download_server_jar(minecraft_version: str, build_version: str, destination: Path, filename: str | None = None): + """ + Download server jar (paper server only) + """ + url = PAPER_SERVER_JAR_API.format(minecraft_version, build_version, minecraft_version, build_version) + + if filename: + jar_name = filename + if not jar_name.endswith(".jar"): + jar_name += ".jar" + else: + jar_name = os.path.basename(url) + + destination.parent.mkdir(parents=True, exist_ok=True) + destination = Path(destination, jar_name) + + try: + download_file(url, destination) + return destination + except Exception as e: + raise Exception("Unable to download server jar for version {}\nURL: {}\nError: {}".format(minecraft_version, url, e)) + + +def get_latest_build_of_version(minecraft_version: str) -> str: + builds = get_specific_version_paper_builds(minecraft_version) + if not builds: + raise Exception(f"No builds found for Paper {minecraft_version}") + # Paper API usually lists builds ascending; latest is the last one + return str(builds[-1]) + + +def download_latest_build_paper_jar(minecraft_version: str, destination_dir: Path, filename: str | None = None): + build = get_latest_build_of_version(minecraft_version) + return download_server_jar(minecraft_version, build, destination_dir, filename=filename) + + +def download_latest_paper_jar(destination_dir: Path, filename: str | None = None, release: bool = True): + """ + Download latest Minecraft version (release) paper jar + """ + vers = get_version_list(release=release) + + if len(vers) == 0: + raise Exception("No versions available for Minecraft (Did the server return wrong response ?)") + + latest_mc = vers[0] + return download_latest_build_paper_jar(latest_mc, destination_dir, filename=filename) \ No newline at end of file diff --git a/utils/file_settings.py b/utils/file_settings.py new file mode 100644 index 0000000..29d28d4 --- /dev/null +++ b/utils/file_settings.py @@ -0,0 +1,360 @@ +""" +FileSettings + +Original author: Kitee Contributors +""" +import contextlib +import copy +import json +from pathlib import Path + + +def validation_rule( + default=None, + *, + children=None, + write_back_if_not_exist=False, + recover_missing_items=False, + use_same_form=False +): + rule = { + "writeBackIfNotExist": write_back_if_not_exist, + "recoverMissingItems": recover_missing_items, + "useSameForm": use_same_form, + } + + if children is not None: + rule["children"] = children + else: + rule["default"] = default + + return rule + + +def required_value(default, *, recover_missing_items=False): + return validation_rule( + default, + write_back_if_not_exist=True, + recover_missing_items=recover_missing_items, + ) + + +def required_section(children): + return validation_rule(children=children, write_back_if_not_exist=True) + +def required_list(children, use_same_form=False): + return validation_rule(children=children, + write_back_if_not_exist=True, + use_same_form=use_same_form) + + +class FileSettings: + """ + Simple Settings Object + IMPORTANT: Settings always use self.data as the main operate data, NOT FROM disk settings file!!! + + Usage: + FileSettings.create() -> Create settings file (path is self.path) + FileSettings.load() - > Load settings file + FileSettings.save() -> Save settings file + FileSettings.edit() -> Edit settings file (Auto save when with block completes) + + Example: + with FileSettings.edit() as settings: + settings["hello"] = "world" + """ + def __init__(self, path, default, validation_rules=None, + dumps_func=json.dumps, load_func=json.load, + settings_change_callback=None, loader=None): + self.data = copy.deepcopy(default) + self.default = copy.deepcopy(default) + self.path: Path = Path(path) + self.validation_rules = validation_rules + self.dumps_func = dumps_func + self.load_func = load_func + self.loader = loader + self.settings_change_callback = settings_change_callback + + def reset(self): + """Replace self.data with self.default and save""" + self.data = copy.deepcopy(self.default) + self.save() + + if callable(self.settings_change_callback): + self.settings_change_callback(self.data) + + def __repr__(self): + return f"" + + def create(self, exist_ok=False): + """ + Create settings file (path is self.path, data use default value) + :param exist_ok: If not True, raise exception if file already exists + :return: + """ + self.path.parent.mkdir(parents=True, exist_ok=True) + + if not exist_ok and self.path.exists(): + raise FileExistsError(f'{self.path} already exists') + + self.path.write_text(self._dumps(self.default), encoding="utf-8") + + def read_from_exist(self): + self.data = self.load() + + def mload(self): + """Get data from memory""" + return copy.deepcopy(self.data) + + def load(self): + """ + Load settings file data into self.data (path is self.path) + :return: + """ + if not self.path.exists(): + raise FileNotFoundError(f'{self.path} does not exist') + + with self.path.open("r", encoding="utf-8") as settings_file: + if self.loader: + data = self.load_func(settings_file, Loader=self.loader) + else: + data = self.load_func(settings_file) + + if isinstance(self.validation_rules, dict): + data = self.validate_data(data, self.default, self.validation_rules) + + self.data = copy.deepcopy(data) + + return data + + def save(self): + """ + Save self.data (data in memory) into settings file + :return: + """ + if not self.path.exists(): + raise FileNotFoundError(f'{self.path} does not exist. Create it before saving.') + + self.path.write_text(self._dumps(self.data), encoding="utf-8") + + def get(self, key, default=None): + return self.data.get(key, default) + + def dget(self, key, default=None): # dget -> get_default + return self.default.get(key, default) + + def exists(self): + return self.path.exists() + + @contextlib.contextmanager + def edit(self): + """ + Edit settings (With auto save) + :return: + """ + if not self.exists(): + self.create() + yield self + + self.save() + + if callable(self.settings_change_callback): + self.settings_change_callback(self) + + def validate_data(self, data, default, rules): + """ + Validates data by validating rules. + :param data: dict data + :param default: default sample + :param rules: rules of all keys + :return: + """ + if not isinstance(data, dict): + data = {} + + if not isinstance(default, dict): + default = {} + + if not isinstance(rules, dict): + return copy.deepcopy(data) + + validated = copy.deepcopy(data) + + for key, rule in rules.items(): + rule_default, options = self._parse_validation_rule(rule) + default_value = copy.deepcopy(default.get(key, rule_default)) + + if key not in validated: + if options.get("writeBackIfNotExist"): + validated[key] = self._validate_value( + default_value, + default_value, + rule_default, + options, + ) + continue + + validated[key] = self._validate_value( + validated[key], + default_value, + rule_default, + options, + ) + + return validated + + def update(self, settings): + """Update self.data with new settings values.""" + if not isinstance(settings, dict): + raise TypeError("settings must be a dict") + + def update_inner(new, old): + if not isinstance(old, dict): + return + + for n_k, n_v in new.items(): + if isinstance(n_v, dict) and (n_k in old and isinstance(old[n_k], dict)): + update_inner(n_v, old[n_k]) + else: + old[n_k] = copy.deepcopy(n_v) + + update_inner(settings, self.data) + + if callable(self.settings_change_callback): + self.settings_change_callback(self) + + def update_new_settings(self, new_default_settings): + """Add missing settings to self.default and self.data.""" + if not isinstance(new_default_settings, dict): + raise TypeError("new_default_settings must be a dict") + + def add_missing(new, old): + if not isinstance(old, dict): + return + + for n_k, n_v in new.items(): + if isinstance(n_v, dict) and (n_k in old and isinstance(old[n_k], dict)): + add_missing(n_v, old[n_k]) + if n_k not in old: + old[n_k] = copy.deepcopy(n_v) + + add_missing(new_default_settings, self.default) + add_missing(new_default_settings, self.data) + + if callable(self.settings_change_callback): + self.settings_change_callback(self) + + @staticmethod + def _parse_validation_rule(rule): + if isinstance(rule, dict) and ("default" in rule or "children" in rule): + options = { + "writeBackIfNotExist": rule.get("writeBackIfNotExist", False), + "recoverMissingItems": rule.get("recoverMissingItems", False), # Recover missing item (only for list) + "useSameForm": rule.get("useSameForm", False), # For list that contains dict item (all keys are same) + } + + if "children" in rule: + return rule["children"], options + + return rule.get("default"), options + + if ( + isinstance(rule, (list, tuple)) + and len(rule) == 2 + and isinstance(rule[1], dict) + ): + return rule[0], rule[1] + + return rule, {} + + def _validate_dict_form(self, sample, target): + for k, v in sample.items(): + if target.get(k, None) is None: + target[k] = v + elif isinstance(v, dict) and isinstance(target.get(k, None), dict): + target[k] = self._validate_dict_form(v, target.get(k, {})) + + return target + + def _validate_value(self, value, default_value, rule_default, options): + if options.get("useSameForm"): + if not isinstance(value, list): + if isinstance(default_value, list): + return copy.deepcopy(default_value) + return [] + + validated = copy.deepcopy(value) + + if options.get("recoverMissingItems") and isinstance(default_value, list): + for item in default_value: + if item not in validated: + validated.append(copy.deepcopy(item)) + + form = rule_default + if isinstance(form, list): + form = next((item for item in form if isinstance(item, dict)), None) + + if not isinstance(form, dict): + return validated + + for index, item in enumerate(validated): + if not isinstance(item, dict): + continue + + validated[index] = self._validate_dict_form(copy.deepcopy(form), item) + + return validated + + if isinstance(rule_default, dict): + if not isinstance(value, dict): + value = {} + + if not isinstance(default_value, dict): + default_value = {} + + return self.validate_data(value, default_value, rule_default) + + if isinstance(default_value, list): + if not isinstance(value, list): + return copy.deepcopy(default_value) + + validated = copy.deepcopy(value) + + if options.get("recoverMissingItems"): + for item in default_value: + if item not in validated: + validated.append(copy.deepcopy(item)) + + return validated + + if default_value is None: + return value + + if type(value) is not type(default_value): + return copy.deepcopy(default_value) + + return value + + def _dumps(self, data): + if not callable(self.dumps_func): + raise TypeError(f"dumps_func must be callable") + + return self.dumps_func(data) + + def __getitem__(self, key): + return self.data[key] + + def __setitem__(self, key, value): + self.data[key] = value + + if callable(self.settings_change_callback): + self.settings_change_callback(self) + + def __eq__(self, other): + if isinstance(other, FileSettings): + return self.path == other.path and self.data == other.data + + if isinstance(other, dict): + return self.data == other + + return NotImplemented