"""PXE imaging progress tracking. Imaging clients POST coarse-grained status updates to /imaging/status as they progress through WIM apply -> drivers -> first boot -> PPKG -> per-PC shopfloor setup. Each session is keyed by the BIOS serial number (stable across the WinPE -> Windows transition; survives a target hostname change). Storage is one JSON file per serial under IMAGING_DIR. Atomic write via tempfile + rename. Reads merge the new payload into existing state so clients can post partial updates (just the current_stage + log_tail tick). """ from __future__ import annotations import json import os import re import tempfile from datetime import datetime, timezone from typing import Optional import config # How many recent log lines to keep per session. LOG_TAIL_MAX = 50 # Sessions older than this are considered stale and dropped from the dashboard # "active" list. Still readable individually. ACTIVE_WINDOW_HOURS = 6 # Filenames are derived from serial; sanitize to avoid path traversal / # weird filesystem characters. Anything outside [A-Za-z0-9._-] becomes _. _SAFE_SERIAL = re.compile(r"[^A-Za-z0-9._-]") def _ensure_dir(): os.makedirs(config.IMAGING_DIR, exist_ok=True) def _path_for(serial: str) -> str: safe = _SAFE_SERIAL.sub("_", serial.strip()) or "unknown" return os.path.join(config.IMAGING_DIR, f"{safe}.json") def _now_iso() -> str: return datetime.now().astimezone().isoformat(timespec="seconds") def update_session(payload: dict) -> dict: """Merge `payload` into the JSON file for payload['serial']. payload must include 'serial'. All other fields are optional; whatever is present overwrites the existing field. log_tail is appended to (capped). Returns the resulting full state. """ serial = (payload.get("serial") or "").strip() if not serial: raise ValueError("payload missing 'serial'") _ensure_dir() path = _path_for(serial) state: dict = {} if os.path.isfile(path): try: with open(path, "r") as f: state = json.load(f) except (json.JSONDecodeError, OSError): state = {} if not state: state = { "serial": serial, "started_at": _now_iso(), "log_tail": [], } # Append any new log lines (preserve old; cap to LOG_TAIL_MAX). new_lines = payload.pop("log_lines", None) if new_lines: if isinstance(new_lines, str): new_lines = [new_lines] tail = list(state.get("log_tail", [])) tail.extend(new_lines) state["log_tail"] = tail[-LOG_TAIL_MAX:] for key, value in payload.items(): if value is None or value == "": continue state[key] = value state["last_updated"] = _now_iso() if "status" not in state: state["status"] = "in_progress" fd, tmp = tempfile.mkstemp(dir=config.IMAGING_DIR, prefix=".tmp-", suffix=".json") try: with os.fdopen(fd, "w") as f: json.dump(state, f, indent=2) os.replace(tmp, path) except Exception: try: os.unlink(tmp) except OSError: pass raise return state def list_sessions() -> list[dict]: """Return all sessions sorted by last_updated desc.""" _ensure_dir() out: list[dict] = [] for name in os.listdir(config.IMAGING_DIR): if not name.endswith(".json") or name.startswith(".tmp-"): continue path = os.path.join(config.IMAGING_DIR, name) try: with open(path, "r") as f: out.append(json.load(f)) except (json.JSONDecodeError, OSError): continue out.sort(key=lambda s: s.get("last_updated", ""), reverse=True) return out def get_session(serial: str) -> Optional[dict]: path = _path_for(serial) if not os.path.isfile(path): return None try: with open(path, "r") as f: return json.load(f) except (json.JSONDecodeError, OSError): return None def delete_session(serial: str) -> bool: path = _path_for(serial) if not os.path.isfile(path): return False try: os.unlink(path) return True except OSError: return False