Imaging dashboard
- services/imaging_log_tail.py: parses dnsmasq leases, Apache access log,
Samba per-host log files, and dnsmasq syslog (DHCP/TFTP). Synthesizes
inferred sessions keyed by MAC for bays that have only touched the boot
chain but not yet pushed to /imaging/status. Active window 90 min.
- imaging_status.list_sessions() merges inferred sessions into the dashboard
list. Real client-pushed sessions win for the same MAC.
- imaging_status: the stage_history field tracks every stage transition
(capped at 30); a sidecar .log file per serial records every log_lines push,
uncapped (read_full_log() caps the detail-page response at 1 MB).
- delete_session/delete_all_sessions clean up sidecar .log too.
- New SSE endpoint /imaging/stream emits a session-list hash every 5s.
Client fetches /imaging/tiles (HTML partial) on hash change and swaps
#imaging-tiles innerHTML. Polling fallback at 15s if SSE drops.
- Tile-swap preserves scroll, filter input, expanded state via localStorage,
and any LAPS input the operator is mid-pasting (swap skipped when a
laps-input is focused).
- imaging.html: removed 15s location.reload(). Added live-status dot in
header (gray idle / green SSE connected / red SSE lost).
- _imaging_tiles.html: shared partial used by both /imaging full render and
/imaging/tiles SSE refresh. Inferred bays render with yellow border +
log-inferred badge + no progress bar (stage_index inference is coarse).
- imaging_detail.html (new): per-bay forensics page at /imaging/session/
<serial>. Session metadata grid, stage timeline table, full sidecar log
with truncation indicator, Copy-support-summary button. Linked from each
client-pushed tile.
- qr-render.js exposes window.renderAllQRs() so the SSE swap can re-render
Intune device-ID QRs in the swapped-in tiles.
Image management
- services/image_registry.py: JSON registry of image types at
{SAMBA_SHARE}/image-registry.json. Bootstraps from baked-in
config.IMAGE_TYPES on first run. create/clone/delete/rename_friendly
mutate the file, then call reload(), which rewrites config.IMAGE_TYPES +
config.FRIENDLY_NAMES in place. The sidebar reflects the change on the next
request.
- app.py routes: /images/new, /images/<t>/clone, /images/<t>/delete (with
optional content-wipe checkbox), /images/<t>/rename.
- dashboard.html: + New image type button + Clone/Delete per row, all in
Bootstrap modals with confirmation copy.
- Clone copies Deploy/ tree but preserves symlinks to shared dirs (Out-of-
box Drivers, Operating Systems, Packages) so disk usage stays low.
- Delete with content checked unlinks symlinks (does not follow into shared
dirs).
Driver / package upload + orphan adoption
- services/images.py: upload_driver, adopt_orphan, remove_orphans,
upload_package. Filename sanitization blocks path traversal.
- app.py routes: /images/<t>/drivers/upload, /images/<t>/drivers/adopt,
/images/<t>/drivers/orphans/delete, /images/<t>/packages/upload.
- image_config.html: Upload .zip button + modal on Drivers section. Orphan
drivers card-footer rebuilt as interactive list with per-row Adopt inline
form (family + destinationDir inputs) and bulk select+delete.
- Upload .zip on Packages section with optional destinationDir field that
appends a packages.json entry.
Configuration
- config.py: new env vars DNSMASQ_LEASES, APACHE_ACCESS_LOG, SAMBA_LOG_DIR,
DNSMASQ_SYSLOG for the log-tailer.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
289 lines
9.9 KiB
Python
289 lines
9.9 KiB
Python
"""PXE imaging progress tracking.

Imaging clients POST coarse-grained status updates to /imaging/status as
they progress through WIM apply -> drivers -> first boot -> PPKG -> per-PC
shopfloor setup. Each session is keyed by the BIOS serial number (stable
across the WinPE -> Windows transition; survives a target hostname change).

Storage is one JSON file per serial under IMAGING_DIR. Atomic write via
tempfile + rename. Updates merge the new payload into the existing state so
clients can post partial updates (just the current_stage + a log_lines tick).
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import re
|
|
import tempfile
|
|
from datetime import datetime, timezone
|
|
from typing import Optional
|
|
|
|
import config
|
|
|
|
# Size of the rolling log tail stored inside each session's JSON (used for
# the dashboard tile quick view). Every pushed line is also appended,
# uncapped, to a sidecar .log file beside the .json so the detail page can
# show the complete log.
LOG_TAIL_MAX = 50

# Upper bound on recorded stage transitions per session, so a client stuck
# in a pathological loop cannot grow the JSON without limit (30 is more than
# any real run uses).
STAGE_HISTORY_MAX = 30

# Maximum number of sidecar .log bytes the detail page sends to the browser,
# so a runaway log cannot blow up the response.
DETAIL_LOG_MAX_BYTES = 1024 ** 2  # 1 MB

# Age after which a session is considered stale and dropped from the
# dashboard "active" list. Stale sessions are still readable individually.
ACTIVE_WINDOW_HOURS = 6

# Serials become filenames, so every character outside [A-Za-z0-9._-] is
# replaced with "_" to avoid path traversal and weird filesystem characters.
_SAFE_SERIAL = re.compile(r"[^A-Za-z0-9._-]")
|
|
|
|
|
|
def _ensure_dir():
    """Create config.IMAGING_DIR (and any missing parents) if it is absent.

    An already-existing directory is not an error.
    """
    target_dir = config.IMAGING_DIR
    os.makedirs(target_dir, exist_ok=True)
|
|
|
|
|
|
def _path_for(serial: str) -> str:
    """Return the session JSON path for `serial` inside config.IMAGING_DIR.

    The serial is stripped of surrounding whitespace and every character
    matched by _SAFE_SERIAL is replaced with "_"; an empty result falls back
    to "unknown".
    """
    stem = _SAFE_SERIAL.sub("_", serial.strip())
    if not stem:
        stem = "unknown"
    return os.path.join(config.IMAGING_DIR, stem + ".json")
|
|
|
|
|
|
def _log_path_for(serial: str) -> str:
    """Return the sidecar .log path for `serial` inside config.IMAGING_DIR.

    Uses the same sanitization as the session JSON filename: strip
    whitespace, replace every _SAFE_SERIAL match with "_", and fall back to
    "unknown" when nothing is left.
    """
    cleaned = serial.strip()
    stem = _SAFE_SERIAL.sub("_", cleaned)
    filename = f"{stem}.log" if stem else "unknown.log"
    return os.path.join(config.IMAGING_DIR, filename)
|
|
|
|
|
|
def _append_full_log(serial: str, lines):
    """Best-effort append of `lines` to the per-serial sidecar log file.

    Args:
        serial: Session serial; mapped to a filename by _log_path_for().
        lines: A single string or an iterable of lines. Falsy input is a
            no-op. Every line in one call gets the same timestamp prefix.

    I/O failures are swallowed: status tracking is not the authoritative log
    source; the .log is a convenience for the detail page.
    """
    if not lines:
        return
    if isinstance(lines, str):
        lines = [lines]
    try:
        _ensure_dir()
        stamp = _now_iso()
        # Write UTF-8 explicitly so the file matches what read_full_log()
        # decodes, regardless of the process locale. errors="replace" keeps
        # this best-effort: an unencodable character (e.g. a lone surrogate
        # that arrived via JSON) becomes "?" instead of raising
        # UnicodeEncodeError, which the OSError handler would not catch.
        with open(_log_path_for(serial), "a", encoding="utf-8", errors="replace") as f:
            f.writelines(f"{stamp} {line}\n" for line in lines)
    except OSError:
        pass
|
|
|
|
|
|
def _now_iso() -> str:
    """Return the current local time as a timezone-aware ISO 8601 string.

    Precision is whole seconds; the local UTC offset is included.
    """
    local_now = datetime.now().astimezone()
    return local_now.isoformat(timespec="seconds")
|
|
|
|
|
|
def update_session(payload: dict) -> dict:
    """Merge `payload` into the JSON file for payload['serial'].

    Args:
        payload: Status update from an imaging client. Must include a
            non-empty 'serial'. All other fields are optional; whatever is
            present (and not None / "") overwrites the stored field. The
            special key 'log_lines' (a string or list of lines) is not
            stored as-is: it is appended to the capped 'log_tail' and to
            the sidecar .log file. The caller's dict is not modified.

    Returns:
        The resulting full session state, as written to disk.

    Raises:
        ValueError: If 'serial' is missing or blank.
        OSError: If the state file cannot be written.
    """
    serial = (payload.get("serial") or "").strip()
    if not serial:
        raise ValueError("payload missing 'serial'")

    # Work on a shallow copy: 'log_lines' is popped below and the caller's
    # dict must not change under it.
    payload = dict(payload)

    _ensure_dir()
    path = _path_for(serial)

    state: dict = {}
    if os.path.isfile(path):
        try:
            with open(path, "r") as f:
                state = json.load(f)
        except (ValueError, OSError):
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError;
            # an unreadable file is treated as "no previous state".
            state = {}
        if not isinstance(state, dict):
            # Valid JSON but not an object (e.g. a list): unusable as state.
            state = {}

    # Reimage detection: if the new payload's stage_index <= 2 (WinPE startnet
    # = 2, Run-ShopfloorSetup start = 1) AND that's lower than the cached
    # stage_index - or the previous run already finished - treat it as a
    # fresh imaging run on the same bay. Clear log_tail + reset started_at;
    # preserve serial; remember the previous run's last_updated for audit.
    # Without this, a reimage leaves stale "succeeded" / high-idx state
    # visible until the new run progresses past idx 2.
    #
    # Threshold of 2 covers the first signal of a new run: the WinPE-phase
    # status push from startnet.cmd at idx=2 (fires within seconds of PXE
    # menu choice). A threshold of 1 would only trigger once
    # Run-ShopfloorSetup ran post-PPKG, ~10-20 minutes into a new run.
    reimage_max_idx = 2
    if state:
        try:
            old_idx = int(state.get("stage_index") or 0)
            new_idx = int(payload.get("stage_index") or 0)
        except (TypeError, ValueError):
            old_idx, new_idx = 0, 0
        early_stage = 0 < new_idx <= reimage_max_idx
        rewind = early_stage and new_idx < old_idx
        prev_done = state.get("status") in ("succeeded", "failed")
        if rewind or (prev_done and early_stage):
            state = {"serial": serial, "previous_run_at": state.get("last_updated"), "log_tail": []}

    if not state:
        state = {
            "serial": serial,
            "started_at": _now_iso(),
            "log_tail": [],
        }
    elif "started_at" not in state:
        # Fresh state after a rewind - mint a new started_at.
        state["started_at"] = _now_iso()

    # Append any new log lines: capped tail in the JSON for the dashboard
    # quick view, and unbounded append to the sidecar .log for the detail
    # page.
    new_lines = payload.pop("log_lines", None)
    if new_lines:
        if isinstance(new_lines, str):
            new_lines = [new_lines]
        tail = list(state.get("log_tail", []))
        tail.extend(new_lines)
        state["log_tail"] = tail[-LOG_TAIL_MAX:]
        _append_full_log(serial, new_lines)

    # Stage history: record a transition row whenever stage_index increases
    # or status changes to a terminal value (or no row exists yet). Bounded
    # to STAGE_HISTORY_MAX so a bouncing client can't blow up the JSON. The
    # indices are re-read here because `state` may have been reset above.
    history = list(state.get("stage_history", []))
    try:
        new_idx = int(payload.get("stage_index") or 0)
    except (TypeError, ValueError):
        new_idx = 0
    try:
        old_idx = int(state.get("stage_index") or 0)
    except (TypeError, ValueError):
        old_idx = 0
    new_status = payload.get("status") or state.get("status") or "in_progress"
    old_status = state.get("status") or ""
    stage_changed = new_idx > old_idx
    status_changed = new_status != old_status and new_status in ("succeeded", "failed")
    if stage_changed or status_changed or not history:
        history.append({
            "ts": _now_iso(),
            "stage_index": new_idx or old_idx,
            "current_stage": payload.get("current_stage") or state.get("current_stage", ""),
            "status": new_status,
        })
    state["stage_history"] = history[-STAGE_HISTORY_MAX:]

    # Overlay the remaining payload fields; None / "" mean "no update".
    for key, value in payload.items():
        if value is None or value == "":
            continue
        state[key] = value

    # Keep the stored serial in its stripped form (the payload copy may
    # carry surrounding whitespace).
    state["serial"] = serial
    state["last_updated"] = _now_iso()
    if "status" not in state:
        state["status"] = "in_progress"

    # Atomic write: dump to a temp file in the same directory, then rename
    # over the real path so readers never observe a half-written file.
    fd, tmp = tempfile.mkstemp(dir=config.IMAGING_DIR, prefix=".tmp-", suffix=".json")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(state, f, indent=2)
        os.replace(tmp, path)
    except Exception:
        # Do not leave the temp file behind; then surface the original error.
        try:
            os.unlink(tmp)
        except OSError:
            pass
        raise
    return state
|
|
|
|
|
|
def list_sessions(include_inferred: bool = True) -> list[dict]:
    """Return every stored session, optionally merged with inferred ones.

    Reads each ``*.json`` file in config.IMAGING_DIR, skipping in-flight
    ``.tmp-`` files, unreadable files, and files whose top-level JSON value
    is not an object.

    Args:
        include_inferred: When True (default for the dashboard), the loaded
            sessions are passed to
            imaging_log_tail.merge_with_client_sessions(), which adds
            synthesized sessions for bays that have touched
            DHCP/TFTP/boot.wim but not yet pushed status; its result is
            returned unchanged (ordering is presumably handled there -
            verify against that module). When False, the client-pushed
            sessions are returned sorted by last_updated, newest first.

    Returns:
        A list of session dicts.
    """
    _ensure_dir()
    sessions: list[dict] = []
    for name in os.listdir(config.IMAGING_DIR):
        if not name.endswith(".json") or name.startswith(".tmp-"):
            continue
        try:
            with open(os.path.join(config.IMAGING_DIR, name), "r") as f:
                record = json.load(f)
        except (ValueError, OSError):
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            continue
        # Skip valid-JSON-but-not-an-object files; callers expect dicts.
        if isinstance(record, dict):
            sessions.append(record)

    if include_inferred:
        # Imported here rather than at module top, as in the original code.
        from services import imaging_log_tail
        return imaging_log_tail.merge_with_client_sessions(sessions)

    # A missing or null last_updated sorts as "" (oldest) instead of raising
    # on a None-vs-str comparison.
    sessions.sort(key=lambda s: str(s.get("last_updated") or ""), reverse=True)
    return sessions
|
|
|
|
|
|
def get_session(serial: str) -> Optional[dict]:
    """Load the stored session state for `serial`.

    Returns:
        The session dict, or None when the file does not exist, cannot be
        read or parsed, or does not contain a JSON object.
    """
    path = _path_for(serial)
    if not os.path.isfile(path):
        return None
    try:
        with open(path, "r") as f:
            data = json.load(f)
    except (ValueError, OSError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        return None
    # Honor the Optional[dict] contract even if the file holds e.g. a list.
    return data if isinstance(data, dict) else None
|
|
|
|
|
|
def read_full_log(serial: str, max_bytes: int = DETAIL_LOG_MAX_BYTES) -> tuple[str, bool]:
    """Read the tail of the sidecar .log for `serial`.

    Returns:
        (text, truncated). `text` is at most the trailing `max_bytes` of the
        file, decoded as UTF-8 with undecodable bytes replaced. `truncated`
        is True when the file was larger than `max_bytes`; in that case the
        leading partial line of the slice is dropped as well. A missing or
        unreadable file yields ("", False).
    """
    log_path = _log_path_for(serial)
    try:
        total = os.path.getsize(log_path)
    except OSError:
        return ("", False)

    clipped = total > max_bytes
    offset = total - max_bytes if clipped else 0
    try:
        with open(log_path, "rb") as handle:
            handle.seek(offset)
            raw = handle.read()
    except OSError:
        return ("", False)

    text = raw.decode("utf-8", errors="replace")
    if not clipped:
        return (text, False)

    # The slice most likely starts mid-line: discard everything up to and
    # including the first newline. With no newline at all, keep the text.
    _partial, newline, remainder = text.partition("\n")
    return (remainder if newline else text, True)
|
|
|
|
|
|
def delete_session(serial: str) -> bool:
    """Delete the session JSON for `serial` and, best-effort, its sidecar log.

    Returns:
        True when the JSON file existed and was removed; False when it was
        absent or could not be removed (the sidecar .log is then left
        untouched). A failure to remove the sidecar does not affect the
        result.
    """
    json_path = _path_for(serial)
    if os.path.isfile(json_path):
        try:
            os.unlink(json_path)
        except OSError:
            return False
        # Best-effort sidecar cleanup.
        try:
            os.unlink(_log_path_for(serial))
        except OSError:
            pass
        return True
    return False
|
|
|
|
|
|
def delete_all_sessions() -> int:
    """Remove every ``.json`` and ``.log`` entry in config.IMAGING_DIR.

    Entries that cannot be unlinked are skipped.

    Returns:
        The number of ``.json`` files removed. Any name ending in ``.json``
        matches, so leftover ``.tmp-*.json`` files are deleted and counted
        too.
    """
    _ensure_dir()
    json_count = 0
    for entry in os.listdir(config.IMAGING_DIR):
        is_json = entry.endswith(".json")
        if not (is_json or entry.endswith(".log")):
            continue
        try:
            os.unlink(os.path.join(config.IMAGING_DIR, entry))
        except OSError:
            continue
        if is_json:
            json_count += 1
    return json_count
|