Source code for mario.plug

import importlib
import importlib.resources
import importlib.util
import inspect
import sys
import types
import typing as t
from typing import Any
from typing import Dict
from typing import Iterable
from typing import List

import attr
import importlib_metadata
import toml

from mario import config
from mario import declarative


[docs]@attr.dataclass class PluginObject: name: str traversal_function: types.FunctionType required_parameters: List[str] calculate_more_params: types.FunctionType = attr.ib(default=lambda x: {})
[docs]@attr.dataclass class CommandStage: name: str options: List[str] arguments: List[str] remap_params: Dict
[docs]@attr.dataclass class CommandCommand: name: str components: List[CommandStage] short_help: str options: t.Dict = attr.ib(factory=dict) arguments: t.Dict = attr.ib(factory=dict)
@attr.s(repr=False) class _NoDefaultType: def __repr__(self): return "NO_DEFAULT" NO_DEFAULT = _NoDefaultType()
[docs]@attr.dataclass class GlobalOption: name: str type: t.Type default: _NoDefaultType
[docs]@attr.s class Registry: traversals: Dict[str, PluginObject] = attr.ib(factory=dict) global_options: Dict[str, GlobalOption] = attr.ib(factory=dict) cli_functions: Dict[str, Any] = attr.ib(factory=dict) commands: Dict[str, CommandCommand] = attr.ib(factory=dict)
[docs] def add_traversal(self, name=None, calculate_more_params=lambda x: {}): def wrap(function): if name is None: registered_name = function.__name__ else: registered_name = name params = list(inspect.signature(function).parameters.keys()) # pylint: disable=unsupported-assignment-operation self.traversals[ registered_name ] = PluginObject( # pylint: disable=unsupported-assignment-operation registered_name, function, params, calculate_more_params ) return function return wrap
[docs] def add_cli(self, name=None): def wrap(function): if name is None: registered_name = getattr(function, "__name__", None), function.name else: registered_name = name # pylint: disable=unsupported-assignment-operation self.cli_functions[registered_name] = function return function return wrap
[docs]def plugin_module_paths() -> List[str]: return [ entry_point.value + "." + entry_point.name for entry_point in importlib_metadata.entry_points()["mario_plugins"] ]
[docs]def collect_modules(import_paths: Iterable[str]) -> List[types.ModuleType]: modules = [] for path in import_paths: modules.append(importlib.import_module(path)) return modules
[docs]def combine_registries(registries): global_options = {} traversals = {} cli_functions = {} commands = {} for registry in registries: traversals.update(registry.traversals) global_options.update(registry.global_options) cli_functions.update(registry.cli_functions) commands.update(registry.commands) return Registry(traversals, global_options, cli_functions, commands)
[docs]def make_plugin_registry(): plugin_modules = collect_modules(plugin_module_paths()) plugin_registries = [module.registry for module in plugin_modules] return combine_registries(plugin_registries)
[docs]def make_config_registry(): sys.path.append(str(config.get_config_dir())) try: # pylint: disable=import-error # pylint: disable=import-outside-toplevel import m except ModuleNotFoundError: return Registry() return getattr(m, "registry", None) or Registry()
[docs]def make_global_registry(): return combine_registries( [ make_plugin_registry(), make_config_registry(), make_config_commands_registry(), make_plugin_commands_registry(), ] )
[docs]def make_commands(conf): commands = declarative.CommandSpecSchema(many=True).load(conf.get("command", [])) return commands
[docs]def make_config_commands_registry(): conf = config.load_config() commands = make_commands(conf) return Registry(commands={c.name: c for c in commands})
[docs]def make_plugin_commands_registry(package="mario.plugins"): plugin_tomls = [ filename for filename in importlib.resources.contents(package) if filename.endswith(".toml") ] confs = [ toml.loads(importlib.resources.read_text(package, filename)) for filename in plugin_tomls ] conf_command_groups = [make_commands(conf) for conf in confs] registries = [ Registry(commands={c.name: c for c in commands}) for commands in conf_command_groups ] return combine_registries(registries)