Fix review findings: offline assets, security, audit logging
- Bundle Bootstrap CSS/JS/icons locally for air-gapped operation - Add path traversal validation on image import source - Disable Flask debug mode in production - Fix file handle leaks, remove unused import - Add python3-pip, python3-venv, p7zip-full to offline packages - Add pip wheel download/bundling for offline Flask install - Change UFW default policy from allow to deny - Fix wrong path displayed in unattend editor template - Dynamic sidebar image lists from all_image_types - Add audit logging for all write operations - Audit log viewer page with activity history Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -1,10 +1,12 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Flask web application for managing a GE Aerospace PXE server."""
|
||||
|
||||
import copy
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
import tempfile
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
|
||||
from flask import (
|
||||
@@ -24,6 +26,22 @@ app = Flask(__name__)
|
||||
app.secret_key = os.environ.get("FLASK_SECRET_KEY", "pxe-manager-dev-key-change-in-prod")
|
||||
app.config["MAX_CONTENT_LENGTH"] = 16 * 1024 * 1024 * 1024 # 16 GB max upload
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Audit logging
|
||||
# ---------------------------------------------------------------------------
|
||||
AUDIT_LOG = os.environ.get("AUDIT_LOG", "/var/log/pxe-webapp-audit.log")
|
||||
audit_logger = logging.getLogger("pxe_audit")
|
||||
audit_logger.setLevel(logging.INFO)
|
||||
_audit_handler = logging.FileHandler(AUDIT_LOG, mode="a")
|
||||
_audit_handler.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
|
||||
audit_logger.addHandler(_audit_handler)
|
||||
|
||||
|
||||
def audit(action, detail=""):
|
||||
"""Write an entry to the audit log."""
|
||||
ip = request.remote_addr if request else "system"
|
||||
audit_logger.info(f"[{ip}] {action}: {detail}")
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Configuration
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -190,7 +208,8 @@ def parse_unattend(xml_path):
|
||||
data["raw_xml"] = UNATTEND_TEMPLATE
|
||||
return data
|
||||
|
||||
raw = open(xml_path, "r", encoding="utf-8").read()
|
||||
with open(xml_path, "r", encoding="utf-8") as fh:
|
||||
raw = fh.read()
|
||||
data["raw_xml"] = raw
|
||||
|
||||
try:
|
||||
@@ -480,6 +499,12 @@ def images_import():
|
||||
flash("Invalid target image type.", "danger")
|
||||
return redirect(url_for("images_import"))
|
||||
|
||||
# Validate source is under an allowed USB mount path
|
||||
allowed = find_usb_mounts()
|
||||
if not any(source == m or source.startswith(m + "/") for m in allowed):
|
||||
flash("Source path is not on a mounted USB device.", "danger")
|
||||
return redirect(url_for("images_import"))
|
||||
|
||||
if not os.path.isdir(source):
|
||||
flash(f"Source path does not exist: {source}", "danger")
|
||||
return redirect(url_for("images_import"))
|
||||
@@ -497,6 +522,7 @@ def images_import():
|
||||
shutil.copytree(src_item, dst_item)
|
||||
else:
|
||||
shutil.copy2(src_item, dst_item)
|
||||
audit("IMAGE_IMPORT", f"{source} -> {target}")
|
||||
flash(
|
||||
f"Successfully imported content from {source} to {FRIENDLY_NAMES.get(target, target)}.",
|
||||
"success",
|
||||
@@ -553,6 +579,7 @@ def unattend_editor(image_type):
|
||||
os.makedirs(os.path.dirname(xml_file), exist_ok=True)
|
||||
with open(xml_file, "w", encoding="utf-8") as fh:
|
||||
fh.write(xml_content)
|
||||
audit("UNATTEND_SAVE", f"{image_type} ({save_mode})")
|
||||
flash("unattend.xml saved successfully.", "success")
|
||||
except Exception as exc:
|
||||
flash(f"Failed to save: {exc}", "danger")
|
||||
@@ -615,6 +642,7 @@ def clonezilla_upload():
|
||||
os.makedirs(CLONEZILLA_SHARE, exist_ok=True)
|
||||
dest = os.path.join(CLONEZILLA_SHARE, filename)
|
||||
f.save(dest)
|
||||
audit("BACKUP_UPLOAD", filename)
|
||||
flash(f"Uploaded {filename} successfully.", "success")
|
||||
return redirect(url_for("clonezilla_backups"))
|
||||
|
||||
@@ -635,6 +663,7 @@ def clonezilla_delete(filename):
|
||||
fpath = os.path.join(CLONEZILLA_SHARE, filename)
|
||||
if os.path.isfile(fpath):
|
||||
os.remove(fpath)
|
||||
audit("BACKUP_DELETE", filename)
|
||||
flash(f"Deleted {filename}.", "success")
|
||||
else:
|
||||
flash(f"Backup not found: {filename}", "danger")
|
||||
@@ -684,6 +713,7 @@ def blancco_delete_report(filename):
|
||||
fpath = os.path.join(BLANCCO_REPORTS, filename)
|
||||
if os.path.isfile(fpath):
|
||||
os.remove(fpath)
|
||||
audit("REPORT_DELETE", filename)
|
||||
flash(f"Deleted {filename}.", "success")
|
||||
else:
|
||||
flash(f"Report not found: {filename}", "danger")
|
||||
@@ -696,7 +726,6 @@ def blancco_delete_report(filename):
|
||||
|
||||
def _wim_extract_startnet(wim_path):
|
||||
"""Extract startnet.cmd from a WIM file using wimextract."""
|
||||
import tempfile
|
||||
tmpdir = tempfile.mkdtemp()
|
||||
try:
|
||||
result = subprocess.run(
|
||||
@@ -707,8 +736,8 @@ def _wim_extract_startnet(wim_path):
|
||||
)
|
||||
startnet_path = os.path.join(tmpdir, "startnet.cmd")
|
||||
if result.returncode == 0 and os.path.isfile(startnet_path):
|
||||
content = open(startnet_path, "r", encoding="utf-8", errors="replace").read()
|
||||
return content
|
||||
with open(startnet_path, "r", encoding="utf-8", errors="replace") as fh:
|
||||
return fh.read()
|
||||
return None
|
||||
except Exception:
|
||||
return None
|
||||
@@ -718,7 +747,6 @@ def _wim_extract_startnet(wim_path):
|
||||
|
||||
def _wim_update_startnet(wim_path, content):
|
||||
"""Update startnet.cmd inside a WIM file using wimupdate."""
|
||||
import tempfile
|
||||
tmpdir = tempfile.mkdtemp()
|
||||
try:
|
||||
startnet_path = os.path.join(tmpdir, "startnet.cmd")
|
||||
@@ -796,12 +824,33 @@ def startnet_save():
|
||||
content = request.form.get("content", "")
|
||||
ok, err = _wim_update_startnet(BOOT_WIM, content)
|
||||
if ok:
|
||||
audit("STARTNET_SAVE", "boot.wim updated")
|
||||
flash("startnet.cmd updated successfully in boot.wim.", "success")
|
||||
else:
|
||||
flash(f"Failed to update boot.wim: {err}", "danger")
|
||||
return redirect(url_for("startnet_editor"))
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Routes — Audit Log
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@app.route("/audit")
|
||||
def audit_log():
|
||||
entries = []
|
||||
if os.path.isfile(AUDIT_LOG):
|
||||
with open(AUDIT_LOG, "r") as fh:
|
||||
for line in fh:
|
||||
entries.append(line.strip())
|
||||
entries.reverse() # newest first
|
||||
return render_template(
|
||||
"audit.html",
|
||||
entries=entries,
|
||||
image_types=IMAGE_TYPES,
|
||||
friendly_names=FRIENDLY_NAMES,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Routes — API
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -849,6 +898,7 @@ def api_save_unattend(image_type):
|
||||
except Exception as exc:
|
||||
return jsonify({"error": f"Failed to write file: {exc}"}), 500
|
||||
|
||||
audit("UNATTEND_SAVE_API", image_type)
|
||||
return jsonify({"status": "ok", "path": xml_file})
|
||||
|
||||
|
||||
@@ -859,7 +909,6 @@ def api_save_unattend(image_type):
|
||||
@app.template_filter("timestamp_fmt")
|
||||
def timestamp_fmt(ts):
|
||||
"""Format a Unix timestamp to a human-readable date string."""
|
||||
from datetime import datetime
|
||||
return datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M")
|
||||
|
||||
|
||||
@@ -880,4 +929,4 @@ def inject_globals():
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
if __name__ == "__main__":
|
||||
app.run(host="0.0.0.0", port=5000, debug=True)
|
||||
app.run(host="0.0.0.0", port=5000, debug=False)
|
||||
|
||||
Reference in New Issue
Block a user