diff --git a/src/test_workflow/benchmark_test/benchmark_create_cluster.py b/src/test_workflow/benchmark_test/benchmark_create_cluster.py index 4b44080695..b36b974991 100644 --- a/src/test_workflow/benchmark_test/benchmark_create_cluster.py +++ b/src/test_workflow/benchmark_test/benchmark_create_cluster.py @@ -150,5 +150,8 @@ def create(cls, *args: Any) -> Generator[Any, None, None]: """ cluster = cls(*args) - cluster.start() - yield cluster + try: + cluster.start() + yield cluster + finally: + cluster.terminate() diff --git a/src/test_workflow/benchmark_test/benchmark_test_cluster.py b/src/test_workflow/benchmark_test/benchmark_test_cluster.py index e725249a1d..42f71a9779 100644 --- a/src/test_workflow/benchmark_test/benchmark_test_cluster.py +++ b/src/test_workflow/benchmark_test/benchmark_test_cluster.py @@ -33,12 +33,14 @@ def __init__( def start(self) -> None: command = f"curl -X GET http://{self.args.cluster_endpoint}" if self.args.insecure else f"curl -X GET https://{self.args.cluster_endpoint} -u 'admin:{get_password('2.12.0')}' --insecure" - result = subprocess.run(command, shell=True, capture_output=True) + try: + result = subprocess.run(command, shell=True, capture_output=True, timeout=5) + except subprocess.TimeoutExpired: + raise TimeoutError(f"Time out! Couldn't connect to the cluster {self.args.cluster_endpoint}") + if result.stdout: res_dict = json.loads(result.stdout) self.args.distribution_version = res_dict['version']['number'] - logging.info(self.args.distribution_version) - logging.info(result.stdout) self.wait_for_processing() self.cluster_endpoint_with_port = "".join([self.args.cluster_endpoint, ":", str(self.port)]) diff --git a/tests/tests_test_workflow/test_benchmark_workflow/benchmark_test/test_benchmark_test_cluster.py b/tests/tests_test_workflow/test_benchmark_workflow/benchmark_test/test_benchmark_test_cluster.py index 5e67be2543..81295d0977 100644 --- a/tests/tests_test_workflow/test_benchmark_workflow/benchmark_test/test_benchmark_test_cluster.py +++ b/tests/tests_test_workflow/test_benchmark_workflow/benchmark_test/test_benchmark_test_cluster.py @@ -5,6 +5,7 @@ # this file be licensed under the Apache-2.0 license or a # compatible open source license. +import subprocess import unittest from unittest.mock import MagicMock, Mock, patch @@ -59,15 +60,27 @@ def test_endpoint_with_security(self, mock_http_auth: Mock, mock_requests_get: M "cluster_name" : "opensearch-cluster.amazon.com", "version": { "distribution": "opensearch", - "number": “2.9.0”, + "number": "2.9.0", "build_type": "tar", "minimum_index_compatibility_version": "2.0.0" } } ''' mock_subprocess_run.return_value = mock_result - with patch("json.loads", ): + with patch("json.loads"): self.benchmark_test_cluster.start() mock_requests_get.assert_called_with(url=f"https://{self.args.cluster_endpoint}/_cluster/health", auth=mock_http_auth.return_value, verify=False) self.assertEqual(self.benchmark_test_cluster.endpoint_with_port, 'opensearch-cluster.amazon.com:443') self.assertEqual(self.benchmark_test_cluster.port, 443) + + + def test_endpoint_with_timeout_error(self) -> None: + self.args.insecure = True + self.args.cluster_endpoint = "opensearch-cluster.amazon.com" + with patch('subprocess.run') as mock_run: + mock_run.side_effect = subprocess.TimeoutExpired("Command", 5) + + with self.assertRaises(TimeoutError) as context: + self.benchmark_test_cluster.start() + + self.assertIn("Time out! Couldn't connect to the cluster", str(context.exception)) diff --git a/tests/tests_test_workflow/test_benchmark_workflow/benchmark_test/test_benchmark_test_runner_opensearch.py b/tests/tests_test_workflow/test_benchmark_workflow/benchmark_test/test_benchmark_test_runner_opensearch.py index 1bb6ce66d6..524650543e 100644 --- a/tests/tests_test_workflow/test_benchmark_workflow/benchmark_test/test_benchmark_test_runner_opensearch.py +++ b/tests/tests_test_workflow/test_benchmark_workflow/benchmark_test/test_benchmark_test_runner_opensearch.py @@ -7,6 +7,7 @@ import os import tempfile import unittest +from typing import Optional from typing import Any from unittest.mock import Mock, patch, MagicMock @@ -81,6 +82,7 @@ def test_run_with_dist_url_and_version(self, mock_suite: Mock, mock_cluster: Moc def test_run_with_cluster_endpoint(self, mock_retry_call: Mock, mock_suite: Mock, mock_benchmark_test_cluster: Mock) -> None: args = MagicMock(cluster_endpoint=True) + mock_cluster = MagicMock() mock_benchmark_test_cluster.return_value = mock_cluster @@ -89,3 +91,39 @@ def test_run_with_cluster_endpoint(self, mock_retry_call: Mock, mock_suite: Mock self.assertEqual(mock_suite.call_count, 1) self.assertEqual(mock_benchmark_test_cluster.call_count, 1) mock_retry_call.assert_called_once_with(mock_suite.return_value.execute, tries=3, delay=60, backoff=2) + + @patch('test_workflow.benchmark_test.benchmark_test_cluster.BenchmarkTestCluster.wait_for_processing') + @patch("test_workflow.benchmark_test.benchmark_test_runner_opensearch.BenchmarkTestSuite") + @patch('test_workflow.benchmark_test.benchmark_test_runner_opensearch.retry_call') + @patch("subprocess.run") + @patch("requests.get") + def test_run_with_cluster_endpoint_with_arguments(self, mock_requests_get: Mock, mock_subprocess_run: Mock, mock_retry_call: Mock, mock_suite: Mock, mock_wait_for_processing: Optional[Mock]) -> None: + args = MagicMock(cluster_endpoint=True) + mock_wait_for_processing.return_value = None + mock_result = MagicMock() + mock_result.stdout = ''' + { + "cluster_name" : "opensearch-cluster.amazon.com", + "version": { + "distribution": "opensearch", + "number": "2.9.0", + "build_type": "tar", + "minimum_index_compatibility_version": "2.0.0" + } + } + ''' + mock_subprocess_run.return_value = mock_result + + instance = BenchmarkTestRunnerOpenSearch(args, None) + with patch('test_workflow.benchmark_test.benchmark_test_runner_opensearch.BenchmarkTestCluster') as MockBenchmarkTestCluster: + mock_cluster_instance = MockBenchmarkTestCluster.return_value + mock_cluster_instance.endpoint_with_port = "opensearch-cluster.amazon.com" + mock_cluster_instance.get_distribution_version.return_value = "2.9.0" + mock_cluster_instance.fetch_password.return_value = "admin" + + with patch("json.loads"): + instance.run_tests() + self.assertEqual(mock_suite.call_count, 1) + self.assertEqual(MockBenchmarkTestCluster.call_count, 1) + mock_retry_call.assert_called_once_with(mock_suite.return_value.execute, tries=3, delay=60, backoff=2) +