diff --git a/webapp/app.py b/webapp/app.py
index 8eafa78..d9f7ea4 100644
--- a/webapp/app.py
+++ b/webapp/app.py
@@ -1,1621 +1,625 @@
-#!/usr/bin/env python3
-"""Flask web application for managing a GE Aerospace PXE server."""
-
-import json
-import logging
-import os
-import secrets
-import shutil
-import subprocess
-import tempfile
-from datetime import datetime
-from pathlib import Path
-
-from flask import (
- Flask,
- abort,
- flash,
- jsonify,
- redirect,
- render_template,
- request,
- send_file,
- session,
- url_for,
-)
-from lxml import etree
-from werkzeug.utils import secure_filename
-
-app = Flask(__name__)
-app.secret_key = os.environ.get("FLASK_SECRET_KEY", "pxe-manager-dev-key-change-in-prod")
-app.config["MAX_CONTENT_LENGTH"] = 16 * 1024 * 1024 * 1024 # 16 GB max upload
-
-# ---------------------------------------------------------------------------
-# Audit logging
-# ---------------------------------------------------------------------------
-AUDIT_LOG = os.environ.get("AUDIT_LOG", "/var/log/pxe-webapp-audit.log")
-audit_logger = logging.getLogger("pxe_audit")
-audit_logger.setLevel(logging.INFO)
-_audit_handler = logging.FileHandler(AUDIT_LOG, mode="a")
-_audit_handler.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
-audit_logger.addHandler(_audit_handler)
-
-
-def audit(action, detail=""):
- """Write an entry to the audit log."""
- ip = request.remote_addr if request else "system"
- audit_logger.info(f"[{ip}] {action}: {detail}")
-
-# ---------------------------------------------------------------------------
-# Configuration
-# ---------------------------------------------------------------------------
-SAMBA_SHARE = os.environ.get("SAMBA_SHARE", "/srv/samba/winpeapps")
-CLONEZILLA_SHARE = os.environ.get("CLONEZILLA_SHARE", "/srv/samba/clonezilla")
-BLANCCO_REPORTS = os.environ.get("BLANCCO_REPORTS", "/srv/samba/blancco-reports")
-ENROLLMENT_SHARE = os.environ.get("ENROLLMENT_SHARE", "/srv/samba/enrollment")
-UPLOAD_DIR = os.environ.get("UPLOAD_DIR", "/home/pxe/image-upload")
-SHARED_DIR = os.path.join(SAMBA_SHARE, "_shared")
-# Subdirs inside Deploy/ shared across ALL image types
-SHARED_DEPLOY_GLOBAL = ["Out-of-box Drivers"]
-# Subdirs inside Deploy/ shared within the same image family (by prefix)
-SHARED_DEPLOY_SCOPED = {
- "gea-": ["Operating Systems", "Packages"],
- "ge-": ["Operating Systems", "Packages"],
-}
-# Sibling dirs at image root shared within the same image family
-SHARED_ROOT_DIRS = {
- "gea-": ["Sources"],
- "ge-": ["Sources"],
-}
-WEB_ROOT = os.environ.get("WEB_ROOT", "/var/www/html")
-BOOT_WIM = os.path.join(WEB_ROOT, "win11", "sources", "boot.wim")
-
-IMAGE_TYPES = [
- "gea-standard",
- "gea-engineer",
- "gea-shopfloor",
- "gea-shopfloor-mce",
- "ge-standard",
- "ge-engineer",
- "ge-shopfloor-lockdown",
- "ge-shopfloor-mce",
-]
-
-FRIENDLY_NAMES = {
- "gea-standard": "GE Aerospace Standard",
- "gea-engineer": "GE Aerospace Engineer",
- "gea-shopfloor": "GE Aerospace Shop Floor",
- "gea-shopfloor-mce": "GE Aerospace Shop Floor MCE",
- "ge-standard": "GE Legacy Standard",
- "ge-engineer": "GE Legacy Engineer",
- "ge-shopfloor-lockdown": "GE Legacy Shop Floor Lockdown",
- "ge-shopfloor-mce": "GE Legacy Shop Floor MCE",
-}
-
-# ---------------------------------------------------------------------------
-# CSRF protection
-# ---------------------------------------------------------------------------
-def generate_csrf_token():
- """Return the CSRF token for the current session, creating one if needed."""
- if "_csrf_token" not in session:
- session["_csrf_token"] = secrets.token_hex(32)
- return session["_csrf_token"]
-
-
-@app.context_processor
-def inject_csrf_token():
- """Make csrf_token() available in all templates."""
- return {"csrf_token": generate_csrf_token}
-
-
-@app.before_request
-def validate_csrf():
- """Reject POST requests with a missing or invalid CSRF token."""
- if request.method != "POST":
- return
- token = request.form.get("_csrf_token") or request.headers.get("X-CSRF-Token")
- if not token or token != generate_csrf_token():
- abort(403)
-
-
-NS = "urn:schemas-microsoft-com:unattend"
-WCM = "http://schemas.microsoft.com/WMIConfig/2002/State"
-NSMAP = {None: NS, "wcm": WCM}
-
-# Convenience qualified-name helpers
-def qn(tag):
- """Return a tag qualified with the default unattend namespace."""
- return f"{{{NS}}}{tag}"
-
-def qwcm(attr):
- """Return an attribute qualified with the wcm namespace."""
- return f"{{{WCM}}}{attr}"
-
-
-# ---------------------------------------------------------------------------
-# Utility helpers
-# ---------------------------------------------------------------------------
-
-def image_root(image_type):
- """Return the root directory for an image type."""
- return os.path.join(SAMBA_SHARE, image_type)
-
-
-def deploy_path(image_type):
- """Return the Deploy directory for an image type."""
- return os.path.join(SAMBA_SHARE, image_type, "Deploy")
-
-
-def unattend_path(image_type):
- """Return the unattend.xml path for an image type."""
- return os.path.join(deploy_path(image_type), "FlatUnattendW10.xml")
-
-
-def control_path(image_type):
- """Return the Deploy/Control directory for an image type."""
- return os.path.join(deploy_path(image_type), "Control")
-
-
-def tools_path(image_type):
- """Return the Tools directory for an image type."""
- return os.path.join(SAMBA_SHARE, image_type, "Tools")
-
-
-def _load_json(filepath):
- """Parse a JSON file and return its contents, or [] on failure."""
- try:
- with open(filepath, "r", encoding="utf-8-sig") as fh:
- return json.load(fh)
- except (OSError, json.JSONDecodeError):
- return []
-
-
-def _save_json(filepath, data):
- """Write data as pretty-printed JSON."""
- os.makedirs(os.path.dirname(filepath), exist_ok=True)
- with open(filepath, "w", encoding="utf-8") as fh:
- json.dump(data, fh, indent=2, ensure_ascii=False)
- fh.write("\n")
-
-
-def _resolve_destination(dest_dir, image_type):
- """Convert a Windows *destinationdir* path to a Linux filesystem path.
-
- Replaces the ``*destinationdir*`` placeholder and backslashes, then
- prepends ``SAMBA_SHARE/image_type/`` and resolves symlinks.
- """
- if not dest_dir:
- return ""
- # Replace the placeholder (case-insensitive)
- path = dest_dir
- lower = path.lower()
- idx = lower.find("*destinationdir*")
- if idx != -1:
- path = path[idx + len("*destinationdir*"):]
- # Backslash → forward slash, strip leading slash
- path = path.replace("\\", "/").lstrip("/")
- full = os.path.join(SAMBA_SHARE, image_type, path)
- # Resolve symlinks so shared dirs are found
- try:
- full = os.path.realpath(full)
- except OSError:
- pass
- return full
-
-
-def _load_image_config(image_type):
- """Load all JSON configs for an image and check on-disk presence."""
- ctrl = control_path(image_type)
- tools = tools_path(image_type)
-
- # --- Drivers (merge HardwareDriver.json + hw_drivers.json) ---
- hw_driver_file = os.path.join(ctrl, "HardwareDriver.json")
- hw_drivers_extra = os.path.join(ctrl, "hw_drivers.json")
- drivers_raw = _load_json(hw_driver_file)
- extra_raw = _load_json(hw_drivers_extra)
-
- # Merge: dedup by FileName (case-insensitive)
- seen_files = set()
- drivers = []
- for d in drivers_raw + extra_raw:
- fname = (d.get("FileName") or d.get("fileName") or "").lower()
- if fname and fname in seen_files:
- continue
- if fname:
- seen_files.add(fname)
- drivers.append(d)
-
- # Check disk presence for each driver
- for d in drivers:
- fname = d.get("FileName") or d.get("fileName") or ""
- dest = d.get("DestinationDir") or d.get("destinationDir") or ""
- resolved = _resolve_destination(dest, image_type)
- if resolved and fname:
- d["_on_disk"] = os.path.isfile(os.path.join(resolved, fname))
- else:
- d["_on_disk"] = False
-
- # --- Operating Systems ---
- os_file = os.path.join(ctrl, "OperatingSystem.json")
- operating_systems = _load_json(os_file)
- for entry in operating_systems:
- osv = entry.get("operatingSystemVersion", {})
- wim = osv.get("wim", {})
- dest = wim.get("DestinationDir") or wim.get("destinationDir") or ""
- resolved = _resolve_destination(dest, image_type)
- if resolved:
- entry["_on_disk"] = os.path.isfile(os.path.join(resolved, "install.wim"))
- else:
- entry["_on_disk"] = False
-
- # --- Packages ---
- pkg_file = os.path.join(ctrl, "packages.json")
- packages = _load_json(pkg_file)
- for p in packages:
- fname = p.get("fileName") or p.get("FileName") or ""
- dest = p.get("destinationDir") or p.get("DestinationDir") or ""
- resolved = _resolve_destination(dest, image_type)
- if resolved and fname:
- p["_on_disk"] = os.path.isfile(os.path.join(resolved, fname))
- else:
- p["_on_disk"] = False
-
- # --- Hardware Models (user_selections.json) ---
- us_file = os.path.join(tools, "user_selections.json")
- us_raw = _load_json(us_file)
- us_data = us_raw[0] if us_raw and isinstance(us_raw, list) else {}
- hardware_models = us_data.get("HardwareModelSelection", [])
- os_selection = str(us_data.get("OperatingSystemSelection", ""))
-
- # Build a lookup: family → driver entry (for disk-presence on models)
- family_lookup = {}
- for d in drivers:
- family = d.get("family", "")
- if family:
- family_lookup[family] = d
-
- for hm in hardware_models:
- family_id = hm.get("Id", "")
- matched = family_lookup.get(family_id)
- hm["_on_disk"] = matched["_on_disk"] if matched else False
-
- # --- Orphan drivers: zip files on disk not referenced in any JSON ---
- orphan_drivers = []
- oob_dir = os.path.join(deploy_path(image_type), "Out-of-box Drivers")
- # Resolve symlinks
- try:
- oob_dir = os.path.realpath(oob_dir)
- except OSError:
- pass
- registered_files = set()
- for d in drivers:
- fname = d.get("FileName") or d.get("fileName") or ""
- if fname:
- registered_files.add(fname.lower())
- if os.path.isdir(oob_dir):
- for dirpath, _dirnames, filenames in os.walk(oob_dir):
- for fn in filenames:
- if fn.lower().endswith(".zip") and fn.lower() not in registered_files:
- rel = os.path.relpath(os.path.join(dirpath, fn), oob_dir)
- orphan_drivers.append({"fileName": fn, "relPath": rel})
-
- return {
- "hardware_models": hardware_models,
- "drivers": drivers,
- "operating_systems": operating_systems,
- "packages": packages,
- "orphan_drivers": orphan_drivers,
- "os_selection": os_selection,
- }
-
-
-def image_status(image_type):
- """Return a dict describing the state of an image type."""
- dp = deploy_path(image_type)
- up = unattend_path(image_type)
- has_content = os.path.isdir(dp) and any(os.scandir(dp)) if os.path.isdir(dp) else False
- has_unattend = os.path.isfile(up)
- return {
- "image_type": image_type,
- "friendly_name": FRIENDLY_NAMES.get(image_type, image_type),
- "deploy_path": dp,
- "has_content": has_content,
- "has_unattend": has_unattend,
- }
-
-
-def service_status(service_name):
- """Check whether a systemd service is active."""
- try:
- result = subprocess.run(
- ["systemctl", "is-active", service_name],
- capture_output=True,
- text=True,
- timeout=5,
- )
- state = result.stdout.strip()
- return {"name": service_name, "active": state == "active", "state": state}
- except Exception as exc:
- return {"name": service_name, "active": False, "state": str(exc)}
-
-
-def find_usb_mounts():
- """Return a list of mount-point paths that look like removable media."""
- mounts = []
- try:
- with open("/proc/mounts", "r") as fh:
- for line in fh:
- parts = line.split()
- if len(parts) >= 2:
- mount_point = parts[1]
- if mount_point.startswith(("/mnt/", "/media/")):
- if os.path.isdir(mount_point):
- mounts.append(mount_point)
- except OSError:
- pass
- return sorted(set(mounts))
-
-
-def find_upload_sources():
- """Return sub-directories inside UPLOAD_DIR that look like image content."""
- sources = []
- if os.path.isdir(UPLOAD_DIR):
- # Include the upload dir itself if it has content
- try:
- entries = os.listdir(UPLOAD_DIR)
- if entries:
- sources.append(UPLOAD_DIR)
- # Also include immediate subdirectories
- for entry in entries:
- full = os.path.join(UPLOAD_DIR, entry)
- if os.path.isdir(full):
- sources.append(full)
- except OSError:
- pass
- return sources
-
-
-def _import_deploy(src_deploy, dst_deploy, target="", move=False):
- """Import Deploy directory contents, merging shared subdirs into _shared.
- When move=True, files are moved instead of copied (saves disk space)."""
- # Build list of scoped shared dirs for this target
- scoped_shared = []
- prefix_key = ""
- for prefix, dirs in SHARED_DEPLOY_SCOPED.items():
- if target.startswith(prefix):
- scoped_shared = dirs
- prefix_key = prefix
- break
-
- _transfer = shutil.move if move else shutil.copy2
- _transfer_tree = shutil.move if move else shutil.copytree
-
- os.makedirs(dst_deploy, exist_ok=True)
- for item in os.listdir(src_deploy):
- src_item = os.path.join(src_deploy, item)
- dst_item = os.path.join(dst_deploy, item)
-
- if not os.path.isdir(src_item):
- _transfer(src_item, dst_item)
- continue
-
- # Global shared (e.g., Out-of-box Drivers) — one copy for all
- if item in SHARED_DEPLOY_GLOBAL:
- shared_dest = os.path.join(SHARED_DIR, item)
- os.makedirs(shared_dest, exist_ok=True)
- _merge_tree(src_item, shared_dest, move=move)
- _replace_with_symlink(dst_item, shared_dest)
- continue
-
- # Scoped shared (e.g., Operating Systems) — per family prefix
- if item in scoped_shared:
- shared_dest = os.path.join(SHARED_DIR, f"{prefix_key}{item}")
- os.makedirs(shared_dest, exist_ok=True)
- _merge_tree(src_item, shared_dest, move=move)
- _replace_with_symlink(dst_item, shared_dest)
- continue
-
- # Normal transfer — merge to preserve existing custom content
- if os.path.isdir(dst_item):
- _merge_tree(src_item, dst_item, move=move)
- else:
- _transfer_tree(src_item, dst_item)
-
-
-def _replace_with_symlink(link_path, target_path):
- """Replace a file/dir/symlink at link_path with a symlink to target_path."""
- if os.path.islink(link_path):
- os.remove(link_path)
- elif os.path.isdir(link_path):
- shutil.rmtree(link_path)
- os.symlink(target_path, link_path)
-
-
-def _merge_tree(src, dst, move=False):
- """Recursively merge src tree into dst, overwriting existing files.
- When move=True, files are moved instead of copied."""
- _transfer = shutil.move if move else shutil.copy2
- _transfer_tree = shutil.move if move else shutil.copytree
- for item in os.listdir(src):
- s = os.path.join(src, item)
- d = os.path.join(dst, item)
- if os.path.isdir(s):
- if os.path.isdir(d):
- _merge_tree(s, d, move=move)
- else:
- if os.path.exists(d):
- os.remove(d)
- _transfer_tree(s, d)
- else:
- os.makedirs(os.path.dirname(d), exist_ok=True)
- _transfer(s, d)
-
-
-def allowed_import_source(source):
- """Check if a source path is a valid import location (USB or upload dir)."""
- usb = find_usb_mounts()
- if any(source == m or source.startswith(m + "/") for m in usb):
- return True
- if source == UPLOAD_DIR or source.startswith(UPLOAD_DIR + "/"):
- return os.path.isdir(source)
- return False
-
-
-# ---------------------------------------------------------------------------
-# XML helpers — parse / build unattend.xml
-# ---------------------------------------------------------------------------
-
-UNATTEND_TEMPLATE = """\
-
-
-
-
-
-
-
-"""
-
-
-def _find_or_create(parent, tag):
- """Find the first child with *tag* or create it."""
- el = parent.find(tag, namespaces={"": NS})
- if el is None:
- el = etree.SubElement(parent, qn(tag.split("}")[-1]) if "}" not in tag else tag)
- return el
-
-
-def _settings_pass(root, pass_name):
- """Return the element, creating if needed."""
- for s in root.findall(qn("settings")):
- if s.get("pass") == pass_name:
- return s
- s = etree.SubElement(root, qn("settings"))
- s.set("pass", pass_name)
- return s
-
-
-def parse_unattend(xml_path):
- """Parse an unattend.xml and return a dict of editable data."""
- data = {
- "driver_paths": [],
- "computer_name": "",
- "registered_organization": "",
- "registered_owner": "",
- "time_zone": "",
- "specialize_commands": [],
- "oobe": {
- "HideEULAPage": "true",
- "HideOEMRegistrationScreen": "true",
- "HideOnlineAccountScreens": "true",
- "HideWirelessSetupInOOBE": "true",
- "HideLocalAccountScreen": "true",
- "NetworkLocation": "Work",
- "ProtectYourPC": "3",
- "SkipUserOOBE": "true",
- "SkipMachineOOBE": "true",
- },
- "firstlogon_commands": [],
- "user_accounts": [],
- "autologon": {
- "enabled": "",
- "username": "",
- "password": "",
- "plain_text": "true",
- "logon_count": "",
- },
- "intl": {
- "input_locale": "",
- "system_locale": "",
- "ui_language": "",
- "user_locale": "",
- },
- "oobe_timezone": "",
- "raw_xml": "",
- }
-
- if not os.path.isfile(xml_path):
- data["raw_xml"] = UNATTEND_TEMPLATE
- return data
-
- with open(xml_path, "r", encoding="utf-8") as fh:
- raw = fh.read()
- data["raw_xml"] = raw
-
- try:
- root = etree.fromstring(raw.encode("utf-8"))
- except etree.XMLSyntaxError:
- return data
-
- ns = {"u": NS}
-
- # --- offlineServicing: DriverPaths ---
- for dp_el in root.xpath(
- "u:settings[@pass='offlineServicing']//u:PathAndCredentials/u:Path",
- namespaces=ns,
- ):
- if dp_el.text:
- data["driver_paths"].append(dp_el.text.strip())
-
- # --- specialize: Shell-Setup ---
- for comp in root.xpath(
- "u:settings[@pass='specialize']/u:component",
- namespaces=ns,
- ):
- comp_name = comp.get("name", "")
- if "Shell-Setup" in comp_name:
- for tag, key in [
- ("ComputerName", "computer_name"),
- ("RegisteredOrganization", "registered_organization"),
- ("RegisteredOwner", "registered_owner"),
- ("TimeZone", "time_zone"),
- ]:
- el = comp.find(qn(tag))
- if el is not None and el.text:
- data[key] = el.text.strip()
-
- # --- specialize: RunSynchronous commands ---
- for cmd in root.xpath(
- "u:settings[@pass='specialize']//u:RunSynchronousCommand",
- namespaces=ns,
- ):
- order_el = cmd.find(qn("Order"))
- path_el = cmd.find(qn("Path"))
- desc_el = cmd.find(qn("Description"))
- data["specialize_commands"].append({
- "order": order_el.text.strip() if order_el is not None and order_el.text else "",
- "path": path_el.text.strip() if path_el is not None and path_el.text else "",
- "description": desc_el.text.strip() if desc_el is not None and desc_el.text else "",
- })
-
- # --- oobeSystem ---
- for comp in root.xpath(
- "u:settings[@pass='oobeSystem']/u:component",
- namespaces=ns,
- ):
- comp_name = comp.get("name", "")
-
- # International-Core component
- if "International-Core" in comp_name:
- for tag, key in [
- ("InputLocale", "input_locale"),
- ("SystemLocale", "system_locale"),
- ("UILanguage", "ui_language"),
- ("UserLocale", "user_locale"),
- ]:
- el = comp.find(qn(tag))
- if el is not None and el.text:
- data["intl"][key] = el.text.strip()
-
- if "OOBE" in comp_name or "Shell-Setup" in comp_name:
- oobe_el = comp.find(qn("OOBE"))
- if oobe_el is not None:
- for child in oobe_el:
- local = etree.QName(child).localname
- if local in data["oobe"] and child.text:
- data["oobe"][local] = child.text.strip()
-
- # UserAccounts / LocalAccounts
- ua = comp.find(qn("UserAccounts"))
- if ua is not None:
- la_container = ua.find(qn("LocalAccounts"))
- if la_container is not None:
- for acct in la_container.findall(qn("LocalAccount")):
- name_el = acct.find(qn("Name"))
- group_el = acct.find(qn("Group"))
- display_el = acct.find(qn("DisplayName"))
- pw_el = acct.find(qn("Password"))
- pw_val = ""
- pw_plain = "true"
- if pw_el is not None:
- v = pw_el.find(qn("Value"))
- p = pw_el.find(qn("PlainText"))
- if v is not None and v.text:
- pw_val = v.text.strip()
- if p is not None and p.text:
- pw_plain = p.text.strip()
- data["user_accounts"].append({
- "name": name_el.text.strip() if name_el is not None and name_el.text else "",
- "password": pw_val,
- "plain_text": pw_plain,
- "group": group_el.text.strip() if group_el is not None and group_el.text else "Administrators",
- "display_name": display_el.text.strip() if display_el is not None and display_el.text else "",
- })
-
- # AutoLogon
- al = comp.find(qn("AutoLogon"))
- if al is not None:
- enabled_el = al.find(qn("Enabled"))
- user_el = al.find(qn("Username"))
- count_el = al.find(qn("LogonCount"))
- pw_el = al.find(qn("Password"))
- pw_val = ""
- pw_plain = "true"
- if pw_el is not None:
- v = pw_el.find(qn("Value"))
- p = pw_el.find(qn("PlainText"))
- if v is not None and v.text:
- pw_val = v.text.strip()
- if p is not None and p.text:
- pw_plain = p.text.strip()
- data["autologon"] = {
- "enabled": enabled_el.text.strip() if enabled_el is not None and enabled_el.text else "",
- "username": user_el.text.strip() if user_el is not None and user_el.text else "",
- "password": pw_val,
- "plain_text": pw_plain,
- "logon_count": count_el.text.strip() if count_el is not None and count_el.text else "",
- }
-
- # FirstLogonCommands
- flc = comp.find(qn("FirstLogonCommands"))
- if flc is not None:
- for sync in flc.findall(qn("SynchronousCommand")):
- order_el = sync.find(qn("Order"))
- cl_el = sync.find(qn("CommandLine"))
- desc_el = sync.find(qn("Description"))
- data["firstlogon_commands"].append({
- "order": order_el.text.strip() if order_el is not None and order_el.text else "",
- "commandline": cl_el.text.strip() if cl_el is not None and cl_el.text else "",
- "description": desc_el.text.strip() if desc_el is not None and desc_el.text else "",
- })
-
- # TimeZone (oobeSystem pass)
- tz_el = comp.find(qn("TimeZone"))
- if tz_el is not None and tz_el.text:
- data["oobe_timezone"] = tz_el.text.strip()
-
- return data
-
-
-def build_unattend_xml(form_data):
- """Build a complete unattend.xml string from form data dict."""
- root = etree.Element(qn("unattend"), nsmap=NSMAP)
-
- # --- windowsPE (empty) ---
- _settings_pass(root, "windowsPE")
-
- # --- offlineServicing: DriverPaths ---
- offline = _settings_pass(root, "offlineServicing")
- driver_paths = form_data.get("driver_paths", [])
- if driver_paths:
- comp = etree.SubElement(offline, qn("component"))
- comp.set("name", "Microsoft-Windows-PnpCustomizationsNonWinPE")
- comp.set("processorArchitecture", "amd64")
- comp.set("publicKeyToken", "31bf3856ad364e35")
- comp.set("language", "neutral")
- comp.set("versionScope", "nonSxS")
- dp_container = etree.SubElement(comp, qn("DriverPaths"))
- for idx, dp in enumerate(driver_paths, start=1):
- if not dp.strip():
- continue
- pac = etree.SubElement(dp_container, qn("PathAndCredentials"))
- pac.set(qwcm("action"), "add")
- pac.set(qwcm("keyValue"), str(idx))
- path_el = etree.SubElement(pac, qn("Path"))
- path_el.text = dp.strip()
-
- # --- specialize ---
- spec = _settings_pass(root, "specialize")
-
- # Shell-Setup component
- shell_comp = etree.SubElement(spec, qn("component"))
- shell_comp.set("name", "Microsoft-Windows-Shell-Setup")
- shell_comp.set("processorArchitecture", "amd64")
- shell_comp.set("publicKeyToken", "31bf3856ad364e35")
- shell_comp.set("language", "neutral")
- shell_comp.set("versionScope", "nonSxS")
-
- for tag, key in [
- ("ComputerName", "computer_name"),
- ("RegisteredOrganization", "registered_organization"),
- ("RegisteredOwner", "registered_owner"),
- ("TimeZone", "time_zone"),
- ]:
- val = form_data.get(key, "").strip()
- if val:
- el = etree.SubElement(shell_comp, qn(tag))
- el.text = val
-
- # Deployment / RunSynchronous commands
- spec_cmds = form_data.get("specialize_commands", [])
- if spec_cmds:
- deploy_comp = etree.SubElement(spec, qn("component"))
- deploy_comp.set("name", "Microsoft-Windows-Deployment")
- deploy_comp.set("processorArchitecture", "amd64")
- deploy_comp.set("publicKeyToken", "31bf3856ad364e35")
- deploy_comp.set("language", "neutral")
- deploy_comp.set("versionScope", "nonSxS")
- rs = etree.SubElement(deploy_comp, qn("RunSynchronous"))
- for idx, cmd in enumerate(spec_cmds, start=1):
- if not cmd.get("path", "").strip():
- continue
- rsc = etree.SubElement(rs, qn("RunSynchronousCommand"))
- rsc.set(qwcm("action"), "add")
- order_el = etree.SubElement(rsc, qn("Order"))
- order_el.text = str(idx)
- path_el = etree.SubElement(rsc, qn("Path"))
- path_el.text = cmd["path"].strip()
- desc_el = etree.SubElement(rsc, qn("Description"))
- desc_el.text = cmd.get("description", "").strip()
-
- # --- oobeSystem ---
- oobe_settings = _settings_pass(root, "oobeSystem")
-
- # International-Core component (before Shell-Setup)
- intl = form_data.get("intl", {})
- if any(v.strip() for v in intl.values() if v):
- intl_comp = etree.SubElement(oobe_settings, qn("component"))
- intl_comp.set("name", "Microsoft-Windows-International-Core")
- intl_comp.set("processorArchitecture", "amd64")
- intl_comp.set("publicKeyToken", "31bf3856ad364e35")
- intl_comp.set("language", "neutral")
- intl_comp.set("versionScope", "nonSxS")
- for tag, key in [
- ("InputLocale", "input_locale"),
- ("SystemLocale", "system_locale"),
- ("UILanguage", "ui_language"),
- ("UserLocale", "user_locale"),
- ]:
- val = intl.get(key, "").strip()
- if val:
- el = etree.SubElement(intl_comp, qn(tag))
- el.text = val
-
- # Shell-Setup component
- oobe_comp = etree.SubElement(oobe_settings, qn("component"))
- oobe_comp.set("name", "Microsoft-Windows-Shell-Setup")
- oobe_comp.set("processorArchitecture", "amd64")
- oobe_comp.set("publicKeyToken", "31bf3856ad364e35")
- oobe_comp.set("language", "neutral")
- oobe_comp.set("versionScope", "nonSxS")
-
- oobe_el = etree.SubElement(oobe_comp, qn("OOBE"))
- oobe_data = form_data.get("oobe", {})
- for key in [
- "HideEULAPage",
- "HideOEMRegistrationScreen",
- "HideOnlineAccountScreens",
- "HideWirelessSetupInOOBE",
- "HideLocalAccountScreen",
- "NetworkLocation",
- "ProtectYourPC",
- "SkipUserOOBE",
- "SkipMachineOOBE",
- ]:
- val = oobe_data.get(key, "")
- if val:
- child = etree.SubElement(oobe_el, qn(key))
- child.text = str(val)
-
- # UserAccounts / LocalAccounts
- accounts = form_data.get("user_accounts", [])
- if accounts:
- ua = etree.SubElement(oobe_comp, qn("UserAccounts"))
- la_container = etree.SubElement(ua, qn("LocalAccounts"))
- for acct in accounts:
- if not acct.get("name", "").strip():
- continue
- la = etree.SubElement(la_container, qn("LocalAccount"))
- la.set(qwcm("action"), "add")
- pw = etree.SubElement(la, qn("Password"))
- pw_val = etree.SubElement(pw, qn("Value"))
- pw_val.text = acct.get("password", "")
- pw_plain = etree.SubElement(pw, qn("PlainText"))
- pw_plain.text = acct.get("plain_text", "true")
- name_el = etree.SubElement(la, qn("Name"))
- name_el.text = acct["name"].strip()
- group_el = etree.SubElement(la, qn("Group"))
- group_el.text = acct.get("group", "Administrators").strip()
- display_el = etree.SubElement(la, qn("DisplayName"))
- display_el.text = acct.get("display_name", acct["name"]).strip()
-
- # AutoLogon
- autologon = form_data.get("autologon", {})
- if autologon.get("username", "").strip():
- al = etree.SubElement(oobe_comp, qn("AutoLogon"))
- al_pw = etree.SubElement(al, qn("Password"))
- al_pw_val = etree.SubElement(al_pw, qn("Value"))
- al_pw_val.text = autologon.get("password", "")
- al_pw_plain = etree.SubElement(al_pw, qn("PlainText"))
- al_pw_plain.text = autologon.get("plain_text", "true")
- al_enabled = etree.SubElement(al, qn("Enabled"))
- al_enabled.text = autologon.get("enabled", "true")
- al_user = etree.SubElement(al, qn("Username"))
- al_user.text = autologon["username"].strip()
- logon_count = autologon.get("logon_count", "").strip()
- if logon_count:
- al_count = etree.SubElement(al, qn("LogonCount"))
- al_count.text = logon_count
-
- # FirstLogonCommands
- fl_cmds = form_data.get("firstlogon_commands", [])
- if fl_cmds:
- flc = etree.SubElement(oobe_comp, qn("FirstLogonCommands"))
- for idx, cmd in enumerate(fl_cmds, start=1):
- if not cmd.get("commandline", "").strip():
- continue
- sc = etree.SubElement(flc, qn("SynchronousCommand"))
- sc.set(qwcm("action"), "add")
- order_el = etree.SubElement(sc, qn("Order"))
- order_el.text = str(idx)
- cl_el = etree.SubElement(sc, qn("CommandLine"))
- cl_el.text = cmd["commandline"].strip()
- desc_el = etree.SubElement(sc, qn("Description"))
- desc_el.text = cmd.get("description", "").strip()
-
- # TimeZone (oobeSystem pass)
- oobe_tz = form_data.get("oobe_timezone", "").strip()
- if oobe_tz:
- tz_el = etree.SubElement(oobe_comp, qn("TimeZone"))
- tz_el.text = oobe_tz
-
- xml_bytes = etree.tostring(
- root,
- pretty_print=True,
- xml_declaration=True,
- encoding="utf-8",
- )
- return xml_bytes.decode("utf-8")
-
-
-def _extract_form_data(form):
- """Pull structured data from the submitted form."""
- data = {}
-
- # Driver paths
- dp_list = form.getlist("driver_path[]")
- data["driver_paths"] = [p for p in dp_list if p.strip()]
-
- # Machine settings
- data["computer_name"] = form.get("computer_name", "")
- data["registered_organization"] = form.get("registered_organization", "")
- data["registered_owner"] = form.get("registered_owner", "")
- data["time_zone"] = form.get("time_zone", "")
-
- # Specialize commands
- spec_paths = form.getlist("spec_cmd_path[]")
- spec_descs = form.getlist("spec_cmd_desc[]")
- data["specialize_commands"] = []
- for i in range(len(spec_paths)):
- if spec_paths[i].strip():
- data["specialize_commands"].append({
- "path": spec_paths[i],
- "description": spec_descs[i] if i < len(spec_descs) else "",
- })
-
- # OOBE settings
- data["oobe"] = {}
- for key in [
- "HideEULAPage",
- "HideOEMRegistrationScreen",
- "HideOnlineAccountScreens",
- "HideWirelessSetupInOOBE",
- "HideLocalAccountScreen",
- "SkipUserOOBE",
- "SkipMachineOOBE",
- ]:
- data["oobe"][key] = form.get(f"oobe_{key}", "false")
- data["oobe"]["NetworkLocation"] = form.get("oobe_NetworkLocation", "Work")
- data["oobe"]["ProtectYourPC"] = form.get("oobe_ProtectYourPC", "3")
-
- # First logon commands
- fl_cls = form.getlist("fl_cmd_commandline[]")
- fl_descs = form.getlist("fl_cmd_desc[]")
- data["firstlogon_commands"] = []
- for i in range(len(fl_cls)):
- if fl_cls[i].strip():
- data["firstlogon_commands"].append({
- "commandline": fl_cls[i],
- "description": fl_descs[i] if i < len(fl_descs) else "",
- })
-
- # User accounts
- accounts = []
- i = 0
- while form.get(f"account_name_{i}"):
- accounts.append({
- "name": form.get(f"account_name_{i}", ""),
- "password": form.get(f"account_password_{i}", ""),
- "plain_text": form.get(f"account_plaintext_{i}", "true"),
- "group": form.get(f"account_group_{i}", "Administrators"),
- "display_name": form.get(f"account_display_{i}", ""),
- })
- i += 1
- data["user_accounts"] = accounts
-
- # AutoLogon
- data["autologon"] = {
- "enabled": form.get("autologon_enabled", ""),
- "username": form.get("autologon_username", ""),
- "password": form.get("autologon_password", ""),
- "plain_text": form.get("autologon_plaintext", "true"),
- "logon_count": form.get("autologon_logoncount", ""),
- }
-
- # International settings
- data["intl"] = {
- "input_locale": form.get("intl_input_locale", ""),
- "system_locale": form.get("intl_system_locale", ""),
- "ui_language": form.get("intl_ui_language", ""),
- "user_locale": form.get("intl_user_locale", ""),
- }
-
- # OOBE TimeZone
- data["oobe_timezone"] = form.get("oobe_timezone", "")
-
- return data
-
-
-# ---------------------------------------------------------------------------
-# Routes — pages
-# ---------------------------------------------------------------------------
-
-@app.route("/")
-def dashboard():
- images = [image_status(it) for it in IMAGE_TYPES]
- services = [service_status(s) for s in ("dnsmasq", "apache2", "smbd")]
- return render_template(
- "dashboard.html",
- images=images,
- services=services,
- image_types=IMAGE_TYPES,
- friendly_names=FRIENDLY_NAMES,
- )
-
-
-@app.route("/images/import", methods=["GET", "POST"])
-def images_import():
- usb_mounts = find_usb_mounts()
- upload_sources = find_upload_sources()
- images = [image_status(it) for it in IMAGE_TYPES]
-
- if request.method == "POST":
- source = request.form.get("source", "")
- target = request.form.get("target", "")
-
- if not source or not target:
- flash("Please select both a source and a target image type.", "danger")
- return redirect(url_for("images_import"))
-
- if target not in IMAGE_TYPES:
- flash("Invalid target image type.", "danger")
- return redirect(url_for("images_import"))
-
- if not allowed_import_source(source):
- flash("Source path is not a valid import location.", "danger")
- return redirect(url_for("images_import"))
-
- if not os.path.isdir(source):
- flash(f"Source path does not exist: {source}", "danger")
- return redirect(url_for("images_import"))
-
- root = image_root(target)
- dest = deploy_path(target)
- try:
- os.makedirs(dest, exist_ok=True)
- src_items = os.listdir(source)
-
- # Move files from network upload to save disk space; copy from USB
- use_move = source == UPLOAD_DIR or source.startswith(UPLOAD_DIR + "/")
- _transfer = shutil.move if use_move else shutil.copy2
- _transfer_tree = shutil.move if use_move else shutil.copytree
-
- # Detect layout: if source has Deploy/, Sources/, Tools/ at top
- # level, it's the full image root structure (USB-style).
- # Otherwise treat it as Deploy/ contents directly.
- top_dirs = {d for d in src_items if os.path.isdir(os.path.join(source, d))}
- full_layout = "Deploy" in top_dirs
-
- if full_layout:
- # Determine which root-level dirs are shared for this target
- shared_root = []
- for prefix, dirs in SHARED_ROOT_DIRS.items():
- if target.startswith(prefix):
- shared_root = dirs
- break
-
- # Full image root: import Deploy contents + sibling dirs
- for item in src_items:
- src_item = os.path.join(source, item)
- if item == "Deploy":
- _import_deploy(src_item, dest, target, move=use_move)
- elif os.path.isdir(src_item) and item in shared_root:
- # Shared sibling: merge into _shared/{prefix}{item}
- # and symlink from image root
- prefix_key = target.split("-")[0] + "-"
- shared_dest = os.path.join(SHARED_DIR, f"{prefix_key}{item}")
- os.makedirs(shared_dest, exist_ok=True)
- _merge_tree(src_item, shared_dest, move=use_move)
- dst_item = os.path.join(root, item)
- if os.path.islink(dst_item):
- os.remove(dst_item)
- elif os.path.isdir(dst_item):
- shutil.rmtree(dst_item)
- os.symlink(shared_dest, dst_item)
- elif os.path.isdir(src_item):
- # Non-shared sibling dirs (Tools) go into image root
- dst_item = os.path.join(root, item)
- if os.path.exists(dst_item):
- shutil.rmtree(dst_item)
- _transfer_tree(src_item, dst_item)
- else:
- _transfer(src_item, os.path.join(root, item))
- else:
- # Flat layout: treat source as Deploy contents
- _import_deploy(source, dest, target, move=use_move)
-
- # Ensure Media.tag exists (FlatSetupLoader.exe drive detection)
- control_dir = os.path.join(dest, "Control")
- os.makedirs(control_dir, exist_ok=True)
- media_tag = os.path.join(control_dir, "Media.tag")
- Path(media_tag).touch()
-
- audit("IMAGE_IMPORT", f"{source} -> {target}")
- flash(
- f"Successfully imported content to {FRIENDLY_NAMES.get(target, target)}.",
- "success",
- )
- except Exception as exc:
- flash(f"Import failed: {exc}", "danger")
-
- return redirect(url_for("images_import"))
-
- return render_template(
- "import.html",
- usb_mounts=usb_mounts,
- upload_sources=upload_sources,
- images=images,
- image_types=IMAGE_TYPES,
- friendly_names=FRIENDLY_NAMES,
- )
-
-
-@app.route("/images//unattend", methods=["GET", "POST"])
-def unattend_editor(image_type):
- if image_type not in IMAGE_TYPES:
- flash("Unknown image type.", "danger")
- return redirect(url_for("dashboard"))
-
- xml_file = unattend_path(image_type)
-
- if request.method == "POST":
- save_mode = request.form.get("save_mode", "form")
-
- if save_mode == "raw":
- raw_xml = request.form.get("raw_xml", "")
- # Validate the XML before saving
- try:
- etree.fromstring(raw_xml.encode("utf-8"))
- except etree.XMLSyntaxError as exc:
- flash(f"Invalid XML: {exc}", "danger")
- data = parse_unattend(xml_file)
- data["raw_xml"] = raw_xml
- return render_template(
- "unattend_editor.html",
- image_type=image_type,
- friendly_name=FRIENDLY_NAMES.get(image_type, image_type),
- data=data,
- image_types=IMAGE_TYPES,
- friendly_names=FRIENDLY_NAMES,
- )
- xml_content = raw_xml
- else:
- form_data = _extract_form_data(request.form)
- xml_content = build_unattend_xml(form_data)
-
- # Write to disk
- try:
- os.makedirs(os.path.dirname(xml_file), exist_ok=True)
- with open(xml_file, "w", encoding="utf-8") as fh:
- fh.write(xml_content)
- audit("UNATTEND_SAVE", f"{image_type} ({save_mode})")
- flash("unattend.xml saved successfully.", "success")
- except Exception as exc:
- flash(f"Failed to save: {exc}", "danger")
-
- return redirect(url_for("unattend_editor", image_type=image_type))
-
- data = parse_unattend(xml_file)
- return render_template(
- "unattend_editor.html",
- image_type=image_type,
- friendly_name=FRIENDLY_NAMES.get(image_type, image_type),
- data=data,
- image_types=IMAGE_TYPES,
- friendly_names=FRIENDLY_NAMES,
- )
-
-
-@app.route("/images//config")
-def image_config(image_type):
- if image_type not in IMAGE_TYPES:
- flash("Unknown image type.", "danger")
- return redirect(url_for("dashboard"))
-
- config = _load_image_config(image_type)
- return render_template(
- "image_config.html",
- image_type=image_type,
- friendly_name=FRIENDLY_NAMES.get(image_type, image_type),
- config=config,
- )
-
-
-@app.route("/images//config/save", methods=["POST"])
-def image_config_save(image_type):
- if image_type not in IMAGE_TYPES:
- flash("Unknown image type.", "danger")
- return redirect(url_for("dashboard"))
-
- section = request.form.get("section", "")
- payload = request.form.get("payload", "[]")
- try:
- data = json.loads(payload)
- except json.JSONDecodeError:
- flash("Invalid JSON payload.", "danger")
- return redirect(url_for("image_config", image_type=image_type))
-
- ctrl = control_path(image_type)
- tools = tools_path(image_type)
-
- try:
- if section == "hardware_models":
- us_file = os.path.join(tools, "user_selections.json")
- us_raw = _load_json(us_file)
- us_data = us_raw[0] if us_raw and isinstance(us_raw, list) else {}
- us_data["HardwareModelSelection"] = data
- _save_json(us_file, [us_data])
- audit("CONFIG_SAVE", f"{image_type}/hardware_models")
-
- elif section == "drivers":
- # Strip internal fields before saving
- clean = [{k: v for k, v in d.items() if not k.startswith("_")} for d in data]
- _save_json(os.path.join(ctrl, "HardwareDriver.json"), clean)
- audit("CONFIG_SAVE", f"{image_type}/drivers")
-
- elif section == "operating_systems":
- clean = [{k: v for k, v in d.items() if not k.startswith("_")} for d in data]
- _save_json(os.path.join(ctrl, "OperatingSystem.json"), clean)
- audit("CONFIG_SAVE", f"{image_type}/operating_systems")
-
- elif section == "packages":
- clean = [{k: v for k, v in d.items() if not k.startswith("_")} for d in data]
- _save_json(os.path.join(ctrl, "packages.json"), clean)
- audit("CONFIG_SAVE", f"{image_type}/packages")
-
- else:
- flash(f"Unknown section: {section}", "danger")
- return redirect(url_for("image_config", image_type=image_type))
-
- flash(f"Saved {section.replace('_', ' ')} successfully.", "success")
- except Exception as exc:
- flash(f"Failed to save {section}: {exc}", "danger")
-
- return redirect(url_for("image_config", image_type=image_type))
-
-
-# ---------------------------------------------------------------------------
-# Routes — Clonezilla Backups
-# ---------------------------------------------------------------------------
-
-@app.route("/backups")
-def clonezilla_backups():
- backups = []
- if os.path.isdir(CLONEZILLA_SHARE):
- for f in sorted(os.listdir(CLONEZILLA_SHARE)):
- fpath = os.path.join(CLONEZILLA_SHARE, f)
- if os.path.isfile(fpath) and f.lower().endswith(".zip"):
- stat = os.stat(fpath)
- backups.append({
- "filename": f,
- "machine": os.path.splitext(f)[0],
- "size": stat.st_size,
- "modified": stat.st_mtime,
- })
- return render_template(
- "backups.html",
- backups=backups,
- image_types=IMAGE_TYPES,
- friendly_names=FRIENDLY_NAMES,
- )
-
-
-@app.route("/backups/upload", methods=["POST"])
-def clonezilla_upload():
- if "backup_file" not in request.files:
- flash("No file selected.", "danger")
- return redirect(url_for("clonezilla_backups"))
-
- f = request.files["backup_file"]
- if not f.filename:
- flash("No file selected.", "danger")
- return redirect(url_for("clonezilla_backups"))
-
- filename = secure_filename(f.filename)
- if not filename.lower().endswith(".zip"):
- flash("Only .zip files are accepted.", "danger")
- return redirect(url_for("clonezilla_backups"))
-
- os.makedirs(CLONEZILLA_SHARE, exist_ok=True)
- dest = os.path.join(CLONEZILLA_SHARE, filename)
- f.save(dest)
- audit("BACKUP_UPLOAD", filename)
- flash(f"Uploaded {filename} successfully.", "success")
- return redirect(url_for("clonezilla_backups"))
-
-
-@app.route("/backups/download/")
-def clonezilla_download(filename):
- filename = secure_filename(filename)
- fpath = os.path.join(CLONEZILLA_SHARE, filename)
- if not os.path.isfile(fpath):
- flash(f"Backup not found: {filename}", "danger")
- return redirect(url_for("clonezilla_backups"))
- return send_file(fpath, as_attachment=True)
-
-
-@app.route("/backups/delete/", methods=["POST"])
-def clonezilla_delete(filename):
- filename = secure_filename(filename)
- fpath = os.path.join(CLONEZILLA_SHARE, filename)
- if os.path.isfile(fpath):
- os.remove(fpath)
- audit("BACKUP_DELETE", filename)
- flash(f"Deleted {filename}.", "success")
- else:
- flash(f"Backup not found: {filename}", "danger")
- return redirect(url_for("clonezilla_backups"))
-
-
-# ---------------------------------------------------------------------------
-# Routes — Blancco Reports
-# ---------------------------------------------------------------------------
-
-@app.route("/reports")
-def blancco_reports():
- reports = []
- if os.path.isdir(BLANCCO_REPORTS):
- for f in sorted(os.listdir(BLANCCO_REPORTS), reverse=True):
- fpath = os.path.join(BLANCCO_REPORTS, f)
- if os.path.isfile(fpath):
- stat = os.stat(fpath)
- ext = os.path.splitext(f)[1].lower()
- reports.append({
- "filename": f,
- "size": stat.st_size,
- "modified": stat.st_mtime,
- "type": ext.lstrip(".").upper() or "FILE",
- })
- return render_template(
- "reports.html",
- reports=reports,
- image_types=IMAGE_TYPES,
- friendly_names=FRIENDLY_NAMES,
- )
-
-
-@app.route("/reports/download/")
-def blancco_download_report(filename):
- filename = secure_filename(filename)
- fpath = os.path.join(BLANCCO_REPORTS, filename)
- if not os.path.isfile(fpath):
- flash(f"Report not found: {filename}", "danger")
- return redirect(url_for("blancco_reports"))
- return send_file(fpath, as_attachment=True)
-
-
-@app.route("/reports/delete/", methods=["POST"])
-def blancco_delete_report(filename):
- filename = secure_filename(filename)
- fpath = os.path.join(BLANCCO_REPORTS, filename)
- if os.path.isfile(fpath):
- os.remove(fpath)
- audit("REPORT_DELETE", filename)
- flash(f"Deleted {filename}.", "success")
- else:
- flash(f"Report not found: {filename}", "danger")
- return redirect(url_for("blancco_reports"))
-
-
-# ---------------------------------------------------------------------------
-# Routes — Enrollment Packages
-# ---------------------------------------------------------------------------
-
-@app.route("/enrollment")
-def enrollment():
- packages = []
- if os.path.isdir(ENROLLMENT_SHARE):
- for f in sorted(os.listdir(ENROLLMENT_SHARE)):
- fpath = os.path.join(ENROLLMENT_SHARE, f)
- if os.path.isfile(fpath) and f.lower().endswith(".ppkg"):
- stat = os.stat(fpath)
- packages.append({
- "filename": f,
- "size": stat.st_size,
- "modified": stat.st_mtime,
- })
- return render_template(
- "enrollment.html",
- packages=packages,
- image_types=IMAGE_TYPES,
- friendly_names=FRIENDLY_NAMES,
- )
-
-
-@app.route("/enrollment/upload", methods=["POST"])
-def enrollment_upload():
- if "ppkg_file" not in request.files:
- flash("No file selected.", "danger")
- return redirect(url_for("enrollment"))
-
- f = request.files["ppkg_file"]
- if not f.filename:
- flash("No file selected.", "danger")
- return redirect(url_for("enrollment"))
-
- filename = secure_filename(f.filename)
- if not filename.lower().endswith(".ppkg"):
- flash("Only .ppkg files are accepted.", "danger")
- return redirect(url_for("enrollment"))
-
- os.makedirs(ENROLLMENT_SHARE, exist_ok=True)
- dest = os.path.join(ENROLLMENT_SHARE, filename)
- f.save(dest)
- audit("ENROLLMENT_UPLOAD", filename)
- flash(f"Uploaded {filename} successfully.", "success")
- return redirect(url_for("enrollment"))
-
-
-@app.route("/enrollment/download/")
-def enrollment_download(filename):
- filename = secure_filename(filename)
- fpath = os.path.join(ENROLLMENT_SHARE, filename)
- if not os.path.isfile(fpath):
- flash(f"Package not found: {filename}", "danger")
- return redirect(url_for("enrollment"))
- return send_file(fpath, as_attachment=True)
-
-
-@app.route("/enrollment/delete/", methods=["POST"])
-def enrollment_delete(filename):
- filename = secure_filename(filename)
- fpath = os.path.join(ENROLLMENT_SHARE, filename)
- if os.path.isfile(fpath):
- os.remove(fpath)
- audit("ENROLLMENT_DELETE", filename)
- flash(f"Deleted {filename}.", "success")
- else:
- flash(f"Package not found: {filename}", "danger")
- return redirect(url_for("enrollment"))
-
-
-# ---------------------------------------------------------------------------
-# Routes — startnet.cmd Editor (WIM)
-# ---------------------------------------------------------------------------
-
-def _wim_extract_startnet(wim_path):
- """Extract startnet.cmd from a WIM file using wimextract."""
- tmpdir = tempfile.mkdtemp()
- try:
- result = subprocess.run(
- ["wimextract", wim_path, "1",
- "/Windows/System32/startnet.cmd",
- "--dest-dir", tmpdir],
- capture_output=True, text=True, timeout=30,
- )
- startnet_path = os.path.join(tmpdir, "startnet.cmd")
- if result.returncode == 0 and os.path.isfile(startnet_path):
- with open(startnet_path, "r", encoding="utf-8", errors="replace") as fh:
- return fh.read()
- return None
- except Exception:
- return None
- finally:
- shutil.rmtree(tmpdir, ignore_errors=True)
-
-
-def _wim_update_startnet(wim_path, content):
- """Update startnet.cmd inside a WIM file using wimupdate."""
- tmpdir = tempfile.mkdtemp()
- try:
- startnet_path = os.path.join(tmpdir, "startnet.cmd")
- with open(startnet_path, "w", encoding="utf-8", newline="\r\n") as fh:
- fh.write(content)
- # wimupdate reads commands from stdin
- update_cmd = f"add {startnet_path} /Windows/System32/startnet.cmd\n"
- result = subprocess.run(
- ["wimupdate", wim_path, "1"],
- input=update_cmd,
- capture_output=True, text=True, timeout=60,
- )
- if result.returncode != 0:
- return False, result.stderr.strip()
- return True, ""
- except Exception as exc:
- return False, str(exc)
- finally:
- shutil.rmtree(tmpdir, ignore_errors=True)
-
-
-def _wim_list_files(wim_path, path="/"):
- """List files inside a WIM at the given path."""
- try:
- result = subprocess.run(
- ["wimdir", wim_path, "1", path],
- capture_output=True, text=True, timeout=30,
- )
- if result.returncode == 0:
- return [l.strip() for l in result.stdout.splitlines() if l.strip()]
- return []
- except Exception:
- return []
-
-
-@app.route("/startnet")
-def startnet_editor():
- wim_exists = os.path.isfile(BOOT_WIM)
- content = ""
- wim_info = {}
-
- if wim_exists:
- content = _wim_extract_startnet(BOOT_WIM) or ""
- # Get WIM info
- try:
- result = subprocess.run(
- ["wiminfo", BOOT_WIM],
- capture_output=True, text=True, timeout=15,
- )
- if result.returncode == 0:
- for line in result.stdout.splitlines():
- if ":" in line:
- key, _, val = line.partition(":")
- wim_info[key.strip()] = val.strip()
- except Exception:
- pass
-
- return render_template(
- "startnet_editor.html",
- wim_exists=wim_exists,
- wim_path=BOOT_WIM,
- content=content,
- wim_info=wim_info,
- image_types=IMAGE_TYPES,
- friendly_names=FRIENDLY_NAMES,
- )
-
-
-@app.route("/startnet/save", methods=["POST"])
-def startnet_save():
- if not os.path.isfile(BOOT_WIM):
- flash("boot.wim not found.", "danger")
- return redirect(url_for("startnet_editor"))
-
- content = request.form.get("content", "")
- ok, err = _wim_update_startnet(BOOT_WIM, content)
- if ok:
- audit("STARTNET_SAVE", "boot.wim updated")
- flash("startnet.cmd updated successfully in boot.wim.", "success")
- else:
- flash(f"Failed to update boot.wim: {err}", "danger")
- return redirect(url_for("startnet_editor"))
-
-
-# ---------------------------------------------------------------------------
-# Routes — Audit Log
-# ---------------------------------------------------------------------------
-
-@app.route("/audit")
-def audit_log():
- entries = []
- if os.path.isfile(AUDIT_LOG):
- with open(AUDIT_LOG, "r") as fh:
- for line in fh:
- entries.append(line.strip())
- entries.reverse() # newest first
- return render_template(
- "audit.html",
- entries=entries,
- image_types=IMAGE_TYPES,
- friendly_names=FRIENDLY_NAMES,
- )
-
-
-# ---------------------------------------------------------------------------
-# Routes — API
-# ---------------------------------------------------------------------------
-
-@app.route("/api/services")
-def api_services():
- services = {s: service_status(s) for s in ("dnsmasq", "apache2", "smbd")}
- return jsonify(services)
-
-
-@app.route("/api/images")
-def api_images():
- images = [image_status(it) for it in IMAGE_TYPES]
- return jsonify(images)
-
-
-@app.route("/api/images//unattend", methods=["POST"])
-def api_save_unattend(image_type):
- if image_type not in IMAGE_TYPES:
- return jsonify({"error": "Unknown image type"}), 404
-
- xml_file = unattend_path(image_type)
- payload = request.get_json(silent=True)
-
- if not payload:
- return jsonify({"error": "No JSON body provided"}), 400
-
- if "raw_xml" in payload:
- raw_xml = payload["raw_xml"]
- try:
- etree.fromstring(raw_xml.encode("utf-8"))
- except etree.XMLSyntaxError as exc:
- return jsonify({"error": f"Invalid XML: {exc}"}), 400
- xml_content = raw_xml
- else:
- try:
- xml_content = build_unattend_xml(payload)
- except Exception as exc:
- return jsonify({"error": f"Failed to build XML: {exc}"}), 400
-
- try:
- os.makedirs(os.path.dirname(xml_file), exist_ok=True)
- with open(xml_file, "w", encoding="utf-8") as fh:
- fh.write(xml_content)
- except Exception as exc:
- return jsonify({"error": f"Failed to write file: {exc}"}), 500
-
- audit("UNATTEND_SAVE_API", image_type)
- return jsonify({"status": "ok", "path": xml_file})
-
-
-# ---------------------------------------------------------------------------
-# Template filters
-# ---------------------------------------------------------------------------
-
-@app.template_filter("timestamp_fmt")
-def timestamp_fmt(ts):
- """Format a Unix timestamp to a human-readable date string."""
- return datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M")
-
-
-# ---------------------------------------------------------------------------
-# Context processor — make data available to all templates
-# ---------------------------------------------------------------------------
-
-@app.context_processor
-def inject_globals():
- return {
- "all_image_types": IMAGE_TYPES,
- "all_friendly_names": FRIENDLY_NAMES,
- }
-
-
-# ---------------------------------------------------------------------------
-# Main
-# ---------------------------------------------------------------------------
-
-if __name__ == "__main__":
- app.run(host="127.0.0.1", port=9010, debug=False, threaded=True)
+#!/usr/bin/env python3
+"""Flask web application for managing a GE Aerospace PXE server.
+
+This file is the route surface; most logic lives in ``services/``:
+ services.audit - audit log writer
+ services.csrf - session CSRF token + before_request validator
+ services.fs - path helpers + JSON load/save
+ services.system - systemd service status + USB mounts
+ services.images - image_status + load_image_config
+ services.deploy - import_deploy + merge_tree + symlink dance
+ services.unattend - parse + build + form-extract for unattend.xml
+ services.wim - boot.wim startnet.cmd extract/update via wimtools
+"""
+
+import json
+import os
+import shutil
+from datetime import datetime
+from pathlib import Path
+
+from flask import (
+ Flask,
+ abort,
+ flash,
+ jsonify,
+ redirect,
+ render_template,
+ request,
+ send_file,
+ url_for,
+)
+from lxml import etree
+from werkzeug.utils import secure_filename
+
+import config
+from services import deploy, fs, images, system, unattend, wim
+from services.audit import audit
+from services.csrf import init_csrf
+
+app = Flask(__name__)
+app.secret_key = config.FLASK_SECRET_KEY
+app.config["MAX_CONTENT_LENGTH"] = config.MAX_CONTENT_LENGTH
+
+init_csrf(app)
+
+
+# ---------------------------------------------------------------------------
+# Routes - pages
+# ---------------------------------------------------------------------------
+
+@app.route("/")
+def dashboard():
+ image_list = [images.image_status(it) for it in config.IMAGE_TYPES]
+ services = [system.service_status(s) for s in ("dnsmasq", "apache2", "smbd")]
+ return render_template(
+ "dashboard.html",
+ images=image_list,
+ services=services,
+ image_types=config.IMAGE_TYPES,
+ friendly_names=config.FRIENDLY_NAMES,
+ )
+
+
+@app.route("/images/import", methods=["GET", "POST"])
+def images_import():
+ usb_mounts = system.find_usb_mounts()
+ upload_sources = system.find_upload_sources()
+ image_list = [images.image_status(it) for it in config.IMAGE_TYPES]
+
+ if request.method == "POST":
+ source = request.form.get("source", "")
+ target = request.form.get("target", "")
+
+ if not source or not target:
+ flash("Please select both a source and a target image type.", "danger")
+ return redirect(url_for("images_import"))
+
+ if target not in config.IMAGE_TYPES:
+ flash("Invalid target image type.", "danger")
+ return redirect(url_for("images_import"))
+
+ if not deploy.allowed_import_source(source):
+ flash("Source path is not a valid import location.", "danger")
+ return redirect(url_for("images_import"))
+
+ if not os.path.isdir(source):
+ flash(f"Source path does not exist: {source}", "danger")
+ return redirect(url_for("images_import"))
+
+ root = fs.image_root(target)
+ dest = fs.deploy_path(target)
+ try:
+ os.makedirs(dest, exist_ok=True)
+ src_items = os.listdir(source)
+
+ # Move files from network upload to save disk space; copy from USB.
+ use_move = source == config.UPLOAD_DIR or source.startswith(config.UPLOAD_DIR + "/")
+ _transfer = shutil.move if use_move else shutil.copy2
+ _transfer_tree = shutil.move if use_move else shutil.copytree
+
+ top_dirs = {d for d in src_items if os.path.isdir(os.path.join(source, d))}
+ full_layout = "Deploy" in top_dirs
+
+ if full_layout:
+ shared_root = []
+ for prefix, dirs in config.SHARED_ROOT_DIRS.items():
+ if target.startswith(prefix):
+ shared_root = dirs
+ break
+
+ for item in src_items:
+ src_item = os.path.join(source, item)
+ if item == "Deploy":
+ deploy.import_deploy(src_item, dest, target, move=use_move)
+ elif os.path.isdir(src_item) and item in shared_root:
+ prefix_key = target.split("-")[0] + "-"
+ shared_dest = os.path.join(config.SHARED_DIR, f"{prefix_key}{item}")
+ os.makedirs(shared_dest, exist_ok=True)
+ deploy._merge_tree(src_item, shared_dest, move=use_move)
+ dst_item = os.path.join(root, item)
+ if os.path.islink(dst_item):
+ os.remove(dst_item)
+ elif os.path.isdir(dst_item):
+ shutil.rmtree(dst_item)
+ os.symlink(shared_dest, dst_item)
+ elif os.path.isdir(src_item):
+ dst_item = os.path.join(root, item)
+ if os.path.exists(dst_item):
+ shutil.rmtree(dst_item)
+ _transfer_tree(src_item, dst_item)
+ else:
+ _transfer(src_item, os.path.join(root, item))
+ else:
+ deploy.import_deploy(source, dest, target, move=use_move)
+
+ # Ensure Media.tag exists (FlatSetupLoader.exe drive detection).
+ control_dir = os.path.join(dest, "Control")
+ os.makedirs(control_dir, exist_ok=True)
+ media_tag = os.path.join(control_dir, "Media.tag")
+ Path(media_tag).touch()
+
+ audit("IMAGE_IMPORT", f"{source} -> {target}")
+ flash(
+ f"Successfully imported content to {config.FRIENDLY_NAMES.get(target, target)}.",
+ "success",
+ )
+ except Exception as exc:
+ flash(f"Import failed: {exc}", "danger")
+
+ return redirect(url_for("images_import"))
+
+ return render_template(
+ "import.html",
+ usb_mounts=usb_mounts,
+ upload_sources=upload_sources,
+ images=image_list,
+ image_types=config.IMAGE_TYPES,
+ friendly_names=config.FRIENDLY_NAMES,
+ )
+
+
+@app.route("/images//unattend", methods=["GET", "POST"])
+def unattend_editor(image_type):
+ if image_type not in config.IMAGE_TYPES:
+ flash("Unknown image type.", "danger")
+ return redirect(url_for("dashboard"))
+
+ xml_file = fs.unattend_path(image_type)
+
+ if request.method == "POST":
+ save_mode = request.form.get("save_mode", "form")
+
+ if save_mode == "raw":
+ raw_xml = request.form.get("raw_xml", "")
+ try:
+ etree.fromstring(raw_xml.encode("utf-8"))
+ except etree.XMLSyntaxError as exc:
+ flash(f"Invalid XML: {exc}", "danger")
+ data = unattend.parse_unattend(xml_file)
+ data["raw_xml"] = raw_xml
+ return render_template(
+ "unattend_editor.html",
+ image_type=image_type,
+ friendly_name=config.FRIENDLY_NAMES.get(image_type, image_type),
+ data=data,
+ image_types=config.IMAGE_TYPES,
+ friendly_names=config.FRIENDLY_NAMES,
+ )
+ xml_content = raw_xml
+ else:
+ form_data = unattend.extract_form_data(request.form)
+ xml_content = unattend.build_unattend_xml(form_data)
+
+ try:
+ os.makedirs(os.path.dirname(xml_file), exist_ok=True)
+ with open(xml_file, "w", encoding="utf-8") as fh:
+ fh.write(xml_content)
+ audit("UNATTEND_SAVE", f"{image_type} ({save_mode})")
+ flash("unattend.xml saved successfully.", "success")
+ except Exception as exc:
+ flash(f"Failed to save: {exc}", "danger")
+
+ return redirect(url_for("unattend_editor", image_type=image_type))
+
+ data = unattend.parse_unattend(xml_file)
+ return render_template(
+ "unattend_editor.html",
+ image_type=image_type,
+ friendly_name=config.FRIENDLY_NAMES.get(image_type, image_type),
+ data=data,
+ image_types=config.IMAGE_TYPES,
+ friendly_names=config.FRIENDLY_NAMES,
+ )
+
+
+@app.route("/images//config")
+def image_config(image_type):
+ if image_type not in config.IMAGE_TYPES:
+ flash("Unknown image type.", "danger")
+ return redirect(url_for("dashboard"))
+
+ cfg = images.load_image_config(image_type)
+ return render_template(
+ "image_config.html",
+ image_type=image_type,
+ friendly_name=config.FRIENDLY_NAMES.get(image_type, image_type),
+ config=cfg,
+ )
+
+
+@app.route("/images//config/save", methods=["POST"])
+def image_config_save(image_type):
+ if image_type not in config.IMAGE_TYPES:
+ flash("Unknown image type.", "danger")
+ return redirect(url_for("dashboard"))
+
+ section = request.form.get("section", "")
+ payload = request.form.get("payload", "[]")
+ try:
+ data = json.loads(payload)
+ except json.JSONDecodeError:
+ flash("Invalid JSON payload.", "danger")
+ return redirect(url_for("image_config", image_type=image_type))
+
+ ctrl = fs.control_path(image_type)
+ tools = fs.tools_path(image_type)
+
+ try:
+ if section == "hardware_models":
+ us_file = os.path.join(tools, "user_selections.json")
+ us_raw = fs.load_json(us_file)
+ us_data = us_raw[0] if us_raw and isinstance(us_raw, list) else {}
+ us_data["HardwareModelSelection"] = data
+ fs.save_json(us_file, [us_data])
+ audit("CONFIG_SAVE", f"{image_type}/hardware_models")
+
+ elif section == "drivers":
+ clean = [{k: v for k, v in d.items() if not k.startswith("_")} for d in data]
+ fs.save_json(os.path.join(ctrl, "HardwareDriver.json"), clean)
+ audit("CONFIG_SAVE", f"{image_type}/drivers")
+
+ elif section == "operating_systems":
+ clean = [{k: v for k, v in d.items() if not k.startswith("_")} for d in data]
+ fs.save_json(os.path.join(ctrl, "OperatingSystem.json"), clean)
+ audit("CONFIG_SAVE", f"{image_type}/operating_systems")
+
+ elif section == "packages":
+ clean = [{k: v for k, v in d.items() if not k.startswith("_")} for d in data]
+ fs.save_json(os.path.join(ctrl, "packages.json"), clean)
+ audit("CONFIG_SAVE", f"{image_type}/packages")
+
+ else:
+ flash(f"Unknown section: {section}", "danger")
+ return redirect(url_for("image_config", image_type=image_type))
+
+ flash(f"Saved {section.replace('_', ' ')} successfully.", "success")
+ except Exception as exc:
+ flash(f"Failed to save {section}: {exc}", "danger")
+
+ return redirect(url_for("image_config", image_type=image_type))
+
+
+# ---------------------------------------------------------------------------
+# Routes - Clonezilla Backups
+# ---------------------------------------------------------------------------
+
+@app.route("/backups")
+def clonezilla_backups():
+ backups = []
+ if os.path.isdir(config.CLONEZILLA_SHARE):
+ for f in sorted(os.listdir(config.CLONEZILLA_SHARE)):
+ fpath = os.path.join(config.CLONEZILLA_SHARE, f)
+ if os.path.isfile(fpath) and f.lower().endswith(".zip"):
+ stat = os.stat(fpath)
+ backups.append({
+ "filename": f,
+ "machine": os.path.splitext(f)[0],
+ "size": stat.st_size,
+ "modified": stat.st_mtime,
+ })
+ return render_template(
+ "backups.html",
+ backups=backups,
+ image_types=config.IMAGE_TYPES,
+ friendly_names=config.FRIENDLY_NAMES,
+ )
+
+
+@app.route("/backups/upload", methods=["POST"])
+def clonezilla_upload():
+ if "backup_file" not in request.files:
+ flash("No file selected.", "danger")
+ return redirect(url_for("clonezilla_backups"))
+
+ f = request.files["backup_file"]
+ if not f.filename:
+ flash("No file selected.", "danger")
+ return redirect(url_for("clonezilla_backups"))
+
+ filename = secure_filename(f.filename)
+ if not filename.lower().endswith(".zip"):
+ flash("Only .zip files are accepted.", "danger")
+ return redirect(url_for("clonezilla_backups"))
+
+ os.makedirs(config.CLONEZILLA_SHARE, exist_ok=True)
+ dest = os.path.join(config.CLONEZILLA_SHARE, filename)
+ f.save(dest)
+ audit("BACKUP_UPLOAD", filename)
+ flash(f"Uploaded {filename} successfully.", "success")
+ return redirect(url_for("clonezilla_backups"))
+
+
+@app.route("/backups/download/")
+def clonezilla_download(filename):
+ filename = secure_filename(filename)
+ fpath = os.path.join(config.CLONEZILLA_SHARE, filename)
+ if not os.path.isfile(fpath):
+ flash(f"Backup not found: {filename}", "danger")
+ return redirect(url_for("clonezilla_backups"))
+ return send_file(fpath, as_attachment=True)
+
+
+@app.route("/backups/delete/", methods=["POST"])
+def clonezilla_delete(filename):
+ filename = secure_filename(filename)
+ fpath = os.path.join(config.CLONEZILLA_SHARE, filename)
+ if os.path.isfile(fpath):
+ os.remove(fpath)
+ audit("BACKUP_DELETE", filename)
+ flash(f"Deleted {filename}.", "success")
+ else:
+ flash(f"Backup not found: {filename}", "danger")
+ return redirect(url_for("clonezilla_backups"))
+
+
+# ---------------------------------------------------------------------------
+# Routes - Blancco Reports
+# ---------------------------------------------------------------------------
+
+@app.route("/reports")
+def blancco_reports():
+ reports = []
+ if os.path.isdir(config.BLANCCO_REPORTS):
+ for f in sorted(os.listdir(config.BLANCCO_REPORTS), reverse=True):
+ fpath = os.path.join(config.BLANCCO_REPORTS, f)
+ if os.path.isfile(fpath):
+ stat = os.stat(fpath)
+ ext = os.path.splitext(f)[1].lower()
+ reports.append({
+ "filename": f,
+ "size": stat.st_size,
+ "modified": stat.st_mtime,
+ "type": ext.lstrip(".").upper() or "FILE",
+ })
+ return render_template(
+ "reports.html",
+ reports=reports,
+ image_types=config.IMAGE_TYPES,
+ friendly_names=config.FRIENDLY_NAMES,
+ )
+
+
+@app.route("/reports/download/")
+def blancco_download_report(filename):
+ filename = secure_filename(filename)
+ fpath = os.path.join(config.BLANCCO_REPORTS, filename)
+ if not os.path.isfile(fpath):
+ flash(f"Report not found: {filename}", "danger")
+ return redirect(url_for("blancco_reports"))
+ return send_file(fpath, as_attachment=True)
+
+
+@app.route("/reports/delete/", methods=["POST"])
+def blancco_delete_report(filename):
+ filename = secure_filename(filename)
+ fpath = os.path.join(config.BLANCCO_REPORTS, filename)
+ if os.path.isfile(fpath):
+ os.remove(fpath)
+ audit("REPORT_DELETE", filename)
+ flash(f"Deleted {filename}.", "success")
+ else:
+ flash(f"Report not found: {filename}", "danger")
+ return redirect(url_for("blancco_reports"))
+
+
+# ---------------------------------------------------------------------------
+# Routes - Enrollment Packages
+# ---------------------------------------------------------------------------
+
+@app.route("/enrollment")
+def enrollment():
+ packages = []
+ if os.path.isdir(config.ENROLLMENT_SHARE):
+ for f in sorted(os.listdir(config.ENROLLMENT_SHARE)):
+ fpath = os.path.join(config.ENROLLMENT_SHARE, f)
+ if os.path.isfile(fpath) and f.lower().endswith(".ppkg"):
+ stat = os.stat(fpath)
+ packages.append({
+ "filename": f,
+ "size": stat.st_size,
+ "modified": stat.st_mtime,
+ })
+ return render_template(
+ "enrollment.html",
+ packages=packages,
+ image_types=config.IMAGE_TYPES,
+ friendly_names=config.FRIENDLY_NAMES,
+ )
+
+
+@app.route("/enrollment/upload", methods=["POST"])
+def enrollment_upload():
+ if "ppkg_file" not in request.files:
+ flash("No file selected.", "danger")
+ return redirect(url_for("enrollment"))
+
+ f = request.files["ppkg_file"]
+ if not f.filename:
+ flash("No file selected.", "danger")
+ return redirect(url_for("enrollment"))
+
+ filename = secure_filename(f.filename)
+ if not filename.lower().endswith(".ppkg"):
+ flash("Only .ppkg files are accepted.", "danger")
+ return redirect(url_for("enrollment"))
+
+ os.makedirs(config.ENROLLMENT_SHARE, exist_ok=True)
+ dest = os.path.join(config.ENROLLMENT_SHARE, filename)
+ f.save(dest)
+ audit("ENROLLMENT_UPLOAD", filename)
+ flash(f"Uploaded {filename} successfully.", "success")
+ return redirect(url_for("enrollment"))
+
+
+@app.route("/enrollment/download/")
+def enrollment_download(filename):
+ filename = secure_filename(filename)
+ fpath = os.path.join(config.ENROLLMENT_SHARE, filename)
+ if not os.path.isfile(fpath):
+ flash(f"Package not found: {filename}", "danger")
+ return redirect(url_for("enrollment"))
+ return send_file(fpath, as_attachment=True)
+
+
+@app.route("/enrollment/delete/", methods=["POST"])
+def enrollment_delete(filename):
+ filename = secure_filename(filename)
+ fpath = os.path.join(config.ENROLLMENT_SHARE, filename)
+ if os.path.isfile(fpath):
+ os.remove(fpath)
+ audit("ENROLLMENT_DELETE", filename)
+ flash(f"Deleted {filename}.", "success")
+ else:
+ flash(f"Package not found: {filename}", "danger")
+ return redirect(url_for("enrollment"))
+
+
+# ---------------------------------------------------------------------------
+# Routes - startnet.cmd Editor (boot.wim)
+# ---------------------------------------------------------------------------
+
+@app.route("/startnet")
+def startnet_editor():
+ import subprocess
+ wim_exists = os.path.isfile(config.BOOT_WIM)
+ content = ""
+ wim_info = {}
+
+ if wim_exists:
+ content = wim.extract_startnet(config.BOOT_WIM) or ""
+ try:
+ result = subprocess.run(
+ ["wiminfo", config.BOOT_WIM],
+ capture_output=True, text=True, timeout=15,
+ )
+ if result.returncode == 0:
+ for line in result.stdout.splitlines():
+ if ":" in line:
+ key, _, val = line.partition(":")
+ wim_info[key.strip()] = val.strip()
+ except Exception:
+ pass
+
+ return render_template(
+ "startnet_editor.html",
+ wim_exists=wim_exists,
+ wim_path=config.BOOT_WIM,
+ content=content,
+ wim_info=wim_info,
+ image_types=config.IMAGE_TYPES,
+ friendly_names=config.FRIENDLY_NAMES,
+ )
+
+
+@app.route("/startnet/save", methods=["POST"])
+def startnet_save():
+ if not os.path.isfile(config.BOOT_WIM):
+ flash("boot.wim not found.", "danger")
+ return redirect(url_for("startnet_editor"))
+
+ content = request.form.get("content", "")
+ ok, err = wim.update_startnet(config.BOOT_WIM, content)
+ if ok:
+ audit("STARTNET_SAVE", "boot.wim updated")
+ flash("startnet.cmd updated successfully in boot.wim.", "success")
+ else:
+ flash(f"Failed to update boot.wim: {err}", "danger")
+ return redirect(url_for("startnet_editor"))
+
+
+# ---------------------------------------------------------------------------
+# Routes - Audit Log
+# ---------------------------------------------------------------------------
+
+@app.route("/audit")
+def audit_log():
+ entries = []
+ if os.path.isfile(config.AUDIT_LOG):
+ with open(config.AUDIT_LOG, "r") as fh:
+ for line in fh:
+ entries.append(line.strip())
+ entries.reverse()
+ return render_template(
+ "audit.html",
+ entries=entries,
+ image_types=config.IMAGE_TYPES,
+ friendly_names=config.FRIENDLY_NAMES,
+ )
+
+
+# ---------------------------------------------------------------------------
+# Routes - JSON API
+# ---------------------------------------------------------------------------
+
+@app.route("/api/services")
+def api_services():
+ services = {s: system.service_status(s) for s in ("dnsmasq", "apache2", "smbd")}
+ return jsonify(services)
+
+
+@app.route("/api/images")
+def api_images():
+ image_list = [images.image_status(it) for it in config.IMAGE_TYPES]
+ return jsonify(image_list)
+
+
+@app.route("/api/images//unattend", methods=["POST"])
+def api_save_unattend(image_type):
+ if image_type not in config.IMAGE_TYPES:
+ return jsonify({"error": "Unknown image type"}), 404
+
+ xml_file = fs.unattend_path(image_type)
+ payload = request.get_json(silent=True)
+
+ if not payload:
+ return jsonify({"error": "No JSON body provided"}), 400
+
+ if "raw_xml" in payload:
+ raw_xml = payload["raw_xml"]
+ try:
+ etree.fromstring(raw_xml.encode("utf-8"))
+ except etree.XMLSyntaxError as exc:
+ return jsonify({"error": f"Invalid XML: {exc}"}), 400
+ xml_content = raw_xml
+ else:
+ try:
+ xml_content = unattend.build_unattend_xml(payload)
+ except Exception as exc:
+ return jsonify({"error": f"Failed to build XML: {exc}"}), 400
+
+ try:
+ os.makedirs(os.path.dirname(xml_file), exist_ok=True)
+ with open(xml_file, "w", encoding="utf-8") as fh:
+ fh.write(xml_content)
+ except Exception as exc:
+ return jsonify({"error": f"Failed to write file: {exc}"}), 500
+
+ audit("UNATTEND_SAVE_API", image_type)
+ return jsonify({"status": "ok", "path": xml_file})
+
+
+# ---------------------------------------------------------------------------
+# Template helpers
+# ---------------------------------------------------------------------------
+
+@app.template_filter("timestamp_fmt")
+def timestamp_fmt(ts):
+ """Format a Unix timestamp to a human-readable date string."""
+ return datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M")
+
+
+@app.context_processor
+def inject_globals():
+ return {
+ "all_image_types": config.IMAGE_TYPES,
+ "all_friendly_names": config.FRIENDLY_NAMES,
+ }
+
+
+# ---------------------------------------------------------------------------
+# Main
+# ---------------------------------------------------------------------------
+
+if __name__ == "__main__":
+ app.run(host="127.0.0.1", port=9010, debug=False, threaded=True)
diff --git a/webapp/config.py b/webapp/config.py
new file mode 100644
index 0000000..249ec61
--- /dev/null
+++ b/webapp/config.py
@@ -0,0 +1,66 @@
+"""Configuration constants for the PXE webapp.
+
+Reads from environment variables with sensible defaults that match the
+Ansible playbook's deploy paths. Also defines the canonical image type
+list and shared-directory taxonomy used by the import pipeline.
+"""
+
+import os
+
+# --- Filesystem paths --------------------------------------------------------
+SAMBA_SHARE = os.environ.get("SAMBA_SHARE", "/srv/samba/winpeapps")
+CLONEZILLA_SHARE = os.environ.get("CLONEZILLA_SHARE", "/srv/samba/clonezilla")
+BLANCCO_REPORTS = os.environ.get("BLANCCO_REPORTS", "/srv/samba/blancco-reports")
+ENROLLMENT_SHARE = os.environ.get("ENROLLMENT_SHARE", "/srv/samba/enrollment")
+UPLOAD_DIR = os.environ.get("UPLOAD_DIR", "/home/pxe/image-upload")
+SHARED_DIR = os.path.join(SAMBA_SHARE, "_shared")
+WEB_ROOT = os.environ.get("WEB_ROOT", "/var/www/html")
+BOOT_WIM = os.path.join(WEB_ROOT, "win11", "sources", "boot.wim")
+AUDIT_LOG = os.environ.get("AUDIT_LOG", "/var/log/pxe-webapp-audit.log")
+
+# --- Flask -------------------------------------------------------------------
+FLASK_SECRET_KEY = os.environ.get("FLASK_SECRET_KEY", "pxe-manager-dev-key-change-in-prod")
+MAX_CONTENT_LENGTH = 16 * 1024 * 1024 * 1024 # 16 GB max upload
+
+# --- Image taxonomy ----------------------------------------------------------
+# Subdirs inside Deploy/ shared across ALL image types.
+SHARED_DEPLOY_GLOBAL = ["Out-of-box Drivers"]
+
+# Subdirs inside Deploy/ shared within the same image family (by prefix).
+SHARED_DEPLOY_SCOPED = {
+ "gea-": ["Operating Systems", "Packages"],
+ "ge-": ["Operating Systems", "Packages"],
+}
+
+# Sibling dirs at image root shared within the same image family.
+SHARED_ROOT_DIRS = {
+ "gea-": ["Sources"],
+ "ge-": ["Sources"],
+}
+
+IMAGE_TYPES = [
+ "gea-standard",
+ "gea-engineer",
+ "gea-shopfloor",
+ "gea-shopfloor-mce",
+ "ge-standard",
+ "ge-engineer",
+ "ge-shopfloor-lockdown",
+ "ge-shopfloor-mce",
+]
+
+FRIENDLY_NAMES = {
+ "gea-standard": "GE Aerospace Standard",
+ "gea-engineer": "GE Aerospace Engineer",
+ "gea-shopfloor": "GE Aerospace Shop Floor",
+ "gea-shopfloor-mce": "GE Aerospace Shop Floor MCE",
+ "ge-standard": "GE Legacy Standard",
+ "ge-engineer": "GE Legacy Engineer",
+ "ge-shopfloor-lockdown": "GE Legacy Shop Floor Lockdown",
+ "ge-shopfloor-mce": "GE Legacy Shop Floor MCE",
+}
+
+# --- Unattend XML namespaces -------------------------------------------------
+UNATTEND_NS = "urn:schemas-microsoft-com:unattend"
+WCM_NS = "http://schemas.microsoft.com/WMIConfig/2002/State"
+NSMAP = {None: UNATTEND_NS, "wcm": WCM_NS}
diff --git a/webapp/services/__init__.py b/webapp/services/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/webapp/services/audit.py b/webapp/services/audit.py
new file mode 100644
index 0000000..e872e84
--- /dev/null
+++ b/webapp/services/audit.py
@@ -0,0 +1,26 @@
+"""Audit logging for the PXE webapp.
+
+Every write-action route should call ``audit(action, detail)`` to record
+who did what. The log lives at ``AUDIT_LOG`` (configurable via env var)
+and is consumed by the /audit page.
+"""
+
+import logging
+
+from flask import request
+
+import config
+
+_audit_handler = logging.FileHandler(config.AUDIT_LOG, mode="a")
+_audit_handler.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
+
+audit_logger = logging.getLogger("pxe_audit")
+audit_logger.setLevel(logging.INFO)
+if not audit_logger.handlers:
+ audit_logger.addHandler(_audit_handler)
+
+
+def audit(action, detail=""):
+ """Write an entry to the audit log."""
+ ip = request.remote_addr if request else "system"
+ audit_logger.info(f"[{ip}] {action}: {detail}")
diff --git a/webapp/services/csrf.py b/webapp/services/csrf.py
new file mode 100644
index 0000000..b1a60d3
--- /dev/null
+++ b/webapp/services/csrf.py
@@ -0,0 +1,28 @@
+"""Session-based CSRF token: generate per-session, double-submit on POST."""
+
+import secrets
+
+from flask import abort, request, session
+
+
+def generate_csrf_token():
+ """Return the CSRF token for the current session, creating one if needed."""
+ if "_csrf_token" not in session:
+ session["_csrf_token"] = secrets.token_hex(32)
+ return session["_csrf_token"]
+
+
+def init_csrf(app):
+ """Wire CSRF protection into a Flask app: validator + template helper."""
+
+ @app.before_request
+ def _validate_csrf():
+ if request.method != "POST":
+ return
+ token = request.form.get("_csrf_token") or request.headers.get("X-CSRF-Token")
+ if not token or token != generate_csrf_token():
+ abort(403)
+
+ @app.context_processor
+ def _inject_csrf_token():
+ return {"csrf_token": generate_csrf_token}
diff --git a/webapp/services/deploy.py b/webapp/services/deploy.py
new file mode 100644
index 0000000..629758f
--- /dev/null
+++ b/webapp/services/deploy.py
@@ -0,0 +1,96 @@
+"""Image deploy import logic: copy/move from a USB or upload-dir source
+into ``SAMBA_SHARE//Deploy/`` while merging shared subdirs
+(``Out-of-box Drivers`` etc.) into ``SAMBA_SHARE/_shared/`` and replacing
+the per-image copies with symlinks. This is what lets two image types
+re-use the same multi-GB driver tree without doubling disk usage.
+"""
+
+import os
+import shutil
+
+import config
+from services.system import find_usb_mounts
+
+
+def _replace_with_symlink(link_path, target_path):
+ """Replace a file/dir/symlink at link_path with a symlink to target_path."""
+ if os.path.islink(link_path):
+ os.remove(link_path)
+ elif os.path.isdir(link_path):
+ shutil.rmtree(link_path)
+ os.symlink(target_path, link_path)
+
+
+def _merge_tree(src, dst, move=False):
+ """Recursively merge src tree into dst, overwriting existing files.
+
+ When move=True, files are moved instead of copied (saves disk space
+ on imports from the local upload-dir).
+ """
+ _transfer = shutil.move if move else shutil.copy2
+ _transfer_tree = shutil.move if move else shutil.copytree
+ for item in os.listdir(src):
+ s = os.path.join(src, item)
+ d = os.path.join(dst, item)
+ if os.path.isdir(s):
+ if os.path.isdir(d):
+ _merge_tree(s, d, move=move)
+ else:
+ if os.path.exists(d):
+ os.remove(d)
+ _transfer_tree(s, d)
+ else:
+ os.makedirs(os.path.dirname(d), exist_ok=True)
+ _transfer(s, d)
+
+
+def import_deploy(src_deploy, dst_deploy, target="", move=False):
+ """Import Deploy/ contents, redirecting shared subdirs into _shared/."""
+ scoped_shared = []
+ prefix_key = ""
+ for prefix, dirs in config.SHARED_DEPLOY_SCOPED.items():
+ if target.startswith(prefix):
+ scoped_shared = dirs
+ prefix_key = prefix
+ break
+
+ _transfer = shutil.move if move else shutil.copy2
+ _transfer_tree = shutil.move if move else shutil.copytree
+
+ os.makedirs(dst_deploy, exist_ok=True)
+ for item in os.listdir(src_deploy):
+ src_item = os.path.join(src_deploy, item)
+ dst_item = os.path.join(dst_deploy, item)
+
+ if not os.path.isdir(src_item):
+ _transfer(src_item, dst_item)
+ continue
+
+ if item in config.SHARED_DEPLOY_GLOBAL:
+ shared_dest = os.path.join(config.SHARED_DIR, item)
+ os.makedirs(shared_dest, exist_ok=True)
+ _merge_tree(src_item, shared_dest, move=move)
+ _replace_with_symlink(dst_item, shared_dest)
+ continue
+
+ if item in scoped_shared:
+ shared_dest = os.path.join(config.SHARED_DIR, f"{prefix_key}{item}")
+ os.makedirs(shared_dest, exist_ok=True)
+ _merge_tree(src_item, shared_dest, move=move)
+ _replace_with_symlink(dst_item, shared_dest)
+ continue
+
+ if os.path.isdir(dst_item):
+ _merge_tree(src_item, dst_item, move=move)
+ else:
+ _transfer_tree(src_item, dst_item)
+
+
+def allowed_import_source(source):
+ """True if source is a USB mount or under the upload dir."""
+ usb = find_usb_mounts()
+ if any(source == m or source.startswith(m + "/") for m in usb):
+ return True
+ if source == config.UPLOAD_DIR or source.startswith(config.UPLOAD_DIR + "/"):
+ return os.path.isdir(source)
+ return False
diff --git a/webapp/services/fs.py b/webapp/services/fs.py
new file mode 100644
index 0000000..0031a25
--- /dev/null
+++ b/webapp/services/fs.py
@@ -0,0 +1,75 @@
+"""Filesystem path helpers + tiny JSON load/save utilities.
+
+All paths are derived from ``config.SAMBA_SHARE`` so swapping the share
+root only requires changing one env var.
+"""
+
+import json
+import os
+
+import config
+
+
+def image_root(image_type):
+ """Return the root directory for an image type."""
+ return os.path.join(config.SAMBA_SHARE, image_type)
+
+
+def deploy_path(image_type):
+ """Return the Deploy directory for an image type."""
+ return os.path.join(config.SAMBA_SHARE, image_type, "Deploy")
+
+
+def unattend_path(image_type):
+ """Return the unattend.xml path for an image type."""
+ return os.path.join(deploy_path(image_type), "FlatUnattendW10.xml")
+
+
+def control_path(image_type):
+ """Return the Deploy/Control directory for an image type."""
+ return os.path.join(deploy_path(image_type), "Control")
+
+
+def tools_path(image_type):
+ """Return the Tools directory for an image type."""
+ return os.path.join(config.SAMBA_SHARE, image_type, "Tools")
+
+
+def load_json(filepath):
+ """Parse a JSON file and return its contents, or [] on failure."""
+ try:
+ with open(filepath, "r", encoding="utf-8-sig") as fh:
+ return json.load(fh)
+ except (OSError, json.JSONDecodeError):
+ return []
+
+
+def save_json(filepath, data):
+ """Write data as pretty-printed JSON, creating parent dirs as needed."""
+ os.makedirs(os.path.dirname(filepath), exist_ok=True)
+ with open(filepath, "w", encoding="utf-8") as fh:
+ json.dump(data, fh, indent=2, ensure_ascii=False)
+ fh.write("\n")
+
+
+def resolve_destination(dest_dir, image_type):
+ """Convert a Windows ``*destinationdir*`` path to a Linux filesystem path.
+
+ Replaces the placeholder + backslashes, prepends
+ ``SAMBA_SHARE/image_type/``, then resolves symlinks so shared dirs
+ are followed.
+ """
+ if not dest_dir:
+ return ""
+ path = dest_dir
+ lower = path.lower()
+ idx = lower.find("*destinationdir*")
+ if idx != -1:
+ path = path[idx + len("*destinationdir*"):]
+ path = path.replace("\\", "/").lstrip("/")
+ full = os.path.join(config.SAMBA_SHARE, image_type, path)
+ try:
+ full = os.path.realpath(full)
+ except OSError:
+ pass
+ return full
diff --git a/webapp/services/images.py b/webapp/services/images.py
new file mode 100644
index 0000000..1fc137f
--- /dev/null
+++ b/webapp/services/images.py
@@ -0,0 +1,123 @@
+"""Per-image-type state probes: status + config (drivers / OS / packages / models)."""
+
+import os
+
+import config
+from services import fs
+
+
+def image_status(image_type):
+ """Return a dict describing the state of an image type."""
+ dp = fs.deploy_path(image_type)
+ up = fs.unattend_path(image_type)
+ has_content = os.path.isdir(dp) and any(os.scandir(dp)) if os.path.isdir(dp) else False
+ has_unattend = os.path.isfile(up)
+ return {
+ "image_type": image_type,
+ "friendly_name": config.FRIENDLY_NAMES.get(image_type, image_type),
+ "deploy_path": dp,
+ "has_content": has_content,
+ "has_unattend": has_unattend,
+ }
+
+
+def load_image_config(image_type):
+ """Load all JSON configs for an image and check on-disk presence."""
+ ctrl = fs.control_path(image_type)
+ tools = fs.tools_path(image_type)
+
+ # --- Drivers (merge HardwareDriver.json + hw_drivers.json) ---
+ hw_driver_file = os.path.join(ctrl, "HardwareDriver.json")
+ hw_drivers_extra = os.path.join(ctrl, "hw_drivers.json")
+ drivers_raw = fs.load_json(hw_driver_file)
+ extra_raw = fs.load_json(hw_drivers_extra)
+
+ seen_files = set()
+ drivers = []
+ for d in drivers_raw + extra_raw:
+ fname = (d.get("FileName") or d.get("fileName") or "").lower()
+ if fname and fname in seen_files:
+ continue
+ if fname:
+ seen_files.add(fname)
+ drivers.append(d)
+
+ for d in drivers:
+ fname = d.get("FileName") or d.get("fileName") or ""
+ dest = d.get("DestinationDir") or d.get("destinationDir") or ""
+ resolved = fs.resolve_destination(dest, image_type)
+ if resolved and fname:
+ d["_on_disk"] = os.path.isfile(os.path.join(resolved, fname))
+ else:
+ d["_on_disk"] = False
+
+ # --- Operating Systems ---
+ os_file = os.path.join(ctrl, "OperatingSystem.json")
+ operating_systems = fs.load_json(os_file)
+ for entry in operating_systems:
+ osv = entry.get("operatingSystemVersion", {})
+ wim = osv.get("wim", {})
+ dest = wim.get("DestinationDir") or wim.get("destinationDir") or ""
+ resolved = fs.resolve_destination(dest, image_type)
+ if resolved:
+ entry["_on_disk"] = os.path.isfile(os.path.join(resolved, "install.wim"))
+ else:
+ entry["_on_disk"] = False
+
+ # --- Packages ---
+ pkg_file = os.path.join(ctrl, "packages.json")
+ packages = fs.load_json(pkg_file)
+ for p in packages:
+ fname = p.get("fileName") or p.get("FileName") or ""
+ dest = p.get("destinationDir") or p.get("DestinationDir") or ""
+ resolved = fs.resolve_destination(dest, image_type)
+ if resolved and fname:
+ p["_on_disk"] = os.path.isfile(os.path.join(resolved, fname))
+ else:
+ p["_on_disk"] = False
+
+ # --- Hardware Models (user_selections.json) ---
+ us_file = os.path.join(tools, "user_selections.json")
+ us_raw = fs.load_json(us_file)
+ us_data = us_raw[0] if us_raw and isinstance(us_raw, list) else {}
+ hardware_models = us_data.get("HardwareModelSelection", [])
+ os_selection = str(us_data.get("OperatingSystemSelection", ""))
+
+ family_lookup = {}
+ for d in drivers:
+ family = d.get("family", "")
+ if family:
+ family_lookup[family] = d
+
+ for hm in hardware_models:
+ family_id = hm.get("Id", "")
+ matched = family_lookup.get(family_id)
+ hm["_on_disk"] = matched["_on_disk"] if matched else False
+
+ # --- Orphan drivers: zip files on disk not referenced in any JSON ---
+ orphan_drivers = []
+ oob_dir = os.path.join(fs.deploy_path(image_type), "Out-of-box Drivers")
+ try:
+ oob_dir = os.path.realpath(oob_dir)
+ except OSError:
+ pass
+ registered_files = set()
+ for d in drivers:
+ fname = d.get("FileName") or d.get("fileName") or ""
+ if fname:
+ registered_files.add(fname.lower())
+ if os.path.isdir(oob_dir):
+ for dirpath, _dirnames, filenames in os.walk(oob_dir):
+ for fn in filenames:
+ if fn.lower().endswith(".zip") and fn.lower() not in registered_files:
+ rel = os.path.relpath(os.path.join(dirpath, fn), oob_dir)
+ orphan_drivers.append({"fileName": fn, "relPath": rel})
+
+ return {
+ "hardware_models": hardware_models,
+ "drivers": drivers,
+ "operating_systems": operating_systems,
+ "packages": packages,
+ "orphan_drivers": orphan_drivers,
+ "os_selection": os_selection,
+ }
diff --git a/webapp/services/system.py b/webapp/services/system.py
new file mode 100644
index 0000000..f926521
--- /dev/null
+++ b/webapp/services/system.py
@@ -0,0 +1,55 @@
+"""Host-system probes: systemd service status, USB mounts, upload sources."""
+
+import os
+import subprocess
+
+import config
+
+
+def service_status(service_name):
+ """Check whether a systemd service is active."""
+ try:
+ result = subprocess.run(
+ ["systemctl", "is-active", service_name],
+ capture_output=True,
+ text=True,
+ timeout=5,
+ )
+ state = result.stdout.strip()
+ return {"name": service_name, "active": state == "active", "state": state}
+ except Exception as exc:
+ return {"name": service_name, "active": False, "state": str(exc)}
+
+
+def find_usb_mounts():
+ """Return a list of mount-point paths that look like removable media."""
+ mounts = []
+ try:
+ with open("/proc/mounts", "r") as fh:
+ for line in fh:
+ parts = line.split()
+ if len(parts) >= 2:
+ mount_point = parts[1]
+ if mount_point.startswith(("/mnt/", "/media/")):
+ if os.path.isdir(mount_point):
+ mounts.append(mount_point)
+ except OSError:
+ pass
+ return sorted(set(mounts))
+
+
+def find_upload_sources():
+ """Return sub-directories inside UPLOAD_DIR that look like image content."""
+ sources = []
+ if os.path.isdir(config.UPLOAD_DIR):
+ try:
+ entries = os.listdir(config.UPLOAD_DIR)
+ if entries:
+ sources.append(config.UPLOAD_DIR)
+ for entry in entries:
+ full = os.path.join(config.UPLOAD_DIR, entry)
+ if os.path.isdir(full):
+ sources.append(full)
+ except OSError:
+ pass
+ return sources
diff --git a/webapp/services/unattend.py b/webapp/services/unattend.py
new file mode 100644
index 0000000..8308147
--- /dev/null
+++ b/webapp/services/unattend.py
@@ -0,0 +1,496 @@
+"""Unattend.xml parser, builder, and form-data extractor.
+
+Round-trips a Windows ``FlatUnattendW10.xml`` between the disk format
+and a flat dict the editor template can render. ``parse_unattend()``
+reads disk + returns a dict; ``build_unattend_xml()`` takes the dict
++ returns a string for writing back; ``extract_form_data()`` converts
+a Flask ``request.form`` MultiDict into the dict format that
+``build_unattend_xml()`` expects.
+"""
+
+import os
+
+from lxml import etree
+
+import config
+
+UNATTEND_TEMPLATE = """\
+
+
+
+
+
+
+
+"""
+
+
+def qn(tag):
+ """Return a tag qualified with the default unattend namespace."""
+ return f"{{{config.UNATTEND_NS}}}{tag}"
+
+
+def qwcm(attr):
+ """Return an attribute qualified with the wcm namespace."""
+ return f"{{{config.WCM_NS}}}{attr}"
+
+
+def _find_or_create(parent, tag):
+ """Find the first child with *tag* or create it."""
+ el = parent.find(tag, namespaces={"": config.UNATTEND_NS})
+ if el is None:
+ el = etree.SubElement(parent, qn(tag.split("}")[-1]) if "}" not in tag else tag)
+ return el
+
+
+def _settings_pass(root, pass_name):
+ """Return the element, creating if needed."""
+ for s in root.findall(qn("settings")):
+ if s.get("pass") == pass_name:
+ return s
+ s = etree.SubElement(root, qn("settings"))
+ s.set("pass", pass_name)
+ return s
+
+
+def parse_unattend(xml_path):
+ """Parse an unattend.xml and return a dict of editable data."""
+ data = {
+ "driver_paths": [],
+ "computer_name": "",
+ "registered_organization": "",
+ "registered_owner": "",
+ "time_zone": "",
+ "specialize_commands": [],
+ "oobe": {
+ "HideEULAPage": "true",
+ "HideOEMRegistrationScreen": "true",
+ "HideOnlineAccountScreens": "true",
+ "HideWirelessSetupInOOBE": "true",
+ "HideLocalAccountScreen": "true",
+ "NetworkLocation": "Work",
+ "ProtectYourPC": "3",
+ "SkipUserOOBE": "true",
+ "SkipMachineOOBE": "true",
+ },
+ "firstlogon_commands": [],
+ "user_accounts": [],
+ "autologon": {
+ "enabled": "",
+ "username": "",
+ "password": "",
+ "plain_text": "true",
+ "logon_count": "",
+ },
+ "intl": {
+ "input_locale": "",
+ "system_locale": "",
+ "ui_language": "",
+ "user_locale": "",
+ },
+ "oobe_timezone": "",
+ "raw_xml": "",
+ }
+
+ if not os.path.isfile(xml_path):
+ data["raw_xml"] = UNATTEND_TEMPLATE
+ return data
+
+ with open(xml_path, "r", encoding="utf-8") as fh:
+ raw = fh.read()
+ data["raw_xml"] = raw
+
+ try:
+ root = etree.fromstring(raw.encode("utf-8"))
+ except etree.XMLSyntaxError:
+ return data
+
+ ns = {"u": config.UNATTEND_NS}
+
+ # --- offlineServicing: DriverPaths ---
+ for dp_el in root.xpath(
+ "u:settings[@pass='offlineServicing']//u:PathAndCredentials/u:Path",
+ namespaces=ns,
+ ):
+ if dp_el.text:
+ data["driver_paths"].append(dp_el.text.strip())
+
+ # --- specialize: Shell-Setup ---
+ for comp in root.xpath("u:settings[@pass='specialize']/u:component", namespaces=ns):
+ comp_name = comp.get("name", "")
+ if "Shell-Setup" in comp_name:
+ for tag, key in [
+ ("ComputerName", "computer_name"),
+ ("RegisteredOrganization", "registered_organization"),
+ ("RegisteredOwner", "registered_owner"),
+ ("TimeZone", "time_zone"),
+ ]:
+ el = comp.find(qn(tag))
+ if el is not None and el.text:
+ data[key] = el.text.strip()
+
+ # --- specialize: RunSynchronous commands ---
+ for cmd in root.xpath(
+ "u:settings[@pass='specialize']//u:RunSynchronousCommand",
+ namespaces=ns,
+ ):
+ order_el = cmd.find(qn("Order"))
+ path_el = cmd.find(qn("Path"))
+ desc_el = cmd.find(qn("Description"))
+ data["specialize_commands"].append({
+ "order": order_el.text.strip() if order_el is not None and order_el.text else "",
+ "path": path_el.text.strip() if path_el is not None and path_el.text else "",
+ "description": desc_el.text.strip() if desc_el is not None and desc_el.text else "",
+ })
+
+ # --- oobeSystem ---
+ for comp in root.xpath("u:settings[@pass='oobeSystem']/u:component", namespaces=ns):
+ comp_name = comp.get("name", "")
+
+ if "International-Core" in comp_name:
+ for tag, key in [
+ ("InputLocale", "input_locale"),
+ ("SystemLocale", "system_locale"),
+ ("UILanguage", "ui_language"),
+ ("UserLocale", "user_locale"),
+ ]:
+ el = comp.find(qn(tag))
+ if el is not None and el.text:
+ data["intl"][key] = el.text.strip()
+
+ if "OOBE" in comp_name or "Shell-Setup" in comp_name:
+ oobe_el = comp.find(qn("OOBE"))
+ if oobe_el is not None:
+ for child in oobe_el:
+ local = etree.QName(child).localname
+ if local in data["oobe"] and child.text:
+ data["oobe"][local] = child.text.strip()
+
+ ua = comp.find(qn("UserAccounts"))
+ if ua is not None:
+ la_container = ua.find(qn("LocalAccounts"))
+ if la_container is not None:
+ for acct in la_container.findall(qn("LocalAccount")):
+ name_el = acct.find(qn("Name"))
+ group_el = acct.find(qn("Group"))
+ display_el = acct.find(qn("DisplayName"))
+ pw_el = acct.find(qn("Password"))
+ pw_val = ""
+ pw_plain = "true"
+ if pw_el is not None:
+ v = pw_el.find(qn("Value"))
+ p = pw_el.find(qn("PlainText"))
+ if v is not None and v.text:
+ pw_val = v.text.strip()
+ if p is not None and p.text:
+ pw_plain = p.text.strip()
+ data["user_accounts"].append({
+ "name": name_el.text.strip() if name_el is not None and name_el.text else "",
+ "password": pw_val,
+ "plain_text": pw_plain,
+ "group": group_el.text.strip() if group_el is not None and group_el.text else "Administrators",
+ "display_name": display_el.text.strip() if display_el is not None and display_el.text else "",
+ })
+
+ al = comp.find(qn("AutoLogon"))
+ if al is not None:
+ enabled_el = al.find(qn("Enabled"))
+ user_el = al.find(qn("Username"))
+ count_el = al.find(qn("LogonCount"))
+ pw_el = al.find(qn("Password"))
+ pw_val = ""
+ pw_plain = "true"
+ if pw_el is not None:
+ v = pw_el.find(qn("Value"))
+ p = pw_el.find(qn("PlainText"))
+ if v is not None and v.text:
+ pw_val = v.text.strip()
+ if p is not None and p.text:
+ pw_plain = p.text.strip()
+ data["autologon"] = {
+ "enabled": enabled_el.text.strip() if enabled_el is not None and enabled_el.text else "",
+ "username": user_el.text.strip() if user_el is not None and user_el.text else "",
+ "password": pw_val,
+ "plain_text": pw_plain,
+ "logon_count": count_el.text.strip() if count_el is not None and count_el.text else "",
+ }
+
+ flc = comp.find(qn("FirstLogonCommands"))
+ if flc is not None:
+ for sync in flc.findall(qn("SynchronousCommand")):
+ order_el = sync.find(qn("Order"))
+ cl_el = sync.find(qn("CommandLine"))
+ desc_el = sync.find(qn("Description"))
+ data["firstlogon_commands"].append({
+ "order": order_el.text.strip() if order_el is not None and order_el.text else "",
+ "commandline": cl_el.text.strip() if cl_el is not None and cl_el.text else "",
+ "description": desc_el.text.strip() if desc_el is not None and desc_el.text else "",
+ })
+
+ tz_el = comp.find(qn("TimeZone"))
+ if tz_el is not None and tz_el.text:
+ data["oobe_timezone"] = tz_el.text.strip()
+
+ return data
+
+
+def build_unattend_xml(form_data):
+ """Build a complete unattend.xml string from form data dict."""
+ root = etree.Element(qn("unattend"), nsmap=config.NSMAP)
+
+ _settings_pass(root, "windowsPE")
+
+ # --- offlineServicing: DriverPaths ---
+ offline = _settings_pass(root, "offlineServicing")
+ driver_paths = form_data.get("driver_paths", [])
+ if driver_paths:
+ comp = etree.SubElement(offline, qn("component"))
+ comp.set("name", "Microsoft-Windows-PnpCustomizationsNonWinPE")
+ comp.set("processorArchitecture", "amd64")
+ comp.set("publicKeyToken", "31bf3856ad364e35")
+ comp.set("language", "neutral")
+ comp.set("versionScope", "nonSxS")
+ dp_container = etree.SubElement(comp, qn("DriverPaths"))
+ for idx, dp in enumerate(driver_paths, start=1):
+ if not dp.strip():
+ continue
+ pac = etree.SubElement(dp_container, qn("PathAndCredentials"))
+ pac.set(qwcm("action"), "add")
+ pac.set(qwcm("keyValue"), str(idx))
+ path_el = etree.SubElement(pac, qn("Path"))
+ path_el.text = dp.strip()
+
+ # --- specialize ---
+ spec = _settings_pass(root, "specialize")
+
+ shell_comp = etree.SubElement(spec, qn("component"))
+ shell_comp.set("name", "Microsoft-Windows-Shell-Setup")
+ shell_comp.set("processorArchitecture", "amd64")
+ shell_comp.set("publicKeyToken", "31bf3856ad364e35")
+ shell_comp.set("language", "neutral")
+ shell_comp.set("versionScope", "nonSxS")
+
+ for tag, key in [
+ ("ComputerName", "computer_name"),
+ ("RegisteredOrganization", "registered_organization"),
+ ("RegisteredOwner", "registered_owner"),
+ ("TimeZone", "time_zone"),
+ ]:
+ val = form_data.get(key, "").strip()
+ if val:
+ el = etree.SubElement(shell_comp, qn(tag))
+ el.text = val
+
+ spec_cmds = form_data.get("specialize_commands", [])
+ if spec_cmds:
+ deploy_comp = etree.SubElement(spec, qn("component"))
+ deploy_comp.set("name", "Microsoft-Windows-Deployment")
+ deploy_comp.set("processorArchitecture", "amd64")
+ deploy_comp.set("publicKeyToken", "31bf3856ad364e35")
+ deploy_comp.set("language", "neutral")
+ deploy_comp.set("versionScope", "nonSxS")
+ rs = etree.SubElement(deploy_comp, qn("RunSynchronous"))
+ for idx, cmd in enumerate(spec_cmds, start=1):
+ if not cmd.get("path", "").strip():
+ continue
+ rsc = etree.SubElement(rs, qn("RunSynchronousCommand"))
+ rsc.set(qwcm("action"), "add")
+ order_el = etree.SubElement(rsc, qn("Order"))
+ order_el.text = str(idx)
+ path_el = etree.SubElement(rsc, qn("Path"))
+ path_el.text = cmd["path"].strip()
+ desc_el = etree.SubElement(rsc, qn("Description"))
+ desc_el.text = cmd.get("description", "").strip()
+
+ # --- oobeSystem ---
+ oobe_settings = _settings_pass(root, "oobeSystem")
+
+ intl = form_data.get("intl", {})
+ if any(v.strip() for v in intl.values() if v):
+ intl_comp = etree.SubElement(oobe_settings, qn("component"))
+ intl_comp.set("name", "Microsoft-Windows-International-Core")
+ intl_comp.set("processorArchitecture", "amd64")
+ intl_comp.set("publicKeyToken", "31bf3856ad364e35")
+ intl_comp.set("language", "neutral")
+ intl_comp.set("versionScope", "nonSxS")
+ for tag, key in [
+ ("InputLocale", "input_locale"),
+ ("SystemLocale", "system_locale"),
+ ("UILanguage", "ui_language"),
+ ("UserLocale", "user_locale"),
+ ]:
+ val = intl.get(key, "").strip()
+ if val:
+ el = etree.SubElement(intl_comp, qn(tag))
+ el.text = val
+
+ oobe_comp = etree.SubElement(oobe_settings, qn("component"))
+ oobe_comp.set("name", "Microsoft-Windows-Shell-Setup")
+ oobe_comp.set("processorArchitecture", "amd64")
+ oobe_comp.set("publicKeyToken", "31bf3856ad364e35")
+ oobe_comp.set("language", "neutral")
+ oobe_comp.set("versionScope", "nonSxS")
+
+ oobe_el = etree.SubElement(oobe_comp, qn("OOBE"))
+ oobe_data = form_data.get("oobe", {})
+ for key in [
+ "HideEULAPage",
+ "HideOEMRegistrationScreen",
+ "HideOnlineAccountScreens",
+ "HideWirelessSetupInOOBE",
+ "HideLocalAccountScreen",
+ "NetworkLocation",
+ "ProtectYourPC",
+ "SkipUserOOBE",
+ "SkipMachineOOBE",
+ ]:
+ val = oobe_data.get(key, "")
+ if val:
+ child = etree.SubElement(oobe_el, qn(key))
+ child.text = str(val)
+
+ accounts = form_data.get("user_accounts", [])
+ if accounts:
+ ua = etree.SubElement(oobe_comp, qn("UserAccounts"))
+ la_container = etree.SubElement(ua, qn("LocalAccounts"))
+ for acct in accounts:
+ if not acct.get("name", "").strip():
+ continue
+ la = etree.SubElement(la_container, qn("LocalAccount"))
+ la.set(qwcm("action"), "add")
+ pw = etree.SubElement(la, qn("Password"))
+ pw_val = etree.SubElement(pw, qn("Value"))
+ pw_val.text = acct.get("password", "")
+ pw_plain = etree.SubElement(pw, qn("PlainText"))
+ pw_plain.text = acct.get("plain_text", "true")
+ name_el = etree.SubElement(la, qn("Name"))
+ name_el.text = acct["name"].strip()
+ group_el = etree.SubElement(la, qn("Group"))
+ group_el.text = acct.get("group", "Administrators").strip()
+ display_el = etree.SubElement(la, qn("DisplayName"))
+ display_el.text = acct.get("display_name", acct["name"]).strip()
+
+ autologon = form_data.get("autologon", {})
+ if autologon.get("username", "").strip():
+ al = etree.SubElement(oobe_comp, qn("AutoLogon"))
+ al_pw = etree.SubElement(al, qn("Password"))
+ al_pw_val = etree.SubElement(al_pw, qn("Value"))
+ al_pw_val.text = autologon.get("password", "")
+ al_pw_plain = etree.SubElement(al_pw, qn("PlainText"))
+ al_pw_plain.text = autologon.get("plain_text", "true")
+ al_enabled = etree.SubElement(al, qn("Enabled"))
+ al_enabled.text = autologon.get("enabled", "true")
+ al_user = etree.SubElement(al, qn("Username"))
+ al_user.text = autologon["username"].strip()
+ logon_count = autologon.get("logon_count", "").strip()
+ if logon_count:
+ al_count = etree.SubElement(al, qn("LogonCount"))
+ al_count.text = logon_count
+
+ fl_cmds = form_data.get("firstlogon_commands", [])
+ if fl_cmds:
+ flc = etree.SubElement(oobe_comp, qn("FirstLogonCommands"))
+ for idx, cmd in enumerate(fl_cmds, start=1):
+ if not cmd.get("commandline", "").strip():
+ continue
+ sc = etree.SubElement(flc, qn("SynchronousCommand"))
+ sc.set(qwcm("action"), "add")
+ order_el = etree.SubElement(sc, qn("Order"))
+ order_el.text = str(idx)
+ cl_el = etree.SubElement(sc, qn("CommandLine"))
+ cl_el.text = cmd["commandline"].strip()
+ desc_el = etree.SubElement(sc, qn("Description"))
+ desc_el.text = cmd.get("description", "").strip()
+
+ oobe_tz = form_data.get("oobe_timezone", "").strip()
+ if oobe_tz:
+ tz_el = etree.SubElement(oobe_comp, qn("TimeZone"))
+ tz_el.text = oobe_tz
+
+ xml_bytes = etree.tostring(
+ root,
+ pretty_print=True,
+ xml_declaration=True,
+ encoding="utf-8",
+ )
+ return xml_bytes.decode("utf-8")
+
+
+def extract_form_data(form):
+ """Pull structured data from the submitted form."""
+ data = {}
+
+ dp_list = form.getlist("driver_path[]")
+ data["driver_paths"] = [p for p in dp_list if p.strip()]
+
+ data["computer_name"] = form.get("computer_name", "")
+ data["registered_organization"] = form.get("registered_organization", "")
+ data["registered_owner"] = form.get("registered_owner", "")
+ data["time_zone"] = form.get("time_zone", "")
+
+ spec_paths = form.getlist("spec_cmd_path[]")
+ spec_descs = form.getlist("spec_cmd_desc[]")
+ data["specialize_commands"] = []
+ for i in range(len(spec_paths)):
+ if spec_paths[i].strip():
+ data["specialize_commands"].append({
+ "path": spec_paths[i],
+ "description": spec_descs[i] if i < len(spec_descs) else "",
+ })
+
+ data["oobe"] = {}
+ for key in [
+ "HideEULAPage",
+ "HideOEMRegistrationScreen",
+ "HideOnlineAccountScreens",
+ "HideWirelessSetupInOOBE",
+ "HideLocalAccountScreen",
+ "SkipUserOOBE",
+ "SkipMachineOOBE",
+ ]:
+ data["oobe"][key] = form.get(f"oobe_{key}", "false")
+ data["oobe"]["NetworkLocation"] = form.get("oobe_NetworkLocation", "Work")
+ data["oobe"]["ProtectYourPC"] = form.get("oobe_ProtectYourPC", "3")
+
+ fl_cls = form.getlist("fl_cmd_commandline[]")
+ fl_descs = form.getlist("fl_cmd_desc[]")
+ data["firstlogon_commands"] = []
+ for i in range(len(fl_cls)):
+ if fl_cls[i].strip():
+ data["firstlogon_commands"].append({
+ "commandline": fl_cls[i],
+ "description": fl_descs[i] if i < len(fl_descs) else "",
+ })
+
+ accounts = []
+ i = 0
+ while form.get(f"account_name_{i}"):
+ accounts.append({
+ "name": form.get(f"account_name_{i}", ""),
+ "password": form.get(f"account_password_{i}", ""),
+ "plain_text": form.get(f"account_plaintext_{i}", "true"),
+ "group": form.get(f"account_group_{i}", "Administrators"),
+ "display_name": form.get(f"account_display_{i}", ""),
+ })
+ i += 1
+ data["user_accounts"] = accounts
+
+ data["autologon"] = {
+ "enabled": form.get("autologon_enabled", ""),
+ "username": form.get("autologon_username", ""),
+ "password": form.get("autologon_password", ""),
+ "plain_text": form.get("autologon_plaintext", "true"),
+ "logon_count": form.get("autologon_logoncount", ""),
+ }
+
+ data["intl"] = {
+ "input_locale": form.get("intl_input_locale", ""),
+ "system_locale": form.get("intl_system_locale", ""),
+ "ui_language": form.get("intl_ui_language", ""),
+ "user_locale": form.get("intl_user_locale", ""),
+ }
+
+ data["oobe_timezone"] = form.get("oobe_timezone", "")
+
+ return data
diff --git a/webapp/services/wim.py b/webapp/services/wim.py
new file mode 100644
index 0000000..9556f42
--- /dev/null
+++ b/webapp/services/wim.py
@@ -0,0 +1,70 @@
+"""boot.wim manipulation via wimtools (wimextract / wimupdate / wimdir).
+
+Used by the startnet.cmd editor to extract + update the boot script
+that runs when WinPE boots from PXE.
+"""
+
+import os
+import shutil
+import subprocess
+import tempfile
+
+
+def extract_startnet(wim_path):
+ """Extract startnet.cmd from a WIM file. Returns the contents or None."""
+ tmpdir = tempfile.mkdtemp()
+ try:
+ result = subprocess.run(
+ ["wimextract", wim_path, "1",
+ "/Windows/System32/startnet.cmd",
+ "--dest-dir", tmpdir],
+ capture_output=True, text=True, timeout=30,
+ )
+ startnet_path = os.path.join(tmpdir, "startnet.cmd")
+ if result.returncode == 0 and os.path.isfile(startnet_path):
+ with open(startnet_path, "r", encoding="utf-8", errors="replace") as fh:
+ return fh.read()
+ return None
+ except Exception:
+ return None
+ finally:
+ shutil.rmtree(tmpdir, ignore_errors=True)
+
+
+def update_startnet(wim_path, content):
+ """Update startnet.cmd inside a WIM file via wimupdate.
+
+ Returns (ok, error_message). Writes CRLF line endings.
+ """
+ tmpdir = tempfile.mkdtemp()
+ try:
+ startnet_path = os.path.join(tmpdir, "startnet.cmd")
+ with open(startnet_path, "w", encoding="utf-8", newline="\r\n") as fh:
+ fh.write(content)
+ update_cmd = f"add {startnet_path} /Windows/System32/startnet.cmd\n"
+ result = subprocess.run(
+ ["wimupdate", wim_path, "1"],
+ input=update_cmd,
+ capture_output=True, text=True, timeout=60,
+ )
+ if result.returncode != 0:
+ return False, result.stderr.strip()
+ return True, ""
+ except Exception as exc:
+ return False, str(exc)
+ finally:
+ shutil.rmtree(tmpdir, ignore_errors=True)
+
+
+def list_files(wim_path, path="/"):
+ """List files inside a WIM at the given path."""
+ try:
+ result = subprocess.run(
+ ["wimdir", wim_path, "1", path],
+ capture_output=True, text=True, timeout=30,
+ )
+ if result.returncode == 0:
+ return [l.strip() for l in result.stdout.splitlines() if l.strip()]
+ return []
+ except Exception:
+ return []