"""Shared Alembic env.py logic for bundled plugins. Each bundled plugin (computers, equipment, network, notifications, printers, usb) has a `migrations/env.py` that does the minimum: import os os.environ['PLUGIN_NAME'] = 'computers' from shopdb.plugins.alembic_template import run_migrations run_migrations() This module wires the plugin's models into a MetaData object filtered to only the tables that belong to that plugin, then runs Alembic in either offline or online mode against the Flask app's configured engine. Plugin tables must be importable via `plugins..models`. Plugins register their `__tablename__` set in PLUGIN_TABLE_OWNERS below so the filter is explicit (avoids depending on import-side-effect global state). """ from __future__ import annotations import importlib import logging import os from typing import Iterable from alembic import context from sqlalchemy import MetaData, pool logger = logging.getLogger('alembic.env.plugin') # Explicit table-ownership map. Adding tables to a plugin requires updating # this dict so the per-plugin migration knows which tables to include. PLUGIN_TABLE_OWNERS: dict[str, Iterable[str]] = { 'computers': ('computertypes', 'computers', 'computerinstalledapps'), 'equipment': ('equipmenttypes', 'equipment'), 'network': ('networkdevicetypes', 'networkdevices', 'vlans', 'subnets'), 'notifications': ('notificationtypes', 'notifications'), 'printers': ('printertypes', 'printers', 'printerdata'), 'usb': ('usbdevicetypes', 'usbdevices', 'usbcheckouts'), } def _get_plugin_metadata(plugin_name: str) -> MetaData: """Import the plugin's models and return a MetaData containing only its declared tables (filtered via PLUGIN_TABLE_OWNERS).""" owned = set(PLUGIN_TABLE_OWNERS.get(plugin_name, ())) if not owned: raise RuntimeError( f"PLUGIN_TABLE_OWNERS has no entry for plugin '{plugin_name}'. " f"Update shopdb/plugins/alembic_template.py." ) # Importing models attaches them to the global db.metadata. importlib.import_module(f'plugins.{plugin_name}.models') from shopdb.extensions import db full = db.metadata plugin_md = MetaData() for table in list(full.tables.values()): if table.name in owned: table.to_metadata(plugin_md) return plugin_md def create_plugin_tables(plugin_name: str): """Emit CreateTable DDL for every table this plugin owns. Idempotent via Alembic's batch_op.create_table behavior (raises if exists; the baseline migration is meant to run against an empty schema). Called from each plugin's 0001_baseline.py upgrade() so the table definitions stay sourced from the SQLAlchemy models rather than being duplicated in handwritten Alembic ops. """ from alembic import op from sqlalchemy.schema import CreateTable md = _get_plugin_metadata(plugin_name) bind = op.get_bind() # Sort by FK dependency so parent tables are created first. for table in md.sorted_tables: op.execute(str(CreateTable(table).compile(dialect=bind.dialect))) def drop_plugin_tables(plugin_name: str): """Mirror of create_plugin_tables for downgrade(). Drops in reverse FK order.""" from alembic import op md = _get_plugin_metadata(plugin_name) for table in reversed(md.sorted_tables): op.execute(f'DROP TABLE IF EXISTS "{table.name}"') def run_migrations(): """Entry point called by each plugin's migrations/env.py.""" plugin_name = os.environ.get('PLUGIN_NAME') if not plugin_name: raise RuntimeError("PLUGIN_NAME env var must be set before run_migrations()") config = context.config target_metadata = _get_plugin_metadata(plugin_name) # Per-plugin version table so each plugin's chain is independent of core # Alembic's alembic_version table. version_table = f'alembic_version_{plugin_name}' db_url = config.get_main_option('sqlalchemy.url') if not db_url: # Pull from the Flask app config if running inside an app context # (e.g. via flask plugin migrate ). try: from flask import current_app db_url = current_app.config['SQLALCHEMY_DATABASE_URI'] config.set_main_option('sqlalchemy.url', db_url.replace('%', '%%')) except Exception as ex: raise RuntimeError( "sqlalchemy.url not set and no Flask app context available. " f"Original error: {ex}" ) if context.is_offline_mode(): context.configure( url=db_url, target_metadata=target_metadata, literal_binds=True, version_table=version_table, include_schemas=False, ) with context.begin_transaction(): context.run_migrations() else: from sqlalchemy import engine_from_config connectable = engine_from_config( config.get_section(config.config_ini_section, {}), prefix='sqlalchemy.', poolclass=pool.NullPool, ) with connectable.connect() as connection: context.configure( connection=connection, target_metadata=target_metadata, version_table=version_table, include_schemas=False, ) with context.begin_transaction(): context.run_migrations()