Source code for lw_pipeline.pipeline_step
"""Main pipeline class to abstract from pipeline steps."""
# Authors: The Lightweight Pipeline developers
# SPDX-License-Identifier: BSD-3-Clause
from abc import ABC, abstractmethod
from lw_pipeline.helper.naming import guess_short_id
[docs]
class Pipeline_Step(ABC):
"""Abstract class for a pipeline step."""
[docs]
def __init__(self, description, config, short_id=""):
self.description = description
if short_id:
self._short_id = short_id
else:
self._short_id = guess_short_id(self.__class__.__module__)
self._config = config
@property
def config(self):
"""Configuration of the pipeline step."""
return self._config
@property
def short_id(self):
"""Short id of the pipeline step."""
return self._short_id
[docs]
@abstractmethod
def step(self, data):
"""Abstract method to be implemented by the pipeline step."""
pass
[docs]
class Pipeline_Exception(Exception):
"""Exception class for the pipeline."""
pass