#!/usr/bin/env python3 """Flask web application for managing a GE Aerospace PXE server. This file is the route surface; most logic lives in ``services/``: services.audit - audit log writer services.csrf - session CSRF token + before_request validator services.fs - path helpers + JSON load/save services.system - systemd service status + USB mounts services.images - image_status + load_image_config services.deploy - import_deploy + merge_tree + symlink dance services.unattend - parse + build + form-extract for unattend.xml services.wim - boot.wim startnet.cmd extract/update via wimtools """ import hashlib import json import os import shutil import tempfile import time from datetime import datetime from pathlib import Path from flask import ( Flask, Response, abort, flash, jsonify, redirect, render_template, request, send_file, stream_with_context, url_for, ) from lxml import etree from werkzeug.utils import secure_filename import config from services import ( blancco_report, deploy, fs, image_registry, images, imaging_status, system, unattend, wim, ) from services.audit import audit from services.csrf import init_csrf app = Flask(__name__) app.secret_key = config.FLASK_SECRET_KEY app.config["MAX_CONTENT_LENGTH"] = config.MAX_CONTENT_LENGTH init_csrf(app) # Pull IMAGE_TYPES/FRIENDLY_NAMES from the registry file (created from the # baked-in config.py defaults on first run). Mutates config.* in place. image_registry.reload() # --------------------------------------------------------------------------- # Routes - pages # --------------------------------------------------------------------------- @app.route("/") def dashboard(): image_list = [images.image_status(it) for it in config.IMAGE_TYPES] services = [system.service_status(s) for s in ("dnsmasq", "apache2", "smbd")] return render_template( "dashboard.html", images=image_list, services=services, image_types=config.IMAGE_TYPES, friendly_names=config.FRIENDLY_NAMES, ) @app.route("/images/import", methods=["GET", "POST"]) def images_import(): usb_mounts = system.find_usb_mounts() upload_sources = system.find_upload_sources() image_list = [images.image_status(it) for it in config.IMAGE_TYPES] if request.method == "POST": source = request.form.get("source", "") target = request.form.get("target", "") if not source or not target: flash("Please select both a source and a target image type.", "danger") return redirect(url_for("images_import")) if target not in config.IMAGE_TYPES: flash("Invalid target image type.", "danger") return redirect(url_for("images_import")) if not deploy.allowed_import_source(source): flash("Source path is not a valid import location.", "danger") return redirect(url_for("images_import")) if not os.path.isdir(source): flash(f"Source path does not exist: {source}", "danger") return redirect(url_for("images_import")) root = fs.image_root(target) dest = fs.deploy_path(target) try: os.makedirs(dest, exist_ok=True) src_items = os.listdir(source) # Move files from network upload to save disk space; copy from USB. use_move = source == config.UPLOAD_DIR or source.startswith(config.UPLOAD_DIR + "/") _transfer = shutil.move if use_move else shutil.copy2 _transfer_tree = shutil.move if use_move else shutil.copytree top_dirs = {d for d in src_items if os.path.isdir(os.path.join(source, d))} full_layout = "Deploy" in top_dirs if full_layout: shared_root = [] for prefix, dirs in config.SHARED_ROOT_DIRS.items(): if target.startswith(prefix): shared_root = dirs break for item in src_items: src_item = os.path.join(source, item) if item == "Deploy": deploy.import_deploy(src_item, dest, target, move=use_move) elif os.path.isdir(src_item) and item in shared_root: prefix_key = target.split("-")[0] + "-" shared_dest = os.path.join(config.SHARED_DIR, f"{prefix_key}{item}") os.makedirs(shared_dest, exist_ok=True) deploy._merge_tree(src_item, shared_dest, move=use_move) dst_item = os.path.join(root, item) if os.path.islink(dst_item): os.remove(dst_item) elif os.path.isdir(dst_item): shutil.rmtree(dst_item) os.symlink(shared_dest, dst_item) elif os.path.isdir(src_item): dst_item = os.path.join(root, item) if os.path.exists(dst_item): shutil.rmtree(dst_item) _transfer_tree(src_item, dst_item) else: _transfer(src_item, os.path.join(root, item)) else: deploy.import_deploy(source, dest, target, move=use_move) # Ensure Media.tag exists (FlatSetupLoader.exe drive detection). control_dir = os.path.join(dest, "Control") os.makedirs(control_dir, exist_ok=True) media_tag = os.path.join(control_dir, "Media.tag") Path(media_tag).touch() audit("IMAGE_IMPORT", f"{source} -> {target}") flash( f"Successfully imported content to {config.FRIENDLY_NAMES.get(target, target)}.", "success", ) except Exception as exc: flash(f"Import failed: {exc}", "danger") return redirect(url_for("images_import")) return render_template( "import.html", usb_mounts=usb_mounts, upload_sources=upload_sources, images=image_list, image_types=config.IMAGE_TYPES, friendly_names=config.FRIENDLY_NAMES, ) @app.route("/images//unattend", methods=["GET", "POST"]) def unattend_editor(image_type): if image_type not in config.IMAGE_TYPES: flash("Unknown image type.", "danger") return redirect(url_for("dashboard")) xml_file = fs.unattend_path(image_type) if request.method == "POST": save_mode = request.form.get("save_mode", "form") if save_mode == "raw": raw_xml = request.form.get("raw_xml", "") try: etree.fromstring(raw_xml.encode("utf-8")) except etree.XMLSyntaxError as exc: flash(f"Invalid XML: {exc}", "danger") data = unattend.parse_unattend(xml_file) data["raw_xml"] = raw_xml return render_template( "unattend_editor.html", image_type=image_type, friendly_name=config.FRIENDLY_NAMES.get(image_type, image_type), data=data, image_types=config.IMAGE_TYPES, friendly_names=config.FRIENDLY_NAMES, ) xml_content = raw_xml else: form_data = unattend.extract_form_data(request.form) xml_content = unattend.build_unattend_xml(form_data) try: os.makedirs(os.path.dirname(xml_file), exist_ok=True) with open(xml_file, "w", encoding="utf-8") as fh: fh.write(xml_content) audit("UNATTEND_SAVE", f"{image_type} ({save_mode})") flash("unattend.xml saved successfully.", "success") except Exception as exc: flash(f"Failed to save: {exc}", "danger") return redirect(url_for("unattend_editor", image_type=image_type)) data = unattend.parse_unattend(xml_file) return render_template( "unattend_editor.html", image_type=image_type, friendly_name=config.FRIENDLY_NAMES.get(image_type, image_type), data=data, image_types=config.IMAGE_TYPES, friendly_names=config.FRIENDLY_NAMES, ) @app.route("/images//config") def image_config(image_type): if image_type not in config.IMAGE_TYPES: flash("Unknown image type.", "danger") return redirect(url_for("dashboard")) cfg = images.load_image_config(image_type) return render_template( "image_config.html", image_type=image_type, friendly_name=config.FRIENDLY_NAMES.get(image_type, image_type), config=cfg, ) @app.route("/images//config/save", methods=["POST"]) def image_config_save(image_type): if image_type not in config.IMAGE_TYPES: flash("Unknown image type.", "danger") return redirect(url_for("dashboard")) section = request.form.get("section", "") payload = request.form.get("payload", "[]") try: data = json.loads(payload) except json.JSONDecodeError: flash("Invalid JSON payload.", "danger") return redirect(url_for("image_config", image_type=image_type)) ctrl = fs.control_path(image_type) tools = fs.tools_path(image_type) try: if section == "hardware_models": us_file = os.path.join(tools, "user_selections.json") us_raw = fs.load_json(us_file) us_data = us_raw[0] if us_raw and isinstance(us_raw, list) else {} us_data["HardwareModelSelection"] = data fs.save_json(us_file, [us_data]) audit("CONFIG_SAVE", f"{image_type}/hardware_models") elif section == "drivers": clean = [{k: v for k, v in d.items() if not k.startswith("_")} for d in data] fs.save_json(os.path.join(ctrl, "HardwareDriver.json"), clean) audit("CONFIG_SAVE", f"{image_type}/drivers") elif section == "operating_systems": clean = [{k: v for k, v in d.items() if not k.startswith("_")} for d in data] fs.save_json(os.path.join(ctrl, "OperatingSystem.json"), clean) audit("CONFIG_SAVE", f"{image_type}/operating_systems") elif section == "packages": clean = [{k: v for k, v in d.items() if not k.startswith("_")} for d in data] fs.save_json(os.path.join(ctrl, "packages.json"), clean) audit("CONFIG_SAVE", f"{image_type}/packages") else: flash(f"Unknown section: {section}", "danger") return redirect(url_for("image_config", image_type=image_type)) flash(f"Saved {section.replace('_', ' ')} successfully.", "success") except Exception as exc: flash(f"Failed to save {section}: {exc}", "danger") return redirect(url_for("image_config", image_type=image_type)) # --------------------------------------------------------------------------- # Routes - Driver/package upload + orphan adoption # --------------------------------------------------------------------------- @app.route("/images//drivers/upload", methods=["POST"]) def images_drivers_upload(image_type): if image_type not in config.IMAGE_TYPES: flash(f"Unknown image type: {image_type}", "danger") return redirect(url_for("dashboard")) f = request.files.get("driver_file") if not f or not f.filename: flash("No file selected.", "danger") return redirect(url_for("image_config", image_type=image_type)) family = (request.form.get("family") or "").strip() dest = (request.form.get("destination_dir") or "").strip() overwrite = request.form.get("overwrite") == "1" try: rec = images.upload_driver(image_type, f, family=family, destination_dir=dest, overwrite=overwrite) audit("DRIVER_UPLOAD", f"{image_type}/{rec['filename']} registered={rec['registered']}") msg = f"Uploaded {rec['filename']}." msg += " Registered to family in HardwareDriver.json." if rec["registered"] else " Now in orphans list until adopted." flash(msg, "success") except (ValueError, FileExistsError, FileNotFoundError) as ex: flash(str(ex), "danger") except Exception as ex: flash(f"Upload failed: {ex}", "danger") return redirect(url_for("image_config", image_type=image_type)) @app.route("/images//drivers/adopt", methods=["POST"]) def images_drivers_adopt(image_type): if image_type not in config.IMAGE_TYPES: flash(f"Unknown image type: {image_type}", "danger") return redirect(url_for("dashboard")) filename = (request.form.get("filename") or "").strip() family = (request.form.get("family") or "").strip() dest = (request.form.get("destination_dir") or "").strip() try: rec = images.adopt_orphan(image_type, filename, family, dest) audit("DRIVER_ADOPT", f"{image_type}/{rec['filename']} family={family}") if rec.get("already_registered"): flash(f"{rec['filename']} was already registered.", "info") else: flash(f"Adopted {rec['filename']} into HardwareDriver.json.", "success") except (ValueError, FileNotFoundError) as ex: flash(str(ex), "danger") except Exception as ex: flash(f"Adopt failed: {ex}", "danger") return redirect(url_for("image_config", image_type=image_type)) @app.route("/images//drivers/orphans/delete", methods=["POST"]) def images_drivers_orphans_delete(image_type): if image_type not in config.IMAGE_TYPES: flash(f"Unknown image type: {image_type}", "danger") return redirect(url_for("dashboard")) # Filenames come as repeated form fields (one per checkbox). filenames = request.form.getlist("filename") if not filenames: flash("No files selected for removal.", "warning") return redirect(url_for("image_config", image_type=image_type)) rec = images.remove_orphans(image_type, filenames) audit("DRIVER_ORPHAN_REMOVE", f"{image_type} removed={len(rec['removed'])} missing={len(rec['missing'])}") parts = [] if rec["removed"]: parts.append(f"Removed {len(rec['removed'])} orphan(s).") if rec["missing"]: parts.append(f"{len(rec['missing'])} not found / errored.") flash(" ".join(parts) or "No-op.", "success" if rec["removed"] else "warning") return redirect(url_for("image_config", image_type=image_type)) @app.route("/images//packages/upload", methods=["POST"]) def images_packages_upload(image_type): if image_type not in config.IMAGE_TYPES: flash(f"Unknown image type: {image_type}", "danger") return redirect(url_for("dashboard")) f = request.files.get("package_file") if not f or not f.filename: flash("No file selected.", "danger") return redirect(url_for("image_config", image_type=image_type)) dest = (request.form.get("destination_dir") or "").strip() overwrite = request.form.get("overwrite") == "1" try: rec = images.upload_package(image_type, f, destination_dir=dest, overwrite=overwrite) audit("PACKAGE_UPLOAD", f"{image_type}/{rec['filename']} registered={rec['registered']}") msg = f"Uploaded {rec['filename']}." msg += " Registered in packages.json." if rec["registered"] else " Not registered (no destination_dir provided)." flash(msg, "success") except (ValueError, FileExistsError) as ex: flash(str(ex), "danger") except Exception as ex: flash(f"Upload failed: {ex}", "danger") return redirect(url_for("image_config", image_type=image_type)) # --------------------------------------------------------------------------- # Routes - Image type CRUD (registry-backed) # --------------------------------------------------------------------------- @app.route("/images/new", methods=["POST"]) def images_new(): key = (request.form.get("key") or "").strip().lower() friendly_name = (request.form.get("friendly_name") or "").strip() try: rec = image_registry.create(key, friendly_name) audit("IMAGE_REGISTRY_CREATE", f"{rec['key']} ({rec['friendly_name']})") flash(f"Created image type {rec['key']}.", "success") except image_registry.RegistryError as ex: flash(str(ex), "danger") return redirect(url_for("dashboard")) @app.route("/images//clone", methods=["POST"]) def images_clone(image_type): if image_type not in config.IMAGE_TYPES: flash(f"Unknown source image type: {image_type}", "danger") return redirect(url_for("dashboard")) dst_key = (request.form.get("dst_key") or "").strip().lower() friendly_name = (request.form.get("friendly_name") or "").strip() or None try: rec = image_registry.clone(image_type, dst_key, friendly_name) audit("IMAGE_REGISTRY_CLONE", f"{image_type} -> {rec['key']}") flash(f"Cloned {image_type} -> {rec['key']}.", "success") except image_registry.RegistryError as ex: flash(str(ex), "danger") except Exception as ex: flash(f"Clone failed: {ex}", "danger") return redirect(url_for("dashboard")) @app.route("/images//delete", methods=["POST"]) def images_delete(image_type): if image_type not in config.IMAGE_TYPES: flash(f"Unknown image type: {image_type}", "danger") return redirect(url_for("dashboard")) delete_content = request.form.get("delete_content") == "1" try: rec = image_registry.delete(image_type, delete_content=delete_content) audit("IMAGE_REGISTRY_DELETE", f"{rec['key']} content={rec['removed_content']}") msg = f"Removed image type {rec['key']} from registry." if rec["removed_content"]: msg += " On-disk content wiped." flash(msg, "success") except image_registry.RegistryError as ex: flash(str(ex), "danger") return redirect(url_for("dashboard")) @app.route("/images//rename", methods=["POST"]) def images_rename(image_type): if image_type not in config.IMAGE_TYPES: flash(f"Unknown image type: {image_type}", "danger") return redirect(url_for("dashboard")) friendly_name = (request.form.get("friendly_name") or "").strip() try: rec = image_registry.rename_friendly(image_type, friendly_name) audit("IMAGE_REGISTRY_RENAME", f"{rec['key']} -> {rec['friendly_name']}") flash(f"Renamed {rec['key']} to '{rec['friendly_name']}'.", "success") except image_registry.RegistryError as ex: flash(str(ex), "danger") return redirect(url_for("dashboard")) # --------------------------------------------------------------------------- # Routes - Clonezilla Backups # --------------------------------------------------------------------------- @app.route("/backups") def clonezilla_backups(): backups = [] if os.path.isdir(config.CLONEZILLA_SHARE): for f in sorted(os.listdir(config.CLONEZILLA_SHARE)): fpath = os.path.join(config.CLONEZILLA_SHARE, f) if os.path.isfile(fpath) and f.lower().endswith(".zip"): stat = os.stat(fpath) backups.append({ "filename": f, "machine": os.path.splitext(f)[0], "size": stat.st_size, "modified": stat.st_mtime, }) return render_template( "backups.html", backups=backups, image_types=config.IMAGE_TYPES, friendly_names=config.FRIENDLY_NAMES, ) @app.route("/backups/upload", methods=["POST"]) def clonezilla_upload(): if "backup_file" not in request.files: flash("No file selected.", "danger") return redirect(url_for("clonezilla_backups")) f = request.files["backup_file"] if not f.filename: flash("No file selected.", "danger") return redirect(url_for("clonezilla_backups")) filename = secure_filename(f.filename) if not filename.lower().endswith(".zip"): flash("Only .zip files are accepted.", "danger") return redirect(url_for("clonezilla_backups")) os.makedirs(config.CLONEZILLA_SHARE, exist_ok=True) dest = os.path.join(config.CLONEZILLA_SHARE, filename) f.save(dest) audit("BACKUP_UPLOAD", filename) flash(f"Uploaded {filename} successfully.", "success") return redirect(url_for("clonezilla_backups")) @app.route("/backups/download/") def clonezilla_download(filename): filename = secure_filename(filename) fpath = os.path.join(config.CLONEZILLA_SHARE, filename) if not os.path.isfile(fpath): flash(f"Backup not found: {filename}", "danger") return redirect(url_for("clonezilla_backups")) return send_file(fpath, as_attachment=True) @app.route("/backups/delete/", methods=["POST"]) def clonezilla_delete(filename): filename = secure_filename(filename) fpath = os.path.join(config.CLONEZILLA_SHARE, filename) if os.path.isfile(fpath): os.remove(fpath) audit("BACKUP_DELETE", filename) flash(f"Deleted {filename}.", "success") else: flash(f"Backup not found: {filename}", "danger") return redirect(url_for("clonezilla_backups")) # --------------------------------------------------------------------------- # Routes - Blancco Reports # --------------------------------------------------------------------------- @app.route("/reports") def blancco_reports(): reports = [] if os.path.isdir(config.BLANCCO_REPORTS): for f in sorted(os.listdir(config.BLANCCO_REPORTS), reverse=True): fpath = os.path.join(config.BLANCCO_REPORTS, f) if not os.path.isfile(fpath): continue stat = os.stat(fpath) ext = os.path.splitext(f)[1].lower() # Surface BIOS serial + system model so operators can find a # report for a specific bay without opening each one. Parse # cost is one XML walk per .xml report (~50KB-3MB each; # negligible at fleet sizes). serial = "" model = "" state = "" if ext == ".xml": try: data = blancco_report.parse(fpath) sysinfo = (data.get("hardware") or {}).get("system") or {} serial = sysinfo.get("serial", "") or "" model = sysinfo.get("model", "") or "" # Overall erasure result: each erasure entry has its own # 'state' (Successful / Failed / ...). If any drive failed # the report rolls up to Failed; otherwise Successful. erasures = data.get("erasures") or [] states = [(e.get("state") or "").strip() for e in erasures if e.get("state")] if not states: state = "" elif any(s.lower() != "successful" for s in states): state = "Failed" else: state = "Successful" except Exception: pass reports.append({ "filename": f, "size": stat.st_size, "modified": stat.st_mtime, "type": ext.lstrip(".").upper() or "FILE", "serial": serial, "model": model, "state": state, }) return render_template( "reports.html", reports=reports, image_types=config.IMAGE_TYPES, friendly_names=config.FRIENDLY_NAMES, ) @app.route("/reports/download/") def blancco_download_report(filename): filename = secure_filename(filename) fpath = os.path.join(config.BLANCCO_REPORTS, filename) if not os.path.isfile(fpath): flash(f"Report not found: {filename}", "danger") return redirect(url_for("blancco_reports")) return send_file(fpath, as_attachment=True) @app.route("/reports/view/") def blancco_view_report(filename): filename = secure_filename(filename) fpath = os.path.join(config.BLANCCO_REPORTS, filename) if not os.path.isfile(fpath): flash(f"Report not found: {filename}", "danger") return redirect(url_for("blancco_reports")) if not filename.lower().endswith(".xml"): flash("Formatted view supports XML reports only.", "warning") return redirect(url_for("blancco_reports")) try: data = blancco_report.parse(fpath) except Exception as ex: flash(f"Failed to parse {filename}: {ex}", "danger") return redirect(url_for("blancco_reports")) return render_template("report_view.html", filename=filename, data=data) @app.route("/reports/delete/", methods=["POST"]) def blancco_delete_report(filename): filename = secure_filename(filename) fpath = os.path.join(config.BLANCCO_REPORTS, filename) if os.path.isfile(fpath): os.remove(fpath) audit("REPORT_DELETE", filename) flash(f"Deleted {filename}.", "success") else: flash(f"Report not found: {filename}", "danger") return redirect(url_for("blancco_reports")) # --------------------------------------------------------------------------- # Routes - Imaging Progress # --------------------------------------------------------------------------- @app.route("/imaging") def imaging_dashboard(): sessions = imaging_status.list_sessions() return render_template("imaging.html", sessions=sessions) @app.route("/imaging/tiles") def imaging_tiles_partial(): """HTML fragment of the per-bay tile loop only, used by the dashboard's SSE/polling refresh to swap #imaging-tiles innerHTML without a full page reload.""" sessions = imaging_status.list_sessions() return render_template("_imaging_tiles.html", sessions=sessions) def _sessions_hash() -> str: """Compact fingerprint of the current session list. Used by the SSE stream to detect changes without sending the full payload. Hashing (serial-or-key, status, stage_index, current_stage, last_updated) covers every field the dashboard renders prominently.""" sessions = imaging_status.list_sessions() h = hashlib.sha256() for s in sessions: key = s.get("serial") or f"{s.get('mac','')}-{s.get('ip','')}" h.update(repr(( key, s.get("source", "client"), s.get("status", ""), s.get("stage_index", 0), s.get("current_stage", ""), s.get("last_updated", ""), s.get("laps_password", "") and "1" or "0", )).encode()) return h.hexdigest()[:16] @app.route("/imaging/stream") def imaging_stream(): """Server-Sent Events stream of session-list change pings. Emits one JSON event every SSE_PING_INTERVAL seconds. When the hash changes from the previously sent value, the client fetches /imaging/tiles and re-renders. A keepalive heartbeat is sent on the same cadence so intermediate proxies don't close the connection. Single-threaded dev server can only serve one SSE client at a time. The live PXE box runs gunicorn with multiple workers (see playbook) so this is fine in production. """ SSE_PING_INTERVAL = 5 # seconds between hash checks SSE_MAX_DURATION = 600 # cap connection length so the worker recycles @stream_with_context def gen(): start = time.time() last = None while time.time() - start < SSE_MAX_DURATION: try: cur = _sessions_hash() except Exception as ex: yield f": error {ex}\n\n" cur = last payload = json.dumps({"hash": cur, "ts": int(time.time())}) yield f"data: {payload}\n\n" last = cur time.sleep(SSE_PING_INTERVAL) headers = { "Cache-Control": "no-cache", "X-Accel-Buffering": "no", # disable nginx/apache proxy buffering "Connection": "keep-alive", } return Response(gen(), mimetype="text/event-stream", headers=headers) @app.route("/imaging/status", methods=["POST"]) def imaging_status_post(): # CSRF-exempt machine-to-machine endpoint; see services/csrf.py exempt list. payload = request.get_json(silent=True) or {} if not payload.get("serial"): return jsonify({"error": "missing serial"}), 400 try: state = imaging_status.update_session(payload) except Exception as ex: audit("IMAGING_STATUS_ERROR", str(ex)) return jsonify({"error": str(ex)}), 500 return jsonify({"ok": True, "serial": state["serial"]}), 200 @app.route("/imaging/.json") def imaging_session_json(serial): serial = secure_filename(serial) s = imaging_status.get_session(serial) if not s: return jsonify({"error": "not found"}), 404 return jsonify(s) @app.route("/imaging/session/") def imaging_session_detail(serial): """Per-bay forensics page: stage timeline, full sidecar log, all session metadata. Linked from the dashboard tile. Returns 404 if no session JSON exists for the serial.""" serial = secure_filename(serial) s = imaging_status.get_session(serial) if not s: flash(f"No session for serial {serial}.", "danger") return redirect(url_for("imaging_dashboard")) full_log, truncated = imaging_status.read_full_log(serial) return render_template( "imaging_detail.html", session=s, full_log=full_log, full_log_truncated=truncated, ) @app.route("/imaging/delete/", methods=["POST"]) def imaging_delete_session(serial): serial = secure_filename(serial) if imaging_status.delete_session(serial): audit("IMAGING_DELETE", serial) flash(f"Cleared imaging session {serial}.", "success") else: flash(f"Session not found: {serial}", "danger") return redirect(url_for("imaging_dashboard")) @app.route("/imaging/delete_all", methods=["POST"]) def imaging_delete_all(): n = imaging_status.delete_all_sessions() audit("IMAGING_DELETE_ALL", str(n)) flash(f"Cleared {n} imaging session(s).", "success") return redirect(url_for("imaging_dashboard")) @app.route("/imaging//laps", methods=["POST"]) def imaging_set_laps(serial): """Save (or clear with empty value) the LAPS password for a bay so it survives the dashboard's 5s auto-refresh. JSON body: {"password": "..."}. Empty string removes the field. Daily reset wipes natural risk.""" serial = secure_filename(serial) body = request.get_json(silent=True) or {} pw = body.get("password", "") if not isinstance(pw, str): return {"ok": False, "error": "password must be string"}, 400 if pw == "": # Clear by direct file write. update_session() merges payload INTO # existing state and skips empty values, so it cannot remove a key. # Pop the laps_password key directly from the session JSON and # write the result atomically. path = imaging_status._path_for(serial) if os.path.isfile(path): try: with open(path, "r") as f: state = json.load(f) except (json.JSONDecodeError, OSError): state = {} if "laps_password" in state: state.pop("laps_password", None) state["last_updated"] = imaging_status._now_iso() fd, tmp = tempfile.mkstemp(dir=config.IMAGING_DIR, prefix=".tmp-", suffix=".json") try: with os.fdopen(fd, "w") as f: json.dump(state, f, indent=2) os.replace(tmp, path) except Exception: try: os.unlink(tmp) except OSError: pass raise return {"ok": True, "cleared": True} imaging_status.update_session({"serial": serial, "laps_password": pw}) return {"ok": True} # --------------------------------------------------------------------------- # Routes - Enrollment Packages # --------------------------------------------------------------------------- @app.route("/enrollment") def enrollment(): packages = [] if os.path.isdir(config.ENROLLMENT_PPKG_DIR): for f in sorted(os.listdir(config.ENROLLMENT_PPKG_DIR)): fpath = os.path.join(config.ENROLLMENT_PPKG_DIR, f) if os.path.isfile(fpath) and f.lower().endswith(".ppkg"): stat = os.stat(fpath) packages.append({ "filename": f, "size": stat.st_size, "modified": stat.st_mtime, }) return render_template( "enrollment.html", packages=packages, image_types=config.IMAGE_TYPES, friendly_names=config.FRIENDLY_NAMES, ) @app.route("/enrollment/upload", methods=["POST"]) def enrollment_upload(): if "ppkg_file" not in request.files: flash("No file selected.", "danger") return redirect(url_for("enrollment")) f = request.files["ppkg_file"] if not f.filename: flash("No file selected.", "danger") return redirect(url_for("enrollment")) filename = secure_filename(f.filename) if not filename.lower().endswith(".ppkg"): flash("Only .ppkg files are accepted.", "danger") return redirect(url_for("enrollment")) os.makedirs(config.ENROLLMENT_PPKG_DIR, exist_ok=True) dest = os.path.join(config.ENROLLMENT_PPKG_DIR, filename) f.save(dest) audit("ENROLLMENT_UPLOAD", filename) flash(f"Uploaded {filename} successfully.", "success") return redirect(url_for("enrollment")) @app.route("/enrollment/download/") def enrollment_download(filename): filename = secure_filename(filename) fpath = os.path.join(config.ENROLLMENT_PPKG_DIR, filename) if not os.path.isfile(fpath): flash(f"Package not found: {filename}", "danger") return redirect(url_for("enrollment")) return send_file(fpath, as_attachment=True) @app.route("/enrollment/delete/", methods=["POST"]) def enrollment_delete(filename): filename = secure_filename(filename) fpath = os.path.join(config.ENROLLMENT_PPKG_DIR, filename) if os.path.isfile(fpath): os.remove(fpath) audit("ENROLLMENT_DELETE", filename) flash(f"Deleted {filename}.", "success") else: flash(f"Package not found: {filename}", "danger") return redirect(url_for("enrollment")) # --------------------------------------------------------------------------- # Routes - startnet.cmd Editor (boot.wim) # --------------------------------------------------------------------------- @app.route("/startnet") def startnet_editor(): import subprocess wim_exists = os.path.isfile(config.BOOT_WIM) content = "" wim_info = {} if wim_exists: content = wim.extract_startnet(config.BOOT_WIM) or "" try: result = subprocess.run( ["wiminfo", config.BOOT_WIM], capture_output=True, text=True, timeout=15, ) if result.returncode == 0: for line in result.stdout.splitlines(): if ":" in line: key, _, val = line.partition(":") wim_info[key.strip()] = val.strip() except Exception: pass return render_template( "startnet_editor.html", wim_exists=wim_exists, wim_path=config.BOOT_WIM, content=content, wim_info=wim_info, image_types=config.IMAGE_TYPES, friendly_names=config.FRIENDLY_NAMES, ) @app.route("/startnet/save", methods=["POST"]) def startnet_save(): if not os.path.isfile(config.BOOT_WIM): flash("boot.wim not found.", "danger") return redirect(url_for("startnet_editor")) content = request.form.get("content", "") ok, err = wim.update_startnet(config.BOOT_WIM, content) if ok: audit("STARTNET_SAVE", "boot.wim updated") flash("startnet.cmd updated successfully in boot.wim.", "success") else: flash(f"Failed to update boot.wim: {err}", "danger") return redirect(url_for("startnet_editor")) # --------------------------------------------------------------------------- # Routes - Audit Log # --------------------------------------------------------------------------- @app.route("/audit") def audit_log(): entries = [] if os.path.isfile(config.AUDIT_LOG): with open(config.AUDIT_LOG, "r") as fh: for line in fh: entries.append(line.strip()) entries.reverse() return render_template( "audit.html", entries=entries, image_types=config.IMAGE_TYPES, friendly_names=config.FRIENDLY_NAMES, ) # --------------------------------------------------------------------------- # Routes - JSON API # --------------------------------------------------------------------------- @app.route("/api/services") def api_services(): services = {s: system.service_status(s) for s in ("dnsmasq", "apache2", "smbd")} return jsonify(services) @app.route("/api/images") def api_images(): image_list = [images.image_status(it) for it in config.IMAGE_TYPES] return jsonify(image_list) @app.route("/api/images//unattend", methods=["POST"]) def api_save_unattend(image_type): if image_type not in config.IMAGE_TYPES: return jsonify({"error": "Unknown image type"}), 404 xml_file = fs.unattend_path(image_type) payload = request.get_json(silent=True) if not payload: return jsonify({"error": "No JSON body provided"}), 400 if "raw_xml" in payload: raw_xml = payload["raw_xml"] try: etree.fromstring(raw_xml.encode("utf-8")) except etree.XMLSyntaxError as exc: return jsonify({"error": f"Invalid XML: {exc}"}), 400 xml_content = raw_xml else: try: xml_content = unattend.build_unattend_xml(payload) except Exception as exc: return jsonify({"error": f"Failed to build XML: {exc}"}), 400 try: os.makedirs(os.path.dirname(xml_file), exist_ok=True) with open(xml_file, "w", encoding="utf-8") as fh: fh.write(xml_content) except Exception as exc: return jsonify({"error": f"Failed to write file: {exc}"}), 500 audit("UNATTEND_SAVE_API", image_type) return jsonify({"status": "ok", "path": xml_file}) # --------------------------------------------------------------------------- # Template helpers # --------------------------------------------------------------------------- @app.template_filter("timestamp_fmt") def timestamp_fmt(ts): """Format a Unix timestamp to a human-readable date string.""" return datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M") @app.context_processor def inject_globals(): return { "all_image_types": config.IMAGE_TYPES, "all_friendly_names": config.FRIENDLY_NAMES, } # --------------------------------------------------------------------------- # Main # --------------------------------------------------------------------------- if __name__ == "__main__": app.run(host="127.0.0.1", port=9010, debug=False, threaded=True)