lw_pipeline.Pipeline
- class lw_pipeline.Pipeline(steps)
Bases: object
Pipeline class to run pipeline steps in sequence.
The Pipeline orchestrates the execution of multiple pipeline steps, passing data through each step in order. If a step raises a Pipeline_Exception, the run stops with a SystemExit.
- Parameters:
steps (list of Pipeline_Step) – A list of Pipeline_Step instances to execute in sequence.
- pipeline_steps
The steps to be executed.
- Type:
list of Pipeline_Step
Examples
>>> from lw_pipeline import Pipeline, Pipeline_Step, Config
>>>
>>> class Multiply_Step(Pipeline_Step):
...     def __init__(self, factor, config):
...         super().__init__(f"Multiply by {factor}", config)
...         self.factor = factor
...     def step(self, data):
...         return data * self.factor
>>>
>>> config = Config()
>>> step1 = Multiply_Step(2, config)
>>> step2 = Multiply_Step(3, config)
>>> pipeline = Pipeline([step1, step2])
>>> result = pipeline.run(5)  # 5 * 2 * 3 = 30
- __init__(steps)
Initialize the Pipeline.
- Parameters:
steps (list of Pipeline_Step) – A list of Pipeline_Step instances to execute in sequence.
- Raises:
ValueError – If steps is not a list of Pipeline_Step instances.
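The validation described above can be pictured with the following minimal sketch. It does not import lw_pipeline: the Pipeline_Step class here is a simplified stand-in, and validate_steps and is_valid are hypothetical helper names introduced only for illustration. The actual check inside Pipeline.__init__ may differ in its details, for example in how it treats an empty list or in the wording of the error message.

```python
class Pipeline_Step:
    """Simplified stand-in for lw_pipeline.Pipeline_Step (illustration only)."""

    def __init__(self, description):
        self.description = description

    def step(self, data):
        return data


def validate_steps(steps):
    """Assumed check: steps must be a list whose items are all Pipeline_Step."""
    if not isinstance(steps, list) or not all(
        isinstance(item, Pipeline_Step) for item in steps
    ):
        raise ValueError("steps must be a list of Pipeline_Step instances")
    return steps


def is_valid(steps):
    """Hypothetical helper: report whether validate_steps accepts the input."""
    try:
        validate_steps(steps)
        return True
    except ValueError:
        return False


print(is_valid([Pipeline_Step("first"), Pipeline_Step("second")]))
print(is_valid("not a list"))
print(is_valid([Pipeline_Step("first"), 42]))
```

Under these assumptions, only the first call is accepted. A string and a list containing a non-step object are both rejected with ValueError.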
Methods
- run(data=None)
Run the pipeline.
Execute all pipeline steps in sequence, passing data from one step to the next. Handles Pipeline_Exception errors and reports progress.
- Parameters:
data (object, optional) – Input data passed to the first step. Default is None.
- Returns:
data – The output data after processing through all pipeline steps.
- Return type:
object
- Raises:
SystemExit – If a Pipeline_Exception occurs during step execution.
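The behaviour documented for run (sequential data passing, progress reporting, and SystemExit on a Pipeline_Exception) can be sketched as follows. This is a self-contained illustration that does not import lw_pipeline. Pipeline_Exception and Pipeline_Step are simplified stand-ins (the real Pipeline_Step also takes a config argument), and run_steps, Failing_Step, the progress format, and the exit message are all assumptions, not the library's implementation.

```python
class Pipeline_Exception(Exception):
    """Stand-in for the library's Pipeline_Exception (illustration only)."""


class Pipeline_Step:
    """Simplified stand-in; the real class also receives a config object."""

    def __init__(self, description):
        self.description = description

    def step(self, data):
        raise NotImplementedError


class Multiply_Step(Pipeline_Step):
    def __init__(self, factor):
        super().__init__(f"Multiply by {factor}")
        self.factor = factor

    def step(self, data):
        return data * self.factor


class Failing_Step(Pipeline_Step):
    """Hypothetical step that always fails, to show the error path."""

    def step(self, data):
        raise Pipeline_Exception("bad input")


def run_steps(steps, data=None):
    """Assumed logic: feed each step's output into the next step."""
    for index, step in enumerate(steps, start=1):
        # Assumed progress format; the library's output may look different.
        print(f"[{index}/{len(steps)}] {step.description}")
        try:
            data = step.step(data)
        except Pipeline_Exception as error:
            # Convert the step failure into SystemExit, as documented above.
            raise SystemExit(f"Pipeline failed in '{step.description}': {error}")
    return data


result = run_steps([Multiply_Step(2), Multiply_Step(3)], 5)

try:
    run_steps([Multiply_Step(2), Failing_Step("Always fails")], 5)
    exited = False
except SystemExit:
    exited = True
```

Because SystemExit derives from BaseException rather than Exception, a caller that wraps run in `except Exception` will not intercept it. Code that needs to recover from a failed pipeline has to catch SystemExit explicitly, as the last lines of the sketch do.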