- Add startnet.cmd: FlatSetupLoader.exe + Boot.tag/Media.tag eliminates physical USB requirement for WinPE PXE deployment - Add Upload-Image.ps1: PowerShell script to robocopy MCL cached images to PXE server via SMB (Deploy, Tools, Sources) - Add gea-shopfloor-mce image type across playbook, webapp, startnet - Change webapp import to move (not copy) for upload sources to save disk - Add Samba symlink following config for shared image directories - Add Media.tag creation task in playbook for drive detection - Update prepare-boot-tools.sh with Blancco config/initramfs patching - Add grub-efi-amd64-bin to download-packages.sh Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1129 lines
40 KiB
Python
1129 lines
40 KiB
Python
#!/usr/bin/env python3
|
|
"""Flask web application for managing a GE Aerospace PXE server."""
|
|
|
|
import logging
|
|
import os
|
|
import secrets
|
|
import shutil
|
|
import subprocess
|
|
import tempfile
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
from flask import (
|
|
Flask,
|
|
abort,
|
|
flash,
|
|
jsonify,
|
|
redirect,
|
|
render_template,
|
|
request,
|
|
send_file,
|
|
session,
|
|
url_for,
|
|
)
|
|
from lxml import etree
|
|
from werkzeug.utils import secure_filename
|
|
|
|
app = Flask(__name__)
|
|
app.secret_key = os.environ.get("FLASK_SECRET_KEY", "pxe-manager-dev-key-change-in-prod")
|
|
app.config["MAX_CONTENT_LENGTH"] = 16 * 1024 * 1024 * 1024 # 16 GB max upload
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Audit logging
|
|
# ---------------------------------------------------------------------------
|
|
AUDIT_LOG = os.environ.get("AUDIT_LOG", "/var/log/pxe-webapp-audit.log")
|
|
audit_logger = logging.getLogger("pxe_audit")
|
|
audit_logger.setLevel(logging.INFO)
|
|
_audit_handler = logging.FileHandler(AUDIT_LOG, mode="a")
|
|
_audit_handler.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
|
|
audit_logger.addHandler(_audit_handler)
|
|
|
|
|
|
def audit(action, detail=""):
|
|
"""Write an entry to the audit log."""
|
|
ip = request.remote_addr if request else "system"
|
|
audit_logger.info(f"[{ip}] {action}: {detail}")
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Configuration
|
|
# ---------------------------------------------------------------------------
|
|
SAMBA_SHARE = os.environ.get("SAMBA_SHARE", "/srv/samba/winpeapps")
|
|
CLONEZILLA_SHARE = os.environ.get("CLONEZILLA_SHARE", "/srv/samba/clonezilla")
|
|
BLANCCO_REPORTS = os.environ.get("BLANCCO_REPORTS", "/srv/samba/blancco-reports")
|
|
UPLOAD_DIR = os.environ.get("UPLOAD_DIR", "/home/pxe/image-upload")
|
|
SHARED_DIR = os.path.join(SAMBA_SHARE, "_shared")
|
|
# Subdirs inside Deploy/ shared across ALL image types
|
|
SHARED_DEPLOY_GLOBAL = ["Out-of-box Drivers"]
|
|
# Subdirs inside Deploy/ shared within the same image family (by prefix)
|
|
SHARED_DEPLOY_SCOPED = {
|
|
"gea-": ["Operating Systems"],
|
|
"ge-": ["Operating Systems"],
|
|
}
|
|
# Sibling dirs at image root shared within the same image family
|
|
SHARED_ROOT_DIRS = {
|
|
"gea-": ["Sources"],
|
|
"ge-": ["Sources"],
|
|
}
|
|
WEB_ROOT = os.environ.get("WEB_ROOT", "/var/www/html")
|
|
BOOT_WIM = os.path.join(WEB_ROOT, "win11", "sources", "boot.wim")
|
|
|
|
IMAGE_TYPES = [
|
|
"gea-standard",
|
|
"gea-engineer",
|
|
"gea-shopfloor",
|
|
"gea-shopfloor-mce",
|
|
"ge-standard",
|
|
"ge-engineer",
|
|
"ge-shopfloor-lockdown",
|
|
"ge-shopfloor-mce",
|
|
]
|
|
|
|
FRIENDLY_NAMES = {
|
|
"gea-standard": "GE Aerospace Standard",
|
|
"gea-engineer": "GE Aerospace Engineer",
|
|
"gea-shopfloor": "GE Aerospace Shop Floor",
|
|
"gea-shopfloor-mce": "GE Aerospace Shop Floor MCE",
|
|
"ge-standard": "GE Legacy Standard",
|
|
"ge-engineer": "GE Legacy Engineer",
|
|
"ge-shopfloor-lockdown": "GE Legacy Shop Floor Lockdown",
|
|
"ge-shopfloor-mce": "GE Legacy Shop Floor MCE",
|
|
}
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CSRF protection
|
|
# ---------------------------------------------------------------------------
|
|
def generate_csrf_token():
|
|
"""Return the CSRF token for the current session, creating one if needed."""
|
|
if "_csrf_token" not in session:
|
|
session["_csrf_token"] = secrets.token_hex(32)
|
|
return session["_csrf_token"]
|
|
|
|
|
|
@app.context_processor
|
|
def inject_csrf_token():
|
|
"""Make csrf_token() available in all templates."""
|
|
return {"csrf_token": generate_csrf_token}
|
|
|
|
|
|
@app.before_request
|
|
def validate_csrf():
|
|
"""Reject POST requests with a missing or invalid CSRF token."""
|
|
if request.method != "POST":
|
|
return
|
|
token = request.form.get("_csrf_token") or request.headers.get("X-CSRF-Token")
|
|
if not token or token != generate_csrf_token():
|
|
abort(403)
|
|
|
|
|
|
NS = "urn:schemas-microsoft-com:unattend"
|
|
WCM = "http://schemas.microsoft.com/WMIConfig/2002/State"
|
|
NSMAP = {None: NS, "wcm": WCM}
|
|
|
|
# Convenience qualified-name helpers
|
|
def qn(tag):
|
|
"""Return a tag qualified with the default unattend namespace."""
|
|
return f"{{{NS}}}{tag}"
|
|
|
|
def qwcm(attr):
|
|
"""Return an attribute qualified with the wcm namespace."""
|
|
return f"{{{WCM}}}{attr}"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Utility helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def image_root(image_type):
|
|
"""Return the root directory for an image type."""
|
|
return os.path.join(SAMBA_SHARE, image_type)
|
|
|
|
|
|
def deploy_path(image_type):
|
|
"""Return the Deploy directory for an image type."""
|
|
return os.path.join(SAMBA_SHARE, image_type, "Deploy")
|
|
|
|
|
|
def unattend_path(image_type):
|
|
"""Return the unattend.xml path for an image type."""
|
|
return os.path.join(deploy_path(image_type), "FlatUnattendW10.xml")
|
|
|
|
|
|
def image_status(image_type):
|
|
"""Return a dict describing the state of an image type."""
|
|
dp = deploy_path(image_type)
|
|
up = unattend_path(image_type)
|
|
has_content = os.path.isdir(dp) and any(os.scandir(dp)) if os.path.isdir(dp) else False
|
|
has_unattend = os.path.isfile(up)
|
|
return {
|
|
"image_type": image_type,
|
|
"friendly_name": FRIENDLY_NAMES.get(image_type, image_type),
|
|
"deploy_path": dp,
|
|
"has_content": has_content,
|
|
"has_unattend": has_unattend,
|
|
}
|
|
|
|
|
|
def service_status(service_name):
|
|
"""Check whether a systemd service is active."""
|
|
try:
|
|
result = subprocess.run(
|
|
["systemctl", "is-active", service_name],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=5,
|
|
)
|
|
state = result.stdout.strip()
|
|
return {"name": service_name, "active": state == "active", "state": state}
|
|
except Exception as exc:
|
|
return {"name": service_name, "active": False, "state": str(exc)}
|
|
|
|
|
|
def find_usb_mounts():
|
|
"""Return a list of mount-point paths that look like removable media."""
|
|
mounts = []
|
|
try:
|
|
with open("/proc/mounts", "r") as fh:
|
|
for line in fh:
|
|
parts = line.split()
|
|
if len(parts) >= 2:
|
|
mount_point = parts[1]
|
|
if mount_point.startswith(("/mnt/", "/media/")):
|
|
if os.path.isdir(mount_point):
|
|
mounts.append(mount_point)
|
|
except OSError:
|
|
pass
|
|
return sorted(set(mounts))
|
|
|
|
|
|
def find_upload_sources():
|
|
"""Return sub-directories inside UPLOAD_DIR that look like image content."""
|
|
sources = []
|
|
if os.path.isdir(UPLOAD_DIR):
|
|
# Include the upload dir itself if it has content
|
|
try:
|
|
entries = os.listdir(UPLOAD_DIR)
|
|
if entries:
|
|
sources.append(UPLOAD_DIR)
|
|
# Also include immediate subdirectories
|
|
for entry in entries:
|
|
full = os.path.join(UPLOAD_DIR, entry)
|
|
if os.path.isdir(full):
|
|
sources.append(full)
|
|
except OSError:
|
|
pass
|
|
return sources
|
|
|
|
|
|
def _import_deploy(src_deploy, dst_deploy, target="", move=False):
|
|
"""Import Deploy directory contents, merging shared subdirs into _shared.
|
|
When move=True, files are moved instead of copied (saves disk space)."""
|
|
# Build list of scoped shared dirs for this target
|
|
scoped_shared = []
|
|
prefix_key = ""
|
|
for prefix, dirs in SHARED_DEPLOY_SCOPED.items():
|
|
if target.startswith(prefix):
|
|
scoped_shared = dirs
|
|
prefix_key = prefix
|
|
break
|
|
|
|
_transfer = shutil.move if move else shutil.copy2
|
|
_transfer_tree = shutil.move if move else shutil.copytree
|
|
|
|
os.makedirs(dst_deploy, exist_ok=True)
|
|
for item in os.listdir(src_deploy):
|
|
src_item = os.path.join(src_deploy, item)
|
|
dst_item = os.path.join(dst_deploy, item)
|
|
|
|
if not os.path.isdir(src_item):
|
|
_transfer(src_item, dst_item)
|
|
continue
|
|
|
|
# Global shared (e.g., Out-of-box Drivers) — one copy for all
|
|
if item in SHARED_DEPLOY_GLOBAL:
|
|
shared_dest = os.path.join(SHARED_DIR, item)
|
|
os.makedirs(shared_dest, exist_ok=True)
|
|
_merge_tree(src_item, shared_dest, move=move)
|
|
_replace_with_symlink(dst_item, shared_dest)
|
|
continue
|
|
|
|
# Scoped shared (e.g., Operating Systems) — per family prefix
|
|
if item in scoped_shared:
|
|
shared_dest = os.path.join(SHARED_DIR, f"{prefix_key}{item}")
|
|
os.makedirs(shared_dest, exist_ok=True)
|
|
_merge_tree(src_item, shared_dest, move=move)
|
|
_replace_with_symlink(dst_item, shared_dest)
|
|
continue
|
|
|
|
# Normal transfer
|
|
if os.path.exists(dst_item):
|
|
shutil.rmtree(dst_item)
|
|
_transfer_tree(src_item, dst_item)
|
|
|
|
|
|
def _replace_with_symlink(link_path, target_path):
|
|
"""Replace a file/dir/symlink at link_path with a symlink to target_path."""
|
|
if os.path.islink(link_path):
|
|
os.remove(link_path)
|
|
elif os.path.isdir(link_path):
|
|
shutil.rmtree(link_path)
|
|
os.symlink(target_path, link_path)
|
|
|
|
|
|
def _merge_tree(src, dst, move=False):
|
|
"""Recursively merge src tree into dst, overwriting existing files.
|
|
When move=True, files are moved instead of copied."""
|
|
_transfer = shutil.move if move else shutil.copy2
|
|
_transfer_tree = shutil.move if move else shutil.copytree
|
|
for item in os.listdir(src):
|
|
s = os.path.join(src, item)
|
|
d = os.path.join(dst, item)
|
|
if os.path.isdir(s):
|
|
if os.path.isdir(d):
|
|
_merge_tree(s, d, move=move)
|
|
else:
|
|
if os.path.exists(d):
|
|
os.remove(d)
|
|
_transfer_tree(s, d)
|
|
else:
|
|
os.makedirs(os.path.dirname(d), exist_ok=True)
|
|
_transfer(s, d)
|
|
|
|
|
|
def allowed_import_source(source):
|
|
"""Check if a source path is a valid import location (USB or upload dir)."""
|
|
usb = find_usb_mounts()
|
|
if any(source == m or source.startswith(m + "/") for m in usb):
|
|
return True
|
|
if source == UPLOAD_DIR or source.startswith(UPLOAD_DIR + "/"):
|
|
return os.path.isdir(source)
|
|
return False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# XML helpers — parse / build unattend.xml
|
|
# ---------------------------------------------------------------------------
|
|
|
|
UNATTEND_TEMPLATE = """\
|
|
<?xml version="1.0" encoding="utf-8"?>
|
|
<unattend xmlns="urn:schemas-microsoft-com:unattend"
|
|
xmlns:wcm="http://schemas.microsoft.com/WMIConfig/2002/State">
|
|
<settings pass="windowsPE" />
|
|
<settings pass="offlineServicing" />
|
|
<settings pass="specialize" />
|
|
<settings pass="oobeSystem" />
|
|
</unattend>
|
|
"""
|
|
|
|
|
|
def _find_or_create(parent, tag):
|
|
"""Find the first child with *tag* or create it."""
|
|
el = parent.find(tag, namespaces={"": NS})
|
|
if el is None:
|
|
el = etree.SubElement(parent, qn(tag.split("}")[-1]) if "}" not in tag else tag)
|
|
return el
|
|
|
|
|
|
def _settings_pass(root, pass_name):
|
|
"""Return the <settings pass="..."> element, creating if needed."""
|
|
for s in root.findall(qn("settings")):
|
|
if s.get("pass") == pass_name:
|
|
return s
|
|
s = etree.SubElement(root, qn("settings"))
|
|
s.set("pass", pass_name)
|
|
return s
|
|
|
|
|
|
def parse_unattend(xml_path):
|
|
"""Parse an unattend.xml and return a dict of editable data."""
|
|
data = {
|
|
"driver_paths": [],
|
|
"computer_name": "",
|
|
"registered_organization": "",
|
|
"registered_owner": "",
|
|
"time_zone": "",
|
|
"specialize_commands": [],
|
|
"oobe": {
|
|
"HideEULAPage": "true",
|
|
"HideOEMRegistrationScreen": "true",
|
|
"HideOnlineAccountScreens": "true",
|
|
"HideWirelessSetupInOOBE": "true",
|
|
"HideLocalAccountScreen": "true",
|
|
"NetworkLocation": "Work",
|
|
"ProtectYourPC": "3",
|
|
"SkipUserOOBE": "true",
|
|
"SkipMachineOOBE": "true",
|
|
},
|
|
"firstlogon_commands": [],
|
|
"raw_xml": "",
|
|
}
|
|
|
|
if not os.path.isfile(xml_path):
|
|
data["raw_xml"] = UNATTEND_TEMPLATE
|
|
return data
|
|
|
|
with open(xml_path, "r", encoding="utf-8") as fh:
|
|
raw = fh.read()
|
|
data["raw_xml"] = raw
|
|
|
|
try:
|
|
root = etree.fromstring(raw.encode("utf-8"))
|
|
except etree.XMLSyntaxError:
|
|
return data
|
|
|
|
ns = {"u": NS}
|
|
|
|
# --- offlineServicing: DriverPaths ---
|
|
for dp_el in root.xpath(
|
|
"u:settings[@pass='offlineServicing']//u:PathAndCredentials/u:Path",
|
|
namespaces=ns,
|
|
):
|
|
if dp_el.text:
|
|
data["driver_paths"].append(dp_el.text.strip())
|
|
|
|
# --- specialize: Shell-Setup ---
|
|
for comp in root.xpath(
|
|
"u:settings[@pass='specialize']/u:component",
|
|
namespaces=ns,
|
|
):
|
|
comp_name = comp.get("name", "")
|
|
if "Shell-Setup" in comp_name:
|
|
for tag, key in [
|
|
("ComputerName", "computer_name"),
|
|
("RegisteredOrganization", "registered_organization"),
|
|
("RegisteredOwner", "registered_owner"),
|
|
("TimeZone", "time_zone"),
|
|
]:
|
|
el = comp.find(qn(tag))
|
|
if el is not None and el.text:
|
|
data[key] = el.text.strip()
|
|
|
|
# --- specialize: RunSynchronous commands ---
|
|
for cmd in root.xpath(
|
|
"u:settings[@pass='specialize']//u:RunSynchronousCommand",
|
|
namespaces=ns,
|
|
):
|
|
order_el = cmd.find(qn("Order"))
|
|
path_el = cmd.find(qn("Path"))
|
|
desc_el = cmd.find(qn("Description"))
|
|
data["specialize_commands"].append({
|
|
"order": order_el.text.strip() if order_el is not None and order_el.text else "",
|
|
"path": path_el.text.strip() if path_el is not None and path_el.text else "",
|
|
"description": desc_el.text.strip() if desc_el is not None and desc_el.text else "",
|
|
})
|
|
|
|
# --- oobeSystem ---
|
|
for comp in root.xpath(
|
|
"u:settings[@pass='oobeSystem']/u:component",
|
|
namespaces=ns,
|
|
):
|
|
comp_name = comp.get("name", "")
|
|
if "OOBE" in comp_name or "Shell-Setup" in comp_name:
|
|
oobe_el = comp.find(qn("OOBE"))
|
|
if oobe_el is not None:
|
|
for child in oobe_el:
|
|
local = etree.QName(child).localname
|
|
if local in data["oobe"] and child.text:
|
|
data["oobe"][local] = child.text.strip()
|
|
|
|
# FirstLogonCommands
|
|
flc = comp.find(qn("FirstLogonCommands"))
|
|
if flc is not None:
|
|
for sync in flc.findall(qn("SynchronousCommand")):
|
|
order_el = sync.find(qn("Order"))
|
|
cl_el = sync.find(qn("CommandLine"))
|
|
desc_el = sync.find(qn("Description"))
|
|
data["firstlogon_commands"].append({
|
|
"order": order_el.text.strip() if order_el is not None and order_el.text else "",
|
|
"commandline": cl_el.text.strip() if cl_el is not None and cl_el.text else "",
|
|
"description": desc_el.text.strip() if desc_el is not None and desc_el.text else "",
|
|
})
|
|
|
|
return data
|
|
|
|
|
|
def build_unattend_xml(form_data):
|
|
"""Build a complete unattend.xml string from form data dict."""
|
|
root = etree.Element(qn("unattend"), nsmap=NSMAP)
|
|
|
|
# --- windowsPE (empty) ---
|
|
_settings_pass(root, "windowsPE")
|
|
|
|
# --- offlineServicing: DriverPaths ---
|
|
offline = _settings_pass(root, "offlineServicing")
|
|
driver_paths = form_data.get("driver_paths", [])
|
|
if driver_paths:
|
|
comp = etree.SubElement(offline, qn("component"))
|
|
comp.set("name", "Microsoft-Windows-PnpCustomizationsNonWinPE")
|
|
comp.set("processorArchitecture", "amd64")
|
|
comp.set("publicKeyToken", "31bf3856ad364e35")
|
|
comp.set("language", "neutral")
|
|
comp.set("versionScope", "nonSxS")
|
|
dp_container = etree.SubElement(comp, qn("DriverPaths"))
|
|
for idx, dp in enumerate(driver_paths, start=1):
|
|
if not dp.strip():
|
|
continue
|
|
pac = etree.SubElement(dp_container, qn("PathAndCredentials"))
|
|
pac.set(qwcm("action"), "add")
|
|
pac.set(qwcm("keyValue"), str(idx))
|
|
path_el = etree.SubElement(pac, qn("Path"))
|
|
path_el.text = dp.strip()
|
|
|
|
# --- specialize ---
|
|
spec = _settings_pass(root, "specialize")
|
|
|
|
# Shell-Setup component
|
|
shell_comp = etree.SubElement(spec, qn("component"))
|
|
shell_comp.set("name", "Microsoft-Windows-Shell-Setup")
|
|
shell_comp.set("processorArchitecture", "amd64")
|
|
shell_comp.set("publicKeyToken", "31bf3856ad364e35")
|
|
shell_comp.set("language", "neutral")
|
|
shell_comp.set("versionScope", "nonSxS")
|
|
|
|
for tag, key in [
|
|
("ComputerName", "computer_name"),
|
|
("RegisteredOrganization", "registered_organization"),
|
|
("RegisteredOwner", "registered_owner"),
|
|
("TimeZone", "time_zone"),
|
|
]:
|
|
val = form_data.get(key, "").strip()
|
|
if val:
|
|
el = etree.SubElement(shell_comp, qn(tag))
|
|
el.text = val
|
|
|
|
# Deployment / RunSynchronous commands
|
|
spec_cmds = form_data.get("specialize_commands", [])
|
|
if spec_cmds:
|
|
deploy_comp = etree.SubElement(spec, qn("component"))
|
|
deploy_comp.set("name", "Microsoft-Windows-Deployment")
|
|
deploy_comp.set("processorArchitecture", "amd64")
|
|
deploy_comp.set("publicKeyToken", "31bf3856ad364e35")
|
|
deploy_comp.set("language", "neutral")
|
|
deploy_comp.set("versionScope", "nonSxS")
|
|
rs = etree.SubElement(deploy_comp, qn("RunSynchronous"))
|
|
for idx, cmd in enumerate(spec_cmds, start=1):
|
|
if not cmd.get("path", "").strip():
|
|
continue
|
|
rsc = etree.SubElement(rs, qn("RunSynchronousCommand"))
|
|
rsc.set(qwcm("action"), "add")
|
|
order_el = etree.SubElement(rsc, qn("Order"))
|
|
order_el.text = str(idx)
|
|
path_el = etree.SubElement(rsc, qn("Path"))
|
|
path_el.text = cmd["path"].strip()
|
|
desc_el = etree.SubElement(rsc, qn("Description"))
|
|
desc_el.text = cmd.get("description", "").strip()
|
|
|
|
# --- oobeSystem ---
|
|
oobe_settings = _settings_pass(root, "oobeSystem")
|
|
oobe_comp = etree.SubElement(oobe_settings, qn("component"))
|
|
oobe_comp.set("name", "Microsoft-Windows-Shell-Setup")
|
|
oobe_comp.set("processorArchitecture", "amd64")
|
|
oobe_comp.set("publicKeyToken", "31bf3856ad364e35")
|
|
oobe_comp.set("language", "neutral")
|
|
oobe_comp.set("versionScope", "nonSxS")
|
|
|
|
oobe_el = etree.SubElement(oobe_comp, qn("OOBE"))
|
|
oobe_data = form_data.get("oobe", {})
|
|
for key in [
|
|
"HideEULAPage",
|
|
"HideOEMRegistrationScreen",
|
|
"HideOnlineAccountScreens",
|
|
"HideWirelessSetupInOOBE",
|
|
"HideLocalAccountScreen",
|
|
"NetworkLocation",
|
|
"ProtectYourPC",
|
|
"SkipUserOOBE",
|
|
"SkipMachineOOBE",
|
|
]:
|
|
val = oobe_data.get(key, "")
|
|
if val:
|
|
child = etree.SubElement(oobe_el, qn(key))
|
|
child.text = str(val)
|
|
|
|
# FirstLogonCommands
|
|
fl_cmds = form_data.get("firstlogon_commands", [])
|
|
if fl_cmds:
|
|
flc = etree.SubElement(oobe_comp, qn("FirstLogonCommands"))
|
|
for idx, cmd in enumerate(fl_cmds, start=1):
|
|
if not cmd.get("commandline", "").strip():
|
|
continue
|
|
sc = etree.SubElement(flc, qn("SynchronousCommand"))
|
|
sc.set(qwcm("action"), "add")
|
|
order_el = etree.SubElement(sc, qn("Order"))
|
|
order_el.text = str(idx)
|
|
cl_el = etree.SubElement(sc, qn("CommandLine"))
|
|
cl_el.text = cmd["commandline"].strip()
|
|
desc_el = etree.SubElement(sc, qn("Description"))
|
|
desc_el.text = cmd.get("description", "").strip()
|
|
|
|
xml_bytes = etree.tostring(
|
|
root,
|
|
pretty_print=True,
|
|
xml_declaration=True,
|
|
encoding="utf-8",
|
|
)
|
|
return xml_bytes.decode("utf-8")
|
|
|
|
|
|
def _extract_form_data(form):
|
|
"""Pull structured data from the submitted form."""
|
|
data = {}
|
|
|
|
# Driver paths
|
|
dp_list = form.getlist("driver_path[]")
|
|
data["driver_paths"] = [p for p in dp_list if p.strip()]
|
|
|
|
# Machine settings
|
|
data["computer_name"] = form.get("computer_name", "")
|
|
data["registered_organization"] = form.get("registered_organization", "")
|
|
data["registered_owner"] = form.get("registered_owner", "")
|
|
data["time_zone"] = form.get("time_zone", "")
|
|
|
|
# Specialize commands
|
|
spec_paths = form.getlist("spec_cmd_path[]")
|
|
spec_descs = form.getlist("spec_cmd_desc[]")
|
|
data["specialize_commands"] = []
|
|
for i in range(len(spec_paths)):
|
|
if spec_paths[i].strip():
|
|
data["specialize_commands"].append({
|
|
"path": spec_paths[i],
|
|
"description": spec_descs[i] if i < len(spec_descs) else "",
|
|
})
|
|
|
|
# OOBE settings
|
|
data["oobe"] = {}
|
|
for key in [
|
|
"HideEULAPage",
|
|
"HideOEMRegistrationScreen",
|
|
"HideOnlineAccountScreens",
|
|
"HideWirelessSetupInOOBE",
|
|
"HideLocalAccountScreen",
|
|
"SkipUserOOBE",
|
|
"SkipMachineOOBE",
|
|
]:
|
|
data["oobe"][key] = form.get(f"oobe_{key}", "false")
|
|
data["oobe"]["NetworkLocation"] = form.get("oobe_NetworkLocation", "Work")
|
|
data["oobe"]["ProtectYourPC"] = form.get("oobe_ProtectYourPC", "3")
|
|
|
|
# First logon commands
|
|
fl_cls = form.getlist("fl_cmd_commandline[]")
|
|
fl_descs = form.getlist("fl_cmd_desc[]")
|
|
data["firstlogon_commands"] = []
|
|
for i in range(len(fl_cls)):
|
|
if fl_cls[i].strip():
|
|
data["firstlogon_commands"].append({
|
|
"commandline": fl_cls[i],
|
|
"description": fl_descs[i] if i < len(fl_descs) else "",
|
|
})
|
|
|
|
return data
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Routes — pages
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@app.route("/")
|
|
def dashboard():
|
|
images = [image_status(it) for it in IMAGE_TYPES]
|
|
services = [service_status(s) for s in ("dnsmasq", "apache2", "smbd")]
|
|
return render_template(
|
|
"dashboard.html",
|
|
images=images,
|
|
services=services,
|
|
image_types=IMAGE_TYPES,
|
|
friendly_names=FRIENDLY_NAMES,
|
|
)
|
|
|
|
|
|
@app.route("/images/import", methods=["GET", "POST"])
|
|
def images_import():
|
|
usb_mounts = find_usb_mounts()
|
|
upload_sources = find_upload_sources()
|
|
images = [image_status(it) for it in IMAGE_TYPES]
|
|
|
|
if request.method == "POST":
|
|
source = request.form.get("source", "")
|
|
target = request.form.get("target", "")
|
|
|
|
if not source or not target:
|
|
flash("Please select both a source and a target image type.", "danger")
|
|
return redirect(url_for("images_import"))
|
|
|
|
if target not in IMAGE_TYPES:
|
|
flash("Invalid target image type.", "danger")
|
|
return redirect(url_for("images_import"))
|
|
|
|
if not allowed_import_source(source):
|
|
flash("Source path is not a valid import location.", "danger")
|
|
return redirect(url_for("images_import"))
|
|
|
|
if not os.path.isdir(source):
|
|
flash(f"Source path does not exist: {source}", "danger")
|
|
return redirect(url_for("images_import"))
|
|
|
|
root = image_root(target)
|
|
dest = deploy_path(target)
|
|
try:
|
|
os.makedirs(dest, exist_ok=True)
|
|
src_items = os.listdir(source)
|
|
|
|
# Move files from network upload to save disk space; copy from USB
|
|
use_move = source == UPLOAD_DIR or source.startswith(UPLOAD_DIR + "/")
|
|
_transfer = shutil.move if use_move else shutil.copy2
|
|
_transfer_tree = shutil.move if use_move else shutil.copytree
|
|
|
|
# Detect layout: if source has Deploy/, Sources/, Tools/ at top
|
|
# level, it's the full image root structure (USB-style).
|
|
# Otherwise treat it as Deploy/ contents directly.
|
|
top_dirs = {d for d in src_items if os.path.isdir(os.path.join(source, d))}
|
|
full_layout = "Deploy" in top_dirs
|
|
|
|
if full_layout:
|
|
# Determine which root-level dirs are shared for this target
|
|
shared_root = []
|
|
for prefix, dirs in SHARED_ROOT_DIRS.items():
|
|
if target.startswith(prefix):
|
|
shared_root = dirs
|
|
break
|
|
|
|
# Full image root: import Deploy contents + sibling dirs
|
|
for item in src_items:
|
|
src_item = os.path.join(source, item)
|
|
if item == "Deploy":
|
|
_import_deploy(src_item, dest, target, move=use_move)
|
|
elif os.path.isdir(src_item) and item in shared_root:
|
|
# Shared sibling: merge into _shared/{prefix}{item}
|
|
# and symlink from image root
|
|
prefix_key = target.split("-")[0] + "-"
|
|
shared_dest = os.path.join(SHARED_DIR, f"{prefix_key}{item}")
|
|
os.makedirs(shared_dest, exist_ok=True)
|
|
_merge_tree(src_item, shared_dest, move=use_move)
|
|
dst_item = os.path.join(root, item)
|
|
if os.path.islink(dst_item):
|
|
os.remove(dst_item)
|
|
elif os.path.isdir(dst_item):
|
|
shutil.rmtree(dst_item)
|
|
os.symlink(shared_dest, dst_item)
|
|
elif os.path.isdir(src_item):
|
|
# Non-shared sibling dirs (Tools) go into image root
|
|
dst_item = os.path.join(root, item)
|
|
if os.path.exists(dst_item):
|
|
shutil.rmtree(dst_item)
|
|
_transfer_tree(src_item, dst_item)
|
|
else:
|
|
_transfer(src_item, os.path.join(root, item))
|
|
else:
|
|
# Flat layout: treat source as Deploy contents
|
|
_import_deploy(source, dest, target, move=use_move)
|
|
|
|
audit("IMAGE_IMPORT", f"{source} -> {target}")
|
|
flash(
|
|
f"Successfully imported content to {FRIENDLY_NAMES.get(target, target)}.",
|
|
"success",
|
|
)
|
|
except Exception as exc:
|
|
flash(f"Import failed: {exc}", "danger")
|
|
|
|
return redirect(url_for("images_import"))
|
|
|
|
return render_template(
|
|
"import.html",
|
|
usb_mounts=usb_mounts,
|
|
upload_sources=upload_sources,
|
|
images=images,
|
|
image_types=IMAGE_TYPES,
|
|
friendly_names=FRIENDLY_NAMES,
|
|
)
|
|
|
|
|
|
@app.route("/images/<image_type>/unattend", methods=["GET", "POST"])
|
|
def unattend_editor(image_type):
|
|
if image_type not in IMAGE_TYPES:
|
|
flash("Unknown image type.", "danger")
|
|
return redirect(url_for("dashboard"))
|
|
|
|
xml_file = unattend_path(image_type)
|
|
|
|
if request.method == "POST":
|
|
save_mode = request.form.get("save_mode", "form")
|
|
|
|
if save_mode == "raw":
|
|
raw_xml = request.form.get("raw_xml", "")
|
|
# Validate the XML before saving
|
|
try:
|
|
etree.fromstring(raw_xml.encode("utf-8"))
|
|
except etree.XMLSyntaxError as exc:
|
|
flash(f"Invalid XML: {exc}", "danger")
|
|
data = parse_unattend(xml_file)
|
|
data["raw_xml"] = raw_xml
|
|
return render_template(
|
|
"unattend_editor.html",
|
|
image_type=image_type,
|
|
friendly_name=FRIENDLY_NAMES.get(image_type, image_type),
|
|
data=data,
|
|
image_types=IMAGE_TYPES,
|
|
friendly_names=FRIENDLY_NAMES,
|
|
)
|
|
xml_content = raw_xml
|
|
else:
|
|
form_data = _extract_form_data(request.form)
|
|
xml_content = build_unattend_xml(form_data)
|
|
|
|
# Write to disk
|
|
try:
|
|
os.makedirs(os.path.dirname(xml_file), exist_ok=True)
|
|
with open(xml_file, "w", encoding="utf-8") as fh:
|
|
fh.write(xml_content)
|
|
audit("UNATTEND_SAVE", f"{image_type} ({save_mode})")
|
|
flash("unattend.xml saved successfully.", "success")
|
|
except Exception as exc:
|
|
flash(f"Failed to save: {exc}", "danger")
|
|
|
|
return redirect(url_for("unattend_editor", image_type=image_type))
|
|
|
|
data = parse_unattend(xml_file)
|
|
return render_template(
|
|
"unattend_editor.html",
|
|
image_type=image_type,
|
|
friendly_name=FRIENDLY_NAMES.get(image_type, image_type),
|
|
data=data,
|
|
image_types=IMAGE_TYPES,
|
|
friendly_names=FRIENDLY_NAMES,
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Routes — Clonezilla Backups
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@app.route("/backups")
|
|
def clonezilla_backups():
|
|
backups = []
|
|
if os.path.isdir(CLONEZILLA_SHARE):
|
|
for f in sorted(os.listdir(CLONEZILLA_SHARE)):
|
|
fpath = os.path.join(CLONEZILLA_SHARE, f)
|
|
if os.path.isfile(fpath) and f.lower().endswith(".zip"):
|
|
stat = os.stat(fpath)
|
|
backups.append({
|
|
"filename": f,
|
|
"machine": os.path.splitext(f)[0],
|
|
"size": stat.st_size,
|
|
"modified": stat.st_mtime,
|
|
})
|
|
return render_template(
|
|
"backups.html",
|
|
backups=backups,
|
|
image_types=IMAGE_TYPES,
|
|
friendly_names=FRIENDLY_NAMES,
|
|
)
|
|
|
|
|
|
@app.route("/backups/upload", methods=["POST"])
|
|
def clonezilla_upload():
|
|
if "backup_file" not in request.files:
|
|
flash("No file selected.", "danger")
|
|
return redirect(url_for("clonezilla_backups"))
|
|
|
|
f = request.files["backup_file"]
|
|
if not f.filename:
|
|
flash("No file selected.", "danger")
|
|
return redirect(url_for("clonezilla_backups"))
|
|
|
|
filename = secure_filename(f.filename)
|
|
if not filename.lower().endswith(".zip"):
|
|
flash("Only .zip files are accepted.", "danger")
|
|
return redirect(url_for("clonezilla_backups"))
|
|
|
|
os.makedirs(CLONEZILLA_SHARE, exist_ok=True)
|
|
dest = os.path.join(CLONEZILLA_SHARE, filename)
|
|
f.save(dest)
|
|
audit("BACKUP_UPLOAD", filename)
|
|
flash(f"Uploaded {filename} successfully.", "success")
|
|
return redirect(url_for("clonezilla_backups"))
|
|
|
|
|
|
@app.route("/backups/download/<filename>")
|
|
def clonezilla_download(filename):
|
|
filename = secure_filename(filename)
|
|
fpath = os.path.join(CLONEZILLA_SHARE, filename)
|
|
if not os.path.isfile(fpath):
|
|
flash(f"Backup not found: {filename}", "danger")
|
|
return redirect(url_for("clonezilla_backups"))
|
|
return send_file(fpath, as_attachment=True)
|
|
|
|
|
|
@app.route("/backups/delete/<filename>", methods=["POST"])
|
|
def clonezilla_delete(filename):
|
|
filename = secure_filename(filename)
|
|
fpath = os.path.join(CLONEZILLA_SHARE, filename)
|
|
if os.path.isfile(fpath):
|
|
os.remove(fpath)
|
|
audit("BACKUP_DELETE", filename)
|
|
flash(f"Deleted {filename}.", "success")
|
|
else:
|
|
flash(f"Backup not found: {filename}", "danger")
|
|
return redirect(url_for("clonezilla_backups"))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Routes — Blancco Reports
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@app.route("/reports")
|
|
def blancco_reports():
|
|
reports = []
|
|
if os.path.isdir(BLANCCO_REPORTS):
|
|
for f in sorted(os.listdir(BLANCCO_REPORTS), reverse=True):
|
|
fpath = os.path.join(BLANCCO_REPORTS, f)
|
|
if os.path.isfile(fpath):
|
|
stat = os.stat(fpath)
|
|
ext = os.path.splitext(f)[1].lower()
|
|
reports.append({
|
|
"filename": f,
|
|
"size": stat.st_size,
|
|
"modified": stat.st_mtime,
|
|
"type": ext.lstrip(".").upper() or "FILE",
|
|
})
|
|
return render_template(
|
|
"reports.html",
|
|
reports=reports,
|
|
image_types=IMAGE_TYPES,
|
|
friendly_names=FRIENDLY_NAMES,
|
|
)
|
|
|
|
|
|
@app.route("/reports/download/<filename>")
|
|
def blancco_download_report(filename):
|
|
filename = secure_filename(filename)
|
|
fpath = os.path.join(BLANCCO_REPORTS, filename)
|
|
if not os.path.isfile(fpath):
|
|
flash(f"Report not found: {filename}", "danger")
|
|
return redirect(url_for("blancco_reports"))
|
|
return send_file(fpath, as_attachment=True)
|
|
|
|
|
|
@app.route("/reports/delete/<filename>", methods=["POST"])
|
|
def blancco_delete_report(filename):
|
|
filename = secure_filename(filename)
|
|
fpath = os.path.join(BLANCCO_REPORTS, filename)
|
|
if os.path.isfile(fpath):
|
|
os.remove(fpath)
|
|
audit("REPORT_DELETE", filename)
|
|
flash(f"Deleted {filename}.", "success")
|
|
else:
|
|
flash(f"Report not found: {filename}", "danger")
|
|
return redirect(url_for("blancco_reports"))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Routes — startnet.cmd Editor (WIM)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _wim_extract_startnet(wim_path):
|
|
"""Extract startnet.cmd from a WIM file using wimextract."""
|
|
tmpdir = tempfile.mkdtemp()
|
|
try:
|
|
result = subprocess.run(
|
|
["wimextract", wim_path, "1",
|
|
"/Windows/System32/startnet.cmd",
|
|
"--dest-dir", tmpdir],
|
|
capture_output=True, text=True, timeout=30,
|
|
)
|
|
startnet_path = os.path.join(tmpdir, "startnet.cmd")
|
|
if result.returncode == 0 and os.path.isfile(startnet_path):
|
|
with open(startnet_path, "r", encoding="utf-8", errors="replace") as fh:
|
|
return fh.read()
|
|
return None
|
|
except Exception:
|
|
return None
|
|
finally:
|
|
shutil.rmtree(tmpdir, ignore_errors=True)
|
|
|
|
|
|
def _wim_update_startnet(wim_path, content):
|
|
"""Update startnet.cmd inside a WIM file using wimupdate."""
|
|
tmpdir = tempfile.mkdtemp()
|
|
try:
|
|
startnet_path = os.path.join(tmpdir, "startnet.cmd")
|
|
with open(startnet_path, "w", encoding="utf-8", newline="\r\n") as fh:
|
|
fh.write(content)
|
|
# wimupdate reads commands from stdin
|
|
update_cmd = f"add {startnet_path} /Windows/System32/startnet.cmd\n"
|
|
result = subprocess.run(
|
|
["wimupdate", wim_path, "1"],
|
|
input=update_cmd,
|
|
capture_output=True, text=True, timeout=60,
|
|
)
|
|
if result.returncode != 0:
|
|
return False, result.stderr.strip()
|
|
return True, ""
|
|
except Exception as exc:
|
|
return False, str(exc)
|
|
finally:
|
|
shutil.rmtree(tmpdir, ignore_errors=True)
|
|
|
|
|
|
def _wim_list_files(wim_path, path="/"):
|
|
"""List files inside a WIM at the given path."""
|
|
try:
|
|
result = subprocess.run(
|
|
["wimdir", wim_path, "1", path],
|
|
capture_output=True, text=True, timeout=30,
|
|
)
|
|
if result.returncode == 0:
|
|
return [l.strip() for l in result.stdout.splitlines() if l.strip()]
|
|
return []
|
|
except Exception:
|
|
return []
|
|
|
|
|
|
@app.route("/startnet")
|
|
def startnet_editor():
|
|
wim_exists = os.path.isfile(BOOT_WIM)
|
|
content = ""
|
|
wim_info = {}
|
|
|
|
if wim_exists:
|
|
content = _wim_extract_startnet(BOOT_WIM) or ""
|
|
# Get WIM info
|
|
try:
|
|
result = subprocess.run(
|
|
["wiminfo", BOOT_WIM],
|
|
capture_output=True, text=True, timeout=15,
|
|
)
|
|
if result.returncode == 0:
|
|
for line in result.stdout.splitlines():
|
|
if ":" in line:
|
|
key, _, val = line.partition(":")
|
|
wim_info[key.strip()] = val.strip()
|
|
except Exception:
|
|
pass
|
|
|
|
return render_template(
|
|
"startnet_editor.html",
|
|
wim_exists=wim_exists,
|
|
wim_path=BOOT_WIM,
|
|
content=content,
|
|
wim_info=wim_info,
|
|
image_types=IMAGE_TYPES,
|
|
friendly_names=FRIENDLY_NAMES,
|
|
)
|
|
|
|
|
|
@app.route("/startnet/save", methods=["POST"])
|
|
def startnet_save():
|
|
if not os.path.isfile(BOOT_WIM):
|
|
flash("boot.wim not found.", "danger")
|
|
return redirect(url_for("startnet_editor"))
|
|
|
|
content = request.form.get("content", "")
|
|
ok, err = _wim_update_startnet(BOOT_WIM, content)
|
|
if ok:
|
|
audit("STARTNET_SAVE", "boot.wim updated")
|
|
flash("startnet.cmd updated successfully in boot.wim.", "success")
|
|
else:
|
|
flash(f"Failed to update boot.wim: {err}", "danger")
|
|
return redirect(url_for("startnet_editor"))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Routes — Audit Log
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@app.route("/audit")
|
|
def audit_log():
|
|
entries = []
|
|
if os.path.isfile(AUDIT_LOG):
|
|
with open(AUDIT_LOG, "r") as fh:
|
|
for line in fh:
|
|
entries.append(line.strip())
|
|
entries.reverse() # newest first
|
|
return render_template(
|
|
"audit.html",
|
|
entries=entries,
|
|
image_types=IMAGE_TYPES,
|
|
friendly_names=FRIENDLY_NAMES,
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Routes — API
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@app.route("/api/services")
|
|
def api_services():
|
|
services = {s: service_status(s) for s in ("dnsmasq", "apache2", "smbd")}
|
|
return jsonify(services)
|
|
|
|
|
|
@app.route("/api/images")
|
|
def api_images():
|
|
images = [image_status(it) for it in IMAGE_TYPES]
|
|
return jsonify(images)
|
|
|
|
|
|
@app.route("/api/images/<image_type>/unattend", methods=["POST"])
|
|
def api_save_unattend(image_type):
|
|
if image_type not in IMAGE_TYPES:
|
|
return jsonify({"error": "Unknown image type"}), 404
|
|
|
|
xml_file = unattend_path(image_type)
|
|
payload = request.get_json(silent=True)
|
|
|
|
if not payload:
|
|
return jsonify({"error": "No JSON body provided"}), 400
|
|
|
|
if "raw_xml" in payload:
|
|
raw_xml = payload["raw_xml"]
|
|
try:
|
|
etree.fromstring(raw_xml.encode("utf-8"))
|
|
except etree.XMLSyntaxError as exc:
|
|
return jsonify({"error": f"Invalid XML: {exc}"}), 400
|
|
xml_content = raw_xml
|
|
else:
|
|
try:
|
|
xml_content = build_unattend_xml(payload)
|
|
except Exception as exc:
|
|
return jsonify({"error": f"Failed to build XML: {exc}"}), 400
|
|
|
|
try:
|
|
os.makedirs(os.path.dirname(xml_file), exist_ok=True)
|
|
with open(xml_file, "w", encoding="utf-8") as fh:
|
|
fh.write(xml_content)
|
|
except Exception as exc:
|
|
return jsonify({"error": f"Failed to write file: {exc}"}), 500
|
|
|
|
audit("UNATTEND_SAVE_API", image_type)
|
|
return jsonify({"status": "ok", "path": xml_file})
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Template filters
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@app.template_filter("timestamp_fmt")
|
|
def timestamp_fmt(ts):
|
|
"""Format a Unix timestamp to a human-readable date string."""
|
|
return datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Context processor — make data available to all templates
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@app.context_processor
|
|
def inject_globals():
|
|
return {
|
|
"all_image_types": IMAGE_TYPES,
|
|
"all_friendly_names": FRIENDLY_NAMES,
|
|
}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main
|
|
# ---------------------------------------------------------------------------
|
|
|
|
if __name__ == "__main__":
|
|
app.run(host="127.0.0.1", port=9010, debug=False, threaded=True)
|