# Source code for lw_pipeline.discovery

"""Step discovery and dynamic loading utilities."""

# Authors: The Lightweight Pipeline developers
# SPDX-License-Identifier: BSD-3-Clause

import importlib.util
import os
import sys

from lw_pipeline.pipeline_step import Pipeline_Step


def find_all_step_files(steps_dir):
    """
    Find all .py files in the steps directory.

    Scans the specified directory (non-recursively) for Python files that
    can contain pipeline step definitions. Special files such as
    ``__init__.py`` are excluded.

    Parameters
    ----------
    steps_dir : str
        Path to the directory containing step files.

    Returns
    -------
    list of str
        Sorted list of Python filenames (without path) found in the
        directory. Names starting with ``'__'`` are excluded, as are
        entries that are not regular files (e.g. a sub-directory whose
        name happens to end in ``.py``).

    Raises
    ------
    OSError
        Propagated from ``os.listdir`` when ``steps_dir`` cannot be listed
        (for example, when it does not exist).

    Examples
    --------
    >>> find_all_step_files("steps/")
    ['00_preprocessing.py', '01_analysis.py', '02_visualization.py']
    """
    return sorted(
        entry
        for entry in os.listdir(steps_dir)
        if entry.endswith(".py")
        and not entry.startswith("__")
        # listdir() also yields directories; only real files can be
        # imported as step modules later on.
        and os.path.isfile(os.path.join(steps_dir, entry))
    )
def find_all_step_classes(step_files, config):
    """
    Find and instantiate all Pipeline_Step classes from step files.

    Imports the steps directory as a package (from its ``__init__.py``),
    then imports each step file as a submodule of that package, collects
    the Pipeline_Step subclasses defined there, and instantiates each one
    with the provided configuration.

    Parameters
    ----------
    step_files : list of str
        List of Python filenames containing step definitions. Files should
        be in the directory specified by ``config.steps_dir``. Names
        starting with ``'__'`` are skipped.
    config : Config
        Configuration object to pass to step constructors. Its
        ``steps_dir`` attribute locates the steps package.

    Returns
    -------
    list of Pipeline_Step
        Instantiated steps, in the order of ``step_files`` and, within a
        file, in the order the classes appear in the module namespace.

    Notes
    -----
    - The package name is the last component of ``steps_dir``; a trailing
      path separator (``"steps/"``) or a relative path such as ``"."`` is
      resolved first so that the name is never empty.
    - The package is registered in ``sys.modules`` under that name.
    - Only classes *defined in* a step module are instantiated. Subclasses
      that a step file merely imports (e.g. a shared base class or a class
      from another step file) are ignored, so they are not instantiated a
      second time.
    - The base ``Pipeline_Step`` class itself is never instantiated.

    Examples
    --------
    >>> from lw_pipeline import Config
    >>> config = Config("config.py")
    >>> step_files = find_all_step_files(config.steps_dir)
    >>> steps = find_all_step_classes(step_files, config)
    >>> len(steps)
    3
    """
    steps_dir = config.steps_dir

    # basename("steps/") is "", which would register the package under an
    # empty name and turn every submodule import into an invalid relative
    # import. abspath() strips trailing separators and resolves "." first.
    package_name = os.path.basename(os.path.abspath(steps_dir))

    # Import the steps directory as a package so that step files can use
    # package-relative imports among themselves.
    spec = importlib.util.spec_from_file_location(
        package_name,
        os.path.join(steps_dir, "__init__.py"),
        submodule_search_locations=[steps_dir],
    )
    package = importlib.util.module_from_spec(spec)
    # Must be registered before execution so submodule imports resolve.
    sys.modules[package_name] = package
    spec.loader.exec_module(package)

    steps = []
    for step_file in step_files:
        # Skip special files such as __init__.py
        if step_file.startswith("__"):
            continue

        # Strip the extension to get the submodule name
        step_name = os.path.splitext(step_file)[0]
        module = importlib.import_module(f"{package_name}.{step_name}")

        # Collect first, instantiate afterwards, so constructors cannot
        # disturb the iteration over the module namespace.
        defined_here = [
            candidate
            for candidate in vars(module).values()
            if isinstance(candidate, type)
            and issubclass(candidate, Pipeline_Step)
            and candidate is not Pipeline_Step
            # Exclude subclasses that were only imported into this module.
            and candidate.__module__ == module.__name__
        ]
        steps.extend(step_class(config) for step_class in defined_here)

    return steps
def list_all_outputs(config):
    """
    Print the registered outputs of every pipeline step.

    Discovers the step files in ``config.steps_dir``, instantiates the
    steps they define, and prints for each step a header line, its
    description and the outputs listed by its output registry (disabled
    ones included).

    Parameters
    ----------
    config : Config
        Configuration object containing ``steps_dir``.

    Returns
    -------
    None
        The listing is written to standard output.

    Notes
    -----
    Outputs are marked with:

    - ✓ : Enabled by default
    - ○ : Disabled by default (also tagged "(disabled by default)")

    A step without any registered output is reported with
    "No registered outputs".

    Examples
    --------
    >>> from lw_pipeline import Config
    >>> config = Config("config.py")
    >>> list_all_outputs(config)  # doctest: +SKIP
    """
    discovered_steps = find_all_step_classes(
        find_all_step_files(config.steps_dir), config
    )

    for step in discovered_steps:
        print(f"\n{step.short_id} - {step.__class__.__name__}:")
        print(f" {step.description}")

        registered = step.output_registry.list_outputs(include_disabled=True)
        if not registered:
            print(" No registered outputs")
            continue

        print(" Outputs:")
        for name, description, enabled in registered:
            if enabled:
                marker = "✓"
                suffix = ""
            else:
                marker = "○"
                suffix = " (disabled by default)"
            print(f" {marker} {name} - {description}{suffix}")