Add web management UI, offline packages, WinPE consolidation, and docs
- webapp/: Flask web management app with:
- Dashboard showing image types and service status
- USB import page for WinPE deployment content
- Unattend.xml visual editor (driver paths, specialize commands,
OOBE settings, first logon commands, raw XML view)
- API endpoints for services and image management
- SETUP.md: Complete setup documentation for streamlined process
- build-usb.sh: Now copies webapp and optional WinPE images to USB
- playbook: Added webapp deployment (systemd service, Apache reverse
proxy), offline package verification, WinPE auto-import from USB
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
631
webapp/app.py
Normal file
631
webapp/app.py
Normal file
@@ -0,0 +1,631 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Flask web application for managing a GE Aerospace PXE server."""
|
||||
|
||||
import copy
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
|
||||
from flask import (
|
||||
Flask,
|
||||
flash,
|
||||
jsonify,
|
||||
redirect,
|
||||
render_template,
|
||||
request,
|
||||
url_for,
|
||||
)
|
||||
from lxml import etree
|
||||
|
||||
app = Flask(__name__)
|
||||
app.secret_key = os.environ.get("FLASK_SECRET_KEY", "pxe-manager-dev-key-change-in-prod")
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Configuration
|
||||
# ---------------------------------------------------------------------------
|
||||
SAMBA_SHARE = os.environ.get("SAMBA_SHARE", "/srv/samba/winpeapps")
|
||||
|
||||
IMAGE_TYPES = [
|
||||
"geastandardpbr",
|
||||
"geaengineerpbr",
|
||||
"geashopfloorpbr",
|
||||
"gestandardlegacy",
|
||||
"geengineerlegacy",
|
||||
"geshopfloorlegacy",
|
||||
]
|
||||
|
||||
FRIENDLY_NAMES = {
|
||||
"geastandardpbr": "GEA Standard PBR",
|
||||
"geaengineerpbr": "GEA Engineer PBR",
|
||||
"geashopfloorpbr": "GEA Shop Floor PBR",
|
||||
"gestandardlegacy": "GE Standard Legacy",
|
||||
"geengineerlegacy": "GE Engineer Legacy",
|
||||
"geshopfloorlegacy": "GE Shop Floor Legacy",
|
||||
}
|
||||
|
||||
NS = "urn:schemas-microsoft-com:unattend"
|
||||
WCM = "http://schemas.microsoft.com/WMIConfig/2002/State"
|
||||
NSMAP = {None: NS, "wcm": WCM}
|
||||
|
||||
# Convenience qualified-name helpers
|
||||
def qn(tag):
|
||||
"""Return a tag qualified with the default unattend namespace."""
|
||||
return f"{{{NS}}}{tag}"
|
||||
|
||||
def qwcm(attr):
|
||||
"""Return an attribute qualified with the wcm namespace."""
|
||||
return f"{{{WCM}}}{attr}"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Utility helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def deploy_path(image_type):
|
||||
"""Return the Deploy directory for an image type."""
|
||||
return os.path.join(SAMBA_SHARE, image_type, "Deploy")
|
||||
|
||||
|
||||
def unattend_path(image_type):
|
||||
"""Return the unattend.xml path for an image type."""
|
||||
return os.path.join(deploy_path(image_type), "Control", "unattend.xml")
|
||||
|
||||
|
||||
def image_status(image_type):
|
||||
"""Return a dict describing the state of an image type."""
|
||||
dp = deploy_path(image_type)
|
||||
up = unattend_path(image_type)
|
||||
has_content = os.path.isdir(dp) and any(os.scandir(dp)) if os.path.isdir(dp) else False
|
||||
has_unattend = os.path.isfile(up)
|
||||
return {
|
||||
"image_type": image_type,
|
||||
"friendly_name": FRIENDLY_NAMES.get(image_type, image_type),
|
||||
"deploy_path": dp,
|
||||
"has_content": has_content,
|
||||
"has_unattend": has_unattend,
|
||||
}
|
||||
|
||||
|
||||
def service_status(service_name):
|
||||
"""Check whether a systemd service is active."""
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["systemctl", "is-active", service_name],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=5,
|
||||
)
|
||||
state = result.stdout.strip()
|
||||
return {"name": service_name, "active": state == "active", "state": state}
|
||||
except Exception as exc:
|
||||
return {"name": service_name, "active": False, "state": str(exc)}
|
||||
|
||||
|
||||
def find_usb_mounts():
|
||||
"""Return a list of mount-point paths that look like removable media."""
|
||||
mounts = []
|
||||
try:
|
||||
with open("/proc/mounts", "r") as fh:
|
||||
for line in fh:
|
||||
parts = line.split()
|
||||
if len(parts) >= 2:
|
||||
mount_point = parts[1]
|
||||
if mount_point.startswith(("/mnt/", "/media/")):
|
||||
if os.path.isdir(mount_point):
|
||||
mounts.append(mount_point)
|
||||
except OSError:
|
||||
pass
|
||||
return sorted(set(mounts))
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# XML helpers — parse / build unattend.xml
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
UNATTEND_TEMPLATE = """\
|
||||
<?xml version="1.0" encoding="utf-8"?>
|
||||
<unattend xmlns="urn:schemas-microsoft-com:unattend"
|
||||
xmlns:wcm="http://schemas.microsoft.com/WMIConfig/2002/State">
|
||||
<settings pass="windowsPE" />
|
||||
<settings pass="offlineServicing" />
|
||||
<settings pass="specialize" />
|
||||
<settings pass="oobeSystem" />
|
||||
</unattend>
|
||||
"""
|
||||
|
||||
|
||||
def _find_or_create(parent, tag):
|
||||
"""Find the first child with *tag* or create it."""
|
||||
el = parent.find(tag, namespaces={"": NS})
|
||||
if el is None:
|
||||
el = etree.SubElement(parent, qn(tag.split("}")[-1]) if "}" not in tag else tag)
|
||||
return el
|
||||
|
||||
|
||||
def _settings_pass(root, pass_name):
|
||||
"""Return the <settings pass="..."> element, creating if needed."""
|
||||
for s in root.findall(qn("settings")):
|
||||
if s.get("pass") == pass_name:
|
||||
return s
|
||||
s = etree.SubElement(root, qn("settings"))
|
||||
s.set("pass", pass_name)
|
||||
return s
|
||||
|
||||
|
||||
def parse_unattend(xml_path):
|
||||
"""Parse an unattend.xml and return a dict of editable data."""
|
||||
data = {
|
||||
"driver_paths": [],
|
||||
"computer_name": "",
|
||||
"registered_organization": "",
|
||||
"registered_owner": "",
|
||||
"time_zone": "",
|
||||
"specialize_commands": [],
|
||||
"oobe": {
|
||||
"HideEULAPage": "true",
|
||||
"HideOEMRegistrationScreen": "true",
|
||||
"HideOnlineAccountScreens": "true",
|
||||
"HideWirelessSetupInOOBE": "true",
|
||||
"HideLocalAccountScreen": "true",
|
||||
"NetworkLocation": "Work",
|
||||
"ProtectYourPC": "3",
|
||||
"SkipUserOOBE": "true",
|
||||
"SkipMachineOOBE": "true",
|
||||
},
|
||||
"firstlogon_commands": [],
|
||||
"raw_xml": "",
|
||||
}
|
||||
|
||||
if not os.path.isfile(xml_path):
|
||||
data["raw_xml"] = UNATTEND_TEMPLATE
|
||||
return data
|
||||
|
||||
raw = open(xml_path, "r", encoding="utf-8").read()
|
||||
data["raw_xml"] = raw
|
||||
|
||||
try:
|
||||
root = etree.fromstring(raw.encode("utf-8"))
|
||||
except etree.XMLSyntaxError:
|
||||
return data
|
||||
|
||||
ns = {"u": NS}
|
||||
|
||||
# --- offlineServicing: DriverPaths ---
|
||||
for dp_el in root.xpath(
|
||||
"u:settings[@pass='offlineServicing']//u:PathAndCredentials/u:Path",
|
||||
namespaces=ns,
|
||||
):
|
||||
if dp_el.text:
|
||||
data["driver_paths"].append(dp_el.text.strip())
|
||||
|
||||
# --- specialize: Shell-Setup ---
|
||||
for comp in root.xpath(
|
||||
"u:settings[@pass='specialize']/u:component",
|
||||
namespaces=ns,
|
||||
):
|
||||
comp_name = comp.get("name", "")
|
||||
if "Shell-Setup" in comp_name:
|
||||
for tag, key in [
|
||||
("ComputerName", "computer_name"),
|
||||
("RegisteredOrganization", "registered_organization"),
|
||||
("RegisteredOwner", "registered_owner"),
|
||||
("TimeZone", "time_zone"),
|
||||
]:
|
||||
el = comp.find(qn(tag))
|
||||
if el is not None and el.text:
|
||||
data[key] = el.text.strip()
|
||||
|
||||
# --- specialize: RunSynchronous commands ---
|
||||
for cmd in root.xpath(
|
||||
"u:settings[@pass='specialize']//u:RunSynchronousCommand",
|
||||
namespaces=ns,
|
||||
):
|
||||
order_el = cmd.find(qn("Order"))
|
||||
path_el = cmd.find(qn("Path"))
|
||||
desc_el = cmd.find(qn("Description"))
|
||||
data["specialize_commands"].append({
|
||||
"order": order_el.text.strip() if order_el is not None and order_el.text else "",
|
||||
"path": path_el.text.strip() if path_el is not None and path_el.text else "",
|
||||
"description": desc_el.text.strip() if desc_el is not None and desc_el.text else "",
|
||||
})
|
||||
|
||||
# --- oobeSystem ---
|
||||
for comp in root.xpath(
|
||||
"u:settings[@pass='oobeSystem']/u:component",
|
||||
namespaces=ns,
|
||||
):
|
||||
comp_name = comp.get("name", "")
|
||||
if "OOBE" in comp_name or "Shell-Setup" in comp_name:
|
||||
oobe_el = comp.find(qn("OOBE"))
|
||||
if oobe_el is not None:
|
||||
for child in oobe_el:
|
||||
local = etree.QName(child).localname
|
||||
if local in data["oobe"] and child.text:
|
||||
data["oobe"][local] = child.text.strip()
|
||||
|
||||
# FirstLogonCommands
|
||||
flc = comp.find(qn("FirstLogonCommands"))
|
||||
if flc is not None:
|
||||
for sync in flc.findall(qn("SynchronousCommand")):
|
||||
order_el = sync.find(qn("Order"))
|
||||
cl_el = sync.find(qn("CommandLine"))
|
||||
desc_el = sync.find(qn("Description"))
|
||||
data["firstlogon_commands"].append({
|
||||
"order": order_el.text.strip() if order_el is not None and order_el.text else "",
|
||||
"commandline": cl_el.text.strip() if cl_el is not None and cl_el.text else "",
|
||||
"description": desc_el.text.strip() if desc_el is not None and desc_el.text else "",
|
||||
})
|
||||
|
||||
return data
|
||||
|
||||
|
||||
def build_unattend_xml(form_data):
|
||||
"""Build a complete unattend.xml string from form data dict."""
|
||||
root = etree.Element(qn("unattend"), nsmap=NSMAP)
|
||||
|
||||
# --- windowsPE (empty) ---
|
||||
_settings_pass(root, "windowsPE")
|
||||
|
||||
# --- offlineServicing: DriverPaths ---
|
||||
offline = _settings_pass(root, "offlineServicing")
|
||||
driver_paths = form_data.get("driver_paths", [])
|
||||
if driver_paths:
|
||||
comp = etree.SubElement(offline, qn("component"))
|
||||
comp.set("name", "Microsoft-Windows-PnpCustomizationsNonWinPE")
|
||||
comp.set("processorArchitecture", "amd64")
|
||||
comp.set("publicKeyToken", "31bf3856ad364e35")
|
||||
comp.set("language", "neutral")
|
||||
comp.set("versionScope", "nonSxS")
|
||||
dp_container = etree.SubElement(comp, qn("DriverPaths"))
|
||||
for idx, dp in enumerate(driver_paths, start=1):
|
||||
if not dp.strip():
|
||||
continue
|
||||
pac = etree.SubElement(dp_container, qn("PathAndCredentials"))
|
||||
pac.set(qwcm("action"), "add")
|
||||
pac.set(qwcm("keyValue"), str(idx))
|
||||
path_el = etree.SubElement(pac, qn("Path"))
|
||||
path_el.text = dp.strip()
|
||||
|
||||
# --- specialize ---
|
||||
spec = _settings_pass(root, "specialize")
|
||||
|
||||
# Shell-Setup component
|
||||
shell_comp = etree.SubElement(spec, qn("component"))
|
||||
shell_comp.set("name", "Microsoft-Windows-Shell-Setup")
|
||||
shell_comp.set("processorArchitecture", "amd64")
|
||||
shell_comp.set("publicKeyToken", "31bf3856ad364e35")
|
||||
shell_comp.set("language", "neutral")
|
||||
shell_comp.set("versionScope", "nonSxS")
|
||||
|
||||
for tag, key in [
|
||||
("ComputerName", "computer_name"),
|
||||
("RegisteredOrganization", "registered_organization"),
|
||||
("RegisteredOwner", "registered_owner"),
|
||||
("TimeZone", "time_zone"),
|
||||
]:
|
||||
val = form_data.get(key, "").strip()
|
||||
if val:
|
||||
el = etree.SubElement(shell_comp, qn(tag))
|
||||
el.text = val
|
||||
|
||||
# Deployment / RunSynchronous commands
|
||||
spec_cmds = form_data.get("specialize_commands", [])
|
||||
if spec_cmds:
|
||||
deploy_comp = etree.SubElement(spec, qn("component"))
|
||||
deploy_comp.set("name", "Microsoft-Windows-Deployment")
|
||||
deploy_comp.set("processorArchitecture", "amd64")
|
||||
deploy_comp.set("publicKeyToken", "31bf3856ad364e35")
|
||||
deploy_comp.set("language", "neutral")
|
||||
deploy_comp.set("versionScope", "nonSxS")
|
||||
rs = etree.SubElement(deploy_comp, qn("RunSynchronous"))
|
||||
for idx, cmd in enumerate(spec_cmds, start=1):
|
||||
if not cmd.get("path", "").strip():
|
||||
continue
|
||||
rsc = etree.SubElement(rs, qn("RunSynchronousCommand"))
|
||||
rsc.set(qwcm("action"), "add")
|
||||
order_el = etree.SubElement(rsc, qn("Order"))
|
||||
order_el.text = str(idx)
|
||||
path_el = etree.SubElement(rsc, qn("Path"))
|
||||
path_el.text = cmd["path"].strip()
|
||||
desc_el = etree.SubElement(rsc, qn("Description"))
|
||||
desc_el.text = cmd.get("description", "").strip()
|
||||
|
||||
# --- oobeSystem ---
|
||||
oobe_settings = _settings_pass(root, "oobeSystem")
|
||||
oobe_comp = etree.SubElement(oobe_settings, qn("component"))
|
||||
oobe_comp.set("name", "Microsoft-Windows-Shell-Setup")
|
||||
oobe_comp.set("processorArchitecture", "amd64")
|
||||
oobe_comp.set("publicKeyToken", "31bf3856ad364e35")
|
||||
oobe_comp.set("language", "neutral")
|
||||
oobe_comp.set("versionScope", "nonSxS")
|
||||
|
||||
oobe_el = etree.SubElement(oobe_comp, qn("OOBE"))
|
||||
oobe_data = form_data.get("oobe", {})
|
||||
for key in [
|
||||
"HideEULAPage",
|
||||
"HideOEMRegistrationScreen",
|
||||
"HideOnlineAccountScreens",
|
||||
"HideWirelessSetupInOOBE",
|
||||
"HideLocalAccountScreen",
|
||||
"NetworkLocation",
|
||||
"ProtectYourPC",
|
||||
"SkipUserOOBE",
|
||||
"SkipMachineOOBE",
|
||||
]:
|
||||
val = oobe_data.get(key, "")
|
||||
if val:
|
||||
child = etree.SubElement(oobe_el, qn(key))
|
||||
child.text = str(val)
|
||||
|
||||
# FirstLogonCommands
|
||||
fl_cmds = form_data.get("firstlogon_commands", [])
|
||||
if fl_cmds:
|
||||
flc = etree.SubElement(oobe_comp, qn("FirstLogonCommands"))
|
||||
for idx, cmd in enumerate(fl_cmds, start=1):
|
||||
if not cmd.get("commandline", "").strip():
|
||||
continue
|
||||
sc = etree.SubElement(flc, qn("SynchronousCommand"))
|
||||
sc.set(qwcm("action"), "add")
|
||||
order_el = etree.SubElement(sc, qn("Order"))
|
||||
order_el.text = str(idx)
|
||||
cl_el = etree.SubElement(sc, qn("CommandLine"))
|
||||
cl_el.text = cmd["commandline"].strip()
|
||||
desc_el = etree.SubElement(sc, qn("Description"))
|
||||
desc_el.text = cmd.get("description", "").strip()
|
||||
|
||||
xml_bytes = etree.tostring(
|
||||
root,
|
||||
pretty_print=True,
|
||||
xml_declaration=True,
|
||||
encoding="utf-8",
|
||||
)
|
||||
return xml_bytes.decode("utf-8")
|
||||
|
||||
|
||||
def _extract_form_data(form):
|
||||
"""Pull structured data from the submitted form."""
|
||||
data = {}
|
||||
|
||||
# Driver paths
|
||||
dp_list = form.getlist("driver_path[]")
|
||||
data["driver_paths"] = [p for p in dp_list if p.strip()]
|
||||
|
||||
# Machine settings
|
||||
data["computer_name"] = form.get("computer_name", "")
|
||||
data["registered_organization"] = form.get("registered_organization", "")
|
||||
data["registered_owner"] = form.get("registered_owner", "")
|
||||
data["time_zone"] = form.get("time_zone", "")
|
||||
|
||||
# Specialize commands
|
||||
spec_paths = form.getlist("spec_cmd_path[]")
|
||||
spec_descs = form.getlist("spec_cmd_desc[]")
|
||||
data["specialize_commands"] = []
|
||||
for i in range(len(spec_paths)):
|
||||
if spec_paths[i].strip():
|
||||
data["specialize_commands"].append({
|
||||
"path": spec_paths[i],
|
||||
"description": spec_descs[i] if i < len(spec_descs) else "",
|
||||
})
|
||||
|
||||
# OOBE settings
|
||||
data["oobe"] = {}
|
||||
for key in [
|
||||
"HideEULAPage",
|
||||
"HideOEMRegistrationScreen",
|
||||
"HideOnlineAccountScreens",
|
||||
"HideWirelessSetupInOOBE",
|
||||
"HideLocalAccountScreen",
|
||||
"SkipUserOOBE",
|
||||
"SkipMachineOOBE",
|
||||
]:
|
||||
data["oobe"][key] = form.get(f"oobe_{key}", "false")
|
||||
data["oobe"]["NetworkLocation"] = form.get("oobe_NetworkLocation", "Work")
|
||||
data["oobe"]["ProtectYourPC"] = form.get("oobe_ProtectYourPC", "3")
|
||||
|
||||
# First logon commands
|
||||
fl_cls = form.getlist("fl_cmd_commandline[]")
|
||||
fl_descs = form.getlist("fl_cmd_desc[]")
|
||||
data["firstlogon_commands"] = []
|
||||
for i in range(len(fl_cls)):
|
||||
if fl_cls[i].strip():
|
||||
data["firstlogon_commands"].append({
|
||||
"commandline": fl_cls[i],
|
||||
"description": fl_descs[i] if i < len(fl_descs) else "",
|
||||
})
|
||||
|
||||
return data
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Routes — pages
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@app.route("/")
|
||||
def dashboard():
|
||||
images = [image_status(it) for it in IMAGE_TYPES]
|
||||
services = [service_status(s) for s in ("dnsmasq", "apache2", "smbd")]
|
||||
return render_template(
|
||||
"dashboard.html",
|
||||
images=images,
|
||||
services=services,
|
||||
image_types=IMAGE_TYPES,
|
||||
friendly_names=FRIENDLY_NAMES,
|
||||
)
|
||||
|
||||
|
||||
@app.route("/images/import", methods=["GET", "POST"])
|
||||
def images_import():
|
||||
usb_mounts = find_usb_mounts()
|
||||
images = [image_status(it) for it in IMAGE_TYPES]
|
||||
|
||||
if request.method == "POST":
|
||||
source = request.form.get("source", "")
|
||||
target = request.form.get("target", "")
|
||||
|
||||
if not source or not target:
|
||||
flash("Please select both a source and a target image type.", "danger")
|
||||
return redirect(url_for("images_import"))
|
||||
|
||||
if target not in IMAGE_TYPES:
|
||||
flash("Invalid target image type.", "danger")
|
||||
return redirect(url_for("images_import"))
|
||||
|
||||
if not os.path.isdir(source):
|
||||
flash(f"Source path does not exist: {source}", "danger")
|
||||
return redirect(url_for("images_import"))
|
||||
|
||||
dest = deploy_path(target)
|
||||
try:
|
||||
os.makedirs(dest, exist_ok=True)
|
||||
# Use rsync-style copy: copy contents of source into dest
|
||||
for item in os.listdir(source):
|
||||
src_item = os.path.join(source, item)
|
||||
dst_item = os.path.join(dest, item)
|
||||
if os.path.isdir(src_item):
|
||||
if os.path.exists(dst_item):
|
||||
shutil.rmtree(dst_item)
|
||||
shutil.copytree(src_item, dst_item)
|
||||
else:
|
||||
shutil.copy2(src_item, dst_item)
|
||||
flash(
|
||||
f"Successfully imported content from {source} to {FRIENDLY_NAMES.get(target, target)}.",
|
||||
"success",
|
||||
)
|
||||
except Exception as exc:
|
||||
flash(f"Import failed: {exc}", "danger")
|
||||
|
||||
return redirect(url_for("images_import"))
|
||||
|
||||
return render_template(
|
||||
"import.html",
|
||||
usb_mounts=usb_mounts,
|
||||
images=images,
|
||||
image_types=IMAGE_TYPES,
|
||||
friendly_names=FRIENDLY_NAMES,
|
||||
)
|
||||
|
||||
|
||||
@app.route("/images/<image_type>/unattend", methods=["GET", "POST"])
|
||||
def unattend_editor(image_type):
|
||||
if image_type not in IMAGE_TYPES:
|
||||
flash("Unknown image type.", "danger")
|
||||
return redirect(url_for("dashboard"))
|
||||
|
||||
xml_file = unattend_path(image_type)
|
||||
|
||||
if request.method == "POST":
|
||||
save_mode = request.form.get("save_mode", "form")
|
||||
|
||||
if save_mode == "raw":
|
||||
raw_xml = request.form.get("raw_xml", "")
|
||||
# Validate the XML before saving
|
||||
try:
|
||||
etree.fromstring(raw_xml.encode("utf-8"))
|
||||
except etree.XMLSyntaxError as exc:
|
||||
flash(f"Invalid XML: {exc}", "danger")
|
||||
data = parse_unattend(xml_file)
|
||||
data["raw_xml"] = raw_xml
|
||||
return render_template(
|
||||
"unattend_editor.html",
|
||||
image_type=image_type,
|
||||
friendly_name=FRIENDLY_NAMES.get(image_type, image_type),
|
||||
data=data,
|
||||
image_types=IMAGE_TYPES,
|
||||
friendly_names=FRIENDLY_NAMES,
|
||||
)
|
||||
xml_content = raw_xml
|
||||
else:
|
||||
form_data = _extract_form_data(request.form)
|
||||
xml_content = build_unattend_xml(form_data)
|
||||
|
||||
# Write to disk
|
||||
try:
|
||||
os.makedirs(os.path.dirname(xml_file), exist_ok=True)
|
||||
with open(xml_file, "w", encoding="utf-8") as fh:
|
||||
fh.write(xml_content)
|
||||
flash("unattend.xml saved successfully.", "success")
|
||||
except Exception as exc:
|
||||
flash(f"Failed to save: {exc}", "danger")
|
||||
|
||||
return redirect(url_for("unattend_editor", image_type=image_type))
|
||||
|
||||
data = parse_unattend(xml_file)
|
||||
return render_template(
|
||||
"unattend_editor.html",
|
||||
image_type=image_type,
|
||||
friendly_name=FRIENDLY_NAMES.get(image_type, image_type),
|
||||
data=data,
|
||||
image_types=IMAGE_TYPES,
|
||||
friendly_names=FRIENDLY_NAMES,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Routes — API
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@app.route("/api/services")
|
||||
def api_services():
|
||||
services = {s: service_status(s) for s in ("dnsmasq", "apache2", "smbd")}
|
||||
return jsonify(services)
|
||||
|
||||
|
||||
@app.route("/api/images")
|
||||
def api_images():
|
||||
images = [image_status(it) for it in IMAGE_TYPES]
|
||||
return jsonify(images)
|
||||
|
||||
|
||||
@app.route("/api/images/<image_type>/unattend", methods=["POST"])
|
||||
def api_save_unattend(image_type):
|
||||
if image_type not in IMAGE_TYPES:
|
||||
return jsonify({"error": "Unknown image type"}), 404
|
||||
|
||||
xml_file = unattend_path(image_type)
|
||||
payload = request.get_json(silent=True)
|
||||
|
||||
if not payload:
|
||||
return jsonify({"error": "No JSON body provided"}), 400
|
||||
|
||||
if "raw_xml" in payload:
|
||||
raw_xml = payload["raw_xml"]
|
||||
try:
|
||||
etree.fromstring(raw_xml.encode("utf-8"))
|
||||
except etree.XMLSyntaxError as exc:
|
||||
return jsonify({"error": f"Invalid XML: {exc}"}), 400
|
||||
xml_content = raw_xml
|
||||
else:
|
||||
try:
|
||||
xml_content = build_unattend_xml(payload)
|
||||
except Exception as exc:
|
||||
return jsonify({"error": f"Failed to build XML: {exc}"}), 400
|
||||
|
||||
try:
|
||||
os.makedirs(os.path.dirname(xml_file), exist_ok=True)
|
||||
with open(xml_file, "w", encoding="utf-8") as fh:
|
||||
fh.write(xml_content)
|
||||
except Exception as exc:
|
||||
return jsonify({"error": f"Failed to write file: {exc}"}), 500
|
||||
|
||||
return jsonify({"status": "ok", "path": xml_file})
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Context processor — make data available to all templates
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@app.context_processor
|
||||
def inject_globals():
|
||||
return {
|
||||
"all_image_types": IMAGE_TYPES,
|
||||
"all_friendly_names": FRIENDLY_NAMES,
|
||||
}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Main
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
if __name__ == "__main__":
|
||||
app.run(host="0.0.0.0", port=5000, debug=True)
|
||||
Reference in New Issue
Block a user