import http.server import json import os import sys ### Variables --> SECRET_PATH = "/run/user/{}/numbus".format(os.getuid()) if os.path.exists("/run/user/{}".format(os.getuid())) else "../secrets" os.makedirs(SECRET_PATH, exist_ok=True) LOGS_DIR = "../web/logs/" PAGES_DIR = "../web/pages/" CONFIG_DIR = "../web/config/" SIGNALS_DIR = "../web/signal/" ### <-- Variables class BridgeHandler(http.server.SimpleHTTPRequestHandler): def do_GET(self): # Route for logs: /logs?type=out or /logs?type=err if self.path.startswith('/logs'): log_type = "out" if "type=err" not in self.path else "err" log_path = os.path.join(LOGS_DIR, f'deploy-{log_type}.log') self.send_response(200) self.send_header('Content-type', 'text/plain') self.send_header('Access-Control-Allow-Origin', '*') self.end_headers() if os.path.exists(log_path): with open(log_path, 'r') as f: # Read last 50 lines for better context during errors self.wfile.write("".join(f.readlines()[-50:]).encode()) return return http.server.SimpleHTTPRequestHandler.do_GET(self) def do_POST(self): content_length = int(self.headers['Content-Length']) post_data = self.rfile.read(content_length) if self.path == '/discovery': # Store secrets in memory-backed filesystem with open(os.path.join(SECRET_PATH, "live_settings.json"), "wb") as f: f.write(post_data) self.send_response(200) self.end_headers() # Signal Bash that discovery data is ready with open(os.path.join(SIGNALS_DIR, ".discovery_ready"), "w") as f: f.write("1") elif self.path == '/deploy': with open(os.path.join(CONFIG_DIR, "numbus.yaml"), "wb") as f: f.write(post_data) self.send_response(200) self.end_headers() with open(os.path.join(SIGNALS_DIR, ".deploy_signal"), "w") as f: f.write("1") os.chdir(PAGES_DIR) http.server.HTTPServer(('localhost', 8088), BridgeHandler).serve_forever()