Source code for lw_pipeline.pipeline

"""Core Pipeline class for orchestrating pipeline steps."""

# Authors: The Lightweight Pipeline developers
# SPDX-License-Identifier: BSD-3-Clause

import sys

from lw_pipeline.pipeline_step import Pipeline_Exception, Pipeline_Step


class Pipeline:
    """
    Pipeline class to run pipeline steps in sequence.

    The Pipeline orchestrates the execution of multiple pipeline steps,
    passing data through each step in order and handling errors
    appropriately.

    Parameters
    ----------
    steps : list of Pipeline_Step
        A list of Pipeline_Step instances to execute in sequence. Other
        iterables (tuples, generators, ...) are accepted as well and are
        copied into a list.

    Attributes
    ----------
    pipeline_steps : list of Pipeline_Step
        The steps to be executed.

    Examples
    --------
    >>> from lw_pipeline import Pipeline, Pipeline_Step, Config
    >>>
    >>> class Multiply_Step(Pipeline_Step):
    ...     def __init__(self, factor, config):
    ...         super().__init__(f"Multiply by {factor}", config)
    ...         self.factor = factor
    ...     def step(self, data):
    ...         return data * self.factor
    >>>
    >>> config = Config()
    >>> step1 = Multiply_Step(2, config)
    >>> step2 = Multiply_Step(3, config)
    >>> pipeline = Pipeline([step1, step2])
    >>> result = pipeline.run(5)  # 5 * 2 * 3 = 30
    """

    def __init__(self, steps):
        """
        Initialize the Pipeline.

        Parameters
        ----------
        steps : list of Pipeline_Step
            A list of Pipeline_Step instances to execute in sequence. A
            list is stored as-is (same object); any other iterable is
            materialized into a new list first.

        Raises
        ------
        ValueError
            If any element of steps is not a Pipeline_Step instance.
        TypeError
            If steps is not iterable.
        """
        # A one-shot iterable (e.g. a generator) would be consumed by the
        # validation below and leave the pipeline silently empty, so copy
        # anything that is not already a list. Lists are kept by reference
        # so existing callers still see the very object they passed in.
        if not isinstance(steps, list):
            steps = list(steps)

        if not all(isinstance(step, Pipeline_Step) for step in steps):
            raise ValueError(
                "All steps must be Pipeline_Step instances. "
                "Use discovery.find_all_step_classes() to load steps from files."
            )

        self.pipeline_steps = steps

    def run(self, data=None):
        """
        Run the pipeline.

        Execute all pipeline steps in sequence, passing the return value of
        each step as the input of the next one. Progress banners, the step
        descriptions and the final output are printed to standard output.

        Parameters
        ----------
        data : object, optional
            Optional input data to be passed to the first step.
            Default is None.

        Returns
        -------
        data : object
            The output data after processing through all pipeline steps.

        Raises
        ------
        SystemExit
            With exit code 1 if a Pipeline_Exception occurs during step
            execution. Any other exception raised by a step propagates
            unchanged.
        """
        # The input banner is only shown when there actually is an input.
        if data is not None:
            print("Pipeline starts with following input:".center(80, "-"))
            print(data)

        # Steps are numbered from 1 in the progress output.
        for pos, step in enumerate(self.pipeline_steps, start=1):
            # Print step information
            print(
                f"Step {pos}: {step.__class__.__module__} / "
                f"{step.__class__.__name__}".center(80, "-")
            )
            print("ℹ " + step.description)

            try:
                data = step.step(data)
            except Pipeline_Exception as e:
                # A failing step aborts the whole run with a short message
                # instead of a traceback.
                print(f"Error in {step.description}: {e}")
                sys.exit(1)

        print("Pipeline finished with following output:".center(80, "-"))
        print(data)

        return data