From 47240a32773cc81cbd6ed63a5f3f8e17e8bedb89 Mon Sep 17 00:00:00 2001 From: Tom Harnasz Date: Sun, 11 Apr 2021 18:33:47 +0100 Subject: [PATCH 1/4] DB API - Added support for dynamic parameters --- README.md | 43 ++++++++ pydruid/db/api.py | 67 ++++++++++++- tests/db/test_cursor.py | 216 +++++++++++++++++++++++++++++++++++++++- 3 files changed, 322 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index ba26b47c..07c4f14e 100644 --- a/README.md +++ b/README.md @@ -213,6 +213,49 @@ for row in curs: print(row) ``` +## Dynamic Parameters + +Druid 0.18.0 introduced support for [Dynamic Parameters](https://druid.apache.org/docs/latest/querying/sql.html#dynamic-parameters) where parameters are bound to `?` placeholders at execution time. This can be set using `dynamic_parameters`. on `connect`. + +Parameters support the types below, additionally allowing for tuples and lists for convenience with each value coerced to the appropriate Druid type. + +``` +| Instance Type | Druid Type | +|---------------|------------| +| int | INTEGER | +| float | FLOAT | +| str | VARCHAR | +| bool | BOOLEAN | +``` + +Example: + +```python +from pydruid.db import connect + +conn = connect(host='localhost', port=8082, path='/druid/v2/sql/', scheme='http', dynamic_parameters=True) +curs = conn.cursor() +parameters = { + "start_dt": "2015-09-12 00:00:00", + "channels": ("#en.wikipedia", "#es.wikipedia"), + "added_gt": 10, +} +curs.execute(""" + SELECT + channel, + page, + SUM(added) + FROM wikipedia + WHERE + __time >= TIMESTAMP %(start_dt)s + AND + channel IN (%(channels)s) + GROUP BY channel, page + ORDER BY SUM(added) DESC + HAVING SUM(added) >= %(added_gt)s +""", parameters) +``` + # SQLAlchemy ```python diff --git a/pydruid/db/api.py b/pydruid/db/api.py index 86b64e5e..ee78b3a4 100644 --- a/pydruid/db/api.py +++ b/pydruid/db/api.py @@ -1,5 +1,6 @@ import itertools import json +import re from collections import namedtuple, OrderedDict from urllib import parse @@ -26,6 +27,7 
@@ def connect( ssl_verify_cert=True, ssl_client_cert=None, proxies=None, + dynamic_parameters=False, ): # noqa: E125 """ Constructor for creating a connection to the database. @@ -48,6 +50,7 @@ def connect( ssl_verify_cert, ssl_client_cert, proxies, + dynamic_parameters, ) @@ -130,6 +133,7 @@ def __init__( ssl_verify_cert=True, ssl_client_cert=None, proxies=None, + dynamic_parameters=False, ): netloc = "{host}:{port}".format(host=host, port=port) self.url = parse.urlunparse((scheme, netloc, path, None, None, None)) @@ -142,6 +146,7 @@ def __init__( self.ssl_verify_cert = ssl_verify_cert self.ssl_client_cert = ssl_client_cert self.proxies = proxies + self.dynamic_parameters = dynamic_parameters @check_closed def close(self): @@ -175,6 +180,7 @@ def cursor(self): self.ssl_verify_cert, self.ssl_client_cert, self.proxies, + self.dynamic_parameters, ) self.cursors.append(cursor) @@ -206,6 +212,7 @@ def __init__( ssl_verify_cert=True, proxies=None, ssl_client_cert=None, + dynamic_parameters=False, ): self.url = url self.context = context or {} @@ -215,6 +222,7 @@ def __init__( self.ssl_verify_cert = ssl_verify_cert self.ssl_client_cert = ssl_client_cert self.proxies = proxies + self.dynamic_parameters = dynamic_parameters # This read/write attribute specifies the number of rows to fetch at a # time with .fetchmany(). 
It defaults to 1 meaning to fetch a single @@ -246,8 +254,14 @@ def close(self): @check_closed def execute(self, operation, parameters=None): - query = apply_parameters(operation, parameters) - results = self._stream_query(query) + dynamic_parameters = None + + if self.dynamic_parameters: + query, dynamic_parameters = apply_dynamic_parameters(operation, parameters) + else: + query = apply_parameters(operation, parameters) + + results = self._stream_query(query, dynamic_parameters) # `_stream_query` returns a generator that produces the rows; we need to # consume the first row so that `description` is properly set, so let's @@ -321,7 +335,7 @@ def __next__(self): next = __next__ - def _stream_query(self, query): + def _stream_query(self, query, dynamic_parameters): """ Stream rows from a query. @@ -334,6 +348,9 @@ def _stream_query(self, query): payload = {"query": query, "context": self.context, "header": self.header} + if dynamic_parameters is not None: + payload["parameters"] = dynamic_parameters + auth = ( requests.auth.HTTPBasicAuth(self.user, self.password) if self.user else None ) @@ -448,3 +465,47 @@ def escape(value): return value elif isinstance(value, (list, tuple)): return ", ".join(escape(element) for element in value) + + +def apply_dynamic_parameters(operation, parameters): + if not parameters: + return operation, None + + p = re.compile("%\\((.*?)\\)s") + operation_parameters = p.findall(operation) + + if set(parameters.keys()) != set(operation_parameters): + raise exceptions.OperationalError("Parameters and placeholders do not match") + + values = [] + + for op_parameter in operation_parameters: + if isinstance(parameters[op_parameter], (tuple, list)): + values.extend(list(parameters[op_parameter])) + else: + values.append(parameters[op_parameter]) + + param_placements = { + key: dynamic_placeholder(value) for key, value in parameters.items() + } + + dynamic_parameters = [dynamic_parameter(v) for v in values] + + return operation % param_placements, 
dynamic_parameters + + +def dynamic_parameter(value): + types_map = { + "str": "VARCHAR", + "int": "INTEGER", + "float": "FLOAT", + "bool": "BOOLEAN", + } + + return {"value": value, "type": types_map[value.__class__.__name__]} + + +def dynamic_placeholder(value): + if isinstance(value, (list, tuple)): + return ", ".join("?" * len(value)) + return "?" diff --git a/tests/db/test_cursor.py b/tests/db/test_cursor.py index 9287b403..10160e5a 100644 --- a/tests/db/test_cursor.py +++ b/tests/db/test_cursor.py @@ -7,7 +7,15 @@ from requests.models import Response -from pydruid.db.api import apply_parameters, Cursor +from pydruid.db.api import ( + apply_parameters, + Cursor, + apply_dynamic_parameters, + dynamic_parameter, + dynamic_placeholder, +) + +from pydruid.db.exceptions import OperationalError class CursorTestSuite(unittest.TestCase): @@ -147,6 +155,212 @@ def test_apply_parameters(self): apply_parameters("SELECT %(key)s", {"key": False}), "SELECT FALSE" ) + @patch("requests.post") + def test_execute_with_dynamic_parameter(self, requests_post_mock): + response = Response() + response.status_code = 200 + response.raw = BytesIO(b"[]") + requests_post_mock.return_value = response + + url = "http://example.com/" + parameters = {"name": "Druid"} + query = "SELECT * FROM table where name = %(name)s" + + cursor = Cursor(url, user=None, password=None, dynamic_parameters=True) + cursor.execute(query, parameters) + + assert_query = "SELECT * FROM table where name = ?" 
+ assert_params = [{"value": "Druid", "type": "VARCHAR"}] + + requests_post_mock.assert_called_with( + "http://example.com/", + auth=None, + stream=True, + headers={"Content-Type": "application/json"}, + json={ + "query": assert_query, + "parameters": assert_params, + "context": {}, + "header": False, + }, + verify=True, + cert=None, + proxies=None, + ) + + def test_apply_dynamic_parameters(self): + + parameters = { + "start_dt": "2015-09-12 00:00:00", + "end_dt": "2015-09-13 00:00:00", + "channels": ("#en.wikipedia", "#es.wikipedia"), + "pages": ["Apache Druid", "Apache%"], + "added_gt": 10, + } + + operation = """ + SELECT + channel, + page, + SUM(added) + FROM wikipedia + WHERE + __time BETWEEN TIMESTAMP %(start_dt)s AND TIMESTAMP %(end_dt)s + AND + channel IN (%(channels)s) + AND + page IN (%(pages)s) + GROUP BY channel, page + ORDER BY SUM(added) DESC + HAVING SUM(added) >= %(added_gt)s + """ + + r_op, r_params = apply_dynamic_parameters(operation, parameters) + + assert_op = """ + SELECT + channel, + page, + SUM(added) + FROM wikipedia + WHERE + __time BETWEEN TIMESTAMP ? AND TIMESTAMP ? + AND + channel IN (?, ?) + AND + page IN (?, ?) + GROUP BY channel, page + ORDER BY SUM(added) DESC + HAVING SUM(added) >= ? 
+ """ + + self.assertEqual(r_op, assert_op) + + self.assertEqual( + (r_params[0]["value"], r_params[0]["type"]), + (parameters["start_dt"], "VARCHAR"), + ) + + self.assertEqual( + (r_params[1]["value"], r_params[1]["type"]), + (parameters["end_dt"], "VARCHAR"), + ) + + self.assertEqual( + (r_params[2]["value"], r_params[2]["type"]), + (parameters["channels"][0], "VARCHAR"), + ) + + self.assertEqual( + (r_params[3]["value"], r_params[3]["type"]), + (parameters["channels"][1], "VARCHAR"), + ) + + self.assertEqual( + (r_params[4]["value"], r_params[4]["type"]), + (parameters["pages"][0], "VARCHAR"), + ) + self.assertEqual( + (r_params[5]["value"], r_params[5]["type"]), + (parameters["pages"][1], "VARCHAR"), + ) + + self.assertEqual( + (r_params[6]["value"], r_params[6]["type"]), + (parameters["added_gt"], "INTEGER"), + ) + + def test_apply_dynamic_parameters_out_of_order(self): + + parameters = { + "channels": ("#en.wikipedia", "#es.wikipedia"), + "added_gt": 10, + "start_dt": "2015-09-12 00:00:00", + } + + operation = """ + SELECT + channel, + page, + SUM(added) + FROM wikipedia + WHERE + __time >= TIMESTAMP %(start_dt)s + AND + channel IN (%(channels)s) + GROUP BY channel, page + ORDER BY SUM(added) DESC + HAVING SUM(added) >= %(added_gt)s + """ + + r_op, r_params = apply_dynamic_parameters(operation, parameters) + + assert_op = """ + SELECT + channel, + page, + SUM(added) + FROM wikipedia + WHERE + __time >= TIMESTAMP ? + AND + channel IN (?, ?) + GROUP BY channel, page + ORDER BY SUM(added) DESC + HAVING SUM(added) >= ? 
+ """ + + self.assertEqual(r_op, assert_op) + + self.assertEqual( + (r_params[0]["value"], r_params[0]["type"]), + (parameters["start_dt"], "VARCHAR"), + ) + + self.assertEqual( + (r_params[1]["value"], r_params[1]["type"]), + (parameters["channels"][0], "VARCHAR"), + ) + + self.assertEqual( + (r_params[2]["value"], r_params[2]["type"]), + (parameters["channels"][1], "VARCHAR"), + ) + + self.assertEqual( + (r_params[3]["value"], r_params[3]["type"]), + (parameters["added_gt"], "INTEGER"), + ) + + def test_apply_dynamic_parameters_no_params(self): + self.assertEqual( + apply_dynamic_parameters('SELECT 100 AS "100%"', None), + ('SELECT 100 AS "100%"', None), + ) + + def test_apply_dynamic_parameters_no_match_error(self): + + with self.assertRaises(OperationalError) as cm: + apply_dynamic_parameters("SELECT %(name)s", {"age": 15}) + + self.assertEqual("Parameters and placeholders do not match", str(cm.exception)) + + def test_dynamic_parameter(self): + self.assertEqual(dynamic_parameter("str"), {"value": "str", "type": "VARCHAR"}) + + self.assertEqual(dynamic_parameter(5), {"value": 5, "type": "INTEGER"}) + + self.assertEqual(dynamic_parameter(5.5), {"value": 5.5, "type": "FLOAT"}) + + self.assertEqual(dynamic_parameter(True), {"value": True, "type": "BOOLEAN"}) + + def test_dynamic_placeholder(self): + self.assertEqual(dynamic_placeholder(("a", "b")), "?, ?") + + self.assertEqual(dynamic_placeholder(["a", "b"]), "?, ?") + + self.assertEqual(dynamic_placeholder("a"), "?") + if __name__ == "__main__": unittest.main() From a6ef9d77cecb84292332c22d90dd6124615b1dec Mon Sep 17 00:00:00 2001 From: Tom Harnasz Date: Mon, 12 Apr 2021 09:43:10 +0100 Subject: [PATCH 2/4] DB API - Fix SQLAlchemy CompileError import error. Exception was being raised when using the DB API - independent of SQLAlchemy, it was attempting to import `CompileError` but this is an optional dependency as defined in `extras_require`. 
This was introduced in the enhancement #243 - the fix facilitates backwards compatibility but includes the new enhancement. --- pydruid/db/exceptions.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/pydruid/db/exceptions.py b/pydruid/db/exceptions.py index 0d334bf4..4c4ac90b 100644 --- a/pydruid/db/exceptions.py +++ b/pydruid/db/exceptions.py @@ -1,6 +1,3 @@ -from sqlalchemy.exc import CompileError - - class Error(Exception): pass @@ -37,5 +34,15 @@ class DataError(DatabaseError): pass -class NotSupportedError(CompileError): +support_error_child_cls = None + +try: + from sqlalchemy.exc import CompileError + + support_error_child_cls = CompileError +except ImportError: + support_error_child_cls = DatabaseError + + +class NotSupportedError(support_error_child_cls): pass From c5068977daa74d3d8caec7c8aa5c0c95bdad311f Mon Sep 17 00:00:00 2001 From: Tom Harnasz Date: Tue, 13 Apr 2021 14:40:26 +0100 Subject: [PATCH 3/4] DB API - dynamic parameters - small tweaks - (#258) --- pydruid/db/api.py | 9 ++++----- tests/db/test_cursor.py | 3 +-- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/pydruid/db/api.py b/pydruid/db/api.py index ee78b3a4..6520c7ec 100644 --- a/pydruid/db/api.py +++ b/pydruid/db/api.py @@ -471,23 +471,22 @@ def apply_dynamic_parameters(operation, parameters): if not parameters: return operation, None + # Search for params in the following format `%(param_one)s` p = re.compile("%\\((.*?)\\)s") operation_parameters = p.findall(operation) - if set(parameters.keys()) != set(operation_parameters): + if set(parameters) != set(operation_parameters): raise exceptions.OperationalError("Parameters and placeholders do not match") values = [] for op_parameter in operation_parameters: if isinstance(parameters[op_parameter], (tuple, list)): - values.extend(list(parameters[op_parameter])) + values.extend(parameters[op_parameter]) else: values.append(parameters[op_parameter]) - param_placements = { - key: 
dynamic_placeholder(value) for key, value in parameters.items() - } + param_placements = {key: dynamic_placeholder(v) for key, v in parameters.items()} dynamic_parameters = [dynamic_parameter(v) for v in values] diff --git a/tests/db/test_cursor.py b/tests/db/test_cursor.py index 10160e5a..4901a06e 100644 --- a/tests/db/test_cursor.py +++ b/tests/db/test_cursor.py @@ -8,13 +8,12 @@ from requests.models import Response from pydruid.db.api import ( + apply_dynamic_parameters, apply_parameters, Cursor, - apply_dynamic_parameters, dynamic_parameter, dynamic_placeholder, ) - from pydruid.db.exceptions import OperationalError From 351c4add907185f3dff0b021df4370264f216daa Mon Sep 17 00:00:00 2001 From: Tom Harnasz Date: Fri, 16 Apr 2021 09:37:56 +0100 Subject: [PATCH 4/4] DB API - dynamic parameters - small tweaks - (#258) --- README.md | 4 ++-- pydruid/db/api.py | 4 ++-- pydruid/db/exceptions.py | 2 ++ tests/db/test_cursor.py | 2 +- 4 files changed, 7 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 07c4f14e..ad406234 100644 --- a/README.md +++ b/README.md @@ -215,9 +215,9 @@ for row in curs: ## Dynamic Parameters -Druid 0.18.0 introduced support for [Dynamic Parameters](https://druid.apache.org/docs/latest/querying/sql.html#dynamic-parameters) where parameters are bound to `?` placeholders at execution time. This can be set using `dynamic_parameters`. on `connect`. +Druid 0.18.0 introduced support for [Dynamic Parameters](https://druid.apache.org/docs/latest/querying/sql.html#dynamic-parameters) where parameters are bound to `?` placeholders at execution time. Dynamic parameters can be optionally used by setting `dynamic_parameters` to `True` on `connect`. -Parameters support the types below, additionally allowing for tuples and lists for convenience with each value coerced to the appropriate Druid type. 
+Parameters support the instance types specified below, additionally allowing for tuples and lists for convenience with each value coerced to the appropriate Druid type. ``` | Instance Type | Druid Type | diff --git a/pydruid/db/api.py b/pydruid/db/api.py index 6520c7ec..12b16847 100644 --- a/pydruid/db/api.py +++ b/pydruid/db/api.py @@ -486,11 +486,11 @@ def apply_dynamic_parameters(operation, parameters): else: values.append(parameters[op_parameter]) - param_placements = {key: dynamic_placeholder(v) for key, v in parameters.items()} + placeholders = {key: dynamic_placeholder(v) for key, v in parameters.items()} dynamic_parameters = [dynamic_parameter(v) for v in values] - return operation % param_placements, dynamic_parameters + return operation % placeholders, dynamic_parameters def dynamic_parameter(value): diff --git a/pydruid/db/exceptions.py b/pydruid/db/exceptions.py index 4c4ac90b..f78745b5 100644 --- a/pydruid/db/exceptions.py +++ b/pydruid/db/exceptions.py @@ -34,6 +34,8 @@ class DataError(DatabaseError): pass +# Allow for the support of using `sqlalchemy.exc.CompileError` when using the +# `extras_require` of sqlalchemy - implemented in #243 support_error_child_cls = None try: diff --git a/tests/db/test_cursor.py b/tests/db/test_cursor.py index 4901a06e..c096f0d8 100644 --- a/tests/db/test_cursor.py +++ b/tests/db/test_cursor.py @@ -155,7 +155,7 @@ def test_apply_parameters(self): ) @patch("requests.post") - def test_execute_with_dynamic_parameter(self, requests_post_mock): + def test_execute_with_dynamic_parameters(self, requests_post_mock): response = Response() response.status_code = 200 response.raw = BytesIO(b"[]")