Source code for lw_pipeline.config

"""Configuration settings to pass throught the pipeline."""

# Authors: The Lightweight Pipeline developers
# SPDX-License-Identifier: BSD-3-Clause

import importlib
import os
import sys

from lw_pipeline import Pipeline_Exception


[docs] class Config: """A class representing the configuration settings.""" config_file_path = None """The path to an external configuration file."""
[docs] def __init__(self, config_file_path=None, verbose=False): """ Initialize the Config object. Parameters ---------- config_file_path : str, optional The path to the configuration file. If provided, the configuration settings will be updated based on the variables defined in the file. verbose : bool, optional If True, print messages about the configuration file being used. Default is False. """ if config_file_path: self.config_file_path = config_file_path config_file = os.path.abspath(config_file_path) if os.path.isfile(config_file): config_dir = os.path.dirname(config_file) sys.path.insert( 0, config_dir ) # add the config directory to the module search path self._load_file_to_variables( config_file, verbose=verbose ) # check for a local version of the config file config_basename, config_ext = os.path.splitext( os.path.basename(config_file) ) local_config_file = os.path.join( config_dir, f"{config_basename}_local{config_ext}" ) if os.path.isfile(local_config_file): self._load_file_to_variables(local_config_file, verbose=verbose) else: raise FileNotFoundError( f"Specified configuration file not found: {config_file_path}." ) else: if verbose: print("Using default configuration file.")
def _load_file_to_variables(self, file_path, verbose=False): """Load variables from a specified configuration file into the current class.""" module_name = os.path.splitext(os.path.basename(file_path))[0] config_module = importlib.import_module(module_name) # Update the current variables in the this class with the ones from # the specified configuration file vars(self).update( { k: v for k, v in vars(config_module).items() if not k.startswith("_") } ) self.check_steps_dir() if verbose: print(f"Using configuration file: {file_path}.")
[docs] def check_steps_dir(self): """ Make sure steps dir is absolute. Notes ----- - If `config_file_path` is not `None`, the relative `steps_dir` is resolved relative to the directory containing the configuration file. - If `config_file_path` is `None`, the relative `steps_dir` is resolved relative to the current working directory. Parameters ---------- steps_dir : str The directory path for steps, which will be converted to an absolute path. config_file_path : str or None The path to the configuration file, used to resolve relative paths. """ value = self.steps_dir # check if steps dir is relative in that case make it relative to config file # or the current working directory if not os.path.isabs(value): # check if there is an externatl config file or if default config is used if self.config_file_path is not None: value = os.path.join(os.path.dirname(self.config_file_path), value) else: value = os.path.join(os.getcwd(), value) # make steps dir absolute value = os.path.abspath(value) self.steps_dir = value
[docs] def ask(self, message, default="n"): """ Ask to do something, e.g. before potentially deleting data, etc. Make sure to specify options, e.g. (y/n), in the message. """ if self.auto_response == "off": try: response = input(f"\u26a0 Question: {message}: ") except EOFError: # e.g. if not run interactively raise Pipeline_Exception( f"Could not obtain response to question: ({message}). " "Make sure to specify auto_response in the config, or run " "with --ignore-questions to use the default response." ) return response elif self.auto_response == "default": return default else: return self.auto_response
[docs] def set_variable_and_write_to_config_file(self, variable, value): """ Set a variable in this class and write to config file, if not defined there. For safety, only allow to write variables that are not already set. Args: variable (str): The name of the variable to update. value (mixed): The value to set the variable to. """ if hasattr(self, variable) and getattr(self, variable): raise Pipeline_Exception( "Cannot overwrite already set variable in configuration file." ) return if not self.config_file_path: raise Pipeline_Exception("No configuration file specified .") return setattr(self, variable, value) with open(self.config_file_path, "a") as f: f.write(f"\n{variable} = {value}\n") print(f"Configuration file updated: {self.config_file_path}")
[docs] def get_version(self): """ Get a version of the pipeline by getting last commit hash from the git. Cave: This only works if the pipeline is in a git repository. If not, it will return "unknown". """ try: import subprocess # make sure to execute git commands in the root directory of the repository root_dir = os.path.dirname(os.path.abspath(__file__)) git_hash = ( subprocess.check_output( ["git", "rev-parse", "--short", "HEAD"], cwd=root_dir ) .strip() .decode("utf-8") ) version = f"git-{git_hash}" except Exception: version = "unknown" return version
# general default variables # ------------------------- steps_dir = "steps/" """Steps directory relative to config file or current working directory if no """ """external config file is used.""" auto_response = "off" """Decide how questions are answered (off/y/n/default)""" data_dir = os.path.join(os.path.expanduser("~"), "data") """Default data directory""" bids_root = os.path.join(data_dir, "bids") """Root directory for BIDS formatted data""" subjects = [] """List of subjects to include in the pipeline processing. If empty list, \ include all subjects""" sessions = [] """List of sessions to include in the pipeline processing. If empty list, \ include all sessions""" tasks = [] """List of tasks to include in the pipeline processing. If empty list, \ include all tasks""" # variables for PipelineData class # -------------------------------- deriv_root = os.path.join(data_dir, "derivatives") """Root directory for derivatives""" overwrite = False """Overwrite existing derivative files, if False they are skipped""" eeg_path = {} """Path to the eeg data which should be converted to BIDS Structure: subject -> condition -> task -> list of eeg files (runs) File names expected relative to data_dir""" bids_acquisition = None """EEG information that should be included in the BIDS file""" bids_datatype = "eeg" """BIDS datatype of the data created as derivatives in the pipeline""" bids_extension = ".edf" """Extension of the BIDS files in the bids root directory""" n_jobs = 1 """Number of parallel jobs to run"""
# default variables for conversion ... # default variables preprocessing ... # default values analysis ...