Source code for lw_pipeline.pipeline
"""Core Pipeline class for orchestrating pipeline steps."""
# Authors: The Lightweight Pipeline developers
# SPDX-License-Identifier: BSD-3-Clause
import logging
import sys
from lw_pipeline.pipeline_step import Pipeline_Exception, Pipeline_Step
lgr = logging.getLogger(__name__)
def _banner(message: str, width: int = 80, fill: str = "-") -> str:
"""Return a centered banner line for console/file logs."""
return f" {message} ".center(width, fill)
[docs]
class Pipeline:
"""
Pipeline class to run pipeline steps in sequence.
The Pipeline orchestrates the execution of multiple pipeline steps,
passing data through each step in order and handling errors appropriately.
Parameters
----------
steps : list of Pipeline_Step
A list of Pipeline_Step instances to execute in sequence.
Attributes
----------
pipeline_steps : list of Pipeline_Step
The steps to be executed.
Examples
--------
>>> from lw_pipeline import Pipeline, Pipeline_Step, Config
>>>
>>> class Multiply_Step(Pipeline_Step):
... def __init__(self, factor, config):
... super().__init__(f"Multiply by {factor}", config)
... self.factor = factor
... def step(self, data):
... return data * self.factor
>>>
>>> config = Config()
>>> step1 = Multiply_Step(2, config)
>>> step2 = Multiply_Step(3, config)
>>> pipeline = Pipeline([step1, step2])
>>> result = pipeline.run(5) # 5 * 2 * 3 = 30
"""
[docs]
def __init__(self, steps):
"""
Initialize the Pipeline.
Parameters
----------
steps : list of Pipeline_Step
A list of Pipeline_Step instances to execute in sequence.
Raises
------
ValueError
If steps is not a list of Pipeline_Step instances.
"""
if not all(isinstance(step, Pipeline_Step) for step in steps):
raise ValueError(
"All steps must be Pipeline_Step instances. "
"Use discovery.find_all_step_classes() to load steps from files."
)
self.pipeline_steps = steps
[docs]
def run(self, data=None):
"""
Run the pipeline.
Execute all pipeline steps in sequence, passing data from one
step to the next. Handles Pipeline_Exception errors and reports
progress.
Parameters
----------
data : object, optional
Optional input data to be passed to the first step.
Default is None.
Returns
-------
data : object
The output data after processing through all pipeline steps.
Raises
------
SystemExit
If a Pipeline_Exception occurs during step execution.
"""
pos = 1
if data is not None:
lgr.info(_banner("Pipeline starts with following input:"))
lgr.info("%s", data)
for step in self.pipeline_steps:
step_label = (
f"Step {pos}: {step.__class__.__module__} / "
f"{step.__class__.__name__}"
)
lgr.info(_banner(step_label))
lgr.info("- %s", step.description)
pos += 1
try:
data = step.step(data)
except Pipeline_Exception as e:
lgr.error("Error in %s: %s", step.description, e)
sys.exit(1)
lgr.info(_banner("Pipeline finished with following output:"))
lgr.info("%s", data)
return data