#!/usr/bin/env python3
"""
download-drivers.py — Download Dell drivers (+ BIOS) and push to PXE server

Downloads driver packs directly from Dell's public catalog (downloads.dell.com).
Matches models from user_selections.json / HardwareDriver.json against Dell's
DriverPackCatalog. No GE network or Media Creator Lite required.

Usage:
    ./download-drivers.py                       # download + push selected drivers
    ./download-drivers.py --list                # preview without downloading
    ./download-drivers.py --bios                # also download BIOS updates
    ./download-drivers.py --image gea-standard  # push directly to an image
    ./download-drivers.py --force               # re-download even if on server
    ./download-drivers.py --parallel 4          # process 4 packs concurrently
    ./download-drivers.py --local DIR           # download to DIR, no server needed

Requires: curl, 7z, zip, sshpass, rsync
"""

import argparse
import concurrent.futures
import hashlib
import json
import os
import re
import shlex
import shutil
import subprocess
import sys
import tempfile
import threading
import xml.etree.ElementTree as ET
from pathlib import Path

REPO_DIR = Path(__file__).resolve().parent
PXE_HOST = "10.9.100.1"
PXE_USER = "pxe"
PXE_PASS = "pxe"
UPLOAD_DEST = "/home/pxe/image-upload"
IMAGE_BASE = "/srv/samba/winpeapps"

DELL_DRIVER_CATALOG = "https://downloads.dell.com/catalog/DriverPackCatalog.cab"
DELL_BIOS_CATALOG = "https://downloads.dell.com/catalog/DellSDPCatalogPC.cab"
DELL_BASE = "https://downloads.dell.com"
NS = {"d": "openmanage/cm/dm"}
SDP_CAT_NS = "http://schemas.microsoft.com/sms/2005/04/CorporatePublishing/SystemsManagementCatalog.xsd"
SDP_PKG_NS = "http://schemas.microsoft.com/wsus/2005/04/CorporatePublishing/SoftwareDistributionPackage.xsd"

# Remote-shell command handed to rsync's -e option (password auth via sshpass).
_RSYNC_RSH = f"sshpass -p {PXE_PASS} ssh -o StrictHostKeyChecking=no -o LogLevel=ERROR"


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def format_size(n):
    """Format a byte count as a short human-readable string (KB / MB / GB)."""
    if n >= 1024**3:
        return f"{n / 1024**3:.1f} GB"
    if n >= 1024**2:
        return f"{n / 1024**2:.0f} MB"
    return f"{n / 1024:.0f} KB"


def resolve_dest_dir(d):
    """Convert *destinationdir*\\Deploy\\... to Deploy/..."""
    return d.replace("*destinationdir*\\", "").replace("*destinationdir*", "").replace("\\", "/")


def ssh_cmd(host, cmd, input_text=None):
    """Run *cmd* on *host* over ssh (password auth via sshpass).

    input_text: optional text fed to the remote command's stdin. This lets
    callers write remote files with ``cat > file`` without embedding the
    content in the shell command line. When None, stdin is inherited.

    Returns the subprocess.CompletedProcess with captured text stdout/stderr.
    """
    return subprocess.run(
        ["sshpass", "-p", PXE_PASS, "ssh",
         "-o", "StrictHostKeyChecking=no", "-o", "LogLevel=ERROR",
         f"{PXE_USER}@{host}", cmd],
        input=input_text, capture_output=True, text=True)


def verify_sha256(filepath, expected):
    """Return True if the SHA-256 of *filepath* equals *expected*.

    The comparison ignores case and surrounding whitespace in *expected*.
    The file is read in 1 MiB chunks.
    """
    sha = hashlib.sha256()
    with open(filepath, "rb") as f:
        for chunk in iter(lambda: f.read(1024 * 1024), b""):
            sha.update(chunk)
    return sha.hexdigest().upper() == expected.strip().upper()


def extract_model_ids(name):
    """Extract model identifiers like '5450', 'PC14250', 'QCM1250'.

    Returns a set of tokens that contain a digit followed by at least two
    more word characters, plus Dell "Qx" aliases for Q??<digit> codenames.
    """
    ids = set(re.findall(r'\b([A-Z]*\d[\w]{2,})\b', name, re.I))
    # Dell uses Qx* codenames where GE uses QC*/QB* (e.g. QxM1250 = QCM1250).
    # Names already in Qx form also match this pattern and map to themselves.
    extras = {"Qx" + mid[2:] for mid in ids if re.match(r'^Q[A-Z][A-Z]\d', mid, re.I)}
    return ids | extras


def get_brand(name):
    """Return the first known Dell brand keyword found in *name*, or None."""
    lower = name.lower()
    # "pro max" must be tested before "pro" so the longer brand wins.
    for b in ["latitude", "precision", "optiplex", "pro max", "pro"]:
        if b in lower:
            return b
    return None


def _curl_fetch(url, dest):
    """Download *url* to *dest* with curl; return curl's exit code.

    -f makes HTTP errors (4xx/5xx) fail instead of saving the error page,
    which matters for BIOS files that have no hash to catch a bad download.
    """
    return subprocess.run(
        ["curl", "-f", "-L", "-s", "-S",
         "--speed-limit", "1000", "--speed-time", "30",
         "--retry", "3", "--retry-delay", "5",
         "-o", str(dest), url]).returncode


def _rsync_push(local_path, server, remote_path):
    """Copy *local_path* to *remote_path* on *server* via rsync; return exit code."""
    return subprocess.run([
        "rsync", "-a", "-e", _RSYNC_RSH,
        str(local_path), f"{PXE_USER}@{server}:{remote_path}",
    ]).returncode


def _version_key(version):
    """Turn a dotted version string such as '1.20.1' into a tuple of ints."""
    return tuple(int(part) for part in re.findall(r"\d+", version))


def _run_parallel(worker, jobs, workers):
    """Run worker(*job) for every job on a thread pool.

    Returns the list of exceptions raised by workers, so crashes are
    reported to the caller rather than silently discarded.
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [pool.submit(worker, *job) for job in jobs]
    # Leaving the with-block waits for completion, so exception() won't block.
    return [exc for exc in (f.exception() for f in futures) if exc is not None]


# ---------------------------------------------------------------------------
# Catalog download + parsing
# ---------------------------------------------------------------------------

def download_and_extract_cab(url, tmpdir):
    """Download a .cab, extract with 7z, return path to XML inside.

    Returns None if the download fails or the expected XML (same base name
    as the .cab) is not present after extraction.
    """
    cab = os.path.join(tmpdir, os.path.basename(url))
    print(f" Fetching {os.path.basename(url)}...", end=" ", flush=True)
    if _curl_fetch(url, cab) != 0:
        print("FAILED")
        return None
    # The 7z exit code is not checked; the XML existence test below decides.
    subprocess.run(["7z", "x", "-y", f"-o{tmpdir}", cab], capture_output=True, text=True)
    os.remove(cab)
    xml_name = os.path.basename(url).replace(".cab", ".xml")
    xml_path = os.path.join(tmpdir, xml_name)
    if os.path.exists(xml_path):
        print("OK")
        return xml_path
    print("FAILED (XML not found)")
    return None


def parse_driver_catalog(xml_path, os_filter=None):
    """Parse DriverPackCatalog.xml → list of driver pack dicts.

    os_filter: list of OS prefixes to match, e.g. ["Windows10", "Windows11"].
    Defaults to ["Windows10", "Windows11"] (both).

    Each dict has keys: url, filename, size, sha256, models (list of
    {"name", "display"} dicts).
    """
    if os_filter is None:
        os_filter = ["Windows10", "Windows11"]
    prefixes = tuple(os_filter)
    tree = ET.parse(xml_path)
    packs = []
    for pkg in tree.getroot().findall(".//d:DriverPackage", NS):
        if pkg.get("type") != "win":
            continue
        os_codes = [o.get("osCode", "") for o in pkg.findall(".//d:OperatingSystem", NS)]
        if not any(code.startswith(prefixes) for code in os_codes):
            continue
        models = []
        for m in pkg.findall(".//d:Model", NS):
            d = m.find("d:Display", NS)
            models.append({
                "name": m.get("name", ""),
                "display": d.text.strip() if d is not None and d.text else "",
            })
        sha256 = ""
        for h in pkg.findall(".//d:Cryptography/d:Hash", NS):
            if h.get("algorithm") == "SHA256":
                # Element text may be None or padded with whitespace.
                sha256 = (h.text or "").strip()
                break
        path = pkg.get("path", "")
        packs.append({
            "url": f"{DELL_BASE}/{path}",
            "filename": path.split("/")[-1],
            "size": int(pkg.get("size", 0)),
            "sha256": sha256,
            "models": models,
        })
    return packs


def parse_bios_catalog(xml_path, model_names):
    """Parse DellSDPCatalogPC.xml → list of latest BIOS update dicts for given models.

    A package is considered when its title contains "BIOS" and one of the
    model identifiers extracted from *model_names*. For each model the entry
    with the numerically highest version is kept.
    """
    tree = ET.parse(xml_path)
    root = tree.getroot()
    # Identifier extraction is loop-invariant; do it once per model.
    model_ids = [(mname, extract_model_ids(mname)) for mname in model_names]
    bios = {}  # model_key → best entry
    for pkg in root.iter(f"{{{SDP_CAT_NS}}}SoftwareDistributionPackage"):
        title_elem = pkg.find(f".//{{{SDP_PKG_NS}}}Title")
        if title_elem is None or not title_elem.text:
            continue
        title = title_elem.text
        if "BIOS" not in title:
            continue

        # Find which of our models this BIOS applies to (first match wins)
        matched_model = next(
            (mname for mname, ids in model_ids if any(mid in title for mid in ids)),
            None)
        if not matched_model:
            continue

        # Extract version from title (e.g., "...BIOS,1.20.1,1.20.1")
        ver_match = re.search(r",(\d+\.\d+\.\d+)", title)
        version = ver_match.group(1) if ver_match else "0.0.0"

        # Get download URL
        origin = pkg.find(f".//{{{SDP_PKG_NS}}}OriginFile")
        if origin is None:
            continue

        entry = {
            "title": title,
            "version": version,
            "filename": origin.get("FileName", ""),
            "url": origin.get("OriginUri", ""),
            "size": int(origin.get("Size", 0)),
            "model": matched_model,
        }

        # Keep latest version per model. Compare numerically: as plain
        # strings "1.9.0" would sort above "1.20.1".
        key = matched_model
        if key not in bios or _version_key(version) > _version_key(bios[key]["version"]):
            bios[key] = entry

    return list(bios.values())


# ---------------------------------------------------------------------------
# Model matching
# ---------------------------------------------------------------------------

def find_dell_packs(our_model_name, dell_packs):
    """Find Dell driver pack(s) matching one of our model names.

    Returns the matching pack dicts in catalog order, de-duplicated by URL.
    """
    our_ids = extract_model_ids(our_model_name)
    our_brand = get_brand(our_model_name)
    our_rugged = "rugged" in our_model_name.lower()

    if not our_ids:
        return []

    matches = []
    seen_urls = set()
    for pack in dell_packs:
        for dm in pack["models"]:
            dell_ids = extract_model_ids(dm["name"]) | extract_model_ids(dm["display"])
            if not (our_ids & dell_ids):
                continue
            # Brand check: if we specify a brand, Dell must match (or have none)
            if our_brand:
                dell_brand = get_brand(dm["name"])
                if dell_brand and dell_brand != our_brand:
                    continue
            # Rugged check: if Dell explicitly labels pack as Rugged,
            # only match our Rugged models (prevents non-rugged 5430 matching
            # Rugged 5430 pack). If Dell doesn't say Rugged, allow any match
            # (handles 7220/7230 which are Rugged-only but unlabeled in catalog).
            dell_rugged = "rugged" in dm["name"].lower() or "rugged" in pack["filename"].lower()
            if dell_rugged and not our_rugged:
                continue
            # Deduplicate by URL
            if pack["url"] not in seen_urls:
                seen_urls.add(pack["url"])
                matches.append(pack)
            break

    return matches


# ---------------------------------------------------------------------------
# Download + push
# ---------------------------------------------------------------------------

def make_zip_name(filename, dest_dir):
    """Generate a zip filename matching GE convention: win11_<base>_<ver>.zip

    *dest_dir* is accepted for interface compatibility but is not used.
    """
    # Strip extension and version suffix to get base name
    base = re.sub(r'[_-]Win1[01][_.].*', '', filename, flags=re.I)
    base = re.sub(r'[-_]', '', base).lower()
    # Extract version from filename (e.g., A04, A13)
    ver_match = re.search(r'_A(\d+)', filename, re.I)
    ver = f"a{ver_match.group(1)}" if ver_match else "a00"
    return f"win11_{base}_{ver}.zip"


def process_download(args, url, filename, sha256, size, target_dir, label, tmpdir):
    """Download, verify, extract, re-zip, and push one driver pack.

    Returns True on success. Each caller should pass a unique tmpdir to
    avoid collisions in parallel mode. Intermediate files are removed as
    soon as they are no longer needed.
    """
    local_file = os.path.join(tmpdir, filename)

    # Download
    print(f" [{label}] Downloading {format_size(size)}...")
    rc = _curl_fetch(url, local_file)
    if rc != 0 or not os.path.exists(local_file):
        print(f" [{label}] ERROR: Download failed (curl exit {rc})")
        if os.path.exists(local_file):
            os.remove(local_file)
        return False

    # Verify hash (if provided)
    if sha256:
        print(f" [{label}] Verifying SHA256...", end=" ", flush=True)
        if not verify_sha256(local_file, sha256):
            print("MISMATCH!")
            os.remove(local_file)
            return False
        print("OK")

    # Extract locally with 7z (unique subdir per worker)
    extract_dir = os.path.join(tmpdir, "extract")
    os.makedirs(extract_dir, exist_ok=True)
    print(f" [{label}] Extracting...", end=" ", flush=True)
    r = subprocess.run(["7z", "x", "-y", f"-o{extract_dir}", local_file],
                       capture_output=True, text=True)
    os.remove(local_file)
    if r.returncode != 0:
        print(f"FAILED: {r.stderr[:200]}")
        shutil.rmtree(extract_dir, ignore_errors=True)
        return False
    print("OK")

    # Re-zip for PESetup.exe (expects zipped driver packs, not loose files)
    zip_name = make_zip_name(filename, target_dir)
    zip_path = os.path.join(tmpdir, zip_name)
    print(f" [{label}] Zipping as {zip_name}...", end=" ", flush=True)
    r = subprocess.run(["zip", "-r", "-q", zip_path, "."], cwd=extract_dir)
    shutil.rmtree(extract_dir, ignore_errors=True)
    if r.returncode != 0:
        print("FAILED")
        return False
    zip_size = os.path.getsize(zip_path)
    print(f"OK ({format_size(zip_size)})")

    # Push zip to PXE server
    print(f" [{label}] Pushing to {target_dir}/{zip_name}...")
    ssh_cmd(args.server, f"mkdir -p {shlex.quote(target_dir)}")
    rc = _rsync_push(zip_path, args.server, f"{target_dir}/")
    os.remove(zip_path)

    if rc != 0:
        print(f" [{label}] ERROR: rsync failed")
        return False

    return True


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

def main():
    """Command-line entry point: plan, then download (and optionally push).

    Loads HardwareDriver.json and user_selections.json, matches the selected
    models against Dell's driver catalog, prints the plan, then either stops
    (--list), downloads into a local directory (--local) or downloads,
    re-zips and pushes to the PXE server.
    """
    parser = argparse.ArgumentParser(
        description="Download Dell drivers (+ BIOS) and push to PXE server")
    parser.add_argument("--list", action="store_true",
                        help="Preview without downloading")
    parser.add_argument("--bios", action="store_true",
                        help="Also download BIOS updates")
    parser.add_argument("--image",
                        help="Push directly to image type (e.g. gea-standard)")
    parser.add_argument("--server", default=PXE_HOST,
                        help=f"PXE server IP (default: {PXE_HOST})")
    parser.add_argument("--force", action="store_true",
                        help="Re-download even if already on server")
    parser.add_argument("--cache-path",
                        help="Path to local image dir with Deploy/Control/ and Tools/")
    parser.add_argument("--local",
                        help="Download to local directory (no server needed)")
    parser.add_argument("--parallel", type=int, default=1, metavar="N",
                        help="Process N packs concurrently (default: 1)")
    args = parser.parse_args()

    # --- Load our model selections ---
    control_dir = tools_dir = None
    if args.cache_path:
        p = Path(args.cache_path)
        control_dir = p / "Deploy" / "Control"
        tools_dir = p / "Tools" if (p / "Tools").is_dir() else p.parent / "Tools"
    else:
        # Use the first repo subdirectory (sorted) that carries HardwareDriver.json.
        for d in sorted(REPO_DIR.iterdir()):
            if d.is_dir() and (d / "Deploy" / "Control" / "HardwareDriver.json").exists():
                control_dir = d / "Deploy" / "Control"
                tools_dir = d / "Tools"
                break

    if not control_dir or not (control_dir / "HardwareDriver.json").exists():
        sys.exit("ERROR: HardwareDriver.json not found. Use --cache-path or ensure a local image dir exists.")
    if not (tools_dir / "user_selections.json").exists():
        sys.exit("ERROR: user_selections.json not found")

    with open(control_dir / "HardwareDriver.json") as f:
        hw_drivers = json.load(f)
    with open(tools_dir / "user_selections.json") as f:
        selections = json.load(f)[0]

    os_id = selections["OperatingSystemSelection"]
    selected_families = {m["Id"] for m in selections["HardwareModelSelection"]}

    # Filter to selected + matching OS
    our_entries = [d for d in hw_drivers
                   if d["family"] in selected_families and os_id in d.get("aOsIds", [])]

    # Collect unique model names and their DestinationDirs
    model_dest_map = {}  # model_name → dest_dir
    for entry in our_entries:
        dest = resolve_dest_dir(entry["DestinationDir"])
        for m in entry["models"].split(","):
            m = m.strip()
            # Skip blanks produced by stray/trailing commas.
            if m and m not in model_dest_map:
                model_dest_map[m] = dest

    base_path = f"{IMAGE_BASE}/{args.image}" if args.image else UPLOAD_DEST
    rule = "=" * 60

    print()
    print(rule)
    print(" Dell Driver Downloader for PXE Server")
    print(rule)

    # --- Download Dell catalog ---
    with tempfile.TemporaryDirectory(prefix="dell-catalog-") as catdir:
        xml_path = download_and_extract_cab(DELL_DRIVER_CATALOG, catdir)
        if not xml_path:
            sys.exit("ERROR: Could not download Dell driver catalog")
        dell_packs = parse_driver_catalog(xml_path)
        print(f" Catalog: {len(dell_packs)} Win11 driver packs available")

        bios_updates = []
        if args.bios:
            bios_xml = download_and_extract_cab(DELL_BIOS_CATALOG, catdir)
            if bios_xml:
                bios_updates = parse_bios_catalog(bios_xml, list(model_dest_map))
                print(f" BIOS: {len(bios_updates)} update(s) found")

    # --- Match our models to Dell catalog ---
    download_plan = []  # list of {pack, dest_dir, model}
    unmatched = []
    seen_urls = set()  # a pack shared by several models is planned only once
    for model_name, dest_dir in model_dest_map.items():
        matches = find_dell_packs(model_name, dell_packs)
        if not matches:
            unmatched.append(model_name)
            continue
        for pack in matches:
            if pack["url"] in seen_urls:
                continue
            seen_urls.add(pack["url"])
            download_plan.append({
                "pack": pack,
                "dest_dir": dest_dir,
                "model": model_name,
            })

    # --- Display plan ---
    print()
    total_drv_size = sum(d["pack"]["size"] for d in download_plan)
    print(f" Drivers: {len(download_plan)} pack(s) to download ({format_size(total_drv_size)})")
    print(f" Target: {args.server}:{base_path}")
    if unmatched:
        print(f" No Dell match: {len(unmatched)} model(s)")
    print()

    for i, d in enumerate(download_plan, 1):
        p = d["pack"]
        print(f" {i:3}. {d['model']:<38} {format_size(p['size']):>8} {p['filename']}")
        print(f" -> {d['dest_dir']}")

    if unmatched:
        print()
        print(" Unmatched models (not in Dell public catalog):")
        for m in unmatched:
            print(f" - {m}")

    if bios_updates:
        total_bios = sum(b["size"] for b in bios_updates)
        print()
        print(f" BIOS updates: {len(bios_updates)} ({format_size(total_bios)})")
        for b in bios_updates:
            print(f" {b['model']:<35} v{b['version']} {b['filename']}")

    print()
    if args.list:
        print(" (--list mode, nothing downloaded)")
        return

    workers = max(1, args.parallel)
    numbered_plan = list(enumerate(download_plan, 1))

    # --- LOCAL MODE: download to local directory ---
    if args.local:
        local_dir = Path(args.local)
        drv_dir = local_dir / "drivers"
        bios_local_dir = local_dir / "bios"
        drv_dir.mkdir(parents=True, exist_ok=True)

        # Load local manifest
        manifest_path = local_dir / "manifest.json"
        manifest = json.loads(manifest_path.read_text()) if manifest_path.exists() else {}

        # Thread-safe counters and manifest access
        _lock = threading.Lock()
        counters = {"completed": 0, "skipped": 0, "errors": 0}

        # Build GE filename mapping from our HardwareDriver.json entries
        ge_filename_map = {}  # model_name → GE FileName
        for entry in our_entries:
            fn = entry.get("FileName") or entry.get("fileName", "")
            dest = resolve_dest_dir(entry.get("DestinationDir") or entry.get("destinationDir", ""))
            for m in (entry.get("models") or entry.get("modelswminame", "")).split(","):
                m = m.strip()
                if m and fn:
                    ge_filename_map[m] = {"filename": fn, "dest_dir": dest}

        def _download_one_local(i, d):
            """Download a single driver pack (local mode). Thread-safe."""
            pack = d["pack"]
            tag = f"[{i}/{len(download_plan)}]"

            with _lock:
                print(rule)
                print(f"{tag} {d['model']} ({format_size(pack['size'])})")
                print(rule)

            # Check if already downloaded (manifest or file size match)
            local_file = drv_dir / pack["filename"]
            if not args.force:
                with _lock:
                    existing_hash = manifest.get("drivers", {}).get(pack["url"])
                if existing_hash == pack["sha256"]:
                    with _lock:
                        print(f"{tag} Already downloaded (hash matches)")
                        counters["skipped"] += 1
                    return
                if local_file.exists() and local_file.stat().st_size == pack["size"]:
                    with _lock:
                        print(f"{tag} Already downloaded (size matches)")
                        manifest.setdefault("drivers", {})[pack["url"]] = pack["sha256"]
                        counters["skipped"] += 1
                    return

            # Download raw .exe to drivers/
            with _lock:
                print(f"{tag} Downloading {format_size(pack['size'])}...")
            rc = _curl_fetch(pack["url"], local_file)
            if rc != 0 or not local_file.exists():
                with _lock:
                    print(f"{tag} ERROR: Download failed (curl exit {rc})")
                    counters["errors"] += 1
                if local_file.exists():
                    local_file.unlink()
                return

            # Verify size first
            actual_size = local_file.stat().st_size
            if pack["size"] and actual_size != pack["size"]:
                with _lock:
                    print(f"{tag} ERROR: Size mismatch (got {format_size(actual_size)}, expected {format_size(pack['size'])})")
                    counters["errors"] += 1
                local_file.unlink()
                return

            # Verify hash
            if pack["sha256"]:
                with _lock:
                    print(f"{tag} Verifying SHA256...", end=" ", flush=True)
                if not verify_sha256(str(local_file), pack["sha256"]):
                    with _lock:
                        print("MISMATCH!")
                        counters["errors"] += 1
                    local_file.unlink()
                    return
                with _lock:
                    print("OK")

            ge_info = ge_filename_map.get(d["model"], {})
            with _lock:
                counters["completed"] += 1
                manifest.setdefault("drivers", {})[pack["url"]] = pack["sha256"]
                manifest.setdefault("mapping", {})[pack["filename"]] = {
                    "model": d["model"],
                    "dell_filename": pack["filename"],
                    "ge_filename": ge_info.get("filename", ""),
                    "dest_dir": d["dest_dir"],
                    "sha256": pack["sha256"],
                    "size": pack["size"],
                }
                print(f"{tag} Done.")

        if workers > 1:
            print(f" Downloading with {workers} parallel workers")
        # A worker that raised is reported and counted instead of vanishing.
        for exc in _run_parallel(_download_one_local, numbered_plan, workers):
            print(f" ERROR: driver worker crashed: {exc!r}")
            counters["errors"] += 1

        # --- Download BIOS ---
        bios_counts = {"ok": 0, "err": 0}
        if bios_updates:
            bios_local_dir.mkdir(parents=True, exist_ok=True)
            print(rule)
            print(f" BIOS Updates -> {bios_local_dir}")
            print(rule)

            def _download_one_bios(b):
                """Download one BIOS executable (local mode). Thread-safe."""
                with _lock:
                    print(f"\n {b['model']} v{b['version']}")
                if not args.force:
                    with _lock:
                        existing = manifest.get("bios", {}).get(b["model"])
                    if existing == b["version"]:
                        with _lock:
                            print(f" Already downloaded (v{b['version']})")
                        return

                local_file = bios_local_dir / b["filename"]
                with _lock:
                    print(f" [{b['model']}] Downloading {format_size(b['size'])}...")
                if _curl_fetch(b["url"], local_file) != 0:
                    with _lock:
                        print(f" [{b['model']}] ERROR: Download failed")
                        bios_counts["err"] += 1
                    if local_file.exists():
                        local_file.unlink()
                    return

                with _lock:
                    bios_counts["ok"] += 1
                    manifest.setdefault("bios", {})[b["model"]] = b["version"]
                    manifest.setdefault("bios_mapping", {})[b["filename"]] = {
                        "model": b["model"],
                        "version": b["version"],
                        "filename": b["filename"],
                    }
                    print(f" [{b['model']}] Done.")

            for exc in _run_parallel(_download_one_bios, [(b,) for b in bios_updates], workers):
                print(f" ERROR: BIOS worker crashed: {exc!r}")
                bios_counts["err"] += 1

        # Save manifest
        manifest_path.write_text(json.dumps(manifest, indent=2))

        # --- Summary ---
        print()
        print(rule)
        print(" Summary — Local Download")
        print(rule)
        print(f" Drivers downloaded: {counters['completed']}")
        if counters["skipped"]:
            print(f" Drivers skipped: {counters['skipped']} (already have)")
        if counters["errors"]:
            print(f" Drivers failed: {counters['errors']}")
        if bios_updates:
            print(f" BIOS downloaded: {bios_counts['ok']}")
            if bios_counts["err"]:
                print(f" BIOS failed: {bios_counts['err']}")
        print(f" Saved to: {local_dir}")
        print(f" Manifest: {manifest_path}")
        print()
        # NOTE(review): --push-local is not defined by the argument parser in
        # this file — confirm the intended follow-up command.
        print(" To push to server later:")
        print(f" python3 download-drivers.py --push-local {local_dir}")
        print()
        return

    # --- REMOTE MODE: download and push to server ---

    # --- Verify SSH ---
    print(f" Testing SSH to {args.server}...", end=" ", flush=True)
    r = ssh_cmd(args.server, "echo OK")
    if r.stdout.strip() != "OK":
        print("FAILED")
        sys.exit(f" Cannot SSH to {PXE_USER}@{args.server}: {r.stderr.strip()}")
    print("OK")
    print()

    # --- Load manifest (tracks what's been downloaded by hash) ---
    manifest_path = f"{base_path}/.driver-manifest.json"
    r = ssh_cmd(args.server, f"cat {shlex.quote(manifest_path)} 2>/dev/null")
    try:
        manifest = json.loads(r.stdout) if r.stdout.strip() else {}
    except json.JSONDecodeError:
        # A corrupt manifest only costs re-downloads; don't abort the run.
        print(f" WARNING: ignoring unreadable manifest {manifest_path}")
        manifest = {}

    # Thread-safe counters and manifest access
    _lock = threading.Lock()
    counters = {"completed": 0, "skipped": 0, "errors": 0}
    bios_counts = {"ok": 0, "err": 0}

    # --- Download drivers ---
    with tempfile.TemporaryDirectory(prefix="pxe-drivers-") as tmpdir:

        def _process_one_remote(i, d):
            """Download, extract, re-zip, and push one driver pack. Thread-safe."""
            pack = d["pack"]
            target = f"{base_path}/{d['dest_dir']}"
            tag = f"[{i}/{len(download_plan)}]"

            with _lock:
                print(rule)
                print(f"{tag} {d['model']} ({format_size(pack['size'])})")
                print(rule)

            if not args.force:
                with _lock:
                    existing_hash = manifest.get(d["dest_dir"], {}).get(pack["filename"])
                if existing_hash == pack["sha256"]:
                    with _lock:
                        print(f"{tag} Up to date (hash matches manifest)")
                        counters["skipped"] += 1
                    return

            # Each worker gets its own temp subdirectory
            worker_tmp = os.path.join(tmpdir, f"worker-{i}")
            os.makedirs(worker_tmp, exist_ok=True)
            ok = process_download(args, pack["url"], pack["filename"],
                                  pack["sha256"], pack["size"], target,
                                  d["model"], worker_tmp)
            with _lock:
                if ok:
                    counters["completed"] += 1
                    manifest.setdefault(d["dest_dir"], {})[pack["filename"]] = pack["sha256"]
                    print(f"{tag} Done.")
                else:
                    counters["errors"] += 1

        if workers > 1:
            print(f" Processing with {workers} parallel workers")
        for exc in _run_parallel(_process_one_remote, numbered_plan, workers):
            print(f" ERROR: driver worker crashed: {exc!r}")
            counters["errors"] += 1

        # --- Download BIOS (goes to enrollment share, shared across all images) ---
        bios_dir = "/srv/samba/enrollment/BIOS"
        if bios_updates:
            print(rule)
            print(f" BIOS Updates -> {bios_dir}")
            print(rule)
            ssh_cmd(args.server, f"mkdir -p {shlex.quote(bios_dir)}")
            models_txt = []  # lines for models.txt manifest

            def _process_one_bios(b):
                """Download one BIOS executable and push it as-is. Thread-safe."""
                target = f"{bios_dir}/{b['filename']}"
                with _lock:
                    print(f"\n {b['model']} v{b['version']}")
                if not args.force:
                    with _lock:
                        existing = manifest.get("BIOS", {}).get(b["model"])
                    if existing == b["version"]:
                        with _lock:
                            print(f" Up to date (v{b['version']})")
                            models_txt.append(f"{b['model']}|{b['filename']}")
                        return

                # BIOS .exe goes as-is (not extracted)
                bios_tmp = os.path.join(tmpdir, f"bios-{b['filename']}")
                with _lock:
                    print(f" [{b['model']}] Downloading {format_size(b['size'])}...")
                if _curl_fetch(b["url"], bios_tmp) != 0:
                    with _lock:
                        print(f" [{b['model']}] ERROR: Download failed")
                        bios_counts["err"] += 1
                    if os.path.exists(bios_tmp):
                        os.remove(bios_tmp)
                    return

                rc = _rsync_push(bios_tmp, args.server, target)
                os.remove(bios_tmp)
                with _lock:
                    if rc != 0:
                        print(f" [{b['model']}] ERROR: Push failed")
                        bios_counts["err"] += 1
                    else:
                        print(f" [{b['model']}] Done.")
                        bios_counts["ok"] += 1
                        manifest.setdefault("BIOS", {})[b["model"]] = b["version"]
                        models_txt.append(f"{b['model']}|{b['filename']}")

            for exc in _run_parallel(_process_one_bios, [(b,) for b in bios_updates], workers):
                print(f" ERROR: BIOS worker crashed: {exc!r}")
                bios_counts["err"] += 1

            # Generate models.txt for check-bios.cmd
            if models_txt:
                # Stream via stdin, not printf '<content>': a %, backslash or
                # quote in a model/file name would corrupt the command.
                manifest_content = "# ModelSubstring|BIOSFile\n" + "\n".join(models_txt) + "\n"
                ssh_cmd(args.server,
                        f"cat > {shlex.quote(bios_dir + '/models.txt')}",
                        input_text=manifest_content)
                print(f"\n models.txt updated ({len(models_txt)} entries)")

    # --- Save manifest ---
    completed, skipped, errors = counters["completed"], counters["skipped"], counters["errors"]
    bios_ok, bios_err = bios_counts["ok"], bios_counts["err"]
    if completed > 0 or bios_ok > 0:
        manifest_json = json.dumps(manifest, indent=2)
        ssh_cmd(args.server, f"cat > {shlex.quote(manifest_path)}",
                input_text=manifest_json + "\n")
        print(f" Manifest saved to {manifest_path}")

    # --- Summary ---
    print()
    print(rule)
    print(" Summary")
    print(rule)
    print(f" Drivers downloaded: {completed}")
    if skipped:
        print(f" Drivers skipped: {skipped} (up to date)")
    if errors:
        print(f" Drivers failed: {errors}")
    if bios_updates:
        print(f" BIOS downloaded: {bios_ok}")
        if bios_err:
            print(f" BIOS failed: {bios_err}")
    print()

    if completed > 0 and not args.image:
        print(f" Drivers staged in {base_path}/Deploy/Out-of-box Drivers/")
        print(f" Use the webapp (http://{args.server}:9009) to import,")
        print(" or re-run with --image to push directly.")
        print()


if __name__ == "__main__":
    main()