Skip to content

Commit

Permalink
Add functionality to add experiment and execute it immediately (#194)
Browse files Browse the repository at this point in the history
  • Loading branch information
tornede authored Jun 3, 2024
1 parent 79b7520 commit 0cfb381
Show file tree
Hide file tree
Showing 14 changed files with 793 additions and 500 deletions.
3 changes: 1 addition & 2 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,12 @@ Feature

- Added documentation about how to execute PyExperimenter on distributed machines.
- Improved the usage and documentation of ssh tunnel to be more flexible and user friendly.
- Add add_experiment_and_execute method to PyExperimenter to add and execute an experiment in one step.

Fix
---




v1.4.1 (11.03.2024)
===================

Expand Down
1,042 changes: 583 additions & 459 deletions docs/source/examples/example_general_usage.ipynb

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/source/usage/database_credential_file.rst
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ Below is an example of a database credential file, that connects to a server wit
server: example.mysqlserver.com
However, for security reasons, databases might only be accessible from a specific IP address. In these cases, one can use an ssh jumphost. This means that ``PyExperimenter`` will first connect to the ssh server
that has access to the database and then connect to the database server from there. This is done by adding an additional ``Ssh`` section to the database credential file, and can be activated either by a ``PyExperimenter`` keyword argument or in the :ref:`experimenter configuration file <_experiment_configuration_file>`.
that has access to the database and then connect to the database server from there. This is done by adding an additional ``Ssh`` section to the database credential file, and can be activated either by a ``PyExperimenter`` keyword argument or in the :ref:`experimenter configuration file <experiment_configuration_file>`.
The following example shows how to connect to a database server using an SSH server with the address ``ssh_hostname`` and the port ``optional_ssh_port``.

.. code-block:: yaml
Expand Down
34 changes: 34 additions & 0 deletions docs/source/usage/distributed_execution.rst
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,37 @@ Multiple ``experiment executer`` processes execute the experiments in parallel o
from py_experimenter.experimenter import PyExperimenter
experimenter.execute(experiment_function, max_experiments=1)
Add Experiment and Execute
--------------------------

When executing jobs on clusters one might want to use `hydra combined with submitit <hydra_submitit_>`_ or a similar software that configures different jobs. If so it makes sense to create the database initially

.. code-block:: python
...
experimenter = PyExperimenter(
experiment_configuration_file_path = "path/to/file",
database_credential_file_path = "path/to/file"
)
experimenter.create_table()
and then add the configured experiments experiments in the worker job, followed by an immediate execution.

.. code-block:: python
def _experiment_function(keyfields: dict, result_processor: ResultProcessor, custom_fields: dict):
...
...
@hydra.main(config_path="config", config_name="hydra_configname", version_base="1.1")
def experiment_wrapepr(config: Configuration):
...
experimenter = PyExperimenter(
experiment_configuration_file_path = "some/value/from/config",
database_credential_file_path = "path/to/file"
)
experimenter.add_experiment_and_execute(keyfield_values_from_config, _experiment_function)
.. _hydra_submitit: https://hydra.cc/docs/plugins/submitit_launcher/
23 changes: 22 additions & 1 deletion docs/source/usage/execution.rst
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,24 @@ An experiment can be executed easily with the following call:
- ``max_experiments`` determines how many experiments will be executed by this ``PyExperimenter``. If set to ``-1``, it will execute experiments in a sequential fashion until no more open experiments are available.
- ``random_order`` determines if the experiments will be executed in a random order. By default, the parameter is set to ``False``, meaning that experiments will be executed ordered by their ``id``.

.. _add_experiment_and_execute:

--------------------------
Add Experiment and Execute
--------------------------

Instead of filling the database table with rows and then executing the experiments, it is also possible to add an experiment and execute it directly. This can be done with the following call:

.. code-block:: python
experimenter.add_experiment_and_execute(
keyfields = {'dataset': 'new_data', 'cross_validation_splits': 4, 'seed': 42, 'kernel': 'poly'},
experiment_function = run_experiment
)
This function may be useful in case of dependencies, where the result of one experiment is needed to configure the next one, or if the experiments are supposed to be configured with software such as `Hydra <hydra_>`_.


.. _reset_experiments:

-----------------
Expand Down Expand Up @@ -214,4 +232,7 @@ If an SSH tunnel was opened during the creation of the ``PyExperimenter``, it ha
.. code-block:: python
experimenter.execute(...)
experimenter.close_ssh_tunnel()
experimenter.close_ssh_tunnel()
.. _hydra: https://hydra.cc/
48 changes: 42 additions & 6 deletions py_experimenter/database_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,7 @@ def fill_table(self, combinations) -> None:
if self._check_combination_in_existing_rows(combination, existing_rows):
rows_skipped += 1
continue
combination["status"] = ExperimentStatus.CREATED.value
combination["creation_date"] = time
combination = self._add_metadata(combination, time)
rows.append(combination)

if rows:
Expand All @@ -183,6 +182,44 @@ def fill_table(self, combinations) -> None:
else:
self.logger.info(f"No rows to add. All the {len(combinations)} experiments already exist.")

def add_experiment(self, combination: Dict[str, str]) -> None:
existing_rows = self._get_existing_rows(list(self.database_configuration.keyfields.keys()))
if self._check_combination_in_existing_rows(combination, existing_rows):
self.logger.info("Experiment already exists in database. Skipping.")
return

connection = self.connect()
try:
cursor = self.cursor(connection)
combination = self._add_metadata(combination, utils.get_timestamp_representation(), ExperimentStatus.RUNNING.value)
insert_query = self._get_insert_query(self.database_configuration.table_name, list(combination.keys()))
self.execute(cursor, insert_query, list(combination.values()))
cursor.execute(f"SELECT {self._last_insert_id_string()};")
experiment_id = cursor.fetchone()[0]
self.commit(connection)
except Exception as e:
raise DatabaseConnectionError(f"error \n{e}\n raised when adding experiment to database.")
finally:
self.close_connection(connection)
return experiment_id

def _get_insert_query(self, table_name: str, columns: List[str]) -> str:
return f"INSERT INTO {table_name} ({', '.join(columns)}) VALUES ({', '.join([self._prepared_statement_placeholder] * len(columns))})"

@abc.abstractmethod
def _last_insert_id_string(self) -> str:
pass

def _add_metadata(
self,
combination: Dict[str, Any],
time: str,
status: ExperimentStatus = ExperimentStatus.CREATED.value,
) -> Dict[str, Any]:
combination["creation_date"] = time
combination["status"] = status
return combination

def _check_combination_in_existing_rows(self, combination, existing_rows) -> bool:
if combination in existing_rows:
return True
Expand Down Expand Up @@ -239,16 +276,15 @@ def _get_pull_experiment_query(self, order_by: str):
def _write_to_database(self, combinations: List[Dict[str, str]]) -> None:
columns = list(combinations[0].keys())
values = [list(combination.values()) for combination in combinations]
prepared_statement_palcehodler = ','.join([f"({', '.join([self._prepared_statement_placeholder] * len(columns))})"] * len(combinations))
prepared_statement_palcehodler = ",".join([f"({', '.join([self._prepared_statement_placeholder] * len(columns))})"] * len(combinations))

stmt = f"INSERT INTO {self.database_configuration.table_name} ({','.join(columns)}) VALUES {prepared_statement_palcehodler}"
values = reduce(concat, values)
values = reduce(concat, values)
connection = self.connect()
cursor = self.cursor(connection)
self.execute(cursor, stmt, values)
self.commit(connection)
self.close_connection(connection)


def pull_paused_experiment(self, experiment_id: int) -> Dict[str, Any]:
connnection = self.connect()
Expand Down
3 changes: 3 additions & 0 deletions py_experimenter/database_connector_lite.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ def _table_exists(self, cursor) -> bool:
table_names = self.fetchall(cursor)
return self.database_configuration.table_name in [x[0] for x in table_names]

def _last_insert_id_string(self) -> str:
return "last_insert_rowid()"

@staticmethod
def random_order_string():
return "RANDOM()"
Expand Down
3 changes: 3 additions & 0 deletions py_experimenter/database_connector_mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ def _pull_open_experiment(self, random_order) -> Tuple[int, List, List]:

return experiment_id, description, values

def _last_insert_id_string(self) -> str:
return "LAST_INSERT_ID()"

def _get_pull_experiment_query(self, order_by: str):
return super()._get_pull_experiment_query(order_by) + " FOR UPDATE;"

Expand Down
40 changes: 38 additions & 2 deletions py_experimenter/experimenter.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ def fill_table_from_combination(self, fixed_parameter_combinations: List[dict] =
>>> ]
:param fixed_parameter_combinations: List of predefined parameter combinations (each of type dict).
Defaults to None.
Defaults to None.
:type fixed_parameter_combinations: List[dict], optional
:param parameters: Dictionary of parameters and their lists of possible values. Defaults to None.
:type parameters: dict, optional
Expand Down Expand Up @@ -222,6 +222,42 @@ def fill_table_with_rows(self, rows: List[dict]) -> None:
self.db_connector.create_table_if_not_existing()
self.db_connector.fill_table(rows)

def create_table(self, create_database: bool = False) -> None:
"""
Creates Table in the database if it does not exist. If the table already exists, nothing is done.
If the preexisting table has a different structure than the one defined in the experiment configuration file,
TableHasWrongStructureError is raised.
:param create_database: If True, the database is created if it does not exist. Defaults to False.
:type create_database: bool
:raises DatabaseConnectionError: If an error occurred during the connection to the database.
:raises TableHasWrongStructureError: If the table has a different structure than the one defined in the
experiment configuration file.
"""
if create_database:
self.db_connector._create_database_if_not_existing()
self.logger.info(f"Database {self.config.database_configuration.database_name} created.")
self.db_connector.create_table_if_not_existing()
self.logger.info(f"Table {self.config.database_configuration.table_name} created.")

def add_experiment_and_execute(self, keyfield_values: Dict, experiment_function: Callable[[Dict, Dict, ResultProcessor], Optional[ExperimentStatus]]) -> None:
"""
Add one new experiment to the database table with status RUNNING and execute it.
The given `keyfield_values` are added to the database table. The status of the experiment is set to `Running`.
Then _execute_experiment is called with the given `experiment_function` and the `keyfield_values`, to immediately start
execution.
:param keyfield_values: The keyfield values of the experiment to be executed.
:type keyfield_values: Dict
:param experiment_function: The function that should be executed with the different parametrizations.
:type experiment_function: Callable[[Dict, Dict, ResultProcessor], None]
"""
experiment_id = self.db_connector.add_experiment(keyfield_values)
self.logger.info(f"Experiment with id {experiment_id} successfully added to database for immediate execution.")
self._execute_experiment(experiment_id, keyfield_values, experiment_function)
self.logger.info(f"Experiment with id {experiment_id} successfully executed.")

def execute(
self,
experiment_function: Callable[[Dict, Dict, ResultProcessor], Optional[ExperimentStatus]],
Expand Down Expand Up @@ -422,7 +458,7 @@ def reset_experiments(self, *states: Tuple["str"]) -> None:
table again.
:param states: The status of experiments that should be reset. Either `created`, `running`, `error`, `done`, or `all`.
Note that `states` is a variable-length argument, so multiple states can be given as a tuple.
Note that `states` is a variable-length argument, so multiple states can be given as a tuple.
:type status: Tuple[str]
"""
if not states:
Expand Down
14 changes: 8 additions & 6 deletions test/test_codecarbon/test_codecarbon_core_functions_mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import pytest

from py_experimenter.database_connector import DatabaseConnector
from py_experimenter.database_connector_mysql import DatabaseConnectorMYSQL
from py_experimenter.experimenter import PyExperimenter


Expand All @@ -15,17 +16,18 @@ def experimenter_mysql():
os.mkdir("config")

configuration_path = os.path.join("test", "test_codecarbon", "configs", "test_config_mysql.yml")
with patch.object(DatabaseConnectorMYSQL, "_test_connection", return_value=None):
experimenter = PyExperimenter(experiment_configuration_file_path=configuration_path, use_ssh_tunnel=False)

experimenter = PyExperimenter(experiment_configuration_file_path=configuration_path, use_ssh_tunnel=False)
yield experimenter

experimenter.delete_table()


def test_delete_table_mysql(experimenter_mysql):
with patch.object(DatabaseConnector, "connect", return_value=None), patch.object(DatabaseConnector, "cursor", return_value=None), patch.object(
DatabaseConnector, "commit", return_value=None
):
with patch.object(DatabaseConnectorMYSQL, "connect", return_value=None), patch.object(
DatabaseConnectorMYSQL, "close_connection", return_value=None
), patch.object(DatabaseConnector, "cursor", return_value=None), patch.object(DatabaseConnector, "commit", return_value=None):
with patch.object(DatabaseConnector, "execute", return_value=None) as mock_execute:
experimenter_mysql.delete_table()

Expand All @@ -38,9 +40,9 @@ def test_delete_table_mysql(experimenter_mysql):


def test_get_table_mysql(experimenter_mysql):
with patch.object(DatabaseConnector, "connect", return_value=None), patch.object(
with patch.object(DatabaseConnectorMYSQL, "connect", return_value=None), patch.object(
pandas, "read_sql", return_value=pandas.DataFrame()
), patch.object(DatabaseConnector, "close_connection", return_value=None) as mock_close:
), patch.object(DatabaseConnectorMYSQL, "close_connection", return_value=None) as mock_close:
df = experimenter_mysql.get_codecarbon_table()

assert df.empty is True
3 changes: 2 additions & 1 deletion test/test_codecarbon/test_integration_mysql.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import random
import os
import random

import numpy as np
import pytest

Expand Down
9 changes: 2 additions & 7 deletions test/test_database_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,7 @@
from mock import patch
from omegaconf import OmegaConf

from py_experimenter import (
database_connector,
database_connector_lite,
database_connector_mysql,
utils,
)
from py_experimenter import database_connector, database_connector_lite, database_connector_mysql, utils
from py_experimenter.config import DatabaseCfg
from py_experimenter.database_connector import DatabaseConnector
from py_experimenter.database_connector_lite import DatabaseConnectorLITE
Expand Down Expand Up @@ -97,7 +92,7 @@ def test_create_table_if_not_existing(
(
os.path.join("test", "test_config_files", "load_config_test_file", "mysql_test_file.yml"),
[{"value": 1, "exponent": 3}, {"value": 1, "exponent": 4}, {"value": 2, "exponent": 3}, {"value": 2, "exponent": 4}],
["value", "exponent", "status", "creation_date"],
["value", "exponent", "creation_date", "status"],
[
{"value": 1, "exponent": 3, "status": "created"},
{"value": 1, "exponent": 4, "status": "created"},
Expand Down
Loading

0 comments on commit 0cfb381

Please sign in to comment.