From c16a4f23b44d77a3cfda2ccc51ed83031a6f1bb4 Mon Sep 17 00:00:00 2001 From: cproudlock Date: Fri, 8 May 2026 18:25:32 -0400 Subject: [PATCH] webapp: extract service layer (config.py + services/) from app.py Phase 1a of a multi-session refactor toward a clean blueprint structure. Pulls the helper code that lived alongside the routes in the 1621-line app.py into focused modules. app.py is now 625 lines of mostly routes plus a small Flask wiring header. Behaviour is unchanged: smoke-tested against the 8 main GET routes (200 OK). New modules: - config.py env vars + IMAGE_TYPES + FRIENDLY_NAMES + SHARED_DEPLOY_* taxonomy + unattend XML namespaces. - services/audit.py audit log file handler + audit() helper. - services/csrf.py session CSRF token + before_request validator wired via init_csrf(app). - services/fs.py image_root / deploy_path / unattend_path / control_path / tools_path + load_json / save_json + resolve_destination. - services/system.py service_status / find_usb_mounts / find_upload_sources. - services/images.py image_status + load_image_config. - services/deploy.py import_deploy + _merge_tree + _replace_with_symlink + allowed_import_source. - services/unattend.py parse_unattend / build_unattend_xml / extract_form_data and the qn / qwcm / settings pass helpers. - services/wim.py extract_startnet / update_startnet / list_files wrapping wimextract / wimupdate / wimdir. Endpoint names kept stable (dashboard, clonezilla_backups, etc.) so existing url_for(...) calls in templates are unchanged. Phase 1b (Flask blueprints with ".endpoint" naming) deferred to a future session because it requires updating ~30 url_for sites in templates and is mostly cosmetic. Co-Authored-By: Claude Opus 4.7 (1M context) --- webapp/app.py | 2246 ++++++++++------------------------- webapp/config.py | 66 + webapp/services/__init__.py | 0 webapp/services/audit.py | 26 + webapp/services/csrf.py | 28 + webapp/services/deploy.py | 96 ++ webapp/services/fs.py | 75 ++ webapp/services/images.py | 123 ++ webapp/services/system.py | 55 + webapp/services/unattend.py | 496 ++++++++ webapp/services/wim.py | 70 ++ 11 files changed, 1660 insertions(+), 1621 deletions(-) create mode 100644 webapp/config.py create mode 100644 webapp/services/__init__.py create mode 100644 webapp/services/audit.py create mode 100644 webapp/services/csrf.py create mode 100644 webapp/services/deploy.py create mode 100644 webapp/services/fs.py create mode 100644 webapp/services/images.py create mode 100644 webapp/services/system.py create mode 100644 webapp/services/unattend.py create mode 100644 webapp/services/wim.py diff --git a/webapp/app.py b/webapp/app.py index 8eafa78..d9f7ea4 100644 --- a/webapp/app.py +++ b/webapp/app.py @@ -1,1621 +1,625 @@ -#!/usr/bin/env python3 -"""Flask web application for managing a GE Aerospace PXE server.""" - -import json -import logging -import os -import secrets -import shutil -import subprocess -import tempfile -from datetime import datetime -from pathlib import Path - -from flask import ( - Flask, - abort, - flash, - jsonify, - redirect, - render_template, - request, - send_file, - session, - url_for, -) -from lxml import etree -from werkzeug.utils import secure_filename - -app = Flask(__name__) -app.secret_key = os.environ.get("FLASK_SECRET_KEY", "pxe-manager-dev-key-change-in-prod") -app.config["MAX_CONTENT_LENGTH"] = 16 * 1024 * 1024 * 1024 # 16 GB max upload - -# --------------------------------------------------------------------------- -# Audit logging -# --------------------------------------------------------------------------- -AUDIT_LOG = os.environ.get("AUDIT_LOG", "/var/log/pxe-webapp-audit.log") -audit_logger = logging.getLogger("pxe_audit") -audit_logger.setLevel(logging.INFO) -_audit_handler = logging.FileHandler(AUDIT_LOG, mode="a") -_audit_handler.setFormatter(logging.Formatter("%(asctime)s %(message)s")) -audit_logger.addHandler(_audit_handler) - - -def audit(action, detail=""): - """Write an entry to the audit log.""" - ip = request.remote_addr if request else "system" - audit_logger.info(f"[{ip}] {action}: {detail}") - -# --------------------------------------------------------------------------- -# Configuration -# --------------------------------------------------------------------------- -SAMBA_SHARE = os.environ.get("SAMBA_SHARE", "/srv/samba/winpeapps") -CLONEZILLA_SHARE = os.environ.get("CLONEZILLA_SHARE", "/srv/samba/clonezilla") -BLANCCO_REPORTS = os.environ.get("BLANCCO_REPORTS", "/srv/samba/blancco-reports") -ENROLLMENT_SHARE = os.environ.get("ENROLLMENT_SHARE", "/srv/samba/enrollment") -UPLOAD_DIR = os.environ.get("UPLOAD_DIR", "/home/pxe/image-upload") -SHARED_DIR = os.path.join(SAMBA_SHARE, "_shared") -# Subdirs inside Deploy/ shared across ALL image types -SHARED_DEPLOY_GLOBAL = ["Out-of-box Drivers"] -# Subdirs inside Deploy/ shared within the same image family (by prefix) -SHARED_DEPLOY_SCOPED = { - "gea-": ["Operating Systems", "Packages"], - "ge-": ["Operating Systems", "Packages"], -} -# Sibling dirs at image root shared within the same image family -SHARED_ROOT_DIRS = { - "gea-": ["Sources"], - "ge-": ["Sources"], -} -WEB_ROOT = os.environ.get("WEB_ROOT", "/var/www/html") -BOOT_WIM = os.path.join(WEB_ROOT, "win11", "sources", "boot.wim") - -IMAGE_TYPES = [ - "gea-standard", - "gea-engineer", - "gea-shopfloor", - "gea-shopfloor-mce", - "ge-standard", - "ge-engineer", - "ge-shopfloor-lockdown", - "ge-shopfloor-mce", -] - -FRIENDLY_NAMES = { - "gea-standard": "GE Aerospace Standard", - "gea-engineer": "GE Aerospace Engineer", - "gea-shopfloor": "GE Aerospace Shop Floor", - "gea-shopfloor-mce": "GE Aerospace Shop Floor MCE", - "ge-standard": "GE Legacy Standard", - "ge-engineer": "GE Legacy Engineer", - "ge-shopfloor-lockdown": "GE Legacy Shop Floor Lockdown", - "ge-shopfloor-mce": "GE Legacy Shop Floor MCE", -} - -# --------------------------------------------------------------------------- -# CSRF protection -# --------------------------------------------------------------------------- -def generate_csrf_token(): - """Return the CSRF token for the current session, creating one if needed.""" - if "_csrf_token" not in session: - session["_csrf_token"] = secrets.token_hex(32) - return session["_csrf_token"] - - -@app.context_processor -def inject_csrf_token(): - """Make csrf_token() available in all templates.""" - return {"csrf_token": generate_csrf_token} - - -@app.before_request -def validate_csrf(): - """Reject POST requests with a missing or invalid CSRF token.""" - if request.method != "POST": - return - token = request.form.get("_csrf_token") or request.headers.get("X-CSRF-Token") - if not token or token != generate_csrf_token(): - abort(403) - - -NS = "urn:schemas-microsoft-com:unattend" -WCM = "http://schemas.microsoft.com/WMIConfig/2002/State" -NSMAP = {None: NS, "wcm": WCM} - -# Convenience qualified-name helpers -def qn(tag): - """Return a tag qualified with the default unattend namespace.""" - return f"{{{NS}}}{tag}" - -def qwcm(attr): - """Return an attribute qualified with the wcm namespace.""" - return f"{{{WCM}}}{attr}" - - -# --------------------------------------------------------------------------- -# Utility helpers -# --------------------------------------------------------------------------- - -def image_root(image_type): - """Return the root directory for an image type.""" - return os.path.join(SAMBA_SHARE, image_type) - - -def deploy_path(image_type): - """Return the Deploy directory for an image type.""" - return os.path.join(SAMBA_SHARE, image_type, "Deploy") - - -def unattend_path(image_type): - """Return the unattend.xml path for an image type.""" - return os.path.join(deploy_path(image_type), "FlatUnattendW10.xml") - - -def control_path(image_type): - """Return the Deploy/Control directory for an image type.""" - return os.path.join(deploy_path(image_type), "Control") - - -def tools_path(image_type): - """Return the Tools directory for an image type.""" - return os.path.join(SAMBA_SHARE, image_type, "Tools") - - -def _load_json(filepath): - """Parse a JSON file and return its contents, or [] on failure.""" - try: - with open(filepath, "r", encoding="utf-8-sig") as fh: - return json.load(fh) - except (OSError, json.JSONDecodeError): - return [] - - -def _save_json(filepath, data): - """Write data as pretty-printed JSON.""" - os.makedirs(os.path.dirname(filepath), exist_ok=True) - with open(filepath, "w", encoding="utf-8") as fh: - json.dump(data, fh, indent=2, ensure_ascii=False) - fh.write("\n") - - -def _resolve_destination(dest_dir, image_type): - """Convert a Windows *destinationdir* path to a Linux filesystem path. - - Replaces the ``*destinationdir*`` placeholder and backslashes, then - prepends ``SAMBA_SHARE/image_type/`` and resolves symlinks. - """ - if not dest_dir: - return "" - # Replace the placeholder (case-insensitive) - path = dest_dir - lower = path.lower() - idx = lower.find("*destinationdir*") - if idx != -1: - path = path[idx + len("*destinationdir*"):] - # Backslash → forward slash, strip leading slash - path = path.replace("\\", "/").lstrip("/") - full = os.path.join(SAMBA_SHARE, image_type, path) - # Resolve symlinks so shared dirs are found - try: - full = os.path.realpath(full) - except OSError: - pass - return full - - -def _load_image_config(image_type): - """Load all JSON configs for an image and check on-disk presence.""" - ctrl = control_path(image_type) - tools = tools_path(image_type) - - # --- Drivers (merge HardwareDriver.json + hw_drivers.json) --- - hw_driver_file = os.path.join(ctrl, "HardwareDriver.json") - hw_drivers_extra = os.path.join(ctrl, "hw_drivers.json") - drivers_raw = _load_json(hw_driver_file) - extra_raw = _load_json(hw_drivers_extra) - - # Merge: dedup by FileName (case-insensitive) - seen_files = set() - drivers = [] - for d in drivers_raw + extra_raw: - fname = (d.get("FileName") or d.get("fileName") or "").lower() - if fname and fname in seen_files: - continue - if fname: - seen_files.add(fname) - drivers.append(d) - - # Check disk presence for each driver - for d in drivers: - fname = d.get("FileName") or d.get("fileName") or "" - dest = d.get("DestinationDir") or d.get("destinationDir") or "" - resolved = _resolve_destination(dest, image_type) - if resolved and fname: - d["_on_disk"] = os.path.isfile(os.path.join(resolved, fname)) - else: - d["_on_disk"] = False - - # --- Operating Systems --- - os_file = os.path.join(ctrl, "OperatingSystem.json") - operating_systems = _load_json(os_file) - for entry in operating_systems: - osv = entry.get("operatingSystemVersion", {}) - wim = osv.get("wim", {}) - dest = wim.get("DestinationDir") or wim.get("destinationDir") or "" - resolved = _resolve_destination(dest, image_type) - if resolved: - entry["_on_disk"] = os.path.isfile(os.path.join(resolved, "install.wim")) - else: - entry["_on_disk"] = False - - # --- Packages --- - pkg_file = os.path.join(ctrl, "packages.json") - packages = _load_json(pkg_file) - for p in packages: - fname = p.get("fileName") or p.get("FileName") or "" - dest = p.get("destinationDir") or p.get("DestinationDir") or "" - resolved = _resolve_destination(dest, image_type) - if resolved and fname: - p["_on_disk"] = os.path.isfile(os.path.join(resolved, fname)) - else: - p["_on_disk"] = False - - # --- Hardware Models (user_selections.json) --- - us_file = os.path.join(tools, "user_selections.json") - us_raw = _load_json(us_file) - us_data = us_raw[0] if us_raw and isinstance(us_raw, list) else {} - hardware_models = us_data.get("HardwareModelSelection", []) - os_selection = str(us_data.get("OperatingSystemSelection", "")) - - # Build a lookup: family → driver entry (for disk-presence on models) - family_lookup = {} - for d in drivers: - family = d.get("family", "") - if family: - family_lookup[family] = d - - for hm in hardware_models: - family_id = hm.get("Id", "") - matched = family_lookup.get(family_id) - hm["_on_disk"] = matched["_on_disk"] if matched else False - - # --- Orphan drivers: zip files on disk not referenced in any JSON --- - orphan_drivers = [] - oob_dir = os.path.join(deploy_path(image_type), "Out-of-box Drivers") - # Resolve symlinks - try: - oob_dir = os.path.realpath(oob_dir) - except OSError: - pass - registered_files = set() - for d in drivers: - fname = d.get("FileName") or d.get("fileName") or "" - if fname: - registered_files.add(fname.lower()) - if os.path.isdir(oob_dir): - for dirpath, _dirnames, filenames in os.walk(oob_dir): - for fn in filenames: - if fn.lower().endswith(".zip") and fn.lower() not in registered_files: - rel = os.path.relpath(os.path.join(dirpath, fn), oob_dir) - orphan_drivers.append({"fileName": fn, "relPath": rel}) - - return { - "hardware_models": hardware_models, - "drivers": drivers, - "operating_systems": operating_systems, - "packages": packages, - "orphan_drivers": orphan_drivers, - "os_selection": os_selection, - } - - -def image_status(image_type): - """Return a dict describing the state of an image type.""" - dp = deploy_path(image_type) - up = unattend_path(image_type) - has_content = os.path.isdir(dp) and any(os.scandir(dp)) if os.path.isdir(dp) else False - has_unattend = os.path.isfile(up) - return { - "image_type": image_type, - "friendly_name": FRIENDLY_NAMES.get(image_type, image_type), - "deploy_path": dp, - "has_content": has_content, - "has_unattend": has_unattend, - } - - -def service_status(service_name): - """Check whether a systemd service is active.""" - try: - result = subprocess.run( - ["systemctl", "is-active", service_name], - capture_output=True, - text=True, - timeout=5, - ) - state = result.stdout.strip() - return {"name": service_name, "active": state == "active", "state": state} - except Exception as exc: - return {"name": service_name, "active": False, "state": str(exc)} - - -def find_usb_mounts(): - """Return a list of mount-point paths that look like removable media.""" - mounts = [] - try: - with open("/proc/mounts", "r") as fh: - for line in fh: - parts = line.split() - if len(parts) >= 2: - mount_point = parts[1] - if mount_point.startswith(("/mnt/", "/media/")): - if os.path.isdir(mount_point): - mounts.append(mount_point) - except OSError: - pass - return sorted(set(mounts)) - - -def find_upload_sources(): - """Return sub-directories inside UPLOAD_DIR that look like image content.""" - sources = [] - if os.path.isdir(UPLOAD_DIR): - # Include the upload dir itself if it has content - try: - entries = os.listdir(UPLOAD_DIR) - if entries: - sources.append(UPLOAD_DIR) - # Also include immediate subdirectories - for entry in entries: - full = os.path.join(UPLOAD_DIR, entry) - if os.path.isdir(full): - sources.append(full) - except OSError: - pass - return sources - - -def _import_deploy(src_deploy, dst_deploy, target="", move=False): - """Import Deploy directory contents, merging shared subdirs into _shared. - When move=True, files are moved instead of copied (saves disk space).""" - # Build list of scoped shared dirs for this target - scoped_shared = [] - prefix_key = "" - for prefix, dirs in SHARED_DEPLOY_SCOPED.items(): - if target.startswith(prefix): - scoped_shared = dirs - prefix_key = prefix - break - - _transfer = shutil.move if move else shutil.copy2 - _transfer_tree = shutil.move if move else shutil.copytree - - os.makedirs(dst_deploy, exist_ok=True) - for item in os.listdir(src_deploy): - src_item = os.path.join(src_deploy, item) - dst_item = os.path.join(dst_deploy, item) - - if not os.path.isdir(src_item): - _transfer(src_item, dst_item) - continue - - # Global shared (e.g., Out-of-box Drivers) — one copy for all - if item in SHARED_DEPLOY_GLOBAL: - shared_dest = os.path.join(SHARED_DIR, item) - os.makedirs(shared_dest, exist_ok=True) - _merge_tree(src_item, shared_dest, move=move) - _replace_with_symlink(dst_item, shared_dest) - continue - - # Scoped shared (e.g., Operating Systems) — per family prefix - if item in scoped_shared: - shared_dest = os.path.join(SHARED_DIR, f"{prefix_key}{item}") - os.makedirs(shared_dest, exist_ok=True) - _merge_tree(src_item, shared_dest, move=move) - _replace_with_symlink(dst_item, shared_dest) - continue - - # Normal transfer — merge to preserve existing custom content - if os.path.isdir(dst_item): - _merge_tree(src_item, dst_item, move=move) - else: - _transfer_tree(src_item, dst_item) - - -def _replace_with_symlink(link_path, target_path): - """Replace a file/dir/symlink at link_path with a symlink to target_path.""" - if os.path.islink(link_path): - os.remove(link_path) - elif os.path.isdir(link_path): - shutil.rmtree(link_path) - os.symlink(target_path, link_path) - - -def _merge_tree(src, dst, move=False): - """Recursively merge src tree into dst, overwriting existing files. - When move=True, files are moved instead of copied.""" - _transfer = shutil.move if move else shutil.copy2 - _transfer_tree = shutil.move if move else shutil.copytree - for item in os.listdir(src): - s = os.path.join(src, item) - d = os.path.join(dst, item) - if os.path.isdir(s): - if os.path.isdir(d): - _merge_tree(s, d, move=move) - else: - if os.path.exists(d): - os.remove(d) - _transfer_tree(s, d) - else: - os.makedirs(os.path.dirname(d), exist_ok=True) - _transfer(s, d) - - -def allowed_import_source(source): - """Check if a source path is a valid import location (USB or upload dir).""" - usb = find_usb_mounts() - if any(source == m or source.startswith(m + "/") for m in usb): - return True - if source == UPLOAD_DIR or source.startswith(UPLOAD_DIR + "/"): - return os.path.isdir(source) - return False - - -# --------------------------------------------------------------------------- -# XML helpers — parse / build unattend.xml -# --------------------------------------------------------------------------- - -UNATTEND_TEMPLATE = """\ - - - - - - - -""" - - -def _find_or_create(parent, tag): - """Find the first child with *tag* or create it.""" - el = parent.find(tag, namespaces={"": NS}) - if el is None: - el = etree.SubElement(parent, qn(tag.split("}")[-1]) if "}" not in tag else tag) - return el - - -def _settings_pass(root, pass_name): - """Return the element, creating if needed.""" - for s in root.findall(qn("settings")): - if s.get("pass") == pass_name: - return s - s = etree.SubElement(root, qn("settings")) - s.set("pass", pass_name) - return s - - -def parse_unattend(xml_path): - """Parse an unattend.xml and return a dict of editable data.""" - data = { - "driver_paths": [], - "computer_name": "", - "registered_organization": "", - "registered_owner": "", - "time_zone": "", - "specialize_commands": [], - "oobe": { - "HideEULAPage": "true", - "HideOEMRegistrationScreen": "true", - "HideOnlineAccountScreens": "true", - "HideWirelessSetupInOOBE": "true", - "HideLocalAccountScreen": "true", - "NetworkLocation": "Work", - "ProtectYourPC": "3", - "SkipUserOOBE": "true", - "SkipMachineOOBE": "true", - }, - "firstlogon_commands": [], - "user_accounts": [], - "autologon": { - "enabled": "", - "username": "", - "password": "", - "plain_text": "true", - "logon_count": "", - }, - "intl": { - "input_locale": "", - "system_locale": "", - "ui_language": "", - "user_locale": "", - }, - "oobe_timezone": "", - "raw_xml": "", - } - - if not os.path.isfile(xml_path): - data["raw_xml"] = UNATTEND_TEMPLATE - return data - - with open(xml_path, "r", encoding="utf-8") as fh: - raw = fh.read() - data["raw_xml"] = raw - - try: - root = etree.fromstring(raw.encode("utf-8")) - except etree.XMLSyntaxError: - return data - - ns = {"u": NS} - - # --- offlineServicing: DriverPaths --- - for dp_el in root.xpath( - "u:settings[@pass='offlineServicing']//u:PathAndCredentials/u:Path", - namespaces=ns, - ): - if dp_el.text: - data["driver_paths"].append(dp_el.text.strip()) - - # --- specialize: Shell-Setup --- - for comp in root.xpath( - "u:settings[@pass='specialize']/u:component", - namespaces=ns, - ): - comp_name = comp.get("name", "") - if "Shell-Setup" in comp_name: - for tag, key in [ - ("ComputerName", "computer_name"), - ("RegisteredOrganization", "registered_organization"), - ("RegisteredOwner", "registered_owner"), - ("TimeZone", "time_zone"), - ]: - el = comp.find(qn(tag)) - if el is not None and el.text: - data[key] = el.text.strip() - - # --- specialize: RunSynchronous commands --- - for cmd in root.xpath( - "u:settings[@pass='specialize']//u:RunSynchronousCommand", - namespaces=ns, - ): - order_el = cmd.find(qn("Order")) - path_el = cmd.find(qn("Path")) - desc_el = cmd.find(qn("Description")) - data["specialize_commands"].append({ - "order": order_el.text.strip() if order_el is not None and order_el.text else "", - "path": path_el.text.strip() if path_el is not None and path_el.text else "", - "description": desc_el.text.strip() if desc_el is not None and desc_el.text else "", - }) - - # --- oobeSystem --- - for comp in root.xpath( - "u:settings[@pass='oobeSystem']/u:component", - namespaces=ns, - ): - comp_name = comp.get("name", "") - - # International-Core component - if "International-Core" in comp_name: - for tag, key in [ - ("InputLocale", "input_locale"), - ("SystemLocale", "system_locale"), - ("UILanguage", "ui_language"), - ("UserLocale", "user_locale"), - ]: - el = comp.find(qn(tag)) - if el is not None and el.text: - data["intl"][key] = el.text.strip() - - if "OOBE" in comp_name or "Shell-Setup" in comp_name: - oobe_el = comp.find(qn("OOBE")) - if oobe_el is not None: - for child in oobe_el: - local = etree.QName(child).localname - if local in data["oobe"] and child.text: - data["oobe"][local] = child.text.strip() - - # UserAccounts / LocalAccounts - ua = comp.find(qn("UserAccounts")) - if ua is not None: - la_container = ua.find(qn("LocalAccounts")) - if la_container is not None: - for acct in la_container.findall(qn("LocalAccount")): - name_el = acct.find(qn("Name")) - group_el = acct.find(qn("Group")) - display_el = acct.find(qn("DisplayName")) - pw_el = acct.find(qn("Password")) - pw_val = "" - pw_plain = "true" - if pw_el is not None: - v = pw_el.find(qn("Value")) - p = pw_el.find(qn("PlainText")) - if v is not None and v.text: - pw_val = v.text.strip() - if p is not None and p.text: - pw_plain = p.text.strip() - data["user_accounts"].append({ - "name": name_el.text.strip() if name_el is not None and name_el.text else "", - "password": pw_val, - "plain_text": pw_plain, - "group": group_el.text.strip() if group_el is not None and group_el.text else "Administrators", - "display_name": display_el.text.strip() if display_el is not None and display_el.text else "", - }) - - # AutoLogon - al = comp.find(qn("AutoLogon")) - if al is not None: - enabled_el = al.find(qn("Enabled")) - user_el = al.find(qn("Username")) - count_el = al.find(qn("LogonCount")) - pw_el = al.find(qn("Password")) - pw_val = "" - pw_plain = "true" - if pw_el is not None: - v = pw_el.find(qn("Value")) - p = pw_el.find(qn("PlainText")) - if v is not None and v.text: - pw_val = v.text.strip() - if p is not None and p.text: - pw_plain = p.text.strip() - data["autologon"] = { - "enabled": enabled_el.text.strip() if enabled_el is not None and enabled_el.text else "", - "username": user_el.text.strip() if user_el is not None and user_el.text else "", - "password": pw_val, - "plain_text": pw_plain, - "logon_count": count_el.text.strip() if count_el is not None and count_el.text else "", - } - - # FirstLogonCommands - flc = comp.find(qn("FirstLogonCommands")) - if flc is not None: - for sync in flc.findall(qn("SynchronousCommand")): - order_el = sync.find(qn("Order")) - cl_el = sync.find(qn("CommandLine")) - desc_el = sync.find(qn("Description")) - data["firstlogon_commands"].append({ - "order": order_el.text.strip() if order_el is not None and order_el.text else "", - "commandline": cl_el.text.strip() if cl_el is not None and cl_el.text else "", - "description": desc_el.text.strip() if desc_el is not None and desc_el.text else "", - }) - - # TimeZone (oobeSystem pass) - tz_el = comp.find(qn("TimeZone")) - if tz_el is not None and tz_el.text: - data["oobe_timezone"] = tz_el.text.strip() - - return data - - -def build_unattend_xml(form_data): - """Build a complete unattend.xml string from form data dict.""" - root = etree.Element(qn("unattend"), nsmap=NSMAP) - - # --- windowsPE (empty) --- - _settings_pass(root, "windowsPE") - - # --- offlineServicing: DriverPaths --- - offline = _settings_pass(root, "offlineServicing") - driver_paths = form_data.get("driver_paths", []) - if driver_paths: - comp = etree.SubElement(offline, qn("component")) - comp.set("name", "Microsoft-Windows-PnpCustomizationsNonWinPE") - comp.set("processorArchitecture", "amd64") - comp.set("publicKeyToken", "31bf3856ad364e35") - comp.set("language", "neutral") - comp.set("versionScope", "nonSxS") - dp_container = etree.SubElement(comp, qn("DriverPaths")) - for idx, dp in enumerate(driver_paths, start=1): - if not dp.strip(): - continue - pac = etree.SubElement(dp_container, qn("PathAndCredentials")) - pac.set(qwcm("action"), "add") - pac.set(qwcm("keyValue"), str(idx)) - path_el = etree.SubElement(pac, qn("Path")) - path_el.text = dp.strip() - - # --- specialize --- - spec = _settings_pass(root, "specialize") - - # Shell-Setup component - shell_comp = etree.SubElement(spec, qn("component")) - shell_comp.set("name", "Microsoft-Windows-Shell-Setup") - shell_comp.set("processorArchitecture", "amd64") - shell_comp.set("publicKeyToken", "31bf3856ad364e35") - shell_comp.set("language", "neutral") - shell_comp.set("versionScope", "nonSxS") - - for tag, key in [ - ("ComputerName", "computer_name"), - ("RegisteredOrganization", "registered_organization"), - ("RegisteredOwner", "registered_owner"), - ("TimeZone", "time_zone"), - ]: - val = form_data.get(key, "").strip() - if val: - el = etree.SubElement(shell_comp, qn(tag)) - el.text = val - - # Deployment / RunSynchronous commands - spec_cmds = form_data.get("specialize_commands", []) - if spec_cmds: - deploy_comp = etree.SubElement(spec, qn("component")) - deploy_comp.set("name", "Microsoft-Windows-Deployment") - deploy_comp.set("processorArchitecture", "amd64") - deploy_comp.set("publicKeyToken", "31bf3856ad364e35") - deploy_comp.set("language", "neutral") - deploy_comp.set("versionScope", "nonSxS") - rs = etree.SubElement(deploy_comp, qn("RunSynchronous")) - for idx, cmd in enumerate(spec_cmds, start=1): - if not cmd.get("path", "").strip(): - continue - rsc = etree.SubElement(rs, qn("RunSynchronousCommand")) - rsc.set(qwcm("action"), "add") - order_el = etree.SubElement(rsc, qn("Order")) - order_el.text = str(idx) - path_el = etree.SubElement(rsc, qn("Path")) - path_el.text = cmd["path"].strip() - desc_el = etree.SubElement(rsc, qn("Description")) - desc_el.text = cmd.get("description", "").strip() - - # --- oobeSystem --- - oobe_settings = _settings_pass(root, "oobeSystem") - - # International-Core component (before Shell-Setup) - intl = form_data.get("intl", {}) - if any(v.strip() for v in intl.values() if v): - intl_comp = etree.SubElement(oobe_settings, qn("component")) - intl_comp.set("name", "Microsoft-Windows-International-Core") - intl_comp.set("processorArchitecture", "amd64") - intl_comp.set("publicKeyToken", "31bf3856ad364e35") - intl_comp.set("language", "neutral") - intl_comp.set("versionScope", "nonSxS") - for tag, key in [ - ("InputLocale", "input_locale"), - ("SystemLocale", "system_locale"), - ("UILanguage", "ui_language"), - ("UserLocale", "user_locale"), - ]: - val = intl.get(key, "").strip() - if val: - el = etree.SubElement(intl_comp, qn(tag)) - el.text = val - - # Shell-Setup component - oobe_comp = etree.SubElement(oobe_settings, qn("component")) - oobe_comp.set("name", "Microsoft-Windows-Shell-Setup") - oobe_comp.set("processorArchitecture", "amd64") - oobe_comp.set("publicKeyToken", "31bf3856ad364e35") - oobe_comp.set("language", "neutral") - oobe_comp.set("versionScope", "nonSxS") - - oobe_el = etree.SubElement(oobe_comp, qn("OOBE")) - oobe_data = form_data.get("oobe", {}) - for key in [ - "HideEULAPage", - "HideOEMRegistrationScreen", - "HideOnlineAccountScreens", - "HideWirelessSetupInOOBE", - "HideLocalAccountScreen", - "NetworkLocation", - "ProtectYourPC", - "SkipUserOOBE", - "SkipMachineOOBE", - ]: - val = oobe_data.get(key, "") - if val: - child = etree.SubElement(oobe_el, qn(key)) - child.text = str(val) - - # UserAccounts / LocalAccounts - accounts = form_data.get("user_accounts", []) - if accounts: - ua = etree.SubElement(oobe_comp, qn("UserAccounts")) - la_container = etree.SubElement(ua, qn("LocalAccounts")) - for acct in accounts: - if not acct.get("name", "").strip(): - continue - la = etree.SubElement(la_container, qn("LocalAccount")) - la.set(qwcm("action"), "add") - pw = etree.SubElement(la, qn("Password")) - pw_val = etree.SubElement(pw, qn("Value")) - pw_val.text = acct.get("password", "") - pw_plain = etree.SubElement(pw, qn("PlainText")) - pw_plain.text = acct.get("plain_text", "true") - name_el = etree.SubElement(la, qn("Name")) - name_el.text = acct["name"].strip() - group_el = etree.SubElement(la, qn("Group")) - group_el.text = acct.get("group", "Administrators").strip() - display_el = etree.SubElement(la, qn("DisplayName")) - display_el.text = acct.get("display_name", acct["name"]).strip() - - # AutoLogon - autologon = form_data.get("autologon", {}) - if autologon.get("username", "").strip(): - al = etree.SubElement(oobe_comp, qn("AutoLogon")) - al_pw = etree.SubElement(al, qn("Password")) - al_pw_val = etree.SubElement(al_pw, qn("Value")) - al_pw_val.text = autologon.get("password", "") - al_pw_plain = etree.SubElement(al_pw, qn("PlainText")) - al_pw_plain.text = autologon.get("plain_text", "true") - al_enabled = etree.SubElement(al, qn("Enabled")) - al_enabled.text = autologon.get("enabled", "true") - al_user = etree.SubElement(al, qn("Username")) - al_user.text = autologon["username"].strip() - logon_count = autologon.get("logon_count", "").strip() - if logon_count: - al_count = etree.SubElement(al, qn("LogonCount")) - al_count.text = logon_count - - # FirstLogonCommands - fl_cmds = form_data.get("firstlogon_commands", []) - if fl_cmds: - flc = etree.SubElement(oobe_comp, qn("FirstLogonCommands")) - for idx, cmd in enumerate(fl_cmds, start=1): - if not cmd.get("commandline", "").strip(): - continue - sc = etree.SubElement(flc, qn("SynchronousCommand")) - sc.set(qwcm("action"), "add") - order_el = etree.SubElement(sc, qn("Order")) - order_el.text = str(idx) - cl_el = etree.SubElement(sc, qn("CommandLine")) - cl_el.text = cmd["commandline"].strip() - desc_el = etree.SubElement(sc, qn("Description")) - desc_el.text = cmd.get("description", "").strip() - - # TimeZone (oobeSystem pass) - oobe_tz = form_data.get("oobe_timezone", "").strip() - if oobe_tz: - tz_el = etree.SubElement(oobe_comp, qn("TimeZone")) - tz_el.text = oobe_tz - - xml_bytes = etree.tostring( - root, - pretty_print=True, - xml_declaration=True, - encoding="utf-8", - ) - return xml_bytes.decode("utf-8") - - -def _extract_form_data(form): - """Pull structured data from the submitted form.""" - data = {} - - # Driver paths - dp_list = form.getlist("driver_path[]") - data["driver_paths"] = [p for p in dp_list if p.strip()] - - # Machine settings - data["computer_name"] = form.get("computer_name", "") - data["registered_organization"] = form.get("registered_organization", "") - data["registered_owner"] = form.get("registered_owner", "") - data["time_zone"] = form.get("time_zone", "") - - # Specialize commands - spec_paths = form.getlist("spec_cmd_path[]") - spec_descs = form.getlist("spec_cmd_desc[]") - data["specialize_commands"] = [] - for i in range(len(spec_paths)): - if spec_paths[i].strip(): - data["specialize_commands"].append({ - "path": spec_paths[i], - "description": spec_descs[i] if i < len(spec_descs) else "", - }) - - # OOBE settings - data["oobe"] = {} - for key in [ - "HideEULAPage", - "HideOEMRegistrationScreen", - "HideOnlineAccountScreens", - "HideWirelessSetupInOOBE", - "HideLocalAccountScreen", - "SkipUserOOBE", - "SkipMachineOOBE", - ]: - data["oobe"][key] = form.get(f"oobe_{key}", "false") - data["oobe"]["NetworkLocation"] = form.get("oobe_NetworkLocation", "Work") - data["oobe"]["ProtectYourPC"] = form.get("oobe_ProtectYourPC", "3") - - # First logon commands - fl_cls = form.getlist("fl_cmd_commandline[]") - fl_descs = form.getlist("fl_cmd_desc[]") - data["firstlogon_commands"] = [] - for i in range(len(fl_cls)): - if fl_cls[i].strip(): - data["firstlogon_commands"].append({ - "commandline": fl_cls[i], - "description": fl_descs[i] if i < len(fl_descs) else "", - }) - - # User accounts - accounts = [] - i = 0 - while form.get(f"account_name_{i}"): - accounts.append({ - "name": form.get(f"account_name_{i}", ""), - "password": form.get(f"account_password_{i}", ""), - "plain_text": form.get(f"account_plaintext_{i}", "true"), - "group": form.get(f"account_group_{i}", "Administrators"), - "display_name": form.get(f"account_display_{i}", ""), - }) - i += 1 - data["user_accounts"] = accounts - - # AutoLogon - data["autologon"] = { - "enabled": form.get("autologon_enabled", ""), - "username": form.get("autologon_username", ""), - "password": form.get("autologon_password", ""), - "plain_text": form.get("autologon_plaintext", "true"), - "logon_count": form.get("autologon_logoncount", ""), - } - - # International settings - data["intl"] = { - "input_locale": form.get("intl_input_locale", ""), - "system_locale": form.get("intl_system_locale", ""), - "ui_language": form.get("intl_ui_language", ""), - "user_locale": form.get("intl_user_locale", ""), - } - - # OOBE TimeZone - data["oobe_timezone"] = form.get("oobe_timezone", "") - - return data - - -# --------------------------------------------------------------------------- -# Routes — pages -# --------------------------------------------------------------------------- - -@app.route("/") -def dashboard(): - images = [image_status(it) for it in IMAGE_TYPES] - services = [service_status(s) for s in ("dnsmasq", "apache2", "smbd")] - return render_template( - "dashboard.html", - images=images, - services=services, - image_types=IMAGE_TYPES, - friendly_names=FRIENDLY_NAMES, - ) - - -@app.route("/images/import", methods=["GET", "POST"]) -def images_import(): - usb_mounts = find_usb_mounts() - upload_sources = find_upload_sources() - images = [image_status(it) for it in IMAGE_TYPES] - - if request.method == "POST": - source = request.form.get("source", "") - target = request.form.get("target", "") - - if not source or not target: - flash("Please select both a source and a target image type.", "danger") - return redirect(url_for("images_import")) - - if target not in IMAGE_TYPES: - flash("Invalid target image type.", "danger") - return redirect(url_for("images_import")) - - if not allowed_import_source(source): - flash("Source path is not a valid import location.", "danger") - return redirect(url_for("images_import")) - - if not os.path.isdir(source): - flash(f"Source path does not exist: {source}", "danger") - return redirect(url_for("images_import")) - - root = image_root(target) - dest = deploy_path(target) - try: - os.makedirs(dest, exist_ok=True) - src_items = os.listdir(source) - - # Move files from network upload to save disk space; copy from USB - use_move = source == UPLOAD_DIR or source.startswith(UPLOAD_DIR + "/") - _transfer = shutil.move if use_move else shutil.copy2 - _transfer_tree = shutil.move if use_move else shutil.copytree - - # Detect layout: if source has Deploy/, Sources/, Tools/ at top - # level, it's the full image root structure (USB-style). - # Otherwise treat it as Deploy/ contents directly. - top_dirs = {d for d in src_items if os.path.isdir(os.path.join(source, d))} - full_layout = "Deploy" in top_dirs - - if full_layout: - # Determine which root-level dirs are shared for this target - shared_root = [] - for prefix, dirs in SHARED_ROOT_DIRS.items(): - if target.startswith(prefix): - shared_root = dirs - break - - # Full image root: import Deploy contents + sibling dirs - for item in src_items: - src_item = os.path.join(source, item) - if item == "Deploy": - _import_deploy(src_item, dest, target, move=use_move) - elif os.path.isdir(src_item) and item in shared_root: - # Shared sibling: merge into _shared/{prefix}{item} - # and symlink from image root - prefix_key = target.split("-")[0] + "-" - shared_dest = os.path.join(SHARED_DIR, f"{prefix_key}{item}") - os.makedirs(shared_dest, exist_ok=True) - _merge_tree(src_item, shared_dest, move=use_move) - dst_item = os.path.join(root, item) - if os.path.islink(dst_item): - os.remove(dst_item) - elif os.path.isdir(dst_item): - shutil.rmtree(dst_item) - os.symlink(shared_dest, dst_item) - elif os.path.isdir(src_item): - # Non-shared sibling dirs (Tools) go into image root - dst_item = os.path.join(root, item) - if os.path.exists(dst_item): - shutil.rmtree(dst_item) - _transfer_tree(src_item, dst_item) - else: - _transfer(src_item, os.path.join(root, item)) - else: - # Flat layout: treat source as Deploy contents - _import_deploy(source, dest, target, move=use_move) - - # Ensure Media.tag exists (FlatSetupLoader.exe drive detection) - control_dir = os.path.join(dest, "Control") - os.makedirs(control_dir, exist_ok=True) - media_tag = os.path.join(control_dir, "Media.tag") - Path(media_tag).touch() - - audit("IMAGE_IMPORT", f"{source} -> {target}") - flash( - f"Successfully imported content to {FRIENDLY_NAMES.get(target, target)}.", - "success", - ) - except Exception as exc: - flash(f"Import failed: {exc}", "danger") - - return redirect(url_for("images_import")) - - return render_template( - "import.html", - usb_mounts=usb_mounts, - upload_sources=upload_sources, - images=images, - image_types=IMAGE_TYPES, - friendly_names=FRIENDLY_NAMES, - ) - - -@app.route("/images//unattend", methods=["GET", "POST"]) -def unattend_editor(image_type): - if image_type not in IMAGE_TYPES: - flash("Unknown image type.", "danger") - return redirect(url_for("dashboard")) - - xml_file = unattend_path(image_type) - - if request.method == "POST": - save_mode = request.form.get("save_mode", "form") - - if save_mode == "raw": - raw_xml = request.form.get("raw_xml", "") - # Validate the XML before saving - try: - etree.fromstring(raw_xml.encode("utf-8")) - except etree.XMLSyntaxError as exc: - flash(f"Invalid XML: {exc}", "danger") - data = parse_unattend(xml_file) - data["raw_xml"] = raw_xml - return render_template( - "unattend_editor.html", - image_type=image_type, - friendly_name=FRIENDLY_NAMES.get(image_type, image_type), - data=data, - image_types=IMAGE_TYPES, - friendly_names=FRIENDLY_NAMES, - ) - xml_content = raw_xml - else: - form_data = _extract_form_data(request.form) - xml_content = build_unattend_xml(form_data) - - # Write to disk - try: - os.makedirs(os.path.dirname(xml_file), exist_ok=True) - with open(xml_file, "w", encoding="utf-8") as fh: - fh.write(xml_content) - audit("UNATTEND_SAVE", f"{image_type} ({save_mode})") - flash("unattend.xml saved successfully.", "success") - except Exception as exc: - flash(f"Failed to save: {exc}", "danger") - - return redirect(url_for("unattend_editor", image_type=image_type)) - - data = parse_unattend(xml_file) - return render_template( - "unattend_editor.html", - image_type=image_type, - friendly_name=FRIENDLY_NAMES.get(image_type, image_type), - data=data, - image_types=IMAGE_TYPES, - friendly_names=FRIENDLY_NAMES, - ) - - -@app.route("/images//config") -def image_config(image_type): - if image_type not in IMAGE_TYPES: - flash("Unknown image type.", "danger") - return redirect(url_for("dashboard")) - - config = _load_image_config(image_type) - return render_template( - "image_config.html", - image_type=image_type, - friendly_name=FRIENDLY_NAMES.get(image_type, image_type), - config=config, - ) - - -@app.route("/images//config/save", methods=["POST"]) -def image_config_save(image_type): - if image_type not in IMAGE_TYPES: - flash("Unknown image type.", "danger") - return redirect(url_for("dashboard")) - - section = request.form.get("section", "") - payload = request.form.get("payload", "[]") - try: - data = json.loads(payload) - except json.JSONDecodeError: - flash("Invalid JSON payload.", "danger") - return redirect(url_for("image_config", image_type=image_type)) - - ctrl = control_path(image_type) - tools = tools_path(image_type) - - try: - if section == "hardware_models": - us_file = os.path.join(tools, "user_selections.json") - us_raw = _load_json(us_file) - us_data = us_raw[0] if us_raw and isinstance(us_raw, list) else {} - us_data["HardwareModelSelection"] = data - _save_json(us_file, [us_data]) - audit("CONFIG_SAVE", f"{image_type}/hardware_models") - - elif section == "drivers": - # Strip internal fields before saving - clean = [{k: v for k, v in d.items() if not k.startswith("_")} for d in data] - _save_json(os.path.join(ctrl, "HardwareDriver.json"), clean) - audit("CONFIG_SAVE", f"{image_type}/drivers") - - elif section == "operating_systems": - clean = [{k: v for k, v in d.items() if not k.startswith("_")} for d in data] - _save_json(os.path.join(ctrl, "OperatingSystem.json"), clean) - audit("CONFIG_SAVE", f"{image_type}/operating_systems") - - elif section == "packages": - clean = [{k: v for k, v in d.items() if not k.startswith("_")} for d in data] - _save_json(os.path.join(ctrl, "packages.json"), clean) - audit("CONFIG_SAVE", f"{image_type}/packages") - - else: - flash(f"Unknown section: {section}", "danger") - return redirect(url_for("image_config", image_type=image_type)) - - flash(f"Saved {section.replace('_', ' ')} successfully.", "success") - except Exception as exc: - flash(f"Failed to save {section}: {exc}", "danger") - - return redirect(url_for("image_config", image_type=image_type)) - - -# --------------------------------------------------------------------------- -# Routes — Clonezilla Backups -# --------------------------------------------------------------------------- - -@app.route("/backups") -def clonezilla_backups(): - backups = [] - if os.path.isdir(CLONEZILLA_SHARE): - for f in sorted(os.listdir(CLONEZILLA_SHARE)): - fpath = os.path.join(CLONEZILLA_SHARE, f) - if os.path.isfile(fpath) and f.lower().endswith(".zip"): - stat = os.stat(fpath) - backups.append({ - "filename": f, - "machine": os.path.splitext(f)[0], - "size": stat.st_size, - "modified": stat.st_mtime, - }) - return render_template( - "backups.html", - backups=backups, - image_types=IMAGE_TYPES, - friendly_names=FRIENDLY_NAMES, - ) - - -@app.route("/backups/upload", methods=["POST"]) -def clonezilla_upload(): - if "backup_file" not in request.files: - flash("No file selected.", "danger") - return redirect(url_for("clonezilla_backups")) - - f = request.files["backup_file"] - if not f.filename: - flash("No file selected.", "danger") - return redirect(url_for("clonezilla_backups")) - - filename = secure_filename(f.filename) - if not filename.lower().endswith(".zip"): - flash("Only .zip files are accepted.", "danger") - return redirect(url_for("clonezilla_backups")) - - os.makedirs(CLONEZILLA_SHARE, exist_ok=True) - dest = os.path.join(CLONEZILLA_SHARE, filename) - f.save(dest) - audit("BACKUP_UPLOAD", filename) - flash(f"Uploaded {filename} successfully.", "success") - return redirect(url_for("clonezilla_backups")) - - -@app.route("/backups/download/") -def clonezilla_download(filename): - filename = secure_filename(filename) - fpath = os.path.join(CLONEZILLA_SHARE, filename) - if not os.path.isfile(fpath): - flash(f"Backup not found: {filename}", "danger") - return redirect(url_for("clonezilla_backups")) - return send_file(fpath, as_attachment=True) - - -@app.route("/backups/delete/", methods=["POST"]) -def clonezilla_delete(filename): - filename = secure_filename(filename) - fpath = os.path.join(CLONEZILLA_SHARE, filename) - if os.path.isfile(fpath): - os.remove(fpath) - audit("BACKUP_DELETE", filename) - flash(f"Deleted {filename}.", "success") - else: - flash(f"Backup not found: {filename}", "danger") - return redirect(url_for("clonezilla_backups")) - - -# --------------------------------------------------------------------------- -# Routes — Blancco Reports -# --------------------------------------------------------------------------- - -@app.route("/reports") -def blancco_reports(): - reports = [] - if os.path.isdir(BLANCCO_REPORTS): - for f in sorted(os.listdir(BLANCCO_REPORTS), reverse=True): - fpath = os.path.join(BLANCCO_REPORTS, f) - if os.path.isfile(fpath): - stat = os.stat(fpath) - ext = os.path.splitext(f)[1].lower() - reports.append({ - "filename": f, - "size": stat.st_size, - "modified": stat.st_mtime, - "type": ext.lstrip(".").upper() or "FILE", - }) - return render_template( - "reports.html", - reports=reports, - image_types=IMAGE_TYPES, - friendly_names=FRIENDLY_NAMES, - ) - - -@app.route("/reports/download/") -def blancco_download_report(filename): - filename = secure_filename(filename) - fpath = os.path.join(BLANCCO_REPORTS, filename) - if not os.path.isfile(fpath): - flash(f"Report not found: {filename}", "danger") - return redirect(url_for("blancco_reports")) - return send_file(fpath, as_attachment=True) - - -@app.route("/reports/delete/", methods=["POST"]) -def blancco_delete_report(filename): - filename = secure_filename(filename) - fpath = os.path.join(BLANCCO_REPORTS, filename) - if os.path.isfile(fpath): - os.remove(fpath) - audit("REPORT_DELETE", filename) - flash(f"Deleted {filename}.", "success") - else: - flash(f"Report not found: {filename}", "danger") - return redirect(url_for("blancco_reports")) - - -# --------------------------------------------------------------------------- -# Routes — Enrollment Packages -# --------------------------------------------------------------------------- - -@app.route("/enrollment") -def enrollment(): - packages = [] - if os.path.isdir(ENROLLMENT_SHARE): - for f in sorted(os.listdir(ENROLLMENT_SHARE)): - fpath = os.path.join(ENROLLMENT_SHARE, f) - if os.path.isfile(fpath) and f.lower().endswith(".ppkg"): - stat = os.stat(fpath) - packages.append({ - "filename": f, - "size": stat.st_size, - "modified": stat.st_mtime, - }) - return render_template( - "enrollment.html", - packages=packages, - image_types=IMAGE_TYPES, - friendly_names=FRIENDLY_NAMES, - ) - - -@app.route("/enrollment/upload", methods=["POST"]) -def enrollment_upload(): - if "ppkg_file" not in request.files: - flash("No file selected.", "danger") - return redirect(url_for("enrollment")) - - f = request.files["ppkg_file"] - if not f.filename: - flash("No file selected.", "danger") - return redirect(url_for("enrollment")) - - filename = secure_filename(f.filename) - if not filename.lower().endswith(".ppkg"): - flash("Only .ppkg files are accepted.", "danger") - return redirect(url_for("enrollment")) - - os.makedirs(ENROLLMENT_SHARE, exist_ok=True) - dest = os.path.join(ENROLLMENT_SHARE, filename) - f.save(dest) - audit("ENROLLMENT_UPLOAD", filename) - flash(f"Uploaded {filename} successfully.", "success") - return redirect(url_for("enrollment")) - - -@app.route("/enrollment/download/") -def enrollment_download(filename): - filename = secure_filename(filename) - fpath = os.path.join(ENROLLMENT_SHARE, filename) - if not os.path.isfile(fpath): - flash(f"Package not found: {filename}", "danger") - return redirect(url_for("enrollment")) - return send_file(fpath, as_attachment=True) - - -@app.route("/enrollment/delete/", methods=["POST"]) -def enrollment_delete(filename): - filename = secure_filename(filename) - fpath = os.path.join(ENROLLMENT_SHARE, filename) - if os.path.isfile(fpath): - os.remove(fpath) - audit("ENROLLMENT_DELETE", filename) - flash(f"Deleted {filename}.", "success") - else: - flash(f"Package not found: {filename}", "danger") - return redirect(url_for("enrollment")) - - -# --------------------------------------------------------------------------- -# Routes — startnet.cmd Editor (WIM) -# --------------------------------------------------------------------------- - -def _wim_extract_startnet(wim_path): - """Extract startnet.cmd from a WIM file using wimextract.""" - tmpdir = tempfile.mkdtemp() - try: - result = subprocess.run( - ["wimextract", wim_path, "1", - "/Windows/System32/startnet.cmd", - "--dest-dir", tmpdir], - capture_output=True, text=True, timeout=30, - ) - startnet_path = os.path.join(tmpdir, "startnet.cmd") - if result.returncode == 0 and os.path.isfile(startnet_path): - with open(startnet_path, "r", encoding="utf-8", errors="replace") as fh: - return fh.read() - return None - except Exception: - return None - finally: - shutil.rmtree(tmpdir, ignore_errors=True) - - -def _wim_update_startnet(wim_path, content): - """Update startnet.cmd inside a WIM file using wimupdate.""" - tmpdir = tempfile.mkdtemp() - try: - startnet_path = os.path.join(tmpdir, "startnet.cmd") - with open(startnet_path, "w", encoding="utf-8", newline="\r\n") as fh: - fh.write(content) - # wimupdate reads commands from stdin - update_cmd = f"add {startnet_path} /Windows/System32/startnet.cmd\n" - result = subprocess.run( - ["wimupdate", wim_path, "1"], - input=update_cmd, - capture_output=True, text=True, timeout=60, - ) - if result.returncode != 0: - return False, result.stderr.strip() - return True, "" - except Exception as exc: - return False, str(exc) - finally: - shutil.rmtree(tmpdir, ignore_errors=True) - - -def _wim_list_files(wim_path, path="/"): - """List files inside a WIM at the given path.""" - try: - result = subprocess.run( - ["wimdir", wim_path, "1", path], - capture_output=True, text=True, timeout=30, - ) - if result.returncode == 0: - return [l.strip() for l in result.stdout.splitlines() if l.strip()] - return [] - except Exception: - return [] - - -@app.route("/startnet") -def startnet_editor(): - wim_exists = os.path.isfile(BOOT_WIM) - content = "" - wim_info = {} - - if wim_exists: - content = _wim_extract_startnet(BOOT_WIM) or "" - # Get WIM info - try: - result = subprocess.run( - ["wiminfo", BOOT_WIM], - capture_output=True, text=True, timeout=15, - ) - if result.returncode == 0: - for line in result.stdout.splitlines(): - if ":" in line: - key, _, val = line.partition(":") - wim_info[key.strip()] = val.strip() - except Exception: - pass - - return render_template( - "startnet_editor.html", - wim_exists=wim_exists, - wim_path=BOOT_WIM, - content=content, - wim_info=wim_info, - image_types=IMAGE_TYPES, - friendly_names=FRIENDLY_NAMES, - ) - - -@app.route("/startnet/save", methods=["POST"]) -def startnet_save(): - if not os.path.isfile(BOOT_WIM): - flash("boot.wim not found.", "danger") - return redirect(url_for("startnet_editor")) - - content = request.form.get("content", "") - ok, err = _wim_update_startnet(BOOT_WIM, content) - if ok: - audit("STARTNET_SAVE", "boot.wim updated") - flash("startnet.cmd updated successfully in boot.wim.", "success") - else: - flash(f"Failed to update boot.wim: {err}", "danger") - return redirect(url_for("startnet_editor")) - - -# --------------------------------------------------------------------------- -# Routes — Audit Log -# --------------------------------------------------------------------------- - -@app.route("/audit") -def audit_log(): - entries = [] - if os.path.isfile(AUDIT_LOG): - with open(AUDIT_LOG, "r") as fh: - for line in fh: - entries.append(line.strip()) - entries.reverse() # newest first - return render_template( - "audit.html", - entries=entries, - image_types=IMAGE_TYPES, - friendly_names=FRIENDLY_NAMES, - ) - - -# --------------------------------------------------------------------------- -# Routes — API -# --------------------------------------------------------------------------- - -@app.route("/api/services") -def api_services(): - services = {s: service_status(s) for s in ("dnsmasq", "apache2", "smbd")} - return jsonify(services) - - -@app.route("/api/images") -def api_images(): - images = [image_status(it) for it in IMAGE_TYPES] - return jsonify(images) - - -@app.route("/api/images//unattend", methods=["POST"]) -def api_save_unattend(image_type): - if image_type not in IMAGE_TYPES: - return jsonify({"error": "Unknown image type"}), 404 - - xml_file = unattend_path(image_type) - payload = request.get_json(silent=True) - - if not payload: - return jsonify({"error": "No JSON body provided"}), 400 - - if "raw_xml" in payload: - raw_xml = payload["raw_xml"] - try: - etree.fromstring(raw_xml.encode("utf-8")) - except etree.XMLSyntaxError as exc: - return jsonify({"error": f"Invalid XML: {exc}"}), 400 - xml_content = raw_xml - else: - try: - xml_content = build_unattend_xml(payload) - except Exception as exc: - return jsonify({"error": f"Failed to build XML: {exc}"}), 400 - - try: - os.makedirs(os.path.dirname(xml_file), exist_ok=True) - with open(xml_file, "w", encoding="utf-8") as fh: - fh.write(xml_content) - except Exception as exc: - return jsonify({"error": f"Failed to write file: {exc}"}), 500 - - audit("UNATTEND_SAVE_API", image_type) - return jsonify({"status": "ok", "path": xml_file}) - - -# --------------------------------------------------------------------------- -# Template filters -# --------------------------------------------------------------------------- - -@app.template_filter("timestamp_fmt") -def timestamp_fmt(ts): - """Format a Unix timestamp to a human-readable date string.""" - return datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M") - - -# --------------------------------------------------------------------------- -# Context processor — make data available to all templates -# --------------------------------------------------------------------------- - -@app.context_processor -def inject_globals(): - return { - "all_image_types": IMAGE_TYPES, - "all_friendly_names": FRIENDLY_NAMES, - } - - -# --------------------------------------------------------------------------- -# Main -# --------------------------------------------------------------------------- - -if __name__ == "__main__": - app.run(host="127.0.0.1", port=9010, debug=False, threaded=True) +#!/usr/bin/env python3 +"""Flask web application for managing a GE Aerospace PXE server. + +This file is the route surface; most logic lives in ``services/``: + services.audit - audit log writer + services.csrf - session CSRF token + before_request validator + services.fs - path helpers + JSON load/save + services.system - systemd service status + USB mounts + services.images - image_status + load_image_config + services.deploy - import_deploy + merge_tree + symlink dance + services.unattend - parse + build + form-extract for unattend.xml + services.wim - boot.wim startnet.cmd extract/update via wimtools +""" + +import json +import os +import shutil +from datetime import datetime +from pathlib import Path + +from flask import ( + Flask, + abort, + flash, + jsonify, + redirect, + render_template, + request, + send_file, + url_for, +) +from lxml import etree +from werkzeug.utils import secure_filename + +import config +from services import deploy, fs, images, system, unattend, wim +from services.audit import audit +from services.csrf import init_csrf + +app = Flask(__name__) +app.secret_key = config.FLASK_SECRET_KEY +app.config["MAX_CONTENT_LENGTH"] = config.MAX_CONTENT_LENGTH + +init_csrf(app) + + +# --------------------------------------------------------------------------- +# Routes - pages +# --------------------------------------------------------------------------- + +@app.route("/") +def dashboard(): + image_list = [images.image_status(it) for it in config.IMAGE_TYPES] + services = [system.service_status(s) for s in ("dnsmasq", "apache2", "smbd")] + return render_template( + "dashboard.html", + images=image_list, + services=services, + image_types=config.IMAGE_TYPES, + friendly_names=config.FRIENDLY_NAMES, + ) + + +@app.route("/images/import", methods=["GET", "POST"]) +def images_import(): + usb_mounts = system.find_usb_mounts() + upload_sources = system.find_upload_sources() + image_list = [images.image_status(it) for it in config.IMAGE_TYPES] + + if request.method == "POST": + source = request.form.get("source", "") + target = request.form.get("target", "") + + if not source or not target: + flash("Please select both a source and a target image type.", "danger") + return redirect(url_for("images_import")) + + if target not in config.IMAGE_TYPES: + flash("Invalid target image type.", "danger") + return redirect(url_for("images_import")) + + if not deploy.allowed_import_source(source): + flash("Source path is not a valid import location.", "danger") + return redirect(url_for("images_import")) + + if not os.path.isdir(source): + flash(f"Source path does not exist: {source}", "danger") + return redirect(url_for("images_import")) + + root = fs.image_root(target) + dest = fs.deploy_path(target) + try: + os.makedirs(dest, exist_ok=True) + src_items = os.listdir(source) + + # Move files from network upload to save disk space; copy from USB. + use_move = source == config.UPLOAD_DIR or source.startswith(config.UPLOAD_DIR + "/") + _transfer = shutil.move if use_move else shutil.copy2 + _transfer_tree = shutil.move if use_move else shutil.copytree + + top_dirs = {d for d in src_items if os.path.isdir(os.path.join(source, d))} + full_layout = "Deploy" in top_dirs + + if full_layout: + shared_root = [] + for prefix, dirs in config.SHARED_ROOT_DIRS.items(): + if target.startswith(prefix): + shared_root = dirs + break + + for item in src_items: + src_item = os.path.join(source, item) + if item == "Deploy": + deploy.import_deploy(src_item, dest, target, move=use_move) + elif os.path.isdir(src_item) and item in shared_root: + prefix_key = target.split("-")[0] + "-" + shared_dest = os.path.join(config.SHARED_DIR, f"{prefix_key}{item}") + os.makedirs(shared_dest, exist_ok=True) + deploy._merge_tree(src_item, shared_dest, move=use_move) + dst_item = os.path.join(root, item) + if os.path.islink(dst_item): + os.remove(dst_item) + elif os.path.isdir(dst_item): + shutil.rmtree(dst_item) + os.symlink(shared_dest, dst_item) + elif os.path.isdir(src_item): + dst_item = os.path.join(root, item) + if os.path.exists(dst_item): + shutil.rmtree(dst_item) + _transfer_tree(src_item, dst_item) + else: + _transfer(src_item, os.path.join(root, item)) + else: + deploy.import_deploy(source, dest, target, move=use_move) + + # Ensure Media.tag exists (FlatSetupLoader.exe drive detection). + control_dir = os.path.join(dest, "Control") + os.makedirs(control_dir, exist_ok=True) + media_tag = os.path.join(control_dir, "Media.tag") + Path(media_tag).touch() + + audit("IMAGE_IMPORT", f"{source} -> {target}") + flash( + f"Successfully imported content to {config.FRIENDLY_NAMES.get(target, target)}.", + "success", + ) + except Exception as exc: + flash(f"Import failed: {exc}", "danger") + + return redirect(url_for("images_import")) + + return render_template( + "import.html", + usb_mounts=usb_mounts, + upload_sources=upload_sources, + images=image_list, + image_types=config.IMAGE_TYPES, + friendly_names=config.FRIENDLY_NAMES, + ) + + +@app.route("/images//unattend", methods=["GET", "POST"]) +def unattend_editor(image_type): + if image_type not in config.IMAGE_TYPES: + flash("Unknown image type.", "danger") + return redirect(url_for("dashboard")) + + xml_file = fs.unattend_path(image_type) + + if request.method == "POST": + save_mode = request.form.get("save_mode", "form") + + if save_mode == "raw": + raw_xml = request.form.get("raw_xml", "") + try: + etree.fromstring(raw_xml.encode("utf-8")) + except etree.XMLSyntaxError as exc: + flash(f"Invalid XML: {exc}", "danger") + data = unattend.parse_unattend(xml_file) + data["raw_xml"] = raw_xml + return render_template( + "unattend_editor.html", + image_type=image_type, + friendly_name=config.FRIENDLY_NAMES.get(image_type, image_type), + data=data, + image_types=config.IMAGE_TYPES, + friendly_names=config.FRIENDLY_NAMES, + ) + xml_content = raw_xml + else: + form_data = unattend.extract_form_data(request.form) + xml_content = unattend.build_unattend_xml(form_data) + + try: + os.makedirs(os.path.dirname(xml_file), exist_ok=True) + with open(xml_file, "w", encoding="utf-8") as fh: + fh.write(xml_content) + audit("UNATTEND_SAVE", f"{image_type} ({save_mode})") + flash("unattend.xml saved successfully.", "success") + except Exception as exc: + flash(f"Failed to save: {exc}", "danger") + + return redirect(url_for("unattend_editor", image_type=image_type)) + + data = unattend.parse_unattend(xml_file) + return render_template( + "unattend_editor.html", + image_type=image_type, + friendly_name=config.FRIENDLY_NAMES.get(image_type, image_type), + data=data, + image_types=config.IMAGE_TYPES, + friendly_names=config.FRIENDLY_NAMES, + ) + + +@app.route("/images//config") +def image_config(image_type): + if image_type not in config.IMAGE_TYPES: + flash("Unknown image type.", "danger") + return redirect(url_for("dashboard")) + + cfg = images.load_image_config(image_type) + return render_template( + "image_config.html", + image_type=image_type, + friendly_name=config.FRIENDLY_NAMES.get(image_type, image_type), + config=cfg, + ) + + +@app.route("/images//config/save", methods=["POST"]) +def image_config_save(image_type): + if image_type not in config.IMAGE_TYPES: + flash("Unknown image type.", "danger") + return redirect(url_for("dashboard")) + + section = request.form.get("section", "") + payload = request.form.get("payload", "[]") + try: + data = json.loads(payload) + except json.JSONDecodeError: + flash("Invalid JSON payload.", "danger") + return redirect(url_for("image_config", image_type=image_type)) + + ctrl = fs.control_path(image_type) + tools = fs.tools_path(image_type) + + try: + if section == "hardware_models": + us_file = os.path.join(tools, "user_selections.json") + us_raw = fs.load_json(us_file) + us_data = us_raw[0] if us_raw and isinstance(us_raw, list) else {} + us_data["HardwareModelSelection"] = data + fs.save_json(us_file, [us_data]) + audit("CONFIG_SAVE", f"{image_type}/hardware_models") + + elif section == "drivers": + clean = [{k: v for k, v in d.items() if not k.startswith("_")} for d in data] + fs.save_json(os.path.join(ctrl, "HardwareDriver.json"), clean) + audit("CONFIG_SAVE", f"{image_type}/drivers") + + elif section == "operating_systems": + clean = [{k: v for k, v in d.items() if not k.startswith("_")} for d in data] + fs.save_json(os.path.join(ctrl, "OperatingSystem.json"), clean) + audit("CONFIG_SAVE", f"{image_type}/operating_systems") + + elif section == "packages": + clean = [{k: v for k, v in d.items() if not k.startswith("_")} for d in data] + fs.save_json(os.path.join(ctrl, "packages.json"), clean) + audit("CONFIG_SAVE", f"{image_type}/packages") + + else: + flash(f"Unknown section: {section}", "danger") + return redirect(url_for("image_config", image_type=image_type)) + + flash(f"Saved {section.replace('_', ' ')} successfully.", "success") + except Exception as exc: + flash(f"Failed to save {section}: {exc}", "danger") + + return redirect(url_for("image_config", image_type=image_type)) + + +# --------------------------------------------------------------------------- +# Routes - Clonezilla Backups +# --------------------------------------------------------------------------- + +@app.route("/backups") +def clonezilla_backups(): + backups = [] + if os.path.isdir(config.CLONEZILLA_SHARE): + for f in sorted(os.listdir(config.CLONEZILLA_SHARE)): + fpath = os.path.join(config.CLONEZILLA_SHARE, f) + if os.path.isfile(fpath) and f.lower().endswith(".zip"): + stat = os.stat(fpath) + backups.append({ + "filename": f, + "machine": os.path.splitext(f)[0], + "size": stat.st_size, + "modified": stat.st_mtime, + }) + return render_template( + "backups.html", + backups=backups, + image_types=config.IMAGE_TYPES, + friendly_names=config.FRIENDLY_NAMES, + ) + + +@app.route("/backups/upload", methods=["POST"]) +def clonezilla_upload(): + if "backup_file" not in request.files: + flash("No file selected.", "danger") + return redirect(url_for("clonezilla_backups")) + + f = request.files["backup_file"] + if not f.filename: + flash("No file selected.", "danger") + return redirect(url_for("clonezilla_backups")) + + filename = secure_filename(f.filename) + if not filename.lower().endswith(".zip"): + flash("Only .zip files are accepted.", "danger") + return redirect(url_for("clonezilla_backups")) + + os.makedirs(config.CLONEZILLA_SHARE, exist_ok=True) + dest = os.path.join(config.CLONEZILLA_SHARE, filename) + f.save(dest) + audit("BACKUP_UPLOAD", filename) + flash(f"Uploaded {filename} successfully.", "success") + return redirect(url_for("clonezilla_backups")) + + +@app.route("/backups/download/") +def clonezilla_download(filename): + filename = secure_filename(filename) + fpath = os.path.join(config.CLONEZILLA_SHARE, filename) + if not os.path.isfile(fpath): + flash(f"Backup not found: {filename}", "danger") + return redirect(url_for("clonezilla_backups")) + return send_file(fpath, as_attachment=True) + + +@app.route("/backups/delete/", methods=["POST"]) +def clonezilla_delete(filename): + filename = secure_filename(filename) + fpath = os.path.join(config.CLONEZILLA_SHARE, filename) + if os.path.isfile(fpath): + os.remove(fpath) + audit("BACKUP_DELETE", filename) + flash(f"Deleted {filename}.", "success") + else: + flash(f"Backup not found: {filename}", "danger") + return redirect(url_for("clonezilla_backups")) + + +# --------------------------------------------------------------------------- +# Routes - Blancco Reports +# --------------------------------------------------------------------------- + +@app.route("/reports") +def blancco_reports(): + reports = [] + if os.path.isdir(config.BLANCCO_REPORTS): + for f in sorted(os.listdir(config.BLANCCO_REPORTS), reverse=True): + fpath = os.path.join(config.BLANCCO_REPORTS, f) + if os.path.isfile(fpath): + stat = os.stat(fpath) + ext = os.path.splitext(f)[1].lower() + reports.append({ + "filename": f, + "size": stat.st_size, + "modified": stat.st_mtime, + "type": ext.lstrip(".").upper() or "FILE", + }) + return render_template( + "reports.html", + reports=reports, + image_types=config.IMAGE_TYPES, + friendly_names=config.FRIENDLY_NAMES, + ) + + +@app.route("/reports/download/") +def blancco_download_report(filename): + filename = secure_filename(filename) + fpath = os.path.join(config.BLANCCO_REPORTS, filename) + if not os.path.isfile(fpath): + flash(f"Report not found: {filename}", "danger") + return redirect(url_for("blancco_reports")) + return send_file(fpath, as_attachment=True) + + +@app.route("/reports/delete/", methods=["POST"]) +def blancco_delete_report(filename): + filename = secure_filename(filename) + fpath = os.path.join(config.BLANCCO_REPORTS, filename) + if os.path.isfile(fpath): + os.remove(fpath) + audit("REPORT_DELETE", filename) + flash(f"Deleted {filename}.", "success") + else: + flash(f"Report not found: {filename}", "danger") + return redirect(url_for("blancco_reports")) + + +# --------------------------------------------------------------------------- +# Routes - Enrollment Packages +# --------------------------------------------------------------------------- + +@app.route("/enrollment") +def enrollment(): + packages = [] + if os.path.isdir(config.ENROLLMENT_SHARE): + for f in sorted(os.listdir(config.ENROLLMENT_SHARE)): + fpath = os.path.join(config.ENROLLMENT_SHARE, f) + if os.path.isfile(fpath) and f.lower().endswith(".ppkg"): + stat = os.stat(fpath) + packages.append({ + "filename": f, + "size": stat.st_size, + "modified": stat.st_mtime, + }) + return render_template( + "enrollment.html", + packages=packages, + image_types=config.IMAGE_TYPES, + friendly_names=config.FRIENDLY_NAMES, + ) + + +@app.route("/enrollment/upload", methods=["POST"]) +def enrollment_upload(): + if "ppkg_file" not in request.files: + flash("No file selected.", "danger") + return redirect(url_for("enrollment")) + + f = request.files["ppkg_file"] + if not f.filename: + flash("No file selected.", "danger") + return redirect(url_for("enrollment")) + + filename = secure_filename(f.filename) + if not filename.lower().endswith(".ppkg"): + flash("Only .ppkg files are accepted.", "danger") + return redirect(url_for("enrollment")) + + os.makedirs(config.ENROLLMENT_SHARE, exist_ok=True) + dest = os.path.join(config.ENROLLMENT_SHARE, filename) + f.save(dest) + audit("ENROLLMENT_UPLOAD", filename) + flash(f"Uploaded {filename} successfully.", "success") + return redirect(url_for("enrollment")) + + +@app.route("/enrollment/download/") +def enrollment_download(filename): + filename = secure_filename(filename) + fpath = os.path.join(config.ENROLLMENT_SHARE, filename) + if not os.path.isfile(fpath): + flash(f"Package not found: {filename}", "danger") + return redirect(url_for("enrollment")) + return send_file(fpath, as_attachment=True) + + +@app.route("/enrollment/delete/", methods=["POST"]) +def enrollment_delete(filename): + filename = secure_filename(filename) + fpath = os.path.join(config.ENROLLMENT_SHARE, filename) + if os.path.isfile(fpath): + os.remove(fpath) + audit("ENROLLMENT_DELETE", filename) + flash(f"Deleted {filename}.", "success") + else: + flash(f"Package not found: {filename}", "danger") + return redirect(url_for("enrollment")) + + +# --------------------------------------------------------------------------- +# Routes - startnet.cmd Editor (boot.wim) +# --------------------------------------------------------------------------- + +@app.route("/startnet") +def startnet_editor(): + import subprocess + wim_exists = os.path.isfile(config.BOOT_WIM) + content = "" + wim_info = {} + + if wim_exists: + content = wim.extract_startnet(config.BOOT_WIM) or "" + try: + result = subprocess.run( + ["wiminfo", config.BOOT_WIM], + capture_output=True, text=True, timeout=15, + ) + if result.returncode == 0: + for line in result.stdout.splitlines(): + if ":" in line: + key, _, val = line.partition(":") + wim_info[key.strip()] = val.strip() + except Exception: + pass + + return render_template( + "startnet_editor.html", + wim_exists=wim_exists, + wim_path=config.BOOT_WIM, + content=content, + wim_info=wim_info, + image_types=config.IMAGE_TYPES, + friendly_names=config.FRIENDLY_NAMES, + ) + + +@app.route("/startnet/save", methods=["POST"]) +def startnet_save(): + if not os.path.isfile(config.BOOT_WIM): + flash("boot.wim not found.", "danger") + return redirect(url_for("startnet_editor")) + + content = request.form.get("content", "") + ok, err = wim.update_startnet(config.BOOT_WIM, content) + if ok: + audit("STARTNET_SAVE", "boot.wim updated") + flash("startnet.cmd updated successfully in boot.wim.", "success") + else: + flash(f"Failed to update boot.wim: {err}", "danger") + return redirect(url_for("startnet_editor")) + + +# --------------------------------------------------------------------------- +# Routes - Audit Log +# --------------------------------------------------------------------------- + +@app.route("/audit") +def audit_log(): + entries = [] + if os.path.isfile(config.AUDIT_LOG): + with open(config.AUDIT_LOG, "r") as fh: + for line in fh: + entries.append(line.strip()) + entries.reverse() + return render_template( + "audit.html", + entries=entries, + image_types=config.IMAGE_TYPES, + friendly_names=config.FRIENDLY_NAMES, + ) + + +# --------------------------------------------------------------------------- +# Routes - JSON API +# --------------------------------------------------------------------------- + +@app.route("/api/services") +def api_services(): + services = {s: system.service_status(s) for s in ("dnsmasq", "apache2", "smbd")} + return jsonify(services) + + +@app.route("/api/images") +def api_images(): + image_list = [images.image_status(it) for it in config.IMAGE_TYPES] + return jsonify(image_list) + + +@app.route("/api/images//unattend", methods=["POST"]) +def api_save_unattend(image_type): + if image_type not in config.IMAGE_TYPES: + return jsonify({"error": "Unknown image type"}), 404 + + xml_file = fs.unattend_path(image_type) + payload = request.get_json(silent=True) + + if not payload: + return jsonify({"error": "No JSON body provided"}), 400 + + if "raw_xml" in payload: + raw_xml = payload["raw_xml"] + try: + etree.fromstring(raw_xml.encode("utf-8")) + except etree.XMLSyntaxError as exc: + return jsonify({"error": f"Invalid XML: {exc}"}), 400 + xml_content = raw_xml + else: + try: + xml_content = unattend.build_unattend_xml(payload) + except Exception as exc: + return jsonify({"error": f"Failed to build XML: {exc}"}), 400 + + try: + os.makedirs(os.path.dirname(xml_file), exist_ok=True) + with open(xml_file, "w", encoding="utf-8") as fh: + fh.write(xml_content) + except Exception as exc: + return jsonify({"error": f"Failed to write file: {exc}"}), 500 + + audit("UNATTEND_SAVE_API", image_type) + return jsonify({"status": "ok", "path": xml_file}) + + +# --------------------------------------------------------------------------- +# Template helpers +# --------------------------------------------------------------------------- + +@app.template_filter("timestamp_fmt") +def timestamp_fmt(ts): + """Format a Unix timestamp to a human-readable date string.""" + return datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M") + + +@app.context_processor +def inject_globals(): + return { + "all_image_types": config.IMAGE_TYPES, + "all_friendly_names": config.FRIENDLY_NAMES, + } + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +if __name__ == "__main__": + app.run(host="127.0.0.1", port=9010, debug=False, threaded=True) diff --git a/webapp/config.py b/webapp/config.py new file mode 100644 index 0000000..249ec61 --- /dev/null +++ b/webapp/config.py @@ -0,0 +1,66 @@ +"""Configuration constants for the PXE webapp. + +Reads from environment variables with sensible defaults that match the +Ansible playbook's deploy paths. Also defines the canonical image type +list and shared-directory taxonomy used by the import pipeline. +""" + +import os + +# --- Filesystem paths -------------------------------------------------------- +SAMBA_SHARE = os.environ.get("SAMBA_SHARE", "/srv/samba/winpeapps") +CLONEZILLA_SHARE = os.environ.get("CLONEZILLA_SHARE", "/srv/samba/clonezilla") +BLANCCO_REPORTS = os.environ.get("BLANCCO_REPORTS", "/srv/samba/blancco-reports") +ENROLLMENT_SHARE = os.environ.get("ENROLLMENT_SHARE", "/srv/samba/enrollment") +UPLOAD_DIR = os.environ.get("UPLOAD_DIR", "/home/pxe/image-upload") +SHARED_DIR = os.path.join(SAMBA_SHARE, "_shared") +WEB_ROOT = os.environ.get("WEB_ROOT", "/var/www/html") +BOOT_WIM = os.path.join(WEB_ROOT, "win11", "sources", "boot.wim") +AUDIT_LOG = os.environ.get("AUDIT_LOG", "/var/log/pxe-webapp-audit.log") + +# --- Flask ------------------------------------------------------------------- +FLASK_SECRET_KEY = os.environ.get("FLASK_SECRET_KEY", "pxe-manager-dev-key-change-in-prod") +MAX_CONTENT_LENGTH = 16 * 1024 * 1024 * 1024 # 16 GB max upload + +# --- Image taxonomy ---------------------------------------------------------- +# Subdirs inside Deploy/ shared across ALL image types. +SHARED_DEPLOY_GLOBAL = ["Out-of-box Drivers"] + +# Subdirs inside Deploy/ shared within the same image family (by prefix). +SHARED_DEPLOY_SCOPED = { + "gea-": ["Operating Systems", "Packages"], + "ge-": ["Operating Systems", "Packages"], +} + +# Sibling dirs at image root shared within the same image family. +SHARED_ROOT_DIRS = { + "gea-": ["Sources"], + "ge-": ["Sources"], +} + +IMAGE_TYPES = [ + "gea-standard", + "gea-engineer", + "gea-shopfloor", + "gea-shopfloor-mce", + "ge-standard", + "ge-engineer", + "ge-shopfloor-lockdown", + "ge-shopfloor-mce", +] + +FRIENDLY_NAMES = { + "gea-standard": "GE Aerospace Standard", + "gea-engineer": "GE Aerospace Engineer", + "gea-shopfloor": "GE Aerospace Shop Floor", + "gea-shopfloor-mce": "GE Aerospace Shop Floor MCE", + "ge-standard": "GE Legacy Standard", + "ge-engineer": "GE Legacy Engineer", + "ge-shopfloor-lockdown": "GE Legacy Shop Floor Lockdown", + "ge-shopfloor-mce": "GE Legacy Shop Floor MCE", +} + +# --- Unattend XML namespaces ------------------------------------------------- +UNATTEND_NS = "urn:schemas-microsoft-com:unattend" +WCM_NS = "http://schemas.microsoft.com/WMIConfig/2002/State" +NSMAP = {None: UNATTEND_NS, "wcm": WCM_NS} diff --git a/webapp/services/__init__.py b/webapp/services/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/webapp/services/audit.py b/webapp/services/audit.py new file mode 100644 index 0000000..e872e84 --- /dev/null +++ b/webapp/services/audit.py @@ -0,0 +1,26 @@ +"""Audit logging for the PXE webapp. + +Every write-action route should call ``audit(action, detail)`` to record +who did what. The log lives at ``AUDIT_LOG`` (configurable via env var) +and is consumed by the /audit page. +""" + +import logging + +from flask import request + +import config + +_audit_handler = logging.FileHandler(config.AUDIT_LOG, mode="a") +_audit_handler.setFormatter(logging.Formatter("%(asctime)s %(message)s")) + +audit_logger = logging.getLogger("pxe_audit") +audit_logger.setLevel(logging.INFO) +if not audit_logger.handlers: + audit_logger.addHandler(_audit_handler) + + +def audit(action, detail=""): + """Write an entry to the audit log.""" + ip = request.remote_addr if request else "system" + audit_logger.info(f"[{ip}] {action}: {detail}") diff --git a/webapp/services/csrf.py b/webapp/services/csrf.py new file mode 100644 index 0000000..b1a60d3 --- /dev/null +++ b/webapp/services/csrf.py @@ -0,0 +1,28 @@ +"""Session-based CSRF token: generate per-session, double-submit on POST.""" + +import secrets + +from flask import abort, request, session + + +def generate_csrf_token(): + """Return the CSRF token for the current session, creating one if needed.""" + if "_csrf_token" not in session: + session["_csrf_token"] = secrets.token_hex(32) + return session["_csrf_token"] + + +def init_csrf(app): + """Wire CSRF protection into a Flask app: validator + template helper.""" + + @app.before_request + def _validate_csrf(): + if request.method != "POST": + return + token = request.form.get("_csrf_token") or request.headers.get("X-CSRF-Token") + if not token or token != generate_csrf_token(): + abort(403) + + @app.context_processor + def _inject_csrf_token(): + return {"csrf_token": generate_csrf_token} diff --git a/webapp/services/deploy.py b/webapp/services/deploy.py new file mode 100644 index 0000000..629758f --- /dev/null +++ b/webapp/services/deploy.py @@ -0,0 +1,96 @@ +"""Image deploy import logic: copy/move from a USB or upload-dir source +into ``SAMBA_SHARE//Deploy/`` while merging shared subdirs +(``Out-of-box Drivers`` etc.) into ``SAMBA_SHARE/_shared/`` and replacing +the per-image copies with symlinks. This is what lets two image types +re-use the same multi-GB driver tree without doubling disk usage. +""" + +import os +import shutil + +import config +from services.system import find_usb_mounts + + +def _replace_with_symlink(link_path, target_path): + """Replace a file/dir/symlink at link_path with a symlink to target_path.""" + if os.path.islink(link_path): + os.remove(link_path) + elif os.path.isdir(link_path): + shutil.rmtree(link_path) + os.symlink(target_path, link_path) + + +def _merge_tree(src, dst, move=False): + """Recursively merge src tree into dst, overwriting existing files. + + When move=True, files are moved instead of copied (saves disk space + on imports from the local upload-dir). + """ + _transfer = shutil.move if move else shutil.copy2 + _transfer_tree = shutil.move if move else shutil.copytree + for item in os.listdir(src): + s = os.path.join(src, item) + d = os.path.join(dst, item) + if os.path.isdir(s): + if os.path.isdir(d): + _merge_tree(s, d, move=move) + else: + if os.path.exists(d): + os.remove(d) + _transfer_tree(s, d) + else: + os.makedirs(os.path.dirname(d), exist_ok=True) + _transfer(s, d) + + +def import_deploy(src_deploy, dst_deploy, target="", move=False): + """Import Deploy/ contents, redirecting shared subdirs into _shared/.""" + scoped_shared = [] + prefix_key = "" + for prefix, dirs in config.SHARED_DEPLOY_SCOPED.items(): + if target.startswith(prefix): + scoped_shared = dirs + prefix_key = prefix + break + + _transfer = shutil.move if move else shutil.copy2 + _transfer_tree = shutil.move if move else shutil.copytree + + os.makedirs(dst_deploy, exist_ok=True) + for item in os.listdir(src_deploy): + src_item = os.path.join(src_deploy, item) + dst_item = os.path.join(dst_deploy, item) + + if not os.path.isdir(src_item): + _transfer(src_item, dst_item) + continue + + if item in config.SHARED_DEPLOY_GLOBAL: + shared_dest = os.path.join(config.SHARED_DIR, item) + os.makedirs(shared_dest, exist_ok=True) + _merge_tree(src_item, shared_dest, move=move) + _replace_with_symlink(dst_item, shared_dest) + continue + + if item in scoped_shared: + shared_dest = os.path.join(config.SHARED_DIR, f"{prefix_key}{item}") + os.makedirs(shared_dest, exist_ok=True) + _merge_tree(src_item, shared_dest, move=move) + _replace_with_symlink(dst_item, shared_dest) + continue + + if os.path.isdir(dst_item): + _merge_tree(src_item, dst_item, move=move) + else: + _transfer_tree(src_item, dst_item) + + +def allowed_import_source(source): + """True if source is a USB mount or under the upload dir.""" + usb = find_usb_mounts() + if any(source == m or source.startswith(m + "/") for m in usb): + return True + if source == config.UPLOAD_DIR or source.startswith(config.UPLOAD_DIR + "/"): + return os.path.isdir(source) + return False diff --git a/webapp/services/fs.py b/webapp/services/fs.py new file mode 100644 index 0000000..0031a25 --- /dev/null +++ b/webapp/services/fs.py @@ -0,0 +1,75 @@ +"""Filesystem path helpers + tiny JSON load/save utilities. + +All paths are derived from ``config.SAMBA_SHARE`` so swapping the share +root only requires changing one env var. +""" + +import json +import os + +import config + + +def image_root(image_type): + """Return the root directory for an image type.""" + return os.path.join(config.SAMBA_SHARE, image_type) + + +def deploy_path(image_type): + """Return the Deploy directory for an image type.""" + return os.path.join(config.SAMBA_SHARE, image_type, "Deploy") + + +def unattend_path(image_type): + """Return the unattend.xml path for an image type.""" + return os.path.join(deploy_path(image_type), "FlatUnattendW10.xml") + + +def control_path(image_type): + """Return the Deploy/Control directory for an image type.""" + return os.path.join(deploy_path(image_type), "Control") + + +def tools_path(image_type): + """Return the Tools directory for an image type.""" + return os.path.join(config.SAMBA_SHARE, image_type, "Tools") + + +def load_json(filepath): + """Parse a JSON file and return its contents, or [] on failure.""" + try: + with open(filepath, "r", encoding="utf-8-sig") as fh: + return json.load(fh) + except (OSError, json.JSONDecodeError): + return [] + + +def save_json(filepath, data): + """Write data as pretty-printed JSON, creating parent dirs as needed.""" + os.makedirs(os.path.dirname(filepath), exist_ok=True) + with open(filepath, "w", encoding="utf-8") as fh: + json.dump(data, fh, indent=2, ensure_ascii=False) + fh.write("\n") + + +def resolve_destination(dest_dir, image_type): + """Convert a Windows ``*destinationdir*`` path to a Linux filesystem path. + + Replaces the placeholder + backslashes, prepends + ``SAMBA_SHARE/image_type/``, then resolves symlinks so shared dirs + are followed. + """ + if not dest_dir: + return "" + path = dest_dir + lower = path.lower() + idx = lower.find("*destinationdir*") + if idx != -1: + path = path[idx + len("*destinationdir*"):] + path = path.replace("\\", "/").lstrip("/") + full = os.path.join(config.SAMBA_SHARE, image_type, path) + try: + full = os.path.realpath(full) + except OSError: + pass + return full diff --git a/webapp/services/images.py b/webapp/services/images.py new file mode 100644 index 0000000..1fc137f --- /dev/null +++ b/webapp/services/images.py @@ -0,0 +1,123 @@ +"""Per-image-type state probes: status + config (drivers / OS / packages / models).""" + +import os + +import config +from services import fs + + +def image_status(image_type): + """Return a dict describing the state of an image type.""" + dp = fs.deploy_path(image_type) + up = fs.unattend_path(image_type) + has_content = os.path.isdir(dp) and any(os.scandir(dp)) if os.path.isdir(dp) else False + has_unattend = os.path.isfile(up) + return { + "image_type": image_type, + "friendly_name": config.FRIENDLY_NAMES.get(image_type, image_type), + "deploy_path": dp, + "has_content": has_content, + "has_unattend": has_unattend, + } + + +def load_image_config(image_type): + """Load all JSON configs for an image and check on-disk presence.""" + ctrl = fs.control_path(image_type) + tools = fs.tools_path(image_type) + + # --- Drivers (merge HardwareDriver.json + hw_drivers.json) --- + hw_driver_file = os.path.join(ctrl, "HardwareDriver.json") + hw_drivers_extra = os.path.join(ctrl, "hw_drivers.json") + drivers_raw = fs.load_json(hw_driver_file) + extra_raw = fs.load_json(hw_drivers_extra) + + seen_files = set() + drivers = [] + for d in drivers_raw + extra_raw: + fname = (d.get("FileName") or d.get("fileName") or "").lower() + if fname and fname in seen_files: + continue + if fname: + seen_files.add(fname) + drivers.append(d) + + for d in drivers: + fname = d.get("FileName") or d.get("fileName") or "" + dest = d.get("DestinationDir") or d.get("destinationDir") or "" + resolved = fs.resolve_destination(dest, image_type) + if resolved and fname: + d["_on_disk"] = os.path.isfile(os.path.join(resolved, fname)) + else: + d["_on_disk"] = False + + # --- Operating Systems --- + os_file = os.path.join(ctrl, "OperatingSystem.json") + operating_systems = fs.load_json(os_file) + for entry in operating_systems: + osv = entry.get("operatingSystemVersion", {}) + wim = osv.get("wim", {}) + dest = wim.get("DestinationDir") or wim.get("destinationDir") or "" + resolved = fs.resolve_destination(dest, image_type) + if resolved: + entry["_on_disk"] = os.path.isfile(os.path.join(resolved, "install.wim")) + else: + entry["_on_disk"] = False + + # --- Packages --- + pkg_file = os.path.join(ctrl, "packages.json") + packages = fs.load_json(pkg_file) + for p in packages: + fname = p.get("fileName") or p.get("FileName") or "" + dest = p.get("destinationDir") or p.get("DestinationDir") or "" + resolved = fs.resolve_destination(dest, image_type) + if resolved and fname: + p["_on_disk"] = os.path.isfile(os.path.join(resolved, fname)) + else: + p["_on_disk"] = False + + # --- Hardware Models (user_selections.json) --- + us_file = os.path.join(tools, "user_selections.json") + us_raw = fs.load_json(us_file) + us_data = us_raw[0] if us_raw and isinstance(us_raw, list) else {} + hardware_models = us_data.get("HardwareModelSelection", []) + os_selection = str(us_data.get("OperatingSystemSelection", "")) + + family_lookup = {} + for d in drivers: + family = d.get("family", "") + if family: + family_lookup[family] = d + + for hm in hardware_models: + family_id = hm.get("Id", "") + matched = family_lookup.get(family_id) + hm["_on_disk"] = matched["_on_disk"] if matched else False + + # --- Orphan drivers: zip files on disk not referenced in any JSON --- + orphan_drivers = [] + oob_dir = os.path.join(fs.deploy_path(image_type), "Out-of-box Drivers") + try: + oob_dir = os.path.realpath(oob_dir) + except OSError: + pass + registered_files = set() + for d in drivers: + fname = d.get("FileName") or d.get("fileName") or "" + if fname: + registered_files.add(fname.lower()) + if os.path.isdir(oob_dir): + for dirpath, _dirnames, filenames in os.walk(oob_dir): + for fn in filenames: + if fn.lower().endswith(".zip") and fn.lower() not in registered_files: + rel = os.path.relpath(os.path.join(dirpath, fn), oob_dir) + orphan_drivers.append({"fileName": fn, "relPath": rel}) + + return { + "hardware_models": hardware_models, + "drivers": drivers, + "operating_systems": operating_systems, + "packages": packages, + "orphan_drivers": orphan_drivers, + "os_selection": os_selection, + } diff --git a/webapp/services/system.py b/webapp/services/system.py new file mode 100644 index 0000000..f926521 --- /dev/null +++ b/webapp/services/system.py @@ -0,0 +1,55 @@ +"""Host-system probes: systemd service status, USB mounts, upload sources.""" + +import os +import subprocess + +import config + + +def service_status(service_name): + """Check whether a systemd service is active.""" + try: + result = subprocess.run( + ["systemctl", "is-active", service_name], + capture_output=True, + text=True, + timeout=5, + ) + state = result.stdout.strip() + return {"name": service_name, "active": state == "active", "state": state} + except Exception as exc: + return {"name": service_name, "active": False, "state": str(exc)} + + +def find_usb_mounts(): + """Return a list of mount-point paths that look like removable media.""" + mounts = [] + try: + with open("/proc/mounts", "r") as fh: + for line in fh: + parts = line.split() + if len(parts) >= 2: + mount_point = parts[1] + if mount_point.startswith(("/mnt/", "/media/")): + if os.path.isdir(mount_point): + mounts.append(mount_point) + except OSError: + pass + return sorted(set(mounts)) + + +def find_upload_sources(): + """Return sub-directories inside UPLOAD_DIR that look like image content.""" + sources = [] + if os.path.isdir(config.UPLOAD_DIR): + try: + entries = os.listdir(config.UPLOAD_DIR) + if entries: + sources.append(config.UPLOAD_DIR) + for entry in entries: + full = os.path.join(config.UPLOAD_DIR, entry) + if os.path.isdir(full): + sources.append(full) + except OSError: + pass + return sources diff --git a/webapp/services/unattend.py b/webapp/services/unattend.py new file mode 100644 index 0000000..8308147 --- /dev/null +++ b/webapp/services/unattend.py @@ -0,0 +1,496 @@ +"""Unattend.xml parser, builder, and form-data extractor. + +Round-trips a Windows ``FlatUnattendW10.xml`` between the disk format +and a flat dict the editor template can render. ``parse_unattend()`` +reads disk + returns a dict; ``build_unattend_xml()`` takes the dict ++ returns a string for writing back; ``extract_form_data()`` converts +a Flask ``request.form`` MultiDict into the dict format that +``build_unattend_xml()`` expects. +""" + +import os + +from lxml import etree + +import config + +UNATTEND_TEMPLATE = """\ + + + + + + + +""" + + +def qn(tag): + """Return a tag qualified with the default unattend namespace.""" + return f"{{{config.UNATTEND_NS}}}{tag}" + + +def qwcm(attr): + """Return an attribute qualified with the wcm namespace.""" + return f"{{{config.WCM_NS}}}{attr}" + + +def _find_or_create(parent, tag): + """Find the first child with *tag* or create it.""" + el = parent.find(tag, namespaces={"": config.UNATTEND_NS}) + if el is None: + el = etree.SubElement(parent, qn(tag.split("}")[-1]) if "}" not in tag else tag) + return el + + +def _settings_pass(root, pass_name): + """Return the element, creating if needed.""" + for s in root.findall(qn("settings")): + if s.get("pass") == pass_name: + return s + s = etree.SubElement(root, qn("settings")) + s.set("pass", pass_name) + return s + + +def parse_unattend(xml_path): + """Parse an unattend.xml and return a dict of editable data.""" + data = { + "driver_paths": [], + "computer_name": "", + "registered_organization": "", + "registered_owner": "", + "time_zone": "", + "specialize_commands": [], + "oobe": { + "HideEULAPage": "true", + "HideOEMRegistrationScreen": "true", + "HideOnlineAccountScreens": "true", + "HideWirelessSetupInOOBE": "true", + "HideLocalAccountScreen": "true", + "NetworkLocation": "Work", + "ProtectYourPC": "3", + "SkipUserOOBE": "true", + "SkipMachineOOBE": "true", + }, + "firstlogon_commands": [], + "user_accounts": [], + "autologon": { + "enabled": "", + "username": "", + "password": "", + "plain_text": "true", + "logon_count": "", + }, + "intl": { + "input_locale": "", + "system_locale": "", + "ui_language": "", + "user_locale": "", + }, + "oobe_timezone": "", + "raw_xml": "", + } + + if not os.path.isfile(xml_path): + data["raw_xml"] = UNATTEND_TEMPLATE + return data + + with open(xml_path, "r", encoding="utf-8") as fh: + raw = fh.read() + data["raw_xml"] = raw + + try: + root = etree.fromstring(raw.encode("utf-8")) + except etree.XMLSyntaxError: + return data + + ns = {"u": config.UNATTEND_NS} + + # --- offlineServicing: DriverPaths --- + for dp_el in root.xpath( + "u:settings[@pass='offlineServicing']//u:PathAndCredentials/u:Path", + namespaces=ns, + ): + if dp_el.text: + data["driver_paths"].append(dp_el.text.strip()) + + # --- specialize: Shell-Setup --- + for comp in root.xpath("u:settings[@pass='specialize']/u:component", namespaces=ns): + comp_name = comp.get("name", "") + if "Shell-Setup" in comp_name: + for tag, key in [ + ("ComputerName", "computer_name"), + ("RegisteredOrganization", "registered_organization"), + ("RegisteredOwner", "registered_owner"), + ("TimeZone", "time_zone"), + ]: + el = comp.find(qn(tag)) + if el is not None and el.text: + data[key] = el.text.strip() + + # --- specialize: RunSynchronous commands --- + for cmd in root.xpath( + "u:settings[@pass='specialize']//u:RunSynchronousCommand", + namespaces=ns, + ): + order_el = cmd.find(qn("Order")) + path_el = cmd.find(qn("Path")) + desc_el = cmd.find(qn("Description")) + data["specialize_commands"].append({ + "order": order_el.text.strip() if order_el is not None and order_el.text else "", + "path": path_el.text.strip() if path_el is not None and path_el.text else "", + "description": desc_el.text.strip() if desc_el is not None and desc_el.text else "", + }) + + # --- oobeSystem --- + for comp in root.xpath("u:settings[@pass='oobeSystem']/u:component", namespaces=ns): + comp_name = comp.get("name", "") + + if "International-Core" in comp_name: + for tag, key in [ + ("InputLocale", "input_locale"), + ("SystemLocale", "system_locale"), + ("UILanguage", "ui_language"), + ("UserLocale", "user_locale"), + ]: + el = comp.find(qn(tag)) + if el is not None and el.text: + data["intl"][key] = el.text.strip() + + if "OOBE" in comp_name or "Shell-Setup" in comp_name: + oobe_el = comp.find(qn("OOBE")) + if oobe_el is not None: + for child in oobe_el: + local = etree.QName(child).localname + if local in data["oobe"] and child.text: + data["oobe"][local] = child.text.strip() + + ua = comp.find(qn("UserAccounts")) + if ua is not None: + la_container = ua.find(qn("LocalAccounts")) + if la_container is not None: + for acct in la_container.findall(qn("LocalAccount")): + name_el = acct.find(qn("Name")) + group_el = acct.find(qn("Group")) + display_el = acct.find(qn("DisplayName")) + pw_el = acct.find(qn("Password")) + pw_val = "" + pw_plain = "true" + if pw_el is not None: + v = pw_el.find(qn("Value")) + p = pw_el.find(qn("PlainText")) + if v is not None and v.text: + pw_val = v.text.strip() + if p is not None and p.text: + pw_plain = p.text.strip() + data["user_accounts"].append({ + "name": name_el.text.strip() if name_el is not None and name_el.text else "", + "password": pw_val, + "plain_text": pw_plain, + "group": group_el.text.strip() if group_el is not None and group_el.text else "Administrators", + "display_name": display_el.text.strip() if display_el is not None and display_el.text else "", + }) + + al = comp.find(qn("AutoLogon")) + if al is not None: + enabled_el = al.find(qn("Enabled")) + user_el = al.find(qn("Username")) + count_el = al.find(qn("LogonCount")) + pw_el = al.find(qn("Password")) + pw_val = "" + pw_plain = "true" + if pw_el is not None: + v = pw_el.find(qn("Value")) + p = pw_el.find(qn("PlainText")) + if v is not None and v.text: + pw_val = v.text.strip() + if p is not None and p.text: + pw_plain = p.text.strip() + data["autologon"] = { + "enabled": enabled_el.text.strip() if enabled_el is not None and enabled_el.text else "", + "username": user_el.text.strip() if user_el is not None and user_el.text else "", + "password": pw_val, + "plain_text": pw_plain, + "logon_count": count_el.text.strip() if count_el is not None and count_el.text else "", + } + + flc = comp.find(qn("FirstLogonCommands")) + if flc is not None: + for sync in flc.findall(qn("SynchronousCommand")): + order_el = sync.find(qn("Order")) + cl_el = sync.find(qn("CommandLine")) + desc_el = sync.find(qn("Description")) + data["firstlogon_commands"].append({ + "order": order_el.text.strip() if order_el is not None and order_el.text else "", + "commandline": cl_el.text.strip() if cl_el is not None and cl_el.text else "", + "description": desc_el.text.strip() if desc_el is not None and desc_el.text else "", + }) + + tz_el = comp.find(qn("TimeZone")) + if tz_el is not None and tz_el.text: + data["oobe_timezone"] = tz_el.text.strip() + + return data + + +def build_unattend_xml(form_data): + """Build a complete unattend.xml string from form data dict.""" + root = etree.Element(qn("unattend"), nsmap=config.NSMAP) + + _settings_pass(root, "windowsPE") + + # --- offlineServicing: DriverPaths --- + offline = _settings_pass(root, "offlineServicing") + driver_paths = form_data.get("driver_paths", []) + if driver_paths: + comp = etree.SubElement(offline, qn("component")) + comp.set("name", "Microsoft-Windows-PnpCustomizationsNonWinPE") + comp.set("processorArchitecture", "amd64") + comp.set("publicKeyToken", "31bf3856ad364e35") + comp.set("language", "neutral") + comp.set("versionScope", "nonSxS") + dp_container = etree.SubElement(comp, qn("DriverPaths")) + for idx, dp in enumerate(driver_paths, start=1): + if not dp.strip(): + continue + pac = etree.SubElement(dp_container, qn("PathAndCredentials")) + pac.set(qwcm("action"), "add") + pac.set(qwcm("keyValue"), str(idx)) + path_el = etree.SubElement(pac, qn("Path")) + path_el.text = dp.strip() + + # --- specialize --- + spec = _settings_pass(root, "specialize") + + shell_comp = etree.SubElement(spec, qn("component")) + shell_comp.set("name", "Microsoft-Windows-Shell-Setup") + shell_comp.set("processorArchitecture", "amd64") + shell_comp.set("publicKeyToken", "31bf3856ad364e35") + shell_comp.set("language", "neutral") + shell_comp.set("versionScope", "nonSxS") + + for tag, key in [ + ("ComputerName", "computer_name"), + ("RegisteredOrganization", "registered_organization"), + ("RegisteredOwner", "registered_owner"), + ("TimeZone", "time_zone"), + ]: + val = form_data.get(key, "").strip() + if val: + el = etree.SubElement(shell_comp, qn(tag)) + el.text = val + + spec_cmds = form_data.get("specialize_commands", []) + if spec_cmds: + deploy_comp = etree.SubElement(spec, qn("component")) + deploy_comp.set("name", "Microsoft-Windows-Deployment") + deploy_comp.set("processorArchitecture", "amd64") + deploy_comp.set("publicKeyToken", "31bf3856ad364e35") + deploy_comp.set("language", "neutral") + deploy_comp.set("versionScope", "nonSxS") + rs = etree.SubElement(deploy_comp, qn("RunSynchronous")) + for idx, cmd in enumerate(spec_cmds, start=1): + if not cmd.get("path", "").strip(): + continue + rsc = etree.SubElement(rs, qn("RunSynchronousCommand")) + rsc.set(qwcm("action"), "add") + order_el = etree.SubElement(rsc, qn("Order")) + order_el.text = str(idx) + path_el = etree.SubElement(rsc, qn("Path")) + path_el.text = cmd["path"].strip() + desc_el = etree.SubElement(rsc, qn("Description")) + desc_el.text = cmd.get("description", "").strip() + + # --- oobeSystem --- + oobe_settings = _settings_pass(root, "oobeSystem") + + intl = form_data.get("intl", {}) + if any(v.strip() for v in intl.values() if v): + intl_comp = etree.SubElement(oobe_settings, qn("component")) + intl_comp.set("name", "Microsoft-Windows-International-Core") + intl_comp.set("processorArchitecture", "amd64") + intl_comp.set("publicKeyToken", "31bf3856ad364e35") + intl_comp.set("language", "neutral") + intl_comp.set("versionScope", "nonSxS") + for tag, key in [ + ("InputLocale", "input_locale"), + ("SystemLocale", "system_locale"), + ("UILanguage", "ui_language"), + ("UserLocale", "user_locale"), + ]: + val = intl.get(key, "").strip() + if val: + el = etree.SubElement(intl_comp, qn(tag)) + el.text = val + + oobe_comp = etree.SubElement(oobe_settings, qn("component")) + oobe_comp.set("name", "Microsoft-Windows-Shell-Setup") + oobe_comp.set("processorArchitecture", "amd64") + oobe_comp.set("publicKeyToken", "31bf3856ad364e35") + oobe_comp.set("language", "neutral") + oobe_comp.set("versionScope", "nonSxS") + + oobe_el = etree.SubElement(oobe_comp, qn("OOBE")) + oobe_data = form_data.get("oobe", {}) + for key in [ + "HideEULAPage", + "HideOEMRegistrationScreen", + "HideOnlineAccountScreens", + "HideWirelessSetupInOOBE", + "HideLocalAccountScreen", + "NetworkLocation", + "ProtectYourPC", + "SkipUserOOBE", + "SkipMachineOOBE", + ]: + val = oobe_data.get(key, "") + if val: + child = etree.SubElement(oobe_el, qn(key)) + child.text = str(val) + + accounts = form_data.get("user_accounts", []) + if accounts: + ua = etree.SubElement(oobe_comp, qn("UserAccounts")) + la_container = etree.SubElement(ua, qn("LocalAccounts")) + for acct in accounts: + if not acct.get("name", "").strip(): + continue + la = etree.SubElement(la_container, qn("LocalAccount")) + la.set(qwcm("action"), "add") + pw = etree.SubElement(la, qn("Password")) + pw_val = etree.SubElement(pw, qn("Value")) + pw_val.text = acct.get("password", "") + pw_plain = etree.SubElement(pw, qn("PlainText")) + pw_plain.text = acct.get("plain_text", "true") + name_el = etree.SubElement(la, qn("Name")) + name_el.text = acct["name"].strip() + group_el = etree.SubElement(la, qn("Group")) + group_el.text = acct.get("group", "Administrators").strip() + display_el = etree.SubElement(la, qn("DisplayName")) + display_el.text = acct.get("display_name", acct["name"]).strip() + + autologon = form_data.get("autologon", {}) + if autologon.get("username", "").strip(): + al = etree.SubElement(oobe_comp, qn("AutoLogon")) + al_pw = etree.SubElement(al, qn("Password")) + al_pw_val = etree.SubElement(al_pw, qn("Value")) + al_pw_val.text = autologon.get("password", "") + al_pw_plain = etree.SubElement(al_pw, qn("PlainText")) + al_pw_plain.text = autologon.get("plain_text", "true") + al_enabled = etree.SubElement(al, qn("Enabled")) + al_enabled.text = autologon.get("enabled", "true") + al_user = etree.SubElement(al, qn("Username")) + al_user.text = autologon["username"].strip() + logon_count = autologon.get("logon_count", "").strip() + if logon_count: + al_count = etree.SubElement(al, qn("LogonCount")) + al_count.text = logon_count + + fl_cmds = form_data.get("firstlogon_commands", []) + if fl_cmds: + flc = etree.SubElement(oobe_comp, qn("FirstLogonCommands")) + for idx, cmd in enumerate(fl_cmds, start=1): + if not cmd.get("commandline", "").strip(): + continue + sc = etree.SubElement(flc, qn("SynchronousCommand")) + sc.set(qwcm("action"), "add") + order_el = etree.SubElement(sc, qn("Order")) + order_el.text = str(idx) + cl_el = etree.SubElement(sc, qn("CommandLine")) + cl_el.text = cmd["commandline"].strip() + desc_el = etree.SubElement(sc, qn("Description")) + desc_el.text = cmd.get("description", "").strip() + + oobe_tz = form_data.get("oobe_timezone", "").strip() + if oobe_tz: + tz_el = etree.SubElement(oobe_comp, qn("TimeZone")) + tz_el.text = oobe_tz + + xml_bytes = etree.tostring( + root, + pretty_print=True, + xml_declaration=True, + encoding="utf-8", + ) + return xml_bytes.decode("utf-8") + + +def extract_form_data(form): + """Pull structured data from the submitted form.""" + data = {} + + dp_list = form.getlist("driver_path[]") + data["driver_paths"] = [p for p in dp_list if p.strip()] + + data["computer_name"] = form.get("computer_name", "") + data["registered_organization"] = form.get("registered_organization", "") + data["registered_owner"] = form.get("registered_owner", "") + data["time_zone"] = form.get("time_zone", "") + + spec_paths = form.getlist("spec_cmd_path[]") + spec_descs = form.getlist("spec_cmd_desc[]") + data["specialize_commands"] = [] + for i in range(len(spec_paths)): + if spec_paths[i].strip(): + data["specialize_commands"].append({ + "path": spec_paths[i], + "description": spec_descs[i] if i < len(spec_descs) else "", + }) + + data["oobe"] = {} + for key in [ + "HideEULAPage", + "HideOEMRegistrationScreen", + "HideOnlineAccountScreens", + "HideWirelessSetupInOOBE", + "HideLocalAccountScreen", + "SkipUserOOBE", + "SkipMachineOOBE", + ]: + data["oobe"][key] = form.get(f"oobe_{key}", "false") + data["oobe"]["NetworkLocation"] = form.get("oobe_NetworkLocation", "Work") + data["oobe"]["ProtectYourPC"] = form.get("oobe_ProtectYourPC", "3") + + fl_cls = form.getlist("fl_cmd_commandline[]") + fl_descs = form.getlist("fl_cmd_desc[]") + data["firstlogon_commands"] = [] + for i in range(len(fl_cls)): + if fl_cls[i].strip(): + data["firstlogon_commands"].append({ + "commandline": fl_cls[i], + "description": fl_descs[i] if i < len(fl_descs) else "", + }) + + accounts = [] + i = 0 + while form.get(f"account_name_{i}"): + accounts.append({ + "name": form.get(f"account_name_{i}", ""), + "password": form.get(f"account_password_{i}", ""), + "plain_text": form.get(f"account_plaintext_{i}", "true"), + "group": form.get(f"account_group_{i}", "Administrators"), + "display_name": form.get(f"account_display_{i}", ""), + }) + i += 1 + data["user_accounts"] = accounts + + data["autologon"] = { + "enabled": form.get("autologon_enabled", ""), + "username": form.get("autologon_username", ""), + "password": form.get("autologon_password", ""), + "plain_text": form.get("autologon_plaintext", "true"), + "logon_count": form.get("autologon_logoncount", ""), + } + + data["intl"] = { + "input_locale": form.get("intl_input_locale", ""), + "system_locale": form.get("intl_system_locale", ""), + "ui_language": form.get("intl_ui_language", ""), + "user_locale": form.get("intl_user_locale", ""), + } + + data["oobe_timezone"] = form.get("oobe_timezone", "") + + return data diff --git a/webapp/services/wim.py b/webapp/services/wim.py new file mode 100644 index 0000000..9556f42 --- /dev/null +++ b/webapp/services/wim.py @@ -0,0 +1,70 @@ +"""boot.wim manipulation via wimtools (wimextract / wimupdate / wimdir). + +Used by the startnet.cmd editor to extract + update the boot script +that runs when WinPE boots from PXE. +""" + +import os +import shutil +import subprocess +import tempfile + + +def extract_startnet(wim_path): + """Extract startnet.cmd from a WIM file. Returns the contents or None.""" + tmpdir = tempfile.mkdtemp() + try: + result = subprocess.run( + ["wimextract", wim_path, "1", + "/Windows/System32/startnet.cmd", + "--dest-dir", tmpdir], + capture_output=True, text=True, timeout=30, + ) + startnet_path = os.path.join(tmpdir, "startnet.cmd") + if result.returncode == 0 and os.path.isfile(startnet_path): + with open(startnet_path, "r", encoding="utf-8", errors="replace") as fh: + return fh.read() + return None + except Exception: + return None + finally: + shutil.rmtree(tmpdir, ignore_errors=True) + + +def update_startnet(wim_path, content): + """Update startnet.cmd inside a WIM file via wimupdate. + + Returns (ok, error_message). Writes CRLF line endings. + """ + tmpdir = tempfile.mkdtemp() + try: + startnet_path = os.path.join(tmpdir, "startnet.cmd") + with open(startnet_path, "w", encoding="utf-8", newline="\r\n") as fh: + fh.write(content) + update_cmd = f"add {startnet_path} /Windows/System32/startnet.cmd\n" + result = subprocess.run( + ["wimupdate", wim_path, "1"], + input=update_cmd, + capture_output=True, text=True, timeout=60, + ) + if result.returncode != 0: + return False, result.stderr.strip() + return True, "" + except Exception as exc: + return False, str(exc) + finally: + shutil.rmtree(tmpdir, ignore_errors=True) + + +def list_files(wim_path, path="/"): + """List files inside a WIM at the given path.""" + try: + result = subprocess.run( + ["wimdir", wim_path, "1", path], + capture_output=True, text=True, timeout=30, + ) + if result.returncode == 0: + return [l.strip() for l in result.stdout.splitlines() if l.strip()] + return [] + except Exception: + return []