Imaging dashboard
- services/imaging_log_tail.py: parses dnsmasq leases, Apache access log,
Samba per-host log files, and dnsmasq syslog (DHCP/TFTP). Synthesizes
inferred sessions keyed by MAC for bays that have only touched the boot
chain but not yet pushed to /imaging/status. Active window 90 min.
- imaging_status.list_sessions() merges inferred sessions into the dashboard
list. Real client-pushed sessions win for the same MAC.
- imaging_status: stage_history field tracks every stage transition (capped
30); sidecar .log file per serial records every log_lines push uncapped
(read_full_log() caps detail-page response to 1 MB).
- delete_session/delete_all_sessions clean up sidecar .log too.
- New SSE endpoint /imaging/stream emits a session-list hash every 5s.
Client fetches /imaging/tiles (HTML partial) on hash change and swaps
#imaging-tiles innerHTML. Polling fallback at 15s if SSE drops.
- Tile-swap preserves scroll, filter input, expanded state via localStorage,
and any LAPS input the operator is mid-pasting (swap skipped when a
laps-input is focused).
- imaging.html: removed 15s location.reload(). Added live-status dot in
header (gray idle / green SSE connected / red SSE lost).
- _imaging_tiles.html: shared partial used by both /imaging full render and
/imaging/tiles SSE refresh. Inferred bays render with yellow border +
log-inferred badge + no progress bar (stage_index inference is coarse).
- imaging_detail.html (new): per-bay forensics page at /imaging/session/
<serial>. Session metadata grid, stage timeline table, full sidecar log
with truncation indicator, Copy-support-summary button. Linked from each
client-pushed tile.
- qr-render.js exposes window.renderAllQRs() so the SSE swap can re-render
Intune device-ID QRs in the swapped-in tiles.
Image management
- services/image_registry.py: JSON registry of image types at
{SAMBA_SHARE}/image-registry.json. Bootstraps from baked-in
config.IMAGE_TYPES on first run. create/clone/delete/rename_friendly
mutate the file then call reload() which rewrites config.IMAGE_TYPES +
config.FRIENDLY_NAMES in place. Sidebar reflects on next request.
- app.py routes: /images/new, /images/<t>/clone, /images/<t>/delete (with
optional content-wipe checkbox), /images/<t>/rename.
- dashboard.html: + New image type button + Clone/Delete per row, all in
Bootstrap modals with confirmation copy.
- Clone copies Deploy/ tree but preserves symlinks to shared dirs (Out-of-
box Drivers, Operating Systems, Packages) so disk usage stays low.
- Delete with content checked unlinks symlinks (does not follow into shared
dirs).
Driver / package upload + orphan adoption
- services/images.py: upload_driver, adopt_orphan, remove_orphans,
upload_package. Filename sanitization blocks path traversal.
- app.py routes: /images/<t>/drivers/upload, /images/<t>/drivers/adopt,
/images/<t>/drivers/orphans/delete, /images/<t>/packages/upload.
- image_config.html: Upload .zip button + modal on Drivers section. Orphan
drivers card-footer rebuilt as interactive list with per-row Adopt inline
form (family + destinationDir inputs) and bulk select+delete.
- Upload .zip on Packages section with optional destinationDir field that
appends a packages.json entry.
Configuration
- config.py: new env vars DNSMASQ_LEASES, APACHE_ACCESS_LOG, SAMBA_LOG_DIR,
DNSMASQ_SYSLOG for the log-tailer.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1037 lines
39 KiB
Python
1037 lines
39 KiB
Python
#!/usr/bin/env python3
|
|
"""Flask web application for managing a GE Aerospace PXE server.

This file is the route surface; most logic lives in ``services/``:

    services.audit          - audit log writer
    services.csrf           - session CSRF token + before_request validator
    services.fs             - path helpers + JSON load/save
    services.system         - systemd service status + USB mounts
    services.images         - image_status + load_image_config, driver/package
                              upload and orphan adoption
    services.image_registry - JSON registry of image types (create, clone,
                              delete, rename_friendly, reload)
    services.imaging_status - imaging session store (update, list, get,
                              delete, full sidecar log)
    services.blancco_report - parser for Blancco XML erasure reports
    services.deploy         - import_deploy + merge_tree + symlink dance
    services.unattend       - parse + build + form-extract for unattend.xml
    services.wim            - boot.wim startnet.cmd extract/update via wimtools
"""
|
|
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import shutil
|
|
import tempfile
|
|
import time
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
from flask import (
|
|
Flask,
|
|
Response,
|
|
abort,
|
|
flash,
|
|
jsonify,
|
|
redirect,
|
|
render_template,
|
|
request,
|
|
send_file,
|
|
stream_with_context,
|
|
url_for,
|
|
)
|
|
from lxml import etree
|
|
from werkzeug.utils import secure_filename
|
|
|
|
import config
|
|
from services import (
|
|
blancco_report,
|
|
deploy,
|
|
fs,
|
|
image_registry,
|
|
images,
|
|
imaging_status,
|
|
system,
|
|
unattend,
|
|
wim,
|
|
)
|
|
from services.audit import audit
|
|
from services.csrf import init_csrf
|
|
|
|
# WSGI application object; every @app.route below registers against it.
app = Flask(__name__)
# Key Flask uses to sign the session cookie (flash() messages live in the session).
app.secret_key = config.FLASK_SECRET_KEY
# Upper bound, in bytes, on accepted request bodies (file uploads included).
app.config["MAX_CONTENT_LENGTH"] = config.MAX_CONTENT_LENGTH

# Hook the CSRF handling from services/csrf.py into the app.
init_csrf(app)

# Pull IMAGE_TYPES/FRIENDLY_NAMES from the registry file (created from the
# baked-in config.py defaults on first run). Mutates config.* in place.
image_registry.reload()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Routes - pages
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@app.route("/")
def dashboard():
    """Render the landing page: one status row per image type plus the
    status of the dnsmasq, apache2 and smbd services."""
    watched_units = ("dnsmasq", "apache2", "smbd")
    context = {
        "images": [images.image_status(key) for key in config.IMAGE_TYPES],
        "services": [system.service_status(unit) for unit in watched_units],
        "image_types": config.IMAGE_TYPES,
        "friendly_names": config.FRIENDLY_NAMES,
    }
    return render_template("dashboard.html", **context)
|
|
|
|
|
|
def _clear_path(path):
    """Remove whatever occupies *path*: a symlink, a regular file or a directory.

    Symlinks (including dangling ones) are unlinked, never followed, so a
    link into a shared directory does not take the shared content with it.
    shutil.rmtree() refuses to operate on a symlink, hence the explicit test.
    Does nothing when *path* does not exist.
    """
    if os.path.islink(path) or os.path.isfile(path):
        os.remove(path)
    elif os.path.isdir(path):
        shutil.rmtree(path)


def _import_full_layout(source, src_items, root, dest, target, use_move):
    """Import a source tree that carries a top-level ``Deploy`` directory.

    ``Deploy`` goes through deploy.import_deploy(); directories listed in
    config.SHARED_ROOT_DIRS for the target's prefix are merged into
    config.SHARED_DIR and symlinked into the image root; every other entry is
    copied (or moved, when *use_move* is set) into the image root as-is.
    """
    transfer_file = shutil.move if use_move else shutil.copy2
    transfer_tree = shutil.move if use_move else shutil.copytree

    # First SHARED_ROOT_DIRS prefix that matches the target wins.
    shared_root = []
    for prefix, dirs in config.SHARED_ROOT_DIRS.items():
        if target.startswith(prefix):
            shared_root = dirs
            break

    for item in src_items:
        src_item = os.path.join(source, item)
        dst_item = os.path.join(root, item)
        if item == "Deploy":
            deploy.import_deploy(src_item, dest, target, move=use_move)
        elif os.path.isdir(src_item) and item in shared_root:
            prefix_key = target.split("-")[0] + "-"
            shared_dest = os.path.join(config.SHARED_DIR, f"{prefix_key}{item}")
            os.makedirs(shared_dest, exist_ok=True)
            deploy._merge_tree(src_item, shared_dest, move=use_move)
            # Replace any previous link/dir/file with a link to the shared copy.
            _clear_path(dst_item)
            os.symlink(shared_dest, dst_item)
        elif os.path.isdir(src_item):
            # The destination may be a symlink left by an earlier import;
            # _clear_path unlinks it instead of failing inside rmtree.
            _clear_path(dst_item)
            transfer_tree(src_item, dst_item)
        else:
            transfer_file(src_item, dst_item)


@app.route("/images/import", methods=["GET", "POST"])
def images_import():
    """Import image content from a USB mount or the network upload directory.

    GET renders the import form. POST validates the ``source`` and ``target``
    form fields, imports the content, ensures ``Control/Media.tag`` exists
    under the deploy path, writes an audit entry and redirects back to the
    form. Any failure during the import is reported through flash().
    """
    if request.method != "POST":
        # The listings are only needed to render the form.
        return render_template(
            "import.html",
            usb_mounts=system.find_usb_mounts(),
            upload_sources=system.find_upload_sources(),
            images=[images.image_status(it) for it in config.IMAGE_TYPES],
            image_types=config.IMAGE_TYPES,
            friendly_names=config.FRIENDLY_NAMES,
        )

    source = request.form.get("source", "")
    target = request.form.get("target", "")

    error = None
    if not source or not target:
        error = "Please select both a source and a target image type."
    elif target not in config.IMAGE_TYPES:
        error = "Invalid target image type."
    elif not deploy.allowed_import_source(source):
        error = "Source path is not a valid import location."
    elif not os.path.isdir(source):
        error = f"Source path does not exist: {source}"
    if error:
        flash(error, "danger")
        return redirect(url_for("images_import"))

    root = fs.image_root(target)
    dest = fs.deploy_path(target)
    try:
        os.makedirs(dest, exist_ok=True)
        src_items = os.listdir(source)

        # Move files from network upload to save disk space; copy from USB.
        use_move = source == config.UPLOAD_DIR or source.startswith(config.UPLOAD_DIR + "/")

        # Decided from the directory listing so the name match stays exact.
        full_layout = any(
            item == "Deploy" and os.path.isdir(os.path.join(source, item))
            for item in src_items
        )
        if full_layout:
            _import_full_layout(source, src_items, root, dest, target, use_move)
        else:
            deploy.import_deploy(source, dest, target, move=use_move)

        # Ensure Media.tag exists (FlatSetupLoader.exe drive detection).
        control_dir = os.path.join(dest, "Control")
        os.makedirs(control_dir, exist_ok=True)
        Path(os.path.join(control_dir, "Media.tag")).touch()

        audit("IMAGE_IMPORT", f"{source} -> {target}")
        flash(
            f"Successfully imported content to {config.FRIENDLY_NAMES.get(target, target)}.",
            "success",
        )
    except Exception as exc:
        # Best-effort route boundary: surface the failure to the operator.
        flash(f"Import failed: {exc}", "danger")

    return redirect(url_for("images_import"))
|
|
|
|
|
|
def _render_unattend_editor(image_type, data):
    """Render the unattend editor template for *image_type* with *data*."""
    return render_template(
        "unattend_editor.html",
        image_type=image_type,
        friendly_name=config.FRIENDLY_NAMES.get(image_type, image_type),
        data=data,
        image_types=config.IMAGE_TYPES,
        friendly_names=config.FRIENDLY_NAMES,
    )


@app.route("/images/<image_type>/unattend", methods=["GET", "POST"])
def unattend_editor(image_type):
    """Show or save the unattend.xml of an image type.

    GET renders the editor from the parsed file. POST saves either the raw
    XML from the form (``save_mode == "raw"``, syntax-checked with lxml
    first) or XML built from the structured form fields, then redirects back
    to the editor. Invalid raw XML re-renders the editor with the submitted
    text preserved.
    """
    if image_type not in config.IMAGE_TYPES:
        flash("Unknown image type.", "danger")
        return redirect(url_for("dashboard"))

    xml_file = fs.unattend_path(image_type)

    if request.method != "POST":
        return _render_unattend_editor(image_type, unattend.parse_unattend(xml_file))

    save_mode = request.form.get("save_mode", "form")
    if save_mode == "raw":
        xml_content = request.form.get("raw_xml", "")
        try:
            # Parsed only to check syntax; the result is discarded.
            etree.fromstring(xml_content.encode("utf-8"))
        except etree.XMLSyntaxError as exc:
            flash(f"Invalid XML: {exc}", "danger")
            editor_data = unattend.parse_unattend(xml_file)
            editor_data["raw_xml"] = xml_content
            return _render_unattend_editor(image_type, editor_data)
    else:
        fields = unattend.extract_form_data(request.form)
        xml_content = unattend.build_unattend_xml(fields)

    try:
        os.makedirs(os.path.dirname(xml_file), exist_ok=True)
        with open(xml_file, "w", encoding="utf-8") as out:
            out.write(xml_content)
        audit("UNATTEND_SAVE", f"{image_type} ({save_mode})")
        flash("unattend.xml saved successfully.", "success")
    except Exception as exc:
        flash(f"Failed to save: {exc}", "danger")

    return redirect(url_for("unattend_editor", image_type=image_type))
|
|
|
|
|
|
@app.route("/images/<image_type>/config")
def image_config(image_type):
    """Render the configuration page for one image type; unknown types are
    flashed as an error and sent back to the dashboard."""
    if image_type in config.IMAGE_TYPES:
        loaded = images.load_image_config(image_type)
        display_name = config.FRIENDLY_NAMES.get(image_type, image_type)
        return render_template(
            "image_config.html",
            image_type=image_type,
            friendly_name=display_name,
            config=loaded,
        )

    flash("Unknown image type.", "danger")
    return redirect(url_for("dashboard"))
|
|
|
|
|
|
@app.route("/images/<image_type>/config/save", methods=["POST"])
def image_config_save(image_type):
    """Persist one section of an image type's configuration.

    Form fields: ``section`` (hardware_models, drivers, operating_systems or
    packages) and ``payload`` (JSON text). hardware_models is stored under
    the ``HardwareModelSelection`` key of the first entry of
    user_selections.json in the tools path; the other sections are written
    to their JSON file in the control path with keys starting with ``_``
    stripped from every entry. Always redirects, back to the config page for
    a known image type.
    """
    if image_type not in config.IMAGE_TYPES:
        flash("Unknown image type.", "danger")
        return redirect(url_for("dashboard"))

    section = request.form.get("section", "")
    try:
        data = json.loads(request.form.get("payload", "[]"))
    except json.JSONDecodeError:
        flash("Invalid JSON payload.", "danger")
        return redirect(url_for("image_config", image_type=image_type))

    ctrl = fs.control_path(image_type)
    tools = fs.tools_path(image_type)

    # Sections that are plain lists saved into the control directory.
    control_files = {
        "drivers": "HardwareDriver.json",
        "operating_systems": "OperatingSystem.json",
        "packages": "packages.json",
    }

    try:
        if section == "hardware_models":
            selections_file = os.path.join(tools, "user_selections.json")
            existing = fs.load_json(selections_file)
            if existing and isinstance(existing, list):
                selections = existing[0]
            else:
                selections = {}
            selections["HardwareModelSelection"] = data
            fs.save_json(selections_file, [selections])
        elif section in control_files:
            stripped = [
                {k: v for k, v in entry.items() if not k.startswith("_")}
                for entry in data
            ]
            fs.save_json(os.path.join(ctrl, control_files[section]), stripped)
        else:
            flash(f"Unknown section: {section}", "danger")
            return redirect(url_for("image_config", image_type=image_type))

        audit("CONFIG_SAVE", f"{image_type}/{section}")
        flash(f"Saved {section.replace('_', ' ')} successfully.", "success")
    except Exception as exc:
        flash(f"Failed to save {section}: {exc}", "danger")

    return redirect(url_for("image_config", image_type=image_type))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Routes - Driver/package upload + orphan adoption
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@app.route("/images/<image_type>/drivers/upload", methods=["POST"])
def images_drivers_upload(image_type):
    """Accept an uploaded driver file (form field ``driver_file``) for an
    image type and hand it to images.upload_driver(); the outcome is flashed
    and the browser is sent back to the config page."""
    if image_type not in config.IMAGE_TYPES:
        flash(f"Unknown image type: {image_type}", "danger")
        return redirect(url_for("dashboard"))

    upload = request.files.get("driver_file")
    if not upload or not upload.filename:
        flash("No file selected.", "danger")
        return redirect(url_for("image_config", image_type=image_type))

    family = (request.form.get("family") or "").strip()
    target_dir = (request.form.get("destination_dir") or "").strip()
    replace_existing = request.form.get("overwrite") == "1"

    try:
        rec = images.upload_driver(
            image_type,
            upload,
            family=family,
            destination_dir=target_dir,
            overwrite=replace_existing,
        )
        audit("DRIVER_UPLOAD", f"{image_type}/{rec['filename']} registered={rec['registered']}")
        if rec["registered"]:
            tail = " Registered to family in HardwareDriver.json."
        else:
            tail = " Now in orphans list until adopted."
        flash(f"Uploaded {rec['filename']}." + tail, "success")
    except (ValueError, FileExistsError, FileNotFoundError) as ex:
        # Expected validation failures: show the service's own message.
        flash(str(ex), "danger")
    except Exception as ex:
        flash(f"Upload failed: {ex}", "danger")

    return redirect(url_for("image_config", image_type=image_type))
|
|
|
|
|
|
@app.route("/images/<image_type>/drivers/adopt", methods=["POST"])
def images_drivers_adopt(image_type):
    """Adopt an orphan driver file via images.adopt_orphan() using the
    ``filename``, ``family`` and ``destination_dir`` form fields, then return
    to the config page with the outcome flashed."""
    if image_type not in config.IMAGE_TYPES:
        flash(f"Unknown image type: {image_type}", "danger")
        return redirect(url_for("dashboard"))

    def field(name):
        # Missing fields are treated as empty strings.
        return (request.form.get(name) or "").strip()

    orphan_name = field("filename")
    family = field("family")
    target_dir = field("destination_dir")

    try:
        rec = images.adopt_orphan(image_type, orphan_name, family, target_dir)
        audit("DRIVER_ADOPT", f"{image_type}/{rec['filename']} family={family}")
        if rec.get("already_registered"):
            outcome = (f"{rec['filename']} was already registered.", "info")
        else:
            outcome = (f"Adopted {rec['filename']} into HardwareDriver.json.", "success")
        flash(*outcome)
    except (ValueError, FileNotFoundError) as ex:
        flash(str(ex), "danger")
    except Exception as ex:
        flash(f"Adopt failed: {ex}", "danger")

    return redirect(url_for("image_config", image_type=image_type))
|
|
|
|
|
|
@app.route("/images/<image_type>/drivers/orphans/delete", methods=["POST"])
def images_drivers_orphans_delete(image_type):
    """Bulk-remove orphan driver files selected on the config page.

    The selected names arrive as repeated ``filename`` form fields and are
    passed to images.remove_orphans(), whose result carries ``removed`` and
    ``missing`` collections. The counts are audited and flashed. A failure
    inside the service call is flashed instead of surfacing as a 500, which
    matches the upload and adopt routes.
    """
    if image_type not in config.IMAGE_TYPES:
        flash(f"Unknown image type: {image_type}", "danger")
        return redirect(url_for("dashboard"))

    # Filenames come as repeated form fields (one per checkbox).
    filenames = request.form.getlist("filename")
    if not filenames:
        flash("No files selected for removal.", "warning")
        return redirect(url_for("image_config", image_type=image_type))

    try:
        rec = images.remove_orphans(image_type, filenames)
    except (ValueError, FileNotFoundError) as ex:
        flash(str(ex), "danger")
        return redirect(url_for("image_config", image_type=image_type))
    except Exception as ex:
        flash(f"Orphan removal failed: {ex}", "danger")
        return redirect(url_for("image_config", image_type=image_type))

    audit("DRIVER_ORPHAN_REMOVE", f"{image_type} removed={len(rec['removed'])} missing={len(rec['missing'])}")

    parts = []
    if rec["removed"]:
        parts.append(f"Removed {len(rec['removed'])} orphan(s).")
    if rec["missing"]:
        parts.append(f"{len(rec['missing'])} not found / errored.")
    flash(" ".join(parts) or "No-op.", "success" if rec["removed"] else "warning")
    return redirect(url_for("image_config", image_type=image_type))
|
|
|
|
|
|
@app.route("/images/<image_type>/packages/upload", methods=["POST"])
def images_packages_upload(image_type):
    """Accept an uploaded package file (form field ``package_file``) and pass
    it to images.upload_package(); the outcome is flashed and the browser
    returns to the config page."""
    if image_type not in config.IMAGE_TYPES:
        flash(f"Unknown image type: {image_type}", "danger")
        return redirect(url_for("dashboard"))

    upload = request.files.get("package_file")
    if not upload or not upload.filename:
        flash("No file selected.", "danger")
        return redirect(url_for("image_config", image_type=image_type))

    target_dir = (request.form.get("destination_dir") or "").strip()
    replace_existing = request.form.get("overwrite") == "1"

    try:
        rec = images.upload_package(
            image_type, upload, destination_dir=target_dir, overwrite=replace_existing
        )
        audit("PACKAGE_UPLOAD", f"{image_type}/{rec['filename']} registered={rec['registered']}")
        if rec["registered"]:
            tail = " Registered in packages.json."
        else:
            tail = " Not registered (no destination_dir provided)."
        flash(f"Uploaded {rec['filename']}." + tail, "success")
    except (ValueError, FileExistsError) as ex:
        flash(str(ex), "danger")
    except Exception as ex:
        flash(f"Upload failed: {ex}", "danger")

    return redirect(url_for("image_config", image_type=image_type))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Routes - Image type CRUD (registry-backed)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@app.route("/images/new", methods=["POST"])
def images_new():
    """Create a new image type in the registry from the ``key`` (lower-cased)
    and ``friendly_name`` form fields, then return to the dashboard."""
    form = request.form
    new_key = (form.get("key") or "").strip().lower()
    display_name = (form.get("friendly_name") or "").strip()

    try:
        rec = image_registry.create(new_key, display_name)
    except image_registry.RegistryError as ex:
        flash(str(ex), "danger")
    else:
        audit("IMAGE_REGISTRY_CREATE", f"{rec['key']} ({rec['friendly_name']})")
        flash(f"Created image type {rec['key']}.", "success")

    return redirect(url_for("dashboard"))
|
|
|
|
|
|
@app.route("/images/<image_type>/clone", methods=["POST"])
def images_clone(image_type):
    """Clone an existing image type to the key given in ``dst_key``; a blank
    ``friendly_name`` is passed to the registry as None. Returns to the
    dashboard with the outcome flashed."""
    if image_type not in config.IMAGE_TYPES:
        flash(f"Unknown source image type: {image_type}", "danger")
        return redirect(url_for("dashboard"))

    form = request.form
    new_key = (form.get("dst_key") or "").strip().lower()
    display_name = (form.get("friendly_name") or "").strip()
    if not display_name:
        display_name = None

    try:
        rec = image_registry.clone(image_type, new_key, display_name)
        audit("IMAGE_REGISTRY_CLONE", f"{image_type} -> {rec['key']}")
        flash(f"Cloned {image_type} -> {rec['key']}.", "success")
    except image_registry.RegistryError as ex:
        flash(str(ex), "danger")
    except Exception as ex:
        flash(f"Clone failed: {ex}", "danger")

    return redirect(url_for("dashboard"))
|
|
|
|
|
|
@app.route("/images/<image_type>/delete", methods=["POST"])
def images_delete(image_type):
    """Remove an image type from the registry, optionally wiping its content.

    The ``delete_content`` form field (``"1"`` when ticked) is forwarded to
    image_registry.delete(). Registry errors are flashed verbatim; any other
    failure is flashed as "Delete failed" rather than surfacing as a 500,
    which matches images_clone.
    """
    if image_type not in config.IMAGE_TYPES:
        flash(f"Unknown image type: {image_type}", "danger")
        return redirect(url_for("dashboard"))

    delete_content = request.form.get("delete_content") == "1"
    try:
        rec = image_registry.delete(image_type, delete_content=delete_content)
        audit("IMAGE_REGISTRY_DELETE", f"{rec['key']} content={rec['removed_content']}")
        msg = f"Removed image type {rec['key']} from registry."
        if rec["removed_content"]:
            msg += " On-disk content wiped."
        flash(msg, "success")
    except image_registry.RegistryError as ex:
        flash(str(ex), "danger")
    except Exception as ex:
        flash(f"Delete failed: {ex}", "danger")
    return redirect(url_for("dashboard"))
|
|
|
|
|
|
@app.route("/images/<image_type>/rename", methods=["POST"])
def images_rename(image_type):
    """Change the friendly (display) name of an image type from the
    ``friendly_name`` form field, then return to the dashboard."""
    if image_type not in config.IMAGE_TYPES:
        flash(f"Unknown image type: {image_type}", "danger")
        return redirect(url_for("dashboard"))

    display_name = (request.form.get("friendly_name") or "").strip()
    try:
        rec = image_registry.rename_friendly(image_type, display_name)
    except image_registry.RegistryError as ex:
        flash(str(ex), "danger")
    else:
        audit("IMAGE_REGISTRY_RENAME", f"{rec['key']} -> {rec['friendly_name']}")
        flash(f"Renamed {rec['key']} to '{rec['friendly_name']}'.", "success")

    return redirect(url_for("dashboard"))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Routes - Clonezilla Backups
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@app.route("/backups")
def clonezilla_backups():
    """List the .zip files in config.CLONEZILLA_SHARE (name-sorted) with
    their size and modification time and render the backups page."""
    share = config.CLONEZILLA_SHARE
    names = sorted(os.listdir(share)) if os.path.isdir(share) else []

    backups = []
    for name in names:
        full_path = os.path.join(share, name)
        if not (os.path.isfile(full_path) and name.lower().endswith(".zip")):
            continue
        info = os.stat(full_path)
        backups.append({
            "filename": name,
            # Displayed machine name is the file name without its extension.
            "machine": os.path.splitext(name)[0],
            "size": info.st_size,
            "modified": info.st_mtime,
        })

    return render_template(
        "backups.html",
        backups=backups,
        image_types=config.IMAGE_TYPES,
        friendly_names=config.FRIENDLY_NAMES,
    )
|
|
|
|
|
|
@app.route("/backups/upload", methods=["POST"])
def clonezilla_upload():
    """Store an uploaded backup (form field ``backup_file``) in
    config.CLONEZILLA_SHARE. Only names that end in .zip after
    secure_filename() are accepted."""
    upload = request.files.get("backup_file")

    safe_name = ""
    if upload is None or not upload.filename:
        error = "No file selected."
    else:
        safe_name = secure_filename(upload.filename)
        if safe_name.lower().endswith(".zip"):
            error = None
        else:
            error = "Only .zip files are accepted."

    if error is not None:
        flash(error, "danger")
        return redirect(url_for("clonezilla_backups"))

    os.makedirs(config.CLONEZILLA_SHARE, exist_ok=True)
    upload.save(os.path.join(config.CLONEZILLA_SHARE, safe_name))
    audit("BACKUP_UPLOAD", safe_name)
    flash(f"Uploaded {safe_name} successfully.", "success")
    return redirect(url_for("clonezilla_backups"))
|
|
|
|
|
|
@app.route("/backups/download/<filename>")
def clonezilla_download(filename):
    """Send a backup from config.CLONEZILLA_SHARE as an attachment; the name
    is passed through secure_filename() before it is joined to the share."""
    filename = secure_filename(filename)
    backup_path = os.path.join(config.CLONEZILLA_SHARE, filename)
    if os.path.isfile(backup_path):
        return send_file(backup_path, as_attachment=True)

    flash(f"Backup not found: {filename}", "danger")
    return redirect(url_for("clonezilla_backups"))
|
|
|
|
|
|
@app.route("/backups/delete/<filename>", methods=["POST"])
def clonezilla_delete(filename):
    """Delete one backup file from config.CLONEZILLA_SHARE and record the
    deletion in the audit log."""
    filename = secure_filename(filename)
    backup_path = os.path.join(config.CLONEZILLA_SHARE, filename)

    if not os.path.isfile(backup_path):
        flash(f"Backup not found: {filename}", "danger")
        return redirect(url_for("clonezilla_backups"))

    os.remove(backup_path)
    audit("BACKUP_DELETE", filename)
    flash(f"Deleted {filename}.", "success")
    return redirect(url_for("clonezilla_backups"))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Routes - Blancco Reports
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@app.route("/reports")
def blancco_reports():
    """List the files in config.BLANCCO_REPORTS, newest name first.

    For .xml reports the system serial, system model and an overall erasure
    state are extracted so a report can be found without opening it. The
    state is "Failed" if any erasure entry reports something other than
    "successful" (case-insensitive), "Successful" if all do, and empty when
    no entry carries a state or the report cannot be parsed.
    """
    report_dir = config.BLANCCO_REPORTS
    names = sorted(os.listdir(report_dir), reverse=True) if os.path.isdir(report_dir) else []

    reports = []
    for name in names:
        report_path = os.path.join(report_dir, name)
        if not os.path.isfile(report_path):
            continue
        info = os.stat(report_path)
        ext = os.path.splitext(name)[1].lower()

        serial = model = state = ""
        if ext == ".xml":
            try:
                parsed = blancco_report.parse(report_path)
                system_info = (parsed.get("hardware") or {}).get("system") or {}
                serial = system_info.get("serial", "") or ""
                model = system_info.get("model", "") or ""
                outcomes = [
                    (entry.get("state") or "").strip()
                    for entry in (parsed.get("erasures") or [])
                    if entry.get("state")
                ]
                if outcomes:
                    all_ok = all(o.lower() == "successful" for o in outcomes)
                    state = "Successful" if all_ok else "Failed"
            except Exception:
                # Best effort: an unreadable report is still listed, with
                # whatever fields were extracted before the failure.
                pass

        reports.append({
            "filename": name,
            "size": info.st_size,
            "modified": info.st_mtime,
            "type": ext.lstrip(".").upper() or "FILE",
            "serial": serial,
            "model": model,
            "state": state,
        })

    return render_template(
        "reports.html",
        reports=reports,
        image_types=config.IMAGE_TYPES,
        friendly_names=config.FRIENDLY_NAMES,
    )
|
|
|
|
|
|
@app.route("/reports/download/<filename>")
def blancco_download_report(filename):
    """Send a report from config.BLANCCO_REPORTS as an attachment; the name
    is passed through secure_filename() before it is joined to the directory."""
    filename = secure_filename(filename)
    report_path = os.path.join(config.BLANCCO_REPORTS, filename)
    if os.path.isfile(report_path):
        return send_file(report_path, as_attachment=True)

    flash(f"Report not found: {filename}", "danger")
    return redirect(url_for("blancco_reports"))
|
|
|
|
|
|
@app.route("/reports/view/<filename>")
def blancco_view_report(filename):
    """Render a formatted view of one XML report. A missing file, a non-XML
    file or a parse failure is flashed and redirects to the report list."""
    filename = secure_filename(filename)
    report_path = os.path.join(config.BLANCCO_REPORTS, filename)

    problem = None
    data = None
    if not os.path.isfile(report_path):
        problem = (f"Report not found: {filename}", "danger")
    elif not filename.lower().endswith(".xml"):
        problem = ("Formatted view supports XML reports only.", "warning")
    else:
        try:
            data = blancco_report.parse(report_path)
        except Exception as ex:
            problem = (f"Failed to parse {filename}: {ex}", "danger")

    if problem is not None:
        flash(*problem)
        return redirect(url_for("blancco_reports"))
    return render_template("report_view.html", filename=filename, data=data)
|
|
|
|
|
|
@app.route("/reports/delete/<filename>", methods=["POST"])
def blancco_delete_report(filename):
    """Delete one report file from config.BLANCCO_REPORTS and record the
    deletion in the audit log."""
    filename = secure_filename(filename)
    report_path = os.path.join(config.BLANCCO_REPORTS, filename)

    if not os.path.isfile(report_path):
        flash(f"Report not found: {filename}", "danger")
        return redirect(url_for("blancco_reports"))

    os.remove(report_path)
    audit("REPORT_DELETE", filename)
    flash(f"Deleted {filename}.", "success")
    return redirect(url_for("blancco_reports"))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Routes - Imaging Progress
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@app.route("/imaging")
def imaging_dashboard():
    """Render the full imaging dashboard page with the current session list."""
    return render_template("imaging.html", sessions=imaging_status.list_sessions())
|
|
|
|
|
|
@app.route("/imaging/tiles")
def imaging_tiles_partial():
    """Return only the per-bay tile markup as an HTML fragment.

    The dashboard's SSE/polling refresh fetches this and swaps it into
    #imaging-tiles, avoiding a full page reload.
    """
    current = imaging_status.list_sessions()
    return render_template("_imaging_tiles.html", sessions=current)
|
|
|
|
|
|
def _sessions_hash() -> str:
    """Return a 16-hex-character fingerprint of the current session list.

    The SSE stream uses it to signal changes without sending the sessions
    themselves. Each session contributes its serial (or "mac-ip" when there
    is no serial), source, status, stage_index, current_stage, last_updated
    and whether a LAPS password is stored; the password itself is never
    hashed.
    """
    digest = hashlib.sha256()
    for entry in imaging_status.list_sessions():
        ident = entry.get("serial") or f"{entry.get('mac','')}-{entry.get('ip','')}"
        laps_flag = "1" if entry.get("laps_password", "") else "0"
        fingerprint = (
            ident,
            entry.get("source", "client"),
            entry.get("status", ""),
            entry.get("stage_index", 0),
            entry.get("current_stage", ""),
            entry.get("last_updated", ""),
            laps_flag,
        )
        digest.update(repr(fingerprint).encode())
    return digest.hexdigest()[:16]
|
|
|
|
|
|
@app.route("/imaging/stream")
def imaging_stream():
    """Server-Sent Events stream of session-list change pings.

    Emits one JSON event ``{"hash": ..., "ts": ...}`` every SSE_PING_INTERVAL
    seconds, whether or not the hash changed; the client compares hashes and
    fetches /imaging/tiles when they differ. The regular event doubles as a
    keepalive so intermediate proxies don't close the connection. If the
    hash cannot be computed, an SSE comment line describing the error is
    sent and the previous hash is repeated.

    Single-threaded dev server can only serve one SSE client at a time. The
    live PXE box runs gunicorn with multiple workers (see playbook) so this
    is fine in production.
    """
    SSE_PING_INTERVAL = 5    # seconds between hash checks
    SSE_MAX_DURATION = 600   # cap connection length so the worker recycles

    @stream_with_context
    def gen():
        # Monotonic clock: the cap must not move when the wall clock is adjusted.
        deadline = time.monotonic() + SSE_MAX_DURATION
        last = None
        while time.monotonic() < deadline:
            try:
                cur = _sessions_hash()
            except Exception as ex:
                # Collapse all whitespace so a multi-line message cannot end
                # the comment early and inject fields into the event stream.
                detail = " ".join(str(ex).split())
                yield f": error {detail}\n\n"
                cur = last
            payload = json.dumps({"hash": cur, "ts": int(time.time())})
            yield f"data: {payload}\n\n"
            last = cur
            time.sleep(SSE_PING_INTERVAL)

    headers = {
        "Cache-Control": "no-cache",
        "X-Accel-Buffering": "no",  # disable nginx/apache proxy buffering
        "Connection": "keep-alive",
    }
    return Response(gen(), mimetype="text/event-stream", headers=headers)
|
|
|
|
|
|
@app.route("/imaging/status", methods=["POST"])
def imaging_status_post():
    """Receive a status push from an imaging client.

    Expects a JSON object with at least a non-empty ``serial``. Returns 400
    when the body is missing, is not a JSON object, or has no serial; 500
    (with an audit entry) when the session update fails; otherwise 200 with
    the serial of the stored session.
    """
    # CSRF-exempt machine-to-machine endpoint; see services/csrf.py exempt list.
    payload = request.get_json(silent=True)
    # get_json() can return any JSON type; only an object has a serial key.
    if not isinstance(payload, dict) or not payload.get("serial"):
        return jsonify({"error": "missing serial"}), 400
    try:
        state = imaging_status.update_session(payload)
    except Exception as ex:
        audit("IMAGING_STATUS_ERROR", str(ex))
        return jsonify({"error": str(ex)}), 500
    return jsonify({"ok": True, "serial": state["serial"]}), 200
|
|
|
|
|
|
@app.route("/imaging/<serial>.json")
def imaging_session_json(serial):
    """Return the stored session for a serial as JSON, or a 404 JSON error
    when there is none."""
    record = imaging_status.get_session(secure_filename(serial))
    if record:
        return jsonify(record)
    return jsonify({"error": "not found"}), 404
|
|
|
|
|
|
@app.route("/imaging/session/<serial>")
def imaging_session_detail(serial):
    """Per-bay forensics page: session metadata plus the full sidecar log.

    When no session exists for the serial, an error is flashed and the
    browser is redirected to the imaging dashboard.
    """
    serial = secure_filename(serial)
    record = imaging_status.get_session(serial)
    if record:
        log_text, was_truncated = imaging_status.read_full_log(serial)
        return render_template(
            "imaging_detail.html",
            session=record,
            full_log=log_text,
            full_log_truncated=was_truncated,
        )

    flash(f"No session for serial {serial}.", "danger")
    return redirect(url_for("imaging_dashboard"))
|
|
|
|
|
|
@app.route("/imaging/delete/<serial>", methods=["POST"])
def imaging_delete_session(serial):
    """Clear one imaging session; a successful delete is audited."""
    serial = secure_filename(serial)
    if not imaging_status.delete_session(serial):
        flash(f"Session not found: {serial}", "danger")
        return redirect(url_for("imaging_dashboard"))

    audit("IMAGING_DELETE", serial)
    flash(f"Cleared imaging session {serial}.", "success")
    return redirect(url_for("imaging_dashboard"))
|
|
|
|
|
|
@app.route("/imaging/delete_all", methods=["POST"])
def imaging_delete_all():
    """Clear every imaging session and report how many were removed."""
    removed_count = imaging_status.delete_all_sessions()
    audit("IMAGING_DELETE_ALL", str(removed_count))
    flash(f"Cleared {removed_count} imaging session(s).", "success")
    return redirect(url_for("imaging_dashboard"))
|
|
|
|
|
|
@app.route("/imaging/<serial>/laps", methods=["POST"])
def imaging_set_laps(serial):
    """Save (or clear with empty value) the LAPS password for a bay so it
    survives the dashboard's 5s auto-refresh. JSON body: {"password": "..."}.
    Empty string removes the field. Daily reset wipes natural risk.

    A missing "password" key is treated like an empty string (clear).

    Returns a dict that Flask serialises as JSON:
      * ``{"ok": True}`` after saving a non-empty password,
      * ``{"ok": True, "cleared": True}`` after a clear request,
      * ``{"ok": False, "error": ...}`` with HTTP 400 when the body is not
        a JSON object or the password is not a string.
    """
    serial = secure_filename(serial)
    body = request.get_json(silent=True) or {}
    if not isinstance(body, dict):
        # A JSON array/string/number has no .get(); reject it explicitly
        # instead of failing with AttributeError (HTTP 500).
        return {"ok": False, "error": "body must be a JSON object"}, 400

    pw = body.get("password", "")
    if not isinstance(pw, str):
        return {"ok": False, "error": "password must be string"}, 400

    if pw == "":
        # Clear by direct file write. update_session() merges payload INTO
        # existing state and skips empty values, so it cannot remove a key.
        # Pop the laps_password key directly from the session JSON and
        # write the result atomically.
        path = imaging_status._path_for(serial)
        if os.path.isfile(path):
            try:
                with open(path, "r") as f:
                    state = json.load(f)
            except (json.JSONDecodeError, OSError):
                # Unreadable/corrupt session file: nothing to clear.
                state = {}
            if "laps_password" in state:
                state.pop("laps_password", None)
                state["last_updated"] = imaging_status._now_iso()
                # Write to a temp file in the same directory, then rename
                # over the original so readers never see a partial file.
                fd, tmp = tempfile.mkstemp(
                    dir=config.IMAGING_DIR, prefix=".tmp-", suffix=".json"
                )
                try:
                    with os.fdopen(fd, "w") as f:
                        json.dump(state, f, indent=2)
                    os.replace(tmp, path)
                except Exception:
                    # Best-effort removal of the temp file, then re-raise
                    # the original error.
                    try:
                        os.unlink(tmp)
                    except OSError:
                        pass
                    raise
        return {"ok": True, "cleared": True}

    imaging_status.update_session({"serial": serial, "laps_password": pw})
    return {"ok": True}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Routes - Enrollment Packages
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@app.route("/enrollment")
def enrollment():
    """Render the enrollment page with the available ``.ppkg`` packages.

    Lists regular files in ``config.ENROLLMENT_PPKG_DIR`` whose name ends
    in ``.ppkg`` (case-insensitive), sorted by name, each described by
    filename, size, and modification time. A missing directory yields an
    empty list.
    """
    pkg_dir = config.ENROLLMENT_PPKG_DIR
    names = sorted(os.listdir(pkg_dir)) if os.path.isdir(pkg_dir) else []

    packages = []
    for name in names:
        full_path = os.path.join(pkg_dir, name)
        if not (os.path.isfile(full_path) and name.lower().endswith(".ppkg")):
            continue
        info = os.stat(full_path)
        packages.append(
            dict(filename=name, size=info.st_size, modified=info.st_mtime)
        )

    return render_template(
        "enrollment.html",
        packages=packages,
        image_types=config.IMAGE_TYPES,
        friendly_names=config.FRIENDLY_NAMES,
    )
|
|
|
|
|
|
@app.route("/enrollment/upload", methods=["POST"])
def enrollment_upload():
    """Accept an uploaded ``.ppkg`` file from the ``ppkg_file`` form field.

    Rejects (with a flash message) a missing field, an empty filename, or a
    sanitised filename that does not end in ``.ppkg``. Otherwise the file is
    saved into ``config.ENROLLMENT_PPKG_DIR`` (created if needed), the upload
    is audited, and a success message is flashed. Always redirects back to
    the enrollment page.
    """
    back_to_list = url_for("enrollment")

    upload = request.files.get("ppkg_file")
    if upload is None or not upload.filename:
        flash("No file selected.", "danger")
        return redirect(back_to_list)

    filename = secure_filename(upload.filename)
    if not filename.lower().endswith(".ppkg"):
        flash("Only .ppkg files are accepted.", "danger")
        return redirect(back_to_list)

    target_dir = config.ENROLLMENT_PPKG_DIR
    os.makedirs(target_dir, exist_ok=True)
    upload.save(os.path.join(target_dir, filename))

    audit("ENROLLMENT_UPLOAD", filename)
    flash(f"Uploaded {filename} successfully.", "success")
    return redirect(back_to_list)
|
|
|
|
|
|
@app.route("/enrollment/download/<filename>")
def enrollment_download(filename):
    """Send a package from ``config.ENROLLMENT_PPKG_DIR`` as an attachment.

    The requested name is sanitised with ``secure_filename``. If no such
    file exists, a flash message is queued and the client is redirected to
    the enrollment page.
    """
    filename = secure_filename(filename)
    package_path = os.path.join(config.ENROLLMENT_PPKG_DIR, filename)
    if os.path.isfile(package_path):
        return send_file(package_path, as_attachment=True)

    flash(f"Package not found: {filename}", "danger")
    return redirect(url_for("enrollment"))
|
|
|
|
|
|
@app.route("/enrollment/delete/<filename>", methods=["POST"])
def enrollment_delete(filename):
    """Remove a package from ``config.ENROLLMENT_PPKG_DIR``.

    The name is sanitised with ``secure_filename``. A successful removal is
    audited and flashed as success; a missing file is flashed as an error.
    Always redirects to the enrollment page.
    """
    filename = secure_filename(filename)
    package_path = os.path.join(config.ENROLLMENT_PPKG_DIR, filename)

    if not os.path.isfile(package_path):
        flash(f"Package not found: {filename}", "danger")
    else:
        os.remove(package_path)
        audit("ENROLLMENT_DELETE", filename)
        flash(f"Deleted {filename}.", "success")

    return redirect(url_for("enrollment"))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Routes - startnet.cmd Editor (boot.wim)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@app.route("/startnet")
def startnet_editor():
    """Render the startnet.cmd editor for ``config.BOOT_WIM``.

    When the WIM file exists, its startnet content is fetched via
    ``wim.extract_startnet`` (empty string if that returns nothing) and the
    output of ``wiminfo`` is parsed into a ``key: value`` dict. Any failure
    while running or parsing ``wiminfo`` is ignored and leaves whatever was
    collected so far.
    """
    import subprocess

    boot_wim = config.BOOT_WIM
    wim_exists = os.path.isfile(boot_wim)
    content = ""
    wim_info = {}

    if wim_exists:
        content = wim.extract_startnet(boot_wim) or ""
        try:
            proc = subprocess.run(
                ["wiminfo", boot_wim],
                capture_output=True,
                text=True,
                timeout=15,
            )
            if proc.returncode == 0:
                for raw_line in proc.stdout.splitlines():
                    # partition() yields an empty separator when the line
                    # has no colon; such lines are skipped.
                    key, sep, val = raw_line.partition(":")
                    if sep:
                        wim_info[key.strip()] = val.strip()
        except Exception:
            # WIM metadata is optional on this page; show it without.
            pass

    return render_template(
        "startnet_editor.html",
        wim_exists=wim_exists,
        wim_path=config.BOOT_WIM,
        content=content,
        wim_info=wim_info,
        image_types=config.IMAGE_TYPES,
        friendly_names=config.FRIENDLY_NAMES,
    )
|
|
|
|
|
|
@app.route("/startnet/save", methods=["POST"])
def startnet_save():
    """Write the submitted ``content`` form field into boot.wim's startnet.cmd.

    Flashes an error if ``config.BOOT_WIM`` is missing or if
    ``wim.update_startnet`` reports failure; on success the change is
    audited and a success message is flashed. Always redirects back to the
    editor.
    """
    editor_url = url_for("startnet_editor")

    if not os.path.isfile(config.BOOT_WIM):
        flash("boot.wim not found.", "danger")
        return redirect(editor_url)

    new_content = request.form.get("content", "")
    ok, err = wim.update_startnet(config.BOOT_WIM, new_content)
    if not ok:
        flash(f"Failed to update boot.wim: {err}", "danger")
    else:
        audit("STARTNET_SAVE", "boot.wim updated")
        flash("startnet.cmd updated successfully in boot.wim.", "success")

    return redirect(editor_url)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Routes - Audit Log
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@app.route("/audit")
def audit_log():
    """Render the audit log page with entries listed newest-first.

    Reads every line of ``config.AUDIT_LOG`` (if the file exists), strips
    surrounding whitespace, and reverses the file order before rendering.
    """
    entries = []
    if os.path.isfile(config.AUDIT_LOG):
        with open(config.AUDIT_LOG, "r") as fh:
            entries = [raw.strip() for raw in fh]
        # The file is read top to bottom; reverse so its last line comes first.
        entries.reverse()

    return render_template(
        "audit.html",
        entries=entries,
        image_types=config.IMAGE_TYPES,
        friendly_names=config.FRIENDLY_NAMES,
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Routes - JSON API
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@app.route("/api/services")
def api_services():
    """Return a JSON object mapping each monitored service name to the
    value reported by ``system.service_status``."""
    monitored = ("dnsmasq", "apache2", "smbd")
    statuses = {}
    for name in monitored:
        statuses[name] = system.service_status(name)
    return jsonify(statuses)
|
|
|
|
|
|
@app.route("/api/images")
def api_images():
    """Return a JSON array with ``images.image_status`` for every entry in
    ``config.IMAGE_TYPES``, in iteration order."""
    statuses = []
    for image_type in config.IMAGE_TYPES:
        statuses.append(images.image_status(image_type))
    return jsonify(statuses)
|
|
|
|
|
|
@app.route("/api/images/<image_type>/unattend", methods=["POST"])
def api_save_unattend(image_type):
    """Save the unattend XML for ``image_type`` from a JSON request body.

    The body must be a JSON object. If it contains ``raw_xml`` (a string),
    that text is checked for well-formedness and written as-is; otherwise
    the whole payload is handed to ``unattend.build_unattend_xml``.

    Responses:
      * 404 - ``image_type`` is not in ``config.IMAGE_TYPES``.
      * 400 - missing/empty body, body is not a JSON object, ``raw_xml`` is
        not a string or is not well-formed XML, or building the XML failed.
      * 500 - the file could not be written.
      * 200 - ``{"status": "ok", "path": <written file>}``.
    """
    if image_type not in config.IMAGE_TYPES:
        return jsonify({"error": "Unknown image type"}), 404

    xml_file = fs.unattend_path(image_type)
    payload = request.get_json(silent=True)

    if not payload:
        return jsonify({"error": "No JSON body provided"}), 400
    if not isinstance(payload, dict):
        # e.g. a bare JSON string containing "raw_xml" would pass the
        # membership test below and then fail on indexing (HTTP 500).
        return jsonify({"error": "JSON body must be an object"}), 400

    if "raw_xml" in payload:
        raw_xml = payload["raw_xml"]
        if not isinstance(raw_xml, str):
            # Non-string values have no .encode(); reject them up front.
            return jsonify({"error": "raw_xml must be a string"}), 400
        try:
            # Parse only to validate; the parsed tree is discarded.
            etree.fromstring(raw_xml.encode("utf-8"))
        except etree.XMLSyntaxError as exc:
            return jsonify({"error": f"Invalid XML: {exc}"}), 400
        xml_content = raw_xml
    else:
        try:
            xml_content = unattend.build_unattend_xml(payload)
        except Exception as exc:
            return jsonify({"error": f"Failed to build XML: {exc}"}), 400

    try:
        os.makedirs(os.path.dirname(xml_file), exist_ok=True)
        with open(xml_file, "w", encoding="utf-8") as fh:
            fh.write(xml_content)
    except Exception as exc:
        return jsonify({"error": f"Failed to write file: {exc}"}), 500

    audit("UNATTEND_SAVE_API", image_type)
    return jsonify({"status": "ok", "path": xml_file})
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Template helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@app.template_filter("timestamp_fmt")
def timestamp_fmt(ts):
    """Template filter: render a Unix timestamp as ``YYYY-MM-DD HH:MM``."""
    moment = datetime.fromtimestamp(ts)
    return moment.strftime("%Y-%m-%d %H:%M")
|
|
|
|
|
|
@app.context_processor
def inject_globals():
    """Expose the image-type list and friendly-name mapping from ``config``
    to every template as ``all_image_types`` / ``all_friendly_names``."""
    return dict(
        all_image_types=config.IMAGE_TYPES,
        all_friendly_names=config.FRIENDLY_NAMES,
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main
|
|
# ---------------------------------------------------------------------------
|
|
|
|
if __name__ == "__main__":
    # Direct-execution entry point: serve on the loopback interface only,
    # with the debugger off and one thread per request.
    app.run(
        host="127.0.0.1",
        port=9010,
        debug=False,
        threaded=True,
    )
|