"""Plugin discovery and loading. The loader reads each plugin's manifest.json before instantiating the plugin class. Dependency sorting and contract-version compatibility checks operate on manifests, not plugin instances. The plugin class is imported and instantiated only after the manifest passes validation. Failure policy (per the enforcing-plugin-contract skill): - In dev/test (app.config.DEBUG or TESTING true): re-raise the original exception so failures are loud. - In production: log the failure with full context and exclude the plugin from registration. The framework keeps booting. """ import importlib import importlib.util import json import logging from pathlib import Path from typing import Dict, List, Type, Optional from flask import Flask from packaging.specifiers import SpecifierSet, InvalidSpecifier from packaging.version import Version, InvalidVersion from .base import BasePlugin from .registry import PluginRegistry from ..exceptions import ( PluginError, PluginNotFoundError, PluginContractError, PluginVersionError, PluginDependencyError, ) logger = logging.getLogger(__name__) class PluginLoader: """Discovers and loads plugins from the plugins directory.""" def __init__(self, plugins_dir: Path, registry: PluginRegistry): self.plugins_dir = plugins_dir self.registry = registry self._loaded_plugins: Dict[str, BasePlugin] = {} self._plugin_classes: Dict[str, Type[BasePlugin]] = {} self._manifests: Dict[str, dict] = {} def _is_strict_mode(self, app: Optional[Flask]) -> bool: """Return True if loader should re-raise instead of isolating failures.""" if app is None: return True return bool(app.config.get('DEBUG') or app.config.get('TESTING')) def _handle_failure(self, app: Optional[Flask], exc: Exception, name: str) -> None: """In strict mode re-raise; in production log with context and continue.""" if self._is_strict_mode(app): raise exc logger.exception(f'Plugin {name} failed to load: {exc}') def discover_plugins(self) -> List[str]: """Discover available plugins in plugins directory.""" available = [] if not self.plugins_dir.exists(): return available for item in self.plugins_dir.iterdir(): if not item.is_dir(): continue if (item / 'plugin.py').exists(): available.append(item.name) return available def load_manifest(self, name: str) -> dict: """Load the plugin's manifest.json. Raises PluginNotFoundError if the manifest does not exist or is unparseable. The result is cached. """ if name in self._manifests: return self._manifests[name] manifest_path = self.plugins_dir / name / 'manifest.json' if not manifest_path.exists(): raise PluginNotFoundError( f'Plugin {name} has no manifest.json at {manifest_path}', plugin_name=name, ) try: with open(manifest_path) as f: manifest = json.load(f) except json.JSONDecodeError as e: raise PluginContractError( f'Plugin {name} has invalid manifest.json: {e}', plugin_name=name, ) from e for required in ('name', 'version', 'description'): if not manifest.get(required): raise PluginContractError( f'Plugin {name} manifest.json missing required field "{required}"', plugin_name=name, ) if manifest['name'] != name: raise PluginContractError( f'Plugin directory "{name}" does not match manifest name "{manifest["name"]}"', plugin_name=name, ) self._manifests[name] = manifest return manifest def check_contract_version(self, name: str, contract_version: str) -> None: """Verify the plugin's core_version range includes contract_version. Raises PluginVersionError on mismatch. """ manifest = self.load_manifest(name) core_version_spec = manifest.get('core_version', '') if not core_version_spec: return try: specifier = SpecifierSet(core_version_spec) framework_version = Version(contract_version) except (InvalidSpecifier, InvalidVersion) as e: raise PluginContractError( f'Plugin {name} has invalid version specifier "{core_version_spec}": {e}', plugin_name=name, ) from e if framework_version not in specifier: raise PluginVersionError( f'Plugin {name} requires core_version {core_version_spec} ' f'but framework is at {contract_version}', plugin_name=name, ) def load_plugin_class(self, name: str) -> Type[BasePlugin]: """Import the plugin's plugin.py and return the BasePlugin subclass. Raises PluginNotFoundError if plugin.py is missing. Raises PluginContractError if the module has no BasePlugin subclass or import fails. """ if name in self._plugin_classes: return self._plugin_classes[name] plugin_module_path = self.plugins_dir / name / 'plugin.py' if not plugin_module_path.exists(): raise PluginNotFoundError( f'Plugin {name} plugin.py not found at {plugin_module_path}', plugin_name=name, ) try: spec = importlib.util.spec_from_file_location( f'plugins.{name}.plugin', plugin_module_path, ) module = importlib.util.module_from_spec(spec) spec.loader.exec_module(module) except Exception as e: raise PluginContractError( f'Plugin {name} import failed: {e}', plugin_name=name, ) from e for attr_name in dir(module): attr = getattr(module, attr_name) if (isinstance(attr, type) and issubclass(attr, BasePlugin) and attr is not BasePlugin): self._plugin_classes[name] = attr return attr raise PluginContractError( f'Plugin {name} module has no BasePlugin subclass', plugin_name=name, ) def load_plugin(self, name: str, app: Flask, db) -> Optional[BasePlugin]: """Load and instantiate a plugin. Returns the plugin instance on success. In strict mode (dev/test), any failure raises. In production, returns None and logs the failure with full context. """ if name in self._loaded_plugins: return self._loaded_plugins[name] try: from shopdb import __contract_version__ manifest = self.load_manifest(name) self.check_contract_version(name, __contract_version__) for dep in manifest.get('dependencies', []): if not self.registry.is_enabled(dep): raise PluginDependencyError( f'Plugin {name} requires {dep} which is not enabled', plugin_name=name, ) plugin_class = self.load_plugin_class(name) plugin = plugin_class() plugin.init_app(app, db) self._loaded_plugins[name] = plugin return plugin except PluginError as e: self._handle_failure(app, e, name) return None except Exception as e: wrapped = PluginError( f'Unexpected failure loading plugin {name}: {e}', plugin_name=name, ) self._handle_failure(app, wrapped, name) return None def load_enabled_plugins(self, app: Flask, db) -> Dict[str, BasePlugin]: """Load all enabled plugins in dependency order.""" loaded = {} enabled = self.registry.get_enabled_plugins() sorted_plugins = self._sort_by_dependencies(enabled) for name in sorted_plugins: plugin = self.load_plugin(name, app, db) if plugin: loaded[name] = plugin logger.info(f'Loaded plugin: {name} v{plugin.meta.version}') return loaded def _sort_by_dependencies(self, plugin_names: List[str]) -> List[str]: """Sort plugins so dependencies come first. Reads dependencies from manifest.json directly; does not instantiate plugin classes during sort. """ sorted_list = [] visited = set() def visit(name): if name in visited: return visited.add(name) try: manifest = self.load_manifest(name) for dep in manifest.get('dependencies', []): if dep in plugin_names: visit(dep) except PluginError: pass sorted_list.append(name) for name in plugin_names: visit(name) return sorted_list def get_loaded_plugin(self, name: str) -> Optional[BasePlugin]: """Get an already loaded plugin.""" return self._loaded_plugins.get(name) def get_all_loaded(self) -> Dict[str, BasePlugin]: """Get all loaded plugins.""" return self._loaded_plugins.copy()