pxe-server/playbook/shopfloor-setup/common/monitor-fleet-status.py
cproudlock eb68793e79 Stage 2a: unified GE-Enforce framework + share-root mirror
Consolidates per-type enforcers (CMM, Keyence, Machine, Common, Acrobat)
into one dispatcher driven by pc-type.txt + site-config and a share-side
manifest layout. Same share is now the single source of truth for routine
software updates without re-imaging.

Runtime:
  common/GE-Enforce.ps1           SYSTEM scheduled task. Reads
                                   common/manifest.json plus optional
                                   <pcType>/manifest.json and
                                   <pcType-subType>/manifest.json.
                                   Dispatches each entry through the lib.
                                   Writes _outputs/logs/<hostname>/status.json
                                   on the share after each cycle for fleet
                                   monitoring.
  common/Register-GEEnforce.ps1   Task registration. Triggers: AtLogOn +
                                   every 5 min (jittered per-PC from
                                   hostname hash) + daily at 05:45,
                                   13:45, 21:45 EST shift windows.
                                   Unregisters legacy per-type tasks on
                                   install so the two coexist at most for
                                   the duration of a single enforce cycle.
  common/Deploy-GEEnforce.ps1     Retrofit helper for already-imaged PCs
                                   (admin-run; copies runtime + registers
                                   task + optional immediate trigger).
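
The manifest layering that GE-Enforce.ps1 reads can be sketched in Python as follows. This is an illustration, not the enforcer's code: `manifest_paths_for` and `existing_manifests` are hypothetical helpers, the ordering simply follows the order listed above, and the directory names are assumed to match the pc-type and pc-type-subtype strings verbatim.

```python
from pathlib import Path
from typing import Optional


def manifest_paths_for(share_root: Path,
                       pc_type: str,
                       pc_sub_type: Optional[str] = None) -> list[Path]:
    """Candidate manifests for one PC: common, then <pcType>, then <pcType-subType>."""
    paths = [share_root / "common" / "manifest.json"]
    if pc_type:
        paths.append(share_root / pc_type / "manifest.json")
        if pc_sub_type:
            paths.append(share_root / f"{pc_type}-{pc_sub_type}" / "manifest.json")
    return paths


def existing_manifests(share_root: Path,
                       pc_type: str,
                       pc_sub_type: Optional[str] = None) -> list[Path]:
    """Per-type and per-subtype manifests are optional, so keep only files that exist."""
    return [p for p in manifest_paths_for(share_root, pc_type, pc_sub_type)
            if p.is_file()]
```

The same list is a natural input for the monitor's --manifests option, since the monitor derives each entry's scope from the manifest's parent directory name.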

Library (common/lib/Install-FromManifest.ps1):
  - New Type values: PS1, BAT, File, Registry, INF
  - New DetectionMethod values: Always, MarkerFile, ValueMatches, pnputil
  - TargetHostnames filter (exact + -like wildcards, ANDed with PCTypes)
  - Schema version check (logs WARN on manifest newer than lib MAJOR)
  - Auto-writes MarkerFile on successful one-shot PS1/BAT/CMD runs
  - MSI log scan on failure surfaces meaningful install errors
  - Lib version bumped 2.0 -> 2.1 for TargetHostnames
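
A minimal Python sketch of the schema version check, under stated assumptions: the field name `SchemaVersion` is hypothetical (the manifest schema is not shown here), a manifest without the field is assumed compatible, and what the lib does after logging the WARN is not described above, so the sketch only reports the result.

```python
import logging

LIB_VERSION = "2.1"  # lib version named in this commit message

log = logging.getLogger("install-from-manifest-sketch")


def major(version: str) -> int:
    """Return the leading (MAJOR) component of a dotted version string."""
    return int(str(version).split(".", 1)[0])


def check_schema(manifest: dict, lib_version: str = LIB_VERSION) -> bool:
    """Return False (and log a warning) when the manifest MAJOR is newer than the lib's.

    'SchemaVersion' is a hypothetical field name used for illustration only.
    """
    declared = manifest.get("SchemaVersion")
    if declared is None:
        return True  # assumption: an unversioned manifest is treated as compatible
    if major(declared) > major(lib_version):
        log.warning("manifest schema %s is newer than lib %s", declared, lib_version)
        return False
    return True
```

Comparing only MAJOR means a 2.0 manifest and a 2.1 lib are considered compatible, which matches the 2.0 -> 2.1 bump being additive (TargetHostnames).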

Observability:
  common/monitor-fleet-status.py  Scans _outputs/logs/*/status.json for
                                   stale check-ins, failed scopes, and
                                   version drift. Respects scope (dir-name),
                                   PCTypes, and TargetHostnames filters so
                                   entries excluded from a PC do not
                                   false-flag as drift.
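
For orientation, the sketch below builds a status.json record with the fields the monitor reads (hostname, pcType, pcSubType, lastCheckIn, scopesProcessed with Label and ExitCode, installedVersions keyed as "scope/Name") and applies the stale and failed-scope checks to it. Only the field names come from monitor-fleet-status.py; the hostname, timestamps, and the exact key casing are invented for illustration, and `failed_scopes` and `hours_since` are hypothetical helper names.

```python
import datetime as dt
import json

# Hypothetical record; only the field names come from monitor-fleet-status.py.
sample_status = {
    "hostname": "SHOPFLOOR-001",
    "pcType": "Standard",
    "pcSubType": "Machine",
    "lastCheckIn": "2026-04-22T15:00:00Z",
    "scopesProcessed": [
        {"Label": "common", "ExitCode": 0},
        {"Label": "Standard-Machine", "ExitCode": 1},
    ],
    "installedVersions": {"Standard-Machine/eDNC": "6.4.5.0"},
}


def failed_scopes(status: dict) -> list:
    """Labels of scopes whose ExitCode is non-zero (a missing ExitCode counts as 0)."""
    return [s.get("Label") for s in status.get("scopesProcessed") or []
            if (s.get("ExitCode") or 0) != 0]


def hours_since(status: dict, now: dt.datetime) -> float:
    """Hours between lastCheckIn (ISO 8601, trailing Z allowed) and `now`."""
    t = dt.datetime.fromisoformat(status["lastCheckIn"].replace("Z", "+00:00"))
    return (now - t).total_seconds() / 3600.0


# A fixed "now" keeps the example deterministic.
now = dt.datetime(2026, 4, 23, 16, 0, tzinfo=dt.timezone.utc)
record = json.loads(json.dumps(sample_status))  # round-trip as the share would
print(failed_scopes(record))
print(hours_since(record, now) > 24.0)
```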

Regression harness:
  common/test/                    Parameterized VM harness + README
                                   covering every action type plus
                                   rollback, bad/missing SFLD creds, and
                                   schema versioning.

Imaging integration:
  Run-ShopfloorSetup.ps1 now stages GE-Enforce.ps1 and lib to
  C:\Program Files\GE\Shopfloor\ and invokes Register-GEEnforce.ps1
  at the end of setup. Legacy Register-CommonEnforce invocation is
  kept for the transition; it and the legacy per-type enforcer files
  are dead code once Register-GEEnforce runs and will be removed in a
  dedicated cleanup pass.

Standard-Machine manifest:
  eDNC entry bumped 6.4.3 -> 6.4.5. DetectionValue pinned to the
  4-part FileVersion 6.4.5.0 verified against a fresh install in the
  Win11 analyzer VM. UDC DetectionValue pinned to 1.0.34 (registry
  stores 3-part for UDC; verified live).
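
Pinning the exact form matters because monitor-fleet-status.py compares DetectionValue with the reported version as case-insensitive strings, not as parsed versions. The sketch below mirrors that comparison in `is_drift`; `same_version_padded` is a hypothetical alternative shown only for contrast and is not what the script does.

```python
def is_drift(got: object, want: object) -> bool:
    """Case-insensitive string comparison, as in monitor-fleet-status.py."""
    return str(got).upper() != str(want).upper()


def same_version_padded(a: str, b: str) -> bool:
    """Hypothetical alternative: compare dotted numeric versions after zero-padding."""
    pa = [int(x) for x in a.split(".")]
    pb = [int(x) for x in b.split(".")]
    width = max(len(pa), len(pb))
    pa += [0] * (width - len(pa))
    pb += [0] * (width - len(pb))
    return pa == pb


# A 3-part manifest value against a 4-part reported FileVersion is flagged
# as drift by the string comparison, even though the versions are equal.
print(is_drift("6.4.5", "6.4.5.0"))
print(same_version_padded("6.4.5", "6.4.5.0"))
```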

scripts/mirror-from-gold.sh:
  Restructured around share-root rsyncs (one pass per Samba share)
  to close gaps in the prior per-subdir layout: winpeapps/_shared/
  Applications (7.5 GB of Adobe + fonts + Java + Office + OpenText
  + printdrivers + wireless + Zscaler), additional winpeapps image
  types, and enrollment flat-layout root files. Adds
  --skip-clonezilla and --skip-reports.

Verified end-to-end in the Win11 analyzer VM:
  - Every action Type and DetectionMethod round-tripped
  - PCTypes filter (Oracle excluded on Shopfloor, Firefox included
    on Shopfloor and DESKTOP-*, excluded elsewhere)
  - TargetHostnames filter (exact, wildcard, no-match)
  - Upgrade path: XML hash bump + fleet re-copy
  - Rollback path: history-archive restore propagates via enforcer,
    fleet converges back without per-PC intervention
  - Status writeback + monitor script drift detection
  - Graceful degradation on bad creds, missing creds, share
    unreachable (all exit 0, log clearly, retry next cycle)

Not in this commit (follow-ups):
  - Retire legacy per-type *-Enforce.ps1 files and simplify
    09-Setup-*.ps1 scripts (coordinated multi-file cleanup)
  - Stage 2b: InUseCheck close-and-reopen, ApplyMode gating,
    UpdateWindow, .apply-now.txt sentinel, BITS pre-staging,
    1618 mutex retry, PostInstallCheck, Uninstall action
  - Management app (manifest CRUD + deploy + rollback + fleet view)
  - ShopFloor autologon persistence bug (deferred for next imaging
    attempt with live registry evidence)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-22 11:19:23 -04:00

252 lines
8.8 KiB
Python
Executable File

#!/usr/bin/env python3
"""monitor-fleet-status.py

Reads status.json writebacks from the shopfloor enforcer output tree and
flags:
  - PCs that haven't checked in within a stale-threshold window
  - PCs with any failed scope from their last run
  - Expected-vs-installed version mismatches (drift) when --manifests is
    supplied

Designed to run as a cron job on the PXE server (or any box with read
access to the share). Prints plaintext report to stdout; non-zero exit
code when anything needs attention so it's trivial to wrap in an alerting
script.

Usage:
    ./monitor-fleet-status.py --status-root /path/to/_outputs/logs
    ./monitor-fleet-status.py --status-root /.../_outputs/logs --stale-hours 24
    ./monitor-fleet-status.py --status-root /.../_outputs/logs \\
        --manifests /.../common/manifest.json /.../cmm/manifest.json

Typical cron (runs hourly, mails the root user on any output):
    0 * * * * camp /home/camp/bin/monitor-fleet-status.py \\
        --status-root /home/camp/pxe-images/tsgwp00525-v2/shared/dt/shopfloor/_outputs/logs \\
        --stale-hours 24 2>&1 | tail -100
"""
from __future__ import annotations

import argparse
import datetime as dt
import fnmatch
import json
import pathlib
import sys
from typing import Any, Iterable
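

def load_json(path: pathlib.Path) -> dict[str, Any] | None:
    """Parse a JSON file; on any error, warn on stderr and return None."""
    try:
        # utf-8-sig reads plain UTF-8 and also skips a leading BOM, which
        # json.loads() would otherwise reject.
        return json.loads(path.read_text(encoding='utf-8-sig'))
    except Exception as e:
        print(f"[!] {path}: parse failed: {e}", file=sys.stderr)
        return None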
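

def age_hours(iso_utc: str) -> float | None:
    """Return hours elapsed since an ISO 8601 timestamp, or None if unparseable."""
    try:
        t = dt.datetime.fromisoformat(iso_utc.replace('Z', '+00:00'))
        if t.tzinfo is None:
            # Offset-less timestamps are assumed to be UTC. Without this, the
            # aware-minus-naive subtraction below raises TypeError and the
            # host is reported as having an unparseable timestamp.
            t = t.replace(tzinfo=dt.timezone.utc)
        now = dt.datetime.now(dt.timezone.utc)
        return (now - t).total_seconds() / 3600.0
    except Exception:
        return None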


def load_manifest_expectations(paths: Iterable[pathlib.Path]) -> list[dict[str, Any]]:
    """Load manifest entries with enough metadata to know which PCs each
    entry should apply to. Returns a list of dicts, one per entry that has
    a DetectionValue:

        { key: "scope/Name", expected: "...", scope: "common|<type>|<type>-<sub>",
          pctypes: [...], target_hostnames: [...] }

    Scope comes from the manifest file's parent directory name and is
    treated as an implicit PC-type filter (parallels the lib's per-scope
    dispatch in GE-Enforce.ps1).
    """
    out: list[dict[str, Any]] = []
    for p in paths:
        m = load_json(p)
        if not m:
            continue
        scope = p.parent.name
        for app in m.get('Applications', []):
            name = app.get('Name')
            val = app.get('DetectionValue')
            if not (name and val):
                continue
            out.append({
                'key': f"{scope}/{name}",
                'expected': val,
                'scope': scope,
                'pctypes': app.get('PCTypes') or [],
                'target_hostnames': app.get('TargetHostnames') or [],
            })
    return out


def scope_applies_to_host(scope: str, pc_type: str, pc_sub_type: str) -> bool:
    """Mirror GE-Enforce.ps1's per-scope dispatch:

        common           -> applied to every PC type
        <type>           -> only when pc-type.txt matches <type>
        <type>-<subtype> -> only when pc-type matches AND subtype matches

    Case-insensitive.
    """
    s = scope.lower()
    if s in ('common', ''):
        return True
    if '-' in s:
        t, sub = s.split('-', 1)
        return (t == pc_type and sub == pc_sub_type)
    return s == pc_type


def entry_applies_to_host(entry: dict[str, Any],
                          pc_type: str | None,
                          pc_sub_type: str | None,
                          hostname: str) -> bool:
    """Mirror the lib's entry-applies filter: scope + PCTypes + TargetHostnames,
    all ANDed. Drift checks only flag entries that should have actually been
    applied on this PC.
    """
    pc_type = (pc_type or '').lower()
    pc_sub_type = (pc_sub_type or '').lower()
    hostname_lc = hostname.lower()

    # Scope filter: per-type manifests are implicitly scoped by the dir name.
    if not scope_applies_to_host(entry.get('scope', ''), pc_type, pc_sub_type):
        return False

    # PCTypes filter (explicit; applies within a scope): if set, PC must match.
    pctypes = entry.get('pctypes') or []
    if pctypes:
        if not pc_type:
            return False
        accepted = {'*', pc_type}
        if pc_sub_type:
            accepted.add(f"{pc_type}-{pc_sub_type}")
        if not any(t.lower() in accepted for t in pctypes):
            return False

    # TargetHostnames filter: if set, hostname must match exact or glob.
    target_hosts = entry.get('target_hostnames') or []
    if target_hosts:
        if not any(h.lower() == hostname_lc
                   or fnmatch.fnmatch(hostname_lc, h.lower())
                   for h in target_hosts):
            return False

    return True


def main() -> int:
    ap = argparse.ArgumentParser()
    ap.add_argument('--status-root', required=True,
                    help='Root path like <share>/_outputs/logs/')
    ap.add_argument('--stale-hours', type=float, default=24.0,
                    help='Warn if a PC hasn\'t checked in in this many hours (default 24)')
    ap.add_argument('--manifests', nargs='*', type=pathlib.Path, default=[],
                    help='Optional manifest paths; when set, drift between manifest '
                         'DetectionValue and PC-reported installedVersion is flagged.')
    args = ap.parse_args()

    root = pathlib.Path(args.status_root)
    if not root.is_dir():
        print(f"ERROR: status-root not found: {root}", file=sys.stderr)
        return 2

    expectations = load_manifest_expectations(args.manifests)

    issues = 0
    seen = 0
    stale = []
    failed = []
    drift = []

    for host_dir in sorted(p for p in root.iterdir() if p.is_dir()):
        status_file = host_dir / 'status.json'
        if not status_file.exists():
            continue
        st = load_json(status_file)
        if not st:
            continue
        host = st.get('hostname') or host_dir.name
        pc_type = st.get('pcType')
        sub_type = st.get('pcSubType')
        seen += 1

        # --- stale ---
        hrs = age_hours(st.get('lastCheckIn', ''))
        if hrs is None:
            stale.append((host, 'unparseable timestamp'))
            issues += 1
        elif hrs > args.stale_hours:
            stale.append((host, f'{hrs:.1f}h since last check-in (> {args.stale_hours}h)'))
            issues += 1

        # --- per-scope failures ---
        for scope in (st.get('scopesProcessed') or []):
            if (scope.get('ExitCode') or 0) != 0:
                failed.append((host, scope.get('Label'), scope.get('ExitCode')))
                issues += 1

        # --- version drift ---
        # Only check entries that should have applied to this PC. Entries
        # with PCTypes or TargetHostnames filters that exclude this host
        # are legitimately not installed and must not be flagged as drift.
        if expectations:
            installed = st.get('installedVersions', {}) or {}
            for entry in expectations:
                if not entry_applies_to_host(entry, pc_type, sub_type, host):
                    continue
                key = entry['key']
                want = entry['expected']
                got = installed.get(key)
                if got is None:
                    drift.append((host, key, 'missing', want))
                    issues += 1
                elif str(got).upper() != str(want).upper():
                    drift.append((host, key, got, want))
                    issues += 1

    # --- report ---
    print(f"Fleet status monitor - scanned {seen} host(s) under {root}")
    print(f"  stale threshold: {args.stale_hours}h")
    if args.manifests:
        print(f"  drift against: {', '.join(str(p) for p in args.manifests)}")
    print()

    if not issues:
        print('All checked-in hosts are healthy.')
        return 0

    if stale:
        print(f"STALE CHECK-INS ({len(stale)}):")
        for host, msg in stale:
            print(f"  {host}: {msg}")
        print()

    if failed:
        print(f"SCOPE FAILURES ({len(failed)}):")
        for host, label, rc in failed:
            print(f"  {host}: scope '{label}' exited {rc}")
        print()

    if drift:
        print(f"VERSION DRIFT ({len(drift)}):")
        for host, key, got, want in drift:
            print(f"  {host}: {key} got={got} want={want}")
        print()

    print(f"Total issues: {issues}")
    return 1


if __name__ == '__main__':
    sys.exit(main())