lw_pipeline.Pipeline

class lw_pipeline.Pipeline(steps)

Bases: object

Pipeline class to run pipeline steps in sequence.

The Pipeline orchestrates the execution of multiple pipeline steps: the output of each step is passed as input to the next, in order, and a Pipeline_Exception raised by any step stops the run.

Parameters:

steps (list of Pipeline_Step) – A list of Pipeline_Step instances to execute in sequence.

pipeline_steps

The steps to be executed.

Type:

list of Pipeline_Step

Examples

>>> from lw_pipeline import Pipeline, Pipeline_Step, Config
>>>
>>> class Multiply_Step(Pipeline_Step):
...     def __init__(self, factor, config):
...         super().__init__(f"Multiply by {factor}", config)
...         self.factor = factor
...     def step(self, data):
...         return data * self.factor
>>>
>>> config = Config()
>>> step1 = Multiply_Step(2, config)
>>> step2 = Multiply_Step(3, config)
>>> pipeline = Pipeline([step1, step2])
>>> result = pipeline.run(5)  # 5 * 2 * 3 = 30
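
The data flow in the example above amounts to a left fold over the list of steps. The following is a minimal sketch of that idea only, not lw_pipeline's implementation: `MultiplyStep` is a hypothetical stand-in that mimics the `step(data)` method shown above, so the snippet runs without the library installed.

```python
from functools import reduce


class MultiplyStep:
    """Hypothetical stand-in for a Pipeline_Step subclass (not part of lw_pipeline)."""

    def __init__(self, factor):
        self.factor = factor

    def step(self, data):
        # Same transformation as Multiply_Step in the example above.
        return data * self.factor


steps = [MultiplyStep(2), MultiplyStep(3)]

# Feed the output of each step into the next one, starting from the input 5.
result = reduce(lambda data, s: s.step(data), steps, 5)
print(result)  # 30
```

Reordering the list changes the order in which the transformations are applied, which matters as soon as the steps do not commute.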

__init__(steps)

Initialize the Pipeline.

Parameters:

steps (list of Pipeline_Step) – A list of Pipeline_Step instances to execute in sequence.

Raises:

ValueError – If steps is not a list of Pipeline_Step instances.
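
A check of the kind described under Raises could look roughly like the sketch below. It is an assumption about the shape of the validation, not the library's code: `Step` stands in for Pipeline_Step, and `validate_steps` and the error message are hypothetical names chosen for illustration.

```python
class Step:
    """Hypothetical stand-in for Pipeline_Step."""


def validate_steps(steps):
    """Return steps unchanged, or raise ValueError if it is not a list of Step."""
    # Reject non-list containers (tuples, strings, ...) and lists with foreign items.
    if not isinstance(steps, list) or not all(isinstance(s, Step) for s in steps):
        raise ValueError("steps must be a list of Step instances")
    return steps


validate_steps([Step(), Step()])  # passes silently

try:
    validate_steps([Step(), "not a step"])
except ValueError as err:
    message = str(err)
    print(message)
```

Validating in the constructor surfaces a misconfigured pipeline before any step has run, rather than partway through processing.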

Methods

__init__(steps)

Initialize the Pipeline.

run([data])

Run the pipeline.

run(data=None)

Run the pipeline.

Execute all pipeline steps in sequence, passing the output of each step as the input to the next. Progress is reported during the run, and a Pipeline_Exception raised by a step is handled by ending the run with SystemExit.

Parameters:

data (object, optional) – Input data passed to the first step. Default is None.

Returns:

data – The output data after processing through all pipeline steps.

Return type:

object

Raises:

SystemExit – If a Pipeline_Exception occurs during step execution.
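
Because a failing step ends in SystemExit rather than an ordinary exception, a caller that wants to keep running (a test suite or a notebook, for example) has to catch SystemExit explicitly. The sketch below illustrates the pattern with stand-ins: `StepError` plays the role of Pipeline_Exception, and `run` is a hypothetical loop written for this example. The exit message format is an assumption; the documentation above only states that SystemExit is raised.

```python
import sys


class StepError(Exception):
    """Hypothetical stand-in for Pipeline_Exception."""


class FailingStep:
    """Hypothetical step that always fails."""

    description = "always fails"

    def step(self, data):
        raise StepError("bad input")


def run(steps, data=None):
    """Run steps in order; convert a StepError into SystemExit (assumed behavior)."""
    for s in steps:
        try:
            data = s.step(data)
        except StepError as err:
            # sys.exit raises SystemExit; a string argument becomes its exit code.
            sys.exit(f"Step '{s.description}' failed: {err}")
    return data


try:
    run([FailingStep()], 5)
except SystemExit as exc:
    exit_info = exc.code
    print(exit_info)  # Step 'always fails' failed: bad input
```

SystemExit derives from BaseException, so a plain `except Exception` clause around the call will not intercept it.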