From 5a7dfbbfdccb726eec49adbb7ec50c1597ed9f43 Mon Sep 17 00:00:00 2001 From: Pierce Lopez Date: Sat, 18 Jan 2020 16:31:53 -0500 Subject: [PATCH 1/6] support tornado-6, support coroutine message_handler also enable yielding / awaiting Writer.pub() also switch ioloop add_timeout() for call_later() to automatically use the IOLoop's time base (time.time() is not always the correct base) tests need a lot of updating ... some mocked object methods need to return Futures --- nsq/conn.py | 17 ++++++---- nsq/reader.py | 54 +++++++++++++++++++------------ nsq/writer.py | 36 +++++++++++++-------- setup.py | 2 +- tests/reader_unit_test_helpers.py | 13 ++++++-- tests/test_async.py | 38 ++++++++++++++++------ 6 files changed, 107 insertions(+), 53 deletions(-) diff --git a/nsq/conn.py b/nsq/conn.py index fafb217..c232dfa 100644 --- a/nsq/conn.py +++ b/nsq/conn.py @@ -235,9 +235,11 @@ def connect(self): self.on(event.CONNECT, self._on_connect) self.on(event.DATA, self._on_data) - self.stream.connect((self.host, self.port), self._connect_callback) + fut = self.stream.connect((self.host, self.port)) + tornado.ioloop.IOLoop.current().add_future(fut, self._connect_callback) - def _connect_callback(self): + def _connect_callback(self, fut): + fut.result() self.state = CONNECTED self.stream.write(protocol.MAGIC_V2) self._start_read() @@ -245,7 +247,8 @@ def _connect_callback(self): def _read_bytes(self, size, callback): try: - self.stream.read_bytes(size, callback) + fut = self.stream.read_bytes(size) + tornado.ioloop.IOLoop.current().add_future(fut, callback) except IOError: self.close() self.trigger( @@ -266,8 +269,9 @@ def _socket_close(self): def close(self): self.stream.close() - def _read_size(self, data): + def _read_size(self, fut): try: + data = fut.result() size = struct_l.unpack(data)[0] except Exception: self.close() @@ -279,15 +283,16 @@ def _read_size(self, data): return self._read_bytes(size, self._read_body) - def _read_body(self, data): + def _read_body(self, fut): try: + data = fut.result() self.trigger(event.DATA, conn=self, data=data) except Exception: logger.exception('uncaught exception in data event') self._start_read() def send(self, data): - self.stream.write(self.encoder.encode(data)) + return self.stream.write(self.encoder.encode(data)) def upgrade_to_tls(self, options=None): # in order to upgrade to TLS we need to *replace* the IOStream... diff --git a/nsq/reader.py b/nsq/reader.py index ba4459b..96a6f8f 100644 --- a/nsq/reader.py +++ b/nsq/reader.py @@ -10,6 +10,7 @@ from tornado.ioloop import PeriodicCallback import tornado.httpclient +import tornado.gen from ._compat import integer_types from ._compat import iteritems @@ -115,7 +116,7 @@ def process_message(message): this value will be divided evenly amongst the configured/discovered nsqd producers :param lookupd_poll_interval: the amount of time in seconds between querying all of the supplied - nsqlookupd instances. a random amount of time based on thie value will be initially + nsqlookupd instances. a random amount of time based on this value will be initially introduced in order to add jitter when multiple readers are running :param lookupd_poll_jitter: The maximum fractional amount of jitter to add to the @@ -211,7 +212,6 @@ def __init__( self.conn_kwargs = kwargs self.backoff_timer = BackoffTimer(0, max_backoff_duration) - self.backoff_timeout = None self.backoff_block = False self.backoff_block_completed = True @@ -242,8 +242,9 @@ def _run(self): if not self.lookupd_http_addresses: return + # trigger the first lookup query manually - self.query_lookupd() + self.io_loop.spawn_callback(self.query_lookupd) self.query_periodic = PeriodicCallback( self.query_lookupd, @@ -253,7 +254,7 @@ def _run(self): # randomize the time we start this poll loop so that all # consumers don't query at exactly the same time delay = random.random() * self.lookupd_poll_interval * self.lookupd_poll_jitter - self.io_loop.add_timeout(time.time() + delay, self.query_periodic.start) + self.io_loop.call_later(delay, self.query_periodic.start) def close(self): """ @@ -313,7 +314,7 @@ def _on_message(self, conn, message, **kwargs): def _handle_message(self, conn, message): self._maybe_update_rdy(conn) - success = False + result = False try: if 0 < self.max_tries < message.attempts: self.giving_up(message) @@ -321,19 +322,35 @@ def _handle_message(self, conn, message): pre_processed_message = self.preprocess_message(message) if not self.validate_message(pre_processed_message): return message.finish() - success = self.process_message(message) + result = self.process_message(message) except Exception: logger.exception('[%s:%s] uncaught exception while handling message %s body:%r', conn.id, self.name, message.id, message.body) if not message.has_responded(): return message.requeue() - if not message.is_async() and not message.has_responded(): - assert success is not None, 'ambiguous return value for synchronous mode' - if success: + if result not in (True, False, None): + # assume handler returned a Future or Coroutine + message.enable_async() + fut = tornado.gen.convert_yielded(result) + fut.add_done_callback(functools.partial(self._maybe_finish, message)) + + elif not message.is_async() and not message.has_responded(): + assert result is not None, 'ambiguous return value for synchronous mode' + if result: return message.finish() return message.requeue() + def _maybe_finish(self, message, fut): + if not message.has_responded(): + try: + if fut.result(): + message.finish() + return + except Exception: + pass + message.requeue() + def _maybe_update_rdy(self, conn): if self.backoff_timer.get_interval() or self.max_in_flight == 0: return @@ -349,7 +366,6 @@ def _maybe_update_rdy(self, conn): self._send_rdy(conn, conn_max_in_flight) def _finish_backoff_block(self): - self.backoff_timeout = None self.backoff_block = False # we must have raced and received a message out of order that resumed @@ -418,8 +434,7 @@ def _start_backoff_block(self): for c in self.conns.values(): self._send_rdy(c, 0) - self.backoff_timeout = self.io_loop.add_timeout(time.time() + backoff_interval, - self._finish_backoff_block) + self.io_loop.call_later(backoff_interval, self._finish_backoff_block) def _rdy_retry(self, conn, value): conn.rdy_timeout = None @@ -433,7 +448,7 @@ def _send_rdy(self, conn, value): if value and (self.disabled() or self.max_in_flight == 0): logger.info('[%s:%s] disabled, delaying RDY state change', conn.id, self.name) rdy_retry_callback = functools.partial(self._rdy_retry, conn, value) - conn.rdy_timeout = self.io_loop.add_timeout(time.time() + 15, rdy_retry_callback) + conn.rdy_timeout = self.io_loop.call_later(15, rdy_retry_callback) return if value > conn.max_rdy_count: @@ -543,8 +558,9 @@ def _on_connection_close(self, conn, **kwargs): logger.info('[%s:%s] attempting to reconnect in 15s', conn.id, self.name) reconnect_callback = functools.partial(self.connect_to_nsqd, host=conn.host, port=conn.port) - self.io_loop.add_timeout(time.time() + 15, reconnect_callback) + self.io_loop.call_later(15, reconnect_callback) + @tornado.gen.coroutine def query_lookupd(self): """ Trigger a query of the configured ``nsq_lookupd_http_addresses``. @@ -571,15 +587,13 @@ def query_lookupd(self): headers={'Accept': 'application/vnd.nsq; version=1.0'}, connect_timeout=self.lookupd_connect_timeout, request_timeout=self.lookupd_request_timeout) - callback = functools.partial(self._finish_query_lookupd, lookupd_url=lookupd_url) - self.http_client.fetch(req, callback=callback) - def _finish_query_lookupd(self, response, lookupd_url): - if response.error: + try: + response = yield self.http_client.fetch(req) + except Exception as e: logger.warning('[%s] lookupd %s query error: %s', - self.name, lookupd_url, response.error) + self.name, lookupd_url, e) return - try: lookup_data = json.loads(response.body.decode("utf8")) except ValueError: diff --git a/nsq/writer.py b/nsq/writer.py index 77a562a..bfecbb7 100644 --- a/nsq/writer.py +++ b/nsq/writer.py @@ -2,10 +2,11 @@ from __future__ import absolute_import import logging -import time import functools import random +import tornado.concurrent + from ._compat import string_types from ._compat import bytes_types from ._compat import func_args @@ -16,6 +17,13 @@ logger = logging.getLogger(__name__) +def make_exc_future(exc): + fut = tornado.concurrent.Future() + fut.set_exception(exc) + fut.exception() # suppress not-yielded-future-exception warning + return fut + + class Writer(Client): r""" A high-level producer class built on top of the `Tornado IOLoop `_ @@ -121,7 +129,7 @@ def pub(self, topic, msg, callback=None): :param msg: message body (bytes) :param callback: function which takes (conn, data) (data may be nsq.Error) """ - self._pub('pub', topic, msg, callback=callback) + return self._pub('pub', topic, msg, callback=callback) def mpub(self, topic, msg, callback=None): """ @@ -135,7 +143,7 @@ def mpub(self, topic, msg, callback=None): msg = [msg] assert isinstance(msg, (list, set, tuple)) - self._pub('mpub', topic, msg, callback=callback) + return self._pub('mpub', topic, msg, callback=callback) def dpub(self, topic, delay_ms, msg, callback=None): """ @@ -146,20 +154,20 @@ def dpub(self, topic, delay_ms, msg, callback=None): :param msg: message body (bytes) :param callback: function which takes (conn, data) (data may be nsq.Error) """ - self._pub('dpub', topic, msg, delay_ms, callback=callback) + return self._pub('dpub', topic, msg, delay_ms, callback=callback) def _pub(self, command, topic, msg, delay_ms=None, callback=None): if not callback: - callback = functools.partial(self._finish_pub, command=command, - topic=topic, msg=msg) - + callback = functools.partial(self._finish_pub, + command=command, topic=topic, msg=msg) open_connections = [ conn for conn in self.conns.values() if conn.connected() ] if not open_connections: - callback(None, protocol.SendError('no open connections')) - return + exc = protocol.SendError('no open connections') + callback(None, exc) + return make_exc_future(exc) conn = random.choice(open_connections) conn.callback_queue.append(callback) @@ -171,11 +179,13 @@ def _pub(self, command, topic, msg, delay_ms=None, callback=None): args = (topic, msg) try: - conn.send(cmd(*args)) - except Exception: + return conn.send(cmd(*args)) + except Exception as e: logger.exception('[%s] failed to send %s' % (conn.id, command)) - callback(None, protocol.SendError('send error')) + exc = protocol.SendError('send error', e) + callback(None, exc) conn.close() + return make_exc_future(exc) def _on_connection_error(self, conn, error, **kwargs): super(Writer, self)._on_connection_error(conn, error, **kwargs) @@ -238,7 +248,7 @@ def _on_connection_close(self, conn, **kwargs): logger.info('[%s] attempting to reconnect in %0.2fs', conn.id, self.reconnect_interval) reconnect_callback = functools.partial(self.connect_to_nsqd, host=conn.host, port=conn.port) - self.io_loop.add_timeout(time.time() + self.reconnect_interval, reconnect_callback) + self.io_loop.call_later(self.reconnect_interval, reconnect_callback) def _finish_pub(self, conn, data, command, topic, msg): if isinstance(data, protocol.Error): diff --git a/setup.py b/setup.py index 9d2bec7..e56d61c 100644 --- a/setup.py +++ b/setup.py @@ -32,7 +32,7 @@ def run_tests(self): version ), packages=['nsq'], - install_requires=['tornado<6'], + install_requires=['tornado'], include_package_data=True, zip_safe=False, tests_require=['pytest>=3.3.1', 'mock', 'python-snappy'], diff --git a/tests/reader_unit_test_helpers.py b/tests/reader_unit_test_helpers.py index 85a300b..a527ca4 100644 --- a/tests/reader_unit_test_helpers.py +++ b/tests/reader_unit_test_helpers.py @@ -1,7 +1,10 @@ from __future__ import absolute_import +import time + from mock import patch, create_autospec from tornado.ioloop import IOLoop +from tornado.concurrent import Future import nsq from nsq import event @@ -19,14 +22,18 @@ def get_reader(max_in_flight=5): def get_ioloop(): - ioloop = create_autospec(IOLoop) - ioloop.time.return_value = 0 + ioloop = create_autospec(IOLoop, instance=True) + ioloop.time.side_effect = time.time + ioloop.call_later.side_effect = lambda dt, cb: ioloop.add_timeout(time.time() + dt, cb) return ioloop def get_conn(reader): global _conn_port - with patch('nsq.conn.tornado.iostream.IOStream', autospec=True): + with patch('nsq.conn.tornado.iostream.IOStream', autospec=True) as iostream: + instance = iostream.return_value + instance.connect.return_value = Future() + instance.read_bytes.return_value = Future() conn = reader.connect_to_nsqd('localhost', _conn_port) _conn_port += 1 conn.trigger(event.READY, conn=conn) diff --git a/tests/test_async.py b/tests/test_async.py index ea531cc..d947c80 100644 --- a/tests/test_async.py +++ b/tests/test_async.py @@ -4,6 +4,7 @@ from mock import patch, create_autospec from tornado.iostream import IOStream +from tornado.concurrent import Future # shunt '..' into sys.path since we are in a 'tests' subdirectory base_dir = os.path.abspath(os.path.join(os.path.dirname(os.path.abspath(__file__)), '..')) @@ -24,6 +25,10 @@ def _get_test_conn(): conn = AsyncConn('test', 4150) # now set the stream attribute, which is ordinarily set in conn.connect() conn.stream = create_autospec(IOStream) + fut = Future() + fut.set_result(conn.stream) + conn.stream.connect.return_value = fut + conn.stream.read_bytes.return_value = Future() return conn @@ -35,10 +40,15 @@ def _get_test_conn(): @patch('nsq.conn.socket') @patch('nsq.conn.tornado.iostream.IOStream', autospec=True) def test_connect(mock_iostream, mock_socket): + instance = mock_iostream.return_value + fut = Future() + fut.set_result(instance) + instance.connect.return_value = fut + conn = _get_test_conn() conn.connect() conn.stream.set_close_callback.assert_called_once_with(conn._socket_close) - conn.stream.connect.assert_called_once_with((conn.host, conn.port), conn._connect_callback) + conn.stream.connect.assert_called_once_with((conn.host, conn.port)) assert conn.state == 'CONNECTING' # now ensure that we will bail if we were already in a connecting or connected state @@ -62,7 +72,9 @@ def test_connect_callback(): # simulate having called `conn.connect` by setting state to `connecting` conn.state = 'CONNECTING' with patch.object(conn, '_start_read', autospec=True) as mock_start_read: - conn._connect_callback() + fut = Future() + fut.set_result(conn.stream) + conn._connect_callback(fut) assert conn.state == 'CONNECTED' conn.stream.write.assert_called_once_with(protocol.MAGIC_V2) mock_start_read.assert_called_once_with() @@ -72,21 +84,25 @@ def test_connect_callback(): def test_start_read(): conn = _get_test_conn() conn._start_read() - conn.stream.read_bytes.assert_called_once_with(4, conn._read_size) + conn.stream.read_bytes.assert_called_once_with(4) def test_read_size(): conn = _get_test_conn() body_size = 6 body_size_packed = struct_l.pack(body_size) - conn._read_size(body_size_packed) - conn.stream.read_bytes.assert_called_once_with(body_size, conn._read_body) + fut = Future() + fut.set_result(body_size_packed) + conn._read_size(fut) + conn.stream.read_bytes.assert_called_once_with(body_size) # now test that we get the right behavior when we get malformed data # for this, we'll want to stick on mock on conn.close conn.stream.read_bytes.reset_mock() with patch.object(conn, 'close', autospec=True) as mock_close: - conn._read_size('asdfasdf') + fut = Future() + fut.set_result('asdfasdf') + conn._read_size(fut) mock_close.assert_called_once_with() assert not conn.stream.read_bytes.called @@ -97,14 +113,16 @@ def test_read_body(): conn.on('data', on_data) data = 'NSQ' - conn._read_body(data) + fut = Future() + fut.set_result(data) + conn._read_body(fut) on_data.assert_called_once_with(conn=conn, data=data) - conn.stream.read_bytes.assert_called_once_with(4, conn._read_size) + conn.stream.read_bytes.assert_called_once_with(4) # now test functionality when the data_callback fails on_data.reset_mock() conn.stream.read_bytes.reset_mock() on_data.return_value = Exception("Boom.") - conn._read_body(data) + conn._read_body(fut) # verify that the next _start_read was called - conn.stream.read_bytes.assert_called_once_with(4, conn._read_size) + conn.stream.read_bytes.assert_called_once_with(4) From 9d63c6c1d7d66c6f4259692b82e6895c13b23cac Mon Sep 17 00:00:00 2001 From: Pierce Lopez Date: Sat, 18 Jan 2020 16:36:55 -0500 Subject: [PATCH 2/6] tests: add tornado-6.0.4 to GitHub Actions test matrix but skip it on python-2.7 and python-3.4 (tornado-6 requires python-3.5.2+) --- .github/workflows/main.yml | 6 ++++++ setup.py | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index bcb9637..e5247ba 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -25,6 +25,12 @@ jobs: tornado_ver: - "4.5.3" - "5.1.1" + - "6.0.4" + exclude: + - imgtag: "python:2.7-buster" + tornado_ver: "6.0.4" + - imgtag: "python:3.4-jessie" + tornado_ver: "6.0.4" container: "${{matrix.imgtag}}" steps: diff --git a/setup.py b/setup.py index e56d61c..d79a350 100644 --- a/setup.py +++ b/setup.py @@ -35,7 +35,7 @@ def run_tests(self): install_requires=['tornado'], include_package_data=True, zip_safe=False, - tests_require=['pytest>=3.3.1', 'mock', 'python-snappy'], + tests_require=['pytest>=3.6.3', 'mock', 'python-snappy'], cmdclass={'test': PyTest}, classifiers=[ 'Development Status :: 6 - Mature', From f430a555473b8976c8a6324e0c3ebebb88979355 Mon Sep 17 00:00:00 2001 From: Pierce Lopez Date: Sun, 19 Jan 2020 00:23:27 -0500 Subject: [PATCH 3/6] shorter tornado IOLoop and IOStream references --- nsq/__init__.py | 7 ++++--- nsq/conn.py | 12 ++++++------ tests/reader_unit_test_helpers.py | 2 +- tests/test_async.py | 2 +- 4 files changed, 12 insertions(+), 11 deletions(-) diff --git a/nsq/__init__.py b/nsq/__init__.py index 7d5b09b..6fa5f56 100644 --- a/nsq/__init__.py +++ b/nsq/__init__.py @@ -1,9 +1,10 @@ from __future__ import absolute_import import signal -import tornado.ioloop import logging +import tornado.ioloop + from .protocol import ( Error, unpack_response, @@ -36,7 +37,7 @@ def _handle_term_signal(sig_num, frame): logging.getLogger(__name__).info( 'TERM Signal handler called with signal %r', sig_num) - tornado.ioloop.IOLoop.instance().stop() + tornado.ioloop.IOLoop.current().stop() def run(): @@ -45,7 +46,7 @@ def run(): """ signal.signal(signal.SIGTERM, _handle_term_signal) signal.signal(signal.SIGINT, _handle_term_signal) - tornado.ioloop.IOLoop.instance().start() + tornado.ioloop.IOLoop.current().start() __author__ = "Matt Reiferson " diff --git a/nsq/conn.py b/nsq/conn.py index c232dfa..ef8a4cc 100644 --- a/nsq/conn.py +++ b/nsq/conn.py @@ -15,8 +15,8 @@ except ImportError: SnappyEncoder = SnappySocket = None # pyflakes.ignore -import tornado.iostream -import tornado.ioloop +from tornado.iostream import IOStream +from tornado.ioloop import IOLoop from nsq import event, protocol from .deflate_socket import DeflateSocket, DeflateEncoder @@ -227,7 +227,7 @@ def connect(self): self.socket.settimeout(self.timeout) self.socket.setblocking(0) - self.stream = tornado.iostream.IOStream(self.socket) + self.stream = IOStream(self.socket) self.stream.set_close_callback(self._socket_close) self.stream.set_nodelay(True) @@ -236,7 +236,7 @@ def connect(self): self.on(event.DATA, self._on_data) fut = self.stream.connect((self.host, self.port)) - tornado.ioloop.IOLoop.current().add_future(fut, self._connect_callback) + IOLoop.current().add_future(fut, self._connect_callback) def _connect_callback(self, fut): fut.result() @@ -248,7 +248,7 @@ def _connect_callback(self, fut): def _read_bytes(self, size, callback): try: fut = self.stream.read_bytes(size) - tornado.ioloop.IOLoop.current().add_future(fut, callback) + IOLoop.current().add_future(fut, callback) except IOError: self.close() self.trigger( @@ -318,7 +318,7 @@ def finish_upgrade_tls(fut): error=protocol.SendError('failed to upgrade to TLS', e), ) - tornado.ioloop.IOLoop.current().add_future(fut, finish_upgrade_tls) + IOLoop.current().add_future(fut, finish_upgrade_tls) def upgrade_to_snappy(self): assert SnappySocket, 'snappy requires the python-snappy package' diff --git a/tests/reader_unit_test_helpers.py b/tests/reader_unit_test_helpers.py index a527ca4..a837982 100644 --- a/tests/reader_unit_test_helpers.py +++ b/tests/reader_unit_test_helpers.py @@ -30,7 +30,7 @@ def get_ioloop(): def get_conn(reader): global _conn_port - with patch('nsq.conn.tornado.iostream.IOStream', autospec=True) as iostream: + with patch('nsq.conn.IOStream', autospec=True) as iostream: instance = iostream.return_value instance.connect.return_value = Future() instance.read_bytes.return_value = Future() diff --git a/tests/test_async.py b/tests/test_async.py index d947c80..d1dff03 100644 --- a/tests/test_async.py +++ b/tests/test_async.py @@ -38,7 +38,7 @@ def _get_test_conn(): # https://github.com/testing-cabal/mock/issues/323 # @patch('nsq.conn.socket') -@patch('nsq.conn.tornado.iostream.IOStream', autospec=True) +@patch('nsq.conn.IOStream', autospec=True) def test_connect(mock_iostream, mock_socket): instance = mock_iostream.return_value fut = Future() From 39a4ef3dd4ab153cd04c01c10e16309cf89948ef Mon Sep 17 00:00:00 2001 From: Pierce Lopez Date: Mon, 27 Apr 2020 01:28:16 -0400 Subject: [PATCH 4/6] tests: add coroutine reader and writer test cases --- tests/test_reader.py | 27 ++++++++++++++++++++++++++- tests/test_writer.py | 21 +++++++++++++++++++-- 2 files changed, 45 insertions(+), 3 deletions(-) diff --git a/tests/test_reader.py b/tests/test_reader.py index 3e72b16..98395d5 100644 --- a/tests/test_reader.py +++ b/tests/test_reader.py @@ -9,10 +9,11 @@ import time import ssl +import tornado.gen import tornado.httpclient +import tornado.httpserver import tornado.testing import tornado.web -import tornado.httpserver # shunt '..' into sys.path since we are in a 'tests' subdirectory base_dir = os.path.abspath(os.path.join(os.path.dirname(os.path.abspath(__file__)), '..')) @@ -194,6 +195,30 @@ def handler(msg): self.wait() r.close() + assert self.msg_count == num_messages + + def test_reader_coro(self): + self.msg_count = 0 + num_messages = 20 + + topic = 'test_reader_msgs_%s' % time.time() + self._send_messages(topic, num_messages, b'sup') + + @tornado.gen.coroutine + def handler(msg): + yield tornado.gen.sleep(0.1) + self.msg_count += 1 + if self.msg_count >= num_messages: + self.stop() + raise tornado.gen.Return(True) + + r = Reader(nsqd_tcp_addresses=['127.0.0.1:4150'], topic=topic, channel='ch', + message_handler=handler, max_in_flight=10, + **self.identify_options) + + self.wait() + r.close() + assert self.msg_count == num_messages def test_reader_heartbeat(self): this = self diff --git a/tests/test_writer.py b/tests/test_writer.py index 479b4a1..01c30a7 100644 --- a/tests/test_writer.py +++ b/tests/test_writer.py @@ -1,9 +1,10 @@ from __future__ import absolute_import - import time +import unittest + +import tornado.gen import nsq -import unittest from .test_reader import IntegrationBase @@ -63,3 +64,19 @@ def pubcb(conn, data): result = self.wait() print(str(result)) assert not isinstance(result, Exception) + + def test_writer_await_pub(self): + topic = 'test_writer_mpub_%s' % time.time() + + w = nsq.Writer(nsqd_tcp_addresses=['127.0.0.1:4150'], **self.identify_options) + + @tornado.gen.coroutine + def trypub(): + yield w.pub(topic, b'{"one": 1}') + yield w.pub(topic, b'{"two": 2}') + self.stop("OK") + + self.io_loop.call_later(0.1, trypub) + result = self.wait() + print(str(result)) + assert result == "OK" From aff4d9b34bcf96554e484d6795987b7f8b6993de Mon Sep 17 00:00:00 2001 From: Pierce Lopez Date: Thu, 18 Jun 2020 10:56:44 -0400 Subject: [PATCH 5/6] reader: message_handler docs update - may be coroutine --- nsq/reader.py | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/nsq/reader.py b/nsq/reader.py index 96a6f8f..0c24245 100644 --- a/nsq/reader.py +++ b/nsq/reader.py @@ -49,14 +49,20 @@ class Reader(Client): It maintains a sufficient RDY count based on the # of producers and your configured ``max_in_flight``. - Handlers should be defined as shown in the examples below. The handler receives a - :class:`nsq.Message` object that has instance methods :meth:`nsq.Message.finish`, - :meth:`nsq.Message.requeue`, and :meth:`nsq.Message.touch` to respond to ``nsqd``. - - When messages are not responded to explicitly, it is responsible for sending - ``FIN`` or ``REQ`` commands based on return value of ``message_handler``. When - re-queueing, it will backoff from processing additional messages for an increasing - delay (calculated exponentially based on consecutive failures up to ``max_backoff_duration``). + Handlers should be defined as shown in the examples below. The ``message_handler`` + callback function receives a :class:`nsq.Message` object that has instance methods + :meth:`nsq.Message.finish`, :meth:`nsq.Message.requeue`, and :meth:`nsq.Message.touch` + which can be used to respond to ``nsqd``. As an alternative to explicitly calling these + response methods, the handler function can simply return ``True`` to finish the message, + or ``False`` to requeue it. If the handler function calls :meth:`nsq.Message.enable_async`, + then automatic finish/requeue is disabled, allowing the :class:`nsq.Message` to finish or + requeue in a later async callback or context. The handler function may also be a coroutine, + in which case Message async handling is enabled automatically, but the coroutine + can still return a final value of True/False to automatically finish/requeue the message. + + After re-queueing a message, the handler will backoff from processing additional messages + for an increasing delay (calculated exponentially based on consecutive failures up to + ``max_backoff_duration``). Synchronous example:: From 2fa6c0f71e9806245caa994b7d9e815a9669957e Mon Sep 17 00:00:00 2001 From: Pierce Lopez Date: Thu, 18 Jun 2020 10:59:26 -0400 Subject: [PATCH 6/6] docs: update tornado requirement to tornado-5.1.1 --- docs/requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/requirements.txt b/docs/requirements.txt index 11dd243..46690a7 100644 --- a/docs/requirements.txt +++ b/docs/requirements.txt @@ -1 +1 @@ -tornado==3.0.2 +tornado==5.1.1