Files
pxe-server/webapp/app.py
cproudlock 725c8f43de Change webapp to port 9009, add test VM script
- Webapp now listens on port 9009 (UFW rule added)
- Apache reverse proxy updated to proxy to 9009
- test-vm.sh creates a KVM test environment with:
  - CIDATA ISO built from project files
  - Isolated libvirt network (10.9.100.0/24)
  - Ubuntu 24.04 VM with autoinstall

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-06 16:53:23 -05:00

933 lines
33 KiB
Python

#!/usr/bin/env python3
"""Flask web application for managing a GE Aerospace PXE server."""
import logging
import os
import shutil
import subprocess
import tempfile
from datetime import datetime
from pathlib import Path
from flask import (
Flask,
flash,
jsonify,
redirect,
render_template,
request,
send_file,
url_for,
)
from lxml import etree
from werkzeug.utils import secure_filename
# Flask application object. The session-signing key is read from the
# environment; the fallback literal is a development placeholder.
app = Flask(__name__)
app.secret_key = os.environ.get(
    "FLASK_SECRET_KEY",
    "pxe-manager-dev-key-change-in-prod",
)
app.config["MAX_CONTENT_LENGTH"] = 16 * 1024 ** 3  # 16 GB max upload
# ---------------------------------------------------------------------------
# Audit logging
# ---------------------------------------------------------------------------
AUDIT_LOG = os.environ.get("AUDIT_LOG", "/var/log/pxe-webapp-audit.log")

# File handler that appends "<timestamp> <message>" lines to AUDIT_LOG.
_audit_handler = logging.FileHandler(AUDIT_LOG, mode="a")
_audit_handler.setFormatter(logging.Formatter("%(asctime)s %(message)s"))

# Dedicated logger for audit entries.
audit_logger = logging.getLogger("pxe_audit")
audit_logger.setLevel(logging.INFO)
audit_logger.addHandler(_audit_handler)
def audit(action, detail=""):
    """Write a single-line entry to the audit log.

    Args:
        action: Short action identifier, e.g. ``"IMAGE_IMPORT"``.
        detail: Free-form detail text; may originate from user input.

    The entry is prefixed with the remote address of the current request,
    or ``"system"`` when the request proxy evaluates as false.
    """
    ip = request.remote_addr if request else "system"
    # The audit viewer treats every line of the log file as one entry, so
    # escape line breaks to stop user-controlled values forging entries.
    safe_action = str(action).replace("\r", "\\r").replace("\n", "\\n")
    safe_detail = str(detail).replace("\r", "\\r").replace("\n", "\\n")
    audit_logger.info("[%s] %s: %s", ip, safe_action, safe_detail)
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
# Filesystem locations; each may be overridden through the environment.
SAMBA_SHARE = os.environ.get("SAMBA_SHARE", "/srv/samba/winpeapps")
CLONEZILLA_SHARE = os.environ.get("CLONEZILLA_SHARE", "/srv/samba/clonezilla")
BLANCCO_REPORTS = os.environ.get("BLANCCO_REPORTS", "/srv/samba/blancco-reports")
WEB_ROOT = os.environ.get("WEB_ROOT", "/var/www/html")
BOOT_WIM = os.path.join(WEB_ROOT, "win11", "sources", "boot.wim")

# Image type identifier -> display name. Insertion order defines the order
# of IMAGE_TYPES below.
FRIENDLY_NAMES = {
    "gea-standard": "GE Aerospace Standard",
    "gea-engineer": "GE Aerospace Engineer",
    "gea-shopfloor": "GE Aerospace Shop Floor",
    "ge-standard": "GE Legacy Standard",
    "ge-engineer": "GE Legacy Engineer",
    "ge-shopfloor-lockdown": "GE Legacy Shop Floor Lockdown",
    "ge-shopfloor-mce": "GE Legacy Shop Floor MCE",
}
IMAGE_TYPES = list(FRIENDLY_NAMES)

# XML namespaces used by unattend.xml documents.
NS = "urn:schemas-microsoft-com:unattend"
WCM = "http://schemas.microsoft.com/WMIConfig/2002/State"
NSMAP = {None: NS, "wcm": WCM}
# Convenience qualified-name helpers
def qn(tag):
    """Qualify *tag* with the default unattend namespace as ``{ns}tag``."""
    return "{{{}}}{}".format(NS, tag)
def qwcm(attr):
    """Qualify *attr* with the wcm namespace as ``{ns}attr``."""
    return "{{{}}}{}".format(WCM, attr)
# ---------------------------------------------------------------------------
# Utility helpers
# ---------------------------------------------------------------------------
def deploy_path(image_type):
    """Build the path of the ``Deploy`` directory for *image_type* on the Samba share."""
    parts = (SAMBA_SHARE, image_type, "Deploy")
    return os.path.join(*parts)
def unattend_path(image_type):
    """Build the path of ``FlatUnattendW10.xml`` inside the image's Deploy directory."""
    base_dir = deploy_path(image_type)
    return os.path.join(base_dir, "FlatUnattendW10.xml")
def image_status(image_type):
    """Return a dict describing the state of an image type.

    Keys:
        image_type: The identifier passed in.
        friendly_name: Display name, falling back to the identifier.
        deploy_path: Path of the image's Deploy directory.
        has_content: True when the Deploy directory exists and is non-empty.
        has_unattend: True when the unattend file exists.
    """
    dp = deploy_path(image_type)
    up = unattend_path(image_type)
    has_content = False
    if os.path.isdir(dp):
        # Use scandir as a context manager so the directory handle is
        # released even though any() stops at the first entry.
        with os.scandir(dp) as entries:
            has_content = any(entries)
    return {
        "image_type": image_type,
        "friendly_name": FRIENDLY_NAMES.get(image_type, image_type),
        "deploy_path": dp,
        "has_content": has_content,
        "has_unattend": os.path.isfile(up),
    }
def service_status(service_name):
    """Query ``systemctl is-active`` for *service_name*.

    Returns a dict with ``name``, ``active`` (True only when the reported
    state is exactly ``"active"``) and ``state``. If the command cannot be
    run, ``active`` is False and ``state`` carries the exception text.
    """
    command = ["systemctl", "is-active", service_name]
    try:
        proc = subprocess.run(command, capture_output=True, text=True, timeout=5)
    except Exception as exc:
        return {"name": service_name, "active": False, "state": str(exc)}
    reported = proc.stdout.strip()
    return {
        "name": service_name,
        "active": reported == "active",
        "state": reported,
    }
def find_usb_mounts():
    """Return a sorted list of mount points that look like removable media.

    Reads ``/proc/mounts`` and keeps every mount point located under
    ``/mnt/`` or ``/media/`` that is an existing directory. Returns an
    empty list when ``/proc/mounts`` cannot be read.
    """
    # Mount tables octal-escape whitespace and backslashes inside fields.
    # Decode them so mount points containing spaces are usable as paths.
    # The backslash escape is decoded last so that a decoded backslash can
    # never be re-read as the start of another escape sequence.
    escapes = (
        ("\\040", " "),
        ("\\011", "\t"),
        ("\\012", "\n"),
        ("\\134", "\\"),
    )
    mounts = set()
    try:
        with open("/proc/mounts", "r") as fh:
            for line in fh:
                fields = line.split()
                if len(fields) < 2:
                    continue
                mount_point = fields[1]
                for escaped, char in escapes:
                    mount_point = mount_point.replace(escaped, char)
                if not mount_point.startswith(("/mnt/", "/media/")):
                    continue
                if os.path.isdir(mount_point):
                    mounts.add(mount_point)
    except OSError:
        # Best effort: no readable mount table means no candidates.
        pass
    return sorted(mounts)
# ---------------------------------------------------------------------------
# XML helpers — parse / build unattend.xml
# ---------------------------------------------------------------------------
# Skeleton unattend document with one empty <settings> element per pass.
# parse_unattend() returns it as the raw XML when no unattend file exists.
UNATTEND_TEMPLATE = """\
<?xml version="1.0" encoding="utf-8"?>
<unattend xmlns="urn:schemas-microsoft-com:unattend"
xmlns:wcm="http://schemas.microsoft.com/WMIConfig/2002/State">
<settings pass="windowsPE" />
<settings pass="offlineServicing" />
<settings pass="specialize" />
<settings pass="oobeSystem" />
</unattend>
"""
def _find_or_create(parent, tag):
    """Find the first child of *parent* with *tag*, creating it if absent.

    *tag* may be a bare local name, which is placed in the unattend
    namespace, or an already qualified ``{namespace}name`` tag, which is
    used unchanged. Returns the existing or newly appended element.
    """
    # Qualify once and use the same tag for lookup and creation, so the
    # search does not depend on a default-namespace prefix mapping.
    qualified = tag if "}" in tag else qn(tag)
    el = parent.find(qualified)
    if el is None:
        el = etree.SubElement(parent, qualified)
    return el
def _settings_pass(root, pass_name):
    """Fetch the ``<settings>`` child of *root* whose ``pass`` is *pass_name*.

    A new ``<settings pass="...">`` element is appended and returned when
    no existing one matches.
    """
    settings_tag = qn("settings")
    existing = next(
        (node for node in root.findall(settings_tag) if node.get("pass") == pass_name),
        None,
    )
    if existing is not None:
        return existing
    created = etree.SubElement(root, settings_tag)
    created.set("pass", pass_name)
    return created
def _element_text(el):
    """Return the stripped text of *el*, or ``""`` if *el* is None or has no text."""
    if el is None or not el.text:
        return ""
    return el.text.strip()


def parse_unattend(xml_path):
    """Parse an unattend.xml and return a dict of editable data.

    Args:
        xml_path: Path of the unattend file to read.

    Returns:
        A dict with driver paths, Shell-Setup machine settings, specialize
        commands, OOBE options, first-logon commands and ``raw_xml``. When
        the file does not exist, defaults are returned with ``raw_xml`` set
        to ``UNATTEND_TEMPLATE``. When the file is not well-formed XML,
        defaults are returned with ``raw_xml`` set to the file's text.
    """
    data = {
        "driver_paths": [],
        "computer_name": "",
        "registered_organization": "",
        "registered_owner": "",
        "time_zone": "",
        "specialize_commands": [],
        "oobe": {
            "HideEULAPage": "true",
            "HideOEMRegistrationScreen": "true",
            "HideOnlineAccountScreens": "true",
            "HideWirelessSetupInOOBE": "true",
            "HideLocalAccountScreen": "true",
            "NetworkLocation": "Work",
            "ProtectYourPC": "3",
            "SkipUserOOBE": "true",
            "SkipMachineOOBE": "true",
        },
        "firstlogon_commands": [],
        "raw_xml": "",
    }
    if not os.path.isfile(xml_path):
        data["raw_xml"] = UNATTEND_TEMPLATE
        return data
    with open(xml_path, "r", encoding="utf-8") as fh:
        raw = fh.read()
    data["raw_xml"] = raw
    try:
        root = etree.fromstring(raw.encode("utf-8"))
    except etree.XMLSyntaxError:
        # Keep the raw text so a broken file can still be repaired by hand.
        return data
    ns = {"u": NS}

    # --- offlineServicing: DriverPaths ---
    for dp_el in root.xpath(
        "u:settings[@pass='offlineServicing']//u:PathAndCredentials/u:Path",
        namespaces=ns,
    ):
        if dp_el.text:
            data["driver_paths"].append(dp_el.text.strip())

    # --- specialize: Shell-Setup ---
    for comp in root.xpath(
        "u:settings[@pass='specialize']/u:component",
        namespaces=ns,
    ):
        if "Shell-Setup" not in comp.get("name", ""):
            continue
        for tag, key in [
            ("ComputerName", "computer_name"),
            ("RegisteredOrganization", "registered_organization"),
            ("RegisteredOwner", "registered_owner"),
            ("TimeZone", "time_zone"),
        ]:
            el = comp.find(qn(tag))
            if el is not None and el.text:
                data[key] = el.text.strip()

    # --- specialize: RunSynchronous commands ---
    for cmd in root.xpath(
        "u:settings[@pass='specialize']//u:RunSynchronousCommand",
        namespaces=ns,
    ):
        data["specialize_commands"].append({
            "order": _element_text(cmd.find(qn("Order"))),
            "path": _element_text(cmd.find(qn("Path"))),
            "description": _element_text(cmd.find(qn("Description"))),
        })

    # --- oobeSystem ---
    for comp in root.xpath(
        "u:settings[@pass='oobeSystem']/u:component",
        namespaces=ns,
    ):
        comp_name = comp.get("name", "")
        if "OOBE" not in comp_name and "Shell-Setup" not in comp_name:
            continue
        oobe_el = comp.find(qn("OOBE"))
        if oobe_el is not None:
            for child in oobe_el:
                # Comments and processing instructions have a non-string
                # tag; skip them instead of letting QName() raise.
                if not isinstance(child.tag, str):
                    continue
                local = etree.QName(child).localname
                if local in data["oobe"] and child.text:
                    data["oobe"][local] = child.text.strip()
        # FirstLogonCommands
        flc = comp.find(qn("FirstLogonCommands"))
        if flc is not None:
            for sync in flc.findall(qn("SynchronousCommand")):
                data["firstlogon_commands"].append({
                    "order": _element_text(sync.find(qn("Order"))),
                    "commandline": _element_text(sync.find(qn("CommandLine"))),
                    "description": _element_text(sync.find(qn("Description"))),
                })
    return data
def _add_component(parent, name):
    """Append a ``<component>`` named *name* with the shared identity attributes."""
    comp = etree.SubElement(parent, qn("component"))
    comp.set("name", name)
    comp.set("processorArchitecture", "amd64")
    comp.set("publicKeyToken", "31bf3856ad364e35")
    comp.set("language", "neutral")
    comp.set("versionScope", "nonSxS")
    return comp


def build_unattend_xml(form_data):
    """Build a complete unattend.xml string from a form data dict.

    Args:
        form_data: Mapping with the optional keys ``driver_paths``,
            ``computer_name``, ``registered_organization``,
            ``registered_owner``, ``time_zone``, ``specialize_commands``,
            ``oobe`` and ``firstlogon_commands``. Missing or ``None``
            values are treated as empty.

    Returns:
        The pretty-printed document as a string, including the XML
        declaration.
    """
    root = etree.Element(qn("unattend"), nsmap=NSMAP)

    # --- windowsPE (empty) ---
    _settings_pass(root, "windowsPE")

    # --- offlineServicing: DriverPaths ---
    offline = _settings_pass(root, "offlineServicing")
    driver_paths = form_data.get("driver_paths") or []
    if driver_paths:
        comp = _add_component(offline, "Microsoft-Windows-PnpCustomizationsNonWinPE")
        dp_container = etree.SubElement(comp, qn("DriverPaths"))
        for idx, dp in enumerate(driver_paths, start=1):
            if not dp.strip():
                continue
            pac = etree.SubElement(dp_container, qn("PathAndCredentials"))
            pac.set(qwcm("action"), "add")
            pac.set(qwcm("keyValue"), str(idx))
            path_el = etree.SubElement(pac, qn("Path"))
            path_el.text = dp.strip()

    # --- specialize ---
    spec = _settings_pass(root, "specialize")
    # The Shell-Setup component is always emitted, even with no values.
    shell_comp = _add_component(spec, "Microsoft-Windows-Shell-Setup")
    for tag, key in [
        ("ComputerName", "computer_name"),
        ("RegisteredOrganization", "registered_organization"),
        ("RegisteredOwner", "registered_owner"),
        ("TimeZone", "time_zone"),
    ]:
        # "or" guards against explicit nulls in JSON payloads.
        val = (form_data.get(key) or "").strip()
        if val:
            el = etree.SubElement(shell_comp, qn(tag))
            el.text = val

    # Deployment / RunSynchronous commands
    spec_cmds = form_data.get("specialize_commands") or []
    if spec_cmds:
        deploy_comp = _add_component(spec, "Microsoft-Windows-Deployment")
        rs = etree.SubElement(deploy_comp, qn("RunSynchronous"))
        for idx, cmd in enumerate(spec_cmds, start=1):
            if not (cmd.get("path") or "").strip():
                continue
            rsc = etree.SubElement(rs, qn("RunSynchronousCommand"))
            rsc.set(qwcm("action"), "add")
            order_el = etree.SubElement(rsc, qn("Order"))
            order_el.text = str(idx)
            path_el = etree.SubElement(rsc, qn("Path"))
            path_el.text = cmd["path"].strip()
            desc_el = etree.SubElement(rsc, qn("Description"))
            desc_el.text = (cmd.get("description") or "").strip()

    # --- oobeSystem ---
    oobe_settings = _settings_pass(root, "oobeSystem")
    oobe_comp = _add_component(oobe_settings, "Microsoft-Windows-Shell-Setup")
    oobe_el = etree.SubElement(oobe_comp, qn("OOBE"))
    oobe_data = form_data.get("oobe") or {}
    for key in [
        "HideEULAPage",
        "HideOEMRegistrationScreen",
        "HideOnlineAccountScreens",
        "HideWirelessSetupInOOBE",
        "HideLocalAccountScreen",
        "NetworkLocation",
        "ProtectYourPC",
        "SkipUserOOBE",
        "SkipMachineOOBE",
    ]:
        val = oobe_data.get(key, "")
        if val:
            child = etree.SubElement(oobe_el, qn(key))
            child.text = str(val)

    # FirstLogonCommands
    fl_cmds = form_data.get("firstlogon_commands") or []
    if fl_cmds:
        flc = etree.SubElement(oobe_comp, qn("FirstLogonCommands"))
        for idx, cmd in enumerate(fl_cmds, start=1):
            if not (cmd.get("commandline") or "").strip():
                continue
            sc = etree.SubElement(flc, qn("SynchronousCommand"))
            sc.set(qwcm("action"), "add")
            order_el = etree.SubElement(sc, qn("Order"))
            order_el.text = str(idx)
            cl_el = etree.SubElement(sc, qn("CommandLine"))
            cl_el.text = cmd["commandline"].strip()
            desc_el = etree.SubElement(sc, qn("Description"))
            desc_el.text = (cmd.get("description") or "").strip()

    xml_bytes = etree.tostring(
        root,
        pretty_print=True,
        xml_declaration=True,
        encoding="utf-8",
    )
    return xml_bytes.decode("utf-8")
def _extract_form_data(form):
"""Pull structured data from the submitted form."""
data = {}
# Driver paths
dp_list = form.getlist("driver_path[]")
data["driver_paths"] = [p for p in dp_list if p.strip()]
# Machine settings
data["computer_name"] = form.get("computer_name", "")
data["registered_organization"] = form.get("registered_organization", "")
data["registered_owner"] = form.get("registered_owner", "")
data["time_zone"] = form.get("time_zone", "")
# Specialize commands
spec_paths = form.getlist("spec_cmd_path[]")
spec_descs = form.getlist("spec_cmd_desc[]")
data["specialize_commands"] = []
for i in range(len(spec_paths)):
if spec_paths[i].strip():
data["specialize_commands"].append({
"path": spec_paths[i],
"description": spec_descs[i] if i < len(spec_descs) else "",
})
# OOBE settings
data["oobe"] = {}
for key in [
"HideEULAPage",
"HideOEMRegistrationScreen",
"HideOnlineAccountScreens",
"HideWirelessSetupInOOBE",
"HideLocalAccountScreen",
"SkipUserOOBE",
"SkipMachineOOBE",
]:
data["oobe"][key] = form.get(f"oobe_{key}", "false")
data["oobe"]["NetworkLocation"] = form.get("oobe_NetworkLocation", "Work")
data["oobe"]["ProtectYourPC"] = form.get("oobe_ProtectYourPC", "3")
# First logon commands
fl_cls = form.getlist("fl_cmd_commandline[]")
fl_descs = form.getlist("fl_cmd_desc[]")
data["firstlogon_commands"] = []
for i in range(len(fl_cls)):
if fl_cls[i].strip():
data["firstlogon_commands"].append({
"commandline": fl_cls[i],
"description": fl_descs[i] if i < len(fl_descs) else "",
})
return data
# ---------------------------------------------------------------------------
# Routes — pages
# ---------------------------------------------------------------------------
@app.route("/")
def dashboard():
    """Render the dashboard with per-image status and service states."""
    monitored = ("dnsmasq", "apache2", "smbd")
    context = {
        "images": [image_status(name) for name in IMAGE_TYPES],
        "services": [service_status(svc) for svc in monitored],
        "image_types": IMAGE_TYPES,
        "friendly_names": FRIENDLY_NAMES,
    }
    return render_template("dashboard.html", **context)
@app.route("/images/import", methods=["GET", "POST"])
def images_import():
    """Show the import page, or copy a USB directory into an image's Deploy folder.

    GET renders ``import.html``. POST validates the ``source`` and
    ``target`` form fields, copies every top-level item of the source
    directory into the target's Deploy directory (an existing destination
    directory of the same name is replaced), then redirects back to this
    page with a flash message.
    """
    usb_mounts = find_usb_mounts()
    images = [image_status(it) for it in IMAGE_TYPES]
    if request.method == "POST":
        source = request.form.get("source", "")
        target = request.form.get("target", "")
        if not source or not target:
            flash("Please select both a source and a target image type.", "danger")
            return redirect(url_for("images_import"))
        if target not in IMAGE_TYPES:
            flash("Invalid target image type.", "danger")
            return redirect(url_for("images_import"))
        # Canonicalise before the containment check: a plain prefix test
        # would accept "/mnt/usb/../../etc", which resolves outside the
        # mount. realpath() collapses ".." segments and symlinks.
        source = os.path.realpath(source)
        allowed = [os.path.realpath(m) for m in find_usb_mounts()]
        if not any(source == m or source.startswith(m + "/") for m in allowed):
            flash("Source path is not on a mounted USB device.", "danger")
            return redirect(url_for("images_import"))
        if not os.path.isdir(source):
            flash(f"Source path does not exist: {source}", "danger")
            return redirect(url_for("images_import"))
        dest = deploy_path(target)
        try:
            os.makedirs(dest, exist_ok=True)
            # Copy the contents of source into dest, item by item.
            for item in os.listdir(source):
                src_item = os.path.join(source, item)
                dst_item = os.path.join(dest, item)
                if os.path.isdir(src_item):
                    # copytree needs a non-existent destination.
                    if os.path.exists(dst_item):
                        shutil.rmtree(dst_item)
                    shutil.copytree(src_item, dst_item)
                else:
                    shutil.copy2(src_item, dst_item)
            audit("IMAGE_IMPORT", f"{source} -> {target}")
            flash(
                f"Successfully imported content from {source} to {FRIENDLY_NAMES.get(target, target)}.",
                "success",
            )
        except Exception as exc:
            flash(f"Import failed: {exc}", "danger")
        return redirect(url_for("images_import"))
    return render_template(
        "import.html",
        usb_mounts=usb_mounts,
        images=images,
        image_types=IMAGE_TYPES,
        friendly_names=FRIENDLY_NAMES,
    )
@app.route("/images/<image_type>/unattend", methods=["GET", "POST"])
def unattend_editor(image_type):
    """Display the unattend editor for *image_type* and save submitted edits.

    POST accepts either raw XML (``save_mode == "raw"``, checked for
    well-formedness first) or structured form fields that are turned into
    XML. After a save attempt the client is redirected back to the editor.
    """
    if image_type not in IMAGE_TYPES:
        flash("Unknown image type.", "danger")
        return redirect(url_for("dashboard"))

    xml_file = unattend_path(image_type)

    def render_editor(data):
        # Single place that builds the editor page for this image type.
        return render_template(
            "unattend_editor.html",
            image_type=image_type,
            friendly_name=FRIENDLY_NAMES.get(image_type, image_type),
            data=data,
            image_types=IMAGE_TYPES,
            friendly_names=FRIENDLY_NAMES,
        )

    if request.method != "POST":
        return render_editor(parse_unattend(xml_file))

    save_mode = request.form.get("save_mode", "form")
    if save_mode == "raw":
        raw_xml = request.form.get("raw_xml", "")
        # Reject XML that is not well-formed, but hand the text back to
        # the editor so the user's edits are not lost.
        try:
            etree.fromstring(raw_xml.encode("utf-8"))
        except etree.XMLSyntaxError as exc:
            flash(f"Invalid XML: {exc}", "danger")
            data = parse_unattend(xml_file)
            data["raw_xml"] = raw_xml
            return render_editor(data)
        xml_content = raw_xml
    else:
        xml_content = build_unattend_xml(_extract_form_data(request.form))

    # Write to disk
    try:
        os.makedirs(os.path.dirname(xml_file), exist_ok=True)
        with open(xml_file, "w", encoding="utf-8") as out:
            out.write(xml_content)
        audit("UNATTEND_SAVE", f"{image_type} ({save_mode})")
        flash("unattend.xml saved successfully.", "success")
    except Exception as exc:
        flash(f"Failed to save: {exc}", "danger")
    return redirect(url_for("unattend_editor", image_type=image_type))
# ---------------------------------------------------------------------------
# Routes — Clonezilla Backups
# ---------------------------------------------------------------------------
@app.route("/backups")
def clonezilla_backups():
    """List the ``.zip`` files in the Clonezilla share, sorted by filename."""
    backups = []
    names = sorted(os.listdir(CLONEZILLA_SHARE)) if os.path.isdir(CLONEZILLA_SHARE) else []
    for name in names:
        if not name.lower().endswith(".zip"):
            continue
        full_path = os.path.join(CLONEZILLA_SHARE, name)
        if not os.path.isfile(full_path):
            continue
        info = os.stat(full_path)
        backups.append({
            "filename": name,
            # The machine name is the filename without its extension.
            "machine": os.path.splitext(name)[0],
            "size": info.st_size,
            "modified": info.st_mtime,
        })
    return render_template(
        "backups.html",
        backups=backups,
        image_types=IMAGE_TYPES,
        friendly_names=FRIENDLY_NAMES,
    )
@app.route("/backups/upload", methods=["POST"])
def clonezilla_upload():
    """Store an uploaded ``.zip`` backup in the Clonezilla share."""
    listing = url_for("clonezilla_backups")

    upload = request.files.get("backup_file")
    # A missing form field and an empty filename both mean "nothing chosen".
    if upload is None or not upload.filename:
        flash("No file selected.", "danger")
        return redirect(listing)

    filename = secure_filename(upload.filename)
    if not filename.lower().endswith(".zip"):
        flash("Only .zip files are accepted.", "danger")
        return redirect(listing)

    os.makedirs(CLONEZILLA_SHARE, exist_ok=True)
    upload.save(os.path.join(CLONEZILLA_SHARE, filename))
    audit("BACKUP_UPLOAD", filename)
    flash(f"Uploaded {filename} successfully.", "success")
    return redirect(listing)
@app.route("/backups/download/<filename>")
def clonezilla_download(filename):
    """Send a backup from the Clonezilla share as an attachment."""
    filename = secure_filename(filename)
    backup_path = os.path.join(CLONEZILLA_SHARE, filename)
    if os.path.isfile(backup_path):
        return send_file(backup_path, as_attachment=True)
    flash(f"Backup not found: {filename}", "danger")
    return redirect(url_for("clonezilla_backups"))
@app.route("/backups/delete/<filename>", methods=["POST"])
def clonezilla_delete(filename):
    """Delete a backup from the Clonezilla share and record it in the audit log."""
    filename = secure_filename(filename)
    backup_path = os.path.join(CLONEZILLA_SHARE, filename)
    if not os.path.isfile(backup_path):
        flash(f"Backup not found: {filename}", "danger")
    else:
        os.remove(backup_path)
        audit("BACKUP_DELETE", filename)
        flash(f"Deleted {filename}.", "success")
    return redirect(url_for("clonezilla_backups"))
# ---------------------------------------------------------------------------
# Routes — Blancco Reports
# ---------------------------------------------------------------------------
@app.route("/reports")
def blancco_reports():
    """List the files in the Blancco reports directory in reverse filename order."""
    reports = []
    names = []
    if os.path.isdir(BLANCCO_REPORTS):
        names = sorted(os.listdir(BLANCCO_REPORTS), reverse=True)
    for name in names:
        full_path = os.path.join(BLANCCO_REPORTS, name)
        if not os.path.isfile(full_path):
            continue
        info = os.stat(full_path)
        suffix = os.path.splitext(name)[1].lower()
        reports.append({
            "filename": name,
            "size": info.st_size,
            "modified": info.st_mtime,
            # Upper-cased extension without the dot; "FILE" when there is none.
            "type": suffix.lstrip(".").upper() or "FILE",
        })
    return render_template(
        "reports.html",
        reports=reports,
        image_types=IMAGE_TYPES,
        friendly_names=FRIENDLY_NAMES,
    )
@app.route("/reports/download/<filename>")
def blancco_download_report(filename):
    """Send a report from the Blancco reports directory as an attachment."""
    filename = secure_filename(filename)
    report_path = os.path.join(BLANCCO_REPORTS, filename)
    if os.path.isfile(report_path):
        return send_file(report_path, as_attachment=True)
    flash(f"Report not found: {filename}", "danger")
    return redirect(url_for("blancco_reports"))
@app.route("/reports/delete/<filename>", methods=["POST"])
def blancco_delete_report(filename):
    """Delete a report from the Blancco reports directory and record it in the audit log."""
    filename = secure_filename(filename)
    report_path = os.path.join(BLANCCO_REPORTS, filename)
    if not os.path.isfile(report_path):
        flash(f"Report not found: {filename}", "danger")
    else:
        os.remove(report_path)
        audit("REPORT_DELETE", filename)
        flash(f"Deleted {filename}.", "success")
    return redirect(url_for("blancco_reports"))
# ---------------------------------------------------------------------------
# Routes — startnet.cmd Editor (WIM)
# ---------------------------------------------------------------------------
def _wim_extract_startnet(wim_path):
"""Extract startnet.cmd from a WIM file using wimextract."""
tmpdir = tempfile.mkdtemp()
try:
result = subprocess.run(
["wimextract", wim_path, "1",
"/Windows/System32/startnet.cmd",
"--dest-dir", tmpdir],
capture_output=True, text=True, timeout=30,
)
startnet_path = os.path.join(tmpdir, "startnet.cmd")
if result.returncode == 0 and os.path.isfile(startnet_path):
with open(startnet_path, "r", encoding="utf-8", errors="replace") as fh:
return fh.read()
return None
except Exception:
return None
finally:
shutil.rmtree(tmpdir, ignore_errors=True)
def _wim_update_startnet(wim_path, content):
"""Update startnet.cmd inside a WIM file using wimupdate."""
tmpdir = tempfile.mkdtemp()
try:
startnet_path = os.path.join(tmpdir, "startnet.cmd")
with open(startnet_path, "w", encoding="utf-8", newline="\r\n") as fh:
fh.write(content)
# wimupdate reads commands from stdin
update_cmd = f"add {startnet_path} /Windows/System32/startnet.cmd\n"
result = subprocess.run(
["wimupdate", wim_path, "1"],
input=update_cmd,
capture_output=True, text=True, timeout=60,
)
if result.returncode != 0:
return False, result.stderr.strip()
return True, ""
except Exception as exc:
return False, str(exc)
finally:
shutil.rmtree(tmpdir, ignore_errors=True)
def _wim_list_files(wim_path, path="/"):
"""List files inside a WIM at the given path."""
try:
result = subprocess.run(
["wimdir", wim_path, "1", path],
capture_output=True, text=True, timeout=30,
)
if result.returncode == 0:
return [l.strip() for l in result.stdout.splitlines() if l.strip()]
return []
except Exception:
return []
@app.route("/startnet")
def startnet_editor():
    """Render the startnet.cmd editor with the current script and WIM metadata."""
    wim_exists = os.path.isfile(BOOT_WIM)
    content = ""
    wim_info = {}
    if wim_exists:
        content = _wim_extract_startnet(BOOT_WIM) or ""
        # Collect "key: value" lines from wiminfo; failures leave wim_info
        # as gathered so far.
        try:
            proc = subprocess.run(
                ["wiminfo", BOOT_WIM],
                capture_output=True, text=True, timeout=15,
            )
            if proc.returncode == 0:
                for row in proc.stdout.splitlines():
                    if ":" not in row:
                        continue
                    label, _, value = row.partition(":")
                    wim_info[label.strip()] = value.strip()
        except Exception:
            pass
    return render_template(
        "startnet_editor.html",
        wim_exists=wim_exists,
        wim_path=BOOT_WIM,
        content=content,
        wim_info=wim_info,
        image_types=IMAGE_TYPES,
        friendly_names=FRIENDLY_NAMES,
    )
@app.route("/startnet/save", methods=["POST"])
def startnet_save():
    """Write the submitted startnet.cmd text into boot.wim, then return to the editor."""
    editor_url = url_for("startnet_editor")
    if not os.path.isfile(BOOT_WIM):
        flash("boot.wim not found.", "danger")
        return redirect(editor_url)

    new_content = request.form.get("content", "")
    succeeded, error_text = _wim_update_startnet(BOOT_WIM, new_content)
    if not succeeded:
        flash(f"Failed to update boot.wim: {error_text}", "danger")
    else:
        audit("STARTNET_SAVE", "boot.wim updated")
        flash("startnet.cmd updated successfully in boot.wim.", "success")
    return redirect(editor_url)
# ---------------------------------------------------------------------------
# Routes — Audit Log
# ---------------------------------------------------------------------------
@app.route("/audit")
def audit_log():
    """Render the audit log, one stripped line per entry, newest first."""
    entries = []
    if os.path.isfile(AUDIT_LOG):
        with open(AUDIT_LOG, "r") as log_file:
            entries = [row.strip() for row in log_file]
    # The file is appended to, so reversing puts the latest entry on top.
    entries.reverse()
    return render_template(
        "audit.html",
        entries=entries,
        image_types=IMAGE_TYPES,
        friendly_names=FRIENDLY_NAMES,
    )
# ---------------------------------------------------------------------------
# Routes — API
# ---------------------------------------------------------------------------
@app.route("/api/services")
def api_services():
    """Return the status of the monitored services as JSON, keyed by service name."""
    statuses = {}
    for name in ("dnsmasq", "apache2", "smbd"):
        statuses[name] = service_status(name)
    return jsonify(statuses)
@app.route("/api/images")
def api_images():
    """Return the status of every configured image type as a JSON list."""
    return jsonify(list(map(image_status, IMAGE_TYPES)))
@app.route("/api/images/<image_type>/unattend", methods=["POST"])
def api_save_unattend(image_type):
    """Save an unattend file for *image_type* from a JSON request body.

    The body must be a JSON object. If it has a ``raw_xml`` key, that
    string is checked for well-formedness and written as is; otherwise the
    object is passed to ``build_unattend_xml``.

    Responses:
        200 ``{"status": "ok", "path": ...}`` on success.
        400 for a missing or malformed body, invalid XML or a build failure.
        404 for an unknown image type.
        500 when the file cannot be written.
    """
    if image_type not in IMAGE_TYPES:
        return jsonify({"error": "Unknown image type"}), 404
    xml_file = unattend_path(image_type)
    payload = request.get_json(silent=True)
    if not payload:
        return jsonify({"error": "No JSON body provided"}), 400
    # Arrays and scalars are valid JSON but not a usable payload; reject
    # them here rather than failing later with an unhandled error.
    if not isinstance(payload, dict):
        return jsonify({"error": "JSON body must be an object"}), 400
    if "raw_xml" in payload:
        raw_xml = payload["raw_xml"]
        if not isinstance(raw_xml, str):
            return jsonify({"error": "raw_xml must be a string"}), 400
        try:
            etree.fromstring(raw_xml.encode("utf-8"))
        except etree.XMLSyntaxError as exc:
            return jsonify({"error": f"Invalid XML: {exc}"}), 400
        xml_content = raw_xml
    else:
        try:
            xml_content = build_unattend_xml(payload)
        except Exception as exc:
            return jsonify({"error": f"Failed to build XML: {exc}"}), 400
    try:
        os.makedirs(os.path.dirname(xml_file), exist_ok=True)
        with open(xml_file, "w", encoding="utf-8") as fh:
            fh.write(xml_content)
    except Exception as exc:
        return jsonify({"error": f"Failed to write file: {exc}"}), 500
    audit("UNATTEND_SAVE_API", image_type)
    return jsonify({"status": "ok", "path": xml_file})
# ---------------------------------------------------------------------------
# Template filters
# ---------------------------------------------------------------------------
@app.template_filter("timestamp_fmt")
def timestamp_fmt(ts):
    """Render a Unix timestamp as ``YYYY-MM-DD HH:MM`` in local time."""
    moment = datetime.fromtimestamp(ts)
    return moment.strftime("%Y-%m-%d %H:%M")
# ---------------------------------------------------------------------------
# Context processor — make data available to all templates
# ---------------------------------------------------------------------------
@app.context_processor
def inject_globals():
    """Expose the image type list and display names to every template."""
    return dict(
        all_image_types=IMAGE_TYPES,
        all_friendly_names=FRIENDLY_NAMES,
    )
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Serve on all interfaces, port 9009, with the debugger disabled.
    app.run(debug=False, port=9009, host="0.0.0.0")