Source code for lw_pipeline.__main__

"""Run the pipeline from the command line."""

# Authors: The Lightweight Pipeline developers
# SPDX-License-Identifier: BSD-3-Clause

import argparse
import importlib.util
import os
import sys

from lw_pipeline import Config, Pipeline_Exception, Pipeline_Step
from lw_pipeline.helper.report import find_steps_derivatives, generate_report


def main():
    """Run pipeline from command line."""
    parser = argparse.ArgumentParser()
    parser.add_argument("-v", "--version", action="version", version="0.1")

    parser.add_argument("-r", "--run", action="store_true", help="Run the pipeline")

    parser.add_argument(
        "steps",
        metavar="TT",
        type=str,
        nargs="*",
        help="List of steps to run, separated by commas (only necessary to "
        "specify 00-99)",
    )

    parser.add_argument("-c", "--config", help="Path to the configuration file")

    parser.add_argument(
        "-l", "--list", action="store_true", help="List all steps in the step directory"
    )

    parser.add_argument(
        "--list-derivatives",
        action="store_true",
        help="List methods in steps that could be used to produce derivatives.",
    )

    parser.add_argument(
        "--ignore-questions",
        action="store_true",
        help="Ignore questions, i.e. always respond with default answer to a question.",
    )

    parser.add_argument(
        "--report",
        action="store_true",
        help="Generate a report of the pipeline's derivatives.",
    )

    parser.add_argument(
        "--store-report",
        action="store_true",
        help="Store the report tables in .tsv files in the derivatives dir "
        "(pipeline_report_bids_dir.tsv, pipeline_report_deriv_dir.tsv).",
    )

    parser.add_argument(
        "--full-report",
        action="store_true",
        help="Generate a full report (do not limit to subj, ses, task specification in"
        " the config) of the pipeline's derivatives.",
    )

    options = parser.parse_args()

    config = Config(options.config, verbose=True)
    if options.ignore_questions:
        config.auto_response = "default"

    if options.run:
        # retrieve all steps script file names
        step_files = find_all_step_files(config.steps_dir)
        if not options.steps:
            print("Running entire pipeline")
        else:
            # filter step files based on steps specified in the command line argument
            step_files_specified = []
            for step_identifier in options.steps:
                step = [
                    step_file
                    for step_file in step_files
                    if step_file.startswith(step_identifier)
                ]
                if not step:
                    print(f"Error: Step file '{step_identifier}' not found.")
                    sys.exit(1)
                if len(step) > 1:
                    print(f"Error: Step file '{step_identifier}' is ambiguous.")
                    sys.exit(1)
                step_files_specified.append(step[0])
            step_files = step_files_specified
            print("Running the steps:", ", ".join(step_files))
        pipeline = Pipeline(step_files, config)
        # run the pipeline
        pipeline.run()
    elif options.list:
        print("Steps:".center(80, "-"))
        print("\n".join(find_all_step_files(config.steps_dir)))
    elif options.list_derivatives:
        print("Derivatives:".center(80, "-"))
        find_steps_derivatives(find_all_step_files(config.steps_dir), config)
    elif options.report or options.store_report or options.full_report:
        print(f"Generating {'limited ' if not options.full_report else 'full '}report")
        generate_report(config, options.store_report, options.full_report)
    else:
        # if no arguments implying actions are given, print help
        parser.print_help()


def find_all_step_files(steps_dir):
    """Find all the .py files in the steps directory."""
    # Get a list of all python files in the steps directory
    step_files = [
        f for f in os.listdir(steps_dir) if f.endswith(".py") and not f.startswith("__")
    ]

    # Sort the step files alphabetically
    step_files.sort()

    return step_files


def find_all_step_classes(step_files, config):
    """Find all the Pipeline_Step classes in the given step files."""
    steps_dir = config.steps_dir

    step_classes = []

    # Set module name to the name of the steps directory
    module_name = os.path.basename(steps_dir)

    # Import the module
    spec = importlib.util.spec_from_file_location(
        module_name,
        os.path.join(config.steps_dir, "__init__.py"),
        submodule_search_locations=[config.steps_dir],
    )
    module = importlib.util.module_from_spec(spec)
    sys.modules[module_name] = module
    spec.loader.exec_module(module)

    # Loop through the step files and import the modules
    for step_file in step_files:
        # Skip files like __init__.py, etc.
        if step_file.startswith("__"):
            continue

        # Remove the file extension to get the module name
        step_name = os.path.splitext(step_file)[0]

        # import the submodule
        module = importlib.import_module(f"{module_name}.{step_name}")

        # Get the subclasses of PipelineStep defined in the module
        pipeline_step_classes = [
            cls
            for cls in module.__dict__.values()
            if isinstance(cls, type)
            and issubclass(cls, Pipeline_Step)
            and cls != Pipeline_Step
        ]

        # Loop through the pipeline elements and invoke them
        for pipeline_step_class in pipeline_step_classes:
            step_classes.append(pipeline_step_class(config))

    return step_classes


[docs] class Pipeline: """Pipeline class to run the pipeline steps."""
[docs] def __init__(self, steps, config=None): """ Initialize the Pipeline. Parameters ---------- steps : list A list of step file names or a list of Pipeline_Step instances. config : Config, optional An instance of Config class, required only if steps are file names """ if all(isinstance(step, str) for step in steps): if config is None: raise ValueError("Config must be provided if steps are file names.") self.pipeline_steps = find_all_step_classes(steps, config) elif all(isinstance(step, Pipeline_Step) for step in steps): self.pipeline_steps = steps else: raise ValueError( "Steps must be either a list of file names or Pipeline_Step instances." )
[docs] def run(self, data=None): """ Run the pipeline. Include all Pipeline_Step classes contained in the step_files list. Parameters ---------- data : object, optional Optional input data to be passed to the first step. Returns ------- data : object The output data after processing through all pipeline steps. """ # counter for executed steps/position in the pipeline pos = 1 if data is not None: print("Pipeline starts with following input:".center(80, "-")) print(data) for step in self.pipeline_steps: # print the number/name of the step print( f"Step {pos}: {step.__class__.__module__} / " f"{step.__class__.__name__}".center(80, "-") ) pos = pos + 1 print("ℹ " + step.description) try: data = step.step(data) except Pipeline_Exception as e: print(f"Error in {step.description}: {e}") sys.exit(1) print("Pipeline finished with following output:".center(80, "-")) print(data) return data
if __name__ == "__main__": main()