Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

decouple metrics and runner #5

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions MOBO-Closures/ProjectUtils/panda_idds_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from ax.core.base_trial import BaseTrial, TrialStatus
from ax.core.data import Data
from ax.core.metric import Metric, MetricFetchResult, MetricFetchE
from ax.metrics.noisy_function import GenericNoisyFunctionMetric
from ax.core.runner import Runner
from ax.utils.common.result import Ok, Err

Expand All @@ -17,6 +18,7 @@
# @workflow(service='panda', local=True, cloud='US', queue='BNL_OSG_2', init_env="singularity exec --pwd $(pwd) -B $(pwd):$(pwd) /cvmfs/unpacked.cern.ch/gitlab-registry.cern.ch/wguan/mlcontainer:py311_0.0.3")
# @workflow_def(service='panda', local=True, cloud='US', queue='BNL_OSG_2', return_workflow=True, init_env="singularity exec --pwd $(pwd) -B $(pwd):$(pwd) /cvmfs/unpacked.cern.ch/gitlab-registry.cern.ch/wguan/mlcontainer:py311_0.0.3")
# @workflow_def(service='panda', source_dir=None, source_dir_parent_level=1, local=True, cloud='US', queue='FUNCX_TEST', return_workflow=True, init_env="singularity exec --pwd $(pwd) -B $(pwd):$(pwd) /cvmfs/unpacked.cern.ch/gitlab-registry.cern.ch/wguan/mlcontainer:py311_0.0.3")

@workflow_def(service='panda', source_dir=None, source_dir_parent_level=1, local=True, cloud='US', queue='BNL_OSG_2', return_workflow=True, init_env="singularity exec --pwd $(pwd) -B $(pwd):$(pwd) /cvmfs/unpacked.cern.ch/gitlab-registry.cern.ch/wguan/mlcontainer:py311_0.0.3")
def empty_workflow_func():
pass
Expand All @@ -32,7 +34,6 @@ def __init__(self, runner_funcs=None):
self.workflow.pre_run()

def run(self, trial: BaseTrial):
print(self.runner_funcs)
if not isinstance(trial, BaseTrial):
raise ValueError("This runner only handles `BaseTrial`.")

Expand All @@ -43,13 +44,12 @@ def run(self, trial: BaseTrial):

name_results = {}
self.transforms[trial.index] = {}
for name in self.runner_funcs:

for name, metric in trial.experiment.metrics.items():
# one work is one objective
# with multiple objectives, there will be multiple work objects

function = self.runner_funcs[name]['function']
pre_kwargs = self.runner_funcs[name]['pre_kwargs']
work = work_def(function, workflow=self.workflow, pre_kwargs=pre_kwargs, return_work=True, map_results=True)
function = metric._f
work = work_def(function, workflow=self.workflow, return_work=True, map_results=True)
w = work(multi_jobs_kwargs_list=params_list)
print("trial %s: submit a task for %s: %s" % (trial.index, name, w))
tf_id = w.submit()
Expand Down Expand Up @@ -109,8 +109,7 @@ def poll_trial_status(self, trials: Iterable[BaseTrial]):
return status_dict


class PanDAIDDSMetric(Metric): # Pulls data for trial from external system.
# def __init__(self, name: str, lower_is_better: Optional[bool] = None, properties: Optional[Dict[str, Any]] = None, function: optional[Any] = None) -> None:
class PanDAIDDSMetric(GenericNoisyFunctionMetric): # Pulls data for trial from external system.

def fetch_trial_data(self, trial: BaseTrial) -> MetricFetchResult:
print("fetch_trial_data")
Expand Down
29 changes: 11 additions & 18 deletions MOBO-Closures/wrapper_panda_idds.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@

import wandb

from idds.iworkflow.workflow import workflow as workflow_def # workflow # noqa F401
from idds.iworkflow.work import work as work_def
# from idds.iworkflow.workflow import workflow as workflow_def # workflow # noqa F401
# from idds.iworkflow.work import work as work_def
from ProjectUtils.panda_idds_utils import PanDAIDDSRunner, PanDAIDDSMetric


Expand All @@ -38,9 +38,9 @@ def RunProblem(problem, x, kwargs):

# @workflow(service='panda', local=True, cloud='US', queue='BNL_OSG_2', init_env="singularity exec --pwd $(pwd) -B $(pwd):$(pwd) /cvmfs/unpacked.cern.ch/gitlab-registry.cern.ch/wguan/mlcontainer:py311_0.0.3")
# @workflow_def(service='panda', local=True, cloud='US', queue='BNL_OSG_2', return_workflow=True, init_env="singularity exec --pwd $(pwd) -B $(pwd):$(pwd) /cvmfs/unpacked.cern.ch/gitlab-registry.cern.ch/wguan/mlcontainer:py311_0.0.3")
@workflow_def(service='panda', local=True, cloud='US', queue='FUNCX_TEST', return_workflow=True, init_env="singularity exec --pwd $(pwd) -B $(pwd):$(pwd) /cvmfs/unpacked.cern.ch/gitlab-registry.cern.ch/wguan/mlcontainer:py311_0.0.3")
def empty_workflow_func():
pass
# @workflow_def(service='panda', local=True, cloud='US', queue='FUNCX_TEST', return_workflow=True, init_env="singularity exec --pwd $(pwd) -B $(pwd):$(pwd) /cvmfs/unpacked.cern.ch/gitlab-registry.cern.ch/wguan/mlcontainer:py311_0.0.3")
# def empty_workflow_func():
# pass


def ftot(x, problem, tkwargs):
Expand Down Expand Up @@ -188,26 +188,19 @@ def f4(xdic):
param_names = [f"x{i}" for i in range(d)]

names = ["a", "b", "c", "d"]
# functions = [f1, f2, f3, f4]
functions_problem = [f1_problem, f2_problem, f3_problem, f4_problem]
functions = functions_problem
functions = [f1, f2, f3, f4]
# functions_problem = [f1_problem, f2_problem, f3_problem, f4_problem]
# functions = functions_problem
metrics = []

for name, function in zip(names[:M], functions_problem[:M]):
for name, function in zip(names[:M], functions[:M]):
metrics.append(
PanDAIDDSMetric(
name=name, lower_is_better=False
name=name, f=function, noise_sd=0.0, lower_is_better=False
)
)

runner_funcs = {}
for name, function in zip(names[:M], functions_problem[:M]):
# runner_funcs[name] = function
# runner_funcs[name] = work_def(function, workflow=workflow, pre_kwargs={'problem': problem, 'tkwargs': tkwargs}, return_work=True, map_results=True, no_wraps=True)
runner_funcs[name] = {'function': function, 'pre_kwargs': {'problem': problem, 'tkwargs': tkwargs}}

# print(runner_funcs)
runner = PanDAIDDSRunner(runner_funcs)
runner = PanDAIDDSRunner()

mo = MultiObjective(
objectives=[Objective(m) for m in metrics],
Expand Down