#!/usr/bin/env python3
"""Flask web application for managing a GE Aerospace PXE server."""
import copy
import os
import shutil
import subprocess
from pathlib import Path
from flask import (
Flask,
flash,
jsonify,
redirect,
render_template,
request,
url_for,
)
from lxml import etree
# The Flask application object; the route functions below register on it.
app = Flask(__name__)
# Secret key taken from FLASK_SECRET_KEY. The hard-coded fallback is a
# development default only — NOTE(review): set FLASK_SECRET_KEY in any real
# deployment.
app.secret_key = os.environ.get("FLASK_SECRET_KEY", "pxe-manager-dev-key-change-in-prod")
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
# Root directory of the share that holds one sub-directory per image type;
# can be overridden with the SAMBA_SHARE environment variable.
SAMBA_SHARE = os.environ.get("SAMBA_SHARE", "/srv/samba/winpeapps")

# Image-type identifier -> human-readable label, listed in display order.
FRIENDLY_NAMES = {
    "geastandardpbr": "GEA Standard PBR",
    "geaengineerpbr": "GEA Engineer PBR",
    "geashopfloorpbr": "GEA Shop Floor PBR",
    "gestandardlegacy": "GE Standard Legacy",
    "geengineerlegacy": "GE Engineer Legacy",
    "geshopfloorlegacy": "GE Shop Floor Legacy",
}

# The identifiers alone, in the same order as FRIENDLY_NAMES.
IMAGE_TYPES = list(FRIENDLY_NAMES)
# XML namespaces used in the answer file: the default unattend namespace and
# the "wcm" namespace used for action/keyValue attributes.
NS = "urn:schemas-microsoft-com:unattend"
WCM = "http://schemas.microsoft.com/WMIConfig/2002/State"
NSMAP = {None: NS, "wcm": WCM}


# Helpers that build "{namespace}local" qualified names
def qn(tag):
    """Qualify *tag* with the default unattend namespace."""
    return "{{{}}}{}".format(NS, tag)


def qwcm(attr):
    """Qualify *attr* with the wcm namespace."""
    return "{{{}}}{}".format(WCM, attr)
# ---------------------------------------------------------------------------
# Utility helpers
# ---------------------------------------------------------------------------
def deploy_path(image_type):
    """Build the path of the ``Deploy`` directory belonging to *image_type*."""
    image_root = os.path.join(SAMBA_SHARE, image_type)
    return os.path.join(image_root, "Deploy")
def unattend_path(image_type):
    """Build the path of ``FlatUnattendW10.xml`` inside the Deploy directory of *image_type*."""
    deploy_dir = deploy_path(image_type)
    return os.path.join(deploy_dir, "FlatUnattendW10.xml")
def image_status(image_type):
    """Describe what is currently on disk for an image type.

    Args:
        image_type: Image-type identifier; used to derive the Deploy
            directory and answer-file paths.

    Returns:
        A dict with the keys ``image_type``, ``friendly_name`` (falls back to
        the identifier when no label is known), ``deploy_path``,
        ``has_content`` (True when the Deploy directory exists and holds at
        least one entry) and ``has_unattend`` (True when the answer file
        exists).
    """
    dp = deploy_path(image_type)
    up = unattend_path(image_type)

    try:
        # any() stops at the first entry, so the scandir iterator is normally
        # left unexhausted; the context manager closes its directory handle.
        with os.scandir(dp) as entries:
            has_content = any(entries)
    except OSError:
        # Missing path, not a directory, or unreadable: report "no content"
        # instead of failing the whole status page.
        has_content = False

    return {
        "image_type": image_type,
        "friendly_name": FRIENDLY_NAMES.get(image_type, image_type),
        "deploy_path": dp,
        "has_content": has_content,
        "has_unattend": os.path.isfile(up),
    }
def service_status(service_name):
    """Report whether a systemd unit is active.

    Runs ``systemctl is-active <service_name>`` with a 5 second timeout.

    Returns:
        A dict with ``name``, ``active`` (True only when systemctl printed
        exactly ``active``) and ``state`` (the printed state, or the text of
        the exception if the command could not be run).
    """
    report = {"name": service_name, "active": False}
    command = ["systemctl", "is-active", service_name]
    try:
        completed = subprocess.run(
            command,
            capture_output=True,
            text=True,
            timeout=5,
        )
        report["state"] = completed.stdout.strip()
        report["active"] = report["state"] == "active"
    except Exception as exc:
        report["state"] = str(exc)
    return report
def _unescape_mount_field(field):
    r"""Decode the ``\NNN`` octal escapes used in mount-table fields.

    Whitespace and backslashes inside a mount point are written as
    three-digit octal escapes (for example ``\040`` for a space), so the raw
    field is not a usable filesystem path until it is decoded.
    """
    decoded = []
    pos = 0
    while pos < len(field):
        digits = field[pos + 1:pos + 4]
        if (
            field[pos] == "\\"
            and len(digits) == 3
            and all(d in "01234567" for d in digits)
        ):
            decoded.append(chr(int(digits, 8)))
            pos += 4
        else:
            decoded.append(field[pos])
            pos += 1
    return "".join(decoded)


def find_usb_mounts(mounts_file="/proc/mounts", prefixes=("/mnt/", "/media/")):
    """Return a sorted list of mount points that look like removable media.

    Args:
        mounts_file: Mount table to read; one mount per line with the mount
            point as the second whitespace-separated field.
        prefixes: Path prefixes under which a mount point is considered
            removable media.

    Returns:
        Sorted, de-duplicated mount-point paths that start with one of
        *prefixes* and exist as directories. An unreadable *mounts_file*
        yields an empty list.
    """
    wanted = tuple(prefixes)
    mounts = set()
    try:
        with open(mounts_file, "r") as fh:
            for line in fh:
                parts = line.split()
                if len(parts) < 2:
                    continue
                # Decode escapes first: without this a volume labelled
                # "MY USB" shows up as "MY\040USB" and fails the isdir check.
                mount_point = _unescape_mount_field(parts[1])
                if mount_point.startswith(wanted) and os.path.isdir(mount_point):
                    mounts.add(mount_point)
    except OSError:
        # Best effort: no readable mount table means nothing to offer.
        pass
    return sorted(mounts)
# ---------------------------------------------------------------------------
# XML helpers — parse / build unattend.xml
# ---------------------------------------------------------------------------
# Minimal well-formed answer file used as the raw XML when an image type has
# no unattend file yet. It mirrors the skeleton produced by
# build_unattend_xml (the four settings passes, all empty). An empty template
# would leave the raw editor with text that cannot pass XML validation.
UNATTEND_TEMPLATE = """\
<?xml version="1.0" encoding="utf-8"?>
<unattend xmlns="urn:schemas-microsoft-com:unattend" xmlns:wcm="http://schemas.microsoft.com/WMIConfig/2002/State">
  <settings pass="windowsPE"/>
  <settings pass="offlineServicing"/>
  <settings pass="specialize"/>
  <settings pass="oobeSystem"/>
</unattend>
"""
def _find_or_create(parent, tag):
    """Return the first child of *parent* matching *tag*, appending one if none exists."""
    existing = parent.find(tag, namespaces={"": NS})
    if existing is not None:
        return existing
    # A tag that already carries a "{namespace}" part is used verbatim;
    # a bare tag is placed in the unattend namespace.
    qualified = tag if "}" in tag else qn(tag)
    return etree.SubElement(parent, qualified)
def _settings_pass(root, pass_name):
    """Return the ``<settings pass="...">`` child of *root* for *pass_name*, creating it if needed."""
    candidates = (
        node for node in root.findall(qn("settings"))
        if node.get("pass") == pass_name
    )
    found = next(candidates, None)
    if found is None:
        found = etree.SubElement(root, qn("settings"))
        found.set("pass", pass_name)
    return found
def _stripped_text(el):
    """Return the whitespace-stripped text of *el*, or ``""`` if *el* is None or has no text."""
    if el is None or not el.text:
        return ""
    return el.text.strip()


def parse_unattend(xml_path):
    """Parse an unattend.xml and return a dict of editable data.

    Args:
        xml_path: Path of the answer file to read.

    Returns:
        A dict with the keys ``driver_paths``, ``computer_name``,
        ``registered_organization``, ``registered_owner``, ``time_zone``,
        ``specialize_commands``, ``oobe``, ``firstlogon_commands`` and
        ``raw_xml``. When the file does not exist the defaults are returned
        with ``raw_xml`` set to ``UNATTEND_TEMPLATE``. When the file is not
        well-formed XML the defaults are returned with ``raw_xml`` holding
        the file's text.
    """
    data = {
        "driver_paths": [],
        "computer_name": "",
        "registered_organization": "",
        "registered_owner": "",
        "time_zone": "",
        "specialize_commands": [],
        "oobe": {
            "HideEULAPage": "true",
            "HideOEMRegistrationScreen": "true",
            "HideOnlineAccountScreens": "true",
            "HideWirelessSetupInOOBE": "true",
            "HideLocalAccountScreen": "true",
            "NetworkLocation": "Work",
            "ProtectYourPC": "3",
            "SkipUserOOBE": "true",
            "SkipMachineOOBE": "true",
        },
        "firstlogon_commands": [],
        "raw_xml": "",
    }

    if not os.path.isfile(xml_path):
        data["raw_xml"] = UNATTEND_TEMPLATE
        return data

    # Context manager so the file handle is closed as soon as it is read.
    with open(xml_path, "r", encoding="utf-8") as fh:
        raw = fh.read()
    data["raw_xml"] = raw

    try:
        root = etree.fromstring(raw.encode("utf-8"))
    except etree.XMLSyntaxError:
        # Unparseable file: still return the raw text alongside the defaults.
        return data

    ns = {"u": NS}

    # --- offlineServicing: DriverPaths ---
    for dp_el in root.xpath(
        "u:settings[@pass='offlineServicing']//u:PathAndCredentials/u:Path",
        namespaces=ns,
    ):
        if dp_el.text:
            data["driver_paths"].append(dp_el.text.strip())

    # --- specialize: Shell-Setup ---
    for comp in root.xpath(
        "u:settings[@pass='specialize']/u:component",
        namespaces=ns,
    ):
        if "Shell-Setup" not in comp.get("name", ""):
            continue
        for tag, key in [
            ("ComputerName", "computer_name"),
            ("RegisteredOrganization", "registered_organization"),
            ("RegisteredOwner", "registered_owner"),
            ("TimeZone", "time_zone"),
        ]:
            el = comp.find(qn(tag))
            if el is not None and el.text:
                data[key] = el.text.strip()

    # --- specialize: RunSynchronous commands ---
    for cmd in root.xpath(
        "u:settings[@pass='specialize']//u:RunSynchronousCommand",
        namespaces=ns,
    ):
        data["specialize_commands"].append({
            "order": _stripped_text(cmd.find(qn("Order"))),
            "path": _stripped_text(cmd.find(qn("Path"))),
            "description": _stripped_text(cmd.find(qn("Description"))),
        })

    # --- oobeSystem ---
    for comp in root.xpath(
        "u:settings[@pass='oobeSystem']/u:component",
        namespaces=ns,
    ):
        comp_name = comp.get("name", "")
        if "OOBE" not in comp_name and "Shell-Setup" not in comp_name:
            continue

        oobe_el = comp.find(qn("OOBE"))
        if oobe_el is not None:
            for child in oobe_el:
                local = etree.QName(child).localname
                # Only known OOBE keys are taken; others are ignored.
                if local in data["oobe"] and child.text:
                    data["oobe"][local] = child.text.strip()

        # FirstLogonCommands
        flc = comp.find(qn("FirstLogonCommands"))
        if flc is not None:
            for sync in flc.findall(qn("SynchronousCommand")):
                data["firstlogon_commands"].append({
                    "order": _stripped_text(sync.find(qn("Order"))),
                    "commandline": _stripped_text(sync.find(qn("CommandLine"))),
                    "description": _stripped_text(sync.find(qn("Description"))),
                })

    return data
def _add_component(settings_el, name):
    """Append a ``<component>`` named *name* to *settings_el* with the standard identity attributes."""
    component = etree.SubElement(settings_el, qn("component"))
    for attr, value in (
        ("name", name),
        ("processorArchitecture", "amd64"),
        ("publicKeyToken", "31bf3856ad364e35"),
        ("language", "neutral"),
        ("versionScope", "nonSxS"),
    ):
        component.set(attr, value)
    return component


def build_unattend_xml(form_data):
    """Serialize *form_data* into a complete unattend.xml document.

    Args:
        form_data: Mapping that may contain ``driver_paths``,
            ``computer_name``, ``registered_organization``,
            ``registered_owner``, ``time_zone``, ``specialize_commands``,
            ``oobe`` and ``firstlogon_commands``.

    Returns:
        The pretty-printed XML document as a string, including the XML
        declaration.
    """
    root = etree.Element(qn("unattend"), nsmap=NSMAP)

    # windowsPE pass is emitted without any content
    _settings_pass(root, "windowsPE")

    # offlineServicing pass: driver search paths
    offline = _settings_pass(root, "offlineServicing")
    driver_paths = form_data.get("driver_paths", [])
    if driver_paths:
        pnp = _add_component(offline, "Microsoft-Windows-PnpCustomizationsNonWinPE")
        container = etree.SubElement(pnp, qn("DriverPaths"))
        for position, raw_path in enumerate(driver_paths, start=1):
            cleaned = raw_path.strip()
            if not cleaned:
                continue
            entry = etree.SubElement(container, qn("PathAndCredentials"))
            entry.set(qwcm("action"), "add")
            # keyValue is the position in the submitted list, blanks included
            entry.set(qwcm("keyValue"), str(position))
            etree.SubElement(entry, qn("Path")).text = cleaned

    # specialize pass: machine identity settings
    spec = _settings_pass(root, "specialize")
    shell = _add_component(spec, "Microsoft-Windows-Shell-Setup")
    for tag, key in (
        ("ComputerName", "computer_name"),
        ("RegisteredOrganization", "registered_organization"),
        ("RegisteredOwner", "registered_owner"),
        ("TimeZone", "time_zone"),
    ):
        val = form_data.get(key, "").strip()
        if val:
            etree.SubElement(shell, qn(tag)).text = val

    # specialize pass: RunSynchronous commands
    spec_cmds = form_data.get("specialize_commands", [])
    if spec_cmds:
        deployment = _add_component(spec, "Microsoft-Windows-Deployment")
        run_sync = etree.SubElement(deployment, qn("RunSynchronous"))
        for position, cmd in enumerate(spec_cmds, start=1):
            if not cmd.get("path", "").strip():
                continue
            node = etree.SubElement(run_sync, qn("RunSynchronousCommand"))
            node.set(qwcm("action"), "add")
            etree.SubElement(node, qn("Order")).text = str(position)
            etree.SubElement(node, qn("Path")).text = cmd["path"].strip()
            etree.SubElement(node, qn("Description")).text = cmd.get("description", "").strip()

    # oobeSystem pass: OOBE switches
    oobe_settings = _settings_pass(root, "oobeSystem")
    oobe_comp = _add_component(oobe_settings, "Microsoft-Windows-Shell-Setup")
    oobe_el = etree.SubElement(oobe_comp, qn("OOBE"))
    oobe_data = form_data.get("oobe", {})
    for key in (
        "HideEULAPage",
        "HideOEMRegistrationScreen",
        "HideOnlineAccountScreens",
        "HideWirelessSetupInOOBE",
        "HideLocalAccountScreen",
        "NetworkLocation",
        "ProtectYourPC",
        "SkipUserOOBE",
        "SkipMachineOOBE",
    ):
        val = oobe_data.get(key, "")
        if val:
            etree.SubElement(oobe_el, qn(key)).text = str(val)

    # oobeSystem pass: FirstLogonCommands
    fl_cmds = form_data.get("firstlogon_commands", [])
    if fl_cmds:
        first_logon = etree.SubElement(oobe_comp, qn("FirstLogonCommands"))
        for position, cmd in enumerate(fl_cmds, start=1):
            if not cmd.get("commandline", "").strip():
                continue
            node = etree.SubElement(first_logon, qn("SynchronousCommand"))
            node.set(qwcm("action"), "add")
            etree.SubElement(node, qn("Order")).text = str(position)
            etree.SubElement(node, qn("CommandLine")).text = cmd["commandline"].strip()
            etree.SubElement(node, qn("Description")).text = cmd.get("description", "").strip()

    serialized = etree.tostring(
        root,
        pretty_print=True,
        xml_declaration=True,
        encoding="utf-8",
    )
    return serialized.decode("utf-8")
def _extract_form_data(form):
    """Convert the submitted editor form into the structured dict used to build the XML.

    *form* must offer ``get(name, default)`` and ``getlist(name)``.
    """

    def paired_rows(value_field, desc_field, value_key):
        # Pair each non-blank value with the description at the same index;
        # a missing description becomes "".
        values = form.getlist(value_field)
        descriptions = form.getlist(desc_field)
        rows = []
        for pos, value in enumerate(values):
            if not value.strip():
                continue
            description = descriptions[pos] if pos < len(descriptions) else ""
            rows.append({value_key: value, "description": description})
        return rows

    data = {}

    # Driver paths: blank entries are dropped
    data["driver_paths"] = [
        entry for entry in form.getlist("driver_path[]") if entry.strip()
    ]

    # Machine settings
    for field in (
        "computer_name",
        "registered_organization",
        "registered_owner",
        "time_zone",
    ):
        data[field] = form.get(field, "")

    # Specialize commands
    data["specialize_commands"] = paired_rows(
        "spec_cmd_path[]", "spec_cmd_desc[]", "path"
    )

    # OOBE settings: a switch absent from the form counts as "false"
    switches = (
        "HideEULAPage",
        "HideOEMRegistrationScreen",
        "HideOnlineAccountScreens",
        "HideWirelessSetupInOOBE",
        "HideLocalAccountScreen",
        "SkipUserOOBE",
        "SkipMachineOOBE",
    )
    oobe = {name: form.get(f"oobe_{name}", "false") for name in switches}
    oobe["NetworkLocation"] = form.get("oobe_NetworkLocation", "Work")
    oobe["ProtectYourPC"] = form.get("oobe_ProtectYourPC", "3")
    data["oobe"] = oobe

    # First logon commands
    data["firstlogon_commands"] = paired_rows(
        "fl_cmd_commandline[]", "fl_cmd_desc[]", "commandline"
    )

    return data
# ---------------------------------------------------------------------------
# Routes — pages
# ---------------------------------------------------------------------------
@app.route("/")
def dashboard():
    """Render the dashboard with the status of every image type and monitored service."""
    context = {
        "images": [image_status(name) for name in IMAGE_TYPES],
        "services": [service_status(unit) for unit in ("dnsmasq", "apache2", "smbd")],
        "image_types": IMAGE_TYPES,
        "friendly_names": FRIENDLY_NAMES,
    }
    return render_template("dashboard.html", **context)
@app.route("/images/import", methods=["GET", "POST"])
def images_import():
    """Show the import page (GET) or copy a source directory into an image type (POST).

    POST form fields:
        source: Directory whose contents are copied. It must resolve to a
            location on one of the mounts returned by ``find_usb_mounts()``.
        target: Image-type identifier; must be one of ``IMAGE_TYPES``.

    Every POST ends with a flash message and a redirect back to this page.
    """
    usb_mounts = find_usb_mounts()

    if request.method == "POST":
        source = request.form.get("source", "")
        target = request.form.get("target", "")

        if not source or not target:
            flash("Please select both a source and a target image type.", "danger")
            return redirect(url_for("images_import"))

        if target not in IMAGE_TYPES:
            flash("Invalid target image type.", "danger")
            return redirect(url_for("images_import"))

        # `source` comes straight from the client. Without this check any
        # directory readable by the server could be copied into the share.
        # Resolve symlinks and ".." first, then require the result to sit on
        # (or below) a detected removable-media mount.
        real_source = os.path.realpath(source)
        mount_roots = [os.path.realpath(mount) for mount in usb_mounts]
        on_removable_media = any(
            real_source == mount_root
            or real_source.startswith(mount_root.rstrip(os.sep) + os.sep)
            for mount_root in mount_roots
        )
        if not on_removable_media:
            flash("Source must be located on a detected removable-media mount.", "danger")
            return redirect(url_for("images_import"))

        if not os.path.isdir(real_source):
            flash(f"Source path does not exist: {source}", "danger")
            return redirect(url_for("images_import"))

        dest = deploy_path(target)
        try:
            os.makedirs(dest, exist_ok=True)
            # Copy the contents of the source into dest; an existing
            # directory of the same name is replaced, files are overwritten.
            for item in os.listdir(real_source):
                src_item = os.path.join(real_source, item)
                dst_item = os.path.join(dest, item)
                if os.path.isdir(src_item):
                    if os.path.exists(dst_item):
                        shutil.rmtree(dst_item)
                    shutil.copytree(src_item, dst_item)
                else:
                    shutil.copy2(src_item, dst_item)
            flash(
                f"Successfully imported content from {source} to {FRIENDLY_NAMES.get(target, target)}.",
                "success",
            )
        except Exception as exc:
            # Route boundary: report any copy failure to the user.
            flash(f"Import failed: {exc}", "danger")

        return redirect(url_for("images_import"))

    # The image status list is only needed to render the page.
    images = [image_status(it) for it in IMAGE_TYPES]
    return render_template(
        "import.html",
        usb_mounts=usb_mounts,
        images=images,
        image_types=IMAGE_TYPES,
        friendly_names=FRIENDLY_NAMES,
    )
@app.route("/images/<image_type>/unattend", methods=["GET", "POST"])
def unattend_editor(image_type):
    """Display (GET) or save (POST) the unattend file of one image type.

    Args:
        image_type: Image-type identifier captured from the URL. The route
            must declare this variable, otherwise the view is never passed
            its argument.

    POST form fields:
        save_mode: ``"raw"`` to save the ``raw_xml`` field verbatim after a
            well-formedness check; any other value builds the XML from the
            structured form fields.
    """
    if image_type not in IMAGE_TYPES:
        flash("Unknown image type.", "danger")
        return redirect(url_for("dashboard"))

    xml_file = unattend_path(image_type)

    def render_editor(data):
        # Single place that knows the template's context variables.
        return render_template(
            "unattend_editor.html",
            image_type=image_type,
            friendly_name=FRIENDLY_NAMES.get(image_type, image_type),
            data=data,
            image_types=IMAGE_TYPES,
            friendly_names=FRIENDLY_NAMES,
        )

    if request.method == "POST":
        save_mode = request.form.get("save_mode", "form")

        if save_mode == "raw":
            raw_xml = request.form.get("raw_xml", "")
            # Validate the XML before saving
            try:
                etree.fromstring(raw_xml.encode("utf-8"))
            except etree.XMLSyntaxError as exc:
                flash(f"Invalid XML: {exc}", "danger")
                # Re-render with the rejected text so the edits are not lost.
                data = parse_unattend(xml_file)
                data["raw_xml"] = raw_xml
                return render_editor(data)
            xml_content = raw_xml
        else:
            form_data = _extract_form_data(request.form)
            xml_content = build_unattend_xml(form_data)

        # Write to disk
        try:
            os.makedirs(os.path.dirname(xml_file), exist_ok=True)
            with open(xml_file, "w", encoding="utf-8") as fh:
                fh.write(xml_content)
            flash("unattend.xml saved successfully.", "success")
        except Exception as exc:
            # Route boundary: report any write failure to the user.
            flash(f"Failed to save: {exc}", "danger")

        return redirect(url_for("unattend_editor", image_type=image_type))

    return render_editor(parse_unattend(xml_file))
# ---------------------------------------------------------------------------
# Routes — API
# ---------------------------------------------------------------------------
@app.route("/api/services")
def api_services():
    """Return the status of the monitored services as JSON, keyed by service name."""
    report = {}
    for unit in ("dnsmasq", "apache2", "smbd"):
        report[unit] = service_status(unit)
    return jsonify(report)
@app.route("/api/images")
def api_images():
    """Return the status of every image type as a JSON list."""
    return jsonify(list(map(image_status, IMAGE_TYPES)))
@app.route("/api/images/<image_type>/unattend", methods=["POST"])
def api_save_unattend(image_type):
    """Save the unattend file of one image type from a JSON request body.

    Args:
        image_type: Image-type identifier captured from the URL. The route
            must declare this variable, otherwise the view is never passed
            its argument.

    The JSON body must be an object. If it has a ``raw_xml`` key, that string
    is checked for well-formedness and written verbatim; otherwise the object
    is passed to ``build_unattend_xml``.

    Returns:
        ``{"status": "ok", "path": ...}`` on success, or ``{"error": ...}``
        with status 404 (unknown image type), 400 (bad body or XML) or 500
        (write failure).
    """
    if image_type not in IMAGE_TYPES:
        return jsonify({"error": "Unknown image type"}), 404

    xml_file = unattend_path(image_type)
    payload = request.get_json(silent=True)
    if not payload:
        return jsonify({"error": "No JSON body provided"}), 400
    if not isinstance(payload, dict):
        # Lists and scalars are valid JSON but cannot carry the expected keys.
        return jsonify({"error": "JSON body must be an object"}), 400

    if "raw_xml" in payload:
        raw_xml = payload["raw_xml"]
        if not isinstance(raw_xml, str):
            # Guards the .encode() below against null/number/object values.
            return jsonify({"error": "raw_xml must be a string"}), 400
        try:
            etree.fromstring(raw_xml.encode("utf-8"))
        except etree.XMLSyntaxError as exc:
            return jsonify({"error": f"Invalid XML: {exc}"}), 400
        xml_content = raw_xml
    else:
        try:
            xml_content = build_unattend_xml(payload)
        except Exception as exc:
            # Malformed structured data (wrong value types) is a client error.
            return jsonify({"error": f"Failed to build XML: {exc}"}), 400

    try:
        os.makedirs(os.path.dirname(xml_file), exist_ok=True)
        with open(xml_file, "w", encoding="utf-8") as fh:
            fh.write(xml_content)
    except Exception as exc:
        return jsonify({"error": f"Failed to write file: {exc}"}), 500

    return jsonify({"status": "ok", "path": xml_file})
# ---------------------------------------------------------------------------
# Context processor — make data available to all templates
# ---------------------------------------------------------------------------
@app.context_processor
def inject_globals():
    """Expose the image-type list and label mapping to every template."""
    return dict(
        all_image_types=IMAGE_TYPES,
        all_friendly_names=FRIENDLY_NAMES,
    )
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # The interactive debugger lets anyone who can reach the port execute
    # code on the server, and this app listens on all interfaces. Debug mode
    # is therefore opt-in: set FLASK_DEBUG=1 (or true/yes/on) to enable it.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in (
        "1",
        "true",
        "yes",
        "on",
    )
    app.run(host="0.0.0.0", port=5000, debug=debug_enabled)