Commit 8494b0f

Cleanup, squash, refactor later.

DinoBektesevic committed Oct 3, 2024
1 parent 86e36af commit 8494b0f

Showing 4 changed files with 809 additions and 80 deletions.
src/kbmod_wf/resource_configs/klone_configuration.py (44 additions, 32 deletions)

@@ -24,56 +24,75 @@ def klone_resource_config():
         retries=1,
         executors=[
             HighThroughputExecutor(
-                label="small_cpu",
-                max_workers=1,
+                label="ckpt_96gb_8cpus",
+                max_workers=1, # Do we mean max_workers_per_node here?
                 provider=SlurmProvider(
-                    partition="ckpt-all",
-                    account="astro",
+                    partition="gpu-a40", # ckpt-all
+                    account="escience", # astro
                     min_blocks=0,
-                    max_blocks=4,
+                    max_blocks=5,
                     init_blocks=0,
                     parallelism=1,
                     nodes_per_block=1,
-                    cores_per_node=1, # perhaps should be 8???
-                    mem_per_node=256, # In GB
+                    mem_per_node=12, # 96 GB
+                    cores_per_node=8,
                     exclusive=False,
-                    walltime=walltimes["compute_bigmem"],
+                    walltime=walltimes["sharded_reproject"],
                     # Command to run before starting worker - i.e. conda activate <special_env>
                     worker_init="",
                 ),
             ),
             HighThroughputExecutor(
-                label="large_mem",
-                max_workers=1,
+                label="astro_2gb_2cpus",
+                max_workers=1, # Do we mean max_workers_per_node here?
                 provider=SlurmProvider(
-                    partition="ckpt-all",
-                    account="astro",
+                    partition="gpu-a40", # ckpt-all
+                    account="escience", # astro
                     min_blocks=0,
-                    max_blocks=2,
+                    max_blocks=5,
                     init_blocks=0,
                     parallelism=1,
                     nodes_per_block=1,
-                    cores_per_node=32,
-                    mem_per_node=256,
+                    mem_per_node=4,
+                    cores_per_node=2,
                     exclusive=False,
-                    walltime=walltimes["large_mem"],
+                    walltime=walltimes["sharded_reproject"],
                     # Command to run before starting worker - i.e. conda activate <special_env>
                     worker_init="",
                 ),
             ),
             HighThroughputExecutor(
-                label="sharded_reproject",
+                label="esci_2gb_2cpus",
+                max_workers=1, # Do we mean max_workers_per_node here?
                 provider=SlurmProvider(
-                    partition="ckpt-all",
-                    account="astro",
+                    partition="gpu-a40", # ckpt-all
+                    account="escience", # astro
                     min_blocks=0,
-                    max_blocks=2,
+                    max_blocks=5,
                     init_blocks=0,
                     parallelism=1,
                     nodes_per_block=1,
-                    cores_per_node=8,
-                    mem_per_node=100,
+                    mem_per_node=4,
+                    cores_per_node=2,
                     exclusive=False,
                     walltime=walltimes["sharded_reproject"],
                     # Command to run before starting worker - i.e. conda activate <special_env>
                     worker_init="",
                 ),
             ),
+            HighThroughputExecutor(
+                label="ckpt_2gb_2cpus",
+                max_workers=1, # Do we mean max_workers_per_node here?
+                provider=SlurmProvider(
+                    partition="gpu-a40", # ckpt-all
+                    account="escience", # astro
+                    min_blocks=0,
+                    max_blocks=5,
+                    init_blocks=0,
+                    parallelism=1,
+                    nodes_per_block=1,
+                    mem_per_node=4,
+                    cores_per_node=2,
+                    exclusive=False,
+                    walltime=walltimes["sharded_reproject"],
+                    # Command to run before starting worker - i.e. conda activate <special_env>
@@ -87,25 +106,18 @@ def klone_resource_config():
partition="gpu-a40",
account="escience",
min_blocks=0,
max_blocks=2,
max_blocks=4,
init_blocks=0,
parallelism=1,
nodes_per_block=1,
cores_per_node=1, # perhaps should be 8???
mem_per_node=64, # In GB
cores_per_node=2, # perhaps should be 8???
mem_per_node=12, # 64 In GB
exclusive=False,
walltime=walltimes["gpu_max"],
# Command to run before starting worker - i.e. conda activate <special_env>
worker_init="",
scheduler_options="#SBATCH --gpus=1",
),
),
HighThroughputExecutor(
label="local_thread",
provider=LocalProvider(
init_blocks=0,
max_blocks=1,
),
),
],
)
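
A note on the recurring "Do we mean max_workers_per_node here?" comment: in recent Parsl releases HighThroughputExecutor accepts max_workers_per_node, with max_workers kept as a deprecated alias for the same per-node worker cap, so the two spellings should behave identically here. A minimal sketch of one of these executors written with the newer name (the walltime string is illustrative, not taken from this commit):

# Sketch only, not part of this commit. Assumes a Parsl release where
# HighThroughputExecutor accepts max_workers_per_node (max_workers is a
# deprecated alias for it).
from parsl.config import Config
from parsl.executors import HighThroughputExecutor
from parsl.providers import SlurmProvider

config = Config(
    executors=[
        HighThroughputExecutor(
            label="ckpt_2gb_2cpus",
            max_workers_per_node=1,  # at most one worker on each allocated node
            provider=SlurmProvider(
                partition="ckpt-all",
                account="astro",
                min_blocks=0,
                max_blocks=5,
                nodes_per_block=1,
                cores_per_node=2,
                mem_per_node=4,  # in GB
                exclusive=False,
                walltime="01:00:00",  # illustrative value
            ),
        ),
    ],
)
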
src/kbmod_wf/single_chip_step2.py (129 additions, 0 deletions)

@@ -0,0 +1,129 @@
import logging
logging.basicConfig(level=logging.INFO)

import argparse
import os
import glob

import toml
import parsl
from parsl import python_app, File
import parsl.executors
import time

from kbmod_wf.utilities import (
    apply_runtime_updates,
    get_resource_config,
    get_executors,
    get_configured_logger,
)


@python_app(
    cache=True,
    executors=get_executors(["local_dev_testing", "gpu"]),
    ignore_for_cache=["logging_file"],
)
def step2(inputs=(), outputs=(), runtime_config={}, logging_file=None):
    from kbmod_wf.utilities.logger_utilities import get_configured_logger, ErrorLogger

    logger = get_configured_logger("task.step2", logging_file.filepath)

    import json

    from kbmod.work_unit import WorkUnit
    from kbmod.run_search import SearchRunner

    with ErrorLogger(logger):
        wu = WorkUnit.from_fits(inputs[0])
        res = SearchRunner().run_search_from_work_unit(wu)

        # a WCS in the results table would be very helpful
        # so add it in.
        header = wu.wcs.to_header(relax=True)
        h, w = wu.wcs.pixel_shape
        header["NAXIS1"], header["NAXIS2"] = h, w
        res.table.meta["wcs"] = json.dumps(dict(header))

        # write the results to a file
        res.write_table(outputs[0].filepath)

    return outputs


def workflow_runner(env=None, runtime_config={}):
    """Load and configure Parsl, then run the workflow.

    Parameters
    ----------
    env : str, optional
        Environment string used to define which resource configuration to use,
        by default None
    runtime_config : dict, optional
        Dictionary of assorted runtime configuration parameters, by default {}
    """
    resource_config = get_resource_config(env=env)
    resource_config = apply_runtime_updates(resource_config, runtime_config)
    app_configs = runtime_config.get("apps", {})

    dfk = parsl.load(resource_config)
    logger = get_configured_logger("workflow.workflow_runner")

    if dfk:
        if runtime_config is not None:
            logger.info(f"Using runtime configuration definition:\n{toml.dumps(runtime_config)}")

        logger.info("Starting workflow")

        directory_path = runtime_config.get("staging_directory", "resampled_wus")
        file_pattern = "*.wu"
        pattern = os.path.join(directory_path, file_pattern)
        entries = glob.glob(pattern)
        logger.info(f"Found {len(entries)} files in {directory_path}")

        # run kbmod search on each reprojected WorkUnit
        search_futures = []
        for workunit in entries:
            wuname = os.path.basename(workunit)
            wuname = wuname.split(".")[0]
            open(f"logs/{wuname}.search.log", "w").close()
            logging_file = File(f"logs/{wuname}.search.log")
            search_futures.append(
                step2(
                    inputs=[workunit],
                    outputs=[File(f"results/{wuname}.results.ecsv")],
                    runtime_config=app_configs.get("kbmod_search", {}),
                    logging_file=logging_file,
                )
            )

        [f.result() for f in search_futures]
        logger.info("Workflow complete")

    parsl.clear()


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--env",
        type=str,
        choices=["dev", "klone"],
        help="The environment to run the workflow in.",
    )

    parser.add_argument(
        "--runtime-config",
        type=str,
        help="The complete runtime configuration filepath to use for the workflow.",
    )

    args = parser.parse_args()

    # if a runtime_config file was provided and exists, load the toml as a dict.
    runtime_config = {}
    if args.runtime_config is not None and os.path.exists(args.runtime_config):
        with open(args.runtime_config, "r") as toml_runtime_config:
            runtime_config = toml.load(toml_runtime_config)

    workflow_runner(env=args.env, runtime_config=runtime_config)
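
step2 stashes the WorkUnit's WCS, together with NAXIS1/NAXIS2, as JSON in the results table metadata. A short sketch of how a downstream reader could rebuild the WCS from one of the .results.ecsv files written above (the filename is hypothetical; assumes astropy is available and that the ECSV round-trips the table metadata):

# Sketch only: rebuild the WCS that step2 embedded in the results table.
# The results path is hypothetical.
import json

from astropy.io import fits
from astropy.table import Table
from astropy.wcs import WCS

results = Table.read("results/example.results.ecsv")

# step2 stored the relaxed FITS header, including NAXIS1/NAXIS2, as JSON.
header = fits.Header()
header.update(json.loads(results.meta["wcs"]))

wcs = WCS(header)
print(wcs.pixel_shape)  # the (NAXIS1, NAXIS2) recorded by step2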
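
For reference, a hypothetical invocation of the new script and a minimal runtime-config file it would accept. The staging_directory key and the [apps.kbmod_search] table match the runtime_config.get("staging_directory", ...) and app_configs.get("kbmod_search", {}) lookups in workflow_runner; the concrete values are illustrative:

python src/kbmod_wf/single_chip_step2.py --env klone --runtime-config runtime_config.toml

# runtime_config.toml (illustrative)
staging_directory = "resampled_wus"

[apps.kbmod_search]
# options forwarded to the step2 app as its runtime_config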