Skip to content

Commit

Permalink
Merge pull request #14 from dirac-institute/issue/13/runtime-config-i…
Browse files Browse the repository at this point in the history
…mplementation

Initial implementation of a runtime configuration mechanism
  • Loading branch information
drewoldag authored Jul 17, 2024
2 parents efc9494 + c537fd2 commit 2977f63
Show file tree
Hide file tree
Showing 8 changed files with 93 additions and 12 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -150,4 +150,4 @@ _html/
.initialize_new_project.sh

# Parsl log files
runinfo/
run_logs/
14 changes: 14 additions & 0 deletions example_runtime_config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@

# All values set here will be applied to the resource configuration prior to
# calling parsl.load(config). Even if the key does't exist in the resource
# config, it will be added with the value defined here.
[resource_config_modifiers]
checkpoint_mode = "task_exit"


# Values in the apps.XXX section will be passed as a dictionary to the corresponding
# app. e.g. apps.create_uri_manifest will be passed to the create_uri_manifest app.
[apps.create_uri_manifest]
# The path to the staging directory
# e.g. "/gscratch/dirac/kbmod/workflow/staging"
staging_directory = "/home/drew/code/kbmod-wf/dev_staging"
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ dynamic = ["version"]
requires-python = ">=3.9"
dependencies = [
"parsl", # The primary workflow orchestration tool
"toml", # Used to read runtime configuration files
]

[project.urls]
Expand Down
4 changes: 4 additions & 0 deletions src/kbmod_wf/resource_configs/klone_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from parsl import Config
from parsl.executors import HighThroughputExecutor
from parsl.providers import LocalProvider, SlurmProvider
from parsl.utils import get_all_checkpoints

walltimes = {
"compute-bigmem": "01:00:00", # change this to be appropriate
Expand All @@ -11,6 +12,9 @@

def klone_resource_config():
return Config(
app_cache=True,
checkpoint_mode="task_exit",
checkpoint_files=get_all_checkpoints(),
run_dir=os.path.join("/gscratch/dirac/kbmod/workflow/run_logs", datetime.date.today().isoformat()),
executors=[
HighThroughputExecutor(
Expand Down
5 changes: 3 additions & 2 deletions src/kbmod_wf/utilities/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from .configuration_utilities import get_resource_config
from .configuration_utilities import get_resource_config, apply_runtime_updates
from .executor_utilities import get_executors
from .logger_utilities import configure_logger
from .memoization_utilities import id_for_memo_file

__all__ = ["get_resource_config", "get_executors", "configure_logger"]
__all__ = ["apply_runtime_updates", "get_resource_config", "get_executors", "configure_logger"]
30 changes: 30 additions & 0 deletions src/kbmod_wf/utilities/configuration_utilities.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import platform
import toml
from typing import Literal

from kbmod_wf.resource_configs import *
Expand Down Expand Up @@ -53,3 +54,32 @@ def is_running_on_wsl() -> bool:
except FileNotFoundError:
pass
return False


def apply_runtime_updates(resource_config, runtime_config):
"""Before calling parsl.load(config), we want to modify any resource configuration
parameters with any runtime configuration options that might be set.
Any key in the top level of the runtime_config dictionary that matches a
parameter of the resource_config will be used to override the resource_config
value.
Parameters
----------
resource_config : parsl.config.Config
The configuration object that defines the computational resources for
running the workflow. These are defined in the resource_configs module.
runtime_config : dict
This is the set of runtime configuration options that are used to modify
the workflow on a per-run basis.
Returns
-------
parsl.config.Config
The original resource_config updated with values from runtime_config
"""
resource_config_modifiers = runtime_config.get("resource_config_modifiers", {})
for key, value in resource_config_modifiers.items():
setattr(resource_config, key, value)

return resource_config
9 changes: 9 additions & 0 deletions src/kbmod_wf/utilities/memoization_utilities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import os
from parsl.dataflow.memoization import id_for_memo
from parsl import File


@id_for_memo.register(File)
def id_for_memo_file(parsl_file_object: File, output_ref: bool = False) -> bytes:
if output_ref and os.path.exists(parsl_file_object.filepath):
return pickle.dumps(parsl_file_object.filepath)
40 changes: 31 additions & 9 deletions src/kbmod_wf/workflow.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
import argparse
import os
import toml
import parsl
from parsl import python_app, File
import parsl.executors

from kbmod_wf.utilities.configuration_utilities import get_resource_config
from kbmod_wf.utilities.configuration_utilities import apply_runtime_updates, get_resource_config
from kbmod_wf.utilities.executor_utilities import get_executors
from kbmod_wf.utilities.logger_utilities import configure_logger


@python_app(executors=get_executors(["local_dev_testing", "local_thread"]))
def create_uri_manifest(inputs=[], outputs=[], directory_path=None, logging_file=None):
def create_uri_manifest(inputs=[], outputs=[], config={}, logging_file=None):
"""This app will go to a given directory, find all of the uri.lst files there,
and copy the paths to the manifest file."""
import glob
Expand All @@ -19,11 +20,11 @@ def create_uri_manifest(inputs=[], outputs=[], directory_path=None, logging_file

logger = configure_logger("task.create_uri_manifest", logging_file.filepath)

#! We need to do something about this immediately. `__file__` doesn't seem to
#! be valid here when running on klone
directory_path = config.get("staging_directory")
if directory_path is None:
this_dir = os.path.dirname(os.path.abspath(__file__))
directory_path = os.path.abspath(os.path.join(this_dir, "../../dev_staging"))
raise ValueError("No staging_directory provided in the configuration.")

logger.info(f"Looking for staged files in {directory_path}")

# Gather all the *.lst entries in the directory
pattern = os.path.join(directory_path, "*.lst")
Expand Down Expand Up @@ -124,18 +125,28 @@ def kbmod_search(inputs=[], outputs=[], logging_file=None):
return outputs[0]


def workflow_runner(env=None):
with parsl.load(get_resource_config(env=env)) as dfk:
def workflow_runner(env: str = None, runtime_config: dict = None) -> None:
resource_config = get_resource_config(env=env)

if runtime_config is not None:
resource_config = apply_runtime_updates(resource_config, runtime_config)
app_configs = runtime_config.get("apps", {})

with parsl.load(resource_config) as dfk:
logging_file = File(os.path.join(dfk.run_dir, "parsl.log"))
logger = configure_logger("workflow.workflow_runner", logging_file.filepath)

if runtime_config is not None:
logger.info(f"Using runtime configuration defintion:\n{toml.dumps(runtime_config)}")

logger.info("Starting workflow")

# gather all the .lst files that are staged for processing
manifest_file = File(os.path.join(os.getcwd(), "manifest.txt"))
create_uri_manifest_future = create_uri_manifest(
inputs=[],
outputs=[manifest_file],
config=app_configs.get("create_uri_manifest", {}),
logging_file=logging_file,
)

Expand Down Expand Up @@ -201,6 +212,17 @@ def workflow_runner(env=None):
help="The environment to run the workflow in.",
)

parser.add_argument(
"--runtime-config",
type=str,
help="The complete runtime configuration filepath to use for the workflow.",
)

args = parser.parse_args()

workflow_runner(env=args.env)
# if a runtime_config file was provided and exists, load the toml as a dict.
if args.runtime_config is not None and os.path.exists(args.runtime_config):
with open(args.runtime_config, "r") as toml_runtime_config:
runtime_config = toml.load(toml_runtime_config)

workflow_runner(env=args.env, runtime_config=runtime_config)

0 comments on commit 2977f63

Please sign in to comment.