Source code for lw_pipeline.pipeline_step

"""Main pipeline class to abstract from pipeline steps."""

# Authors: The Lightweight Pipeline developers
# SPDX-License-Identifier: BSD-3-Clause

import logging
from abc import ABC, abstractmethod

from lw_pipeline.helper.naming import guess_short_id
from lw_pipeline.output_manager import Output_Manager
from lw_pipeline.output_registration import Output_Registry


[docs] class Pipeline_Step(ABC): """Abstract class for a pipeline step."""
[docs] def __init__(self, description, config, short_id=""): self.description = description if short_id: self._short_id = short_id else: self._short_id = guess_short_id(self.__class__.__module__) self._config = config # Initialize output management self._output_manager = None self._output_registry = None # Initialize logger for this step self._logger = None
@property def logger(self): """ Get a logger for this pipeline step. Returns ------- logging.Logger Logger instance named after this step. """ if self._logger is None: self._logger = logging.getLogger(f"lw_pipeline.step.{self.short_id}") return self._logger @property def config(self): """Configuration of the pipeline step.""" return self._config @property def short_id(self): """Short id of the pipeline step.""" return self._short_id @property def output_manager(self): """ Get the Output_Manager for this step. Returns ------- Output_Manager Manager for saving outputs with consistent paths and metadata. """ if self._output_manager is None: # Create output manager without registry first self._output_manager = Output_Manager( self.config, self.short_id, self.description ) # Inject registry to avoid circular reference during initialization self._output_manager.set_registry(self.output_registry) return self._output_manager @property def output_registry(self): """ Get the Output_Registry for this step. Returns ------- Output_Registry Registry of registered outputs for this step. """ if self._output_registry is None: self._output_registry = Output_Registry(self) return self._output_registry
[docs] def get_output_path( self, name, suffix=None, extension=None, use_bids_structure=False, custom_dir=None, **bids_params, ): """ Get an output file path for this step. This is a convenience wrapper around output_manager.get_output_path(). Parameters ---------- name : str Output name (will be prefixed with step_id). suffix : str, optional BIDS suffix. extension : str, optional File extension. use_bids_structure : bool, optional Use BIDS directory structure. Default is False. custom_dir : str or Path, optional Custom output directory. **bids_params : dict BIDS parameters (subject, session, task, run, datatype). Returns ------- Path Output file path. """ return self.output_manager.get_output_path( name, suffix=suffix, extension=extension, use_bids_structure=use_bids_structure, custom_dir=custom_dir, **bids_params, )
[docs] def should_generate_output(self, name): """ Check if a specific output should be generated. This method checks the configuration and registered outputs to determine if the output matching the given name should be generated. Use this in methods decorated with @register_output to conditionally skip expensive computations. Parameters ---------- name : str Name of the output to check. Returns ------- bool True if the output should be generated. Examples -------- >>> @register_output("expensive_plot", enabled_by_default=False) >>> def create_plot(self): ... if not self.should_generate_output("expensive_plot"): ... return ... # ... expensive plotting code ... """ return self.output_registry.should_generate(name, self.config)
[docs] @abstractmethod def step(self, data): """Abstract method to be implemented by the pipeline step.""" pass
[docs] class Pipeline_Exception(Exception): """Exception class for the pipeline.""" pass