Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DB API - Adding support for dynamic parameters #258

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,49 @@ for row in curs:
print(row)
```

## Dynamic Parameters

Druid 0.18.0 introduced support for [Dynamic Parameters](https://druid.apache.org/docs/latest/querying/sql.html#dynamic-parameters) where parameters are bound to `?` placeholders at execution time. Dynamic parameters can be optionally used by setting `dynamic_parameters` to `True` on `connect`.

Parameters support the instance types specified below, additionally allowing for tuples and lists for convenience with each value coerced to the appropriate Druid type.

```
| Instance Type | Druid Type |
|---------------|------------|
| int | INTEGER |
| float | FLOAT |
| str | VARCHAR |
| bool | BOOLEAN |
```

Example:

```python
from pydruid.db import connect

conn = connect(host='localhost', port=8082, path='/druid/v2/sql/', scheme='http', dynamic_parameters=True)
curs = conn.cursor()
parameters = {
"start_dt": "2015-09-12 00:00:00",
"channels": ("#en.wikipedia", "#es.wikipedia"),
"added_gt": 10,
}
curs.execute("""
SELECT
channel,
page,
SUM(added)
FROM wikipedia
WHERE
__time >= TIMESTAMP %(start_dt)s
AND
channel IN (%(channels)s)
GROUP BY channel, page
ORDER BY SUM(added) DESC
HAVING SUM(added) >= %(added_gt)s
""", parameters)
```

# SQLAlchemy

```python
Expand Down
66 changes: 63 additions & 3 deletions pydruid/db/api.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import itertools
import json
import re
from collections import namedtuple, OrderedDict
from urllib import parse

Expand All @@ -26,6 +27,7 @@ def connect(
ssl_verify_cert=True,
ssl_client_cert=None,
proxies=None,
dynamic_parameters=False,
): # noqa: E125
"""
Constructor for creating a connection to the database.
Expand All @@ -48,6 +50,7 @@ def connect(
ssl_verify_cert,
ssl_client_cert,
proxies,
dynamic_parameters,
)


Expand Down Expand Up @@ -130,6 +133,7 @@ def __init__(
ssl_verify_cert=True,
ssl_client_cert=None,
proxies=None,
dynamic_parameters=False,
):
netloc = "{host}:{port}".format(host=host, port=port)
self.url = parse.urlunparse((scheme, netloc, path, None, None, None))
Expand All @@ -142,6 +146,7 @@ def __init__(
self.ssl_verify_cert = ssl_verify_cert
self.ssl_client_cert = ssl_client_cert
self.proxies = proxies
self.dynamic_parameters = dynamic_parameters

@check_closed
def close(self):
Expand Down Expand Up @@ -175,6 +180,7 @@ def cursor(self):
self.ssl_verify_cert,
self.ssl_client_cert,
self.proxies,
self.dynamic_parameters,
)

self.cursors.append(cursor)
Expand Down Expand Up @@ -206,6 +212,7 @@ def __init__(
ssl_verify_cert=True,
proxies=None,
ssl_client_cert=None,
dynamic_parameters=False,
):
self.url = url
self.context = context or {}
Expand All @@ -215,6 +222,7 @@ def __init__(
self.ssl_verify_cert = ssl_verify_cert
self.ssl_client_cert = ssl_client_cert
self.proxies = proxies
self.dynamic_parameters = dynamic_parameters

# This read/write attribute specifies the number of rows to fetch at a
# time with .fetchmany(). It defaults to 1 meaning to fetch a single
Expand Down Expand Up @@ -246,8 +254,14 @@ def close(self):

@check_closed
def execute(self, operation, parameters=None):
query = apply_parameters(operation, parameters)
results = self._stream_query(query)
dynamic_parameters = None

if self.dynamic_parameters:
query, dynamic_parameters = apply_dynamic_parameters(operation, parameters)
else:
query = apply_parameters(operation, parameters)

results = self._stream_query(query, dynamic_parameters)

# `_stream_query` returns a generator that produces the rows; we need to
# consume the first row so that `description` is properly set, so let's
Expand Down Expand Up @@ -321,7 +335,7 @@ def __next__(self):

next = __next__

def _stream_query(self, query):
def _stream_query(self, query, dynamic_parameters):
"""
Stream rows from a query.

Expand All @@ -334,6 +348,9 @@ def _stream_query(self, query):

payload = {"query": query, "context": self.context, "header": self.header}

if dynamic_parameters is not None:
payload["parameters"] = dynamic_parameters

auth = (
requests.auth.HTTPBasicAuth(self.user, self.password) if self.user else None
)
Expand Down Expand Up @@ -448,3 +465,46 @@ def escape(value):
return value
elif isinstance(value, (list, tuple)):
return ", ".join(escape(element) for element in value)


def apply_dynamic_parameters(operation, parameters):
if not parameters:
return operation, None

# Search for params in the following format `%(param_one)s`
p = re.compile("%\\((.*?)\\)s")
operation_parameters = p.findall(operation)

if set(parameters) != set(operation_parameters):
raise exceptions.OperationalError("Parameters and placeholders do not match")

values = []

for op_parameter in operation_parameters:
if isinstance(parameters[op_parameter], (tuple, list)):
values.extend(parameters[op_parameter])
else:
values.append(parameters[op_parameter])

placeholders = {key: dynamic_placeholder(v) for key, v in parameters.items()}

dynamic_parameters = [dynamic_parameter(v) for v in values]

return operation % placeholders, dynamic_parameters


def dynamic_parameter(value):
types_map = {
"str": "VARCHAR",
"int": "INTEGER",
"float": "FLOAT",
"bool": "BOOLEAN",
}

return {"value": value, "type": types_map[value.__class__.__name__]}


def dynamic_placeholder(value):
if isinstance(value, (list, tuple)):
return ", ".join("?" * len(value))
return "?"
17 changes: 13 additions & 4 deletions pydruid/db/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
from sqlalchemy.exc import CompileError


class Error(Exception):
pass

Expand Down Expand Up @@ -37,5 +34,17 @@ class DataError(DatabaseError):
pass


class NotSupportedError(CompileError):
# Allow for the support of using `sqlalchemy.exc.CompileError` when using the
# `extra_require` of sqlalchemy - implemented in #243
support_error_child_cls = None

try:
from sqlalchemy.exc import CompileError

support_error_child_cls = CompileError
except ImportError:
support_error_child_cls = DatabaseError


class NotSupportedError(support_error_child_cls):
pass
Loading