diff --git a/CHANGELOG.md b/CHANGELOG.md index 3f988164..13b8332a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,15 +8,17 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Added a log collection guide ([#579](https://github.com/opensearch-project/opensearch-py/pull/579)) - Added GHA release ([#614](https://github.com/opensearch-project/opensearch-py/pull/614)) ### Changed +- Updated the `get_policy` API in the index_management plugin to allow the policy_id argument as optional ([#633](https://github.com/opensearch-project/opensearch-py/pull/633)) ### Deprecated ### Removed - Removed unnecessary `# -*- coding: utf-8 -*-` headers from .py files ([#615](https://github.com/opensearch-project/opensearch-py/pull/615), [#617](https://github.com/opensearch-project/opensearch-py/pull/617)) ### Fixed - Fix KeyError when scroll return no hits ([#616](https://github.com/opensearch-project/opensearch-py/pull/616)) +- Fix reuse of `OpenSearch` using `Urllib3HttpConnection` and `AsyncOpenSearch` after calling `close` ([#639](https://github.com/opensearch-project/opensearch-py/pull/639)) ### Security ### Dependencies - Bumps `aiohttp` from >=3,<4 to >=3.9.0,<4 ([#634](https://github.com/opensearch-project/opensearch-py/pull/634)) -- Bumps `pytest-asyncio` from <=0.21.1 to <=0.23.2 +- Bumps `pytest-asyncio` from <=0.21.1 to <=0.23.3 ## [2.4.2] ### Added diff --git a/benchmarks/bench_async.py b/benchmarks/bench_async.py index 7e9d1b3d..3df4e0ed 100644 --- a/benchmarks/bench_async.py +++ b/benchmarks/bench_async.py @@ -17,6 +17,7 @@ async def index_records(client: Any, index_name: str, item_count: int) -> None: + """asynchronously bulk index item_count records into the index (index_name)""" await asyncio.gather( *[ client.index( @@ -34,6 +35,10 @@ async def index_records(client: Any, index_name: str, item_count: int) -> None: async def test_async(client_count: int = 1, item_count: int = 1) -> None: + """ + asynchronously index with item_count records and run client_count clients. This function can be used to + test balancing the number of items indexed with the number of documents. + """ host = "localhost" port = 9200 auth = ("admin", "admin") @@ -74,6 +79,7 @@ async def test_async(client_count: int = 1, item_count: int = 1) -> None: def test(item_count: int = 1, client_count: int = 1) -> None: + """sets up and executes the asynchronous tests""" loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(test_async(item_count, client_count)) @@ -84,26 +90,32 @@ def test(item_count: int = 1, client_count: int = 1) -> None: def test_1() -> None: + """run a test for one item and 32*ITEM_COUNT clients""" test(1, 32 * ITEM_COUNT) def test_2() -> None: + """run a test for two items and 16*ITEM_COUNT clients""" test(2, 16 * ITEM_COUNT) def test_4() -> None: + """run a test for two items and 8*ITEM_COUNT clients""" test(4, 8 * ITEM_COUNT) def test_8() -> None: + """run a test for four items and 4*ITEM_COUNT clients""" test(8, 4 * ITEM_COUNT) def test_16() -> None: + """run a test for 16 items and 2*ITEM_COUNT clients""" test(16, 2 * ITEM_COUNT) def test_32() -> None: + """run a test for 32 items and ITEM_COUNT clients""" test(32, ITEM_COUNT) diff --git a/benchmarks/bench_info_sync.py b/benchmarks/bench_info_sync.py index aba6d024..36e59814 100644 --- a/benchmarks/bench_info_sync.py +++ b/benchmarks/bench_info_sync.py @@ -21,6 +21,7 @@ def get_info(client: Any, request_count: int) -> float: + """get info from client""" tt: float = 0 for n in range(request_count): start = time.time() * 1000 @@ -31,6 +32,7 @@ def get_info(client: Any, request_count: int) -> float: def test(thread_count: int = 1, request_count: int = 1, client_count: int = 1) -> None: + """test to index with thread_count threads, item_count records and run client_count clients""" host = "localhost" port = 9200 auth = ("admin", "admin") @@ -79,22 +81,27 @@ def test(thread_count: int = 1, request_count: int = 1, client_count: int = 1) - def test_1() -> None: + """testing 1 threads""" test(1, 32 * REQUEST_COUNT, 1) def test_2() -> None: + """testing 2 threads""" test(2, 16 * REQUEST_COUNT, 2) def test_4() -> None: + """testing 4 threads""" test(4, 8 * REQUEST_COUNT, 3) def test_8() -> None: + """testing 8 threads""" test(8, 4 * REQUEST_COUNT, 8) def test_32() -> None: + """testing 32 threads""" test(32, REQUEST_COUNT, 32) diff --git a/benchmarks/bench_sync.py b/benchmarks/bench_sync.py index 0f3c5286..d86085d2 100644 --- a/benchmarks/bench_sync.py +++ b/benchmarks/bench_sync.py @@ -22,6 +22,7 @@ def index_records(client: Any, index_name: str, item_count: int) -> Any: + """bulk index item_count records into index_name""" tt = 0 for n in range(10): data: Any = [] @@ -48,6 +49,7 @@ def index_records(client: Any, index_name: str, item_count: int) -> Any: def test(thread_count: int = 1, item_count: int = 1, client_count: int = 1) -> None: + """test to index with thread_count threads, item_count records and run client_count clients""" host = "localhost" port = 9200 auth = ("admin", "admin") @@ -118,22 +120,27 @@ def test(thread_count: int = 1, item_count: int = 1, client_count: int = 1) -> N def test_1() -> None: + """testing 1 threads""" test(1, 32 * ITEM_COUNT, 1) def test_2() -> None: + """testing 2 threads""" test(2, 16 * ITEM_COUNT, 2) def test_4() -> None: + """testing 4 threads""" test(4, 8 * ITEM_COUNT, 3) def test_8() -> None: + """testing 8 threads""" test(8, 4 * ITEM_COUNT, 8) def test_32() -> None: + """testing 32 threads""" test(32, ITEM_COUNT, 32) diff --git a/dev-requirements.txt b/dev-requirements.txt index a256eae6..8a009370 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -21,5 +21,5 @@ twine # Requirements for testing [async] extra aiohttp -pytest-asyncio<=0.23.2 +pytest-asyncio<=0.23.3 unasync diff --git a/opensearchpy/_async/http_aiohttp.py b/opensearchpy/_async/http_aiohttp.py index b1baf148..c49fd574 100644 --- a/opensearchpy/_async/http_aiohttp.py +++ b/opensearchpy/_async/http_aiohttp.py @@ -361,6 +361,7 @@ async def close(self) -> Any: """ if self.session: await self.session.close() + self.session = None async def _create_aiohttp_session(self) -> Any: """Creates an aiohttp.ClientSession(). This is delayed until diff --git a/opensearchpy/_async/plugins/index_management.py b/opensearchpy/_async/plugins/index_management.py index cf15c4eb..c5084a37 100644 --- a/opensearchpy/_async/plugins/index_management.py +++ b/opensearchpy/_async/plugins/index_management.py @@ -56,15 +56,13 @@ async def add_policy( @query_params() async def get_policy( - self, policy: Any, params: Any = None, headers: Any = None + self, policy: Any = None, params: Any = None, headers: Any = None ) -> Any: """ - Gets the policy by `policy_id`. + Gets the policy by `policy_id`; returns all policies if no policy_id is provided. :arg policy: The name of the policy """ - if policy in SKIP_IN_PATH: - raise ValueError("Empty value passed for a required argument 'policy'.") return await self.transport.perform_request( "GET", diff --git a/opensearchpy/connection/http_async.py b/opensearchpy/connection/http_async.py index 468f3244..f5a4ec7c 100644 --- a/opensearchpy/connection/http_async.py +++ b/opensearchpy/connection/http_async.py @@ -277,6 +277,7 @@ async def close(self) -> Any: """ if self.session: await self.session.close() + self.session = None async def _create_aiohttp_session(self) -> Any: """Creates an aiohttp.ClientSession(). This is delayed until diff --git a/opensearchpy/connection/http_urllib3.py b/opensearchpy/connection/http_urllib3.py index 54f2a22a..ab9a1a78 100644 --- a/opensearchpy/connection/http_urllib3.py +++ b/opensearchpy/connection/http_urllib3.py @@ -214,9 +214,13 @@ def __init__( if pool_maxsize and isinstance(pool_maxsize, int): kw["maxsize"] = pool_maxsize - self.pool = pool_class( + self._urllib3_pool_factory = lambda: pool_class( self.hostname, port=self.port, timeout=self.timeout, **kw ) + self._create_urllib3_pool() + + def _create_urllib3_pool(self) -> None: + self.pool = self._urllib3_pool_factory() # type: ignore def perform_request( self, @@ -228,6 +232,10 @@ def perform_request( ignore: Collection[int] = (), headers: Optional[Mapping[str, str]] = None, ) -> Any: + if self.pool is None: + self._create_urllib3_pool() + assert self.pool is not None + url = self.url_prefix + url if params: url = "%s?%s" % (url, urlencode(params)) @@ -305,4 +313,6 @@ def close(self) -> None: """ Explicitly closes connection """ - self.pool.close() + if self.pool: + self.pool.close() + self.pool = None diff --git a/opensearchpy/plugins/index_management.py b/opensearchpy/plugins/index_management.py index 0683b006..2d3551ab 100644 --- a/opensearchpy/plugins/index_management.py +++ b/opensearchpy/plugins/index_management.py @@ -55,14 +55,14 @@ def add_policy( ) @query_params() - def get_policy(self, policy: Any, params: Any = None, headers: Any = None) -> Any: + def get_policy( + self, policy: Any = None, params: Any = None, headers: Any = None + ) -> Any: """ - Gets the policy by `policy_id`. + Gets the policy by `policy_id`; returns all policies if no policy_id is provided. :arg policy: The name of the policy """ - if policy in SKIP_IN_PATH: - raise ValueError("Empty value passed for a required argument 'policy'.") return self.transport.perform_request( "GET", diff --git a/samples/advanced_index_actions/advanced_index_actions_sample.py b/samples/advanced_index_actions/advanced_index_actions_sample.py index b06d82c3..b5af6be4 100644 --- a/samples/advanced_index_actions/advanced_index_actions_sample.py +++ b/samples/advanced_index_actions/advanced_index_actions_sample.py @@ -18,6 +18,10 @@ def main() -> None: + """ + demonstrates various functions to operate on the index (e.g. clear different levels of cache, refreshing the + index) + """ # Set up client = OpenSearch( hosts=["https://localhost:9200"], diff --git a/samples/aws/search_requests.py b/samples/aws/search_requests.py index 544285ac..743d3d96 100644 --- a/samples/aws/search_requests.py +++ b/samples/aws/search_requests.py @@ -20,6 +20,12 @@ def main() -> None: + """ + connects to a cluster specified in environment variables, creates an index, inserts documents, + searches the index, deletes the document, deletes the index. + the environment variables are "ENDPOINT" for the cluster endpoint, AWS_REGION for the region in which the cluster + is hosted, and SERVICE to indicate if this is an ES 7.10.2 compatible cluster + """ # verbose logging logging.basicConfig(format="%(levelname)s:%(message)s", level=logging.INFO) diff --git a/samples/aws/search_urllib3.py b/samples/aws/search_urllib3.py index 5ac438d9..c7382d0d 100644 --- a/samples/aws/search_urllib3.py +++ b/samples/aws/search_urllib3.py @@ -20,10 +20,18 @@ def main() -> None: + """ + 1. connects to an OpenSearch cluster on AWS defined by environment variables (i.e. ENDPOINT - cluster endpoint like + my-test-domain.us-east-1.es.amazonaws.com; AWS_REGION like us-east-1, us-west-2; and SERVICE like es which + differentiates beteween serverless and the managed service. + 2. creates an index called "movies" and adds a single document + 3. queries for that document + 4. deletes the document + 5. deletes the index + """ # verbose logging logging.basicConfig(format="%(levelname)s:%(message)s", level=logging.INFO) - # cluster endpoint, for example: my-test-domain.us-east-1.es.amazonaws.com url = urlparse(environ["ENDPOINT"]) region = environ.get("AWS_REGION", "us-east-1") service = environ.get("SERVICE", "es") diff --git a/samples/bulk/bulk_array.py b/samples/bulk/bulk_array.py index cb6dc8b1..a7814ddb 100755 --- a/samples/bulk/bulk_array.py +++ b/samples/bulk/bulk_array.py @@ -17,6 +17,7 @@ def main() -> None: + """demonstrates how to bulk load data into an index""" # connect to an instance of OpenSearch host = os.getenv("HOST", default="localhost") diff --git a/samples/bulk/bulk_helpers.py b/samples/bulk/bulk_helpers.py index 7371d6b1..02150e25 100755 --- a/samples/bulk/bulk_helpers.py +++ b/samples/bulk/bulk_helpers.py @@ -17,6 +17,10 @@ def main() -> None: + """ + demonstrates how to bulk load data using opensearchpy.helpers including examples of serial, parallel, and streaming + bulk load + """ # connect to an instance of OpenSearch host = os.getenv("HOST", default="localhost") diff --git a/samples/bulk/bulk_ld.py b/samples/bulk/bulk_ld.py index 89e6f661..a6c3a585 100755 --- a/samples/bulk/bulk_ld.py +++ b/samples/bulk/bulk_ld.py @@ -17,6 +17,9 @@ def main() -> None: + """ + bulk index 100 items and then delete the index + """ # connect to an instance of OpenSearch host = os.getenv("HOST", default="localhost") diff --git a/samples/document_lifecycle/document_lifecycle_sample.py b/samples/document_lifecycle/document_lifecycle_sample.py index 53c7e718..b195262b 100644 --- a/samples/document_lifecycle/document_lifecycle_sample.py +++ b/samples/document_lifecycle/document_lifecycle_sample.py @@ -17,6 +17,9 @@ def main() -> None: + """ + provides samples for different ways to handle documents including indexing, searching, updating, and deleting + """ # Connect to OpenSearch client = OpenSearch( hosts=["https://localhost:9200"], diff --git a/samples/hello/hello.py b/samples/hello/hello.py index acecf890..39744752 100755 --- a/samples/hello/hello.py +++ b/samples/hello/hello.py @@ -16,6 +16,10 @@ def main() -> None: + """ + an example showing how to create an synchronous connection to OpenSearch, create an index, index a document + and search to return the document + """ host = "localhost" port = 9200 auth = ("admin", "admin") # For testing only. Don't store credentials in code. diff --git a/samples/hello/hello_async.py b/samples/hello/hello_async.py index c6a04e02..a3620dba 100755 --- a/samples/hello/hello_async.py +++ b/samples/hello/hello_async.py @@ -16,6 +16,10 @@ async def main() -> None: + """ + an example showing how to create an asynchronous connection to OpenSearch, create an index, index a document + and search to return the document + """ # connect to OpenSearch host = "localhost" port = 9200 diff --git a/samples/index_template/index_template_sample.py b/samples/index_template/index_template_sample.py index 25cfdddd..00978aba 100644 --- a/samples/index_template/index_template_sample.py +++ b/samples/index_template/index_template_sample.py @@ -12,6 +12,20 @@ def main() -> None: + """ + 1. connects to an OpenSearch instance running on localhost + 2. Create an index template named `books` with default settings and mappings for indices of + the `books-*` pattern. You can create an index template to define default settings and mappings for indices + of certain patterns. + 3. When creating an index that matches the `books-*` pattern, OpenSearch will automatically apply the template's + settings and mappings to the index. Create an index named books-nonfiction and verify that its settings and mappings + match those of the template + 4. If multiple index templates match the index's name, OpenSearch will apply the template with the highest + `priority`. In the example, two templates are created with different priorities. + 5. Composable index templates are a new type of index template that allow you to define multiple component templates + and compose them into a final template. The last part of the example before cleaning up creates a component + template named `books_mappings` with default mappings for indices of the `books-*` and `books-fiction-*` patterns. + """ # Create a client instance client = OpenSearch( hosts=["https://localhost:9200"], @@ -20,8 +34,7 @@ def main() -> None: http_auth=("admin", "admin"), ) - # You can create an index template to define default settings and mappings for indices of certain patterns. - # The following example creates an index template named `books` with default settings and mappings for indices of the `books-*` pattern: + # create an index template client.indices.put_index_template( name="books", body={ @@ -41,13 +54,10 @@ def main() -> None: }, ) - # Now, when you create an index that matches the `books-*` pattern, OpenSearch will automatically apply the template's settings and mappings to the index. - # Let's create an index named books-nonfiction and verify that its settings and mappings match those of the template: + # create the index which applies the index template settings matched by pattern client.indices.create(index="books-nonfiction") print(client.indices.get(index="books-nonfiction")) - # If multiple index templates match the index's name, OpenSearch will apply the template with the highest `priority`. - # The following example creates two index templates named `books-*` and `books-fiction-*` with different settings: client.indices.put_index_template( name="books", body={ @@ -74,8 +84,6 @@ def main() -> None: client.indices.create(index="books-fiction-romance") print(client.indices.get(index="books-fiction-romance")) - # Composable index templates are a new type of index template that allow you to define multiple component templates and compose them into a final template. - # The following example creates a component template named `books_mappings` with default mappings for indices of the `books-*` and `books-fiction-*` patterns: client.cluster.put_component_template( name="books_mappings", body={ @@ -92,6 +100,7 @@ def main() -> None: }, ) + # composable index templates client.indices.put_index_template( name="books", body={ diff --git a/samples/json/json_hello.py b/samples/json/json_hello.py index c0e537ec..9a8c213f 100755 --- a/samples/json/json_hello.py +++ b/samples/json/json_hello.py @@ -14,6 +14,9 @@ def main() -> None: + """ + demonstrates how to index a document using a dict + """ # connect to OpenSearch host = "localhost" diff --git a/samples/json/json_hello_async.py b/samples/json/json_hello_async.py index afe5065f..499def22 100755 --- a/samples/json/json_hello_async.py +++ b/samples/json/json_hello_async.py @@ -16,6 +16,10 @@ async def main() -> None: + """ + this sample uses asyncio and AsyncOpenSearch to asynchronously connect to local OpenSearch cluster, create an index, + index data, search the index, delete the document, delete the index + """ # connect to OpenSearch host = "localhost" port = 9200 diff --git a/samples/knn/knn_async_basics.py b/samples/knn/knn_async_basics.py index 273015c2..8847f924 100755 --- a/samples/knn/knn_async_basics.py +++ b/samples/knn/knn_async_basics.py @@ -18,6 +18,9 @@ async def main() -> None: + """ + asynchronously create, bulk index, and query kNN. then delete the index + """ # connect to an instance of OpenSearch host = os.getenv("HOST", default="localhost") port = int(os.getenv("PORT", 9200)) diff --git a/samples/knn/knn_basics.py b/samples/knn/knn_basics.py index 4ea49a21..b3cdfca4 100755 --- a/samples/knn/knn_basics.py +++ b/samples/knn/knn_basics.py @@ -17,6 +17,9 @@ def main() -> None: + """ + create, bulk index, and query kNN. then delete the index + """ # connect to an instance of OpenSearch host = os.getenv("HOST", default="localhost") diff --git a/samples/knn/knn_boolean_filter.py b/samples/knn/knn_boolean_filter.py index 156fcf86..40b5434b 100755 --- a/samples/knn/knn_boolean_filter.py +++ b/samples/knn/knn_boolean_filter.py @@ -17,6 +17,9 @@ def main() -> None: + """ + create, query, and delete a kNN index + """ # connect to an instance of OpenSearch host = os.getenv("HOST", default="localhost") diff --git a/samples/knn/knn_efficient_filter.py b/samples/knn/knn_efficient_filter.py index 7777173d..4c23a43e 100755 --- a/samples/knn/knn_efficient_filter.py +++ b/samples/knn/knn_efficient_filter.py @@ -16,6 +16,9 @@ def main() -> None: + """ + create a kNN index using Lucene kNN and query it using filters + """ # connect to an instance of OpenSearch host = os.getenv("HOST", default="localhost") diff --git a/samples/logging/log_collection_sample.py b/samples/logging/log_collection_sample.py index fbf25b60..84ff0194 100644 --- a/samples/logging/log_collection_sample.py +++ b/samples/logging/log_collection_sample.py @@ -23,6 +23,10 @@ def main() -> None: + """ + sample for custom logging; this shows how to create a console handler, connect to OpenSearch, define a custom + logger and log to an OpenSearch index + """ print("Collecting logs.") # Create a console handler @@ -47,15 +51,22 @@ def main() -> None: # Add console handler to the logger os_logger.addHandler(console_handler) - # Define a custom handler that logs to OpenSearch class OpenSearchHandler(logging.Handler): + """ + define a custom handler that logs to opensearch + """ + # Initializer / Instance attributes def __init__(self, opensearch_client: Any) -> None: super().__init__() self.os_client = opensearch_client - # Build index name (e.g., "logs-YYYY-MM-DD") def _build_index_name(self) -> str: + """ + Build index name (e.g., "logs-YYYY-MM-DD") + :rtype: bool + :return: a str with date formatted as 'logs-YYYY-MM-DD' + """ return f"logs-{datetime.date(datetime.now())}" # Emit logs to the OpenSearch cluster diff --git a/samples/security/roles.py b/samples/security/roles.py index 7628a9f4..53508634 100644 --- a/samples/security/roles.py +++ b/samples/security/roles.py @@ -9,13 +9,13 @@ # Modifications Copyright OpenSearch Contributors. See # GitHub history for details. - -# A basic OpenSearch sample that create and manage roles. - from opensearchpy import OpenSearch def main() -> None: + """ + A basic OpenSearch sample that create and manage roles. + """ # connect to OpenSearch host = "localhost" diff --git a/samples/security/users.py b/samples/security/users.py index 7b89a37f..54f4d36e 100644 --- a/samples/security/users.py +++ b/samples/security/users.py @@ -9,13 +9,13 @@ # Modifications Copyright OpenSearch Contributors. See # GitHub history for details. - -# A basic OpenSearch sample that create and manage users. - from opensearchpy import OpenSearch def main() -> None: + """ + A basic OpenSearch sample that create and manage users. + """ # connect to OpenSearch host = "localhost" diff --git a/setup.py b/setup.py index e40526da..984e1dea 100644 --- a/setup.py +++ b/setup.py @@ -52,7 +52,7 @@ if package == MODULE_DIR or package.startswith(MODULE_DIR + ".") ] install_requires = [ - "urllib3>=1.26.18", + "urllib3>=1.26.18, <2", "requests>=2.4.0, <3.0.0", "six", "python-dateutil", diff --git a/test_opensearchpy/test_async/test_server/test_clients.py b/test_opensearchpy/test_async/test_server/test_clients.py index cee6bc7b..521f0600 100644 --- a/test_opensearchpy/test_async/test_server/test_clients.py +++ b/test_opensearchpy/test_async/test_server/test_clients.py @@ -67,3 +67,15 @@ async def test_aiohttp_connection_works_without_yarl( resp = await async_client.info(pretty=True) assert isinstance(resp, dict) + + +class TestClose: + async def test_close_doesnt_break_client(self, async_client: Any) -> None: + await async_client.cluster.health() + await async_client.close() + await async_client.cluster.health() + + async def test_with_doesnt_break_client(self, async_client: Any) -> None: + for _ in range(2): + async with async_client as client: + await client.cluster.health() diff --git a/test_opensearchpy/test_server/test_clients.py b/test_opensearchpy/test_server/test_clients.py index e945b69a..a77b0f37 100644 --- a/test_opensearchpy/test_server/test_clients.py +++ b/test_opensearchpy/test_server/test_clients.py @@ -49,3 +49,15 @@ def test_bulk_works_with_bytestring_body(self) -> None: self.assertFalse(response["errors"]) self.assertEqual(1, len(response["items"])) + + +class TestClose(OpenSearchTestCase): + def test_close_doesnt_break_client(self) -> None: + self.client.cluster.health() + self.client.close() + self.client.cluster.health() + + def test_with_doesnt_break_client(self) -> None: + for _ in range(2): + with self.client as client: + client.cluster.health()