#!/usr/bin/env python3
"""monitor-fleet-status.py

Reads status.json writebacks from the shopfloor enforcer output tree and flags:
  - PCs that haven't checked in within a stale-threshold window
  - PCs with any failed scope from their last run
  - Expected-vs-installed version mismatches (drift) when --manifests is
    supplied

Designed to run as a cron job on the PXE server (or any box with read access
to the share). Prints plaintext report to stdout; non-zero exit code when
anything needs attention so it's trivial to wrap in an alerting script.

Exit codes:
    0  every scanned host is healthy
    1  at least one stale check-in, failed scope, or version drift
    2  --status-root is not a directory

Usage:
    ./monitor-fleet-status.py --status-root /path/to/_outputs/logs
    ./monitor-fleet-status.py --status-root /.../_outputs/logs --stale-hours 24
    ./monitor-fleet-status.py --status-root /.../_outputs/logs \\
        --manifests /.../common/manifest.json /.../cmm/manifest.json

Typical cron (runs hourly, mails the root user on any output):
    0 * * * * camp /home/camp/bin/monitor-fleet-status.py \\
        --status-root /home/camp/pxe-images/tsgwp00525-v2/shared/dt/shopfloor/_outputs/logs \\
        --stale-hours 24 2>&1 | tail -100
"""
from __future__ import annotations

import argparse
import datetime as dt
import fnmatch
import json
import pathlib
import sys
from typing import Any, Iterable


def _as_list(value: Any) -> list[Any]:
    """Normalize a JSON field that may be missing, a scalar, or an array.

    A bare string must be treated as a single item; iterating it directly
    would compare individual characters against PC types / hostnames.
    """
    if value is None or value == '':
        return []
    if isinstance(value, (list, tuple)):
        return list(value)
    return [value]


def load_json(path: pathlib.Path) -> dict[str, Any] | None:
    """Read *path* and return its top-level JSON object.

    Returns None (after printing a diagnostic to stderr) when the file cannot
    be read, is not valid JSON, or its top-level value is not an object.
    """
    try:
        # utf-8-sig accepts plain UTF-8 and also strips a leading BOM, which
        # json.loads() would otherwise reject.
        data = json.loads(path.read_text(encoding='utf-8-sig'))
    except (OSError, ValueError) as e:
        # ValueError covers both JSONDecodeError and UnicodeDecodeError.
        print(f"[!] {path}: parse failed: {e}", file=sys.stderr)
        return None
    if not isinstance(data, dict):
        # Callers use .get() on the result, so anything but an object is unusable.
        print(f"[!] {path}: parse failed: expected a JSON object, "
              f"got {type(data).__name__}", file=sys.stderr)
        return None
    return data


def age_hours(iso_utc: str) -> float | None:
    """Return how many hours ago the ISO-8601 timestamp *iso_utc* was.

    A trailing 'Z' is accepted. A timestamp without a UTC offset is treated
    as UTC. Returns None when the value is not a parseable timestamp string.
    """
    try:
        t = dt.datetime.fromisoformat(iso_utc.replace('Z', '+00:00'))
    except (AttributeError, TypeError, ValueError):
        # AttributeError/TypeError: value was not a string (e.g. JSON null).
        return None
    if t.tzinfo is None:
        # Naive and aware datetimes cannot be subtracted; assume UTC.
        t = t.replace(tzinfo=dt.timezone.utc)
    now = dt.datetime.now(dt.timezone.utc)
    return (now - t).total_seconds() / 3600.0


def load_manifest_expectations(paths: Iterable[pathlib.Path]) -> list[dict[str, Any]]:
    """Load manifest entries with enough metadata to know which PCs each
    entry should apply to.

    Returns a list of dicts, one per entry that has both a Name and a
    DetectionValue:
        {
            key: "scope/Name",
            expected: "...",
            scope: "common|<type>|<type>-<subtype>",
            pctypes: [...],
            target_hostnames: [...]
        }

    Scope comes from the manifest file's parent directory name and is treated
    as an implicit PC-type filter (parallels the lib's per-scope dispatch in
    GE-Enforce.ps1). Manifests that fail to load are skipped.
    """
    out: list[dict[str, Any]] = []
    for p in paths:
        m = load_json(p)
        if not m:
            continue
        scope = p.parent.name
        # "Applications": null must behave like a missing key.
        for app in m.get('Applications') or []:
            if not isinstance(app, dict):
                continue
            name = app.get('Name')
            val = app.get('DetectionValue')
            if not (name and val):
                continue
            out.append({
                'key': f"{scope}/{name}",
                'expected': val,
                'scope': scope,
                'pctypes': _as_list(app.get('PCTypes')),
                'target_hostnames': _as_list(app.get('TargetHostnames')),
            })
    return out


def scope_applies_to_host(scope: str, pc_type: str, pc_sub_type: str) -> bool:
    """Mirror GE-Enforce.ps1's per-scope dispatch:

        common            -> applied to every PC
        <type>            -> only when the PC type matches
        <type>-<subtype>  -> only when the PC type matches AND subtype matches

    An empty scope is treated like "common". Case-insensitive on all three
    arguments; None is treated as an empty string.
    """
    s = (scope or '').lower()
    host_type = (pc_type or '').lower()
    host_sub_type = (pc_sub_type or '').lower()
    if s in ('common', ''):
        return True
    if '-' in s:
        # Split on the first hyphen only: "<type>-<subtype>".
        t, sub = s.split('-', 1)
        return t == host_type and sub == host_sub_type
    return s == host_type


def entry_applies_to_host(entry: dict[str, Any],
                          pc_type: str | None,
                          pc_sub_type: str | None,
                          hostname: str) -> bool:
    """Mirror the lib's entry-applies filter: scope + PCTypes + TargetHostnames,
    all ANDed. Drift checks only flag entries that should have actually been
    applied on this PC.

    All comparisons are case-insensitive. An empty or missing PCTypes /
    TargetHostnames filter matches every host.
    """
    pc_type = (pc_type or '').lower()
    pc_sub_type = (pc_sub_type or '').lower()
    hostname_lc = str(hostname or '').lower()

    # Scope filter: per-type manifests are implicitly scoped by the dir name.
    if not scope_applies_to_host(entry.get('scope', ''), pc_type, pc_sub_type):
        return False

    # PCTypes filter (explicit; applies within a scope): if set, PC must match
    # "*", its type, or "<type>-<subtype>".
    pctypes = [str(t).lower() for t in _as_list(entry.get('pctypes'))]
    if pctypes:
        if not pc_type:
            return False
        accepted = {'*', pc_type}
        if pc_sub_type:
            accepted.add(f"{pc_type}-{pc_sub_type}")
        if not any(t in accepted for t in pctypes):
            return False

    # TargetHostnames filter: if set, hostname must match exact or glob.
    target_hosts = [str(h).lower() for h in _as_list(entry.get('target_hostnames'))]
    if target_hosts:
        # Both sides are already lowercased, so the case-sensitive matcher
        # gives a case-insensitive result on every platform.
        if not any(h == hostname_lc or fnmatch.fnmatchcase(hostname_lc, h)
                   for h in target_hosts):
            return False

    return True


def main() -> int:
    """Scan every <status-root>/<host>/status.json and print a report.

    Returns the process exit code: 0 when nothing needs attention, 1 when any
    stale check-in, scope failure, or version drift was found, 2 when
    --status-root is not a directory.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument('--status-root', required=True,
                    help='Root path like /_outputs/logs/')
    ap.add_argument('--stale-hours', type=float, default=24.0,
                    help='Warn if a PC hasn\'t checked in in this many hours (default 24)')
    ap.add_argument('--manifests', nargs='*', type=pathlib.Path, default=[],
                    help='Optional manifest paths; when set, drift between manifest '
                         'DetectionValue and PC-reported installedVersion is flagged.')
    args = ap.parse_args()

    root = pathlib.Path(args.status_root)
    if not root.is_dir():
        print(f"ERROR: status-root not found: {root}", file=sys.stderr)
        return 2

    expectations = load_manifest_expectations(args.manifests)

    seen = 0
    stale: list[tuple[str, str]] = []
    failed: list[tuple[str, Any, Any]] = []
    drift: list[tuple[str, str, Any, Any]] = []

    for host_dir in sorted(p for p in root.iterdir() if p.is_dir()):
        status_file = host_dir / 'status.json'
        if not status_file.exists():
            continue
        st = load_json(status_file)
        if not st:
            # load_json already reported the problem on stderr.
            continue
        host = st.get('hostname') or host_dir.name
        pc_type = st.get('pcType')
        sub_type = st.get('pcSubType')
        seen += 1

        # --- stale ---
        hrs = age_hours(st.get('lastCheckIn', ''))
        if hrs is None:
            stale.append((host, 'unparseable timestamp'))
        elif hrs > args.stale_hours:
            stale.append((host, f'{hrs:.1f}h since last check-in (> {args.stale_hours}h)'))

        # --- per-scope failures ---
        scopes = st.get('scopesProcessed') or []
        if isinstance(scopes, dict):
            # Tolerate a lone scope serialized as an object instead of a
            # one-element array.
            scopes = [scopes]
        for scope in scopes:
            if not isinstance(scope, dict):
                continue
            rc = scope.get('ExitCode') or 0
            if rc != 0:
                failed.append((host, scope.get('Label'), rc))

        # --- version drift ---
        # Only check entries that should have applied to this PC. Entries
        # with PCTypes or TargetHostnames filters that exclude this host
        # are legitimately not installed and must not be flagged as drift.
        if expectations:
            installed = st.get('installedVersions') or {}
            if not isinstance(installed, dict):
                installed = {}
            for entry in expectations:
                if not entry_applies_to_host(entry, pc_type, sub_type, host):
                    continue
                key = entry['key']
                want = entry['expected']
                got = installed.get(key)
                if got is None:
                    drift.append((host, key, 'missing', want))
                elif str(got).upper() != str(want).upper():
                    drift.append((host, key, got, want))

    # Every finding is recorded in exactly one of the three lists.
    issues = len(stale) + len(failed) + len(drift)

    # --- report ---
    print(f"Fleet status monitor - scanned {seen} host(s) under {root}")
    print(f" stale threshold: {args.stale_hours}h")
    if args.manifests:
        print(f" drift against: {', '.join(str(p) for p in args.manifests)}")
    print()

    if not issues:
        print('All checked-in hosts are healthy.')
        return 0

    if stale:
        print(f"STALE CHECK-INS ({len(stale)}):")
        for host, msg in stale:
            print(f" {host}: {msg}")
        print()

    if failed:
        print(f"SCOPE FAILURES ({len(failed)}):")
        for host, label, rc in failed:
            print(f" {host}: scope '{label}' exited {rc}")
        print()

    if drift:
        print(f"VERSION DRIFT ({len(drift)}):")
        for host, key, got, want in drift:
            print(f" {host}: {key} got={got} want={want}")
        print()

    print(f"Total issues: {issues}")
    return 1


if __name__ == '__main__':
    sys.exit(main())