cjm-plugin-system
Install
pip install cjm_plugin_system
Project Structure
nbs/
├── core/ (9)
│ ├── config.ipynb # Project-level configuration for paths, runtime settings, and environment management
│ ├── interface.ipynb # Abstract base class defining the generic plugin interface
│ ├── manager.ipynb # Plugin discovery, loading, and lifecycle management system
│ ├── metadata.ipynb # Data structures for plugin metadata
│ ├── platform.ipynb # Cross-platform utilities for process management, path handling, and system detection
│ ├── proxy.ipynb # Bridge between Host application and isolated Worker processes
│ ├── queue.ipynb # Resource-aware job queue for sequential plugin execution with cancellation support
│ ├── scheduling.ipynb # Resource scheduling policies for plugin execution
│ └── worker.ipynb # FastAPI server that runs inside isolated plugin environments
├── utils/ (2)
│ ├── hashing.ipynb # Shared cryptographic hashing primitives for content integrity verification
│ └── validation.ipynb # Validation helpers for plugin configuration dataclasses
└── cli.ipynb # CLI tool for declarative plugin management
Total: 12 notebooks across 2 directories
Module Dependencies
graph LR
cli[cli<br/>cli]
core_config[core.config<br/>Configuration]
core_interface[core.interface<br/>Plugin Interface]
core_manager[core.manager<br/>Plugin Manager]
core_metadata[core.metadata<br/>Plugin Metadata]
core_platform[core.platform<br/>Platform Utilities]
core_proxy[core.proxy<br/>Remote Plugin Proxy]
core_queue[core.queue<br/>Job Queue]
core_scheduling[core.scheduling<br/>Scheduling]
core_worker[core.worker<br/>Universal Worker]
utils_hashing[utils.hashing<br/>Content Hashing Utilities]
utils_validation[utils.validation<br/>Configuration Validation]
cli --> core_platform
cli --> core_config
core_manager --> core_interface
core_manager --> core_scheduling
core_manager --> core_metadata
core_manager --> core_proxy
core_manager --> core_config
core_platform --> core_config
core_proxy --> core_platform
core_proxy --> core_interface
core_proxy --> core_config
core_queue --> core_manager
core_scheduling --> core_metadata
core_worker --> core_platform
14 cross-module dependencies detected
CLI Reference
cjm-ctl Command
Usage: cjm-ctl [OPTIONS] COMMAND [ARGS]...
CJM Plugin System CLI
╭─ Options ──────────────────────────────────────────────────────────────────────────────────────────────╮
│ --cjm-config PATH Path to cjm.yaml configuration file │
│ --data-dir PATH Override data directory (manifests, logs) │
│ --conda-prefix PATH Override conda/mamba prefix path │
│ --conda-type TEXT Conda implementation: micromamba, miniforge, or conda │
│ --install-completion Install completion for the current shell. │
│ --show-completion Show completion for the current shell, to copy it or customize the │
│ installation. │
│ --help Show this message and exit. │
╰────────────────────────────────────────────────────────────────────────────────────────────────────────╯
╭─ Commands ─────────────────────────────────────────────────────────────────────────────────────────────╮
│ setup-runtime Download and setup micromamba runtime for project-local mode. │
│ install-all Install and register all plugins defined in plugins.yaml. │
│ setup-host Install interface libraries in the current Python environment. │
│ estimate-size Estimate disk space required for plugin environments. │
│ list List installed plugins from manifest directory. │
│ remove Remove a plugin's manifest and conda environment. │
╰────────────────────────────────────────────────────────────────────────────────────────────────────────╯
For detailed help on any command, use cjm-ctl <command> --help.
Module Overview
Detailed documentation for each module in the project:
cli (cli.ipynb)
CLI tool for declarative plugin management
Import
from cjm_plugin_system.cli import (
app,
main,
setup_runtime,
run_cmd,
install_all,
setup_host,
estimate_size,
list_plugins,
remove_plugin
)
Functions
def main(
ctx:typer.Context,
cjm_config:Annotated[Optional[Path], typer.Option(
"--cjm-config",
help="Path to cjm.yaml configuration file"
)]=None,
data_dir:Annotated[Optional[Path], typer.Option(
"--data-dir",
help="Override data directory (manifests, logs)"
)]=None,
conda_prefix:Annotated[Optional[Path], typer.Option(
"--conda-prefix",
help="Override conda/mamba prefix path"
)]=None,
conda_type:Annotated[Optional[str], typer.Option(
"--conda-type",
help="Conda implementation: micromamba, miniforge, or conda"
)]=None,
) -> None
"CJM Plugin System CLI for managing isolated plugin environments."
def setup_runtime(
force:bool=typer.Option(False, "--force", "-f", help="Re-download even if binary exists")
) -> None
"Download and setup micromamba runtime for project-local mode."
def _check_runtime_available() -> None:
"""Check if the configured conda runtime is available, exit with helpful message if not."""
cfg = get_config()
if not ensure_runtime_available(cfg)
"Check if the configured conda runtime is available, exit with helpful message if not."
def _get_conda_cmd_str() -> str
"Get the conda/micromamba command string for shell commands."
def _download_url_to_temp(
url: str,
suffix: str = ".yml"
) -> Optional[Path]:
"Download a URL to a temporary file. Returns None if download fails."
def _resolve_env_file(
env_file: str
) -> tuple[str, Optional[Path]]:
"""
Resolve env_file to a local path, downloading if it's a URL.
Returns (local_path, temp_file) where temp_file is set if we created
a temporary file that should be cleaned up later.
"""
def run_cmd(
cmd: str,
check: bool = True
) -> None
"""
Run a shell command and stream output.
Uses the platform's default shell (no hardcoded /bin/bash).
"""
def _generate_manifest(
env_name: str,
package_name: str,
manifest_dir: Path
) -> None
"Run introspection script inside the target env to generate manifest."
def _add_conda_env_to_manifest(
manifest_dir:Path,
plugin_name:str,
env_name:str
) -> bool:
"Add conda_env field to an existing manifest file."
def _conda_env_exists_configured(
env_name: str
) -> bool:
"Check if conda environment exists using configured conda command."
def install_all(
plugins_path:str=typer.Option("plugins.yaml", "--plugins", help="Path to plugins.yaml file"),
force:bool=typer.Option(False, help="Force recreation of environments")
) -> None
"Install and register all plugins defined in plugins.yaml."
def setup_host(
plugins_path:str=typer.Option("plugins.yaml", "--plugins", help="Path to plugins.yaml file"),
yes:bool=typer.Option(False, "--yes", "-y", help="Skip confirmation prompt")
) -> None
"Install interface libraries in the current Python environment."
def _format_size(
size_bytes: int
) -> str:
"Format bytes as human-readable string."
def _get_pypi_size(
package_spec: str
) -> tuple[int, str]:
"Query PyPI for package download size."
def _estimate_conda_size(
env_file: str,
env_name: str
) -> tuple[int, int]:
"Estimate conda package sizes using dry-run."
def _estimate_pip_sizes(
packages: list[str]
) -> tuple[int, int, list[tuple[str, int]]]:
"Estimate pip package sizes from PyPI."
def estimate_size(
plugins_path:str=typer.Option("plugins.yaml", "--plugins", help="Path to plugins.yaml file"),
plugin_name:Optional[str]=typer.Option(None, "--plugin", "-p", help="Estimate for a single plugin"),
verbose:bool=typer.Option(False, "--verbose", "-v", help="Show per-package breakdown")
) -> None
"Estimate disk space required for plugin environments."
def _get_conda_envs() -> set[str]:
"""Get list of existing conda environment names using configured conda command."""
cfg = get_config()
cmd_parts = build_conda_command(cfg, "env", "list", "--json")
try
"Get list of existing conda environment names using configured conda command."
def _get_installed_manifests(
manifest_dir:Optional[Path]=None
) -> list[dict]:
"Load all manifest JSON files from the manifest directory."
def _extract_env_from_python_path(
python_path:str
) -> str:
"Extract conda environment name from python_path."
def list_plugins(
plugins_path:Optional[str]=typer.Option(None, "--plugins", help="Path to plugins.yaml for cross-reference"),
show_envs:bool=typer.Option(False, "--envs", "-e", help="Show conda environment status")
) -> None
"List installed plugins from manifest directory."
def remove_plugin(
plugin_name:str=typer.Argument(..., help="Name of the plugin to remove"),
plugins_path:Optional[str]=typer.Option(None, "--plugins", help="Path to plugins.yaml for env name lookup"),
keep_env:bool=typer.Option(False, "--keep-env", help="Keep the conda environment, only remove manifest"),
yes:bool=typer.Option(False, "--yes", "-y", help="Skip confirmation prompt")
) -> None
"Remove a plugin's manifest and conda environment."
Configuration (config.ipynb)
Project-level configuration for paths, runtime settings, and
environment management
Import
from cjm_plugin_system.core.config import (
RuntimeMode,
CondaType,
RuntimeConfig,
CJMConfig,
load_config,
get_config,
set_config,
reset_config
)
Functions
def _load_from_yaml(
yaml_path:Path
) -> CJMConfig:
"Load config from YAML file, resolving relative paths."
def load_config(
config_path:Optional[Path]=None,
data_dir:Optional[Path]=None,
conda_prefix:Optional[Path]=None,
conda_type:Optional[str]=None
) -> CJMConfig:
"Load config with layered resolution (CLI > env vars > yaml > defaults)."
def get_config() -> CJMConfig:
"""Get current config (loads defaults if not set)."""
global _current_config
if _current_config is None
"Get current config (loads defaults if not set)."
def set_config(
config:CJMConfig
) -> None
"Set current config (called by CLI callback)."
def reset_config() -> None
"Reset to unloaded state (for testing)."
Classes
class RuntimeMode(str, Enum):
"Runtime mode for the plugin system."
class CondaType(str, Enum):
"Type of conda implementation to use."
@dataclass
class RuntimeConfig:
"Runtime environment configuration."
mode: RuntimeMode = RuntimeMode.SYSTEM
conda_type: CondaType = CondaType.CONDA
prefix: Optional[Path]
binaries: Dict[str, Path] = field(...)
@dataclass
class CJMConfig:
"Main configuration for cjm-plugin-system."
runtime: RuntimeConfig = field(...)
data_dir: Path = field(...)
plugins_config: Path = field(...)
models_dir: Optional[Path]
def manifests_dir(self) -> Path:
"""Directory containing plugin manifests."""
return self.data_dir / "manifests"
@property
def plugin_data_dir(self) -> Path:
"Directory containing plugin manifests."
def plugin_data_dir(self) -> Path:
"""Directory for plugin runtime data (databases, caches)."""
return self.data_dir / "data"
@property
def logs_dir(self) -> Path:
"Directory for plugin runtime data (databases, caches)."
def logs_dir(self) -> Path:
"""Directory containing plugin logs."""
return self.data_dir / "logs"
@property
def conda_binary_path(self) -> Optional[Path]:
"Directory containing plugin logs."
def conda_binary_path(self) -> Optional[Path]:
"""Get the configured binary path for the current platform."""
system = platform_mod.system().lower()
machine = platform_mod.machine().lower()
if system == "windows"
"Get the configured binary path for the current platform."
Variables
_current_config: Optional[CJMConfig] = None
Content Hashing Utilities (hashing.ipynb)
Shared cryptographic hashing primitives for content integrity
verification
Import
from cjm_plugin_system.utils.hashing import (
hash_bytes,
hash_file,
verify_hash
)
Functions
def hash_bytes(
content: bytes,
algo: str = "sha256"
) -> str:
"Compute a hash of byte content."
def hash_file(
path: Union[str, Path],
algo: str = "sha256",
chunk_size: int = 8192
) -> str:
"Stream-hash a file without loading it entirely into memory."
def verify_hash(
content: bytes,
expected: str
) -> bool:
"Verify content against an expected hash string."
Plugin Interface (interface.ipynb)
Abstract base class defining the generic plugin interface
Import
from cjm_plugin_system.core.interface import (
FileBackedDTO,
PluginInterface
)
Classes
@runtime_checkable
class FileBackedDTO(Protocol):
"Protocol for Data Transfer Objects that serialize to disk for zero-copy transfer."
def to_temp_file(self) -> str:
"Save the data to a temporary file and return the absolute path."
class PluginInterface(ABC):
"Abstract base class for all plugins (both local workers and remote proxies)."
def name(self) -> str:
"""Unique plugin identifier."""
...
@property
@abstractmethod
def version(self) -> str:
"Unique plugin identifier."
def version(self) -> str:
"""Plugin version."""
...
@abstractmethod
def initialize(
self,
config: Optional[Dict[str, Any]] = None
) -> None
"Plugin version."
def initialize(
self,
config: Optional[Dict[str, Any]] = None
) -> None
"Initialize or re-configure the plugin."
def execute(
self,
*args,
**kwargs
) -> Any:
"Execute the plugin's main functionality."
def execute_stream(
self,
*args,
**kwargs
) -> Generator[Any, None, None]:
"Stream execution results chunk by chunk."
def get_config_schema(self) -> Dict[str, Any]:
"""Return JSON Schema describing the plugin's configuration options."""
...
@abstractmethod
def get_current_config(self) -> Dict[str, Any]:
"Return JSON Schema describing the plugin's configuration options."
def get_current_config(self) -> Dict[str, Any]:
"""Return the current configuration state as a dictionary."""
...
@abstractmethod
def cleanup(self) -> None
"Return the current configuration state as a dictionary."
def cleanup(self) -> None:
"""Clean up resources when plugin is unloaded."""
...
def cancel(self) -> None
"Clean up resources when plugin is unloaded."
def cancel(self) -> None:
"""Cancel the current execution. Override for cooperative cancellation support."""
pass
"Cancel the current execution. Override for cooperative cancellation support."
def report_progress(
self,
progress: float,
message: str = ""
) -> None
"Report execution progress. Call during execute() to update status."
Plugin Manager (manager.ipynb)
Plugin discovery, loading, and lifecycle management system
Import
from cjm_plugin_system.core.manager import (
PluginManager,
get_plugin_config,
get_plugin_config_schema,
get_all_plugin_configs,
update_plugin_config,
reload_plugin,
get_plugin_stats,
execute_plugin_stream
)
Functions
def get_plugin_config(
self,
plugin_name: str
) -> Optional[Dict[str, Any]]:
"Get the current configuration of a plugin."
def get_plugin_config_schema(
self,
plugin_name: str
) -> Optional[Dict[str, Any]]:
"Get the configuration JSON Schema for a plugin."
def get_all_plugin_configs(self) -> Dict[str, Dict[str, Any]]:
"""Get current configuration for all loaded plugins."""
return {
name: plugin.get_current_config()
"Get current configuration for all loaded plugins."
def update_plugin_config(
self,
plugin_name: str,
config: Dict[str, Any]
) -> bool:
"Update a plugin's configuration (hot-reload without restart)."
def reload_plugin(
self,
plugin_name: str,
config: Optional[Dict[str, Any]] = None
) -> bool:
"Reload a plugin by terminating and restarting its Worker."
def get_plugin_stats(
self,
plugin_name: str
) -> Optional[Dict[str, Any]]:
"Get resource usage stats for a plugin's Worker process."
async def execute_plugin_stream(
self,
plugin_name: str,
*args,
**kwargs
) -> AsyncGenerator[Any, None]:
"Execute a plugin with streaming response."
Classes
class PluginManager:
def __init__(
self,
plugin_interface:Type[PluginInterface]=PluginInterface,
search_paths:Optional[List[Path]]=None,
scheduler:Optional[ResourceScheduler]=None
)
"Manages plugin discovery, loading, and lifecycle via process isolation."
def __init__(
self,
plugin_interface:Type[PluginInterface]=PluginInterface,
search_paths:Optional[List[Path]]=None,
scheduler:Optional[ResourceScheduler]=None
)
"Initialize the plugin manager."
def register_system_monitor(
self,
plugin_name:str
) -> None
"Bind a loaded plugin to act as the hardware system monitor."
def discover_manifests(self) -> List[PluginMeta]:
"""Discover plugins via JSON manifests in search paths."""
self.discovered = []
seen_plugins = set()
for base_path in self.search_paths
"Discover plugins via JSON manifests in search paths."
def get_discovered_by_category(
self,
category:str
) -> List[PluginMeta]:
"Get discovered plugins filtered by category."
def get_plugins_by_category(
self,
category:str
) -> List[PluginMeta]:
"Get loaded plugins filtered by category."
def get_discovered_categories(self) -> List[str]:
"""Get all unique categories among discovered plugins."""
return list(set(meta.category for meta in self.discovered if meta.category))
def get_loaded_categories(self) -> List[str]:
"Get all unique categories among discovered plugins."
def get_loaded_categories(self) -> List[str]:
"""Get all unique categories among loaded plugins."""
return list(set(meta.category for meta in self.plugins.values() if meta.category))
def get_plugin_meta(
self,
plugin_name:str
) -> Optional[PluginMeta]:
"Get all unique categories among loaded plugins."
def get_plugin_meta(
self,
plugin_name:str
) -> Optional[PluginMeta]:
"Get metadata for a loaded plugin by name."
def get_discovered_meta(
self,
plugin_name:str
) -> Optional[PluginMeta]:
"Get metadata for a discovered (not necessarily loaded) plugin by name."
def load_plugin(
self,
plugin_meta:PluginMeta,
config:Optional[Dict[str, Any]]=None
) -> bool:
"Load a plugin by spawning a Worker subprocess."
def load_all(
self,
configs:Optional[Dict[str, Dict[str, Any]]]=None
) -> Dict[str, bool]:
"Discover and load all available plugins."
def unload_plugin(
self,
plugin_name:str
) -> bool:
"Unload a plugin and terminate its Worker process."
def unload_all(self) -> None:
"""Unload all plugins and terminate all Worker processes."""
for name in list(self.plugins.keys())
"Unload all plugins and terminate all Worker processes."
def get_plugin(
self,
plugin_name:str
) -> Optional[PluginInterface]:
"Get a loaded plugin instance by name."
def list_plugins(self) -> List[PluginMeta]:
"""List all loaded plugins."""
return list(self.plugins.values())
def _evict_for_resources(self, needed_meta:PluginMeta) -> bool
"List all loaded plugins."
def execute_plugin(
self,
plugin_name:str,
*args,
**kwargs
) -> Any:
"Execute a plugin's main functionality (sync)."
async def execute_plugin_async(
self,
plugin_name:str,
*args,
**kwargs
) -> Any:
"Execute a plugin's main functionality (async)."
def enable_plugin(
self,
plugin_name:str
) -> bool:
"Enable a plugin."
def disable_plugin(
self,
plugin_name:str
) -> bool:
"Disable a plugin without unloading it."
def get_plugin_logs(
self,
plugin_name:str,
lines:int=50
) -> str:
"Read the last N lines of the plugin's log file."
Plugin Metadata (metadata.ipynb)
Data structures for plugin metadata
Import
from cjm_plugin_system.core.metadata import (
PluginMeta
)
Classes
@dataclass
class PluginMeta:
"Metadata about a plugin."
name: str
version: str
description: str = ''
author: str = ''
package_name: str = ''
category: str = ''
interface: str = ''
config_schema: Optional[Dict[str, Any]]
instance: Optional[Any]
enabled: bool = True
last_executed: float = 0.0
Platform Utilities (platform.ipynb)
Cross-platform utilities for process management, path handling, and
system detection
Import
from cjm_plugin_system.core.platform import (
MICROMAMBA_URLS,
is_windows,
is_macos,
is_linux,
is_apple_silicon,
get_current_platform,
get_python_in_env,
get_popen_isolation_kwargs,
terminate_process,
terminate_self,
run_shell_command,
conda_env_exists,
get_micromamba_download_url,
download_micromamba,
get_conda_command,
build_conda_command,
get_micromamba_binary_path,
ensure_runtime_available
)
Functions
def is_windows() -> bool:
"""Check if running on Windows."""
return platform.system() == "Windows"
def is_macos() -> bool
"Check if running on Windows."
def is_macos() -> bool:
"""Check if running on macOS."""
return platform.system() == "Darwin"
def is_linux() -> bool
"Check if running on macOS."
def is_linux() -> bool:
"""Check if running on Linux."""
return platform.system() == "Linux"
def is_apple_silicon() -> bool
"Check if running on Linux."
def is_apple_silicon() -> bool
"Check if running on Apple Silicon Mac (for MPS detection)."
def get_current_platform() -> str:
"""Get current platform string for manifest filtering.
Returns strings like 'linux-x64', 'darwin-arm64', 'win-x64'.
"""
system = platform.system().lower()
machine = platform.machine().lower()
if system == "darwin"
"""
Get current platform string for manifest filtering.
Returns strings like 'linux-x64', 'darwin-arm64', 'win-x64'.
"""
def get_python_in_env(
env_path: Path
) -> Path:
"""
Get the Python executable path for a conda environment.
On Windows: env_path/python.exe
On Unix: env_path/bin/python
"""
def get_popen_isolation_kwargs() -> Dict[str, Any]:
"""Return kwargs for process isolation in subprocess.Popen.
On Unix: Returns {'start_new_session': True}
On Windows: Returns {'creationflags': CREATE_NEW_PROCESS_GROUP}
Usage:
process = subprocess.Popen(cmd, **get_popen_isolation_kwargs(), ...)
"""
if is_windows()
"""
Return kwargs for process isolation in subprocess.Popen.
On Unix: Returns {'start_new_session': True}
On Windows: Returns {'creationflags': CREATE_NEW_PROCESS_GROUP}
Usage:
process = subprocess.Popen(cmd, **get_popen_isolation_kwargs(), ...)
"""
def terminate_process(
process: subprocess.Popen,
timeout: float = 2.0
) -> None
"""
Terminate a subprocess gracefully, with fallback to force kill.
On all platforms:
1. Calls process.terminate() (SIGTERM on Unix, TerminateProcess on Windows)
2. Waits for timeout seconds
3. If still running, calls process.kill() (SIGKILL on Unix, TerminateProcess on Windows)
"""
def terminate_self() -> None:
"""Terminate the current process (for worker suicide pact).
On Unix: Sends SIGTERM to self for graceful shutdown
On Windows: Calls os._exit() since Windows lacks SIGTERM
"""
if is_windows()
"""
Terminate the current process (for worker suicide pact).
On Unix: Sends SIGTERM to self for graceful shutdown
On Windows: Calls os._exit() since Windows lacks SIGTERM
"""
def run_shell_command(
cmd: str,
check: bool = True,
capture_output: bool = False,
**kwargs
) -> subprocess.CompletedProcess
"""
Run a shell command cross-platform.
Unlike using shell=True with executable='/bin/bash', this function
uses the platform's default shell:
- Linux/macOS: /bin/sh (or $SHELL)
- Windows: cmd.exe
"""
def conda_env_exists(
env_name: str,
conda_cmd: str = "conda"
) -> bool
"""
Check if a conda environment exists (cross-platform).
Uses 'conda env list --json' instead of piping to grep,
which doesn't work on Windows.
"""
def get_micromamba_download_url(
platform_str: Optional[str] = None
) -> str:
"Get the micromamba download URL for the specified or current platform."
def download_micromamba(
dest_path: Path,
platform_str: Optional[str] = None,
show_progress: bool = True
) -> bool:
"Download and extract micromamba binary to the specified path."
def get_conda_command(
config: CJMConfig
) -> List[str]:
"Get the conda/mamba/micromamba base command with prefix args for local mode."
def build_conda_command(
config: CJMConfig,
*args: str
) -> List[str]:
"Build a complete conda/mamba/micromamba command."
def get_micromamba_binary_path(
config: CJMConfig
) -> Optional[Path]:
"Get the configured micromamba binary path for the current platform."
def ensure_runtime_available(
config: CJMConfig
) -> bool:
"Check if the configured conda/micromamba runtime is available."
Variables
MICROMAMBA_URLS: Dict[str, str]
Remote Plugin Proxy (proxy.ipynb)
Bridge between Host application and isolated Worker processes
Import
from cjm_plugin_system.core.proxy import (
RemotePluginProxy,
execute_async,
execute_stream_sync,
execute_stream,
get_stats,
is_alive,
cancel,
cancel_async,
get_progress,
get_progress_async
)
Functions
def _maybe_serialize_input(
self,
obj: Any
) -> Any:
"Convert FileBackedDTO objects to file paths for zero-copy transfer."
def _prepare_payload(
self,
args: tuple,
kwargs: dict
) -> Dict[str, Any]:
"Prepare arguments for HTTP transmission."
async def execute_async(
self,
*args,
**kwargs
) -> Any:
"Execute the plugin asynchronously."
def execute_stream_sync(self, *args, **kwargs) -> Generator[Any, None, None]
"Synchronous wrapper for streaming (blocking)."
async def execute_stream(
self,
*args,
**kwargs
) -> AsyncGenerator[Any, None]:
"Execute with streaming response (async generator)."
def get_stats(self) -> Dict[str, Any]:
"""Get worker process resource usage."""
with httpx.Client() as client
"Get worker process resource usage."
def is_alive(self) -> bool:
"""Check if the worker process is still running and responsive."""
if not self.process or self.process.poll() is not None
"Check if the worker process is still running and responsive."
def cancel(self) -> bool:
"""Request cancellation of running execution."""
try
"Request cancellation of running execution."
async def cancel_async(self) -> bool:
"""Request cancellation asynchronously."""
try
"Request cancellation asynchronously."
def get_progress(self) -> Dict[str, Any]:
"""Get current execution progress from worker."""
try
"Get current execution progress from worker."
async def get_progress_async(self) -> Dict[str, Any]:
"""Get current execution progress asynchronously."""
try
"Get current execution progress asynchronously."
def __enter__(self):
"""Enter context manager."""
return self
def __exit__(self, exc_type, exc_val, exc_tb)
"Enter context manager."
def __exit__(self, exc_type, exc_val, exc_tb):
"""Exit context manager and cleanup."""
self.cleanup()
return False
async def __aenter__(self)
"Exit context manager and cleanup."
async def __aenter__(self):
"""Enter async context manager."""
return self
async def __aexit__(self, exc_type, exc_val, exc_tb)
"Enter async context manager."
async def __aexit__(self, exc_type, exc_val, exc_tb)
"Exit async context manager and cleanup."
Classes
class RemotePluginProxy:
def __init__(
self,
manifest:Dict[str, Any]
)
"Proxy that forwards plugin calls to an isolated Worker subprocess."
def __init__(
self,
manifest:Dict[str, Any]
)
"Initialize proxy and start the worker process."
def name(self) -> str:
"""Plugin name."""
return self.manifest.get('name', 'unknown')
@property
def version(self) -> str:
"Plugin name."
def version(self) -> str:
"""Plugin version."""
return self.manifest.get('version', '0.0.0')
def _get_free_port(self) -> int
"Plugin version."
def initialize(
self,
config:Optional[Dict[str, Any]]=None
) -> None
"Initialize or reconfigure the plugin."
def execute(
self,
*args,
**kwargs
) -> Any:
"Execute the plugin synchronously."
def get_config_schema(self) -> Dict[str, Any]:
"""Get the plugin's configuration schema."""
with httpx.Client() as client
"Get the plugin's configuration schema."
def get_current_config(self) -> Dict[str, Any]:
"""Get the plugin's current configuration."""
with httpx.Client() as client
"Get the plugin's current configuration."
def cleanup(self) -> None:
"""Clean up plugin resources and terminate worker process."""
try
"Clean up plugin resources and terminate worker process."
Job Queue (queue.ipynb)
Resource-aware job queue for sequential plugin execution with
cancellation support
Import
from cjm_plugin_system.core.queue import (
JobStatus,
Job,
JobQueue,
submit,
cancel,
reorder,
get_job,
wait_for_job,
get_state,
get_job_logs,
start,
stop
)
Functions
async def submit(
self,
plugin_name: str,
*args,
priority: int = 0,
**kwargs
) -> str:
"Submit a job to the queue."
async def cancel(
self,
job_id: str
) -> bool:
"Cancel a pending or running job."
def reorder(
self,
job_id: str,
new_priority: int
) -> bool:
"Change the priority of a pending job."
def get_job(
self,
job_id: str
) -> Optional[Job]:
"Get a job by ID."
async def wait_for_job(
self,
job_id: str,
timeout: Optional[float] = None
) -> Job:
"Wait for a job to complete."
def get_state(self) -> Dict[str, Any]:
"""Get the current queue state."""
running_dict = None
if self._running
"Get the current queue state."
def get_job_logs(
self,
job_id: str,
lines: int = 100
) -> str:
"Get logs for a job from the plugin's log file."
async def start(self) -> None:
"""Start the queue processor."""
if self._running_flag
"Start the queue processor."
async def stop(self) -> None:
"""Stop the queue processor gracefully."""
self._running_flag = False
self._job_available.set()
if self._processor_task
"Stop the queue processor gracefully."
def _move_to_history(self, job: Job) -> None:
"""Move a job to history, maintaining max_history limit."""
self._history.append(job)
if len(self._history) > self.max_history
"Move a job to history, maintaining max_history limit."
def _signal_job_completed(self, job_id: str) -> None:
"""Signal that a job has completed."""
event = self._job_completed_events.get(job_id)
if event
"Signal that a job has completed."
async def _process_loop(self) -> None:
"""Main processing loop."""
while self._running_flag
"Main processing loop."
async def _execute_job(self, job: Job) -> None:
"""Execute a single job."""
self.logger.info(f"Starting job {job.id[:8]} ({job.plugin_name})")
job.status = JobStatus.running
job.started_at = time.time()
self._running = job
try
"Execute a single job."
async def _execute_with_cancellation(
self,
job: Job,
plugin: Any
) -> Any
"Execute job with cancellation monitoring."
async def _poll_progress(
self,
job: Job,
plugin: Any
) -> None
"Poll progress from the plugin during execution."
Classes
class JobStatus(str, Enum):
"Status of a job in the queue."
@dataclass
class Job:
"A queued plugin execution request."
id: str
plugin_name: str
args: Tuple[Any, ...]
kwargs: Dict[str, Any]
status: JobStatus = JobStatus.pending
priority: int = 0
created_at: float = field(...)
started_at: Optional[float]
completed_at: Optional[float]
result: Any
error: Optional[str]
progress: float = 0.0
status_message: str = ''
class JobQueue:
def __init__(
self,
manager: PluginManager,
max_history: int = 100,
cancel_timeout: float = 3.0,
progress_poll_interval: float = 1.0
)
"Resource-aware job queue for sequential plugin execution."
def __init__(
self,
manager: PluginManager,
max_history: int = 100,
cancel_timeout: float = 3.0,
progress_poll_interval: float = 1.0
)
"Initialize the job queue."
Scheduling (scheduling.ipynb)
Resource scheduling policies for plugin execution
Import
from cjm_plugin_system.core.scheduling import (
ResourceScheduler,
PermissiveScheduler,
SafetyScheduler,
QueueScheduler
)
Classes
class ResourceScheduler(ABC):
"Abstract base class for resource allocation policies."
def allocate(
self,
plugin_meta: PluginMeta,
stats_provider: Callable[[], Dict[str, Any]]
) -> bool:
"Decide if a plugin can start based on its requirements and system state."
async def allocate_async(
self,
plugin_meta: PluginMeta,
stats_provider: Callable[[], Awaitable[Dict[str, Any]]]
) -> bool:
"Async allocation decision. Default delegates to sync allocate after fetching stats once."
def on_execution_start(
self,
plugin_name: str
) -> None
"Notify scheduler that a task started (to reserve resources)."
def on_execution_finish(
self,
plugin_name: str
) -> None
"Notify scheduler that a task finished (to release resources)."
class PermissiveScheduler(ResourceScheduler):
"Scheduler that allows all executions (Default / Dev Mode)."
def allocate(
self,
plugin_meta: PluginMeta,
stats_provider: Callable[[], Dict[str, Any]]
) -> bool:
"Allow all plugin executions without checking resources."
def on_execution_start(
self,
plugin_name: str
) -> None
"No-op for permissive scheduler."
def on_execution_finish(
self,
plugin_name: str
) -> None
"No-op for permissive scheduler."
class SafetyScheduler(ResourceScheduler):
"Scheduler that prevents execution if resources are insufficient."
def allocate(
self,
plugin_meta: PluginMeta,
stats_provider: Callable[[], Dict[str, Any]]
) -> bool:
"Check resource requirements against system state."
def on_execution_start(
self,
plugin_name: str
) -> None
"Called when execution starts (for future resource reservation)."
def on_execution_finish(
self,
plugin_name: str
) -> None
"Called when execution finishes (for future resource release)."
class QueueScheduler:
def __init__(
self,
timeout: float = 300.0,
poll_interval: float = 2.0
)
"Scheduler that waits for resources to become available."
def __init__(
self,
timeout: float = 300.0,
poll_interval: float = 2.0
)
"Initialize queue scheduler with timeout and polling settings."
def allocate(
self,
plugin_meta: PluginMeta,
stats_provider: Callable[[], Dict[str, Any]]
) -> bool:
"Wait for resources using blocking sleep."
async def allocate_async(
self,
plugin_meta: PluginMeta,
stats_provider: Callable[[], Awaitable[Dict[str, Any]]]
) -> bool:
"Wait for resources using non-blocking async sleep."
def on_execution_start(
self,
plugin_name: str
) -> None
"Track that a plugin has started executing."
def on_execution_finish(
self,
plugin_name: str
) -> None
"Track that a plugin has finished executing."
def get_active_plugins(self) -> Set[str]:
"Get the set of plugins with active executions."
Configuration Validation (validation.ipynb)
Validation helpers for plugin configuration dataclasses
Import
from cjm_plugin_system.utils.validation import (
T,
SCHEMA_TITLE,
SCHEMA_DESC,
SCHEMA_MIN,
SCHEMA_MAX,
SCHEMA_ENUM,
SCHEMA_MIN_LEN,
SCHEMA_MAX_LEN,
SCHEMA_PATTERN,
SCHEMA_FORMAT,
validate_field_value,
validate_config,
config_to_dict,
dict_to_config,
extract_defaults,
dataclass_to_jsonschema
)
Functions
def validate_field_value(
value:Any,
metadata:Dict[str, Any],
field_name:str=""
) -> Tuple[bool, Optional[str]]:
"Validate a value against field metadata constraints."
def validate_config(
config:Any
) -> Tuple[bool, Optional[str]]:
"Validate all fields in a configuration dataclass against their metadata constraints."
def config_to_dict(
config:Any
) -> Dict[str, Any]:
"Convert a configuration dataclass instance to a dictionary."
def dict_to_config(
config_class:Type[T],
data:Optional[Dict[str, Any]]=None,
validate:bool=False
) -> T:
"Create a configuration dataclass instance from a dictionary."
def extract_defaults(
config_class:Type
) -> Dict[str, Any]:
"Extract default values from a configuration dataclass type."
def _python_type_to_json_type(
python_type:type
) -> Dict[str, Any]:
"Convert Python type to JSON schema type."
def dataclass_to_jsonschema(
cls:type
) -> Dict[str, Any]:
"Convert a dataclass to a JSON schema for form generation."
Variables
T
SCHEMA_TITLE = 'title'
SCHEMA_DESC = 'description'
SCHEMA_MIN = 'minimum'
SCHEMA_MAX = 'maximum'
SCHEMA_ENUM = 'enum'
SCHEMA_MIN_LEN = 'minLength'
SCHEMA_MAX_LEN = 'maxLength'
SCHEMA_PATTERN = 'pattern'
SCHEMA_FORMAT = 'format'
Universal Worker (worker.ipynb)
FastAPI server that runs inside isolated plugin environments
Import
from cjm_plugin_system.core.worker import (
EnhancedJSONEncoder,
parent_monitor,
create_app,
run_worker
)
Functions
def parent_monitor(
ppid: int
) -> None
"""
Monitor parent process and terminate self if parent dies.
This implements the "Suicide Pact" pattern: if the Host process dies,
the Worker must terminate itself to prevent zombie processes.
"""
def create_app(
module_name: str,
class_name: str
) -> FastAPI:
"Create FastAPI app that hosts the specified plugin."
def run_worker() -> None:
"""CLI entry point for running the worker."""
parser = argparse.ArgumentParser(description="Universal Plugin Worker")
parser.add_argument("--module", required=True, help="Plugin module path")
parser.add_argument("--class", dest="class_name", required=True, help="Plugin class name")
parser.add_argument("--port", type=int, required=True, help="Port to listen on")
parser.add_argument("--ppid", type=int, required=False, help="Parent PID to monitor")
args = parser.parse_args()
if args.ppid
"CLI entry point for running the worker."
Classes
class EnhancedJSONEncoder(JSONEncoder):
"JSON encoder that handles dataclasses and other common types."
def default(
self,
o: Any
) -> Any:
"Convert non-serializable objects to serializable form."