BIOS check fix, parallel downloads, shopfloor hardening

- Fix check-bios.cmd: replace parenthesized if blocks with goto labels
  (cmd.exe fails silently with if/else on network-mapped drives)
- Move BIOS check files to winpeapps/_shared/BIOS for reliable SMB access
- Add network wait loop before BIOS check in startnet.cmd
- Show firmware status in WinPE menu header (BIOS_STATUS variable)
- Add BypassNRO registry key to skip OOBE network requirement
- Refactor download-drivers.py with --parallel N flag (ThreadPoolExecutor)
- Set SupportUser AutoLogonCount to 3 in shopfloor unattend
- Add shutdown -a at start + shutdown /r /t 10 at end of Run-ShopfloorSetup.ps1
- Switch download-drivers.py from wget to curl for reliable stall detection

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
cproudlock
2026-03-23 11:02:36 -04:00
parent 86660a0939
commit 6d0e6ee284
6 changed files with 1538 additions and 42 deletions

814
download-drivers.py Executable file
View File

@@ -0,0 +1,814 @@
#!/usr/bin/env python3
"""
download-drivers.py — Download Dell drivers (+ BIOS) and push to PXE server
Downloads driver packs directly from Dell's public catalog (downloads.dell.com).
Matches models from user_selections.json / HardwareDriver.json against Dell's
DriverPackCatalog. No GE network or Media Creator Lite required.
Usage:
./download-drivers.py # download + push selected drivers
./download-drivers.py --list # preview without downloading
./download-drivers.py --bios # also download BIOS updates
./download-drivers.py --image gea-standard # push directly to an image
./download-drivers.py --force # re-download even if on server
./download-drivers.py --parallel 4 # process 4 packs concurrently
Requires: curl, 7z, sshpass, rsync
"""
import argparse
import concurrent.futures
import hashlib
import json
import os
import re
import subprocess
import sys
import tempfile
import threading
import xml.etree.ElementTree as ET
from pathlib import Path
REPO_DIR = Path(__file__).resolve().parent
PXE_HOST = "10.9.100.1"
PXE_USER = "pxe"
PXE_PASS = "pxe"
UPLOAD_DEST = "/home/pxe/image-upload"
IMAGE_BASE = "/srv/samba/winpeapps"
DELL_DRIVER_CATALOG = "https://downloads.dell.com/catalog/DriverPackCatalog.cab"
DELL_BIOS_CATALOG = "https://downloads.dell.com/catalog/DellSDPCatalogPC.cab"
DELL_BASE = "https://downloads.dell.com"
NS = {"d": "openmanage/cm/dm"}
SDP_CAT_NS = "http://schemas.microsoft.com/sms/2005/04/CorporatePublishing/SystemsManagementCatalog.xsd"
SDP_PKG_NS = "http://schemas.microsoft.com/wsus/2005/04/CorporatePublishing/SoftwareDistributionPackage.xsd"
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def format_size(n):
if n >= 1024**3: return f"{n / 1024**3:.1f} GB"
if n >= 1024**2: return f"{n / 1024**2:.0f} MB"
return f"{n / 1024:.0f} KB"
def resolve_dest_dir(d):
"""Convert *destinationdir*\\Deploy\\... to Deploy/..."""
return d.replace("*destinationdir*\\", "").replace("*destinationdir*", "").replace("\\", "/")
def ssh_cmd(host, cmd):
return subprocess.run(
["sshpass", "-p", PXE_PASS, "ssh", "-o", "StrictHostKeyChecking=no",
"-o", "LogLevel=ERROR", f"{PXE_USER}@{host}", cmd],
capture_output=True, text=True)
def verify_sha256(filepath, expected):
sha = hashlib.sha256()
with open(filepath, "rb") as f:
for chunk in iter(lambda: f.read(1024 * 1024), b""):
sha.update(chunk)
return sha.hexdigest().upper() == expected.upper()
def extract_model_ids(name):
"""Extract model identifiers like '5450', 'PC14250', 'QCM1250'."""
ids = set(re.findall(r'\b([A-Z]*\d[\w]{2,})\b', name, re.I))
# Dell uses Qx* codenames where GE uses QC*/QB* (e.g. QxM1250 = QCM1250)
extras = set()
for mid in ids:
if re.match(r'^Q[A-Z][A-Z]\d', mid, re.I):
extras.add("Qx" + mid[2:]) # QCM1250 -> QxM1250
elif re.match(r'^Qx[A-Z]\d', mid, re.I):
pass # already in Qx form, will match directly
return ids | extras
def get_brand(name):
lower = name.lower()
for b in ["latitude", "precision", "optiplex", "pro max", "pro"]:
if b in lower:
return b
return None
# ---------------------------------------------------------------------------
# Catalog download + parsing
# ---------------------------------------------------------------------------
def download_and_extract_cab(url, tmpdir):
"""Download a .cab, extract with 7z, return path to XML inside."""
cab = os.path.join(tmpdir, os.path.basename(url))
print(f" Fetching {os.path.basename(url)}...", end=" ", flush=True)
r = subprocess.run(["wget", "-q", "-O", cab, url])
if r.returncode != 0:
print("FAILED"); return None
subprocess.run(["7z", "x", "-y", f"-o{tmpdir}", cab],
capture_output=True, text=True)
os.remove(cab)
xml_name = os.path.basename(url).replace(".cab", ".xml")
xml_path = os.path.join(tmpdir, xml_name)
if os.path.exists(xml_path):
print("OK"); return xml_path
print("FAILED (XML not found)"); return None
def parse_driver_catalog(xml_path, os_filter=None):
"""Parse DriverPackCatalog.xml → list of driver pack dicts.
os_filter: list of OS prefixes to match, e.g. ["Windows10", "Windows11"].
Defaults to ["Windows10", "Windows11"] (both).
"""
if os_filter is None:
os_filter = ["Windows10", "Windows11"]
tree = ET.parse(xml_path)
packs = []
for pkg in tree.getroot().findall(".//d:DriverPackage", NS):
if pkg.get("type") != "win":
continue
os_codes = [o.get("osCode", "") for o in pkg.findall(".//d:OperatingSystem", NS)]
if not any(code.startswith(prefix) for code in os_codes for prefix in os_filter):
continue
models = []
for m in pkg.findall(".//d:Model", NS):
d = m.find("d:Display", NS)
models.append({
"name": m.get("name", ""),
"display": d.text.strip() if d is not None and d.text else ""
})
sha256 = ""
for h in pkg.findall(".//d:Cryptography/d:Hash", NS):
if h.get("algorithm") == "SHA256":
sha256 = h.text; break
path = pkg.get("path", "")
packs.append({
"url": f"{DELL_BASE}/{path}",
"filename": path.split("/")[-1],
"size": int(pkg.get("size", 0)),
"sha256": sha256,
"models": models,
})
return packs
def parse_bios_catalog(xml_path, model_names):
"""Parse DellSDPCatalogPC.xml → list of latest BIOS update dicts for given models."""
tree = ET.parse(xml_path)
root = tree.getroot()
bios = {} # model_key → best entry
for pkg in root.iter(f"{{{SDP_CAT_NS}}}SoftwareDistributionPackage"):
title_elem = pkg.find(f".//{{{SDP_PKG_NS}}}Title")
if title_elem is None or not title_elem.text:
continue
title = title_elem.text
if "BIOS" not in title:
continue
# Find which of our models this BIOS applies to
matched_model = None
for mname in model_names:
for mid in extract_model_ids(mname):
if mid in title:
matched_model = mname
break
if matched_model:
break
if not matched_model:
continue
# Extract version from title (e.g., "...BIOS,1.20.1,1.20.1")
ver_match = re.search(r",(\d+\.\d+\.\d+)", title)
version = ver_match.group(1) if ver_match else "0.0.0"
# Get download URL
origin = pkg.find(f".//{{{SDP_PKG_NS}}}OriginFile")
if origin is None:
continue
entry = {
"title": title,
"version": version,
"filename": origin.get("FileName", ""),
"url": origin.get("OriginUri", ""),
"size": int(origin.get("Size", 0)),
"model": matched_model,
}
# Keep latest version per model
key = matched_model
if key not in bios or version > bios[key]["version"]:
bios[key] = entry
return list(bios.values())
# ---------------------------------------------------------------------------
# Model matching
# ---------------------------------------------------------------------------
def find_dell_packs(our_model_name, dell_packs):
"""Find Dell driver pack(s) matching one of our model names."""
our_ids = extract_model_ids(our_model_name)
our_brand = get_brand(our_model_name)
our_rugged = "rugged" in our_model_name.lower()
if not our_ids:
return []
matches = []
for pack in dell_packs:
for dm in pack["models"]:
dell_ids = extract_model_ids(dm["name"]) | extract_model_ids(dm["display"])
if not (our_ids & dell_ids):
continue
# Brand check: if we specify a brand, Dell must match (or have none)
if our_brand:
dell_brand = get_brand(dm["name"])
if dell_brand and dell_brand != our_brand:
continue
# Rugged check: if Dell explicitly labels pack as Rugged,
# only match our Rugged models (prevents non-rugged 5430 matching
# Rugged 5430 pack). If Dell doesn't say Rugged, allow any match
# (handles 7220/7230 which are Rugged-only but unlabeled in catalog).
dell_rugged = "rugged" in dm["name"].lower() or "rugged" in pack["filename"].lower()
if dell_rugged and not our_rugged:
continue
matches.append(pack)
break
# Deduplicate by URL
seen = set()
return [m for m in matches if m["url"] not in seen and not seen.add(m["url"])]
# ---------------------------------------------------------------------------
# Download + push
# ---------------------------------------------------------------------------
def make_zip_name(filename, dest_dir):
"""Generate a zip filename matching GE convention: win11_<model>_<ver>.zip"""
# Strip extension and version suffix to get base name
base = re.sub(r'[_-]Win1[01][_.].*', '', filename, flags=re.I)
base = re.sub(r'[-_]', '', base).lower()
# Extract version from filename (e.g., A04, A13)
ver_match = re.search(r'_A(\d+)', filename, re.I)
ver = f"a{ver_match.group(1)}" if ver_match else "a00"
return f"win11_{base}_{ver}.zip"
def process_download(args, url, filename, sha256, size, target_dir, label, tmpdir):
"""Download, verify, extract, re-zip, and push one driver pack. Returns True on success.
Each caller should pass a unique tmpdir to avoid collisions in parallel mode."""
local_file = os.path.join(tmpdir, filename)
# Download
print(f" [{label}] Downloading {format_size(size)}...")
r = subprocess.run(["curl", "-L", "-s", "-S",
"--speed-limit", "1000", "--speed-time", "30",
"--retry", "3", "--retry-delay", "5",
"-o", local_file, url])
if r.returncode != 0 or not os.path.exists(local_file):
print(f" [{label}] ERROR: Download failed (curl exit {r.returncode})")
if os.path.exists(local_file): os.remove(local_file)
return False
# Verify hash (if provided)
if sha256:
print(f" [{label}] Verifying SHA256...", end=" ", flush=True)
if not verify_sha256(local_file, sha256):
print("MISMATCH!")
os.remove(local_file)
return False
print("OK")
# Extract locally with 7z (unique subdir per worker)
extract_dir = os.path.join(tmpdir, "extract")
os.makedirs(extract_dir, exist_ok=True)
print(f" [{label}] Extracting...", end=" ", flush=True)
r = subprocess.run(["7z", "x", "-y", f"-o{extract_dir}", local_file],
capture_output=True, text=True)
os.remove(local_file)
if r.returncode != 0:
print(f"FAILED: {r.stderr[:200]}")
subprocess.run(["rm", "-rf", extract_dir])
return False
print("OK")
# Re-zip for PESetup.exe (expects zipped driver packs, not loose files)
zip_name = make_zip_name(filename, target_dir)
zip_path = os.path.join(tmpdir, zip_name)
print(f" [{label}] Zipping as {zip_name}...", end=" ", flush=True)
r = subprocess.run(["zip", "-r", "-q", zip_path, "."],
cwd=extract_dir)
subprocess.run(["rm", "-rf", extract_dir])
if r.returncode != 0:
print("FAILED")
return False
zip_size = os.path.getsize(zip_path)
print(f"OK ({format_size(zip_size)})")
# Push zip to PXE server
print(f" [{label}] Pushing to {target_dir}/{zip_name}...")
ssh_cmd(args.server, f"mkdir -p '{target_dir}'")
r = subprocess.run([
"rsync", "-a",
"-e", f"sshpass -p {PXE_PASS} ssh -o StrictHostKeyChecking=no -o LogLevel=ERROR",
zip_path, f"{PXE_USER}@{args.server}:{target_dir}/"
])
os.remove(zip_path)
if r.returncode != 0:
print(f" [{label}] ERROR: rsync failed")
return False
return True
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
parser = argparse.ArgumentParser(
description="Download Dell drivers (+ BIOS) and push to PXE server")
parser.add_argument("--list", action="store_true",
help="Preview without downloading")
parser.add_argument("--bios", action="store_true",
help="Also download BIOS updates")
parser.add_argument("--image",
help="Push directly to image type (e.g. gea-standard)")
parser.add_argument("--server", default=PXE_HOST,
help=f"PXE server IP (default: {PXE_HOST})")
parser.add_argument("--force", action="store_true",
help="Re-download even if already on server")
parser.add_argument("--cache-path",
help="Path to local image dir with Deploy/Control/ and Tools/")
parser.add_argument("--local",
help="Download to local directory (no server needed)")
parser.add_argument("--parallel", type=int, default=1, metavar="N",
help="Process N packs concurrently (default: 1)")
args = parser.parse_args()
# --- Load our model selections ---
control_dir = tools_dir = None
if args.cache_path:
p = Path(args.cache_path)
control_dir = p / "Deploy" / "Control"
tools_dir = p / "Tools" if (p / "Tools").is_dir() else p.parent / "Tools"
else:
for d in sorted(REPO_DIR.iterdir()):
if d.is_dir() and (d / "Deploy" / "Control" / "HardwareDriver.json").exists():
control_dir = d / "Deploy" / "Control"
tools_dir = d / "Tools"
break
if not control_dir or not (control_dir / "HardwareDriver.json").exists():
sys.exit("ERROR: HardwareDriver.json not found. Use --cache-path or ensure a local image dir exists.")
if not (tools_dir / "user_selections.json").exists():
sys.exit("ERROR: user_selections.json not found")
with open(control_dir / "HardwareDriver.json") as f:
hw_drivers = json.load(f)
with open(tools_dir / "user_selections.json") as f:
selections = json.load(f)[0]
os_id = selections["OperatingSystemSelection"]
selected_families = set(m["Id"] for m in selections["HardwareModelSelection"])
# Filter to selected + matching OS
our_entries = [d for d in hw_drivers
if d["family"] in selected_families and os_id in d.get("aOsIds", [])]
# Collect unique model names and their DestinationDirs
model_dest_map = {} # model_name → dest_dir
for entry in our_entries:
dest = resolve_dest_dir(entry["DestinationDir"])
for m in entry["models"].split(","):
m = m.strip()
if m not in model_dest_map:
model_dest_map[m] = dest
base_path = f"{IMAGE_BASE}/{args.image}" if args.image else UPLOAD_DEST
print()
print("=" * 60)
print(" Dell Driver Downloader for PXE Server")
print("=" * 60)
# --- Download Dell catalog ---
with tempfile.TemporaryDirectory(prefix="dell-catalog-") as catdir:
xml_path = download_and_extract_cab(DELL_DRIVER_CATALOG, catdir)
if not xml_path:
sys.exit("ERROR: Could not download Dell driver catalog")
dell_packs = parse_driver_catalog(xml_path)
print(f" Catalog: {len(dell_packs)} Win11 driver packs available")
bios_updates = []
if args.bios:
bios_xml = download_and_extract_cab(DELL_BIOS_CATALOG, catdir)
if bios_xml:
bios_updates = parse_bios_catalog(bios_xml, list(model_dest_map.keys()))
print(f" BIOS: {len(bios_updates)} update(s) found")
# --- Match our models to Dell catalog ---
# Group: dest_dir → list of Dell packs to download
download_plan = [] # list of {dell_pack, dest_dir, our_models}
unmatched = []
seen_urls = set()
dest_seen = {} # dest_dir → set of URLs already planned
for model_name, dest_dir in model_dest_map.items():
matches = find_dell_packs(model_name, dell_packs)
if not matches:
unmatched.append(model_name)
continue
for pack in matches:
if pack["url"] in seen_urls:
continue
seen_urls.add(pack["url"])
download_plan.append({
"pack": pack,
"dest_dir": dest_dir,
"model": model_name,
})
# --- Display plan ---
print()
total_drv_size = sum(d["pack"]["size"] for d in download_plan)
print(f" Drivers: {len(download_plan)} pack(s) to download ({format_size(total_drv_size)})")
print(f" Target: {args.server}:{base_path}")
if unmatched:
print(f" No Dell match: {len(unmatched)} model(s)")
print()
for i, d in enumerate(download_plan, 1):
p = d["pack"]
print(f" {i:3}. {d['model']:<38} {format_size(p['size']):>8} {p['filename']}")
print(f" -> {d['dest_dir']}")
if unmatched:
print()
print(f" Unmatched models (not in Dell public catalog):")
for m in unmatched:
print(f" - {m}")
if bios_updates:
total_bios = sum(b["size"] for b in bios_updates)
print()
print(f" BIOS updates: {len(bios_updates)} ({format_size(total_bios)})")
for b in bios_updates:
print(f" {b['model']:<35} v{b['version']} {b['filename']}")
print()
if args.list:
print(" (--list mode, nothing downloaded)")
return
# --- LOCAL MODE: download to local directory ---
if args.local:
local_dir = Path(args.local)
drv_dir = local_dir / "drivers"
bios_local_dir = local_dir / "bios"
drv_dir.mkdir(parents=True, exist_ok=True)
# Load local manifest
manifest_path = local_dir / "manifest.json"
manifest = json.loads(manifest_path.read_text()) if manifest_path.exists() else {}
# Thread-safe counters and manifest access
_lock = threading.Lock()
counters = {"completed": 0, "skipped": 0, "errors": 0}
# Build GE filename mapping from our HardwareDriver.json entries
ge_filename_map = {} # model_name → GE FileName
for entry in our_entries:
fn = entry.get("FileName") or entry.get("fileName", "")
dest = resolve_dest_dir(entry.get("DestinationDir") or entry.get("destinationDir", ""))
for m in (entry.get("models") or entry.get("modelswminame", "")).split(","):
m = m.strip()
if m and fn:
ge_filename_map[m] = {"filename": fn, "dest_dir": dest}
def _download_one_local(i, d):
"""Download a single driver pack (local mode). Thread-safe."""
pack = d["pack"]
tag = f"[{i}/{len(download_plan)}]"
with _lock:
print(f"{'=' * 60}")
print(f"{tag} {d['model']} ({format_size(pack['size'])})")
print(f"{'=' * 60}")
# Check if already downloaded (manifest or file size match)
local_file = drv_dir / pack["filename"]
if not args.force:
with _lock:
existing_hash = manifest.get("drivers", {}).get(pack["url"])
if existing_hash == pack["sha256"]:
with _lock:
print(f"{tag} Already downloaded (hash matches)")
counters["skipped"] += 1
return
if local_file.exists() and local_file.stat().st_size == pack["size"]:
with _lock:
print(f"{tag} Already downloaded (size matches)")
manifest.setdefault("drivers", {})[pack["url"]] = pack["sha256"]
counters["skipped"] += 1
return
# Download raw .exe to drivers/
with _lock:
print(f"{tag} Downloading {format_size(pack['size'])}...")
r = subprocess.run(["curl", "-L", "-s", "-S",
"--speed-limit", "1000", "--speed-time", "30",
"--retry", "3", "--retry-delay", "5",
"-o", str(local_file), pack["url"]])
if r.returncode != 0 or not local_file.exists():
with _lock:
print(f"{tag} ERROR: Download failed (curl exit {r.returncode})")
counters["errors"] += 1
if local_file.exists(): local_file.unlink()
return
# Verify size first
actual_size = local_file.stat().st_size
if pack["size"] and actual_size != pack["size"]:
with _lock:
print(f"{tag} ERROR: Size mismatch (got {format_size(actual_size)}, expected {format_size(pack['size'])})")
counters["errors"] += 1
local_file.unlink()
return
# Verify hash
if pack["sha256"]:
with _lock:
print(f"{tag} Verifying SHA256...", end=" ", flush=True)
if not verify_sha256(str(local_file), pack["sha256"]):
with _lock:
print("MISMATCH!")
counters["errors"] += 1
local_file.unlink()
return
with _lock:
print("OK")
ge_info = ge_filename_map.get(d["model"], {})
with _lock:
counters["completed"] += 1
manifest.setdefault("drivers", {})[pack["url"]] = pack["sha256"]
manifest.setdefault("mapping", {})[pack["filename"]] = {
"model": d["model"],
"dell_filename": pack["filename"],
"ge_filename": ge_info.get("filename", ""),
"dest_dir": d["dest_dir"],
"sha256": pack["sha256"],
"size": pack["size"],
}
print(f"{tag} Done.")
workers = max(1, args.parallel)
if workers > 1:
print(f" Downloading with {workers} parallel workers")
with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as pool:
futures = [pool.submit(_download_one_local, i, d)
for i, d in enumerate(download_plan, 1)]
concurrent.futures.wait(futures)
# --- Download BIOS ---
bios_ok = bios_err = 0
if bios_updates:
bios_local_dir.mkdir(parents=True, exist_ok=True)
print(f"{'=' * 60}")
print(f" BIOS Updates -> {bios_local_dir}")
print(f"{'=' * 60}")
def _download_one_bios(b):
nonlocal bios_ok, bios_err
with _lock:
print(f"\n {b['model']} v{b['version']}")
if not args.force:
with _lock:
existing = manifest.get("bios", {}).get(b["model"])
if existing == b["version"]:
with _lock:
print(f" Already downloaded (v{b['version']})")
return
local_file = bios_local_dir / b["filename"]
with _lock:
print(f" [{b['model']}] Downloading {format_size(b['size'])}...")
r = subprocess.run(["curl", "-L", "-s", "-S",
"--speed-limit", "1000", "--speed-time", "30",
"--retry", "3", "--retry-delay", "5",
"-o", str(local_file), b["url"]])
if r.returncode != 0:
with _lock:
print(f" [{b['model']}] ERROR: Download failed")
bios_err += 1
if local_file.exists(): local_file.unlink()
return
with _lock:
bios_ok += 1
manifest.setdefault("bios", {})[b["model"]] = b["version"]
manifest.setdefault("bios_mapping", {})[b["filename"]] = {
"model": b["model"],
"version": b["version"],
"filename": b["filename"],
}
print(f" [{b['model']}] Done.")
with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as pool:
futures = [pool.submit(_download_one_bios, b) for b in bios_updates]
concurrent.futures.wait(futures)
# Save manifest
manifest_path.write_text(json.dumps(manifest, indent=2))
# --- Summary ---
print()
print(f"{'=' * 60}")
print(f" Summary — Local Download")
print(f"{'=' * 60}")
print(f" Drivers downloaded: {counters['completed']}")
if counters["skipped"]: print(f" Drivers skipped: {counters['skipped']} (already have)")
if counters["errors"]: print(f" Drivers failed: {counters['errors']}")
if bios_updates:
print(f" BIOS downloaded: {bios_ok}")
if bios_err: print(f" BIOS failed: {bios_err}")
print(f" Saved to: {local_dir}")
print(f" Manifest: {manifest_path}")
print()
print(f" To push to server later:")
print(f" python3 download-drivers.py --push-local {local_dir}")
print()
return
# --- REMOTE MODE: download and push to server ---
# --- Verify SSH ---
print(f" Testing SSH to {args.server}...", end=" ", flush=True)
r = ssh_cmd(args.server, "echo OK")
if r.stdout.strip() != "OK":
print("FAILED")
sys.exit(f" Cannot SSH to {PXE_USER}@{args.server}: {r.stderr.strip()}")
print("OK")
print()
# --- Load manifest (tracks what's been downloaded by hash) ---
manifest_path = f"{base_path}/.driver-manifest.json"
r = ssh_cmd(args.server, f"cat '{manifest_path}' 2>/dev/null")
manifest = json.loads(r.stdout) if r.stdout.strip() else {}
# Thread-safe counters and manifest access
_lock = threading.Lock()
counters = {"completed": 0, "skipped": 0, "errors": 0}
# --- Download drivers ---
with tempfile.TemporaryDirectory(prefix="pxe-drivers-") as tmpdir:
def _process_one_remote(i, d):
"""Download, extract, re-zip, and push one driver pack. Thread-safe."""
pack = d["pack"]
target = f"{base_path}/{d['dest_dir']}"
tag = f"[{i}/{len(download_plan)}]"
with _lock:
print(f"{'=' * 60}")
print(f"{tag} {d['model']} ({format_size(pack['size'])})")
print(f"{'=' * 60}")
if not args.force:
with _lock:
existing_hash = manifest.get(d["dest_dir"], {}).get(pack["filename"])
if existing_hash == pack["sha256"]:
with _lock:
print(f"{tag} Up to date (hash matches manifest)")
counters["skipped"] += 1
return
# Each worker gets its own temp subdirectory
worker_tmp = os.path.join(tmpdir, f"worker-{i}")
os.makedirs(worker_tmp, exist_ok=True)
ok = process_download(args, pack["url"], pack["filename"],
pack["sha256"], pack["size"], target,
d["model"], worker_tmp)
with _lock:
if ok:
counters["completed"] += 1
manifest.setdefault(d["dest_dir"], {})[pack["filename"]] = pack["sha256"]
print(f"{tag} Done.")
else:
counters["errors"] += 1
workers = max(1, args.parallel)
if workers > 1:
print(f" Processing with {workers} parallel workers")
with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as pool:
futures = [pool.submit(_process_one_remote, i, d)
for i, d in enumerate(download_plan, 1)]
concurrent.futures.wait(futures)
# --- Download BIOS (goes to enrollment share, shared across all images) ---
bios_ok = bios_err = 0
bios_dir = "/srv/samba/enrollment/BIOS"
if bios_updates:
print(f"{'=' * 60}")
print(f" BIOS Updates -> {bios_dir}")
print(f"{'=' * 60}")
ssh_cmd(args.server, f"mkdir -p '{bios_dir}'")
models_txt = [] # lines for models.txt manifest
def _process_one_bios(b):
nonlocal bios_ok, bios_err
target = f"{bios_dir}/{b['filename']}"
with _lock:
print(f"\n {b['model']} v{b['version']}")
if not args.force:
with _lock:
existing = manifest.get("BIOS", {}).get(b["model"])
if existing == b["version"]:
with _lock:
print(f" Up to date (v{b['version']})")
models_txt.append(f"{b['model']}|{b['filename']}")
return
# BIOS .exe goes as-is (not extracted)
bios_tmp = os.path.join(tmpdir, f"bios-{b['filename']}")
with _lock:
print(f" [{b['model']}] Downloading {format_size(b['size'])}...")
r = subprocess.run(["curl", "-L", "-s", "-S",
"--speed-limit", "1000", "--speed-time", "30",
"--retry", "3", "--retry-delay", "5",
"-o", bios_tmp, b["url"]])
if r.returncode != 0:
with _lock:
print(f" [{b['model']}] ERROR: Download failed")
bios_err += 1
if os.path.exists(bios_tmp): os.remove(bios_tmp)
return
r = subprocess.run([
"rsync", "-a",
"-e", f"sshpass -p {PXE_PASS} ssh -o StrictHostKeyChecking=no -o LogLevel=ERROR",
bios_tmp, f"{PXE_USER}@{args.server}:{target}"
])
os.remove(bios_tmp)
if r.returncode != 0:
with _lock:
print(f" [{b['model']}] ERROR: Push failed")
bios_err += 1
else:
with _lock:
print(f" [{b['model']}] Done.")
bios_ok += 1
manifest.setdefault("BIOS", {})[b["model"]] = b["version"]
models_txt.append(f"{b['model']}|{b['filename']}")
with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as pool:
futures = [pool.submit(_process_one_bios, b) for b in bios_updates]
concurrent.futures.wait(futures)
# Generate models.txt for check-bios.cmd
if models_txt:
manifest_content = "# ModelSubstring|BIOSFile\\n" + "\\n".join(models_txt) + "\\n"
ssh_cmd(args.server,
f"printf '{manifest_content}' > '{bios_dir}/models.txt'")
print(f"\n models.txt updated ({len(models_txt)} entries)")
# --- Save manifest ---
completed, skipped, errors = counters["completed"], counters["skipped"], counters["errors"]
if completed > 0 or bios_ok > 0:
manifest_json = json.dumps(manifest, indent=2)
ssh_cmd(args.server,
f"cat > '{manifest_path}' << 'MANIFEST_EOF'\n{manifest_json}\nMANIFEST_EOF")
print(f" Manifest saved to {manifest_path}")
# --- Summary ---
print()
print(f"{'=' * 60}")
print(f" Summary")
print(f"{'=' * 60}")
print(f" Drivers downloaded: {completed}")
if skipped: print(f" Drivers skipped: {skipped} (up to date)")
if errors: print(f" Drivers failed: {errors}")
if bios_updates:
print(f" BIOS downloaded: {bios_ok}")
if bios_err: print(f" BIOS failed: {bios_err}")
print()
if completed > 0 and not args.image:
print(f" Drivers staged in {base_path}/Deploy/Out-of-box Drivers/")
print(f" Use the webapp (http://{args.server}:9009) to import,")
print(f" or re-run with --image <type> to push directly.")
print()
if __name__ == "__main__":
main()