pxe-server/webapp/app.py
cproudlock 974accf98a blancco: fix silent prefs fallback, suspend trap, display blank + add View
End-to-end fixes for Blancco Drive Eraser PXE flow uncovered by chasing
"reports never reach SMB share" across two air-gapped sites:

playbook/blancco-init.sh:
  * Drop silent || true on wget of preferences.xml + config.xml. Fail
    loud with shell-drop if download or marker grep fails. Background:
    airootfs /opt/scripts/validate_preferences.sh restores
    /albus/preferences.save (factory defaults, empty network_share) if
    xmllint fails. wget failure made every report silently land nowhere.
  * Clobber /albus/preferences.save with the same served file so even if
    the validator fallback fires, the SMB target survives.
  * Bind-mount /dev/null over /sys/power/{state,disk,mem_sleep,autosleep}
    before switch_root. Albus's license-retry path writes /sys/power/state
    directly (bypassing systemd targets); this is the last-line block.
  * /dev/null symlinks for sleep/suspend/hibernate systemd targets in the
    airootfs overlay + logind drop-in with IdleAction/Handle*=ignore.
    Three independent layers because cmdline systemd.mask alone is bypassed
    by direct /sys/power/state writes.
  * xinitrc.d/00-no-screen-blank.sh runs xset s off -dpms + setterm
    -blank 0 -powerdown 0 so the Blancco GUI doesn't blank during long
    erasures.
  * Removed the 20-failsafeDriver.conf "modesetting" pin. modesetting
    needs DRM/KMS which we disable on kernel cmdline; "vesa" also failed
    on NVIDIA. With the pin gone Xorg auto-picks fbdev which uses the
    kernel framebuffer from vga=normal - works across Intel, AMD, and
    older NVIDIA without nouveau.

playbook/pxe_server_setup.yml:
  * dnsmasq.conf: explicit empty-value dhcp-option=3 + dhcp-option=6.
    Without them, dnsmasq defaults to sending its own IP as router AND
    DNS. Commenting the configured-value lines did NOT disable the push
    (root cause of "wired keeps picking up 10.9.100.1 as gateway").
  * Split the Blancco config.img extraction and preferences.xml deploy
    into separate tasks. The previous shell-with-creates: gate caused
    playbook re-runs to skip the prefs deploy entirely after first run.
  * Added a validation task that runs python3 xml parse + grep on the
    deployed preferences.xml to fail the playbook at deploy time if the
    SMB markers are missing.
  * Added Environment=TZ=America/New_York to the pxe-webapp systemd
    service so report mtimes and audit log render in Eastern time even
    if the Python process is started before timedatectl converges.
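
A rough sketch of what the deploy-time validation above might check. This is not the playbook's actual task: the function name `missing_markers`, the sample document, and the marker strings are all hypothetical, chosen only because the notes above mention an empty network_share in the factory defaults.

```python
import xml.etree.ElementTree as ET


def missing_markers(xml_text, markers):
    """Return the markers absent from xml_text.

    Raises xml.etree.ElementTree.ParseError if the document is malformed,
    which mirrors the "parse first, then grep" order described above.
    """
    ET.fromstring(xml_text)  # well-formedness check only; result is discarded
    return [m for m in markers if m not in xml_text]


# Hypothetical sample; the real preferences.xml layout is not shown here.
sample = (
    '<preferences>'
    '<entry name="network_share">//pxe/reports</entry>'
    '</preferences>'
)

print(missing_markers(sample, ["network_share", "//pxe/reports"]))
print(missing_markers("<preferences/>", ["network_share"]))
```

A deploy step built on this idea would fail whenever the returned list is non-empty, so a preferences file without an SMB target never reaches the clients.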

webapp:
  * services/blancco_report.py: parse Blancco's XML report format
    (recursive <entries name="..."> walker) into a friendly dict.
  * templates/report_view.html: Bootstrap "Drive Erasure Certificate"
    layout - hero summary, customer + system cards, per-drive cards with
    step-by-step erasure timeline, document signing footer with
    integrity hash detail.
  * /reports/view/<filename> route + View button on the reports list
    (XML reports only; PDFs still download).
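
For illustration, a recursive walker over nested `<entries name="...">` groups might look like the sketch below. It is not the code in services/blancco_report.py: the function name `walk`, the assumption that leaf values are child elements carrying a `name` attribute, and the sample document are all hypothetical.

```python
import xml.etree.ElementTree as ET


def walk(node):
    """Convert nested <entries name="..."> groups into a plain dict."""
    out = {}
    for child in node:
        name = child.get("name", child.tag)
        if child.tag == "entries":
            value = walk(child)  # recurse into the nested group
        else:
            value = (child.text or "").strip()
        # Repeated names (for example several drives) collect into a list.
        if name in out:
            if not isinstance(out[name], list):
                out[name] = [out[name]]
            out[name].append(value)
        else:
            out[name] = value
    return out


# Hypothetical sample; real Blancco reports carry many more fields.
sample = """
<report>
  <entries name="hardware">
    <entry name="vendor">ExampleCo</entry>
    <entries name="disk"><entry name="serial">A1</entry></entries>
    <entries name="disk"><entry name="serial">B2</entry></entries>
  </entries>
</report>
"""

parsed = walk(ET.fromstring(sample.strip()))
print(parsed["hardware"]["disk"])
```

Collecting repeated names into a list keeps multi-drive reports intact instead of letting the last drive overwrite the earlier ones.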

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-13 07:38:54 -04:00


#!/usr/bin/env python3
"""Flask web application for managing a GE Aerospace PXE server.

This file is the route surface; most logic lives in ``services/``:

    services.audit     - audit log writer
    services.csrf      - session CSRF token + before_request validator
    services.fs        - path helpers + JSON load/save
    services.system    - systemd service status + USB mounts
    services.images    - image_status + load_image_config
    services.deploy    - import_deploy + merge_tree + symlink dance
    services.unattend  - parse + build + form-extract for unattend.xml
    services.wim       - boot.wim startnet.cmd extract/update via wimtools
"""

import json
import os
import shutil
from datetime import datetime
from pathlib import Path

from flask import (
    Flask,
    abort,
    flash,
    jsonify,
    redirect,
    render_template,
    request,
    send_file,
    url_for,
)
from lxml import etree
from werkzeug.utils import secure_filename

import config
from services import blancco_report, deploy, fs, images, system, unattend, wim
from services.audit import audit
from services.csrf import init_csrf

app = Flask(__name__)
app.secret_key = config.FLASK_SECRET_KEY
app.config["MAX_CONTENT_LENGTH"] = config.MAX_CONTENT_LENGTH
init_csrf(app)


# ---------------------------------------------------------------------------
# Routes - pages
# ---------------------------------------------------------------------------

@app.route("/")
def dashboard():
    image_list = [images.image_status(it) for it in config.IMAGE_TYPES]
    services = [system.service_status(s) for s in ("dnsmasq", "apache2", "smbd")]
    return render_template(
        "dashboard.html",
        images=image_list,
        services=services,
        image_types=config.IMAGE_TYPES,
        friendly_names=config.FRIENDLY_NAMES,
    )


@app.route("/images/import", methods=["GET", "POST"])
def images_import():
    usb_mounts = system.find_usb_mounts()
    upload_sources = system.find_upload_sources()
    image_list = [images.image_status(it) for it in config.IMAGE_TYPES]

    if request.method == "POST":
        source = request.form.get("source", "")
        target = request.form.get("target", "")

        if not source or not target:
            flash("Please select both a source and a target image type.", "danger")
            return redirect(url_for("images_import"))

        if target not in config.IMAGE_TYPES:
            flash("Invalid target image type.", "danger")
            return redirect(url_for("images_import"))

        if not deploy.allowed_import_source(source):
            flash("Source path is not a valid import location.", "danger")
            return redirect(url_for("images_import"))

        if not os.path.isdir(source):
            flash(f"Source path does not exist: {source}", "danger")
            return redirect(url_for("images_import"))

        root = fs.image_root(target)
        dest = fs.deploy_path(target)
        try:
            os.makedirs(dest, exist_ok=True)
            src_items = os.listdir(source)

            # Move files from network upload to save disk space; copy from USB.
            use_move = source == config.UPLOAD_DIR or source.startswith(config.UPLOAD_DIR + "/")
            _transfer = shutil.move if use_move else shutil.copy2
            _transfer_tree = shutil.move if use_move else shutil.copytree

            top_dirs = {d for d in src_items if os.path.isdir(os.path.join(source, d))}
            full_layout = "Deploy" in top_dirs

            if full_layout:
                shared_root = []
                for prefix, dirs in config.SHARED_ROOT_DIRS.items():
                    if target.startswith(prefix):
                        shared_root = dirs
                        break

                for item in src_items:
                    src_item = os.path.join(source, item)
                    if item == "Deploy":
                        deploy.import_deploy(src_item, dest, target, move=use_move)
                    elif os.path.isdir(src_item) and item in shared_root:
                        prefix_key = target.split("-")[0] + "-"
                        shared_dest = os.path.join(config.SHARED_DIR, f"{prefix_key}{item}")
                        os.makedirs(shared_dest, exist_ok=True)
                        deploy._merge_tree(src_item, shared_dest, move=use_move)
                        dst_item = os.path.join(root, item)
                        if os.path.islink(dst_item):
                            os.remove(dst_item)
                        elif os.path.isdir(dst_item):
                            shutil.rmtree(dst_item)
                        os.symlink(shared_dest, dst_item)
                    elif os.path.isdir(src_item):
                        dst_item = os.path.join(root, item)
                        if os.path.exists(dst_item):
                            shutil.rmtree(dst_item)
                        _transfer_tree(src_item, dst_item)
                    else:
                        _transfer(src_item, os.path.join(root, item))
            else:
                deploy.import_deploy(source, dest, target, move=use_move)

            # Ensure Media.tag exists (FlatSetupLoader.exe drive detection).
            control_dir = os.path.join(dest, "Control")
            os.makedirs(control_dir, exist_ok=True)
            media_tag = os.path.join(control_dir, "Media.tag")
            Path(media_tag).touch()

            audit("IMAGE_IMPORT", f"{source} -> {target}")
            flash(
                f"Successfully imported content to {config.FRIENDLY_NAMES.get(target, target)}.",
                "success",
            )
        except Exception as exc:
            flash(f"Import failed: {exc}", "danger")

        return redirect(url_for("images_import"))

    return render_template(
        "import.html",
        usb_mounts=usb_mounts,
        upload_sources=upload_sources,
        images=image_list,
        image_types=config.IMAGE_TYPES,
        friendly_names=config.FRIENDLY_NAMES,
    )


@app.route("/images/<image_type>/unattend", methods=["GET", "POST"])
def unattend_editor(image_type):
    if image_type not in config.IMAGE_TYPES:
        flash("Unknown image type.", "danger")
        return redirect(url_for("dashboard"))

    xml_file = fs.unattend_path(image_type)

    if request.method == "POST":
        save_mode = request.form.get("save_mode", "form")

        if save_mode == "raw":
            raw_xml = request.form.get("raw_xml", "")
            try:
                etree.fromstring(raw_xml.encode("utf-8"))
            except etree.XMLSyntaxError as exc:
                flash(f"Invalid XML: {exc}", "danger")
                data = unattend.parse_unattend(xml_file)
                data["raw_xml"] = raw_xml
                return render_template(
                    "unattend_editor.html",
                    image_type=image_type,
                    friendly_name=config.FRIENDLY_NAMES.get(image_type, image_type),
                    data=data,
                    image_types=config.IMAGE_TYPES,
                    friendly_names=config.FRIENDLY_NAMES,
                )
            xml_content = raw_xml
        else:
            form_data = unattend.extract_form_data(request.form)
            xml_content = unattend.build_unattend_xml(form_data)

        try:
            os.makedirs(os.path.dirname(xml_file), exist_ok=True)
            with open(xml_file, "w", encoding="utf-8") as fh:
                fh.write(xml_content)
            audit("UNATTEND_SAVE", f"{image_type} ({save_mode})")
            flash("unattend.xml saved successfully.", "success")
        except Exception as exc:
            flash(f"Failed to save: {exc}", "danger")

        return redirect(url_for("unattend_editor", image_type=image_type))

    data = unattend.parse_unattend(xml_file)
    return render_template(
        "unattend_editor.html",
        image_type=image_type,
        friendly_name=config.FRIENDLY_NAMES.get(image_type, image_type),
        data=data,
        image_types=config.IMAGE_TYPES,
        friendly_names=config.FRIENDLY_NAMES,
    )


@app.route("/images/<image_type>/config")
def image_config(image_type):
    if image_type not in config.IMAGE_TYPES:
        flash("Unknown image type.", "danger")
        return redirect(url_for("dashboard"))

    cfg = images.load_image_config(image_type)
    return render_template(
        "image_config.html",
        image_type=image_type,
        friendly_name=config.FRIENDLY_NAMES.get(image_type, image_type),
        config=cfg,
    )


@app.route("/images/<image_type>/config/save", methods=["POST"])
def image_config_save(image_type):
    if image_type not in config.IMAGE_TYPES:
        flash("Unknown image type.", "danger")
        return redirect(url_for("dashboard"))

    section = request.form.get("section", "")
    payload = request.form.get("payload", "[]")
    try:
        data = json.loads(payload)
    except json.JSONDecodeError:
        flash("Invalid JSON payload.", "danger")
        return redirect(url_for("image_config", image_type=image_type))

    ctrl = fs.control_path(image_type)
    tools = fs.tools_path(image_type)

    try:
        if section == "hardware_models":
            us_file = os.path.join(tools, "user_selections.json")
            us_raw = fs.load_json(us_file)
            us_data = us_raw[0] if us_raw and isinstance(us_raw, list) else {}
            us_data["HardwareModelSelection"] = data
            fs.save_json(us_file, [us_data])
            audit("CONFIG_SAVE", f"{image_type}/hardware_models")
        elif section == "drivers":
            clean = [{k: v for k, v in d.items() if not k.startswith("_")} for d in data]
            fs.save_json(os.path.join(ctrl, "HardwareDriver.json"), clean)
            audit("CONFIG_SAVE", f"{image_type}/drivers")
        elif section == "operating_systems":
            clean = [{k: v for k, v in d.items() if not k.startswith("_")} for d in data]
            fs.save_json(os.path.join(ctrl, "OperatingSystem.json"), clean)
            audit("CONFIG_SAVE", f"{image_type}/operating_systems")
        elif section == "packages":
            clean = [{k: v for k, v in d.items() if not k.startswith("_")} for d in data]
            fs.save_json(os.path.join(ctrl, "packages.json"), clean)
            audit("CONFIG_SAVE", f"{image_type}/packages")
        else:
            flash(f"Unknown section: {section}", "danger")
            return redirect(url_for("image_config", image_type=image_type))

        flash(f"Saved {section.replace('_', ' ')} successfully.", "success")
    except Exception as exc:
        flash(f"Failed to save {section}: {exc}", "danger")

    return redirect(url_for("image_config", image_type=image_type))


# ---------------------------------------------------------------------------
# Routes - Clonezilla Backups
# ---------------------------------------------------------------------------

@app.route("/backups")
def clonezilla_backups():
    backups = []
    if os.path.isdir(config.CLONEZILLA_SHARE):
        for f in sorted(os.listdir(config.CLONEZILLA_SHARE)):
            fpath = os.path.join(config.CLONEZILLA_SHARE, f)
            if os.path.isfile(fpath) and f.lower().endswith(".zip"):
                stat = os.stat(fpath)
                backups.append({
                    "filename": f,
                    "machine": os.path.splitext(f)[0],
                    "size": stat.st_size,
                    "modified": stat.st_mtime,
                })
    return render_template(
        "backups.html",
        backups=backups,
        image_types=config.IMAGE_TYPES,
        friendly_names=config.FRIENDLY_NAMES,
    )


@app.route("/backups/upload", methods=["POST"])
def clonezilla_upload():
    if "backup_file" not in request.files:
        flash("No file selected.", "danger")
        return redirect(url_for("clonezilla_backups"))

    f = request.files["backup_file"]
    if not f.filename:
        flash("No file selected.", "danger")
        return redirect(url_for("clonezilla_backups"))

    filename = secure_filename(f.filename)
    if not filename.lower().endswith(".zip"):
        flash("Only .zip files are accepted.", "danger")
        return redirect(url_for("clonezilla_backups"))

    os.makedirs(config.CLONEZILLA_SHARE, exist_ok=True)
    dest = os.path.join(config.CLONEZILLA_SHARE, filename)
    f.save(dest)
    audit("BACKUP_UPLOAD", filename)
    flash(f"Uploaded {filename} successfully.", "success")
    return redirect(url_for("clonezilla_backups"))


@app.route("/backups/download/<filename>")
def clonezilla_download(filename):
    filename = secure_filename(filename)
    fpath = os.path.join(config.CLONEZILLA_SHARE, filename)
    if not os.path.isfile(fpath):
        flash(f"Backup not found: {filename}", "danger")
        return redirect(url_for("clonezilla_backups"))
    return send_file(fpath, as_attachment=True)


@app.route("/backups/delete/<filename>", methods=["POST"])
def clonezilla_delete(filename):
    filename = secure_filename(filename)
    fpath = os.path.join(config.CLONEZILLA_SHARE, filename)
    if os.path.isfile(fpath):
        os.remove(fpath)
        audit("BACKUP_DELETE", filename)
        flash(f"Deleted {filename}.", "success")
    else:
        flash(f"Backup not found: {filename}", "danger")
    return redirect(url_for("clonezilla_backups"))


# ---------------------------------------------------------------------------
# Routes - Blancco Reports
# ---------------------------------------------------------------------------

@app.route("/reports")
def blancco_reports():
    reports = []
    if os.path.isdir(config.BLANCCO_REPORTS):
        for f in sorted(os.listdir(config.BLANCCO_REPORTS), reverse=True):
            fpath = os.path.join(config.BLANCCO_REPORTS, f)
            if os.path.isfile(fpath):
                stat = os.stat(fpath)
                ext = os.path.splitext(f)[1].lower()
                reports.append({
                    "filename": f,
                    "size": stat.st_size,
                    "modified": stat.st_mtime,
                    "type": ext.lstrip(".").upper() or "FILE",
                })
    return render_template(
        "reports.html",
        reports=reports,
        image_types=config.IMAGE_TYPES,
        friendly_names=config.FRIENDLY_NAMES,
    )


@app.route("/reports/download/<filename>")
def blancco_download_report(filename):
    filename = secure_filename(filename)
    fpath = os.path.join(config.BLANCCO_REPORTS, filename)
    if not os.path.isfile(fpath):
        flash(f"Report not found: {filename}", "danger")
        return redirect(url_for("blancco_reports"))
    return send_file(fpath, as_attachment=True)


@app.route("/reports/view/<filename>")
def blancco_view_report(filename):
    filename = secure_filename(filename)
    fpath = os.path.join(config.BLANCCO_REPORTS, filename)
    if not os.path.isfile(fpath):
        flash(f"Report not found: {filename}", "danger")
        return redirect(url_for("blancco_reports"))
    if not filename.lower().endswith(".xml"):
        flash("Formatted view supports XML reports only.", "warning")
        return redirect(url_for("blancco_reports"))
    try:
        data = blancco_report.parse(fpath)
    except Exception as ex:
        flash(f"Failed to parse {filename}: {ex}", "danger")
        return redirect(url_for("blancco_reports"))
    return render_template("report_view.html", filename=filename, data=data)


@app.route("/reports/delete/<filename>", methods=["POST"])
def blancco_delete_report(filename):
    filename = secure_filename(filename)
    fpath = os.path.join(config.BLANCCO_REPORTS, filename)
    if os.path.isfile(fpath):
        os.remove(fpath)
        audit("REPORT_DELETE", filename)
        flash(f"Deleted {filename}.", "success")
    else:
        flash(f"Report not found: {filename}", "danger")
    return redirect(url_for("blancco_reports"))


# ---------------------------------------------------------------------------
# Routes - Enrollment Packages
# ---------------------------------------------------------------------------

@app.route("/enrollment")
def enrollment():
    packages = []
    if os.path.isdir(config.ENROLLMENT_PPKG_DIR):
        for f in sorted(os.listdir(config.ENROLLMENT_PPKG_DIR)):
            fpath = os.path.join(config.ENROLLMENT_PPKG_DIR, f)
            if os.path.isfile(fpath) and f.lower().endswith(".ppkg"):
                stat = os.stat(fpath)
                packages.append({
                    "filename": f,
                    "size": stat.st_size,
                    "modified": stat.st_mtime,
                })
    return render_template(
        "enrollment.html",
        packages=packages,
        image_types=config.IMAGE_TYPES,
        friendly_names=config.FRIENDLY_NAMES,
    )


@app.route("/enrollment/upload", methods=["POST"])
def enrollment_upload():
    if "ppkg_file" not in request.files:
        flash("No file selected.", "danger")
        return redirect(url_for("enrollment"))

    f = request.files["ppkg_file"]
    if not f.filename:
        flash("No file selected.", "danger")
        return redirect(url_for("enrollment"))

    filename = secure_filename(f.filename)
    if not filename.lower().endswith(".ppkg"):
        flash("Only .ppkg files are accepted.", "danger")
        return redirect(url_for("enrollment"))

    os.makedirs(config.ENROLLMENT_PPKG_DIR, exist_ok=True)
    dest = os.path.join(config.ENROLLMENT_PPKG_DIR, filename)
    f.save(dest)
    audit("ENROLLMENT_UPLOAD", filename)
    flash(f"Uploaded {filename} successfully.", "success")
    return redirect(url_for("enrollment"))


@app.route("/enrollment/download/<filename>")
def enrollment_download(filename):
    filename = secure_filename(filename)
    fpath = os.path.join(config.ENROLLMENT_PPKG_DIR, filename)
    if not os.path.isfile(fpath):
        flash(f"Package not found: {filename}", "danger")
        return redirect(url_for("enrollment"))
    return send_file(fpath, as_attachment=True)


@app.route("/enrollment/delete/<filename>", methods=["POST"])
def enrollment_delete(filename):
    filename = secure_filename(filename)
    fpath = os.path.join(config.ENROLLMENT_PPKG_DIR, filename)
    if os.path.isfile(fpath):
        os.remove(fpath)
        audit("ENROLLMENT_DELETE", filename)
        flash(f"Deleted {filename}.", "success")
    else:
        flash(f"Package not found: {filename}", "danger")
    return redirect(url_for("enrollment"))


# ---------------------------------------------------------------------------
# Routes - startnet.cmd Editor (boot.wim)
# ---------------------------------------------------------------------------

@app.route("/startnet")
def startnet_editor():
    import subprocess

    wim_exists = os.path.isfile(config.BOOT_WIM)
    content = ""
    wim_info = {}

    if wim_exists:
        content = wim.extract_startnet(config.BOOT_WIM) or ""
        try:
            result = subprocess.run(
                ["wiminfo", config.BOOT_WIM],
                capture_output=True, text=True, timeout=15,
            )
            if result.returncode == 0:
                for line in result.stdout.splitlines():
                    if ":" in line:
                        key, _, val = line.partition(":")
                        wim_info[key.strip()] = val.strip()
        except Exception:
            pass

    return render_template(
        "startnet_editor.html",
        wim_exists=wim_exists,
        wim_path=config.BOOT_WIM,
        content=content,
        wim_info=wim_info,
        image_types=config.IMAGE_TYPES,
        friendly_names=config.FRIENDLY_NAMES,
    )


@app.route("/startnet/save", methods=["POST"])
def startnet_save():
    if not os.path.isfile(config.BOOT_WIM):
        flash("boot.wim not found.", "danger")
        return redirect(url_for("startnet_editor"))

    content = request.form.get("content", "")
    ok, err = wim.update_startnet(config.BOOT_WIM, content)
    if ok:
        audit("STARTNET_SAVE", "boot.wim updated")
        flash("startnet.cmd updated successfully in boot.wim.", "success")
    else:
        flash(f"Failed to update boot.wim: {err}", "danger")
    return redirect(url_for("startnet_editor"))


# ---------------------------------------------------------------------------
# Routes - Audit Log
# ---------------------------------------------------------------------------

@app.route("/audit")
def audit_log():
    entries = []
    if os.path.isfile(config.AUDIT_LOG):
        with open(config.AUDIT_LOG, "r") as fh:
            for line in fh:
                entries.append(line.strip())
        entries.reverse()
    return render_template(
        "audit.html",
        entries=entries,
        image_types=config.IMAGE_TYPES,
        friendly_names=config.FRIENDLY_NAMES,
    )


# ---------------------------------------------------------------------------
# Routes - JSON API
# ---------------------------------------------------------------------------

@app.route("/api/services")
def api_services():
    services = {s: system.service_status(s) for s in ("dnsmasq", "apache2", "smbd")}
    return jsonify(services)


@app.route("/api/images")
def api_images():
    image_list = [images.image_status(it) for it in config.IMAGE_TYPES]
    return jsonify(image_list)


@app.route("/api/images/<image_type>/unattend", methods=["POST"])
def api_save_unattend(image_type):
    if image_type not in config.IMAGE_TYPES:
        return jsonify({"error": "Unknown image type"}), 404

    xml_file = fs.unattend_path(image_type)
    payload = request.get_json(silent=True)
    if not payload:
        return jsonify({"error": "No JSON body provided"}), 400

    if "raw_xml" in payload:
        raw_xml = payload["raw_xml"]
        try:
            etree.fromstring(raw_xml.encode("utf-8"))
        except etree.XMLSyntaxError as exc:
            return jsonify({"error": f"Invalid XML: {exc}"}), 400
        xml_content = raw_xml
    else:
        try:
            xml_content = unattend.build_unattend_xml(payload)
        except Exception as exc:
            return jsonify({"error": f"Failed to build XML: {exc}"}), 400

    try:
        os.makedirs(os.path.dirname(xml_file), exist_ok=True)
        with open(xml_file, "w", encoding="utf-8") as fh:
            fh.write(xml_content)
    except Exception as exc:
        return jsonify({"error": f"Failed to write file: {exc}"}), 500

    audit("UNATTEND_SAVE_API", image_type)
    return jsonify({"status": "ok", "path": xml_file})


# ---------------------------------------------------------------------------
# Template helpers
# ---------------------------------------------------------------------------

@app.template_filter("timestamp_fmt")
def timestamp_fmt(ts):
    """Format a Unix timestamp to a human-readable date string."""
    return datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M")


@app.context_processor
def inject_globals():
    return {
        "all_image_types": config.IMAGE_TYPES,
        "all_friendly_names": config.FRIENDLY_NAMES,
    }


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

if __name__ == "__main__":
    app.run(host="127.0.0.1", port=9010, debug=False, threaded=True)