#!/usr/bin/env python3
"""Flask web application for managing a GE Aerospace PXE server."""

import json
import logging
import os
import secrets
import shutil
import subprocess
import tempfile
from datetime import datetime
from pathlib import Path

from flask import (
    Flask,
    abort,
    flash,
    has_request_context,
    jsonify,
    redirect,
    render_template,
    request,
    send_file,
    session,
    url_for,
)
from lxml import etree
from werkzeug.utils import secure_filename

app = Flask(__name__)
# NOTE(review): the fallback value is hard-coded in the source, so session
# cookies are only trustworthy when FLASK_SECRET_KEY is set in the environment.
app.secret_key = os.environ.get("FLASK_SECRET_KEY", "pxe-manager-dev-key-change-in-prod")
app.config["MAX_CONTENT_LENGTH"] = 16 * 1024 * 1024 * 1024  # 16 GB max upload

# ---------------------------------------------------------------------------
# Audit logging
# ---------------------------------------------------------------------------
AUDIT_LOG = os.environ.get("AUDIT_LOG", "/var/log/pxe-webapp-audit.log")
audit_logger = logging.getLogger("pxe_audit")
audit_logger.setLevel(logging.INFO)
_audit_handler = logging.FileHandler(AUDIT_LOG, mode="a")
_audit_handler.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
audit_logger.addHandler(_audit_handler)


def audit(action, detail=""):
    """Write an entry to the audit log.

    Args:
        action: Short action tag written before the colon (e.g. ``IMAGE_IMPORT``).
        detail: Free-form description; may contain caller-supplied text.

    The client IP is taken from the active request; outside a request context
    the entry is attributed to ``system``.
    """
    ip = request.remote_addr if has_request_context() else "system"
    # Escape CR/LF so caller-supplied text cannot forge extra log lines.
    safe_detail = str(detail).replace("\r", "\\r").replace("\n", "\\n")
    audit_logger.info("[%s] %s: %s", ip, action, safe_detail)


# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
SAMBA_SHARE = os.environ.get("SAMBA_SHARE", "/srv/samba/winpeapps")
CLONEZILLA_SHARE = os.environ.get("CLONEZILLA_SHARE", "/srv/samba/clonezilla")
BLANCCO_REPORTS = os.environ.get("BLANCCO_REPORTS", "/srv/samba/blancco-reports")
ENROLLMENT_SHARE = os.environ.get("ENROLLMENT_SHARE", "/srv/samba/enrollment")
UPLOAD_DIR = os.environ.get("UPLOAD_DIR", "/home/pxe/image-upload")
SHARED_DIR = os.path.join(SAMBA_SHARE, "_shared")

# Subdirs inside Deploy/ shared across ALL image types
SHARED_DEPLOY_GLOBAL = ["Out-of-box Drivers"]
# Subdirs inside Deploy/ shared within the same image family (by prefix)
SHARED_DEPLOY_SCOPED = {
    "gea-": ["Operating Systems", "Packages"],
    "ge-": ["Operating Systems", "Packages"],
}

# Sibling dirs at image root shared within the same image family
SHARED_ROOT_DIRS = {
    "gea-": ["Sources"],
    "ge-": ["Sources"],
}

WEB_ROOT = os.environ.get("WEB_ROOT", "/var/www/html")
BOOT_WIM = os.path.join(WEB_ROOT, "win11", "sources", "boot.wim")

IMAGE_TYPES = [
    "gea-standard",
    "gea-engineer",
    "gea-shopfloor",
    "gea-shopfloor-mce",
    "ge-standard",
    "ge-engineer",
    "ge-shopfloor-lockdown",
    "ge-shopfloor-mce",
]

FRIENDLY_NAMES = {
    "gea-standard": "GE Aerospace Standard",
    "gea-engineer": "GE Aerospace Engineer",
    "gea-shopfloor": "GE Aerospace Shop Floor",
    "gea-shopfloor-mce": "GE Aerospace Shop Floor MCE",
    "ge-standard": "GE Legacy Standard",
    "ge-engineer": "GE Legacy Engineer",
    "ge-shopfloor-lockdown": "GE Legacy Shop Floor Lockdown",
    "ge-shopfloor-mce": "GE Legacy Shop Floor MCE",
}


# ---------------------------------------------------------------------------
# CSRF protection
# ---------------------------------------------------------------------------
def generate_csrf_token():
    """Return the CSRF token for the current session, creating one if needed."""
    if "_csrf_token" not in session:
        session["_csrf_token"] = secrets.token_hex(32)
    return session["_csrf_token"]


@app.context_processor
def inject_csrf_token():
    """Make csrf_token() available in all templates."""
    return {"csrf_token": generate_csrf_token}


@app.before_request
def validate_csrf():
    """Reject POST requests with a missing or invalid CSRF token.

    The token is read from the ``_csrf_token`` form field, falling back to the
    ``X-CSRF-Token`` header. A mismatch aborts the request with HTTP 403.
    """
    # NOTE(review): only POST is checked; other state-changing methods
    # (PUT/PATCH/DELETE) would pass through unvalidated if a route used them.
    if request.method != "POST":
        return
    token = request.form.get("_csrf_token") or request.headers.get("X-CSRF-Token")
    expected = generate_csrf_token()
    # Constant-time comparison; bytes are used because compare_digest rejects
    # non-ASCII str arguments.
    if not token or not secrets.compare_digest(
        token.encode("utf-8"), expected.encode("utf-8")
    ):
        abort(403)


NS = "urn:schemas-microsoft-com:unattend"
WCM = "http://schemas.microsoft.com/WMIConfig/2002/State"
NSMAP = {None: NS, "wcm": WCM}


# Convenience qualified-name helpers
def qn(tag):
    """Return a tag qualified with the default unattend namespace."""
    return f"{{{NS}}}{tag}"
def qwcm(attr):
    """Return an attribute qualified with the wcm namespace."""
    return f"{{{WCM}}}{attr}"


# ---------------------------------------------------------------------------
# Utility helpers
# ---------------------------------------------------------------------------
def image_root(image_type):
    """Return the root directory for an image type."""
    return os.path.join(SAMBA_SHARE, image_type)


def deploy_path(image_type):
    """Return the Deploy directory for an image type."""
    return os.path.join(SAMBA_SHARE, image_type, "Deploy")


def unattend_path(image_type):
    """Return the unattend.xml path for an image type."""
    return os.path.join(deploy_path(image_type), "FlatUnattendW10.xml")


def control_path(image_type):
    """Return the Deploy/Control directory for an image type."""
    return os.path.join(deploy_path(image_type), "Control")


def tools_path(image_type):
    """Return the Tools directory for an image type."""
    return os.path.join(SAMBA_SHARE, image_type, "Tools")


def _load_json(filepath):
    """Parse a JSON file and return its contents, or [] on failure.

    The file is read as ``utf-8-sig`` so a leading BOM is tolerated. Missing or
    unreadable files, undecodable bytes and malformed JSON all yield ``[]``.
    """
    try:
        with open(filepath, "r", encoding="utf-8-sig") as fh:
            return json.load(fh)
    except (OSError, ValueError):
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        return []


def _save_json(filepath, data):
    """Write data as pretty-printed JSON, creating parent directories."""
    parent = os.path.dirname(filepath)
    if parent:  # a bare filename has no parent; makedirs("") would raise
        os.makedirs(parent, exist_ok=True)
    with open(filepath, "w", encoding="utf-8") as fh:
        json.dump(data, fh, indent=2, ensure_ascii=False)
        fh.write("\n")


def _resolve_destination(dest_dir, image_type):
    """Convert a Windows *destinationdir* path to a Linux filesystem path.

    Drops everything up to and including the ``*destinationdir*`` placeholder
    (matched case-insensitively), converts backslashes to forward slashes,
    prepends ``SAMBA_SHARE/image_type/`` and resolves symlinks.

    Returns:
        The resolved absolute path, or ``""`` when *dest_dir* is empty.
    """
    if not dest_dir:
        return ""

    # Replace the placeholder (case-insensitive)
    path = dest_dir
    idx = path.lower().find("*destinationdir*")
    if idx != -1:
        path = path[idx + len("*destinationdir*"):]

    # Backslash -> forward slash; strip leading slashes so os.path.join does
    # not treat the remainder as an absolute path.
    path = path.replace("\\", "/").lstrip("/")
    full = os.path.join(SAMBA_SHARE, image_type, path)

    # Resolve symlinks so shared dirs are found
    try:
        full = os.path.realpath(full)
    except OSError:
        pass
    return full


def _dict_entries(value):
    """Return the dict items of *value* when it is a list, else ``[]``."""
    if not isinstance(value, list):
        return []
    return [entry for entry in value if isinstance(entry, dict)]


def _first_value(entry, *keys):
    """Return the first truthy ``entry[key]`` among *keys*, or ``""``."""
    for key in keys:
        value = entry.get(key)
        if value:
            return value
    return ""


def _load_image_config(image_type):
    """Load all JSON configs for an image and check on-disk presence.

    Reads the driver, operating-system and package lists from the image's
    ``Deploy/Control`` directory and ``user_selections.json`` from ``Tools``.
    Every returned entry gains an ``_on_disk`` flag. JSON content that is not a
    list of objects is treated as empty instead of raising.

    Returns:
        dict with keys ``hardware_models``, ``drivers``, ``operating_systems``,
        ``packages``, ``orphan_drivers`` and ``os_selection``.
    """
    ctrl = control_path(image_type)
    tools = tools_path(image_type)

    # --- Drivers (merge HardwareDriver.json + hw_drivers.json) ---
    drivers_raw = _dict_entries(_load_json(os.path.join(ctrl, "HardwareDriver.json")))
    extra_raw = _dict_entries(_load_json(os.path.join(ctrl, "hw_drivers.json")))

    # Merge: dedup by FileName (case-insensitive); entries without a file name
    # are always kept.
    seen_files = set()
    drivers = []
    for d in drivers_raw + extra_raw:
        fname = str(_first_value(d, "FileName", "fileName")).lower()
        if fname and fname in seen_files:
            continue
        if fname:
            seen_files.add(fname)
        drivers.append(d)

    # Check disk presence for each driver
    for d in drivers:
        fname = _first_value(d, "FileName", "fileName")
        dest = _first_value(d, "DestinationDir", "destinationDir")
        resolved = _resolve_destination(dest, image_type)
        d["_on_disk"] = bool(resolved and fname) and os.path.isfile(
            os.path.join(resolved, fname)
        )

    # --- Operating Systems ---
    operating_systems = _dict_entries(
        _load_json(os.path.join(ctrl, "OperatingSystem.json"))
    )
    for entry in operating_systems:
        osv = entry.get("operatingSystemVersion")
        wim = osv.get("wim") if isinstance(osv, dict) else None
        if not isinstance(wim, dict):
            wim = {}
        dest = _first_value(wim, "DestinationDir", "destinationDir")
        resolved = _resolve_destination(dest, image_type)
        entry["_on_disk"] = bool(resolved) and os.path.isfile(
            os.path.join(resolved, "install.wim")
        )

    # --- Packages ---
    packages = _dict_entries(_load_json(os.path.join(ctrl, "packages.json")))
    for p in packages:
        fname = _first_value(p, "fileName", "FileName")
        dest = _first_value(p, "destinationDir", "DestinationDir")
        resolved = _resolve_destination(dest, image_type)
        p["_on_disk"] = bool(resolved and fname) and os.path.isfile(
            os.path.join(resolved, fname)
        )

    # --- Hardware Models (user_selections.json) ---
    us_raw = _load_json(os.path.join(tools, "user_selections.json"))
    us_data = us_raw[0] if us_raw and isinstance(us_raw, list) else {}
    if not isinstance(us_data, dict):
        us_data = {}
    hardware_models = _dict_entries(us_data.get("HardwareModelSelection", []))
    os_selection = str(us_data.get("OperatingSystemSelection", ""))

    # Build a lookup: family -> driver entry (for disk-presence on models).
    # When several drivers share a family the last one wins.
    family_lookup = {d["family"]: d for d in drivers if d.get("family")}
    for hm in hardware_models:
        matched = family_lookup.get(hm.get("Id", ""))
        hm["_on_disk"] = matched["_on_disk"] if matched else False

    # --- Orphan drivers: zip files on disk not referenced in any JSON ---
    orphan_drivers = []
    oob_dir = os.path.join(deploy_path(image_type), "Out-of-box Drivers")
    # Resolve symlinks
    try:
        oob_dir = os.path.realpath(oob_dir)
    except OSError:
        pass

    registered_files = {
        str(fname).lower()
        for fname in (_first_value(d, "FileName", "fileName") for d in drivers)
        if fname
    }

    if os.path.isdir(oob_dir):
        for dirpath, _dirnames, filenames in os.walk(oob_dir):
            for fn in filenames:
                if fn.lower().endswith(".zip") and fn.lower() not in registered_files:
                    rel = os.path.relpath(os.path.join(dirpath, fn), oob_dir)
                    orphan_drivers.append({"fileName": fn, "relPath": rel})

    return {
        "hardware_models": hardware_models,
        "drivers": drivers,
        "operating_systems": operating_systems,
        "packages": packages,
        "orphan_drivers": orphan_drivers,
        "os_selection": os_selection,
    }
def image_status(image_type):
    """Return a dict describing the state of an image type.

    Returns:
        dict with ``image_type``, ``friendly_name``, ``deploy_path``,
        ``has_content`` (the Deploy directory exists and is non-empty) and
        ``has_unattend`` (the unattend XML file exists).
    """
    dp = deploy_path(image_type)
    up = unattend_path(image_type)
    try:
        # The context manager closes the scandir handle; a missing or
        # unreadable Deploy directory simply counts as "no content".
        with os.scandir(dp) as entries:
            has_content = any(entries)
    except OSError:
        has_content = False
    has_unattend = os.path.isfile(up)
    return {
        "image_type": image_type,
        "friendly_name": FRIENDLY_NAMES.get(image_type, image_type),
        "deploy_path": dp,
        "has_content": has_content,
        "has_unattend": has_unattend,
    }
def service_status(service_name):
    """Check whether a systemd service is active.

    Returns:
        dict with ``name``, ``active`` (bool) and ``state``. ``state`` is the
        output of ``systemctl is-active``, or the error text when the command
        could not be run or timed out.
    """
    try:
        result = subprocess.run(
            ["systemctl", "is-active", service_name],
            capture_output=True,
            text=True,
            timeout=5,
        )
    except (OSError, subprocess.SubprocessError) as exc:
        return {"name": service_name, "active": False, "state": str(exc)}
    state = result.stdout.strip()
    return {"name": service_name, "active": state == "active", "state": state}


def _unescape_mount_field(field):
    r"""Decode ``\ooo`` octal escapes (e.g. ``\040`` for a space) in a mounts field."""
    import re

    return re.sub(r"\\([0-7]{3})", lambda m: chr(int(m.group(1), 8)), field)


def find_usb_mounts():
    """Return a list of mount-point paths that look like removable media.

    Reads ``/proc/mounts`` and keeps existing directories mounted below
    ``/mnt/`` or ``/media/``. The result is sorted and free of duplicates; an
    unreadable mounts table yields an empty list.
    """
    mounts = set()
    try:
        with open("/proc/mounts", "r", encoding="utf-8", errors="replace") as fh:
            for line in fh:
                parts = line.split()
                if len(parts) < 2:
                    continue
                # Whitespace in a mount point is stored octal-escaped; decode
                # it or the isdir() check below fails for such paths.
                mount_point = _unescape_mount_field(parts[1])
                if mount_point.startswith(("/mnt/", "/media/")) and os.path.isdir(
                    mount_point
                ):
                    mounts.add(mount_point)
    except OSError:
        pass
    return sorted(mounts)


def find_upload_sources():
    """Return UPLOAD_DIR (when non-empty) followed by its immediate sub-directories."""
    if not os.path.isdir(UPLOAD_DIR):
        return []
    sources = []
    try:
        entries = os.listdir(UPLOAD_DIR)
        # Include the upload dir itself if it has content
        if entries:
            sources.append(UPLOAD_DIR)
        # Also include immediate subdirectories
        for entry in entries:
            full = os.path.join(UPLOAD_DIR, entry)
            if os.path.isdir(full):
                sources.append(full)
    except OSError:
        pass
    return sources


def _import_deploy(src_deploy, dst_deploy, target="", move=False):
    """Import Deploy directory contents, merging shared subdirs into _shared.

    Args:
        src_deploy: Directory whose entries are imported.
        dst_deploy: Deploy directory of the target image (created if missing).
        target: Image type name; its prefix selects the family-scoped shared
            directories from ``SHARED_DEPLOY_SCOPED``.
        move: When True, files are moved instead of copied (saves disk space).

    Globally shared and family-scoped sub-directories are merged into
    ``SHARED_DIR`` and replaced in *dst_deploy* by a symlink; every other
    directory is merged in place so existing content is preserved.
    """
    # Build list of scoped shared dirs for this target
    scoped_shared = []
    prefix_key = ""
    for prefix, dirs in SHARED_DEPLOY_SCOPED.items():
        if target.startswith(prefix):
            scoped_shared = dirs
            prefix_key = prefix
            break

    _transfer = shutil.move if move else shutil.copy2
    _transfer_tree = shutil.move if move else shutil.copytree

    os.makedirs(dst_deploy, exist_ok=True)
    for item in os.listdir(src_deploy):
        src_item = os.path.join(src_deploy, item)
        dst_item = os.path.join(dst_deploy, item)

        if not os.path.isdir(src_item):
            _transfer(src_item, dst_item)
            continue

        # Global shared (e.g., Out-of-box Drivers) — one copy for all
        if item in SHARED_DEPLOY_GLOBAL:
            shared_dest = os.path.join(SHARED_DIR, item)
            os.makedirs(shared_dest, exist_ok=True)
            _merge_tree(src_item, shared_dest, move=move)
            _replace_with_symlink(dst_item, shared_dest)
            continue

        # Scoped shared (e.g., Operating Systems) — per family prefix
        if item in scoped_shared:
            shared_dest = os.path.join(SHARED_DIR, f"{prefix_key}{item}")
            os.makedirs(shared_dest, exist_ok=True)
            _merge_tree(src_item, shared_dest, move=move)
            _replace_with_symlink(dst_item, shared_dest)
            continue

        # Normal transfer — merge to preserve existing custom content
        if os.path.isdir(dst_item):
            _merge_tree(src_item, dst_item, move=move)
        else:
            _transfer_tree(src_item, dst_item)
def _replace_with_symlink(link_path, target_path):
    """Replace a file/dir/symlink at link_path with a symlink to target_path."""
    if os.path.islink(link_path) or os.path.isfile(link_path):
        # Covers dangling symlinks and plain files, which would otherwise make
        # os.symlink fail with FileExistsError.
        os.remove(link_path)
    elif os.path.isdir(link_path):
        shutil.rmtree(link_path)
    os.symlink(target_path, link_path)


def _merge_tree(src, dst, move=False):
    """Recursively merge src tree into dst, overwriting existing files.

    Args:
        src: Source directory.
        dst: Destination directory.
        move: When True, files are moved instead of copied and the emptied
            source directory is removed afterwards.

    Directories present on both sides are merged entry by entry; a
    non-directory (including a dangling symlink) standing where a source
    directory must go is removed first.
    """
    _transfer = shutil.move if move else shutil.copy2
    _transfer_tree = shutil.move if move else shutil.copytree

    for item in os.listdir(src):
        s = os.path.join(src, item)
        d = os.path.join(dst, item)
        if os.path.isdir(s):
            if os.path.isdir(d):
                _merge_tree(s, d, move=move)
            else:
                # lexists also sees dangling symlinks, which exists() misses.
                if os.path.lexists(d):
                    os.remove(d)
                _transfer_tree(s, d)
        else:
            os.makedirs(os.path.dirname(d), exist_ok=True)
            _transfer(s, d)

    if move:
        # Best effort: drop the emptied directory so a move-merge leaves no
        # empty skeleton behind, matching what shutil.move does for whole
        # trees. rmdir refuses non-empty directories, so nothing is lost.
        try:
            os.rmdir(src)
        except OSError:
            pass
def allowed_import_source(source):
    """Check if a source path is a valid import location (USB or upload dir).

    The path is canonicalised with ``os.path.realpath`` before comparison so
    ``..`` segments or symlinks cannot escape an allowed root.

    Returns:
        True when *source* is a detected USB mount (or lies below one), or is
        an existing directory at or below ``UPLOAD_DIR``; False otherwise.
    """
    if not source:
        return False
    real = os.path.realpath(source)

    def _within(base):
        base_real = os.path.realpath(base)
        return real == base_real or real.startswith(base_real.rstrip("/") + "/")

    if any(_within(m) for m in find_usb_mounts()):
        return True
    if _within(UPLOAD_DIR):
        return os.path.isdir(real)
    return False


# ---------------------------------------------------------------------------
# XML helpers — parse / build unattend.xml
# ---------------------------------------------------------------------------
# NOTE(review): the template body is empty in this copy of the file; its XML
# markup may have been stripped in transit — confirm against the real source.
UNATTEND_TEMPLATE = """\
"""


def _find_or_create(parent, tag):
    """Find the first child with *tag* or create it.

    A *tag* without a ``{namespace}`` part is created in the unattend
    namespace; an already-qualified tag is used as given.
    """
    el = parent.find(tag, namespaces={"": NS})
    if el is None:
        new_tag = qn(tag.split("}")[-1]) if "}" not in tag else tag
        el = etree.SubElement(parent, new_tag)
    return el


def _settings_pass(root, pass_name):
    """Return the ``settings`` element whose ``pass`` is *pass_name*, creating it if needed."""
    for s in root.findall(qn("settings")):
        if s.get("pass") == pass_name:
            return s
    s = etree.SubElement(root, qn("settings"))
    s.set("pass", pass_name)
    return s


def _el_text(el, default=""):
    """Return the stripped text of *el*, or *default* when it is missing or empty."""
    if el is not None and el.text:
        return el.text.strip()
    return default


def _parse_password(pw_el):
    """Return ``(value, plain_text)`` read from a ``Password`` element (may be None)."""
    if pw_el is None:
        return "", "true"
    return (
        _el_text(pw_el.find(qn("Value"))),
        _el_text(pw_el.find(qn("PlainText")), "true"),
    )


def parse_unattend(xml_path):
    """Parse an unattend.xml and return a dict of editable data.

    Args:
        xml_path: Path of the unattend file.

    Returns:
        dict holding driver paths, machine settings, specialize commands, OOBE
        flags, first-logon commands, local accounts, auto-logon settings,
        international settings, the OOBE time zone and the file text under
        ``raw_xml``. A missing file yields the defaults with ``raw_xml`` set to
        ``UNATTEND_TEMPLATE``; a file that is not well-formed XML yields the
        defaults plus its raw text.
    """
    data = {
        "driver_paths": [],
        "computer_name": "",
        "registered_organization": "",
        "registered_owner": "",
        "time_zone": "",
        "specialize_commands": [],
        "oobe": {
            "HideEULAPage": "true",
            "HideOEMRegistrationScreen": "true",
            "HideOnlineAccountScreens": "true",
            "HideWirelessSetupInOOBE": "true",
            "HideLocalAccountScreen": "true",
            "NetworkLocation": "Work",
            "ProtectYourPC": "3",
            "SkipUserOOBE": "true",
            "SkipMachineOOBE": "true",
        },
        "firstlogon_commands": [],
        "user_accounts": [],
        "autologon": {
            "enabled": "",
            "username": "",
            "password": "",
            "plain_text": "true",
            "logon_count": "",
        },
        "intl": {
            "input_locale": "",
            "system_locale": "",
            "ui_language": "",
            "user_locale": "",
        },
        "oobe_timezone": "",
        "raw_xml": "",
    }

    if not os.path.isfile(xml_path):
        data["raw_xml"] = UNATTEND_TEMPLATE
        return data

    with open(xml_path, "r", encoding="utf-8") as fh:
        raw = fh.read()
    data["raw_xml"] = raw

    # Entity resolution and network access are disabled: the file content can
    # originate from the raw-XML editor and must not be able to pull in
    # external entities.
    parser = etree.XMLParser(resolve_entities=False, no_network=True)
    try:
        root = etree.fromstring(raw.encode("utf-8"), parser=parser)
    except etree.XMLSyntaxError:
        return data

    ns = {"u": NS}

    # --- offlineServicing: DriverPaths ---
    for dp_el in root.xpath(
        "u:settings[@pass='offlineServicing']//u:PathAndCredentials/u:Path",
        namespaces=ns,
    ):
        if dp_el.text:
            data["driver_paths"].append(dp_el.text.strip())

    # --- specialize: Shell-Setup ---
    for comp in root.xpath(
        "u:settings[@pass='specialize']/u:component",
        namespaces=ns,
    ):
        if "Shell-Setup" not in comp.get("name", ""):
            continue
        for tag, key in [
            ("ComputerName", "computer_name"),
            ("RegisteredOrganization", "registered_organization"),
            ("RegisteredOwner", "registered_owner"),
            ("TimeZone", "time_zone"),
        ]:
            el = comp.find(qn(tag))
            if el is not None and el.text:
                data[key] = el.text.strip()

    # --- specialize: RunSynchronous commands ---
    for cmd in root.xpath(
        "u:settings[@pass='specialize']//u:RunSynchronousCommand",
        namespaces=ns,
    ):
        data["specialize_commands"].append({
            "order": _el_text(cmd.find(qn("Order"))),
            "path": _el_text(cmd.find(qn("Path"))),
            "description": _el_text(cmd.find(qn("Description"))),
        })

    # --- oobeSystem ---
    for comp in root.xpath(
        "u:settings[@pass='oobeSystem']/u:component",
        namespaces=ns,
    ):
        comp_name = comp.get("name", "")

        # International-Core component
        if "International-Core" in comp_name:
            for tag, key in [
                ("InputLocale", "input_locale"),
                ("SystemLocale", "system_locale"),
                ("UILanguage", "ui_language"),
                ("UserLocale", "user_locale"),
            ]:
                el = comp.find(qn(tag))
                if el is not None and el.text:
                    data["intl"][key] = el.text.strip()

        if "OOBE" in comp_name or "Shell-Setup" in comp_name:
            oobe_el = comp.find(qn("OOBE"))
            if oobe_el is not None:
                for child in oobe_el:
                    # Comments and processing instructions have a non-string
                    # tag and carry no setting.
                    if not isinstance(child.tag, str):
                        continue
                    local = etree.QName(child).localname
                    if local in data["oobe"] and child.text:
                        data["oobe"][local] = child.text.strip()

        # The lookups below run against every oobeSystem component and simply
        # find nothing where the child element is absent.

        # UserAccounts / LocalAccounts
        ua = comp.find(qn("UserAccounts"))
        la_container = ua.find(qn("LocalAccounts")) if ua is not None else None
        if la_container is not None:
            for acct in la_container.findall(qn("LocalAccount")):
                pw_val, pw_plain = _parse_password(acct.find(qn("Password")))
                data["user_accounts"].append({
                    "name": _el_text(acct.find(qn("Name"))),
                    "password": pw_val,
                    "plain_text": pw_plain,
                    "group": _el_text(acct.find(qn("Group")), "Administrators"),
                    "display_name": _el_text(acct.find(qn("DisplayName"))),
                })

        # AutoLogon
        al = comp.find(qn("AutoLogon"))
        if al is not None:
            pw_val, pw_plain = _parse_password(al.find(qn("Password")))
            data["autologon"] = {
                "enabled": _el_text(al.find(qn("Enabled"))),
                "username": _el_text(al.find(qn("Username"))),
                "password": pw_val,
                "plain_text": pw_plain,
                "logon_count": _el_text(al.find(qn("LogonCount"))),
            }

        # FirstLogonCommands
        flc = comp.find(qn("FirstLogonCommands"))
        if flc is not None:
            for sync in flc.findall(qn("SynchronousCommand")):
                data["firstlogon_commands"].append({
                    "order": _el_text(sync.find(qn("Order"))),
                    "commandline": _el_text(sync.find(qn("CommandLine"))),
                    "description": _el_text(sync.find(qn("Description"))),
                })

        # TimeZone (oobeSystem pass)
        tz_el = comp.find(qn("TimeZone"))
        if tz_el is not None and tz_el.text:
            data["oobe_timezone"] = tz_el.text.strip()

    return data
def _add_component(parent, name):
    """Append a ``component`` element named *name* carrying the standard attributes."""
    comp = etree.SubElement(parent, qn("component"))
    comp.set("name", name)
    comp.set("processorArchitecture", "amd64")
    comp.set("publicKeyToken", "31bf3856ad364e35")
    comp.set("language", "neutral")
    comp.set("versionScope", "nonSxS")
    return comp


def _add_text(parent, tag, text):
    """Append a child *tag* (unattend namespace) holding *text* and return it."""
    el = etree.SubElement(parent, qn(tag))
    el.text = text
    return el


def build_unattend_xml(form_data):
    """Build a complete unattend.xml string from form data dict.

    Args:
        form_data: dict using the keys produced by ``_extract_form_data``
            (``driver_paths``, ``specialize_commands``, ``oobe``,
            ``user_accounts``, ``autologon``, ``firstlogon_commands``,
            ``intl``, ``oobe_timezone`` and the machine-setting strings).

    Returns:
        The pretty-printed document as a ``str`` with an XML declaration.

    Blank driver paths, commands and unnamed accounts are dropped before
    numbering, so ``keyValue``/``Order`` values are always 1..n without gaps.
    """
    root = etree.Element(qn("unattend"), nsmap=NSMAP)

    # --- windowsPE (empty) ---
    _settings_pass(root, "windowsPE")

    # --- offlineServicing: DriverPaths ---
    offline = _settings_pass(root, "offlineServicing")
    driver_paths = [
        dp.strip() for dp in form_data.get("driver_paths", []) if dp.strip()
    ]
    if driver_paths:
        comp = _add_component(offline, "Microsoft-Windows-PnpCustomizationsNonWinPE")
        dp_container = etree.SubElement(comp, qn("DriverPaths"))
        for idx, dp in enumerate(driver_paths, start=1):
            pac = etree.SubElement(dp_container, qn("PathAndCredentials"))
            pac.set(qwcm("action"), "add")
            pac.set(qwcm("keyValue"), str(idx))
            _add_text(pac, "Path", dp)

    # --- specialize ---
    spec = _settings_pass(root, "specialize")

    # Shell-Setup component
    shell_comp = _add_component(spec, "Microsoft-Windows-Shell-Setup")
    for tag, key in [
        ("ComputerName", "computer_name"),
        ("RegisteredOrganization", "registered_organization"),
        ("RegisteredOwner", "registered_owner"),
        ("TimeZone", "time_zone"),
    ]:
        val = form_data.get(key, "").strip()
        if val:
            _add_text(shell_comp, tag, val)

    # Deployment / RunSynchronous commands
    spec_cmds = [
        cmd
        for cmd in form_data.get("specialize_commands", [])
        if cmd.get("path", "").strip()
    ]
    if spec_cmds:
        deploy_comp = _add_component(spec, "Microsoft-Windows-Deployment")
        rs = etree.SubElement(deploy_comp, qn("RunSynchronous"))
        for idx, cmd in enumerate(spec_cmds, start=1):
            rsc = etree.SubElement(rs, qn("RunSynchronousCommand"))
            rsc.set(qwcm("action"), "add")
            _add_text(rsc, "Order", str(idx))
            _add_text(rsc, "Path", cmd["path"].strip())
            _add_text(rsc, "Description", cmd.get("description", "").strip())

    # --- oobeSystem ---
    oobe_settings = _settings_pass(root, "oobeSystem")

    # International-Core component (before Shell-Setup)
    intl = form_data.get("intl", {})
    if any(v.strip() for v in intl.values() if v):
        intl_comp = _add_component(oobe_settings, "Microsoft-Windows-International-Core")
        for tag, key in [
            ("InputLocale", "input_locale"),
            ("SystemLocale", "system_locale"),
            ("UILanguage", "ui_language"),
            ("UserLocale", "user_locale"),
        ]:
            val = intl.get(key, "").strip()
            if val:
                _add_text(intl_comp, tag, val)

    # Shell-Setup component
    oobe_comp = _add_component(oobe_settings, "Microsoft-Windows-Shell-Setup")
    oobe_el = etree.SubElement(oobe_comp, qn("OOBE"))
    oobe_data = form_data.get("oobe", {})
    for key in [
        "HideEULAPage",
        "HideOEMRegistrationScreen",
        "HideOnlineAccountScreens",
        "HideWirelessSetupInOOBE",
        "HideLocalAccountScreen",
        "NetworkLocation",
        "ProtectYourPC",
        "SkipUserOOBE",
        "SkipMachineOOBE",
    ]:
        val = oobe_data.get(key, "")
        if val:
            _add_text(oobe_el, key, str(val))

    # UserAccounts / LocalAccounts
    accounts = [
        acct
        for acct in form_data.get("user_accounts", [])
        if acct.get("name", "").strip()
    ]
    if accounts:
        ua = etree.SubElement(oobe_comp, qn("UserAccounts"))
        la_container = etree.SubElement(ua, qn("LocalAccounts"))
        for acct in accounts:
            la = etree.SubElement(la_container, qn("LocalAccount"))
            la.set(qwcm("action"), "add")
            pw = etree.SubElement(la, qn("Password"))
            _add_text(pw, "Value", acct.get("password", ""))
            _add_text(pw, "PlainText", acct.get("plain_text", "true"))
            _add_text(la, "Name", acct["name"].strip())
            # `or` (not a .get default) so an empty form field also falls back.
            _add_text(la, "Group", (acct.get("group") or "Administrators").strip())
            _add_text(
                la, "DisplayName", (acct.get("display_name") or acct["name"]).strip()
            )

    # AutoLogon
    autologon = form_data.get("autologon", {})
    if autologon.get("username", "").strip():
        al = etree.SubElement(oobe_comp, qn("AutoLogon"))
        al_pw = etree.SubElement(al, qn("Password"))
        _add_text(al_pw, "Value", autologon.get("password", ""))
        _add_text(al_pw, "PlainText", autologon.get("plain_text", "true"))
        # NOTE(review): an empty "enabled" value is written through unchanged;
        # confirm whether the form can submit it blank.
        _add_text(al, "Enabled", autologon.get("enabled", "true"))
        _add_text(al, "Username", autologon["username"].strip())
        logon_count = autologon.get("logon_count", "").strip()
        if logon_count:
            _add_text(al, "LogonCount", logon_count)

    # FirstLogonCommands
    fl_cmds = [
        cmd
        for cmd in form_data.get("firstlogon_commands", [])
        if cmd.get("commandline", "").strip()
    ]
    if fl_cmds:
        flc = etree.SubElement(oobe_comp, qn("FirstLogonCommands"))
        for idx, cmd in enumerate(fl_cmds, start=1):
            sc = etree.SubElement(flc, qn("SynchronousCommand"))
            sc.set(qwcm("action"), "add")
            _add_text(sc, "Order", str(idx))
            _add_text(sc, "CommandLine", cmd["commandline"].strip())
            _add_text(sc, "Description", cmd.get("description", "").strip())

    # TimeZone (oobeSystem pass)
    oobe_tz = form_data.get("oobe_timezone", "").strip()
    if oobe_tz:
        _add_text(oobe_comp, "TimeZone", oobe_tz)

    xml_bytes = etree.tostring(
        root,
        pretty_print=True,
        xml_declaration=True,
        encoding="utf-8",
    )
    return xml_bytes.decode("utf-8")
def _extract_form_data(form):
    """Pull structured data from the submitted form.

    Args:
        form: Mapping-like form object offering ``get``, ``getlist`` and
            iteration over its field names (e.g. ``request.form``).

    Returns:
        dict in the shape consumed by ``build_unattend_xml``.

    Account fields are numbered (``account_name_<n>`` etc.). Every numbered
    account with a non-empty name is collected in numeric order, so a gap in
    the numbering does not hide the accounts after it.
    """

    def _paired(values, descriptions, value_key):
        # Pair each non-blank value with the description at the same index.
        return [
            {
                value_key: value,
                "description": descriptions[i] if i < len(descriptions) else "",
            }
            for i, value in enumerate(values)
            if value.strip()
        ]

    data = {}

    # Driver paths
    data["driver_paths"] = [p for p in form.getlist("driver_path[]") if p.strip()]

    # Machine settings
    data["computer_name"] = form.get("computer_name", "")
    data["registered_organization"] = form.get("registered_organization", "")
    data["registered_owner"] = form.get("registered_owner", "")
    data["time_zone"] = form.get("time_zone", "")

    # Specialize commands
    data["specialize_commands"] = _paired(
        form.getlist("spec_cmd_path[]"), form.getlist("spec_cmd_desc[]"), "path"
    )

    # OOBE settings: a flag that was not submitted is stored as "false".
    data["oobe"] = {}
    for key in [
        "HideEULAPage",
        "HideOEMRegistrationScreen",
        "HideOnlineAccountScreens",
        "HideWirelessSetupInOOBE",
        "HideLocalAccountScreen",
        "SkipUserOOBE",
        "SkipMachineOOBE",
    ]:
        data["oobe"][key] = form.get(f"oobe_{key}", "false")
    data["oobe"]["NetworkLocation"] = form.get("oobe_NetworkLocation", "Work")
    data["oobe"]["ProtectYourPC"] = form.get("oobe_ProtectYourPC", "3")

    # First logon commands
    data["firstlogon_commands"] = _paired(
        form.getlist("fl_cmd_commandline[]"),
        form.getlist("fl_cmd_desc[]"),
        "commandline",
    )

    # User accounts
    name_prefix = "account_name_"
    suffixes = sorted(
        (
            key[len(name_prefix):]
            for key in form
            if key.startswith(name_prefix) and key[len(name_prefix):].isdecimal()
        ),
        key=int,
    )
    accounts = []
    for sfx in suffixes:
        name = form.get(f"account_name_{sfx}", "")
        if not name:
            continue
        accounts.append({
            "name": name,
            "password": form.get(f"account_password_{sfx}", ""),
            "plain_text": form.get(f"account_plaintext_{sfx}", "true"),
            "group": form.get(f"account_group_{sfx}", "Administrators"),
            "display_name": form.get(f"account_display_{sfx}", ""),
        })
    data["user_accounts"] = accounts

    # AutoLogon
    data["autologon"] = {
        "enabled": form.get("autologon_enabled", ""),
        "username": form.get("autologon_username", ""),
        "password": form.get("autologon_password", ""),
        "plain_text": form.get("autologon_plaintext", "true"),
        "logon_count": form.get("autologon_logoncount", ""),
    }

    # International settings
    data["intl"] = {
        "input_locale": form.get("intl_input_locale", ""),
        "system_locale": form.get("intl_system_locale", ""),
        "ui_language": form.get("intl_ui_language", ""),
        "user_locale": form.get("intl_user_locale", ""),
    }

    # OOBE TimeZone
    data["oobe_timezone"] = form.get("oobe_timezone", "")

    return data


# ---------------------------------------------------------------------------
# Routes — pages
# ---------------------------------------------------------------------------
@app.route("/")
def dashboard():
    """Render the dashboard with per-image status and service health."""
    images = [image_status(it) for it in IMAGE_TYPES]
    services = [service_status(s) for s in ("dnsmasq", "apache2", "smbd")]
    return render_template(
        "dashboard.html",
        images=images,
        services=services,
        image_types=IMAGE_TYPES,
        friendly_names=FRIENDLY_NAMES,
    )


def _import_full_layout(source, src_items, root, dest, target, use_move):
    """Import a full image-root layout (``Deploy`` plus sibling entries).

    ``Deploy`` goes through ``_import_deploy``; sibling directories listed in
    ``SHARED_ROOT_DIRS`` for the target's family are merged into ``SHARED_DIR``
    and symlinked from the image root; other sibling directories replace any
    existing entry of the same name; plain files are transferred into the
    image root.
    """
    _transfer = shutil.move if use_move else shutil.copy2
    _transfer_tree = shutil.move if use_move else shutil.copytree

    # Determine which root-level dirs are shared for this target
    shared_root = []
    prefix_key = ""
    for prefix, dirs in SHARED_ROOT_DIRS.items():
        if target.startswith(prefix):
            shared_root = dirs
            prefix_key = prefix
            break

    for item in src_items:
        src_item = os.path.join(source, item)
        dst_item = os.path.join(root, item)
        if item == "Deploy":
            _import_deploy(src_item, dest, target, move=use_move)
        elif os.path.isdir(src_item) and item in shared_root:
            # Shared sibling: merge into _shared/{prefix}{item} and symlink
            # from the image root
            shared_dest = os.path.join(SHARED_DIR, f"{prefix_key}{item}")
            os.makedirs(shared_dest, exist_ok=True)
            _merge_tree(src_item, shared_dest, move=use_move)
            _replace_with_symlink(dst_item, shared_dest)
        elif os.path.isdir(src_item):
            # Non-shared sibling dirs (Tools) go into image root, replacing
            # whatever is there. rmtree refuses symlinks and files, so those
            # are unlinked instead.
            if os.path.islink(dst_item) or os.path.isfile(dst_item):
                os.remove(dst_item)
            elif os.path.isdir(dst_item):
                shutil.rmtree(dst_item)
            _transfer_tree(src_item, dst_item)
        else:
            _transfer(src_item, dst_item)


@app.route("/images/import", methods=["GET", "POST"])
def images_import():
    """Show the import page (GET) or import image content from a source (POST).

    POST expects the form fields ``source`` (a USB mount or upload directory)
    and ``target`` (one of ``IMAGE_TYPES``). Content coming from the upload
    directory is moved; content from any other allowed source is copied. Every
    POST ends in a redirect back to this page with a flash message.
    """
    if request.method == "POST":
        source = request.form.get("source", "")
        target = request.form.get("target", "")

        if not source or not target:
            flash("Please select both a source and a target image type.", "danger")
            return redirect(url_for("images_import"))
        if target not in IMAGE_TYPES:
            flash("Invalid target image type.", "danger")
            return redirect(url_for("images_import"))
        if not allowed_import_source(source):
            flash("Source path is not a valid import location.", "danger")
            return redirect(url_for("images_import"))
        if not os.path.isdir(source):
            flash(f"Source path does not exist: {source}", "danger")
            return redirect(url_for("images_import"))

        root = image_root(target)
        dest = deploy_path(target)
        try:
            os.makedirs(dest, exist_ok=True)
            src_items = os.listdir(source)

            # Move files from network upload to save disk space; copy from USB
            use_move = source == UPLOAD_DIR or source.startswith(UPLOAD_DIR + "/")

            # Detect layout: a top-level Deploy/ directory means the source is
            # a full image root (USB-style). Otherwise treat it as Deploy/
            # contents directly.
            full_layout = "Deploy" in src_items and os.path.isdir(
                os.path.join(source, "Deploy")
            )
            if full_layout:
                _import_full_layout(source, src_items, root, dest, target, use_move)
            else:
                # Flat layout: treat source as Deploy contents
                _import_deploy(source, dest, target, move=use_move)

            # Ensure Media.tag exists (FlatSetupLoader.exe drive detection)
            control_dir = os.path.join(dest, "Control")
            os.makedirs(control_dir, exist_ok=True)
            media_tag = os.path.join(control_dir, "Media.tag")
            Path(media_tag).touch()

            audit("IMAGE_IMPORT", f"{source} -> {target}")
            flash(
                f"Successfully imported content to {FRIENDLY_NAMES.get(target, target)}.",
                "success",
            )
        except Exception as exc:
            # Request-level boundary: report to the user, keep the traceback
            # in the application log.
            app.logger.exception("Image import failed")
            flash(f"Import failed: {exc}", "danger")
        return redirect(url_for("images_import"))

    # The listings are only needed to render the page, so they are built for
    # GET requests only.
    usb_mounts = find_usb_mounts()
    upload_sources = find_upload_sources()
    images = [image_status(it) for it in IMAGE_TYPES]
    return render_template(
        "import.html",
        usb_mounts=usb_mounts,
        upload_sources=upload_sources,
        images=images,
        image_types=IMAGE_TYPES,
        friendly_names=FRIENDLY_NAMES,
    )
friendly_names=FRIENDLY_NAMES, ) @app.route("/images//unattend", methods=["GET", "POST"]) def unattend_editor(image_type): if image_type not in IMAGE_TYPES: flash("Unknown image type.", "danger") return redirect(url_for("dashboard")) xml_file = unattend_path(image_type) if request.method == "POST": save_mode = request.form.get("save_mode", "form") if save_mode == "raw": raw_xml = request.form.get("raw_xml", "") # Validate the XML before saving try: etree.fromstring(raw_xml.encode("utf-8")) except etree.XMLSyntaxError as exc: flash(f"Invalid XML: {exc}", "danger") data = parse_unattend(xml_file) data["raw_xml"] = raw_xml return render_template( "unattend_editor.html", image_type=image_type, friendly_name=FRIENDLY_NAMES.get(image_type, image_type), data=data, image_types=IMAGE_TYPES, friendly_names=FRIENDLY_NAMES, ) xml_content = raw_xml else: form_data = _extract_form_data(request.form) xml_content = build_unattend_xml(form_data) # Write to disk try: os.makedirs(os.path.dirname(xml_file), exist_ok=True) with open(xml_file, "w", encoding="utf-8") as fh: fh.write(xml_content) audit("UNATTEND_SAVE", f"{image_type} ({save_mode})") flash("unattend.xml saved successfully.", "success") except Exception as exc: flash(f"Failed to save: {exc}", "danger") return redirect(url_for("unattend_editor", image_type=image_type)) data = parse_unattend(xml_file) return render_template( "unattend_editor.html", image_type=image_type, friendly_name=FRIENDLY_NAMES.get(image_type, image_type), data=data, image_types=IMAGE_TYPES, friendly_names=FRIENDLY_NAMES, ) @app.route("/images//config") def image_config(image_type): if image_type not in IMAGE_TYPES: flash("Unknown image type.", "danger") return redirect(url_for("dashboard")) config = _load_image_config(image_type) return render_template( "image_config.html", image_type=image_type, friendly_name=FRIENDLY_NAMES.get(image_type, image_type), config=config, ) @app.route("/images//config/save", methods=["POST"]) def 
image_config_save(image_type): if image_type not in IMAGE_TYPES: flash("Unknown image type.", "danger") return redirect(url_for("dashboard")) section = request.form.get("section", "") payload = request.form.get("payload", "[]") try: data = json.loads(payload) except json.JSONDecodeError: flash("Invalid JSON payload.", "danger") return redirect(url_for("image_config", image_type=image_type)) ctrl = control_path(image_type) tools = tools_path(image_type) try: if section == "hardware_models": us_file = os.path.join(tools, "user_selections.json") us_raw = _load_json(us_file) us_data = us_raw[0] if us_raw and isinstance(us_raw, list) else {} us_data["HardwareModelSelection"] = data _save_json(us_file, [us_data]) audit("CONFIG_SAVE", f"{image_type}/hardware_models") elif section == "drivers": # Strip internal fields before saving clean = [{k: v for k, v in d.items() if not k.startswith("_")} for d in data] _save_json(os.path.join(ctrl, "HardwareDriver.json"), clean) audit("CONFIG_SAVE", f"{image_type}/drivers") elif section == "operating_systems": clean = [{k: v for k, v in d.items() if not k.startswith("_")} for d in data] _save_json(os.path.join(ctrl, "OperatingSystem.json"), clean) audit("CONFIG_SAVE", f"{image_type}/operating_systems") elif section == "packages": clean = [{k: v for k, v in d.items() if not k.startswith("_")} for d in data] _save_json(os.path.join(ctrl, "packages.json"), clean) audit("CONFIG_SAVE", f"{image_type}/packages") else: flash(f"Unknown section: {section}", "danger") return redirect(url_for("image_config", image_type=image_type)) flash(f"Saved {section.replace('_', ' ')} successfully.", "success") except Exception as exc: flash(f"Failed to save {section}: {exc}", "danger") return redirect(url_for("image_config", image_type=image_type)) # --------------------------------------------------------------------------- # Routes — Clonezilla Backups # --------------------------------------------------------------------------- @app.route("/backups") 
def clonezilla_backups(): backups = [] if os.path.isdir(CLONEZILLA_SHARE): for f in sorted(os.listdir(CLONEZILLA_SHARE)): fpath = os.path.join(CLONEZILLA_SHARE, f) if os.path.isfile(fpath) and f.lower().endswith(".zip"): stat = os.stat(fpath) backups.append({ "filename": f, "machine": os.path.splitext(f)[0], "size": stat.st_size, "modified": stat.st_mtime, }) return render_template( "backups.html", backups=backups, image_types=IMAGE_TYPES, friendly_names=FRIENDLY_NAMES, ) @app.route("/backups/upload", methods=["POST"]) def clonezilla_upload(): if "backup_file" not in request.files: flash("No file selected.", "danger") return redirect(url_for("clonezilla_backups")) f = request.files["backup_file"] if not f.filename: flash("No file selected.", "danger") return redirect(url_for("clonezilla_backups")) filename = secure_filename(f.filename) if not filename.lower().endswith(".zip"): flash("Only .zip files are accepted.", "danger") return redirect(url_for("clonezilla_backups")) os.makedirs(CLONEZILLA_SHARE, exist_ok=True) dest = os.path.join(CLONEZILLA_SHARE, filename) f.save(dest) audit("BACKUP_UPLOAD", filename) flash(f"Uploaded {filename} successfully.", "success") return redirect(url_for("clonezilla_backups")) @app.route("/backups/download/") def clonezilla_download(filename): filename = secure_filename(filename) fpath = os.path.join(CLONEZILLA_SHARE, filename) if not os.path.isfile(fpath): flash(f"Backup not found: {filename}", "danger") return redirect(url_for("clonezilla_backups")) return send_file(fpath, as_attachment=True) @app.route("/backups/delete/", methods=["POST"]) def clonezilla_delete(filename): filename = secure_filename(filename) fpath = os.path.join(CLONEZILLA_SHARE, filename) if os.path.isfile(fpath): os.remove(fpath) audit("BACKUP_DELETE", filename) flash(f"Deleted {filename}.", "success") else: flash(f"Backup not found: {filename}", "danger") return redirect(url_for("clonezilla_backups")) # 
# ---------------------------------------------------------------------------
# Routes — Blancco Reports
# ---------------------------------------------------------------------------

@app.route("/reports")
def blancco_reports():
    """List the files found directly inside ``BLANCCO_REPORTS``.

    Names are sorted in reverse order; the displayed type is the upper-cased
    file extension, or ``FILE`` when there is none.
    """
    reports = []
    if os.path.isdir(BLANCCO_REPORTS):
        for f in sorted(os.listdir(BLANCCO_REPORTS), reverse=True):
            fpath = os.path.join(BLANCCO_REPORTS, f)
            if os.path.isfile(fpath):
                stat = os.stat(fpath)
                ext = os.path.splitext(f)[1].lower()
                reports.append({
                    "filename": f,
                    "size": stat.st_size,
                    "modified": stat.st_mtime,
                    "type": ext.lstrip(".").upper() or "FILE",
                })
    return render_template(
        "reports.html",
        reports=reports,
        image_types=IMAGE_TYPES,
        friendly_names=FRIENDLY_NAMES,
    )


@app.route("/reports/download/<filename>")
def blancco_download_report(filename):
    """Send a report from ``BLANCCO_REPORTS`` as an attachment."""
    filename = secure_filename(filename)
    fpath = os.path.join(BLANCCO_REPORTS, filename)
    if not os.path.isfile(fpath):
        flash(f"Report not found: {filename}", "danger")
        return redirect(url_for("blancco_reports"))
    return send_file(fpath, as_attachment=True)


@app.route("/reports/delete/<filename>", methods=["POST"])
def blancco_delete_report(filename):
    """Delete a report from ``BLANCCO_REPORTS`` and record it in the audit log."""
    filename = secure_filename(filename)
    fpath = os.path.join(BLANCCO_REPORTS, filename)
    if os.path.isfile(fpath):
        try:
            os.remove(fpath)
        except OSError as exc:
            flash(f"Failed to delete {filename}: {exc}", "danger")
        else:
            audit("REPORT_DELETE", filename)
            flash(f"Deleted {filename}.", "success")
    else:
        flash(f"Report not found: {filename}", "danger")
    return redirect(url_for("blancco_reports"))


# ---------------------------------------------------------------------------
# Routes — Enrollment Packages
# ---------------------------------------------------------------------------

@app.route("/enrollment")
def enrollment():
    """List the ``.ppkg`` files found directly inside ``ENROLLMENT_SHARE``."""
    packages = []
    if os.path.isdir(ENROLLMENT_SHARE):
        for f in sorted(os.listdir(ENROLLMENT_SHARE)):
            fpath = os.path.join(ENROLLMENT_SHARE, f)
            if os.path.isfile(fpath) and f.lower().endswith(".ppkg"):
                stat = os.stat(fpath)
                packages.append({
                    "filename": f,
                    "size": stat.st_size,
                    "modified": stat.st_mtime,
                })
    return render_template(
        "enrollment.html",
        packages=packages,
        image_types=IMAGE_TYPES,
        friendly_names=FRIENDLY_NAMES,
    )


@app.route("/enrollment/upload", methods=["POST"])
def enrollment_upload():
    """Store an uploaded ``.ppkg`` package (form field ``ppkg_file``)."""
    if "ppkg_file" not in request.files:
        flash("No file selected.", "danger")
        return redirect(url_for("enrollment"))

    f = request.files["ppkg_file"]
    if not f.filename:
        flash("No file selected.", "danger")
        return redirect(url_for("enrollment"))

    filename = secure_filename(f.filename)
    if not filename.lower().endswith(".ppkg"):
        flash("Only .ppkg files are accepted.", "danger")
        return redirect(url_for("enrollment"))

    try:
        os.makedirs(ENROLLMENT_SHARE, exist_ok=True)
        dest = os.path.join(ENROLLMENT_SHARE, filename)
        f.save(dest)
    except OSError as exc:
        # Disk full / permission problems should not surface as a bare 500.
        flash(f"Failed to upload {filename}: {exc}", "danger")
        return redirect(url_for("enrollment"))

    audit("ENROLLMENT_UPLOAD", filename)
    flash(f"Uploaded {filename} successfully.", "success")
    return redirect(url_for("enrollment"))


@app.route("/enrollment/download/<filename>")
def enrollment_download(filename):
    """Send a package from ``ENROLLMENT_SHARE`` as an attachment."""
    filename = secure_filename(filename)
    fpath = os.path.join(ENROLLMENT_SHARE, filename)
    if not os.path.isfile(fpath):
        flash(f"Package not found: {filename}", "danger")
        return redirect(url_for("enrollment"))
    return send_file(fpath, as_attachment=True)


@app.route("/enrollment/delete/<filename>", methods=["POST"])
def enrollment_delete(filename):
    """Delete a package from ``ENROLLMENT_SHARE`` and record it in the audit log."""
    filename = secure_filename(filename)
    fpath = os.path.join(ENROLLMENT_SHARE, filename)
    if os.path.isfile(fpath):
        try:
            os.remove(fpath)
        except OSError as exc:
            flash(f"Failed to delete {filename}: {exc}", "danger")
        else:
            audit("ENROLLMENT_DELETE", filename)
            flash(f"Deleted {filename}.", "success")
    else:
        flash(f"Package not found: {filename}", "danger")
    return redirect(url_for("enrollment"))


# ---------------------------------------------------------------------------
# Routes — startnet.cmd Editor (WIM)
# ---------------------------------------------------------------------------

def _wim_extract_startnet(wim_path):
    """Extract startnet.cmd from a WIM file using wimextract.

    Returns the file's text, or ``None`` when extraction fails for any reason.
    """
    tmpdir = tempfile.mkdtemp()
    try:
        result = subprocess.run(
            ["wimextract", wim_path, "1",
             "/Windows/System32/startnet.cmd",
             "--dest-dir", tmpdir],
            capture_output=True, text=True, timeout=30,
        )
        startnet_path = os.path.join(tmpdir, "startnet.cmd")
        if result.returncode == 0 and os.path.isfile(startnet_path):
            with open(startnet_path, "r", encoding="utf-8", errors="replace") as fh:
                return fh.read()
        return None
    except Exception:
        # Best effort: a missing tool or a timeout is reported as "no content".
        return None
    finally:
        shutil.rmtree(tmpdir, ignore_errors=True)


def _wim_update_startnet(wim_path, content):
    """Update startnet.cmd inside a WIM file using wimupdate.

    Returns ``(True, "")`` on success, otherwise ``(False, message)`` where
    *message* is wimupdate's stderr or the text of the raised exception.
    """
    tmpdir = tempfile.mkdtemp()
    try:
        startnet_path = os.path.join(tmpdir, "startnet.cmd")
        # Normalise to "\n" first: newline="\r\n" rewrites every "\n", so
        # text that already contains CRLF would otherwise become "\r\r\n".
        normalized = content.replace("\r\n", "\n").replace("\r", "\n")
        with open(startnet_path, "w", encoding="utf-8", newline="\r\n") as fh:
            fh.write(normalized)
        # wimupdate reads commands from stdin
        update_cmd = f"add {startnet_path} /Windows/System32/startnet.cmd\n"
        result = subprocess.run(
            ["wimupdate", wim_path, "1"],
            input=update_cmd,
            capture_output=True, text=True, timeout=60,
        )
        if result.returncode != 0:
            return False, result.stderr.strip()
        return True, ""
    except Exception as exc:
        return False, str(exc)
    finally:
        shutil.rmtree(tmpdir, ignore_errors=True)


def _wim_list_files(wim_path, path="/"):
    """List files inside a WIM at the given path.

    Returns the non-blank, stripped lines printed by ``wimdir``, or an empty
    list when the command fails.
    """
    try:
        result = subprocess.run(
            ["wimdir", wim_path, "1", path],
            capture_output=True, text=True, timeout=30,
        )
        if result.returncode == 0:
            return [line.strip() for line in result.stdout.splitlines() if line.strip()]
        return []
    except Exception:
        # Best effort: treat any failure as "nothing to list".
        return []


@app.route("/startnet")
def startnet_editor():
    """Render the startnet.cmd editor with the script and wiminfo details."""
    wim_exists = os.path.isfile(BOOT_WIM)
    content = ""
    wim_info = {}

    if wim_exists:
        content = _wim_extract_startnet(BOOT_WIM) or ""
        # Get WIM info
        try:
            result = subprocess.run(
                ["wiminfo", BOOT_WIM],
                capture_output=True, text=True, timeout=15,
            )
            if result.returncode == 0:
                # Keep every "key: value" line; the split is on the first colon.
                for line in result.stdout.splitlines():
                    if ":" in line:
                        key, _, val = line.partition(":")
                        wim_info[key.strip()] = val.strip()
        except Exception:
            # The info panel is optional; the editor still renders without it.
            pass

    return render_template(
        "startnet_editor.html",
        wim_exists=wim_exists,
        wim_path=BOOT_WIM,
        content=content,
        wim_info=wim_info,
        image_types=IMAGE_TYPES,
        friendly_names=FRIENDLY_NAMES,
    )


@app.route("/startnet/save", methods=["POST"])
def startnet_save():
    """Write the submitted ``content`` form field into boot.wim as startnet.cmd."""
    if not os.path.isfile(BOOT_WIM):
        flash("boot.wim not found.", "danger")
        return redirect(url_for("startnet_editor"))

    content = request.form.get("content", "")
    ok, err = _wim_update_startnet(BOOT_WIM, content)
    if ok:
        audit("STARTNET_SAVE", "boot.wim updated")
        flash("startnet.cmd updated successfully in boot.wim.", "success")
    else:
        flash(f"Failed to update boot.wim: {err}", "danger")
    return redirect(url_for("startnet_editor"))


# ---------------------------------------------------------------------------
# Routes — Audit Log
# ---------------------------------------------------------------------------

@app.route("/audit")
def audit_log():
    """Render the audit log, newest entry first."""
    entries = []
    if os.path.isfile(AUDIT_LOG):
        # errors="replace": one undecodable byte must not break the whole page.
        with open(AUDIT_LOG, "r", errors="replace") as fh:
            entries = [line.strip() for line in fh]
        entries.reverse()  # newest first
    return render_template(
        "audit.html",
        entries=entries,
        image_types=IMAGE_TYPES,
        friendly_names=FRIENDLY_NAMES,
    )


# ---------------------------------------------------------------------------
# Routes — API
# ---------------------------------------------------------------------------

@app.route("/api/services")
def api_services():
    """Return the status of dnsmasq, apache2 and smbd as a JSON object."""
    services = {s: service_status(s) for s in ("dnsmasq", "apache2", "smbd")}
    return jsonify(services)


@app.route("/api/images")
def api_images():
    """Return the status of every image type as a JSON list."""
    images = [image_status(it) for it in IMAGE_TYPES]
    return jsonify(images)


@app.route("/api/images/<image_type>/unattend", methods=["POST"])
def api_save_unattend(image_type):
    """Save an image's unattend.xml from a JSON request body.

    A body containing ``raw_xml`` is stored verbatim after a well-formedness
    check; any other object is passed to ``build_unattend_xml``.  Responds
    with 404 for an unknown image type, 400 for an unusable body and 500 when
    the file cannot be written.
    """
    if image_type not in IMAGE_TYPES:
        return jsonify({"error": "Unknown image type"}), 404

    xml_file = unattend_path(image_type)
    payload = request.get_json(silent=True)
    if not payload:
        return jsonify({"error": "No JSON body provided"}), 400
    if not isinstance(payload, dict):
        return jsonify({"error": "JSON body must be an object"}), 400

    if "raw_xml" in payload:
        raw_xml = payload["raw_xml"]
        if not isinstance(raw_xml, str):
            return jsonify({"error": "raw_xml must be a string"}), 400
        # User-supplied XML: validate without resolving entities or
        # touching the network.
        parser = etree.XMLParser(resolve_entities=False, no_network=True)
        try:
            etree.fromstring(raw_xml.encode("utf-8"), parser)
        except etree.XMLSyntaxError as exc:
            return jsonify({"error": f"Invalid XML: {exc}"}), 400
        xml_content = raw_xml
    else:
        try:
            xml_content = build_unattend_xml(payload)
        except Exception as exc:
            return jsonify({"error": f"Failed to build XML: {exc}"}), 400

    try:
        os.makedirs(os.path.dirname(xml_file), exist_ok=True)
        with open(xml_file, "w", encoding="utf-8") as fh:
            fh.write(xml_content)
    except Exception as exc:
        return jsonify({"error": f"Failed to write file: {exc}"}), 500

    audit("UNATTEND_SAVE_API", image_type)
    return jsonify({"status": "ok", "path": xml_file})


# ---------------------------------------------------------------------------
# Template filters
# ---------------------------------------------------------------------------

@app.template_filter("timestamp_fmt")
def timestamp_fmt(ts):
    """Format a Unix timestamp to a human-readable date string."""
    return datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M")


# ---------------------------------------------------------------------------
# Context processor — make data available to all templates
# ---------------------------------------------------------------------------

@app.context_processor
def inject_globals():
    """Expose the image type list and friendly names to every template."""
    return {
        "all_image_types": IMAGE_TYPES,
        "all_friendly_names": FRIENDLY_NAMES,
    }


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

if __name__ == "__main__":
    app.run(host="127.0.0.1", port=9010, debug=False, threaded=True)