#!/usr/bin/env python3 """Flask web application for managing a GE Aerospace PXE server.""" import copy import os import shutil import subprocess from pathlib import Path from flask import ( Flask, flash, jsonify, redirect, render_template, request, send_file, url_for, ) from lxml import etree from werkzeug.utils import secure_filename app = Flask(__name__) app.secret_key = os.environ.get("FLASK_SECRET_KEY", "pxe-manager-dev-key-change-in-prod") app.config["MAX_CONTENT_LENGTH"] = 16 * 1024 * 1024 * 1024 # 16 GB max upload # --------------------------------------------------------------------------- # Configuration # --------------------------------------------------------------------------- SAMBA_SHARE = os.environ.get("SAMBA_SHARE", "/srv/samba/winpeapps") CLONEZILLA_SHARE = os.environ.get("CLONEZILLA_SHARE", "/srv/samba/clonezilla") BLANCCO_REPORTS = os.environ.get("BLANCCO_REPORTS", "/srv/samba/blancco-reports") WEB_ROOT = os.environ.get("WEB_ROOT", "/var/www/html") BOOT_WIM = os.path.join(WEB_ROOT, "win11", "sources", "boot.wim") IMAGE_TYPES = [ "gea-standard", "gea-engineer", "gea-shopfloor", "ge-standard", "ge-engineer", "ge-shopfloor-lockdown", "ge-shopfloor-mce", ] FRIENDLY_NAMES = { "gea-standard": "GE Aerospace Standard", "gea-engineer": "GE Aerospace Engineer", "gea-shopfloor": "GE Aerospace Shop Floor", "ge-standard": "GE Legacy Standard", "ge-engineer": "GE Legacy Engineer", "ge-shopfloor-lockdown": "GE Legacy Shop Floor Lockdown", "ge-shopfloor-mce": "GE Legacy Shop Floor MCE", } NS = "urn:schemas-microsoft-com:unattend" WCM = "http://schemas.microsoft.com/WMIConfig/2002/State" NSMAP = {None: NS, "wcm": WCM} # Convenience qualified-name helpers def qn(tag): """Return a tag qualified with the default unattend namespace.""" return f"{{{NS}}}{tag}" def qwcm(attr): """Return an attribute qualified with the wcm namespace.""" return f"{{{WCM}}}{attr}" # --------------------------------------------------------------------------- # Utility helpers # --------------------------------------------------------------------------- def deploy_path(image_type): """Return the Deploy directory for an image type.""" return os.path.join(SAMBA_SHARE, image_type, "Deploy") def unattend_path(image_type): """Return the unattend.xml path for an image type.""" return os.path.join(deploy_path(image_type), "FlatUnattendW10.xml") def image_status(image_type): """Return a dict describing the state of an image type.""" dp = deploy_path(image_type) up = unattend_path(image_type) has_content = os.path.isdir(dp) and any(os.scandir(dp)) if os.path.isdir(dp) else False has_unattend = os.path.isfile(up) return { "image_type": image_type, "friendly_name": FRIENDLY_NAMES.get(image_type, image_type), "deploy_path": dp, "has_content": has_content, "has_unattend": has_unattend, } def service_status(service_name): """Check whether a systemd service is active.""" try: result = subprocess.run( ["systemctl", "is-active", service_name], capture_output=True, text=True, timeout=5, ) state = result.stdout.strip() return {"name": service_name, "active": state == "active", "state": state} except Exception as exc: return {"name": service_name, "active": False, "state": str(exc)} def find_usb_mounts(): """Return a list of mount-point paths that look like removable media.""" mounts = [] try: with open("/proc/mounts", "r") as fh: for line in fh: parts = line.split() if len(parts) >= 2: mount_point = parts[1] if mount_point.startswith(("/mnt/", "/media/")): if os.path.isdir(mount_point): mounts.append(mount_point) except OSError: pass return sorted(set(mounts)) # --------------------------------------------------------------------------- # XML helpers — parse / build unattend.xml # --------------------------------------------------------------------------- UNATTEND_TEMPLATE = """\ """ def _find_or_create(parent, tag): """Find the first child with *tag* or create it.""" el = parent.find(tag, namespaces={"": NS}) if el is None: el = etree.SubElement(parent, qn(tag.split("}")[-1]) if "}" not in tag else tag) return el def _settings_pass(root, pass_name): """Return the element, creating if needed.""" for s in root.findall(qn("settings")): if s.get("pass") == pass_name: return s s = etree.SubElement(root, qn("settings")) s.set("pass", pass_name) return s def parse_unattend(xml_path): """Parse an unattend.xml and return a dict of editable data.""" data = { "driver_paths": [], "computer_name": "", "registered_organization": "", "registered_owner": "", "time_zone": "", "specialize_commands": [], "oobe": { "HideEULAPage": "true", "HideOEMRegistrationScreen": "true", "HideOnlineAccountScreens": "true", "HideWirelessSetupInOOBE": "true", "HideLocalAccountScreen": "true", "NetworkLocation": "Work", "ProtectYourPC": "3", "SkipUserOOBE": "true", "SkipMachineOOBE": "true", }, "firstlogon_commands": [], "raw_xml": "", } if not os.path.isfile(xml_path): data["raw_xml"] = UNATTEND_TEMPLATE return data raw = open(xml_path, "r", encoding="utf-8").read() data["raw_xml"] = raw try: root = etree.fromstring(raw.encode("utf-8")) except etree.XMLSyntaxError: return data ns = {"u": NS} # --- offlineServicing: DriverPaths --- for dp_el in root.xpath( "u:settings[@pass='offlineServicing']//u:PathAndCredentials/u:Path", namespaces=ns, ): if dp_el.text: data["driver_paths"].append(dp_el.text.strip()) # --- specialize: Shell-Setup --- for comp in root.xpath( "u:settings[@pass='specialize']/u:component", namespaces=ns, ): comp_name = comp.get("name", "") if "Shell-Setup" in comp_name: for tag, key in [ ("ComputerName", "computer_name"), ("RegisteredOrganization", "registered_organization"), ("RegisteredOwner", "registered_owner"), ("TimeZone", "time_zone"), ]: el = comp.find(qn(tag)) if el is not None and el.text: data[key] = el.text.strip() # --- specialize: RunSynchronous commands --- for cmd in root.xpath( "u:settings[@pass='specialize']//u:RunSynchronousCommand", namespaces=ns, ): order_el = cmd.find(qn("Order")) path_el = cmd.find(qn("Path")) desc_el = cmd.find(qn("Description")) data["specialize_commands"].append({ "order": order_el.text.strip() if order_el is not None and order_el.text else "", "path": path_el.text.strip() if path_el is not None and path_el.text else "", "description": desc_el.text.strip() if desc_el is not None and desc_el.text else "", }) # --- oobeSystem --- for comp in root.xpath( "u:settings[@pass='oobeSystem']/u:component", namespaces=ns, ): comp_name = comp.get("name", "") if "OOBE" in comp_name or "Shell-Setup" in comp_name: oobe_el = comp.find(qn("OOBE")) if oobe_el is not None: for child in oobe_el: local = etree.QName(child).localname if local in data["oobe"] and child.text: data["oobe"][local] = child.text.strip() # FirstLogonCommands flc = comp.find(qn("FirstLogonCommands")) if flc is not None: for sync in flc.findall(qn("SynchronousCommand")): order_el = sync.find(qn("Order")) cl_el = sync.find(qn("CommandLine")) desc_el = sync.find(qn("Description")) data["firstlogon_commands"].append({ "order": order_el.text.strip() if order_el is not None and order_el.text else "", "commandline": cl_el.text.strip() if cl_el is not None and cl_el.text else "", "description": desc_el.text.strip() if desc_el is not None and desc_el.text else "", }) return data def build_unattend_xml(form_data): """Build a complete unattend.xml string from form data dict.""" root = etree.Element(qn("unattend"), nsmap=NSMAP) # --- windowsPE (empty) --- _settings_pass(root, "windowsPE") # --- offlineServicing: DriverPaths --- offline = _settings_pass(root, "offlineServicing") driver_paths = form_data.get("driver_paths", []) if driver_paths: comp = etree.SubElement(offline, qn("component")) comp.set("name", "Microsoft-Windows-PnpCustomizationsNonWinPE") comp.set("processorArchitecture", "amd64") comp.set("publicKeyToken", "31bf3856ad364e35") comp.set("language", "neutral") comp.set("versionScope", "nonSxS") dp_container = etree.SubElement(comp, qn("DriverPaths")) for idx, dp in enumerate(driver_paths, start=1): if not dp.strip(): continue pac = etree.SubElement(dp_container, qn("PathAndCredentials")) pac.set(qwcm("action"), "add") pac.set(qwcm("keyValue"), str(idx)) path_el = etree.SubElement(pac, qn("Path")) path_el.text = dp.strip() # --- specialize --- spec = _settings_pass(root, "specialize") # Shell-Setup component shell_comp = etree.SubElement(spec, qn("component")) shell_comp.set("name", "Microsoft-Windows-Shell-Setup") shell_comp.set("processorArchitecture", "amd64") shell_comp.set("publicKeyToken", "31bf3856ad364e35") shell_comp.set("language", "neutral") shell_comp.set("versionScope", "nonSxS") for tag, key in [ ("ComputerName", "computer_name"), ("RegisteredOrganization", "registered_organization"), ("RegisteredOwner", "registered_owner"), ("TimeZone", "time_zone"), ]: val = form_data.get(key, "").strip() if val: el = etree.SubElement(shell_comp, qn(tag)) el.text = val # Deployment / RunSynchronous commands spec_cmds = form_data.get("specialize_commands", []) if spec_cmds: deploy_comp = etree.SubElement(spec, qn("component")) deploy_comp.set("name", "Microsoft-Windows-Deployment") deploy_comp.set("processorArchitecture", "amd64") deploy_comp.set("publicKeyToken", "31bf3856ad364e35") deploy_comp.set("language", "neutral") deploy_comp.set("versionScope", "nonSxS") rs = etree.SubElement(deploy_comp, qn("RunSynchronous")) for idx, cmd in enumerate(spec_cmds, start=1): if not cmd.get("path", "").strip(): continue rsc = etree.SubElement(rs, qn("RunSynchronousCommand")) rsc.set(qwcm("action"), "add") order_el = etree.SubElement(rsc, qn("Order")) order_el.text = str(idx) path_el = etree.SubElement(rsc, qn("Path")) path_el.text = cmd["path"].strip() desc_el = etree.SubElement(rsc, qn("Description")) desc_el.text = cmd.get("description", "").strip() # --- oobeSystem --- oobe_settings = _settings_pass(root, "oobeSystem") oobe_comp = etree.SubElement(oobe_settings, qn("component")) oobe_comp.set("name", "Microsoft-Windows-Shell-Setup") oobe_comp.set("processorArchitecture", "amd64") oobe_comp.set("publicKeyToken", "31bf3856ad364e35") oobe_comp.set("language", "neutral") oobe_comp.set("versionScope", "nonSxS") oobe_el = etree.SubElement(oobe_comp, qn("OOBE")) oobe_data = form_data.get("oobe", {}) for key in [ "HideEULAPage", "HideOEMRegistrationScreen", "HideOnlineAccountScreens", "HideWirelessSetupInOOBE", "HideLocalAccountScreen", "NetworkLocation", "ProtectYourPC", "SkipUserOOBE", "SkipMachineOOBE", ]: val = oobe_data.get(key, "") if val: child = etree.SubElement(oobe_el, qn(key)) child.text = str(val) # FirstLogonCommands fl_cmds = form_data.get("firstlogon_commands", []) if fl_cmds: flc = etree.SubElement(oobe_comp, qn("FirstLogonCommands")) for idx, cmd in enumerate(fl_cmds, start=1): if not cmd.get("commandline", "").strip(): continue sc = etree.SubElement(flc, qn("SynchronousCommand")) sc.set(qwcm("action"), "add") order_el = etree.SubElement(sc, qn("Order")) order_el.text = str(idx) cl_el = etree.SubElement(sc, qn("CommandLine")) cl_el.text = cmd["commandline"].strip() desc_el = etree.SubElement(sc, qn("Description")) desc_el.text = cmd.get("description", "").strip() xml_bytes = etree.tostring( root, pretty_print=True, xml_declaration=True, encoding="utf-8", ) return xml_bytes.decode("utf-8") def _extract_form_data(form): """Pull structured data from the submitted form.""" data = {} # Driver paths dp_list = form.getlist("driver_path[]") data["driver_paths"] = [p for p in dp_list if p.strip()] # Machine settings data["computer_name"] = form.get("computer_name", "") data["registered_organization"] = form.get("registered_organization", "") data["registered_owner"] = form.get("registered_owner", "") data["time_zone"] = form.get("time_zone", "") # Specialize commands spec_paths = form.getlist("spec_cmd_path[]") spec_descs = form.getlist("spec_cmd_desc[]") data["specialize_commands"] = [] for i in range(len(spec_paths)): if spec_paths[i].strip(): data["specialize_commands"].append({ "path": spec_paths[i], "description": spec_descs[i] if i < len(spec_descs) else "", }) # OOBE settings data["oobe"] = {} for key in [ "HideEULAPage", "HideOEMRegistrationScreen", "HideOnlineAccountScreens", "HideWirelessSetupInOOBE", "HideLocalAccountScreen", "SkipUserOOBE", "SkipMachineOOBE", ]: data["oobe"][key] = form.get(f"oobe_{key}", "false") data["oobe"]["NetworkLocation"] = form.get("oobe_NetworkLocation", "Work") data["oobe"]["ProtectYourPC"] = form.get("oobe_ProtectYourPC", "3") # First logon commands fl_cls = form.getlist("fl_cmd_commandline[]") fl_descs = form.getlist("fl_cmd_desc[]") data["firstlogon_commands"] = [] for i in range(len(fl_cls)): if fl_cls[i].strip(): data["firstlogon_commands"].append({ "commandline": fl_cls[i], "description": fl_descs[i] if i < len(fl_descs) else "", }) return data # --------------------------------------------------------------------------- # Routes — pages # --------------------------------------------------------------------------- @app.route("/") def dashboard(): images = [image_status(it) for it in IMAGE_TYPES] services = [service_status(s) for s in ("dnsmasq", "apache2", "smbd")] return render_template( "dashboard.html", images=images, services=services, image_types=IMAGE_TYPES, friendly_names=FRIENDLY_NAMES, ) @app.route("/images/import", methods=["GET", "POST"]) def images_import(): usb_mounts = find_usb_mounts() images = [image_status(it) for it in IMAGE_TYPES] if request.method == "POST": source = request.form.get("source", "") target = request.form.get("target", "") if not source or not target: flash("Please select both a source and a target image type.", "danger") return redirect(url_for("images_import")) if target not in IMAGE_TYPES: flash("Invalid target image type.", "danger") return redirect(url_for("images_import")) if not os.path.isdir(source): flash(f"Source path does not exist: {source}", "danger") return redirect(url_for("images_import")) dest = deploy_path(target) try: os.makedirs(dest, exist_ok=True) # Use rsync-style copy: copy contents of source into dest for item in os.listdir(source): src_item = os.path.join(source, item) dst_item = os.path.join(dest, item) if os.path.isdir(src_item): if os.path.exists(dst_item): shutil.rmtree(dst_item) shutil.copytree(src_item, dst_item) else: shutil.copy2(src_item, dst_item) flash( f"Successfully imported content from {source} to {FRIENDLY_NAMES.get(target, target)}.", "success", ) except Exception as exc: flash(f"Import failed: {exc}", "danger") return redirect(url_for("images_import")) return render_template( "import.html", usb_mounts=usb_mounts, images=images, image_types=IMAGE_TYPES, friendly_names=FRIENDLY_NAMES, ) @app.route("/images//unattend", methods=["GET", "POST"]) def unattend_editor(image_type): if image_type not in IMAGE_TYPES: flash("Unknown image type.", "danger") return redirect(url_for("dashboard")) xml_file = unattend_path(image_type) if request.method == "POST": save_mode = request.form.get("save_mode", "form") if save_mode == "raw": raw_xml = request.form.get("raw_xml", "") # Validate the XML before saving try: etree.fromstring(raw_xml.encode("utf-8")) except etree.XMLSyntaxError as exc: flash(f"Invalid XML: {exc}", "danger") data = parse_unattend(xml_file) data["raw_xml"] = raw_xml return render_template( "unattend_editor.html", image_type=image_type, friendly_name=FRIENDLY_NAMES.get(image_type, image_type), data=data, image_types=IMAGE_TYPES, friendly_names=FRIENDLY_NAMES, ) xml_content = raw_xml else: form_data = _extract_form_data(request.form) xml_content = build_unattend_xml(form_data) # Write to disk try: os.makedirs(os.path.dirname(xml_file), exist_ok=True) with open(xml_file, "w", encoding="utf-8") as fh: fh.write(xml_content) flash("unattend.xml saved successfully.", "success") except Exception as exc: flash(f"Failed to save: {exc}", "danger") return redirect(url_for("unattend_editor", image_type=image_type)) data = parse_unattend(xml_file) return render_template( "unattend_editor.html", image_type=image_type, friendly_name=FRIENDLY_NAMES.get(image_type, image_type), data=data, image_types=IMAGE_TYPES, friendly_names=FRIENDLY_NAMES, ) # --------------------------------------------------------------------------- # Routes — Clonezilla Backups # --------------------------------------------------------------------------- @app.route("/backups") def clonezilla_backups(): backups = [] if os.path.isdir(CLONEZILLA_SHARE): for f in sorted(os.listdir(CLONEZILLA_SHARE)): fpath = os.path.join(CLONEZILLA_SHARE, f) if os.path.isfile(fpath) and f.lower().endswith(".zip"): stat = os.stat(fpath) backups.append({ "filename": f, "machine": os.path.splitext(f)[0], "size": stat.st_size, "modified": stat.st_mtime, }) return render_template( "backups.html", backups=backups, image_types=IMAGE_TYPES, friendly_names=FRIENDLY_NAMES, ) @app.route("/backups/upload", methods=["POST"]) def clonezilla_upload(): if "backup_file" not in request.files: flash("No file selected.", "danger") return redirect(url_for("clonezilla_backups")) f = request.files["backup_file"] if not f.filename: flash("No file selected.", "danger") return redirect(url_for("clonezilla_backups")) filename = secure_filename(f.filename) if not filename.lower().endswith(".zip"): flash("Only .zip files are accepted.", "danger") return redirect(url_for("clonezilla_backups")) os.makedirs(CLONEZILLA_SHARE, exist_ok=True) dest = os.path.join(CLONEZILLA_SHARE, filename) f.save(dest) flash(f"Uploaded {filename} successfully.", "success") return redirect(url_for("clonezilla_backups")) @app.route("/backups/download/") def clonezilla_download(filename): filename = secure_filename(filename) fpath = os.path.join(CLONEZILLA_SHARE, filename) if not os.path.isfile(fpath): flash(f"Backup not found: {filename}", "danger") return redirect(url_for("clonezilla_backups")) return send_file(fpath, as_attachment=True) @app.route("/backups/delete/", methods=["POST"]) def clonezilla_delete(filename): filename = secure_filename(filename) fpath = os.path.join(CLONEZILLA_SHARE, filename) if os.path.isfile(fpath): os.remove(fpath) flash(f"Deleted {filename}.", "success") else: flash(f"Backup not found: {filename}", "danger") return redirect(url_for("clonezilla_backups")) # --------------------------------------------------------------------------- # Routes — Blancco Reports # --------------------------------------------------------------------------- @app.route("/reports") def blancco_reports(): reports = [] if os.path.isdir(BLANCCO_REPORTS): for f in sorted(os.listdir(BLANCCO_REPORTS), reverse=True): fpath = os.path.join(BLANCCO_REPORTS, f) if os.path.isfile(fpath): stat = os.stat(fpath) ext = os.path.splitext(f)[1].lower() reports.append({ "filename": f, "size": stat.st_size, "modified": stat.st_mtime, "type": ext.lstrip(".").upper() or "FILE", }) return render_template( "reports.html", reports=reports, image_types=IMAGE_TYPES, friendly_names=FRIENDLY_NAMES, ) @app.route("/reports/download/") def blancco_download_report(filename): filename = secure_filename(filename) fpath = os.path.join(BLANCCO_REPORTS, filename) if not os.path.isfile(fpath): flash(f"Report not found: {filename}", "danger") return redirect(url_for("blancco_reports")) return send_file(fpath, as_attachment=True) @app.route("/reports/delete/", methods=["POST"]) def blancco_delete_report(filename): filename = secure_filename(filename) fpath = os.path.join(BLANCCO_REPORTS, filename) if os.path.isfile(fpath): os.remove(fpath) flash(f"Deleted {filename}.", "success") else: flash(f"Report not found: {filename}", "danger") return redirect(url_for("blancco_reports")) # --------------------------------------------------------------------------- # Routes — startnet.cmd Editor (WIM) # --------------------------------------------------------------------------- def _wim_extract_startnet(wim_path): """Extract startnet.cmd from a WIM file using wimextract.""" import tempfile tmpdir = tempfile.mkdtemp() try: result = subprocess.run( ["wimextract", wim_path, "1", "/Windows/System32/startnet.cmd", "--dest-dir", tmpdir], capture_output=True, text=True, timeout=30, ) startnet_path = os.path.join(tmpdir, "startnet.cmd") if result.returncode == 0 and os.path.isfile(startnet_path): content = open(startnet_path, "r", encoding="utf-8", errors="replace").read() return content return None except Exception: return None finally: shutil.rmtree(tmpdir, ignore_errors=True) def _wim_update_startnet(wim_path, content): """Update startnet.cmd inside a WIM file using wimupdate.""" import tempfile tmpdir = tempfile.mkdtemp() try: startnet_path = os.path.join(tmpdir, "startnet.cmd") with open(startnet_path, "w", encoding="utf-8", newline="\r\n") as fh: fh.write(content) # wimupdate reads commands from stdin update_cmd = f"add {startnet_path} /Windows/System32/startnet.cmd\n" result = subprocess.run( ["wimupdate", wim_path, "1"], input=update_cmd, capture_output=True, text=True, timeout=60, ) if result.returncode != 0: return False, result.stderr.strip() return True, "" except Exception as exc: return False, str(exc) finally: shutil.rmtree(tmpdir, ignore_errors=True) def _wim_list_files(wim_path, path="/"): """List files inside a WIM at the given path.""" try: result = subprocess.run( ["wimdir", wim_path, "1", path], capture_output=True, text=True, timeout=30, ) if result.returncode == 0: return [l.strip() for l in result.stdout.splitlines() if l.strip()] return [] except Exception: return [] @app.route("/startnet") def startnet_editor(): wim_exists = os.path.isfile(BOOT_WIM) content = "" wim_info = {} if wim_exists: content = _wim_extract_startnet(BOOT_WIM) or "" # Get WIM info try: result = subprocess.run( ["wiminfo", BOOT_WIM], capture_output=True, text=True, timeout=15, ) if result.returncode == 0: for line in result.stdout.splitlines(): if ":" in line: key, _, val = line.partition(":") wim_info[key.strip()] = val.strip() except Exception: pass return render_template( "startnet_editor.html", wim_exists=wim_exists, wim_path=BOOT_WIM, content=content, wim_info=wim_info, image_types=IMAGE_TYPES, friendly_names=FRIENDLY_NAMES, ) @app.route("/startnet/save", methods=["POST"]) def startnet_save(): if not os.path.isfile(BOOT_WIM): flash("boot.wim not found.", "danger") return redirect(url_for("startnet_editor")) content = request.form.get("content", "") ok, err = _wim_update_startnet(BOOT_WIM, content) if ok: flash("startnet.cmd updated successfully in boot.wim.", "success") else: flash(f"Failed to update boot.wim: {err}", "danger") return redirect(url_for("startnet_editor")) # --------------------------------------------------------------------------- # Routes — API # --------------------------------------------------------------------------- @app.route("/api/services") def api_services(): services = {s: service_status(s) for s in ("dnsmasq", "apache2", "smbd")} return jsonify(services) @app.route("/api/images") def api_images(): images = [image_status(it) for it in IMAGE_TYPES] return jsonify(images) @app.route("/api/images//unattend", methods=["POST"]) def api_save_unattend(image_type): if image_type not in IMAGE_TYPES: return jsonify({"error": "Unknown image type"}), 404 xml_file = unattend_path(image_type) payload = request.get_json(silent=True) if not payload: return jsonify({"error": "No JSON body provided"}), 400 if "raw_xml" in payload: raw_xml = payload["raw_xml"] try: etree.fromstring(raw_xml.encode("utf-8")) except etree.XMLSyntaxError as exc: return jsonify({"error": f"Invalid XML: {exc}"}), 400 xml_content = raw_xml else: try: xml_content = build_unattend_xml(payload) except Exception as exc: return jsonify({"error": f"Failed to build XML: {exc}"}), 400 try: os.makedirs(os.path.dirname(xml_file), exist_ok=True) with open(xml_file, "w", encoding="utf-8") as fh: fh.write(xml_content) except Exception as exc: return jsonify({"error": f"Failed to write file: {exc}"}), 500 return jsonify({"status": "ok", "path": xml_file}) # --------------------------------------------------------------------------- # Template filters # --------------------------------------------------------------------------- @app.template_filter("timestamp_fmt") def timestamp_fmt(ts): """Format a Unix timestamp to a human-readable date string.""" from datetime import datetime return datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M") # --------------------------------------------------------------------------- # Context processor — make data available to all templates # --------------------------------------------------------------------------- @app.context_processor def inject_globals(): return { "all_image_types": IMAGE_TYPES, "all_friendly_names": FRIENDLY_NAMES, } # --------------------------------------------------------------------------- # Main # --------------------------------------------------------------------------- if __name__ == "__main__": app.run(host="0.0.0.0", port=5000, debug=True)