pxe-server/webapp/services/images.py
cproudlock 69a1682a7f webapp: imaging UX overhaul + image management CRUD
Imaging dashboard
- services/imaging_log_tail.py: parses dnsmasq leases, Apache access log,
  Samba per-host log files, and dnsmasq syslog (DHCP/TFTP). Synthesizes
  inferred sessions keyed by MAC for bays that have only touched the boot
  chain but not yet pushed to /imaging/status. Active window 90 min.
- imaging_status.list_sessions() merges inferred sessions into the dashboard
  list. Real client-pushed sessions win for the same MAC.
- imaging_status: stage_history field tracks every stage transition (capped
  30); sidecar .log file per serial records every log_lines push uncapped
  (read_full_log() caps detail-page response to 1 MB).
- delete_session/delete_all_sessions clean up sidecar .log too.
- New SSE endpoint /imaging/stream emits a session-list hash every 5s.
  Client fetches /imaging/tiles (HTML partial) on hash change and swaps
  #imaging-tiles innerHTML. Polling fallback at 15s if SSE drops.
- Tile-swap preserves scroll, filter input, expanded state via localStorage,
  and any LAPS input the operator is mid-pasting (swap skipped when a
  laps-input is focused).
- imaging.html: removed 15s location.reload(). Added live-status dot in
  header (gray idle / green SSE connected / red SSE lost).
- _imaging_tiles.html: shared partial used by both /imaging full render and
  /imaging/tiles SSE refresh. Inferred bays render with yellow border +
  log-inferred badge + no progress bar (stage_index inference is coarse).
- imaging_detail.html (new): per-bay forensics page at /imaging/session/
  <serial>. Session metadata grid, stage timeline table, full sidecar log
  with truncation indicator, Copy-support-summary button. Linked from each
  client-pushed tile.
- qr-render.js exposes window.renderAllQRs() so the SSE swap can re-render
  Intune device-ID QRs in the swapped-in tiles.
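
The merge rule and the hash-on-change signal described above can be sketched as follows. This is only an illustration under stated assumptions: the function names, the "mac"/"stage"/"inferred" keys, and the choice of SHA-256 over canonical JSON are hypothetical and are not taken from imaging_status or the /imaging/stream handler.

```python
import hashlib
import json


def merge_sessions(pushed: list[dict], inferred: list[dict]) -> list[dict]:
    """Merge two session lists keyed by MAC; client-pushed entries win."""
    by_mac = {s["mac"]: s for s in inferred}
    # The later update overwrites inferred entries that share a MAC.
    by_mac.update({s["mac"]: s for s in pushed})
    return sorted(by_mac.values(), key=lambda s: s["mac"])


def session_list_hash(sessions: list[dict]) -> str:
    """Stable digest of the list: identical content gives an identical hash."""
    canonical = json.dumps(sessions, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def sse_event(digest: str) -> str:
    """Format one Server-Sent Events message carrying the digest."""
    return f"data: {digest}\n\n"
```

A client that remembers the last digest only needs to refetch the tile partial when the next event carries a different value, which keeps the stream payload small.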

Image management
- services/image_registry.py: JSON registry of image types at
  {SAMBA_SHARE}/image-registry.json. Bootstraps from baked-in
  config.IMAGE_TYPES on first run. create/clone/delete/rename_friendly
  mutate the file then call reload() which rewrites config.IMAGE_TYPES +
  config.FRIENDLY_NAMES in place. Sidebar reflects on next request.
- app.py routes: /images/new, /images/<t>/clone, /images/<t>/delete (with
  optional content-wipe checkbox), /images/<t>/rename.
- dashboard.html: + New image type button + Clone/Delete per row, all in
  Bootstrap modals with confirmation copy.
- Clone copies Deploy/ tree but preserves symlinks to shared dirs (Out-of-
  box Drivers, Operating Systems, Packages) so disk usage stays low.
- Delete with content checked unlinks symlinks (does not follow into shared
  dirs).
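
One way to get the clone and delete behavior described above is the standard library's symlink-preserving copy. This is a sketch, not the code in image_registry.py: clone_deploy_tree, remove_deploy_tree, and demo are hypothetical names, and the directory layout in demo is made up for illustration.

```python
import os
import shutil
import tempfile


def clone_deploy_tree(src: str, dst: str) -> None:
    """Copy src to dst, recreating symlinks as links instead of copying targets."""
    # symlinks=True makes copytree reproduce each symlink rather than
    # follow it, so shared directories are not duplicated on disk.
    shutil.copytree(src, dst, symlinks=True)


def remove_deploy_tree(path: str) -> None:
    """Delete a cloned tree without touching what its symlinks point at."""
    # rmtree unlinks symlinks found inside the tree; it does not descend into them.
    shutil.rmtree(path)


def demo() -> dict:
    """Clone a tiny tree with one shared symlink, delete the clone, report results."""
    with tempfile.TemporaryDirectory() as root:
        shared = os.path.join(root, "shared", "Packages")
        os.makedirs(shared)
        with open(os.path.join(shared, "app.msi"), "w") as fh:
            fh.write("payload")

        src = os.path.join(root, "src", "Deploy")
        os.makedirs(os.path.join(src, "Control"))
        os.symlink(shared, os.path.join(src, "Packages"))

        dst = os.path.join(root, "clone", "Deploy")
        clone_deploy_tree(src, dst)
        link_kept = os.path.islink(os.path.join(dst, "Packages"))

        remove_deploy_tree(dst)
        shared_survives = os.path.isfile(os.path.join(shared, "app.msi"))
        return {"link_kept": link_kept, "shared_survives": shared_survives}
```

The sketch assumes a platform where unprivileged symlink creation works (for example Linux).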

Driver / package upload + orphan adoption
- services/images.py: upload_driver, adopt_orphan, remove_orphans,
  upload_package. Filename sanitization blocks path traversal.
- app.py routes: /images/<t>/drivers/upload, /images/<t>/drivers/adopt,
  /images/<t>/drivers/orphans/delete, /images/<t>/packages/upload.
- image_config.html: Upload .zip button + modal on Drivers section. Orphan
  drivers card-footer rebuilt as interactive list with per-row Adopt inline
  form (family + destinationDir inputs) and bulk select+delete.
- Upload .zip on Packages section with optional destinationDir field that
  appends a packages.json entry.
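
The filename sanitization mentioned above can be exercised in isolation. The sketch below copies the regex and fallback from _safe_filename in services/images.py; stays_inside is a hypothetical helper added only to show the containment check, and the paths are illustrative.

```python
import os
import re

# Same character whitelist as the sanitizer in services/images.py.
_SAFE_NAME = re.compile(r"[^A-Za-z0-9._-]")


def safe_filename(name: str) -> str:
    """Drop any directory part, then replace disallowed characters with '_'."""
    base = os.path.basename(name or "")
    cleaned = _SAFE_NAME.sub("_", base)
    return cleaned or "upload.bin"


def stays_inside(dest_dir: str, filename: str) -> bool:
    """Hypothetical check: does the joined path remain under dest_dir?"""
    dest = os.path.abspath(dest_dir)
    target = os.path.abspath(os.path.join(dest, safe_filename(filename)))
    return os.path.commonpath([dest, target]) == dest


print(safe_filename("../../etc/passwd"))    # passwd
print(safe_filename("my driver (v2).zip"))  # my_driver__v2_.zip
print(safe_filename(""))                    # upload.bin
```

Taking the basename first is what removes the traversal segments; the character filter then neutralizes anything that is not a plain file-name character.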

Configuration
- config.py: new env vars DNSMASQ_LEASES, APACHE_ACCESS_LOG, SAMBA_LOG_DIR,
  DNSMASQ_SYSLOG for the log-tailer.
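
A minimal sketch of how the four log-tailer settings could be read from the environment. The variable names come from the list above; the default paths and the load_log_paths helper are placeholders chosen for illustration and are not the values in config.py.

```python
import os
from typing import Mapping

# Placeholder defaults (assumptions, not the project's real values).
_DEFAULTS = {
    "DNSMASQ_LEASES": "/var/lib/misc/dnsmasq.leases",
    "APACHE_ACCESS_LOG": "/var/log/apache2/access.log",
    "SAMBA_LOG_DIR": "/var/log/samba",
    "DNSMASQ_SYSLOG": "/var/log/syslog",
}


def load_log_paths(env: Mapping[str, str] = os.environ) -> dict:
    """Return each setting from env, falling back to the placeholder default."""
    return {name: env.get(name, default) for name, default in _DEFAULTS.items()}
```

Passing a plain dict instead of os.environ makes the lookup easy to test without touching the process environment.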

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-30 13:21:06 -04:00

"""Per-image-type state probes: status + config (drivers / OS / packages / models)."""
import os
import re
import shutil

import config
from services import fs

# File-name sanitizer for uploads. Replaces anything outside [A-Za-z0-9._-]
# with "_" so a malicious filename can't break out of the destination directory.
_SAFE_NAME = re.compile(r"[^A-Za-z0-9._-]")


def _safe_filename(name: str) -> str:
    base = os.path.basename(name or "")
    cleaned = _SAFE_NAME.sub("_", base)
    # "." and ".." pass the character filter but name directories, not files,
    # so they fall back to the default name as well.
    if cleaned in ("", ".", ".."):
        return "upload.bin"
    return cleaned


def image_status(image_type):
    """Return a dict describing the state of an image type."""
    dp = fs.deploy_path(image_type)
    up = fs.unattend_path(image_type)
    # Content is present when the deploy dir exists and holds at least one entry.
    has_content = False
    if os.path.isdir(dp):
        # The context manager closes the scandir iterator promptly.
        with os.scandir(dp) as entries:
            has_content = any(entries)
    has_unattend = os.path.isfile(up)
    return {
        "image_type": image_type,
        "friendly_name": config.FRIENDLY_NAMES.get(image_type, image_type),
        "deploy_path": dp,
        "has_content": has_content,
        "has_unattend": has_unattend,
    }


def load_image_config(image_type):
    """Load all JSON configs for an image and check on-disk presence."""
    ctrl = fs.control_path(image_type)
    tools = fs.tools_path(image_type)

    # --- Drivers (merge HardwareDriver.json + hw_drivers.json) ---
    hw_driver_file = os.path.join(ctrl, "HardwareDriver.json")
    hw_drivers_extra = os.path.join(ctrl, "hw_drivers.json")
    drivers_raw = fs.load_json(hw_driver_file)
    extra_raw = fs.load_json(hw_drivers_extra)
    seen_files = set()
    drivers = []
    for d in drivers_raw + extra_raw:
        fname = (d.get("FileName") or d.get("fileName") or "").lower()
        if fname and fname in seen_files:
            continue
        if fname:
            seen_files.add(fname)
        drivers.append(d)

    for d in drivers:
        fname = d.get("FileName") or d.get("fileName") or ""
        dest = d.get("DestinationDir") or d.get("destinationDir") or ""
        resolved = fs.resolve_destination(dest, image_type)
        if resolved and fname:
            d["_on_disk"] = os.path.isfile(os.path.join(resolved, fname))
        else:
            d["_on_disk"] = False

    # --- Operating Systems ---
    os_file = os.path.join(ctrl, "OperatingSystem.json")
    operating_systems = fs.load_json(os_file)
    for entry in operating_systems:
        osv = entry.get("operatingSystemVersion", {})
        wim = osv.get("wim", {})
        dest = wim.get("DestinationDir") or wim.get("destinationDir") or ""
        resolved = fs.resolve_destination(dest, image_type)
        if resolved:
            entry["_on_disk"] = os.path.isfile(os.path.join(resolved, "install.wim"))
        else:
            entry["_on_disk"] = False

    # --- Packages ---
    pkg_file = os.path.join(ctrl, "packages.json")
    packages = fs.load_json(pkg_file)
    for p in packages:
        fname = p.get("fileName") or p.get("FileName") or ""
        dest = p.get("destinationDir") or p.get("DestinationDir") or ""
        resolved = fs.resolve_destination(dest, image_type)
        if resolved and fname:
            p["_on_disk"] = os.path.isfile(os.path.join(resolved, fname))
        else:
            p["_on_disk"] = False

    # --- Hardware Models (user_selections.json) ---
    us_file = os.path.join(tools, "user_selections.json")
    us_raw = fs.load_json(us_file)
    us_data = us_raw[0] if us_raw and isinstance(us_raw, list) else {}
    hardware_models = us_data.get("HardwareModelSelection", [])
    os_selection = str(us_data.get("OperatingSystemSelection", ""))

    family_lookup = {}
    for d in drivers:
        family = d.get("family", "")
        if family:
            family_lookup[family] = d

    for hm in hardware_models:
        family_id = hm.get("Id", "")
        matched = family_lookup.get(family_id)
        hm["_on_disk"] = matched["_on_disk"] if matched else False

    # --- Orphan drivers: zip files on disk not referenced in any JSON ---
    orphan_drivers = []
    oob_dir = os.path.join(fs.deploy_path(image_type), "Out-of-box Drivers")
    try:
        oob_dir = os.path.realpath(oob_dir)
    except OSError:
        pass

    registered_files = set()
    for d in drivers:
        fname = d.get("FileName") or d.get("fileName") or ""
        if fname:
            registered_files.add(fname.lower())

    if os.path.isdir(oob_dir):
        for dirpath, _dirnames, filenames in os.walk(oob_dir):
            for fn in filenames:
                if fn.lower().endswith(".zip") and fn.lower() not in registered_files:
                    rel = os.path.relpath(os.path.join(dirpath, fn), oob_dir)
                    orphan_drivers.append({"fileName": fn, "relPath": rel})

    return {
        "hardware_models": hardware_models,
        "drivers": drivers,
        "operating_systems": operating_systems,
        "packages": packages,
        "orphan_drivers": orphan_drivers,
        "os_selection": os_selection,
    }


# ---------------------------------------------------------------------------
# Driver / package upload + orphan adoption
# ---------------------------------------------------------------------------

DRIVERS_SUBDIR = "Out-of-box Drivers"
PACKAGES_SUBDIR = "Packages"


def _drivers_dir(image_type: str) -> str:
    return os.path.join(fs.deploy_path(image_type), DRIVERS_SUBDIR)


def _packages_dir(image_type: str) -> str:
    return os.path.join(fs.deploy_path(image_type), PACKAGES_SUBDIR)


def upload_driver(image_type: str, uploaded_file, family: str = "",
                  destination_dir: str = "", overwrite: bool = False) -> dict:
    """Save an uploaded driver .zip into the image's Out-of-box Drivers dir.

    When family + destination_dir are provided, also append a
    HardwareDriver.json entry so the driver is recognized at deploy time.
    Returns a dict describing what landed on disk + whether an entry was
    registered.
    """
    fname = _safe_filename(uploaded_file.filename)
    if not fname.lower().endswith(".zip"):
        raise ValueError("Driver upload must be a .zip file")
    dst_dir = _drivers_dir(image_type)
    os.makedirs(dst_dir, exist_ok=True)
    dst_path = os.path.join(dst_dir, fname)
    if os.path.exists(dst_path) and not overwrite:
        raise FileExistsError(f"{fname} already exists in {DRIVERS_SUBDIR}/")
    uploaded_file.save(dst_path)

    registered = False
    if family and destination_dir:
        adopt_orphan(image_type, fname, family, destination_dir)
        registered = True
    return {
        "filename": fname,
        "path": dst_path,
        "registered": registered,
    }


def adopt_orphan(image_type: str, filename: str, family: str,
                 destination_dir: str) -> dict:
    """Append a HardwareDriver.json entry for an existing .zip in the image's
    Out-of-box Drivers dir, so it stops showing up as an orphan + gets
    deployed for the named hardware family. Idempotent: a second adopt with
    the same filename is a no-op."""
    safe_name = _safe_filename(filename)
    drivers_path = _drivers_dir(image_type)
    if not os.path.isfile(os.path.join(drivers_path, safe_name)):
        raise FileNotFoundError(f"{safe_name} not found in {DRIVERS_SUBDIR}/")
    if not family:
        raise ValueError("family is required (matches HardwareModelSelection.Id)")
    if not destination_dir:
        raise ValueError("destination_dir is required")

    ctrl = fs.control_path(image_type)
    hw_file = os.path.join(ctrl, "HardwareDriver.json")
    entries = fs.load_json(hw_file)
    for e in entries:
        if (e.get("FileName") or e.get("fileName") or "").lower() == safe_name.lower():
            return {"filename": safe_name, "already_registered": True}
    entries.append({
        "FileName": safe_name,
        "DestinationDir": destination_dir,
        "family": family,
    })
    fs.save_json(hw_file, entries)
    return {"filename": safe_name, "already_registered": False}


def remove_orphans(image_type: str, filenames: list[str]) -> dict:
    """Delete the named files from Out-of-box Drivers/. The caller is
    responsible for confirming this is what the user wants: orphan files
    have no JSON entry, so nothing references them, but they may have been
    stashed for future adoption. Files that do not exist or cannot be
    unlinked are reported under "missing"."""
    drivers_path = _drivers_dir(image_type)
    removed, missing = [], []
    for fn in filenames:
        safe = _safe_filename(fn)
        path = os.path.join(drivers_path, safe)
        if not os.path.isfile(path):
            missing.append(safe)
            continue
        try:
            os.unlink(path)
            removed.append(safe)
        except OSError:
            missing.append(safe)
    return {"removed": removed, "missing": missing}


def upload_package(image_type: str, uploaded_file, destination_dir: str = "",
                   overwrite: bool = False) -> dict:
    """Save an uploaded package (any extension) into the image's Packages
    dir, and append an entry to packages.json when destination_dir is set
    so it deploys at imaging time."""
    fname = _safe_filename(uploaded_file.filename)
    dst_dir = _packages_dir(image_type)
    os.makedirs(dst_dir, exist_ok=True)
    dst_path = os.path.join(dst_dir, fname)
    if os.path.exists(dst_path) and not overwrite:
        raise FileExistsError(f"{fname} already exists in {PACKAGES_SUBDIR}/")
    uploaded_file.save(dst_path)

    registered = False
    if destination_dir:
        ctrl = fs.control_path(image_type)
        pkg_file = os.path.join(ctrl, "packages.json")
        entries = fs.load_json(pkg_file)
        already = any(
            (e.get("fileName") or e.get("FileName") or "").lower() == fname.lower()
            for e in entries
        )
        if not already:
            entries.append({
                "fileName": fname,
                "destinationDir": destination_dir,
            })
            fs.save_json(pkg_file, entries)
            registered = True
    return {
        "filename": fname,
        "path": dst_path,
        "registered": registered,
    }