webapp: extract service layer (config.py + services/) from app.py

Phase 1a of a multi-session refactor toward a clean blueprint
structure. Pulls the helper code that lived alongside the routes in
the 1621-line app.py into focused modules. app.py is now 625 lines
of mostly routes plus a small Flask wiring header. Behaviour is
unchanged: smoke-tested against the 8 main GET routes (200 OK).

New modules:

- config.py            env vars + IMAGE_TYPES + FRIENDLY_NAMES +
                       SHARED_DEPLOY_* taxonomy + unattend XML
                       namespaces.
- services/audit.py    audit log file handler + audit() helper.
- services/csrf.py     session CSRF token + before_request validator
                       wired via init_csrf(app).
- services/fs.py       image_root / deploy_path / unattend_path /
                       control_path / tools_path + load_json /
                       save_json + resolve_destination.
- services/system.py   service_status / find_usb_mounts /
                       find_upload_sources.
- services/images.py   image_status + load_image_config.
- services/deploy.py   import_deploy + _merge_tree +
                       _replace_with_symlink + allowed_import_source.
- services/unattend.py parse_unattend / build_unattend_xml /
                       extract_form_data and the qn / qwcm / settings
                       pass helpers.
- services/wim.py      extract_startnet / update_startnet / list_files
                       wrapping wimextract / wimupdate / wimdir.

Endpoint names kept stable (dashboard, clonezilla_backups, etc.) so
existing url_for(...) calls in templates are unchanged. Phase 1b
(Flask blueprints with ".endpoint" naming) deferred to a future
session because it requires updating ~30 url_for sites in templates
and is mostly cosmetic.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
cproudlock
2026-05-08 18:25:32 -04:00
parent 4d6438285b
commit c16a4f23b4
11 changed files with 1660 additions and 1621 deletions

File diff suppressed because it is too large Load Diff

66
webapp/config.py Normal file
View File

@@ -0,0 +1,66 @@
"""Configuration constants for the PXE webapp.
Reads from environment variables with sensible defaults that match the
Ansible playbook's deploy paths. Also defines the canonical image type
list and shared-directory taxonomy used by the import pipeline.
"""
import os
# --- Filesystem paths --------------------------------------------------------
SAMBA_SHARE = os.environ.get("SAMBA_SHARE", "/srv/samba/winpeapps")
CLONEZILLA_SHARE = os.environ.get("CLONEZILLA_SHARE", "/srv/samba/clonezilla")
BLANCCO_REPORTS = os.environ.get("BLANCCO_REPORTS", "/srv/samba/blancco-reports")
ENROLLMENT_SHARE = os.environ.get("ENROLLMENT_SHARE", "/srv/samba/enrollment")
UPLOAD_DIR = os.environ.get("UPLOAD_DIR", "/home/pxe/image-upload")
SHARED_DIR = os.path.join(SAMBA_SHARE, "_shared")
WEB_ROOT = os.environ.get("WEB_ROOT", "/var/www/html")
BOOT_WIM = os.path.join(WEB_ROOT, "win11", "sources", "boot.wim")
AUDIT_LOG = os.environ.get("AUDIT_LOG", "/var/log/pxe-webapp-audit.log")
# --- Flask -------------------------------------------------------------------
FLASK_SECRET_KEY = os.environ.get("FLASK_SECRET_KEY", "pxe-manager-dev-key-change-in-prod")
MAX_CONTENT_LENGTH = 16 * 1024 * 1024 * 1024 # 16 GB max upload
# --- Image taxonomy ----------------------------------------------------------
# Subdirs inside Deploy/ shared across ALL image types.
SHARED_DEPLOY_GLOBAL = ["Out-of-box Drivers"]
# Subdirs inside Deploy/ shared within the same image family (by prefix).
SHARED_DEPLOY_SCOPED = {
"gea-": ["Operating Systems", "Packages"],
"ge-": ["Operating Systems", "Packages"],
}
# Sibling dirs at image root shared within the same image family.
SHARED_ROOT_DIRS = {
"gea-": ["Sources"],
"ge-": ["Sources"],
}
IMAGE_TYPES = [
"gea-standard",
"gea-engineer",
"gea-shopfloor",
"gea-shopfloor-mce",
"ge-standard",
"ge-engineer",
"ge-shopfloor-lockdown",
"ge-shopfloor-mce",
]
FRIENDLY_NAMES = {
"gea-standard": "GE Aerospace Standard",
"gea-engineer": "GE Aerospace Engineer",
"gea-shopfloor": "GE Aerospace Shop Floor",
"gea-shopfloor-mce": "GE Aerospace Shop Floor MCE",
"ge-standard": "GE Legacy Standard",
"ge-engineer": "GE Legacy Engineer",
"ge-shopfloor-lockdown": "GE Legacy Shop Floor Lockdown",
"ge-shopfloor-mce": "GE Legacy Shop Floor MCE",
}
# --- Unattend XML namespaces -------------------------------------------------
UNATTEND_NS = "urn:schemas-microsoft-com:unattend"
WCM_NS = "http://schemas.microsoft.com/WMIConfig/2002/State"
NSMAP = {None: UNATTEND_NS, "wcm": WCM_NS}

View File

26
webapp/services/audit.py Normal file
View File

@@ -0,0 +1,26 @@
"""Audit logging for the PXE webapp.
Every write-action route should call ``audit(action, detail)`` to record
who did what. The log lives at ``AUDIT_LOG`` (configurable via env var)
and is consumed by the /audit page.
"""
import logging
from flask import request
import config
_audit_handler = logging.FileHandler(config.AUDIT_LOG, mode="a")
_audit_handler.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
audit_logger = logging.getLogger("pxe_audit")
audit_logger.setLevel(logging.INFO)
if not audit_logger.handlers:
audit_logger.addHandler(_audit_handler)
def audit(action, detail=""):
"""Write an entry to the audit log."""
ip = request.remote_addr if request else "system"
audit_logger.info(f"[{ip}] {action}: {detail}")

28
webapp/services/csrf.py Normal file
View File

@@ -0,0 +1,28 @@
"""Session-based CSRF token: generate per-session, double-submit on POST."""
import secrets
from flask import abort, request, session
def generate_csrf_token():
"""Return the CSRF token for the current session, creating one if needed."""
if "_csrf_token" not in session:
session["_csrf_token"] = secrets.token_hex(32)
return session["_csrf_token"]
def init_csrf(app):
"""Wire CSRF protection into a Flask app: validator + template helper."""
@app.before_request
def _validate_csrf():
if request.method != "POST":
return
token = request.form.get("_csrf_token") or request.headers.get("X-CSRF-Token")
if not token or token != generate_csrf_token():
abort(403)
@app.context_processor
def _inject_csrf_token():
return {"csrf_token": generate_csrf_token}

96
webapp/services/deploy.py Normal file
View File

@@ -0,0 +1,96 @@
"""Image deploy import logic: copy/move from a USB or upload-dir source
into ``SAMBA_SHARE/<image_type>/Deploy/`` while merging shared subdirs
(``Out-of-box Drivers`` etc.) into ``SAMBA_SHARE/_shared/`` and replacing
the per-image copies with symlinks. This is what lets two image types
re-use the same multi-GB driver tree without doubling disk usage.
"""
import os
import shutil
import config
from services.system import find_usb_mounts
def _replace_with_symlink(link_path, target_path):
"""Replace a file/dir/symlink at link_path with a symlink to target_path."""
if os.path.islink(link_path):
os.remove(link_path)
elif os.path.isdir(link_path):
shutil.rmtree(link_path)
os.symlink(target_path, link_path)
def _merge_tree(src, dst, move=False):
"""Recursively merge src tree into dst, overwriting existing files.
When move=True, files are moved instead of copied (saves disk space
on imports from the local upload-dir).
"""
_transfer = shutil.move if move else shutil.copy2
_transfer_tree = shutil.move if move else shutil.copytree
for item in os.listdir(src):
s = os.path.join(src, item)
d = os.path.join(dst, item)
if os.path.isdir(s):
if os.path.isdir(d):
_merge_tree(s, d, move=move)
else:
if os.path.exists(d):
os.remove(d)
_transfer_tree(s, d)
else:
os.makedirs(os.path.dirname(d), exist_ok=True)
_transfer(s, d)
def import_deploy(src_deploy, dst_deploy, target="", move=False):
"""Import Deploy/ contents, redirecting shared subdirs into _shared/."""
scoped_shared = []
prefix_key = ""
for prefix, dirs in config.SHARED_DEPLOY_SCOPED.items():
if target.startswith(prefix):
scoped_shared = dirs
prefix_key = prefix
break
_transfer = shutil.move if move else shutil.copy2
_transfer_tree = shutil.move if move else shutil.copytree
os.makedirs(dst_deploy, exist_ok=True)
for item in os.listdir(src_deploy):
src_item = os.path.join(src_deploy, item)
dst_item = os.path.join(dst_deploy, item)
if not os.path.isdir(src_item):
_transfer(src_item, dst_item)
continue
if item in config.SHARED_DEPLOY_GLOBAL:
shared_dest = os.path.join(config.SHARED_DIR, item)
os.makedirs(shared_dest, exist_ok=True)
_merge_tree(src_item, shared_dest, move=move)
_replace_with_symlink(dst_item, shared_dest)
continue
if item in scoped_shared:
shared_dest = os.path.join(config.SHARED_DIR, f"{prefix_key}{item}")
os.makedirs(shared_dest, exist_ok=True)
_merge_tree(src_item, shared_dest, move=move)
_replace_with_symlink(dst_item, shared_dest)
continue
if os.path.isdir(dst_item):
_merge_tree(src_item, dst_item, move=move)
else:
_transfer_tree(src_item, dst_item)
def allowed_import_source(source):
"""True if source is a USB mount or under the upload dir."""
usb = find_usb_mounts()
if any(source == m or source.startswith(m + "/") for m in usb):
return True
if source == config.UPLOAD_DIR or source.startswith(config.UPLOAD_DIR + "/"):
return os.path.isdir(source)
return False

75
webapp/services/fs.py Normal file
View File

@@ -0,0 +1,75 @@
"""Filesystem path helpers + tiny JSON load/save utilities.
All paths are derived from ``config.SAMBA_SHARE`` so swapping the share
root only requires changing one env var.
"""
import json
import os
import config
def image_root(image_type):
"""Return the root directory for an image type."""
return os.path.join(config.SAMBA_SHARE, image_type)
def deploy_path(image_type):
"""Return the Deploy directory for an image type."""
return os.path.join(config.SAMBA_SHARE, image_type, "Deploy")
def unattend_path(image_type):
"""Return the unattend.xml path for an image type."""
return os.path.join(deploy_path(image_type), "FlatUnattendW10.xml")
def control_path(image_type):
"""Return the Deploy/Control directory for an image type."""
return os.path.join(deploy_path(image_type), "Control")
def tools_path(image_type):
"""Return the Tools directory for an image type."""
return os.path.join(config.SAMBA_SHARE, image_type, "Tools")
def load_json(filepath):
"""Parse a JSON file and return its contents, or [] on failure."""
try:
with open(filepath, "r", encoding="utf-8-sig") as fh:
return json.load(fh)
except (OSError, json.JSONDecodeError):
return []
def save_json(filepath, data):
"""Write data as pretty-printed JSON, creating parent dirs as needed."""
os.makedirs(os.path.dirname(filepath), exist_ok=True)
with open(filepath, "w", encoding="utf-8") as fh:
json.dump(data, fh, indent=2, ensure_ascii=False)
fh.write("\n")
def resolve_destination(dest_dir, image_type):
"""Convert a Windows ``*destinationdir*`` path to a Linux filesystem path.
Replaces the placeholder + backslashes, prepends
``SAMBA_SHARE/image_type/``, then resolves symlinks so shared dirs
are followed.
"""
if not dest_dir:
return ""
path = dest_dir
lower = path.lower()
idx = lower.find("*destinationdir*")
if idx != -1:
path = path[idx + len("*destinationdir*"):]
path = path.replace("\\", "/").lstrip("/")
full = os.path.join(config.SAMBA_SHARE, image_type, path)
try:
full = os.path.realpath(full)
except OSError:
pass
return full

123
webapp/services/images.py Normal file
View File

@@ -0,0 +1,123 @@
"""Per-image-type state probes: status + config (drivers / OS / packages / models)."""
import os
import config
from services import fs
def image_status(image_type):
"""Return a dict describing the state of an image type."""
dp = fs.deploy_path(image_type)
up = fs.unattend_path(image_type)
has_content = os.path.isdir(dp) and any(os.scandir(dp)) if os.path.isdir(dp) else False
has_unattend = os.path.isfile(up)
return {
"image_type": image_type,
"friendly_name": config.FRIENDLY_NAMES.get(image_type, image_type),
"deploy_path": dp,
"has_content": has_content,
"has_unattend": has_unattend,
}
def load_image_config(image_type):
"""Load all JSON configs for an image and check on-disk presence."""
ctrl = fs.control_path(image_type)
tools = fs.tools_path(image_type)
# --- Drivers (merge HardwareDriver.json + hw_drivers.json) ---
hw_driver_file = os.path.join(ctrl, "HardwareDriver.json")
hw_drivers_extra = os.path.join(ctrl, "hw_drivers.json")
drivers_raw = fs.load_json(hw_driver_file)
extra_raw = fs.load_json(hw_drivers_extra)
seen_files = set()
drivers = []
for d in drivers_raw + extra_raw:
fname = (d.get("FileName") or d.get("fileName") or "").lower()
if fname and fname in seen_files:
continue
if fname:
seen_files.add(fname)
drivers.append(d)
for d in drivers:
fname = d.get("FileName") or d.get("fileName") or ""
dest = d.get("DestinationDir") or d.get("destinationDir") or ""
resolved = fs.resolve_destination(dest, image_type)
if resolved and fname:
d["_on_disk"] = os.path.isfile(os.path.join(resolved, fname))
else:
d["_on_disk"] = False
# --- Operating Systems ---
os_file = os.path.join(ctrl, "OperatingSystem.json")
operating_systems = fs.load_json(os_file)
for entry in operating_systems:
osv = entry.get("operatingSystemVersion", {})
wim = osv.get("wim", {})
dest = wim.get("DestinationDir") or wim.get("destinationDir") or ""
resolved = fs.resolve_destination(dest, image_type)
if resolved:
entry["_on_disk"] = os.path.isfile(os.path.join(resolved, "install.wim"))
else:
entry["_on_disk"] = False
# --- Packages ---
pkg_file = os.path.join(ctrl, "packages.json")
packages = fs.load_json(pkg_file)
for p in packages:
fname = p.get("fileName") or p.get("FileName") or ""
dest = p.get("destinationDir") or p.get("DestinationDir") or ""
resolved = fs.resolve_destination(dest, image_type)
if resolved and fname:
p["_on_disk"] = os.path.isfile(os.path.join(resolved, fname))
else:
p["_on_disk"] = False
# --- Hardware Models (user_selections.json) ---
us_file = os.path.join(tools, "user_selections.json")
us_raw = fs.load_json(us_file)
us_data = us_raw[0] if us_raw and isinstance(us_raw, list) else {}
hardware_models = us_data.get("HardwareModelSelection", [])
os_selection = str(us_data.get("OperatingSystemSelection", ""))
family_lookup = {}
for d in drivers:
family = d.get("family", "")
if family:
family_lookup[family] = d
for hm in hardware_models:
family_id = hm.get("Id", "")
matched = family_lookup.get(family_id)
hm["_on_disk"] = matched["_on_disk"] if matched else False
# --- Orphan drivers: zip files on disk not referenced in any JSON ---
orphan_drivers = []
oob_dir = os.path.join(fs.deploy_path(image_type), "Out-of-box Drivers")
try:
oob_dir = os.path.realpath(oob_dir)
except OSError:
pass
registered_files = set()
for d in drivers:
fname = d.get("FileName") or d.get("fileName") or ""
if fname:
registered_files.add(fname.lower())
if os.path.isdir(oob_dir):
for dirpath, _dirnames, filenames in os.walk(oob_dir):
for fn in filenames:
if fn.lower().endswith(".zip") and fn.lower() not in registered_files:
rel = os.path.relpath(os.path.join(dirpath, fn), oob_dir)
orphan_drivers.append({"fileName": fn, "relPath": rel})
return {
"hardware_models": hardware_models,
"drivers": drivers,
"operating_systems": operating_systems,
"packages": packages,
"orphan_drivers": orphan_drivers,
"os_selection": os_selection,
}

55
webapp/services/system.py Normal file
View File

@@ -0,0 +1,55 @@
"""Host-system probes: systemd service status, USB mounts, upload sources."""
import os
import subprocess
import config
def service_status(service_name):
"""Check whether a systemd service is active."""
try:
result = subprocess.run(
["systemctl", "is-active", service_name],
capture_output=True,
text=True,
timeout=5,
)
state = result.stdout.strip()
return {"name": service_name, "active": state == "active", "state": state}
except Exception as exc:
return {"name": service_name, "active": False, "state": str(exc)}
def find_usb_mounts():
"""Return a list of mount-point paths that look like removable media."""
mounts = []
try:
with open("/proc/mounts", "r") as fh:
for line in fh:
parts = line.split()
if len(parts) >= 2:
mount_point = parts[1]
if mount_point.startswith(("/mnt/", "/media/")):
if os.path.isdir(mount_point):
mounts.append(mount_point)
except OSError:
pass
return sorted(set(mounts))
def find_upload_sources():
"""Return sub-directories inside UPLOAD_DIR that look like image content."""
sources = []
if os.path.isdir(config.UPLOAD_DIR):
try:
entries = os.listdir(config.UPLOAD_DIR)
if entries:
sources.append(config.UPLOAD_DIR)
for entry in entries:
full = os.path.join(config.UPLOAD_DIR, entry)
if os.path.isdir(full):
sources.append(full)
except OSError:
pass
return sources

496
webapp/services/unattend.py Normal file
View File

@@ -0,0 +1,496 @@
"""Unattend.xml parser, builder, and form-data extractor.
Round-trips a Windows ``FlatUnattendW10.xml`` between the disk format
and a flat dict the editor template can render. ``parse_unattend()``
reads disk + returns a dict; ``build_unattend_xml()`` takes the dict
+ returns a string for writing back; ``extract_form_data()`` converts
a Flask ``request.form`` MultiDict into the dict format that
``build_unattend_xml()`` expects.
"""
import os
from lxml import etree
import config
UNATTEND_TEMPLATE = """\
<?xml version="1.0" encoding="utf-8"?>
<unattend xmlns="urn:schemas-microsoft-com:unattend"
xmlns:wcm="http://schemas.microsoft.com/WMIConfig/2002/State">
<settings pass="windowsPE" />
<settings pass="offlineServicing" />
<settings pass="specialize" />
<settings pass="oobeSystem" />
</unattend>
"""
def qn(tag):
"""Return a tag qualified with the default unattend namespace."""
return f"{{{config.UNATTEND_NS}}}{tag}"
def qwcm(attr):
"""Return an attribute qualified with the wcm namespace."""
return f"{{{config.WCM_NS}}}{attr}"
def _find_or_create(parent, tag):
"""Find the first child with *tag* or create it."""
el = parent.find(tag, namespaces={"": config.UNATTEND_NS})
if el is None:
el = etree.SubElement(parent, qn(tag.split("}")[-1]) if "}" not in tag else tag)
return el
def _settings_pass(root, pass_name):
"""Return the <settings pass="..."> element, creating if needed."""
for s in root.findall(qn("settings")):
if s.get("pass") == pass_name:
return s
s = etree.SubElement(root, qn("settings"))
s.set("pass", pass_name)
return s
def parse_unattend(xml_path):
"""Parse an unattend.xml and return a dict of editable data."""
data = {
"driver_paths": [],
"computer_name": "",
"registered_organization": "",
"registered_owner": "",
"time_zone": "",
"specialize_commands": [],
"oobe": {
"HideEULAPage": "true",
"HideOEMRegistrationScreen": "true",
"HideOnlineAccountScreens": "true",
"HideWirelessSetupInOOBE": "true",
"HideLocalAccountScreen": "true",
"NetworkLocation": "Work",
"ProtectYourPC": "3",
"SkipUserOOBE": "true",
"SkipMachineOOBE": "true",
},
"firstlogon_commands": [],
"user_accounts": [],
"autologon": {
"enabled": "",
"username": "",
"password": "",
"plain_text": "true",
"logon_count": "",
},
"intl": {
"input_locale": "",
"system_locale": "",
"ui_language": "",
"user_locale": "",
},
"oobe_timezone": "",
"raw_xml": "",
}
if not os.path.isfile(xml_path):
data["raw_xml"] = UNATTEND_TEMPLATE
return data
with open(xml_path, "r", encoding="utf-8") as fh:
raw = fh.read()
data["raw_xml"] = raw
try:
root = etree.fromstring(raw.encode("utf-8"))
except etree.XMLSyntaxError:
return data
ns = {"u": config.UNATTEND_NS}
# --- offlineServicing: DriverPaths ---
for dp_el in root.xpath(
"u:settings[@pass='offlineServicing']//u:PathAndCredentials/u:Path",
namespaces=ns,
):
if dp_el.text:
data["driver_paths"].append(dp_el.text.strip())
# --- specialize: Shell-Setup ---
for comp in root.xpath("u:settings[@pass='specialize']/u:component", namespaces=ns):
comp_name = comp.get("name", "")
if "Shell-Setup" in comp_name:
for tag, key in [
("ComputerName", "computer_name"),
("RegisteredOrganization", "registered_organization"),
("RegisteredOwner", "registered_owner"),
("TimeZone", "time_zone"),
]:
el = comp.find(qn(tag))
if el is not None and el.text:
data[key] = el.text.strip()
# --- specialize: RunSynchronous commands ---
for cmd in root.xpath(
"u:settings[@pass='specialize']//u:RunSynchronousCommand",
namespaces=ns,
):
order_el = cmd.find(qn("Order"))
path_el = cmd.find(qn("Path"))
desc_el = cmd.find(qn("Description"))
data["specialize_commands"].append({
"order": order_el.text.strip() if order_el is not None and order_el.text else "",
"path": path_el.text.strip() if path_el is not None and path_el.text else "",
"description": desc_el.text.strip() if desc_el is not None and desc_el.text else "",
})
# --- oobeSystem ---
for comp in root.xpath("u:settings[@pass='oobeSystem']/u:component", namespaces=ns):
comp_name = comp.get("name", "")
if "International-Core" in comp_name:
for tag, key in [
("InputLocale", "input_locale"),
("SystemLocale", "system_locale"),
("UILanguage", "ui_language"),
("UserLocale", "user_locale"),
]:
el = comp.find(qn(tag))
if el is not None and el.text:
data["intl"][key] = el.text.strip()
if "OOBE" in comp_name or "Shell-Setup" in comp_name:
oobe_el = comp.find(qn("OOBE"))
if oobe_el is not None:
for child in oobe_el:
local = etree.QName(child).localname
if local in data["oobe"] and child.text:
data["oobe"][local] = child.text.strip()
ua = comp.find(qn("UserAccounts"))
if ua is not None:
la_container = ua.find(qn("LocalAccounts"))
if la_container is not None:
for acct in la_container.findall(qn("LocalAccount")):
name_el = acct.find(qn("Name"))
group_el = acct.find(qn("Group"))
display_el = acct.find(qn("DisplayName"))
pw_el = acct.find(qn("Password"))
pw_val = ""
pw_plain = "true"
if pw_el is not None:
v = pw_el.find(qn("Value"))
p = pw_el.find(qn("PlainText"))
if v is not None and v.text:
pw_val = v.text.strip()
if p is not None and p.text:
pw_plain = p.text.strip()
data["user_accounts"].append({
"name": name_el.text.strip() if name_el is not None and name_el.text else "",
"password": pw_val,
"plain_text": pw_plain,
"group": group_el.text.strip() if group_el is not None and group_el.text else "Administrators",
"display_name": display_el.text.strip() if display_el is not None and display_el.text else "",
})
al = comp.find(qn("AutoLogon"))
if al is not None:
enabled_el = al.find(qn("Enabled"))
user_el = al.find(qn("Username"))
count_el = al.find(qn("LogonCount"))
pw_el = al.find(qn("Password"))
pw_val = ""
pw_plain = "true"
if pw_el is not None:
v = pw_el.find(qn("Value"))
p = pw_el.find(qn("PlainText"))
if v is not None and v.text:
pw_val = v.text.strip()
if p is not None and p.text:
pw_plain = p.text.strip()
data["autologon"] = {
"enabled": enabled_el.text.strip() if enabled_el is not None and enabled_el.text else "",
"username": user_el.text.strip() if user_el is not None and user_el.text else "",
"password": pw_val,
"plain_text": pw_plain,
"logon_count": count_el.text.strip() if count_el is not None and count_el.text else "",
}
flc = comp.find(qn("FirstLogonCommands"))
if flc is not None:
for sync in flc.findall(qn("SynchronousCommand")):
order_el = sync.find(qn("Order"))
cl_el = sync.find(qn("CommandLine"))
desc_el = sync.find(qn("Description"))
data["firstlogon_commands"].append({
"order": order_el.text.strip() if order_el is not None and order_el.text else "",
"commandline": cl_el.text.strip() if cl_el is not None and cl_el.text else "",
"description": desc_el.text.strip() if desc_el is not None and desc_el.text else "",
})
tz_el = comp.find(qn("TimeZone"))
if tz_el is not None and tz_el.text:
data["oobe_timezone"] = tz_el.text.strip()
return data
def build_unattend_xml(form_data):
"""Build a complete unattend.xml string from form data dict."""
root = etree.Element(qn("unattend"), nsmap=config.NSMAP)
_settings_pass(root, "windowsPE")
# --- offlineServicing: DriverPaths ---
offline = _settings_pass(root, "offlineServicing")
driver_paths = form_data.get("driver_paths", [])
if driver_paths:
comp = etree.SubElement(offline, qn("component"))
comp.set("name", "Microsoft-Windows-PnpCustomizationsNonWinPE")
comp.set("processorArchitecture", "amd64")
comp.set("publicKeyToken", "31bf3856ad364e35")
comp.set("language", "neutral")
comp.set("versionScope", "nonSxS")
dp_container = etree.SubElement(comp, qn("DriverPaths"))
for idx, dp in enumerate(driver_paths, start=1):
if not dp.strip():
continue
pac = etree.SubElement(dp_container, qn("PathAndCredentials"))
pac.set(qwcm("action"), "add")
pac.set(qwcm("keyValue"), str(idx))
path_el = etree.SubElement(pac, qn("Path"))
path_el.text = dp.strip()
# --- specialize ---
spec = _settings_pass(root, "specialize")
shell_comp = etree.SubElement(spec, qn("component"))
shell_comp.set("name", "Microsoft-Windows-Shell-Setup")
shell_comp.set("processorArchitecture", "amd64")
shell_comp.set("publicKeyToken", "31bf3856ad364e35")
shell_comp.set("language", "neutral")
shell_comp.set("versionScope", "nonSxS")
for tag, key in [
("ComputerName", "computer_name"),
("RegisteredOrganization", "registered_organization"),
("RegisteredOwner", "registered_owner"),
("TimeZone", "time_zone"),
]:
val = form_data.get(key, "").strip()
if val:
el = etree.SubElement(shell_comp, qn(tag))
el.text = val
spec_cmds = form_data.get("specialize_commands", [])
if spec_cmds:
deploy_comp = etree.SubElement(spec, qn("component"))
deploy_comp.set("name", "Microsoft-Windows-Deployment")
deploy_comp.set("processorArchitecture", "amd64")
deploy_comp.set("publicKeyToken", "31bf3856ad364e35")
deploy_comp.set("language", "neutral")
deploy_comp.set("versionScope", "nonSxS")
rs = etree.SubElement(deploy_comp, qn("RunSynchronous"))
for idx, cmd in enumerate(spec_cmds, start=1):
if not cmd.get("path", "").strip():
continue
rsc = etree.SubElement(rs, qn("RunSynchronousCommand"))
rsc.set(qwcm("action"), "add")
order_el = etree.SubElement(rsc, qn("Order"))
order_el.text = str(idx)
path_el = etree.SubElement(rsc, qn("Path"))
path_el.text = cmd["path"].strip()
desc_el = etree.SubElement(rsc, qn("Description"))
desc_el.text = cmd.get("description", "").strip()
# --- oobeSystem ---
oobe_settings = _settings_pass(root, "oobeSystem")
intl = form_data.get("intl", {})
if any(v.strip() for v in intl.values() if v):
intl_comp = etree.SubElement(oobe_settings, qn("component"))
intl_comp.set("name", "Microsoft-Windows-International-Core")
intl_comp.set("processorArchitecture", "amd64")
intl_comp.set("publicKeyToken", "31bf3856ad364e35")
intl_comp.set("language", "neutral")
intl_comp.set("versionScope", "nonSxS")
for tag, key in [
("InputLocale", "input_locale"),
("SystemLocale", "system_locale"),
("UILanguage", "ui_language"),
("UserLocale", "user_locale"),
]:
val = intl.get(key, "").strip()
if val:
el = etree.SubElement(intl_comp, qn(tag))
el.text = val
oobe_comp = etree.SubElement(oobe_settings, qn("component"))
oobe_comp.set("name", "Microsoft-Windows-Shell-Setup")
oobe_comp.set("processorArchitecture", "amd64")
oobe_comp.set("publicKeyToken", "31bf3856ad364e35")
oobe_comp.set("language", "neutral")
oobe_comp.set("versionScope", "nonSxS")
oobe_el = etree.SubElement(oobe_comp, qn("OOBE"))
oobe_data = form_data.get("oobe", {})
for key in [
"HideEULAPage",
"HideOEMRegistrationScreen",
"HideOnlineAccountScreens",
"HideWirelessSetupInOOBE",
"HideLocalAccountScreen",
"NetworkLocation",
"ProtectYourPC",
"SkipUserOOBE",
"SkipMachineOOBE",
]:
val = oobe_data.get(key, "")
if val:
child = etree.SubElement(oobe_el, qn(key))
child.text = str(val)
accounts = form_data.get("user_accounts", [])
if accounts:
ua = etree.SubElement(oobe_comp, qn("UserAccounts"))
la_container = etree.SubElement(ua, qn("LocalAccounts"))
for acct in accounts:
if not acct.get("name", "").strip():
continue
la = etree.SubElement(la_container, qn("LocalAccount"))
la.set(qwcm("action"), "add")
pw = etree.SubElement(la, qn("Password"))
pw_val = etree.SubElement(pw, qn("Value"))
pw_val.text = acct.get("password", "")
pw_plain = etree.SubElement(pw, qn("PlainText"))
pw_plain.text = acct.get("plain_text", "true")
name_el = etree.SubElement(la, qn("Name"))
name_el.text = acct["name"].strip()
group_el = etree.SubElement(la, qn("Group"))
group_el.text = acct.get("group", "Administrators").strip()
display_el = etree.SubElement(la, qn("DisplayName"))
display_el.text = acct.get("display_name", acct["name"]).strip()
autologon = form_data.get("autologon", {})
if autologon.get("username", "").strip():
al = etree.SubElement(oobe_comp, qn("AutoLogon"))
al_pw = etree.SubElement(al, qn("Password"))
al_pw_val = etree.SubElement(al_pw, qn("Value"))
al_pw_val.text = autologon.get("password", "")
al_pw_plain = etree.SubElement(al_pw, qn("PlainText"))
al_pw_plain.text = autologon.get("plain_text", "true")
al_enabled = etree.SubElement(al, qn("Enabled"))
al_enabled.text = autologon.get("enabled", "true")
al_user = etree.SubElement(al, qn("Username"))
al_user.text = autologon["username"].strip()
logon_count = autologon.get("logon_count", "").strip()
if logon_count:
al_count = etree.SubElement(al, qn("LogonCount"))
al_count.text = logon_count
fl_cmds = form_data.get("firstlogon_commands", [])
if fl_cmds:
flc = etree.SubElement(oobe_comp, qn("FirstLogonCommands"))
for idx, cmd in enumerate(fl_cmds, start=1):
if not cmd.get("commandline", "").strip():
continue
sc = etree.SubElement(flc, qn("SynchronousCommand"))
sc.set(qwcm("action"), "add")
order_el = etree.SubElement(sc, qn("Order"))
order_el.text = str(idx)
cl_el = etree.SubElement(sc, qn("CommandLine"))
cl_el.text = cmd["commandline"].strip()
desc_el = etree.SubElement(sc, qn("Description"))
desc_el.text = cmd.get("description", "").strip()
oobe_tz = form_data.get("oobe_timezone", "").strip()
if oobe_tz:
tz_el = etree.SubElement(oobe_comp, qn("TimeZone"))
tz_el.text = oobe_tz
xml_bytes = etree.tostring(
root,
pretty_print=True,
xml_declaration=True,
encoding="utf-8",
)
return xml_bytes.decode("utf-8")
def extract_form_data(form):
"""Pull structured data from the submitted form."""
data = {}
dp_list = form.getlist("driver_path[]")
data["driver_paths"] = [p for p in dp_list if p.strip()]
data["computer_name"] = form.get("computer_name", "")
data["registered_organization"] = form.get("registered_organization", "")
data["registered_owner"] = form.get("registered_owner", "")
data["time_zone"] = form.get("time_zone", "")
spec_paths = form.getlist("spec_cmd_path[]")
spec_descs = form.getlist("spec_cmd_desc[]")
data["specialize_commands"] = []
for i in range(len(spec_paths)):
if spec_paths[i].strip():
data["specialize_commands"].append({
"path": spec_paths[i],
"description": spec_descs[i] if i < len(spec_descs) else "",
})
data["oobe"] = {}
for key in [
"HideEULAPage",
"HideOEMRegistrationScreen",
"HideOnlineAccountScreens",
"HideWirelessSetupInOOBE",
"HideLocalAccountScreen",
"SkipUserOOBE",
"SkipMachineOOBE",
]:
data["oobe"][key] = form.get(f"oobe_{key}", "false")
data["oobe"]["NetworkLocation"] = form.get("oobe_NetworkLocation", "Work")
data["oobe"]["ProtectYourPC"] = form.get("oobe_ProtectYourPC", "3")
fl_cls = form.getlist("fl_cmd_commandline[]")
fl_descs = form.getlist("fl_cmd_desc[]")
data["firstlogon_commands"] = []
for i in range(len(fl_cls)):
if fl_cls[i].strip():
data["firstlogon_commands"].append({
"commandline": fl_cls[i],
"description": fl_descs[i] if i < len(fl_descs) else "",
})
accounts = []
i = 0
while form.get(f"account_name_{i}"):
accounts.append({
"name": form.get(f"account_name_{i}", ""),
"password": form.get(f"account_password_{i}", ""),
"plain_text": form.get(f"account_plaintext_{i}", "true"),
"group": form.get(f"account_group_{i}", "Administrators"),
"display_name": form.get(f"account_display_{i}", ""),
})
i += 1
data["user_accounts"] = accounts
data["autologon"] = {
"enabled": form.get("autologon_enabled", ""),
"username": form.get("autologon_username", ""),
"password": form.get("autologon_password", ""),
"plain_text": form.get("autologon_plaintext", "true"),
"logon_count": form.get("autologon_logoncount", ""),
}
data["intl"] = {
"input_locale": form.get("intl_input_locale", ""),
"system_locale": form.get("intl_system_locale", ""),
"ui_language": form.get("intl_ui_language", ""),
"user_locale": form.get("intl_user_locale", ""),
}
data["oobe_timezone"] = form.get("oobe_timezone", "")
return data

70
webapp/services/wim.py Normal file
View File

@@ -0,0 +1,70 @@
"""boot.wim manipulation via wimtools (wimextract / wimupdate / wimdir).
Used by the startnet.cmd editor to extract + update the boot script
that runs when WinPE boots from PXE.
"""
import os
import shutil
import subprocess
import tempfile
def extract_startnet(wim_path):
"""Extract startnet.cmd from a WIM file. Returns the contents or None."""
tmpdir = tempfile.mkdtemp()
try:
result = subprocess.run(
["wimextract", wim_path, "1",
"/Windows/System32/startnet.cmd",
"--dest-dir", tmpdir],
capture_output=True, text=True, timeout=30,
)
startnet_path = os.path.join(tmpdir, "startnet.cmd")
if result.returncode == 0 and os.path.isfile(startnet_path):
with open(startnet_path, "r", encoding="utf-8", errors="replace") as fh:
return fh.read()
return None
except Exception:
return None
finally:
shutil.rmtree(tmpdir, ignore_errors=True)
def update_startnet(wim_path, content):
"""Update startnet.cmd inside a WIM file via wimupdate.
Returns (ok, error_message). Writes CRLF line endings.
"""
tmpdir = tempfile.mkdtemp()
try:
startnet_path = os.path.join(tmpdir, "startnet.cmd")
with open(startnet_path, "w", encoding="utf-8", newline="\r\n") as fh:
fh.write(content)
update_cmd = f"add {startnet_path} /Windows/System32/startnet.cmd\n"
result = subprocess.run(
["wimupdate", wim_path, "1"],
input=update_cmd,
capture_output=True, text=True, timeout=60,
)
if result.returncode != 0:
return False, result.stderr.strip()
return True, ""
except Exception as exc:
return False, str(exc)
finally:
shutil.rmtree(tmpdir, ignore_errors=True)
def list_files(wim_path, path="/"):
"""List files inside a WIM at the given path."""
try:
result = subprocess.run(
["wimdir", wim_path, "1", path],
capture_output=True, text=True, timeout=30,
)
if result.returncode == 0:
return [l.strip() for l in result.stdout.splitlines() if l.strip()]
return []
except Exception:
return []