Imaging dashboard
- services/imaging_log_tail.py: parses dnsmasq leases, Apache access log,
Samba per-host log files, and dnsmasq syslog (DHCP/TFTP). Synthesizes
inferred sessions keyed by MAC for bays that have only touched the boot
chain but not yet pushed to /imaging/status. Active window 90 min.
- imaging_status.list_sessions() merges inferred sessions into the dashboard
list. Real client-pushed sessions win for the same MAC.
- imaging_status: stage_history field tracks every stage transition (capped
at 30); a sidecar .log file per serial records every log_lines push, uncapped
(read_full_log() caps the detail-page response at 1 MB).
- delete_session/delete_all_sessions clean up sidecar .log too.
- New SSE endpoint /imaging/stream emits a session-list hash every 5s.
Client fetches /imaging/tiles (HTML partial) on hash change and swaps
#imaging-tiles innerHTML. Polling fallback at 15s if SSE drops.
- Tile-swap preserves scroll, filter input, expanded state via localStorage,
and any LAPS input the operator is mid-pasting (swap skipped when a
laps-input is focused).
- imaging.html: removed 15s location.reload(). Added live-status dot in
header (gray idle / green SSE connected / red SSE lost).
- _imaging_tiles.html: shared partial used by both /imaging full render and
/imaging/tiles SSE refresh. Inferred bays render with yellow border +
log-inferred badge + no progress bar (stage_index inference is coarse).
- imaging_detail.html (new): per-bay forensics page at /imaging/session/
<serial>. Session metadata grid, stage timeline table, full sidecar log
with truncation indicator, Copy-support-summary button. Linked from each
client-pushed tile.
- qr-render.js exposes window.renderAllQRs() so the SSE swap can re-render
Intune device-ID QRs in the swapped-in tiles.
Image management
- services/image_registry.py: JSON registry of image types at
{SAMBA_SHARE}/image-registry.json. Bootstraps from baked-in
config.IMAGE_TYPES on first run. create/clone/delete/rename_friendly
mutate the file then call reload() which rewrites config.IMAGE_TYPES +
config.FRIENDLY_NAMES in place. Sidebar reflects on next request.
- app.py routes: /images/new, /images/<t>/clone, /images/<t>/delete (with
optional content-wipe checkbox), /images/<t>/rename.
- dashboard.html: + New image type button + Clone/Delete per row, all in
Bootstrap modals with confirmation copy.
- Clone copies Deploy/ tree but preserves symlinks to shared dirs (Out-of-
box Drivers, Operating Systems, Packages) so disk usage stays low.
- Delete with the content-wipe box checked unlinks symlinks (it does not
follow them into the shared dirs).
Driver / package upload + orphan adoption
- services/images.py: upload_driver, adopt_orphan, remove_orphans,
upload_package. Filename sanitization blocks path traversal.
- app.py routes: /images/<t>/drivers/upload, /images/<t>/drivers/adopt,
/images/<t>/drivers/orphans/delete, /images/<t>/packages/upload.
- image_config.html: Upload .zip button + modal on Drivers section. Orphan
drivers card-footer rebuilt as interactive list with per-row Adopt inline
form (family + destinationDir inputs) and bulk select+delete.
- Upload .zip on Packages section with optional destinationDir field that
appends a packages.json entry.
Configuration
- config.py: new env vars DNSMASQ_LEASES, APACHE_ACCESS_LOG, SAMBA_LOG_DIR,
DNSMASQ_SYSLOG for the log-tailer.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
265 lines
9.8 KiB
Python
265 lines
9.8 KiB
Python
"""Per-image-type state probes: status + config (drivers / OS / packages / models)."""
|
|
|
|
import os
|
|
import re
|
|
import shutil
|
|
|
|
import config
|
|
from services import fs
|
|
|
|
|
|
# File-name sanitizer for uploads. Strips anything outside [A-Za-z0-9._-]
|
|
# so a malicious filename can't break out of the destination directory.
|
|
_SAFE_NAME = re.compile(r"[^A-Za-z0-9._-]")
|
|
|
|
|
|
def _safe_filename(name: str) -> str:
|
|
base = os.path.basename(name or "")
|
|
cleaned = _SAFE_NAME.sub("_", base)
|
|
return cleaned or "upload.bin"
|
|
|
|
|
|
def image_status(image_type):
    """Return a dict describing the on-disk state of an image type.

    Keys of the returned dict:
        image_type:    the value passed in.
        friendly_name: ``config.FRIENDLY_NAMES`` entry, falling back to
                       *image_type* itself when there is none.
        deploy_path:   the path reported by ``fs.deploy_path``.
        has_content:   True when the deploy path is a directory holding at
                       least one entry.
        has_unattend:  True when the path reported by ``fs.unattend_path``
                       is a regular file.
    """
    dp = fs.deploy_path(image_type)
    up = fs.unattend_path(image_type)

    has_content = False
    if os.path.isdir(dp):
        # scandir() holds an open directory handle; the context manager
        # releases it right away even though any() stops at the first entry.
        with os.scandir(dp) as entries:
            has_content = any(entries)

    return {
        "image_type": image_type,
        "friendly_name": config.FRIENDLY_NAMES.get(image_type, image_type),
        "deploy_path": dp,
        "has_content": has_content,
        "has_unattend": os.path.isfile(up),
    }
|
|
|
|
|
|
def load_image_config(image_type):
    """Gather the JSON-declared artifacts of an image and flag disk presence.

    Reads HardwareDriver.json, hw_drivers.json, OperatingSystem.json and
    packages.json from the image's control directory, and
    user_selections.json from its tools directory. Every driver, operating
    system, package and hardware-model entry is annotated in place with a
    boolean ``_on_disk`` key.

    Returns a dict with the keys ``hardware_models``, ``drivers``,
    ``operating_systems``, ``packages``, ``orphan_drivers`` and
    ``os_selection``.
    """
    control_dir = fs.control_path(image_type)
    tools_dir = fs.tools_path(image_type)

    def first_value(record, *keys):
        """Return the first truthy value stored under *keys*, else ""."""
        for key in keys:
            value = record.get(key)
            if value:
                return value
        return ""

    def present(destination, file_name):
        """Tell whether *file_name* exists under the resolved destination."""
        resolved_dir = fs.resolve_destination(destination, image_type)
        if not (resolved_dir and file_name):
            return False
        return os.path.isfile(os.path.join(resolved_dir, file_name))

    # --- Drivers: HardwareDriver.json first, then hw_drivers.json ---
    primary = fs.load_json(os.path.join(control_dir, "HardwareDriver.json"))
    supplemental = fs.load_json(os.path.join(control_dir, "hw_drivers.json"))

    drivers = []
    claimed_names = set()
    for record in primary + supplemental:
        # De-duplicate case-insensitively on file name; the first entry wins.
        # Entries without a file name are always kept.
        lowered = first_value(record, "FileName", "fileName").lower()
        if lowered:
            if lowered in claimed_names:
                continue
            claimed_names.add(lowered)
        drivers.append(record)

    for record in drivers:
        zip_name = first_value(record, "FileName", "fileName")
        target = first_value(record, "DestinationDir", "destinationDir")
        record["_on_disk"] = present(target, zip_name)

    # --- Operating systems: presence means install.wim at the destination ---
    operating_systems = fs.load_json(
        os.path.join(control_dir, "OperatingSystem.json")
    )
    for os_entry in operating_systems:
        wim_info = os_entry.get("operatingSystemVersion", {}).get("wim", {})
        target = first_value(wim_info, "DestinationDir", "destinationDir")
        os_entry["_on_disk"] = present(target, "install.wim")

    # --- Packages (camelCase keys take precedence here) ---
    packages = fs.load_json(os.path.join(control_dir, "packages.json"))
    for package in packages:
        pkg_name = first_value(package, "fileName", "FileName")
        target = first_value(package, "destinationDir", "DestinationDir")
        package["_on_disk"] = present(target, pkg_name)

    # --- Hardware models (first element of user_selections.json) ---
    selections = fs.load_json(os.path.join(tools_dir, "user_selections.json"))
    if selections and isinstance(selections, list):
        chosen = selections[0]
    else:
        chosen = {}
    hardware_models = chosen.get("HardwareModelSelection", [])
    os_selection = str(chosen.get("OperatingSystemSelection", ""))

    # A model inherits the disk state of the driver whose "family" equals the
    # model's "Id"; when several drivers share a family the last one wins.
    by_family = {
        record.get("family", ""): record
        for record in drivers
        if record.get("family", "")
    }
    for model in hardware_models:
        driver = by_family.get(model.get("Id", ""))
        model["_on_disk"] = driver["_on_disk"] if driver else False

    # --- Orphan drivers: .zip files on disk that no driver entry names ---
    oob_root = os.path.join(fs.deploy_path(image_type), "Out-of-box Drivers")
    try:
        oob_root = os.path.realpath(oob_root)
    except OSError:
        pass

    known_names = set()
    for record in drivers:
        zip_name = first_value(record, "FileName", "fileName")
        if zip_name:
            known_names.add(zip_name.lower())

    orphan_drivers = []
    if os.path.isdir(oob_root):
        for current_dir, _subdirs, file_names in os.walk(oob_root):
            for file_name in file_names:
                lowered = file_name.lower()
                if not lowered.endswith(".zip") or lowered in known_names:
                    continue
                full_path = os.path.join(current_dir, file_name)
                orphan_drivers.append({
                    "fileName": file_name,
                    "relPath": os.path.relpath(full_path, oob_root),
                })

    return {
        "hardware_models": hardware_models,
        "drivers": drivers,
        "operating_systems": operating_systems,
        "packages": packages,
        "orphan_drivers": orphan_drivers,
        "os_selection": os_selection,
    }
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Driver / package upload + orphan adoption
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Sub-directory names, relative to an image's deploy path.
DRIVERS_SUBDIR = "Out-of-box Drivers"
PACKAGES_SUBDIR = "Packages"


def _drivers_dir(image_type: str) -> str:
    """Return the driver directory path inside the image's deploy path."""
    deploy_root = fs.deploy_path(image_type)
    return os.path.join(deploy_root, DRIVERS_SUBDIR)


def _packages_dir(image_type: str) -> str:
    """Return the package directory path inside the image's deploy path."""
    deploy_root = fs.deploy_path(image_type)
    return os.path.join(deploy_root, PACKAGES_SUBDIR)
|
|
|
|
|
|
def upload_driver(image_type: str, uploaded_file, family: str = "",
                  destination_dir: str = "", overwrite: bool = False) -> dict:
    """Store an uploaded driver .zip in the image's Out-of-box Drivers dir.

    The upload's file name is sanitized first and must end in ``.zip``
    (case-insensitive). The drivers directory is created when absent. If both
    *family* and *destination_dir* are given, the file is also registered
    through ``adopt_orphan``.

    Returns a dict with ``filename`` (the sanitized name), ``path`` (where
    the file was saved) and ``registered`` (whether registration was
    requested and carried out).

    Raises:
        ValueError: the sanitized name does not end in ``.zip``.
        FileExistsError: the target already exists and *overwrite* is false.
    """
    safe_name = _safe_filename(uploaded_file.filename)
    if not safe_name.lower().endswith(".zip"):
        raise ValueError("Driver upload must be a .zip file")

    target_dir = _drivers_dir(image_type)
    os.makedirs(target_dir, exist_ok=True)
    target_path = os.path.join(target_dir, safe_name)
    if not overwrite and os.path.exists(target_path):
        raise FileExistsError(
            f"{safe_name} already exists in {DRIVERS_SUBDIR}/"
        )
    uploaded_file.save(target_path)

    # Registration needs both values; with only one supplied the file is
    # saved but left unregistered.
    should_register = bool(family and destination_dir)
    if should_register:
        adopt_orphan(image_type, safe_name, family, destination_dir)

    return {
        "filename": safe_name,
        "path": target_path,
        "registered": should_register,
    }
|
|
|
|
|
|
def adopt_orphan(image_type: str, filename: str, family: str,
                 destination_dir: str) -> dict:
    """Register an existing driver .zip by adding a HardwareDriver.json entry.

    The file must already sit directly inside the image's Out-of-box Drivers
    directory under its sanitized name. If HardwareDriver.json already has
    an entry with that file name (compared case-insensitively), nothing is
    written.

    Returns a dict with ``filename`` (the sanitized name) and
    ``already_registered`` (True when an entry was already present).

    Raises:
        FileNotFoundError: the file is not present in the drivers directory.
        ValueError: *family* or *destination_dir* is empty.
    """
    zip_name = _safe_filename(filename)
    on_disk = os.path.join(_drivers_dir(image_type), zip_name)
    if not os.path.isfile(on_disk):
        raise FileNotFoundError(f"{zip_name} not found in {DRIVERS_SUBDIR}/")
    if not family:
        raise ValueError("family is required (matches HardwareModelSelection.Id)")
    if not destination_dir:
        raise ValueError("destination_dir is required")

    manifest_path = os.path.join(
        fs.control_path(image_type), "HardwareDriver.json"
    )
    manifest = fs.load_json(manifest_path)

    wanted = zip_name.lower()
    duplicate = any(
        (item.get("FileName") or item.get("fileName") or "").lower() == wanted
        for item in manifest
    )
    if duplicate:
        return {"filename": zip_name, "already_registered": True}

    new_entry = {
        "FileName": zip_name,
        "DestinationDir": destination_dir,
        "family": family,
    }
    manifest.append(new_entry)
    fs.save_json(manifest_path, manifest)
    return {"filename": zip_name, "already_registered": False}
|
|
|
|
|
|
def remove_orphans(image_type: str, filenames: list[str]) -> dict:
    """Delete the named files from the image's Out-of-box Drivers directory.

    Each name is sanitized and looked up directly inside the drivers
    directory. This function does not check whether a file is referenced by
    any JSON entry; confirming the deletion is the caller's job.

    Returns a dict with two lists of sanitized names: ``removed`` for files
    that were deleted, and ``missing`` for names that were not regular files
    or whose deletion raised ``OSError``.
    """
    root = _drivers_dir(image_type)
    outcome = {"removed": [], "missing": []}

    for raw_name in filenames:
        cleaned = _safe_filename(raw_name)
        target = os.path.join(root, cleaned)

        deleted = False
        if os.path.isfile(target):
            try:
                os.unlink(target)
            except OSError:
                # A failed delete is reported the same way as an absent file.
                deleted = False
            else:
                deleted = True

        outcome["removed" if deleted else "missing"].append(cleaned)

    return outcome
|
|
|
|
|
|
def upload_package(image_type: str, uploaded_file, destination_dir: str = "",
                   overwrite: bool = False) -> dict:
    """Store an uploaded package (any extension) in the image's Packages dir.

    The upload's file name is sanitized and the Packages directory is created
    when absent. If *destination_dir* is given, a packages.json entry is
    appended unless one with the same file name (compared
    case-insensitively) already exists.

    Returns a dict with ``filename`` (the sanitized name), ``path`` (where
    the file was saved) and ``registered`` (True whenever *destination_dir*
    was given, whether the entry was new or already present).

    Raises:
        FileExistsError: the target already exists and *overwrite* is false.
    """
    stored_name = _safe_filename(uploaded_file.filename)
    packages_root = _packages_dir(image_type)
    os.makedirs(packages_root, exist_ok=True)
    stored_path = os.path.join(packages_root, stored_name)
    if not overwrite and os.path.exists(stored_path):
        raise FileExistsError(
            f"{stored_name} already exists in {PACKAGES_SUBDIR}/"
        )
    uploaded_file.save(stored_path)

    result = {
        "filename": stored_name,
        "path": stored_path,
        "registered": False,
    }
    if not destination_dir:
        return result

    manifest_path = os.path.join(fs.control_path(image_type), "packages.json")
    manifest = fs.load_json(manifest_path)
    wanted = stored_name.lower()
    for item in manifest:
        listed = item.get("fileName") or item.get("FileName") or ""
        if listed.lower() == wanted:
            break
    else:
        # No existing entry names this file: add one and persist it.
        manifest.append({
            "fileName": stored_name,
            "destinationDir": destination_dir,
        })
        fs.save_json(manifest_path, manifest)

    result["registered"] = True
    return result
|