"""PXE imaging progress tracking.

Imaging clients POST coarse-grained status updates to /imaging/status as they
progress through WIM apply -> drivers -> first boot -> PPKG -> per-PC
shopfloor setup. Each session is keyed by the BIOS serial number (stable
across the WinPE -> Windows transition; survives a target hostname change).

Storage is one JSON file per serial under IMAGING_DIR, written atomically via
tempfile + rename. Updates merge the new payload into the existing state so
clients can post partial updates (just the current_stage + a few log_lines).
"""

from __future__ import annotations

import json
import os
import re
import tempfile
from datetime import datetime, timezone
from typing import Optional

import config

# How many recent log lines to keep per session in the JSON (dashboard tile
# quick view). The full unbounded log is appended to a sidecar .log file
# next to the .json so the detail page can show everything.
LOG_TAIL_MAX = 50

# Cap how many stage transitions we record per session (bounds JSON size on
# pathological loops; 30 covers more than any real run uses).
STAGE_HISTORY_MAX = 30

# Detail page caps how many bytes of the sidecar .log it sends to the
# browser, to avoid blowing up the response for a runaway log.
DETAIL_LOG_MAX_BYTES = 1024 * 1024  # 1 MB

# Sessions older than this are considered stale and dropped from the dashboard
# "active" list. Still readable individually.
# NOTE(review): not referenced by the functions in this module; presumably
# consumed by the dashboard / log-tail code - confirm.
ACTIVE_WINDOW_HOURS = 6

# Filenames are derived from serial; sanitize to avoid path traversal /
# weird filesystem characters. Anything outside [A-Za-z0-9._-] becomes _.
_SAFE_SERIAL = re.compile(r"[^A-Za-z0-9._-]")

# Highest stage_index that still counts as "the start of a run" for reimage
# detection: the WinPE startnet push is idx 2, Run-ShopfloorSetup start is
# idx 1.
_REIMAGE_MAX_STAGE_INDEX = 2

# Values of the "status" field that mark a run as finished.
_FINISHED_STATUSES = ("succeeded", "failed")


def _ensure_dir():
    """Create config.IMAGING_DIR (and any parents) if it is missing."""
    os.makedirs(config.IMAGING_DIR, exist_ok=True)


def _safe_stem(serial: str) -> str:
    """Return a filesystem-safe file stem for `serial` ("unknown" if empty)."""
    return _SAFE_SERIAL.sub("_", serial.strip()) or "unknown"


def _path_for(serial: str) -> str:
    """Return the path of the per-serial session JSON file."""
    return os.path.join(config.IMAGING_DIR, f"{_safe_stem(serial)}.json")


def _log_path_for(serial: str) -> str:
    """Return the path of the per-serial sidecar .log file."""
    return os.path.join(config.IMAGING_DIR, f"{_safe_stem(serial)}.log")


def _append_full_log(serial: str, lines):
    """Best-effort append to the per-serial sidecar log file.

    `lines` may be a single string or an iterable of lines. Every line written
    by one call is prefixed with the same timestamp. I/O failures are swallowed
    (status-tracking is not the authoritative log source; the .log is a
    convenience for the detail page).
    """
    if not lines:
        return
    if isinstance(lines, str):
        lines = [lines]
    try:
        _ensure_dir()
        # Write UTF-8 explicitly: read_full_log() decodes the file as UTF-8,
        # and the platform default encoding could otherwise raise
        # UnicodeEncodeError (not an OSError) on a non-encodable character
        # and abort the whole status update.
        with open(_log_path_for(serial), "a", encoding="utf-8",
                  errors="replace") as f:
            ts = _now_iso()
            f.writelines(f"{ts} {line}\n" for line in lines)
    except OSError:
        pass


def _now_iso() -> str:
    """Return the current local time as an ISO-8601 string with UTC offset."""
    return datetime.now().astimezone().isoformat(timespec="seconds")


def _to_int(value) -> int:
    """Coerce `value` to int; None, empty or unparseable values become 0."""
    try:
        return int(value or 0)
    except (TypeError, ValueError):
        return 0


def _load_json_dict(path: str) -> Optional[dict]:
    """Load a JSON object from `path`.

    Returns None when the file is missing, unreadable, not valid JSON, or
    holds something other than a JSON object.
    """
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (ValueError, OSError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        return None
    return data if isinstance(data, dict) else None


def _is_reimage(state: dict, payload: dict) -> bool:
    """Decide whether `payload` is the first signal of a fresh imaging run.

    True when the incoming stage_index is in 1.._REIMAGE_MAX_STAGE_INDEX and
    either it is lower than the cached stage_index (rewind) or the cached run
    already finished (succeeded / failed).
    """
    try:
        old_idx = int(state.get("stage_index") or 0)
        new_idx = int(payload.get("stage_index") or 0)
    except (TypeError, ValueError):
        # Either index is unusable: no basis for calling this a new run.
        return False
    if not 0 < new_idx <= _REIMAGE_MAX_STAGE_INDEX:
        return False
    prev_done = state.get("status") in _FINISHED_STATUSES
    return new_idx < old_idx or prev_done


def _record_stage_history(state: dict, payload: dict) -> None:
    """Append a timeline row to state['stage_history'] when warranted.

    A row is added when stage_index increases, when status changes to a
    finished value, or when there is no history yet. The list is bounded to
    STAGE_HISTORY_MAX so a bouncing client can't blow up the JSON. Must run
    before `payload` is merged into `state`, since it compares old vs new.
    """
    history = list(state.get("stage_history") or [])
    new_idx = _to_int(payload.get("stage_index"))
    old_idx = _to_int(state.get("stage_index"))
    new_status = payload.get("status") or state.get("status") or "in_progress"
    old_status = state.get("status") or ""
    stage_changed = new_idx > old_idx
    status_changed = (new_status != old_status
                      and new_status in _FINISHED_STATUSES)
    if stage_changed or status_changed or not history:
        history.append({
            "ts": _now_iso(),
            "stage_index": new_idx or old_idx,
            "current_stage": (payload.get("current_stage")
                              or state.get("current_stage", "")),
            "status": new_status,
        })
    state["stage_history"] = history[-STAGE_HISTORY_MAX:]


def _write_json_atomic(path: str, state: dict) -> None:
    """Write `state` to `path` via a temp file in IMAGING_DIR + os.replace.

    On any failure the temp file is removed and the exception re-raised.
    """
    fd, tmp = tempfile.mkstemp(dir=config.IMAGING_DIR, prefix=".tmp-",
                               suffix=".json")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(state, f, indent=2)
        os.replace(tmp, path)
    except Exception:
        try:
            os.unlink(tmp)
        except OSError:
            pass
        raise


def update_session(payload: dict) -> dict:
    """Merge `payload` into the JSON file for payload['serial'].

    payload must include 'serial'. All other fields are optional; whatever is
    present (and not None / "") overwrites the existing field. 'log_lines'
    (a string or list of strings) is appended to the capped log_tail and to
    the sidecar .log instead of being stored verbatim. `payload` itself is
    not modified.

    Returns the resulting full state.

    Raises:
        ValueError: if 'serial' is missing or blank.
    """
    serial = str(payload.get("serial") or "").strip()
    if not serial:
        raise ValueError("payload missing 'serial'")

    # Shallow copy so removing log_lines does not mutate the caller's dict.
    fields = dict(payload)
    new_lines = fields.pop("log_lines", None)

    _ensure_dir()
    path = _path_for(serial)
    state: dict = _load_json_dict(path) or {}

    # Reimage detection: if the new payload's stage_index <= 2 (WinPE startnet
    # = 2, Run-ShopfloorSetup start = 1) AND that's lower than the cached
    # stage_index (or the cached run already finished), treat it as a fresh
    # imaging run on the same bay. Clear log_tail + reset started_at; preserve
    # serial; remember the previous run's last_updated for audit. Without
    # this, a reimage leaves stale "succeeded" / high-idx state visible until
    # the new run progresses past idx 2.
    #
    # Threshold of 2 covers the first signal of a new run: the WinPE-phase
    # status push from startnet.cmd at idx=2 (fires within seconds of PXE
    # menu choice). A threshold of 1 would only trigger once
    # Run-ShopfloorSetup ran post-PPKG, ~10-20 minutes into a new run.
    if state and _is_reimage(state, fields):
        state = {
            "serial": serial,
            "previous_run_at": state.get("last_updated"),
            "log_tail": [],
        }

    if not state:
        state = {
            "serial": serial,
            "started_at": _now_iso(),
            "log_tail": [],
        }
    elif "started_at" not in state:
        # Fresh state after a rewind - mint a new started_at.
        state["started_at"] = _now_iso()

    # Append any new log lines: capped tail in the JSON for the dashboard
    # quick view, and unbounded append to the sidecar .log for the detail
    # page.
    if new_lines:
        if isinstance(new_lines, str):
            new_lines = [new_lines]
        tail = list(state.get("log_tail") or [])
        tail.extend(new_lines)
        state["log_tail"] = tail[-LOG_TAIL_MAX:]
        _append_full_log(serial, new_lines)

    # The dashboard tile only needs current state; the detail page renders
    # the timeline from stage_history.
    _record_stage_history(state, fields)

    for key, value in fields.items():
        if value is None or value == "":
            continue
        state[key] = value
    # Keep the stored serial normalized (the merge above copies the raw,
    # possibly whitespace-padded value from the payload).
    state["serial"] = serial
    state["last_updated"] = _now_iso()
    if "status" not in state:
        state["status"] = "in_progress"

    _write_json_atomic(path, state)
    return state


def list_sessions(include_inferred: bool = True) -> list[dict]:
    """Return all stored sessions.

    When include_inferred is True (default for the dashboard), the stored
    sessions are handed to imaging_log_tail.merge_with_client_sessions(),
    which is expected to add synthesized sessions for bays that have touched
    DHCP/TFTP/boot.wim but not yet pushed status (real client-pushed sessions
    win for the same MAC) and whose result is returned as-is. Otherwise the
    stored sessions are returned sorted by last_updated, newest first.

    Files that are unreadable or do not contain a JSON object are skipped.
    """
    _ensure_dir()
    out: list[dict] = []
    for name in os.listdir(config.IMAGING_DIR):
        # Skip non-session files and in-flight temp files from atomic writes.
        if not name.endswith(".json") or name.startswith(".tmp-"):
            continue
        session = _load_json_dict(os.path.join(config.IMAGING_DIR, name))
        if session is not None:
            out.append(session)
    if include_inferred:
        # Imported lazily, as in the original code (presumably to avoid an
        # import cycle or an unconditional dependency - confirm).
        from services import imaging_log_tail
        out = imaging_log_tail.merge_with_client_sessions(out)
    else:
        # str(... or "") keeps the sort total even if a file stores a null
        # or non-string last_updated.
        out.sort(key=lambda s: str(s.get("last_updated") or ""), reverse=True)
    return out


def get_session(serial: str) -> Optional[dict]:
    """Return the stored state for `serial`, or None if missing/unreadable."""
    return _load_json_dict(_path_for(serial))


def read_full_log(serial: str, max_bytes: int = DETAIL_LOG_MAX_BYTES) -> tuple[str, bool]:
    """Return (text, truncated) for the sidecar .log of `serial`.

    Reads the trailing max_bytes of the file. `truncated` is True when the
    file was larger than max_bytes and the leading slice was dropped; in that
    case the (probably partial) first line of the slice is dropped too.
    Returns ("", False) when the log does not exist or cannot be read.
    """
    path = _log_path_for(serial)
    try:
        with open(path, "rb") as f:
            # Measure on the open handle so size and data come from the same
            # file even if it is being appended to or replaced concurrently.
            size = f.seek(0, os.SEEK_END)
            f.seek(max(0, size - max_bytes))
            data = f.read()
    except OSError:
        return ("", False)
    truncated = size > max_bytes
    # errors="replace": the slice may start in the middle of a multi-byte
    # character.
    text = data.decode("utf-8", errors="replace")
    if truncated:
        _, newline, rest = text.partition("\n")
        if newline:
            text = rest
    return (text, truncated)


def delete_session(serial: str) -> bool:
    """Delete the session JSON (and, best-effort, its sidecar .log).

    Returns True if the JSON file was removed, False if it did not exist or
    could not be removed.
    """
    try:
        os.unlink(_path_for(serial))
    except OSError:
        return False
    # Best-effort sidecar cleanup.
    try:
        os.unlink(_log_path_for(serial))
    except OSError:
        pass
    return True


def delete_all_sessions() -> int:
    """Wipe every per-bay JSON + sidecar .log in IMAGING_DIR.

    Files that cannot be removed are skipped. Returns count of JSON files
    removed.
    """
    _ensure_dir()
    removed = 0
    for fn in os.listdir(config.IMAGING_DIR):
        if not fn.endswith((".json", ".log")):
            continue
        try:
            os.unlink(os.path.join(config.IMAGING_DIR, fn))
        except OSError:
            continue
        if fn.endswith(".json"):
            removed += 1
    return removed