Previously the stage indices reflected logical milestones but not the order in which they fire. Run-ShopfloorSetup posted idx=1 (start) and idx=4 (PPKG), but 09-Setup-Keyence (inside the per-type loop) ran BETWEEN them and posted idx=5/6. The dashboard then "regressed" from 6 back to 4 when PPKG fired, making it look stuck at the per-type-complete card.

New numbering matches actual execution order:

  1 - WinPE: PESetup / WIM apply (startnet.cmd)
  2 - Run-ShopfloorSetup: starting (Run-ShopfloorSetup.ps1)
  3 - 09-Setup-<Type>: starting (per-type)
  4 - 09-Setup-<Type>: complete (per-type)
  5 - Run-ShopfloorSetup: PPKG enrollment (Run-ShopfloorSetup.ps1)
  6 - Run-ShopfloorSetup: handoff to Monitor (Run-ShopfloorSetup.ps1)
  7 - Monitor-IntuneProgress: Intune Device ID captured

The services/imaging_status.py rewind threshold reverts to stage_index <= 1 now that WinPE startnet posts idx=1.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
172 lines
5.6 KiB
Python
172 lines
5.6 KiB
Python
"""PXE imaging progress tracking.
|
|
|
|
Imaging clients POST coarse-grained status updates to /imaging/status as
|
|
they progress through WIM apply -> drivers -> first boot -> PPKG -> per-PC
|
|
shopfloor setup. Each session is keyed by the BIOS serial number (stable
|
|
across the WinPE -> Windows transition; survives a target hostname change).
|
|
|
|
Storage is one JSON file per serial under IMAGING_DIR. Atomic write via
|
|
tempfile + rename. Updates merge the new payload into existing state so
|
|
clients can post partial updates (just the current_stage + log_lines tick).
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import re
|
|
import tempfile
|
|
from datetime import datetime, timezone
|
|
from typing import Optional
|
|
|
|
import config
|
|
|
|
# Upper bound on the number of log lines retained in a session's log_tail.
LOG_TAIL_MAX = 50

# Age limit, in hours, for a session to count as "active" on the dashboard.
# Older sessions are treated as stale there but can still be read one by one.
ACTIVE_WINDOW_HOURS = 6

# State filenames come straight from the serial, so every character outside
# [A-Za-z0-9._-] is replaced with "_" to block path traversal and characters
# the filesystem may not accept.
_SAFE_SERIAL = re.compile(r"[^A-Za-z0-9._-]")
|
|
|
|
|
|
def _ensure_dir():
    """Create config.IMAGING_DIR (and any missing parents) if it is absent."""
    imaging_dir = config.IMAGING_DIR
    os.makedirs(imaging_dir, exist_ok=True)
|
|
|
|
|
|
def _path_for(serial: str) -> str:
    """Return the JSON state-file path for `serial` inside config.IMAGING_DIR.

    The serial is stripped of surrounding whitespace and every character
    matched by _SAFE_SERIAL is replaced with "_". A serial that is empty
    after stripping maps to the filename "unknown.json".
    """
    cleaned = _SAFE_SERIAL.sub("_", serial.strip())
    if not cleaned:
        cleaned = "unknown"
    filename = f"{cleaned}.json"
    return os.path.join(config.IMAGING_DIR, filename)
|
|
|
|
|
|
def _now_iso() -> str:
    """Return the current local time as an ISO-8601 string.

    The value carries the local UTC offset and is truncated to whole seconds.
    """
    local_now = datetime.now().astimezone()
    return local_now.isoformat(timespec="seconds")
|
|
|
|
|
|
def update_session(payload: dict) -> dict:
    """Merge `payload` into the JSON file for payload['serial'].

    payload must include a non-blank 'serial'. All other fields are optional;
    whatever is present (and is not None or "") overwrites the existing
    field. 'log_lines' (a single string or a list of lines) is appended to
    the stored 'log_tail', which is capped at LOG_TAIL_MAX entries. The
    caller's dict is left unmodified.

    Returns the resulting full state.

    Raises:
        ValueError: if payload has no usable 'serial'.
        OSError: if the state file cannot be written.
    """
    serial = (payload.get("serial") or "").strip()
    if not serial:
        raise ValueError("payload missing 'serial'")

    _ensure_dir()
    path = _path_for(serial)

    # Work on a shallow copy so that popping 'log_lines' does not alter the
    # dict owned by the caller.
    fields = dict(payload)
    new_lines = fields.pop("log_lines", None)

    state: dict = {}
    if os.path.isfile(path):
        try:
            with open(path, "r") as f:
                loaded = json.load(f)
        except (ValueError, OSError):
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError:
            # an unreadable or corrupt file is treated as "no prior state".
            loaded = None
        # Valid JSON that is not an object (list, string, null, ...) cannot
        # be merged into; start over instead of failing on .get() below.
        if isinstance(loaded, dict):
            state = loaded

    # Reimage detection: stage_index 1 is the first status post of a run
    # (the WinPE-phase push from startnet.cmd). If such a payload arrives
    # while the cached stage_index is higher, or while the cached run is
    # already "succeeded" / "failed", treat it as a fresh imaging run on the
    # same bay. Clear log_tail, drop the old fields (started_at is re-minted
    # below), preserve serial, and remember the previous run's last_updated
    # for audit. Without this, a reimage leaves stale "succeeded" / high-idx
    # state visible until the new run progresses past the old index.
    if state:
        try:
            old_idx = int(state.get("stage_index") or 0)
            new_idx = int(fields.get("stage_index") or 0)
        except (TypeError, ValueError, OverflowError):
            # Unparseable indices: do not guess, keep the existing state.
            old_idx, new_idx = 0, 0
        is_first_stage = 0 < new_idx <= 1
        prev_done = state.get("status") in ("succeeded", "failed")
        if is_first_stage and (new_idx < old_idx or prev_done):
            state = {
                "serial": serial,
                "previous_run_at": state.get("last_updated"),
                "log_tail": [],
            }

    if not state:
        state = {
            "serial": serial,
            "started_at": _now_iso(),
            "log_tail": [],
        }
    elif "started_at" not in state:
        # Fresh state after a rewind (or a stored state that never had one):
        # mint a new started_at.
        state["started_at"] = _now_iso()

    # Append any new log lines (preserve old; cap to LOG_TAIL_MAX).
    if new_lines:
        if isinstance(new_lines, str):
            new_lines = [new_lines]
        # `or []` also tolerates a stored "log_tail": null.
        tail = list(state.get("log_tail") or [])
        tail.extend(new_lines)
        state["log_tail"] = tail[-LOG_TAIL_MAX:]

    # Partial updates: None / "" mean "not provided" and never erase a
    # previously stored value.
    for key, value in fields.items():
        if value is None or value == "":
            continue
        state[key] = value

    state["last_updated"] = _now_iso()
    if "status" not in state:
        state["status"] = "in_progress"

    # Atomic write: dump to a temp file in the same directory, then rename it
    # over the target so readers never observe a half-written file.
    fd, tmp = tempfile.mkstemp(dir=config.IMAGING_DIR, prefix=".tmp-", suffix=".json")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(state, f, indent=2)
        os.replace(tmp, path)
    except Exception:
        # Best-effort cleanup of the temp file; the original error is what
        # the caller needs to see.
        try:
            os.unlink(tmp)
        except OSError:
            pass
        raise
    return state
|
|
|
|
|
|
def list_sessions() -> list[dict]:
    """Return all sessions sorted by last_updated desc.

    Loads every ``*.json`` file in config.IMAGING_DIR except in-flight
    ``.tmp-`` files. A file that cannot be read, cannot be decoded or parsed,
    or does not contain a JSON object is skipped, so one bad file cannot
    break the whole listing. Sessions without a usable 'last_updated' sort
    last.
    """
    _ensure_dir()
    out: list[dict] = []
    for name in os.listdir(config.IMAGING_DIR):
        if not name.endswith(".json") or name.startswith(".tmp-"):
            continue
        path = os.path.join(config.IMAGING_DIR, name)
        try:
            with open(path, "r") as f:
                session = json.load(f)
        except (ValueError, OSError):
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            continue
        # Only JSON objects are sessions; anything else would fail on .get().
        if isinstance(session, dict):
            out.append(session)
    # Coerce the key to str so a null or non-string last_updated cannot raise
    # TypeError when compared against the ISO strings of other sessions.
    out.sort(key=lambda s: str(s.get("last_updated") or ""), reverse=True)
    return out
|
|
|
|
|
|
def get_session(serial: str) -> Optional[dict]:
    """Load and return the stored state for `serial`.

    Returns None when no regular file exists at the session path, or when
    the file cannot be opened or parsed as JSON.
    """
    session_path = _path_for(serial)
    if os.path.isfile(session_path):
        try:
            with open(session_path, "r") as handle:
                return json.load(handle)
        except (json.JSONDecodeError, OSError):
            pass
    return None
|
|
|
|
|
|
def delete_session(serial: str) -> bool:
    """Remove the stored state file for `serial`.

    Returns True when a regular file existed at the session path and was
    unlinked; False when there was no such file or the unlink raised OSError.
    """
    target = _path_for(serial)
    removed = False
    if os.path.isfile(target):
        try:
            os.unlink(target)
        except OSError:
            removed = False
        else:
            removed = True
    return removed
|