services/imaging_status.py - if a new POST arrives with stage_index <= 1 that is lower than the cached stage_index, OR the previous run already finished (status=succeeded|failed), reset the session: clear log_tail, mint a fresh started_at, drop the status field so the in_progress default re-applies. Preserves serial + records the previous run's last_updated under previous_run_at for audit. Without this, a reimage on the same bay would leave a stale 6/8 "succeeded" card visible until the new run progressed past that index. playbook/startnet.cmd - one-line PowerShell POST after the PXE menu choice + enrollment-share mount, before PESetup.exe waits to start. Captures BIOS serial via wmic, MAC via Get-NetAdapter, and posts: stage_index=2, current_stage="WinPE: PESetup / WIM apply". Best-effort; try/catch swallows any network failure so a missing webapp never blocks imaging. PXE clients will now appear on the /imaging dashboard during WinPE phase instead of only post-PPKG. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
164 lines
5.2 KiB
Python
164 lines
5.2 KiB
Python
"""PXE imaging progress tracking.
|
|
|
|
Imaging clients POST coarse-grained status updates to /imaging/status as
|
|
they progress through WIM apply -> drivers -> first boot -> PPKG -> per-PC
|
|
shopfloor setup. Each session is keyed by the BIOS serial number (stable
|
|
across the WinPE -> Windows transition; survives a target hostname change).
|
|
|
|
Storage is one JSON file per serial under IMAGING_DIR. Atomic write via
|
|
tempfile + rename. Updates merge the new payload into existing state so
|
|
clients can post partial updates (just the current_stage + log_tail tick).
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import re
|
|
import tempfile
|
|
from datetime import datetime, timezone
|
|
from typing import Optional
|
|
|
|
import config
|
|
|
|
# How many recent log lines to keep per session.
|
|
# update_session() trims log_tail to its last LOG_TAIL_MAX entries, so the
# oldest lines are the ones discarded once the cap is reached.
LOG_TAIL_MAX = 50
|
|
# Sessions older than this are considered stale and dropped from the dashboard
|
|
# "active" list. Still readable individually.
|
|
# NOTE(review): not referenced in the portion of this module visible here;
# presumably the dashboard view applies this window - confirm against caller.
ACTIVE_WINDOW_HOURS = 6
|
|
|
|
# Filenames are derived from serial; sanitize to avoid path traversal /
|
|
# weird filesystem characters. Anything outside [A-Za-z0-9._-] becomes _.
|
|
# Negated character class: matches each single character that is NOT a
# letter, digit, '.', '_' or '-'. _path_for() substitutes '_' for every match.
_SAFE_SERIAL = re.compile(r"[^A-Za-z0-9._-]")
|
|
|
|
|
|
def _ensure_dir():
    """Create config.IMAGING_DIR (and missing parents) if it does not exist."""
    storage_root = config.IMAGING_DIR
    os.makedirs(storage_root, exist_ok=True)
|
|
|
|
|
|
def _path_for(serial: str) -> str:
    """Return the JSON file path under config.IMAGING_DIR for `serial`.

    The serial is stripped of surrounding whitespace and every character
    matched by _SAFE_SERIAL is replaced with '_'. A serial that is empty
    after stripping maps to "unknown.json".
    """
    cleaned = _SAFE_SERIAL.sub("_", serial.strip())
    if not cleaned:
        cleaned = "unknown"
    filename = f"{cleaned}.json"
    return os.path.join(config.IMAGING_DIR, filename)
|
|
|
|
|
|
def _now_iso() -> str:
    """Return the current local time as an ISO-8601 string.

    The value carries the system's local UTC offset and is truncated to
    whole seconds (no fractional part).
    """
    local_now = datetime.now().astimezone()
    return local_now.isoformat(timespec="seconds")
|
|
|
|
|
|
def update_session(payload: dict) -> dict:
    """Merge `payload` into the JSON file for payload['serial'].

    payload must include a non-blank 'serial'. All other fields are optional;
    any field that is present and is neither None nor "" overwrites the
    stored field. 'log_lines' (a single string or a list of lines) is
    appended to the stored 'log_tail', which is capped at LOG_TAIL_MAX
    entries. The caller's dict is left unmodified.

    Reimage detection: when the stored session exists and the incoming
    stage_index converts to exactly 1 (the code requires 0 < index <= 1),
    the session is reset if the stored stage_index was higher or the stored
    status is "succeeded" / "failed". A reset clears log_tail, mints a new
    started_at, drops status (so the "in_progress" default re-applies) and
    records the old last_updated under 'previous_run_at'.

    Returns the resulting full state (also written to disk atomically via
    a temp file in the same directory + os.replace).

    Raises:
        ValueError: if 'serial' is missing or blank.
        OSError: if the state file cannot be written.
        TypeError: if the merged state contains values json cannot encode.
    """
    serial = (payload.get("serial") or "").strip()
    if not serial:
        raise ValueError("payload missing 'serial'")

    # Work on a shallow copy so popping keys never mutates the caller's dict.
    incoming = dict(payload)
    new_lines = incoming.pop("log_lines", None)
    # The session is keyed by the stripped serial; don't let the raw payload
    # value (possibly with surrounding whitespace) overwrite it during merge.
    incoming.pop("serial", None)

    _ensure_dir()
    path = _path_for(serial)

    state: dict = {}
    if os.path.isfile(path):
        try:
            with open(path, "r") as f:
                loaded = json.load(f)
        except (ValueError, OSError):
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError:
            # an unreadable file is treated as "no previous state".
            loaded = None
        # Valid JSON that is not an object (null, list, ...) is just as
        # unusable as a corrupt file; start over instead of crashing below.
        if isinstance(loaded, dict):
            state = loaded

    # Reimage detection. Without this, a reimage on the same bay leaves stale
    # "succeeded" / high-index state on the dashboard until the new run
    # progresses past the old index.
    if state:
        try:
            old_idx = int(state.get("stage_index") or 0)
            new_idx = int(incoming.get("stage_index") or 0)
        except (TypeError, ValueError):
            # Unparseable index on either side: skip reimage detection.
            old_idx, new_idx = 0, 0
        at_restart_stage = 0 < new_idx <= 1
        prev_done = state.get("status") in ("succeeded", "failed")
        if at_restart_stage and (new_idx < old_idx or prev_done):
            state = {
                "serial": serial,
                "previous_run_at": state.get("last_updated"),
                "log_tail": [],
            }

    if not state:
        state = {
            "serial": serial,
            "started_at": _now_iso(),
            "log_tail": [],
        }
    elif "started_at" not in state:
        # Fresh state after a rewind (or a stored state that never had one):
        # mint a new started_at.
        state["started_at"] = _now_iso()

    # Append any new log lines (preserve old; cap to LOG_TAIL_MAX).
    if new_lines:
        if isinstance(new_lines, str):
            new_lines = [new_lines]
        # `or []` tolerates a stored log_tail of null.
        tail = list(state.get("log_tail") or [])
        tail.extend(new_lines)
        state["log_tail"] = tail[-LOG_TAIL_MAX:]

    # Partial updates: empty / missing values never erase stored fields.
    for key, value in incoming.items():
        if value is None or value == "":
            continue
        state[key] = value

    state["serial"] = serial
    state["last_updated"] = _now_iso()
    state.setdefault("status", "in_progress")

    # Atomic write: the temp file lives in the target directory so that
    # os.replace stays on one filesystem. The ".tmp-" prefix is what
    # list_sessions() uses to ignore in-flight files.
    fd, tmp = tempfile.mkstemp(dir=config.IMAGING_DIR, prefix=".tmp-", suffix=".json")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(state, f, indent=2)
        os.replace(tmp, path)
    except Exception:
        # Don't leave a half-written temp file behind; re-raise the real error.
        try:
            os.unlink(tmp)
        except OSError:
            pass
        raise
    return state
|
|
|
|
|
|
def list_sessions() -> list[dict]:
    """Return all sessions sorted by last_updated desc.

    Scans config.IMAGING_DIR for "*.json" files, ignoring in-flight temp
    files (names starting with ".tmp-"). Files that cannot be read, cannot
    be decoded, or do not contain a JSON object are skipped so that one bad
    file never breaks the whole listing. Sessions without a last_updated
    value sort last.
    """
    _ensure_dir()
    sessions: list[dict] = []
    for name in os.listdir(config.IMAGING_DIR):
        if not name.endswith(".json") or name.startswith(".tmp-"):
            continue
        path = os.path.join(config.IMAGING_DIR, name)
        try:
            with open(path, "r") as f:
                data = json.load(f)
        except (ValueError, OSError):
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            continue
        # Valid JSON that is not an object (null, list, ...) has no .get()
        # and would crash the sort key below.
        if isinstance(data, dict):
            sessions.append(data)
    # str() keeps the sort total even if a file holds a non-string or null
    # last_updated value.
    sessions.sort(key=lambda s: str(s.get("last_updated") or ""), reverse=True)
    return sessions
|
|
|
|
|
|
def get_session(serial: str) -> Optional[dict]:
    """Return the stored state for `serial`, or None if it is unavailable.

    None is returned when no file exists for the serial, when the file
    cannot be read or decoded, or when it holds JSON that is not an object
    (so callers always receive either a dict or None).
    """
    path = _path_for(serial)
    if not os.path.isfile(path):
        return None
    try:
        with open(path, "r") as f:
            data = json.load(f)
    except (ValueError, OSError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        return None
    return data if isinstance(data, dict) else None
|
|
|
|
|
|
def delete_session(serial: str) -> bool:
    """Remove the stored session file for `serial`.

    Returns True when a file was deleted; False when no regular file exists
    for the serial or the unlink fails with OSError.
    """
    target = _path_for(serial)
    if os.path.isfile(target):
        try:
            os.unlink(target)
        except OSError:
            return False
        return True
    return False
|