From c537fd26cd4f1da6b45bcd06e093171e2a5d3872 Mon Sep 17 00:00:00 2001 From: Drew Oldag Date: Tue, 16 Jul 2024 21:51:26 -0700 Subject: [PATCH] Initial implementation of a runtime configuration mechanism to allow easier config modification. --- .gitignore | 2 +- example_runtime_config.toml | 14 +++++++ pyproject.toml | 1 + .../resource_configs/klone_configuration.py | 4 ++ src/kbmod_wf/utilities/__init__.py | 5 ++- .../utilities/configuration_utilities.py | 30 ++++++++++++++ .../utilities/memoization_utilities.py | 9 +++++ src/kbmod_wf/workflow.py | 40 ++++++++++++++----- 8 files changed, 93 insertions(+), 12 deletions(-) create mode 100644 example_runtime_config.toml create mode 100644 src/kbmod_wf/utilities/memoization_utilities.py diff --git a/.gitignore b/.gitignore index db67270..73b5b4e 100644 --- a/.gitignore +++ b/.gitignore @@ -150,4 +150,4 @@ _html/ .initialize_new_project.sh # Parsl log files -runinfo/ +run_logs/ diff --git a/example_runtime_config.toml b/example_runtime_config.toml new file mode 100644 index 0000000..d30f55d --- /dev/null +++ b/example_runtime_config.toml @@ -0,0 +1,14 @@ + +# All values set here will be applied to the resource configuration prior to +# calling parsl.load(config). Even if the key doesn't exist in the resource +# config, it will be added with the value defined here. +[resource_config_modifiers] +checkpoint_mode = "task_exit" + + +# Values in the apps.XXX section will be passed as a dictionary to the corresponding +# app. e.g. apps.create_uri_manifest will be passed to the create_uri_manifest app. +[apps.create_uri_manifest] +# The path to the staging directory +# e.g. 
"/gscratch/dirac/kbmod/workflow/staging" +staging_directory = "/home/drew/code/kbmod-wf/dev_staging" \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index fc34e04..b6e20cf 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,6 +17,7 @@ dynamic = ["version"] requires-python = ">=3.9" dependencies = [ "parsl", # The primary workflow orchestration tool + "toml", # Used to read runtime configuration files ] [project.urls] diff --git a/src/kbmod_wf/resource_configs/klone_configuration.py b/src/kbmod_wf/resource_configs/klone_configuration.py index e18477c..78a045f 100644 --- a/src/kbmod_wf/resource_configs/klone_configuration.py +++ b/src/kbmod_wf/resource_configs/klone_configuration.py @@ -3,6 +3,7 @@ from parsl import Config from parsl.executors import HighThroughputExecutor from parsl.providers import LocalProvider, SlurmProvider +from parsl.utils import get_all_checkpoints walltimes = { "compute-bigmem": "01:00:00", # change this to be appropriate @@ -11,6 +12,9 @@ def klone_resource_config(): return Config( + app_cache=True, + checkpoint_mode="task_exit", + checkpoint_files=get_all_checkpoints(), run_dir=os.path.join("/gscratch/dirac/kbmod/workflow/run_logs", datetime.date.today().isoformat()), executors=[ HighThroughputExecutor( diff --git a/src/kbmod_wf/utilities/__init__.py b/src/kbmod_wf/utilities/__init__.py index 59286d2..2c4e578 100644 --- a/src/kbmod_wf/utilities/__init__.py +++ b/src/kbmod_wf/utilities/__init__.py @@ -1,5 +1,6 @@ -from .configuration_utilities import get_resource_config +from .configuration_utilities import get_resource_config, apply_runtime_updates from .executor_utilities import get_executors from .logger_utilities import configure_logger +from .memoization_utilities import id_for_memo_file -__all__ = ["get_resource_config", "get_executors", "configure_logger"] +__all__ = ["apply_runtime_updates", "get_resource_config", "get_executors", "configure_logger"] diff --git 
a/src/kbmod_wf/utilities/configuration_utilities.py b/src/kbmod_wf/utilities/configuration_utilities.py index 8fcc494..b244563 100644 --- a/src/kbmod_wf/utilities/configuration_utilities.py +++ b/src/kbmod_wf/utilities/configuration_utilities.py @@ -1,4 +1,5 @@ import platform +import toml from typing import Literal from kbmod_wf.resource_configs import * @@ -53,3 +54,32 @@ def is_running_on_wsl() -> bool: except FileNotFoundError: pass return False + + +def apply_runtime_updates(resource_config, runtime_config): + """Before calling parsl.load(config), we want to modify any resource configuration + parameters with any runtime configuration options that might be set. + + Any key in the top level of the runtime_config dictionary that matches a + parameter of the resource_config will be used to override the resource_config + value. + + Parameters + ---------- + resource_config : parsl.config.Config + The configuration object that defines the computational resources for + running the workflow. These are defined in the resource_configs module. + runtime_config : dict + This is the set of runtime configuration options that are used to modify + the workflow on a per-run basis. 
+ + Returns + ------- + parsl.config.Config + The original resource_config updated with values from runtime_config + """ + resource_config_modifiers = runtime_config.get("resource_config_modifiers", {}) + for key, value in resource_config_modifiers.items(): + setattr(resource_config, key, value) + + return resource_config diff --git a/src/kbmod_wf/utilities/memoization_utilities.py b/src/kbmod_wf/utilities/memoization_utilities.py new file mode 100644 index 0000000..5d6a5e7 --- /dev/null +++ b/src/kbmod_wf/utilities/memoization_utilities.py @@ -0,0 +1,9 @@ +import os +from parsl.dataflow.memoization import id_for_memo +from parsl import File + + +@id_for_memo.register(File) +def id_for_memo_file(parsl_file_object: File, output_ref: bool = False) -> bytes: + if output_ref and os.path.exists(parsl_file_object.filepath): + return pickle.dumps(parsl_file_object.filepath) diff --git a/src/kbmod_wf/workflow.py b/src/kbmod_wf/workflow.py index 2fe9b8f..ea99c8b 100644 --- a/src/kbmod_wf/workflow.py +++ b/src/kbmod_wf/workflow.py @@ -1,16 +1,17 @@ import argparse import os +import toml import parsl from parsl import python_app, File import parsl.executors -from kbmod_wf.utilities.configuration_utilities import get_resource_config +from kbmod_wf.utilities.configuration_utilities import apply_runtime_updates, get_resource_config from kbmod_wf.utilities.executor_utilities import get_executors from kbmod_wf.utilities.logger_utilities import configure_logger @python_app(executors=get_executors(["local_dev_testing", "local_thread"])) -def create_uri_manifest(inputs=[], outputs=[], directory_path=None, logging_file=None): +def create_uri_manifest(inputs=[], outputs=[], config={}, logging_file=None): """This app will go to a given directory, find all of the uri.lst files there, and copy the paths to the manifest file.""" import glob @@ -19,11 +20,11 @@ def create_uri_manifest(inputs=[], outputs=[], directory_path=None, logging_file logger = 
configure_logger("task.create_uri_manifest", logging_file.filepath) - #! We need to do something about this immediately. `__file__` doesn't seem to - #! be valid here when running on klone + directory_path = config.get("staging_directory") if directory_path is None: - this_dir = os.path.dirname(os.path.abspath(__file__)) - directory_path = os.path.abspath(os.path.join(this_dir, "../../dev_staging")) + raise ValueError("No staging_directory provided in the configuration.") + + logger.info(f"Looking for staged files in {directory_path}") # Gather all the *.lst entries in the directory pattern = os.path.join(directory_path, "*.lst") @@ -124,11 +125,20 @@ def kbmod_search(inputs=[], outputs=[], logging_file=None): return outputs[0] -def workflow_runner(env=None): - with parsl.load(get_resource_config(env=env)) as dfk: +def workflow_runner(env: str = None, runtime_config: dict = None) -> None: + resource_config = get_resource_config(env=env) + + if runtime_config is not None: + resource_config = apply_runtime_updates(resource_config, runtime_config) + app_configs = runtime_config.get("apps", {}) + + with parsl.load(resource_config) as dfk: logging_file = File(os.path.join(dfk.run_dir, "parsl.log")) logger = configure_logger("workflow.workflow_runner", logging_file.filepath) + if runtime_config is not None: + logger.info(f"Using runtime configuration defintion:\n{toml.dumps(runtime_config)}") + logger.info("Starting workflow") # gather all the .lst files that are staged for processing @@ -136,6 +146,7 @@ def workflow_runner(env=None): create_uri_manifest_future = create_uri_manifest( inputs=[], outputs=[manifest_file], + config=app_configs.get("create_uri_manifest", {}), logging_file=logging_file, ) @@ -201,6 +212,17 @@ def workflow_runner(env=None): help="The environment to run the workflow in.", ) + parser.add_argument( + "--runtime-config", + type=str, + help="The complete runtime configuration filepath to use for the workflow.", + ) + args = parser.parse_args() - 
workflow_runner(env=args.env) + # if a runtime_config file was provided and exists, load the toml as a dict. + if args.runtime_config is not None and os.path.exists(args.runtime_config): + with open(args.runtime_config, "r") as toml_runtime_config: + runtime_config = toml.load(toml_runtime_config) + + workflow_runner(env=args.env, runtime_config=runtime_config)