Flask backend with Vue 3 frontend for shop floor machine management. Includes database schema export for MySQL shopdb_flask database. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
174 lines
5.3 KiB
Python
174 lines
5.3 KiB
Python
"""Plugin migration management using Alembic."""
|
|
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
import logging
|
|
import subprocess
|
|
import sys
|
|
|
|
logger = logging.getLogger(__name__)


class PluginMigrationManager:
    """
    Manages database migrations for plugins.

    Each plugin has its own migrations directory, located at
    ``<plugins_dir>/<plugin_name>/migrations``, and is configured with its
    own version table name (``alembic_version_<plugin_name>``).

    All public methods report failure through their return value (``False``
    or ``None``) and a log record rather than by raising.
    """

    def __init__(self, plugins_dir: Path, database_url: str):
        """
        Store the plugin root directory and the database URL.

        Args:
            plugins_dir: Directory containing one sub-directory per plugin.
                A plain string is accepted and converted to ``Path``.
            database_url: Database URL handed to Alembic.
        """
        # Coerce so that the ``/`` joins below also work for str arguments.
        self.plugins_dir = Path(plugins_dir)
        self.database_url = database_url

    def get_migrations_dir(self, plugin_name: str) -> Optional[Path]:
        """
        Get migrations directory for a plugin.

        Returns:
            The ``migrations`` directory of the plugin, or ``None`` when it
            does not exist or is not a directory.
        """
        migrations_dir = self.plugins_dir / plugin_name / 'migrations'
        # is_dir() rather than exists(): a regular file that happens to be
        # called "migrations" is not a usable script location.
        if migrations_dir.is_dir():
            return migrations_dir
        return None

    def _build_config(self, plugin_name: str, migrations_dir: Path):
        """Build the in-memory Alembic ``Config`` shared by all operations."""
        from alembic.config import Config

        def escape(value: str) -> str:
            # set_main_option() applies ConfigParser interpolation, so a raw
            # "%" (e.g. a percent-encoded password in the URL) must be
            # doubled or it raises ValueError.
            return value.replace('%', '%%')

        config = Config()
        config.set_main_option('script_location', escape(str(migrations_dir)))
        config.set_main_option('sqlalchemy.url', escape(self.database_url))
        # Use plugin-specific version table.
        # NOTE(review): this only stores the name in the config; presumably
        # each plugin's env.py reads it and passes it on to
        # context.configure() - confirm against the plugin templates.
        config.set_main_option(
            'version_table',
            escape(f'alembic_version_{plugin_name}')
        )
        return config

    def run_plugin_migrations(
        self,
        plugin_name: str,
        revision: str = 'head'
    ) -> bool:
        """
        Run migrations for a plugin.

        Uses Alembic in-process with the plugin's migrations directory and
        falls back to ``python -m alembic`` in a subprocess when Alembic
        cannot be imported here.

        Args:
            plugin_name: Name of the plugin (its directory name).
            revision: Target revision to upgrade to.

        Returns:
            ``True`` when the upgrade succeeded or the plugin has no
            migrations directory, ``False`` when the upgrade failed.
        """
        migrations_dir = self.get_migrations_dir(plugin_name)

        if not migrations_dir:
            logger.info("No migrations directory for plugin %s", plugin_name)
            return True  # No migrations to run

        # Only a failed import of alembic itself triggers the fallback. An
        # ImportError raised later by a plugin's env.py or revision script is
        # a genuine migration failure and is reported as such below.
        try:
            from alembic import command
        except ImportError:
            logger.warning("Using subprocess for migrations")
            return self._run_migrations_subprocess(plugin_name, revision)

        try:
            config = self._build_config(plugin_name, migrations_dir)
            command.upgrade(config, revision)
        except Exception as e:
            logger.exception("Migration failed for %s: %s", plugin_name, e)
            return False

        logger.info("Migrations completed for %s", plugin_name)
        return True

    def _run_migrations_subprocess(
        self,
        plugin_name: str,
        revision: str = 'head'
    ) -> bool:
        """
        Run migrations via subprocess as fallback.

        Invokes ``<python> -m alembic -c <migrations>/alembic.ini upgrade
        <revision>`` with the current environment plus ``DATABASE_URL``.

        Returns:
            ``True`` on exit code 0 or when the plugin has no migrations
            directory, ``False`` otherwise.
        """
        import os

        migrations_dir = self.get_migrations_dir(plugin_name)
        if not migrations_dir:
            return True

        ini_path = migrations_dir / 'alembic.ini'
        # NOTE(review): presumably the plugin's alembic.ini / env.py picks
        # the connection URL up from DATABASE_URL - confirm.
        child_env = {**os.environ, 'DATABASE_URL': self.database_url}

        try:
            result = subprocess.run(
                [
                    sys.executable, '-m', 'alembic',
                    '-c', str(ini_path),
                    'upgrade', revision,
                ],
                capture_output=True,
                text=True,
                env=child_env,
            )
        except Exception as e:
            logger.exception("Migration subprocess failed: %s", e)
            return False

        if result.returncode != 0:
            logger.error("Migration error: %s", result.stderr)
            return False

        return True

    def downgrade_plugin(
        self,
        plugin_name: str,
        revision: str = 'base'
    ) -> bool:
        """
        Downgrade/rollback plugin migrations.

        Args:
            plugin_name: Name of the plugin (its directory name).
            revision: Target revision to downgrade to.

        Returns:
            ``True`` when the downgrade succeeded or the plugin has no
            migrations directory, ``False`` on any failure (including
            Alembic not being importable; there is no subprocess fallback).
        """
        migrations_dir = self.get_migrations_dir(plugin_name)

        if not migrations_dir:
            return True

        try:
            from alembic import command

            config = self._build_config(plugin_name, migrations_dir)
            command.downgrade(config, revision)
        except Exception as e:
            logger.exception("Downgrade failed for %s: %s", plugin_name, e)
            return False

        logger.info("Downgrade completed for %s", plugin_name)
        return True

    def get_current_revision(self, plugin_name: str) -> Optional[str]:
        """
        Get current migration revision for a plugin.

        NOTE: this reads the head revision of the plugin's migration
        scripts via ``ScriptDirectory.get_current_head()``; it does not
        query the database for the revision that is actually applied.

        Returns:
            The head revision identifier, or ``None`` when the plugin has no
            migrations directory or the scripts could not be inspected.
        """
        migrations_dir = self.get_migrations_dir(plugin_name)
        if not migrations_dir:
            return None

        try:
            from alembic.script import ScriptDirectory

            config = self._build_config(plugin_name, migrations_dir)
            return ScriptDirectory.from_config(config).get_current_head()
        except Exception as e:
            # Still best-effort, but leave a trace instead of hiding the cause.
            logger.warning(
                "Could not determine revision for %s: %s", plugin_name, e
            )
            return None

    def has_pending_migrations(self, plugin_name: str) -> bool:
        """
        Check if plugin has pending migrations.

        Simplified check - would need DB connection for full check. It only
        reports whether the plugin's ``migrations/versions`` directory
        contains at least one migration script.
        """
        migrations_dir = self.get_migrations_dir(plugin_name)
        if not migrations_dir:
            return False

        versions_dir = migrations_dir / 'versions'
        if not versions_dir.is_dir():
            return False

        # A package marker is not a migration; stop at the first real script.
        return any(
            path.is_file() and path.name != '__init__.py'
            for path in versions_dir.glob('*.py')
        )
|