Phase 1a of a multi-session refactor toward a clean blueprint
structure. Pulls the helper code that lived alongside the routes in
the 1621-line app.py into focused modules. app.py is now 625 lines
of mostly routes plus a small Flask wiring header. Behaviour is
unchanged: smoke-tested against the 8 main GET routes (200 OK).
New modules:
- config.py: env vars, IMAGE_TYPES, FRIENDLY_NAMES, the
  SHARED_DEPLOY_* taxonomy, and the unattend XML namespaces.
- services/audit.py: audit log file handler and the audit() helper.
- services/csrf.py: session CSRF token and before_request validator,
  wired via init_csrf(app).
- services/fs.py: image_root / deploy_path / unattend_path /
  control_path / tools_path, load_json / save_json, and
  resolve_destination.
- services/system.py: service_status / find_usb_mounts /
  find_upload_sources.
- services/images.py: image_status and load_image_config.
- services/deploy.py: import_deploy, _merge_tree,
  _replace_with_symlink, and allowed_import_source.
- services/unattend.py: parse_unattend / build_unattend_xml /
  extract_form_data, plus the qn / qwcm / settings pass helpers.
- services/wim.py: extract_startnet / update_startnet / list_files,
  wrapping wimextract / wimupdate / wimdir.
Endpoint names kept stable (dashboard, clonezilla_backups, etc.) so
existing url_for(...) calls in templates are unchanged. Phase 1b
(Flask blueprints with ".endpoint" naming) deferred to a future
session because it requires updating ~30 url_for sites in templates
and is mostly cosmetic.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
124 lines
4.5 KiB
Python
124 lines
4.5 KiB
Python
"""Per-image-type state probes: status + config (drivers / OS / packages / models)."""
|
|
|
|
import os
|
|
|
|
import config
|
|
from services import fs
|
|
|
|
|
|
def image_status(image_type):
    """Return a dict describing the on-disk state of an image type.

    The result has these keys:

    * ``image_type``    -- the argument, echoed back.
    * ``friendly_name`` -- ``config.FRIENDLY_NAMES[image_type]``, falling
      back to ``image_type`` itself when there is no entry.
    * ``deploy_path``   -- whatever ``fs.deploy_path(image_type)`` returned.
    * ``has_content``   -- True when the deploy path is a directory holding
      at least one entry.
    * ``has_unattend``  -- True when ``fs.unattend_path(image_type)`` is an
      existing regular file.
    """
    dp = fs.deploy_path(image_type)
    up = fs.unattend_path(image_type)

    has_content = False
    if os.path.isdir(dp):
        try:
            # any() stops at the first entry, leaving the scandir iterator
            # unexhausted; the context manager closes it deterministically
            # instead of leaking the directory handle until garbage
            # collection (which also emits a ResourceWarning).
            with os.scandir(dp) as entries:
                has_content = any(entries)
        except (FileNotFoundError, NotADirectoryError):
            # The directory disappeared (or was replaced) between the
            # isdir() check and the scan: report it as empty.
            has_content = False

    return {
        "image_type": image_type,
        "friendly_name": config.FRIENDLY_NAMES.get(image_type, image_type),
        "deploy_path": dp,
        "has_content": has_content,
        "has_unattend": os.path.isfile(up),
    }
|
|
|
|
|
|
def _as_list(value):
    """Return *value* unchanged when it is a list, otherwise an empty list."""
    return value if isinstance(value, list) else []


def _first_value(record, *keys):
    """Return the first truthy ``record.get(key)`` among *keys*, else ``""``."""
    for key in keys:
        value = record.get(key)
        if value:
            return value
    return ""


def _probe_on_disk(record, image_type, dest, filename):
    """Set ``record["_on_disk"]`` to whether *filename* exists under *dest*.

    *dest* is passed through ``fs.resolve_destination``; when that yields a
    falsy result, or *filename* is empty, the flag is False.
    """
    resolved = fs.resolve_destination(dest, image_type)
    if resolved and filename:
        record["_on_disk"] = os.path.isfile(os.path.join(resolved, filename))
    else:
        record["_on_disk"] = False


def _find_orphan_zips(root, registered):
    """List ``.zip`` files under *root* whose lowercased name is not in *registered*.

    Returns a list of ``{"fileName": ..., "relPath": ...}`` dicts, with
    ``relPath`` relative to *root*. Returns an empty list when *root* is not
    a directory.
    """
    orphans = []
    if not os.path.isdir(root):
        return orphans
    for dirpath, _dirnames, filenames in os.walk(root):
        for fn in filenames:
            lowered = fn.lower()
            if lowered.endswith(".zip") and lowered not in registered:
                rel = os.path.relpath(os.path.join(dirpath, fn), root)
                orphans.append({"fileName": fn, "relPath": rel})
    return orphans


def load_image_config(image_type):
    """Load all JSON configs for an image and check on-disk presence.

    Reads ``HardwareDriver.json``, ``hw_drivers.json``,
    ``OperatingSystem.json`` and ``packages.json`` from the image's control
    path and ``user_selections.json`` from its tools path, then annotates
    each loaded record in place with a boolean ``"_on_disk"`` key.

    NOTE(review): the return shape of ``fs.load_json`` is not visible from
    this module. Anything that is not a list is treated as "no records",
    mirroring the guard that was already applied to ``user_selections.json``.

    Returns a dict with the keys ``hardware_models``, ``drivers``,
    ``operating_systems``, ``packages``, ``orphan_drivers`` and
    ``os_selection``.
    """
    ctrl = fs.control_path(image_type)
    tools = fs.tools_path(image_type)

    # --- Drivers (merge HardwareDriver.json + hw_drivers.json) ---
    drivers_raw = _as_list(fs.load_json(os.path.join(ctrl, "HardwareDriver.json")))
    extra_raw = _as_list(fs.load_json(os.path.join(ctrl, "hw_drivers.json")))

    # De-duplicate by case-insensitive file name; the first occurrence wins.
    # Entries without a file name are always kept.
    seen_files = set()
    drivers = []
    for d in drivers_raw + extra_raw:
        key = _first_value(d, "FileName", "fileName").lower()
        if key:
            if key in seen_files:
                continue
            seen_files.add(key)
        drivers.append(d)

    for d in drivers:
        _probe_on_disk(
            d,
            image_type,
            _first_value(d, "DestinationDir", "destinationDir"),
            _first_value(d, "FileName", "fileName"),
        )

    # --- Operating Systems ---
    operating_systems = _as_list(fs.load_json(os.path.join(ctrl, "OperatingSystem.json")))
    for entry in operating_systems:
        # "or {}" (rather than a .get default) also covers an explicit
        # JSON null stored under the key.
        osv = entry.get("operatingSystemVersion") or {}
        wim = osv.get("wim") or {}
        _probe_on_disk(
            entry,
            image_type,
            _first_value(wim, "DestinationDir", "destinationDir"),
            "install.wim",
        )

    # --- Packages ---
    packages = _as_list(fs.load_json(os.path.join(ctrl, "packages.json")))
    for p in packages:
        _probe_on_disk(
            p,
            image_type,
            _first_value(p, "destinationDir", "DestinationDir"),
            _first_value(p, "fileName", "FileName"),
        )

    # --- Hardware Models (user_selections.json) ---
    us_raw = fs.load_json(os.path.join(tools, "user_selections.json"))
    # Only the first element of the loaded list is consulted.
    us_data = us_raw[0] if us_raw and isinstance(us_raw, list) else {}
    if not isinstance(us_data, dict):
        us_data = {}
    hardware_models = us_data.get("HardwareModelSelection") or []
    os_selection = str(us_data.get("OperatingSystemSelection", ""))

    # A hardware model's "Id" is matched against a driver's "family";
    # when several drivers share a family, the last one wins.
    family_lookup = {d["family"]: d for d in drivers if d.get("family", "")}
    for hm in hardware_models:
        matched = family_lookup.get(hm.get("Id", ""))
        hm["_on_disk"] = matched["_on_disk"] if matched else False

    # --- Orphan drivers: zip files on disk not referenced in any JSON ---
    oob_dir = os.path.join(fs.deploy_path(image_type), "Out-of-box Drivers")
    try:
        oob_dir = os.path.realpath(oob_dir)
    except OSError:
        # Best effort: fall back to the unresolved path.
        pass
    # seen_files already holds the lowercased file name of every registered
    # driver, so it doubles as the "registered" set.
    orphan_drivers = _find_orphan_zips(oob_dir, seen_files)

    return {
        "hardware_models": hardware_models,
        "drivers": drivers,
        "operating_systems": operating_systems,
        "packages": packages,
        "orphan_drivers": orphan_drivers,
        "os_selection": os_selection,
    }
|