Merge pull request #244 from ploxiln/tornado6
support tornado-6.x, support coroutine message_handler
mreiferson authored Jun 19, 2020
2 parents fa8bbb9 + 2fa6c0f commit 8a5a3d8
Showing 11 changed files with 183 additions and 74 deletions.
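For context, a coroutine ``message_handler`` of the kind this commit enables might look like the following minimal sketch (the topic, channel, lookupd address, and sleep are illustrative, not part of the commit)::

    import nsq
    import tornado.gen

    @tornado.gen.coroutine
    def message_handler(message):
        # simulate async work; the Reader detects the coroutine and
        # enables async handling on the Message automatically
        yield tornado.gen.sleep(0.1)
        raise tornado.gen.Return(True)  # True finishes, False requeues

    reader = nsq.Reader(
        topic='events', channel='worker',  # illustrative names
        lookupd_http_addresses=['http://127.0.0.1:4161'],
        message_handler=message_handler,
        max_in_flight=8,
    )
    nsq.run()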
6 changes: 6 additions & 0 deletions .github/workflows/main.yml
@@ -25,6 +25,12 @@ jobs:
         tornado_ver:
         - "4.5.3"
         - "5.1.1"
+        - "6.0.4"
+        exclude:
+        - imgtag: "python:2.7-buster"
+          tornado_ver: "6.0.4"
+        - imgtag: "python:3.4-jessie"
+          tornado_ver: "6.0.4"
 
     container: "${{matrix.imgtag}}"
     steps:
2 changes: 1 addition & 1 deletion docs/requirements.txt
@@ -1 +1 @@
-tornado==3.0.2
+tornado==5.1.1
7 changes: 4 additions & 3 deletions nsq/__init__.py
@@ -1,9 +1,10 @@
 from __future__ import absolute_import
 
 import signal
-import tornado.ioloop
 import logging
 
+import tornado.ioloop
+
 from .protocol import (
     Error,
     unpack_response,
@@ -36,7 +37,7 @@
 def _handle_term_signal(sig_num, frame):
     logging.getLogger(__name__).info(
         'TERM Signal handler called with signal %r', sig_num)
-    tornado.ioloop.IOLoop.instance().stop()
+    tornado.ioloop.IOLoop.current().stop()
 
 
 def run():
@@ -45,7 +46,7 @@ def run():
"""
signal.signal(signal.SIGTERM, _handle_term_signal)
signal.signal(signal.SIGINT, _handle_term_signal)
tornado.ioloop.IOLoop.instance().start()
tornado.ioloop.IOLoop.current().start()


__author__ = "Matt Reiferson <[email protected]>"
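In Tornado 5 and later, ``IOLoop.instance()`` is a deprecated alias for ``IOLoop.current()``, which returns the loop associated with the current thread. A minimal sketch of the start/stop pattern the updated module relies on (the one-second delay is illustrative)::

    from tornado.ioloop import IOLoop

    loop = IOLoop.current()          # per-thread loop, preferred in tornado 5+/6
    loop.call_later(1.0, loop.stop)  # schedule a stop, analogous to the signal handler
    loop.start()                     # blocks until stop() is called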
25 changes: 15 additions & 10 deletions nsq/conn.py
@@ -15,8 +15,8 @@
 except ImportError:
     SnappyEncoder = SnappySocket = None  # pyflakes.ignore
 
-import tornado.iostream
-import tornado.ioloop
+from tornado.iostream import IOStream
+from tornado.ioloop import IOLoop
 
 from nsq import event, protocol
 from .deflate_socket import DeflateSocket, DeflateEncoder
@@ -227,25 +227,28 @@ def connect(self):
         self.socket.settimeout(self.timeout)
         self.socket.setblocking(0)
 
-        self.stream = tornado.iostream.IOStream(self.socket)
+        self.stream = IOStream(self.socket)
         self.stream.set_close_callback(self._socket_close)
         self.stream.set_nodelay(True)
 
         self.state = CONNECTING
         self.on(event.CONNECT, self._on_connect)
         self.on(event.DATA, self._on_data)
 
-        self.stream.connect((self.host, self.port), self._connect_callback)
+        fut = self.stream.connect((self.host, self.port))
+        IOLoop.current().add_future(fut, self._connect_callback)
 
-    def _connect_callback(self):
+    def _connect_callback(self, fut):
+        fut.result()
         self.state = CONNECTED
         self.stream.write(protocol.MAGIC_V2)
         self._start_read()
         self.trigger(event.CONNECT, conn=self)
 
     def _read_bytes(self, size, callback):
         try:
-            self.stream.read_bytes(size, callback)
+            fut = self.stream.read_bytes(size)
+            IOLoop.current().add_future(fut, callback)
         except IOError:
             self.close()
             self.trigger(
@@ -266,8 +269,9 @@ def _socket_close(self):
     def close(self):
         self.stream.close()
 
-    def _read_size(self, data):
+    def _read_size(self, fut):
         try:
+            data = fut.result()
             size = struct_l.unpack(data)[0]
         except Exception:
             self.close()
@@ -279,15 +283,16 @@ def _read_size(self, data):
             return
         self._read_bytes(size, self._read_body)
 
-    def _read_body(self, data):
+    def _read_body(self, fut):
         try:
+            data = fut.result()
             self.trigger(event.DATA, conn=self, data=data)
         except Exception:
             logger.exception('uncaught exception in data event')
         self._start_read()
 
     def send(self, data):
-        self.stream.write(self.encoder.encode(data))
+        return self.stream.write(self.encoder.encode(data))
 
     def upgrade_to_tls(self, options=None):
         # in order to upgrade to TLS we need to *replace* the IOStream...
@@ -313,7 +318,7 @@ def finish_upgrade_tls(fut):
                 error=protocol.SendError('failed to upgrade to TLS', e),
             )
 
-        tornado.ioloop.IOLoop.current().add_future(fut, finish_upgrade_tls)
+        IOLoop.current().add_future(fut, finish_upgrade_tls)
 
     def upgrade_to_snappy(self):
         assert SnappySocket, 'snappy requires the python-snappy package'
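Tornado 6 removed the ``callback`` arguments from ``IOStream.connect`` and ``IOStream.read_bytes``; both now return a ``Future``, the caller reacts via ``IOLoop.add_future``, and any error is re-raised when the callback calls ``fut.result()``. A minimal sketch of that pattern outside this class (host, port, and payload are illustrative)::

    import socket

    from tornado.ioloop import IOLoop
    from tornado.iostream import IOStream

    def connect_and_ping(host, port):
        stream = IOStream(socket.socket())

        def on_connect(fut):
            fut.result()  # re-raises any connection error here
            stream.write(b'ping')

        # connect() returns a Future in tornado 5+/6; add_future() runs
        # the callback on the loop once the Future resolves
        fut = stream.connect((host, port))
        IOLoop.current().add_future(fut, on_connect)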
76 changes: 48 additions & 28 deletions nsq/reader.py
@@ -10,6 +10,7 @@
 
 from tornado.ioloop import PeriodicCallback
 import tornado.httpclient
+import tornado.gen
 
 from ._compat import integer_types
 from ._compat import iteritems
@@ -48,14 +49,20 @@ class Reader(Client):
     It maintains a sufficient RDY count based on the # of producers and your configured
     ``max_in_flight``.
 
-    Handlers should be defined as shown in the examples below. The handler receives a
-    :class:`nsq.Message` object that has instance methods :meth:`nsq.Message.finish`,
-    :meth:`nsq.Message.requeue`, and :meth:`nsq.Message.touch` to respond to ``nsqd``.
-
-    When messages are not responded to explicitly, it is responsible for sending
-    ``FIN`` or ``REQ`` commands based on return value of ``message_handler``. When
-    re-queueing, it will backoff from processing additional messages for an increasing
-    delay (calculated exponentially based on consecutive failures up to ``max_backoff_duration``).
+    Handlers should be defined as shown in the examples below. The ``message_handler``
+    callback function receives a :class:`nsq.Message` object that has instance methods
+    :meth:`nsq.Message.finish`, :meth:`nsq.Message.requeue`, and :meth:`nsq.Message.touch`
+    which can be used to respond to ``nsqd``. As an alternative to explicitly calling these
+    response methods, the handler function can simply return ``True`` to finish the message,
+    or ``False`` to requeue it. If the handler function calls :meth:`nsq.Message.enable_async`,
+    then automatic finish/requeue is disabled, allowing the :class:`nsq.Message` to finish or
+    requeue in a later async callback or context. The handler function may also be a coroutine,
+    in which case Message async handling is enabled automatically, but the coroutine
+    can still return a final value of True/False to automatically finish/requeue the message.
+
+    After re-queueing a message, the handler will backoff from processing additional messages
+    for an increasing delay (calculated exponentially based on consecutive failures up to
+    ``max_backoff_duration``).
 
     Synchronous example::
@@ -115,7 +122,7 @@ def process_message(message):
         this value will be divided evenly amongst the configured/discovered nsqd producers
 
     :param lookupd_poll_interval: the amount of time in seconds between querying all of the supplied
-        nsqlookupd instances. a random amount of time based on thie value will be initially
+        nsqlookupd instances. a random amount of time based on this value will be initially
         introduced in order to add jitter when multiple readers are running
 
     :param lookupd_poll_jitter: The maximum fractional amount of jitter to add to the
@@ -211,7 +218,6 @@ def __init__(
         self.conn_kwargs = kwargs
 
         self.backoff_timer = BackoffTimer(0, max_backoff_duration)
-        self.backoff_timeout = None
         self.backoff_block = False
         self.backoff_block_completed = True
 
@@ -242,8 +248,9 @@ def _run(self):

         if not self.lookupd_http_addresses:
             return
+
         # trigger the first lookup query manually
-        self.query_lookupd()
+        self.io_loop.spawn_callback(self.query_lookupd)
 
         self.query_periodic = PeriodicCallback(
             self.query_lookupd,
@@ -253,7 +260,7 @@
         # randomize the time we start this poll loop so that all
         # consumers don't query at exactly the same time
         delay = random.random() * self.lookupd_poll_interval * self.lookupd_poll_jitter
-        self.io_loop.add_timeout(time.time() + delay, self.query_periodic.start)
+        self.io_loop.call_later(delay, self.query_periodic.start)
 
     def close(self):
         """
@@ -313,27 +320,43 @@ def _on_message(self, conn, message, **kwargs):
     def _handle_message(self, conn, message):
         self._maybe_update_rdy(conn)
 
-        success = False
+        result = False
         try:
             if 0 < self.max_tries < message.attempts:
                 self.giving_up(message)
                 return message.finish()
             pre_processed_message = self.preprocess_message(message)
             if not self.validate_message(pre_processed_message):
                 return message.finish()
-            success = self.process_message(message)
+            result = self.process_message(message)
         except Exception:
             logger.exception('[%s:%s] uncaught exception while handling message %s body:%r',
                              conn.id, self.name, message.id, message.body)
             if not message.has_responded():
                 return message.requeue()
 
-        if not message.is_async() and not message.has_responded():
-            assert success is not None, 'ambiguous return value for synchronous mode'
-            if success:
+        if result not in (True, False, None):
+            # assume handler returned a Future or Coroutine
+            message.enable_async()
+            fut = tornado.gen.convert_yielded(result)
+            fut.add_done_callback(functools.partial(self._maybe_finish, message))
+
+        elif not message.is_async() and not message.has_responded():
+            assert result is not None, 'ambiguous return value for synchronous mode'
+            if result:
                 return message.finish()
             return message.requeue()
 
+    def _maybe_finish(self, message, fut):
+        if not message.has_responded():
+            try:
+                if fut.result():
+                    message.finish()
+                    return
+            except Exception:
+                pass
+            message.requeue()
+
     def _maybe_update_rdy(self, conn):
         if self.backoff_timer.get_interval() or self.max_in_flight == 0:
             return
@@ -349,7 +372,6 @@ def _maybe_update_rdy(self, conn):
         self._send_rdy(conn, conn_max_in_flight)
 
     def _finish_backoff_block(self):
-        self.backoff_timeout = None
         self.backoff_block = False
 
         # we must have raced and received a message out of order that resumed
@@ -418,8 +440,7 @@ def _start_backoff_block(self):
         for c in self.conns.values():
             self._send_rdy(c, 0)
 
-        self.backoff_timeout = self.io_loop.add_timeout(time.time() + backoff_interval,
-                                                        self._finish_backoff_block)
+        self.io_loop.call_later(backoff_interval, self._finish_backoff_block)
 
     def _rdy_retry(self, conn, value):
         conn.rdy_timeout = None
@@ -433,7 +454,7 @@ def _send_rdy(self, conn, value):
         if value and (self.disabled() or self.max_in_flight == 0):
             logger.info('[%s:%s] disabled, delaying RDY state change', conn.id, self.name)
             rdy_retry_callback = functools.partial(self._rdy_retry, conn, value)
-            conn.rdy_timeout = self.io_loop.add_timeout(time.time() + 15, rdy_retry_callback)
+            conn.rdy_timeout = self.io_loop.call_later(15, rdy_retry_callback)
             return
 
         if value > conn.max_rdy_count:
@@ -543,8 +564,9 @@ def _on_connection_close(self, conn, **kwargs):
         logger.info('[%s:%s] attempting to reconnect in 15s', conn.id, self.name)
         reconnect_callback = functools.partial(self.connect_to_nsqd,
                                                host=conn.host, port=conn.port)
-        self.io_loop.add_timeout(time.time() + 15, reconnect_callback)
+        self.io_loop.call_later(15, reconnect_callback)
 
+    @tornado.gen.coroutine
     def query_lookupd(self):
         """
         Trigger a query of the configured ``nsq_lookupd_http_addresses``.
@@ -571,15 +593,13 @@ def query_lookupd(self):
             headers={'Accept': 'application/vnd.nsq; version=1.0'},
             connect_timeout=self.lookupd_connect_timeout,
             request_timeout=self.lookupd_request_timeout)
-        callback = functools.partial(self._finish_query_lookupd, lookupd_url=lookupd_url)
-        self.http_client.fetch(req, callback=callback)
-
-    def _finish_query_lookupd(self, response, lookupd_url):
-        if response.error:
+        try:
+            response = yield self.http_client.fetch(req)
+        except Exception as e:
             logger.warning('[%s] lookupd %s query error: %s',
-                           self.name, lookupd_url, response.error)
+                           self.name, lookupd_url, e)
             return
 
         try:
             lookup_data = json.loads(response.body.decode("utf8"))
         except ValueError:
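Tornado 6 likewise dropped ``AsyncHTTPClient.fetch(callback=...)``, which is why ``query_lookupd`` becomes a coroutine that yields the Future returned by ``fetch``. A minimal standalone sketch of the same pattern (the function name and error reporting are illustrative)::

    import tornado.gen
    import tornado.httpclient

    @tornado.gen.coroutine
    def fetch_body(url):
        client = tornado.httpclient.AsyncHTTPClient()
        try:
            # fetch() returns a Future; yielding it suspends the coroutine
            # until the response (or an exception) is ready
            response = yield client.fetch(url)
        except Exception as e:
            print('fetch error: %s' % e)
            return
        raise tornado.gen.Return(response.body)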