"""PXE imaging progress tracking. Imaging clients POST coarse-grained status updates to /imaging/status as they progress through WIM apply -> drivers -> first boot -> PPKG -> per-PC shopfloor setup. Each session is keyed by the BIOS serial number (stable across the WinPE -> Windows transition; survives a target hostname change). Storage is one JSON file per serial under IMAGING_DIR. Atomic write via tempfile + rename. Reads merge the new payload into existing state so clients can post partial updates (just the current_stage + log_tail tick). """ from __future__ import annotations import json import os import re import tempfile from datetime import datetime, timezone from typing import Optional import config # How many recent log lines to keep per session. LOG_TAIL_MAX = 50 # Sessions older than this are considered stale and dropped from the dashboard # "active" list. Still readable individually. ACTIVE_WINDOW_HOURS = 6 # Filenames are derived from serial; sanitize to avoid path traversal / # weird filesystem characters. Anything outside [A-Za-z0-9._-] becomes _. _SAFE_SERIAL = re.compile(r"[^A-Za-z0-9._-]") def _ensure_dir(): os.makedirs(config.IMAGING_DIR, exist_ok=True) def _path_for(serial: str) -> str: safe = _SAFE_SERIAL.sub("_", serial.strip()) or "unknown" return os.path.join(config.IMAGING_DIR, f"{safe}.json") def _now_iso() -> str: return datetime.now().astimezone().isoformat(timespec="seconds") def update_session(payload: dict) -> dict: """Merge `payload` into the JSON file for payload['serial']. payload must include 'serial'. All other fields are optional; whatever is present overwrites the existing field. log_tail is appended to (capped). Returns the resulting full state. """ serial = (payload.get("serial") or "").strip() if not serial: raise ValueError("payload missing 'serial'") _ensure_dir() path = _path_for(serial) state: dict = {} if os.path.isfile(path): try: with open(path, "r") as f: state = json.load(f) except (json.JSONDecodeError, OSError): state = {} # Reimage detection: if the new payload's stage_index <= 2 (WinPE startnet # = 2, Run-ShopfloorSetup start = 1) AND that's lower than the cached # stage_index, treat as a fresh imaging run on the same bay. Clear # log_tail + reset started_at; preserve serial; remember the previous # run's last_updated for audit. Without this, a reimage leaves stale # "succeeded" / high-idx state visible until the new run progresses # past idx 2. # # Threshold of 2 covers the first signal of a new run: the WinPE-phase # status push from startnet.cmd at idx=2 (fires within seconds of PXE # menu choice). Previously the threshold was 1, which meant the reset # only triggered once Run-ShopfloorSetup ran post-PPKG, ~10-20 minutes # into a new run. if state: try: old_idx = int(state.get("stage_index") or 0) new_idx = int(payload.get("stage_index") or 0) except (TypeError, ValueError): old_idx, new_idx = 0, 0 rewind = new_idx > 0 and new_idx < old_idx and new_idx <= 1 prev_done = state.get("status") in ("succeeded", "failed") if rewind or (prev_done and new_idx > 0 and new_idx <= 1): state = {"serial": serial, "previous_run_at": state.get("last_updated"), "log_tail": []} if not state: state = { "serial": serial, "started_at": _now_iso(), "log_tail": [], } elif "started_at" not in state: # Fresh state after a rewind - mint a new started_at. state["started_at"] = _now_iso() # Append any new log lines (preserve old; cap to LOG_TAIL_MAX). new_lines = payload.pop("log_lines", None) if new_lines: if isinstance(new_lines, str): new_lines = [new_lines] tail = list(state.get("log_tail", [])) tail.extend(new_lines) state["log_tail"] = tail[-LOG_TAIL_MAX:] for key, value in payload.items(): if value is None or value == "": continue state[key] = value state["last_updated"] = _now_iso() if "status" not in state: state["status"] = "in_progress" fd, tmp = tempfile.mkstemp(dir=config.IMAGING_DIR, prefix=".tmp-", suffix=".json") try: with os.fdopen(fd, "w") as f: json.dump(state, f, indent=2) os.replace(tmp, path) except Exception: try: os.unlink(tmp) except OSError: pass raise return state def list_sessions() -> list[dict]: """Return all sessions sorted by last_updated desc.""" _ensure_dir() out: list[dict] = [] for name in os.listdir(config.IMAGING_DIR): if not name.endswith(".json") or name.startswith(".tmp-"): continue path = os.path.join(config.IMAGING_DIR, name) try: with open(path, "r") as f: out.append(json.load(f)) except (json.JSONDecodeError, OSError): continue out.sort(key=lambda s: s.get("last_updated", ""), reverse=True) return out def get_session(serial: str) -> Optional[dict]: path = _path_for(serial) if not os.path.isfile(path): return None try: with open(path, "r") as f: return json.load(f) except (json.JSONDecodeError, OSError): return None def delete_session(serial: str) -> bool: path = _path_for(serial) if not os.path.isfile(path): return False try: os.unlink(path) return True except OSError: return False