"""Per-image-type state probes: status + config (drivers / OS / packages / models).""" import os import re import shutil import config from services import fs # File-name sanitizer for uploads. Strips anything outside [A-Za-z0-9._-] # so a malicious filename can't break out of the destination directory. _SAFE_NAME = re.compile(r"[^A-Za-z0-9._-]") def _safe_filename(name: str) -> str: base = os.path.basename(name or "") cleaned = _SAFE_NAME.sub("_", base) return cleaned or "upload.bin" def image_status(image_type): """Return a dict describing the state of an image type.""" dp = fs.deploy_path(image_type) up = fs.unattend_path(image_type) has_content = os.path.isdir(dp) and any(os.scandir(dp)) if os.path.isdir(dp) else False has_unattend = os.path.isfile(up) return { "image_type": image_type, "friendly_name": config.FRIENDLY_NAMES.get(image_type, image_type), "deploy_path": dp, "has_content": has_content, "has_unattend": has_unattend, } def load_image_config(image_type): """Load all JSON configs for an image and check on-disk presence.""" ctrl = fs.control_path(image_type) tools = fs.tools_path(image_type) # --- Drivers (merge HardwareDriver.json + hw_drivers.json) --- hw_driver_file = os.path.join(ctrl, "HardwareDriver.json") hw_drivers_extra = os.path.join(ctrl, "hw_drivers.json") drivers_raw = fs.load_json(hw_driver_file) extra_raw = fs.load_json(hw_drivers_extra) seen_files = set() drivers = [] for d in drivers_raw + extra_raw: fname = (d.get("FileName") or d.get("fileName") or "").lower() if fname and fname in seen_files: continue if fname: seen_files.add(fname) drivers.append(d) for d in drivers: fname = d.get("FileName") or d.get("fileName") or "" dest = d.get("DestinationDir") or d.get("destinationDir") or "" resolved = fs.resolve_destination(dest, image_type) if resolved and fname: d["_on_disk"] = os.path.isfile(os.path.join(resolved, fname)) else: d["_on_disk"] = False # --- Operating Systems --- os_file = os.path.join(ctrl, "OperatingSystem.json") operating_systems = fs.load_json(os_file) for entry in operating_systems: osv = entry.get("operatingSystemVersion", {}) wim = osv.get("wim", {}) dest = wim.get("DestinationDir") or wim.get("destinationDir") or "" resolved = fs.resolve_destination(dest, image_type) if resolved: entry["_on_disk"] = os.path.isfile(os.path.join(resolved, "install.wim")) else: entry["_on_disk"] = False # --- Packages --- pkg_file = os.path.join(ctrl, "packages.json") packages = fs.load_json(pkg_file) for p in packages: fname = p.get("fileName") or p.get("FileName") or "" dest = p.get("destinationDir") or p.get("DestinationDir") or "" resolved = fs.resolve_destination(dest, image_type) if resolved and fname: p["_on_disk"] = os.path.isfile(os.path.join(resolved, fname)) else: p["_on_disk"] = False # --- Hardware Models (user_selections.json) --- us_file = os.path.join(tools, "user_selections.json") us_raw = fs.load_json(us_file) us_data = us_raw[0] if us_raw and isinstance(us_raw, list) else {} hardware_models = us_data.get("HardwareModelSelection", []) os_selection = str(us_data.get("OperatingSystemSelection", "")) family_lookup = {} for d in drivers: family = d.get("family", "") if family: family_lookup[family] = d for hm in hardware_models: family_id = hm.get("Id", "") matched = family_lookup.get(family_id) hm["_on_disk"] = matched["_on_disk"] if matched else False # --- Orphan drivers: zip files on disk not referenced in any JSON --- orphan_drivers = [] oob_dir = os.path.join(fs.deploy_path(image_type), "Out-of-box Drivers") try: oob_dir = os.path.realpath(oob_dir) except OSError: pass registered_files = set() for d in drivers: fname = d.get("FileName") or d.get("fileName") or "" if fname: registered_files.add(fname.lower()) if os.path.isdir(oob_dir): for dirpath, _dirnames, filenames in os.walk(oob_dir): for fn in filenames: if fn.lower().endswith(".zip") and fn.lower() not in registered_files: rel = os.path.relpath(os.path.join(dirpath, fn), oob_dir) orphan_drivers.append({"fileName": fn, "relPath": rel}) return { "hardware_models": hardware_models, "drivers": drivers, "operating_systems": operating_systems, "packages": packages, "orphan_drivers": orphan_drivers, "os_selection": os_selection, } # --------------------------------------------------------------------------- # Driver / package upload + orphan adoption # --------------------------------------------------------------------------- DRIVERS_SUBDIR = "Out-of-box Drivers" PACKAGES_SUBDIR = "Packages" def _drivers_dir(image_type: str) -> str: return os.path.join(fs.deploy_path(image_type), DRIVERS_SUBDIR) def _packages_dir(image_type: str) -> str: return os.path.join(fs.deploy_path(image_type), PACKAGES_SUBDIR) def upload_driver(image_type: str, uploaded_file, family: str = "", destination_dir: str = "", overwrite: bool = False) -> dict: """Save an uploaded driver .zip into the image's Out-of-box Drivers dir. When family + destination_dir are provided, also append a HardwareDriver .json entry so the driver is recognized at deploy time. Returns a dict describing what landed on disk + whether an entry was registered.""" fname = _safe_filename(uploaded_file.filename) if not fname.lower().endswith(".zip"): raise ValueError("Driver upload must be a .zip file") dst_dir = _drivers_dir(image_type) os.makedirs(dst_dir, exist_ok=True) dst_path = os.path.join(dst_dir, fname) if os.path.exists(dst_path) and not overwrite: raise FileExistsError(f"{fname} already exists in {DRIVERS_SUBDIR}/") uploaded_file.save(dst_path) registered = False if family and destination_dir: adopt_orphan(image_type, fname, family, destination_dir) registered = True return { "filename": fname, "path": dst_path, "registered": registered, } def adopt_orphan(image_type: str, filename: str, family: str, destination_dir: str) -> dict: """Append a HardwareDriver.json entry for an existing .zip in the image's Out-of-box Drivers dir, so it stops showing up as an orphan + gets deployed for the named hardware family. Idempotent: a second adopt with the same filename is a no-op.""" safe_name = _safe_filename(filename) drivers_path = _drivers_dir(image_type) if not os.path.isfile(os.path.join(drivers_path, safe_name)): raise FileNotFoundError(f"{safe_name} not found in {DRIVERS_SUBDIR}/") if not family: raise ValueError("family is required (matches HardwareModelSelection.Id)") if not destination_dir: raise ValueError("destination_dir is required") ctrl = fs.control_path(image_type) hw_file = os.path.join(ctrl, "HardwareDriver.json") entries = fs.load_json(hw_file) for e in entries: if (e.get("FileName") or e.get("fileName") or "").lower() == safe_name.lower(): return {"filename": safe_name, "already_registered": True} entries.append({ "FileName": safe_name, "DestinationDir": destination_dir, "family": family, }) fs.save_json(hw_file, entries) return {"filename": safe_name, "already_registered": False} def remove_orphans(image_type: str, filenames: list[str]) -> dict: """Delete the named files from Out-of-box Drivers/. Caller is responsible for confirming this is what the user wants (orphan files have no JSON entry, so deleting them is safe in the sense that nothing references them; but they may be stash for future adoption).""" drivers_path = _drivers_dir(image_type) removed, missing = [], [] for fn in filenames: safe = _safe_filename(fn) path = os.path.join(drivers_path, safe) if not os.path.isfile(path): missing.append(safe) continue try: os.unlink(path) removed.append(safe) except OSError: missing.append(safe) return {"removed": removed, "missing": missing} def upload_package(image_type: str, uploaded_file, destination_dir: str = "", overwrite: bool = False) -> dict: """Save an uploaded package (any extension) into the image's Packages dir, and append an entry to packages.json when destination_dir is set so it deploys at imaging time.""" fname = _safe_filename(uploaded_file.filename) dst_dir = _packages_dir(image_type) os.makedirs(dst_dir, exist_ok=True) dst_path = os.path.join(dst_dir, fname) if os.path.exists(dst_path) and not overwrite: raise FileExistsError(f"{fname} already exists in {PACKAGES_SUBDIR}/") uploaded_file.save(dst_path) registered = False if destination_dir: ctrl = fs.control_path(image_type) pkg_file = os.path.join(ctrl, "packages.json") entries = fs.load_json(pkg_file) already = any( (e.get("fileName") or e.get("FileName") or "").lower() == fname.lower() for e in entries ) if not already: entries.append({ "fileName": fname, "destinationDir": destination_dir, }) fs.save_json(pkg_file, entries) registered = True return { "filename": fname, "path": dst_path, "registered": registered, }