Shopfloor PC type system, webapp enhancements, slim Blancco GRUB

- Shopfloor PC type menu (CMM, WaxAndTrace, Keyence, Genspect, Display, Standard)
- Baseline scripts: OpenText CSF, Start Menu shortcuts, network/WinRM, power/display
- Standard type: eDNC + MarkZebra with 64-bit path mirroring
- CMM type: Hexagon CLM Tools, PC-DMIS 2016/2019 R2
- Display sub-type: Lobby vs Dashboard
- Webapp: enrollment management, image config editor, UI refresh
- Upload-Image.ps1: robocopy MCL cache to PXE server
- Download-Drivers.ps1: Dell driver download pipeline
- Slim Blancco GRUB EFI (10MB -> 660KB) for old hardware compat
- Shopfloor display imaging guide docs

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
cproudlock
2026-03-26 11:25:07 -04:00
parent 6d0e6ee284
commit 76165495ff
49 changed files with 4304 additions and 147 deletions

View File

@@ -1,6 +1,7 @@
#!/usr/bin/env python3
"""Flask web application for managing a GE Aerospace PXE server."""
import json
import logging
import os
import secrets
@@ -51,14 +52,15 @@ def audit(action, detail=""):
SAMBA_SHARE = os.environ.get("SAMBA_SHARE", "/srv/samba/winpeapps")
CLONEZILLA_SHARE = os.environ.get("CLONEZILLA_SHARE", "/srv/samba/clonezilla")
BLANCCO_REPORTS = os.environ.get("BLANCCO_REPORTS", "/srv/samba/blancco-reports")
ENROLLMENT_SHARE = os.environ.get("ENROLLMENT_SHARE", "/srv/samba/enrollment")
UPLOAD_DIR = os.environ.get("UPLOAD_DIR", "/home/pxe/image-upload")
SHARED_DIR = os.path.join(SAMBA_SHARE, "_shared")
# Subdirs inside Deploy/ shared across ALL image types
SHARED_DEPLOY_GLOBAL = ["Out-of-box Drivers"]
# Subdirs inside Deploy/ shared within the same image family (by prefix)
SHARED_DEPLOY_SCOPED = {
"gea-": ["Operating Systems"],
"ge-": ["Operating Systems"],
"gea-": ["Operating Systems", "Packages"],
"ge-": ["Operating Systems", "Packages"],
}
# Sibling dirs at image root shared within the same image family
SHARED_ROOT_DIRS = {
@@ -149,6 +151,164 @@ def unattend_path(image_type):
return os.path.join(deploy_path(image_type), "FlatUnattendW10.xml")
def control_path(image_type):
"""Return the Deploy/Control directory for an image type."""
return os.path.join(deploy_path(image_type), "Control")
def tools_path(image_type):
"""Return the Tools directory for an image type."""
return os.path.join(SAMBA_SHARE, image_type, "Tools")
def _load_json(filepath):
"""Parse a JSON file and return its contents, or [] on failure."""
try:
with open(filepath, "r", encoding="utf-8-sig") as fh:
return json.load(fh)
except (OSError, json.JSONDecodeError):
return []
def _save_json(filepath, data):
"""Write data as pretty-printed JSON."""
os.makedirs(os.path.dirname(filepath), exist_ok=True)
with open(filepath, "w", encoding="utf-8") as fh:
json.dump(data, fh, indent=2, ensure_ascii=False)
fh.write("\n")
def _resolve_destination(dest_dir, image_type):
"""Convert a Windows *destinationdir* path to a Linux filesystem path.
Replaces the ``*destinationdir*`` placeholder and backslashes, then
prepends ``SAMBA_SHARE/image_type/`` and resolves symlinks.
"""
if not dest_dir:
return ""
# Replace the placeholder (case-insensitive)
path = dest_dir
lower = path.lower()
idx = lower.find("*destinationdir*")
if idx != -1:
path = path[idx + len("*destinationdir*"):]
# Backslash → forward slash, strip leading slash
path = path.replace("\\", "/").lstrip("/")
full = os.path.join(SAMBA_SHARE, image_type, path)
# Resolve symlinks so shared dirs are found
try:
full = os.path.realpath(full)
except OSError:
pass
return full
def _load_image_config(image_type):
"""Load all JSON configs for an image and check on-disk presence."""
ctrl = control_path(image_type)
tools = tools_path(image_type)
# --- Drivers (merge HardwareDriver.json + hw_drivers.json) ---
hw_driver_file = os.path.join(ctrl, "HardwareDriver.json")
hw_drivers_extra = os.path.join(ctrl, "hw_drivers.json")
drivers_raw = _load_json(hw_driver_file)
extra_raw = _load_json(hw_drivers_extra)
# Merge: dedup by FileName (case-insensitive)
seen_files = set()
drivers = []
for d in drivers_raw + extra_raw:
fname = (d.get("FileName") or d.get("fileName") or "").lower()
if fname and fname in seen_files:
continue
if fname:
seen_files.add(fname)
drivers.append(d)
# Check disk presence for each driver
for d in drivers:
fname = d.get("FileName") or d.get("fileName") or ""
dest = d.get("DestinationDir") or d.get("destinationDir") or ""
resolved = _resolve_destination(dest, image_type)
if resolved and fname:
d["_on_disk"] = os.path.isfile(os.path.join(resolved, fname))
else:
d["_on_disk"] = False
# --- Operating Systems ---
os_file = os.path.join(ctrl, "OperatingSystem.json")
operating_systems = _load_json(os_file)
for entry in operating_systems:
osv = entry.get("operatingSystemVersion", {})
wim = osv.get("wim", {})
dest = wim.get("DestinationDir") or wim.get("destinationDir") or ""
resolved = _resolve_destination(dest, image_type)
if resolved:
entry["_on_disk"] = os.path.isfile(os.path.join(resolved, "install.wim"))
else:
entry["_on_disk"] = False
# --- Packages ---
pkg_file = os.path.join(ctrl, "packages.json")
packages = _load_json(pkg_file)
for p in packages:
fname = p.get("fileName") or p.get("FileName") or ""
dest = p.get("destinationDir") or p.get("DestinationDir") or ""
resolved = _resolve_destination(dest, image_type)
if resolved and fname:
p["_on_disk"] = os.path.isfile(os.path.join(resolved, fname))
else:
p["_on_disk"] = False
# --- Hardware Models (user_selections.json) ---
us_file = os.path.join(tools, "user_selections.json")
us_raw = _load_json(us_file)
us_data = us_raw[0] if us_raw and isinstance(us_raw, list) else {}
hardware_models = us_data.get("HardwareModelSelection", [])
os_selection = str(us_data.get("OperatingSystemSelection", ""))
# Build a lookup: family → driver entry (for disk-presence on models)
family_lookup = {}
for d in drivers:
family = d.get("family", "")
if family:
family_lookup[family] = d
for hm in hardware_models:
family_id = hm.get("Id", "")
matched = family_lookup.get(family_id)
hm["_on_disk"] = matched["_on_disk"] if matched else False
# --- Orphan drivers: zip files on disk not referenced in any JSON ---
orphan_drivers = []
oob_dir = os.path.join(deploy_path(image_type), "Out-of-box Drivers")
# Resolve symlinks
try:
oob_dir = os.path.realpath(oob_dir)
except OSError:
pass
registered_files = set()
for d in drivers:
fname = d.get("FileName") or d.get("fileName") or ""
if fname:
registered_files.add(fname.lower())
if os.path.isdir(oob_dir):
for dirpath, _dirnames, filenames in os.walk(oob_dir):
for fn in filenames:
if fn.lower().endswith(".zip") and fn.lower() not in registered_files:
rel = os.path.relpath(os.path.join(dirpath, fn), oob_dir)
orphan_drivers.append({"fileName": fn, "relPath": rel})
return {
"hardware_models": hardware_models,
"drivers": drivers,
"operating_systems": operating_systems,
"packages": packages,
"orphan_drivers": orphan_drivers,
"os_selection": os_selection,
}
def image_status(image_type):
"""Return a dict describing the state of an image type."""
dp = deploy_path(image_type)
@@ -255,10 +415,11 @@ def _import_deploy(src_deploy, dst_deploy, target="", move=False):
_replace_with_symlink(dst_item, shared_dest)
continue
# Normal transfer
if os.path.exists(dst_item):
shutil.rmtree(dst_item)
_transfer_tree(src_item, dst_item)
# Normal transfer — merge to preserve existing custom content
if os.path.isdir(dst_item):
_merge_tree(src_item, dst_item, move=move)
else:
_transfer_tree(src_item, dst_item)
def _replace_with_symlink(link_path, target_path):
@@ -355,6 +516,21 @@ def parse_unattend(xml_path):
"SkipMachineOOBE": "true",
},
"firstlogon_commands": [],
"user_accounts": [],
"autologon": {
"enabled": "",
"username": "",
"password": "",
"plain_text": "true",
"logon_count": "",
},
"intl": {
"input_locale": "",
"system_locale": "",
"ui_language": "",
"user_locale": "",
},
"oobe_timezone": "",
"raw_xml": "",
}
@@ -418,6 +594,19 @@ def parse_unattend(xml_path):
namespaces=ns,
):
comp_name = comp.get("name", "")
# International-Core component
if "International-Core" in comp_name:
for tag, key in [
("InputLocale", "input_locale"),
("SystemLocale", "system_locale"),
("UILanguage", "ui_language"),
("UserLocale", "user_locale"),
]:
el = comp.find(qn(tag))
if el is not None and el.text:
data["intl"][key] = el.text.strip()
if "OOBE" in comp_name or "Shell-Setup" in comp_name:
oobe_el = comp.find(qn("OOBE"))
if oobe_el is not None:
@@ -426,6 +615,57 @@ def parse_unattend(xml_path):
if local in data["oobe"] and child.text:
data["oobe"][local] = child.text.strip()
# UserAccounts / LocalAccounts
ua = comp.find(qn("UserAccounts"))
if ua is not None:
la_container = ua.find(qn("LocalAccounts"))
if la_container is not None:
for acct in la_container.findall(qn("LocalAccount")):
name_el = acct.find(qn("Name"))
group_el = acct.find(qn("Group"))
display_el = acct.find(qn("DisplayName"))
pw_el = acct.find(qn("Password"))
pw_val = ""
pw_plain = "true"
if pw_el is not None:
v = pw_el.find(qn("Value"))
p = pw_el.find(qn("PlainText"))
if v is not None and v.text:
pw_val = v.text.strip()
if p is not None and p.text:
pw_plain = p.text.strip()
data["user_accounts"].append({
"name": name_el.text.strip() if name_el is not None and name_el.text else "",
"password": pw_val,
"plain_text": pw_plain,
"group": group_el.text.strip() if group_el is not None and group_el.text else "Administrators",
"display_name": display_el.text.strip() if display_el is not None and display_el.text else "",
})
# AutoLogon
al = comp.find(qn("AutoLogon"))
if al is not None:
enabled_el = al.find(qn("Enabled"))
user_el = al.find(qn("Username"))
count_el = al.find(qn("LogonCount"))
pw_el = al.find(qn("Password"))
pw_val = ""
pw_plain = "true"
if pw_el is not None:
v = pw_el.find(qn("Value"))
p = pw_el.find(qn("PlainText"))
if v is not None and v.text:
pw_val = v.text.strip()
if p is not None and p.text:
pw_plain = p.text.strip()
data["autologon"] = {
"enabled": enabled_el.text.strip() if enabled_el is not None and enabled_el.text else "",
"username": user_el.text.strip() if user_el is not None and user_el.text else "",
"password": pw_val,
"plain_text": pw_plain,
"logon_count": count_el.text.strip() if count_el is not None and count_el.text else "",
}
# FirstLogonCommands
flc = comp.find(qn("FirstLogonCommands"))
if flc is not None:
@@ -439,6 +679,11 @@ def parse_unattend(xml_path):
"description": desc_el.text.strip() if desc_el is not None and desc_el.text else "",
})
# TimeZone (oobeSystem pass)
tz_el = comp.find(qn("TimeZone"))
if tz_el is not None and tz_el.text:
data["oobe_timezone"] = tz_el.text.strip()
return data
@@ -515,6 +760,28 @@ def build_unattend_xml(form_data):
# --- oobeSystem ---
oobe_settings = _settings_pass(root, "oobeSystem")
# International-Core component (before Shell-Setup)
intl = form_data.get("intl", {})
if any(v.strip() for v in intl.values() if v):
intl_comp = etree.SubElement(oobe_settings, qn("component"))
intl_comp.set("name", "Microsoft-Windows-International-Core")
intl_comp.set("processorArchitecture", "amd64")
intl_comp.set("publicKeyToken", "31bf3856ad364e35")
intl_comp.set("language", "neutral")
intl_comp.set("versionScope", "nonSxS")
for tag, key in [
("InputLocale", "input_locale"),
("SystemLocale", "system_locale"),
("UILanguage", "ui_language"),
("UserLocale", "user_locale"),
]:
val = intl.get(key, "").strip()
if val:
el = etree.SubElement(intl_comp, qn(tag))
el.text = val
# Shell-Setup component
oobe_comp = etree.SubElement(oobe_settings, qn("component"))
oobe_comp.set("name", "Microsoft-Windows-Shell-Setup")
oobe_comp.set("processorArchitecture", "amd64")
@@ -540,6 +807,46 @@ def build_unattend_xml(form_data):
child = etree.SubElement(oobe_el, qn(key))
child.text = str(val)
# UserAccounts / LocalAccounts
accounts = form_data.get("user_accounts", [])
if accounts:
ua = etree.SubElement(oobe_comp, qn("UserAccounts"))
la_container = etree.SubElement(ua, qn("LocalAccounts"))
for acct in accounts:
if not acct.get("name", "").strip():
continue
la = etree.SubElement(la_container, qn("LocalAccount"))
la.set(qwcm("action"), "add")
pw = etree.SubElement(la, qn("Password"))
pw_val = etree.SubElement(pw, qn("Value"))
pw_val.text = acct.get("password", "")
pw_plain = etree.SubElement(pw, qn("PlainText"))
pw_plain.text = acct.get("plain_text", "true")
name_el = etree.SubElement(la, qn("Name"))
name_el.text = acct["name"].strip()
group_el = etree.SubElement(la, qn("Group"))
group_el.text = acct.get("group", "Administrators").strip()
display_el = etree.SubElement(la, qn("DisplayName"))
display_el.text = acct.get("display_name", acct["name"]).strip()
# AutoLogon
autologon = form_data.get("autologon", {})
if autologon.get("username", "").strip():
al = etree.SubElement(oobe_comp, qn("AutoLogon"))
al_pw = etree.SubElement(al, qn("Password"))
al_pw_val = etree.SubElement(al_pw, qn("Value"))
al_pw_val.text = autologon.get("password", "")
al_pw_plain = etree.SubElement(al_pw, qn("PlainText"))
al_pw_plain.text = autologon.get("plain_text", "true")
al_enabled = etree.SubElement(al, qn("Enabled"))
al_enabled.text = autologon.get("enabled", "true")
al_user = etree.SubElement(al, qn("Username"))
al_user.text = autologon["username"].strip()
logon_count = autologon.get("logon_count", "").strip()
if logon_count:
al_count = etree.SubElement(al, qn("LogonCount"))
al_count.text = logon_count
# FirstLogonCommands
fl_cmds = form_data.get("firstlogon_commands", [])
if fl_cmds:
@@ -556,6 +863,12 @@ def build_unattend_xml(form_data):
desc_el = etree.SubElement(sc, qn("Description"))
desc_el.text = cmd.get("description", "").strip()
# TimeZone (oobeSystem pass)
oobe_tz = form_data.get("oobe_timezone", "").strip()
if oobe_tz:
tz_el = etree.SubElement(oobe_comp, qn("TimeZone"))
tz_el.text = oobe_tz
xml_bytes = etree.tostring(
root,
pretty_print=True,
@@ -616,6 +929,40 @@ def _extract_form_data(form):
"description": fl_descs[i] if i < len(fl_descs) else "",
})
# User accounts
accounts = []
i = 0
while form.get(f"account_name_{i}"):
accounts.append({
"name": form.get(f"account_name_{i}", ""),
"password": form.get(f"account_password_{i}", ""),
"plain_text": form.get(f"account_plaintext_{i}", "true"),
"group": form.get(f"account_group_{i}", "Administrators"),
"display_name": form.get(f"account_display_{i}", ""),
})
i += 1
data["user_accounts"] = accounts
# AutoLogon
data["autologon"] = {
"enabled": form.get("autologon_enabled", ""),
"username": form.get("autologon_username", ""),
"password": form.get("autologon_password", ""),
"plain_text": form.get("autologon_plaintext", "true"),
"logon_count": form.get("autologon_logoncount", ""),
}
# International settings
data["intl"] = {
"input_locale": form.get("intl_input_locale", ""),
"system_locale": form.get("intl_system_locale", ""),
"ui_language": form.get("intl_ui_language", ""),
"user_locale": form.get("intl_user_locale", ""),
}
# OOBE TimeZone
data["oobe_timezone"] = form.get("oobe_timezone", "")
return data
@@ -799,6 +1146,74 @@ def unattend_editor(image_type):
)
@app.route("/images/<image_type>/config")
def image_config(image_type):
if image_type not in IMAGE_TYPES:
flash("Unknown image type.", "danger")
return redirect(url_for("dashboard"))
config = _load_image_config(image_type)
return render_template(
"image_config.html",
image_type=image_type,
friendly_name=FRIENDLY_NAMES.get(image_type, image_type),
config=config,
)
@app.route("/images/<image_type>/config/save", methods=["POST"])
def image_config_save(image_type):
if image_type not in IMAGE_TYPES:
flash("Unknown image type.", "danger")
return redirect(url_for("dashboard"))
section = request.form.get("section", "")
payload = request.form.get("payload", "[]")
try:
data = json.loads(payload)
except json.JSONDecodeError:
flash("Invalid JSON payload.", "danger")
return redirect(url_for("image_config", image_type=image_type))
ctrl = control_path(image_type)
tools = tools_path(image_type)
try:
if section == "hardware_models":
us_file = os.path.join(tools, "user_selections.json")
us_raw = _load_json(us_file)
us_data = us_raw[0] if us_raw and isinstance(us_raw, list) else {}
us_data["HardwareModelSelection"] = data
_save_json(us_file, [us_data])
audit("CONFIG_SAVE", f"{image_type}/hardware_models")
elif section == "drivers":
# Strip internal fields before saving
clean = [{k: v for k, v in d.items() if not k.startswith("_")} for d in data]
_save_json(os.path.join(ctrl, "HardwareDriver.json"), clean)
audit("CONFIG_SAVE", f"{image_type}/drivers")
elif section == "operating_systems":
clean = [{k: v for k, v in d.items() if not k.startswith("_")} for d in data]
_save_json(os.path.join(ctrl, "OperatingSystem.json"), clean)
audit("CONFIG_SAVE", f"{image_type}/operating_systems")
elif section == "packages":
clean = [{k: v for k, v in d.items() if not k.startswith("_")} for d in data]
_save_json(os.path.join(ctrl, "packages.json"), clean)
audit("CONFIG_SAVE", f"{image_type}/packages")
else:
flash(f"Unknown section: {section}", "danger")
return redirect(url_for("image_config", image_type=image_type))
flash(f"Saved {section.replace('_', ' ')} successfully.", "success")
except Exception as exc:
flash(f"Failed to save {section}: {exc}", "danger")
return redirect(url_for("image_config", image_type=image_type))
# ---------------------------------------------------------------------------
# Routes — Clonezilla Backups
# ---------------------------------------------------------------------------
@@ -922,6 +1337,78 @@ def blancco_delete_report(filename):
return redirect(url_for("blancco_reports"))
# ---------------------------------------------------------------------------
# Routes — Enrollment Packages
# ---------------------------------------------------------------------------
@app.route("/enrollment")
def enrollment():
packages = []
if os.path.isdir(ENROLLMENT_SHARE):
for f in sorted(os.listdir(ENROLLMENT_SHARE)):
fpath = os.path.join(ENROLLMENT_SHARE, f)
if os.path.isfile(fpath) and f.lower().endswith(".ppkg"):
stat = os.stat(fpath)
packages.append({
"filename": f,
"size": stat.st_size,
"modified": stat.st_mtime,
})
return render_template(
"enrollment.html",
packages=packages,
image_types=IMAGE_TYPES,
friendly_names=FRIENDLY_NAMES,
)
@app.route("/enrollment/upload", methods=["POST"])
def enrollment_upload():
if "ppkg_file" not in request.files:
flash("No file selected.", "danger")
return redirect(url_for("enrollment"))
f = request.files["ppkg_file"]
if not f.filename:
flash("No file selected.", "danger")
return redirect(url_for("enrollment"))
filename = secure_filename(f.filename)
if not filename.lower().endswith(".ppkg"):
flash("Only .ppkg files are accepted.", "danger")
return redirect(url_for("enrollment"))
os.makedirs(ENROLLMENT_SHARE, exist_ok=True)
dest = os.path.join(ENROLLMENT_SHARE, filename)
f.save(dest)
audit("ENROLLMENT_UPLOAD", filename)
flash(f"Uploaded {filename} successfully.", "success")
return redirect(url_for("enrollment"))
@app.route("/enrollment/download/<filename>")
def enrollment_download(filename):
filename = secure_filename(filename)
fpath = os.path.join(ENROLLMENT_SHARE, filename)
if not os.path.isfile(fpath):
flash(f"Package not found: {filename}", "danger")
return redirect(url_for("enrollment"))
return send_file(fpath, as_attachment=True)
@app.route("/enrollment/delete/<filename>", methods=["POST"])
def enrollment_delete(filename):
filename = secure_filename(filename)
fpath = os.path.join(ENROLLMENT_SHARE, filename)
if os.path.isfile(fpath):
os.remove(fpath)
audit("ENROLLMENT_DELETE", filename)
flash(f"Deleted {filename}.", "success")
else:
flash(f"Package not found: {filename}", "danger")
return redirect(url_for("enrollment"))
# ---------------------------------------------------------------------------
# Routes — startnet.cmd Editor (WIM)
# ---------------------------------------------------------------------------