Skip to content

Commit

Permalink
add Writer.close() and Client.close(), improve Reader.close()
Browse files Browse the repository at this point in the history
make Reader.close() prevent nsqd_tcp_addresses from reconnecting

Writer.close() also closes conns and stops from reconnecting

Client.close() stops the periodic _check_last_recv_timestamps()
  • Loading branch information
ploxiln committed Jun 21, 2020
1 parent b6d2bc7 commit 1ec059c
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 6 deletions.
10 changes: 7 additions & 3 deletions nsq/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@
import time
import logging

import tornado.ioloop
from tornado.ioloop import IOLoop, PeriodicCallback

logger = logging.getLogger(__name__)


class Client(object):
def __init__(self, **kwargs):
self.io_loop = tornado.ioloop.IOLoop.current()
tornado.ioloop.PeriodicCallback(self._check_last_recv_timestamps, 60 * 1000).start()
self.io_loop = IOLoop.current()
self.periodic_check = PeriodicCallback(self._check_last_recv_timestamps, 60 * 1000)
self.periodic_check.start()

def _on_connection_identify(self, conn, data, **kwargs):
logger.info('[%s:%s] IDENTIFY sent %r' % (conn.id, self.name, data))
Expand Down Expand Up @@ -75,3 +76,6 @@ def heartbeat(self, conn):
:param conn: the :class:`nsq.AsyncConn` over which the heartbeat was received
"""
pass

def close(self):
self.periodic_check.stop()
14 changes: 11 additions & 3 deletions nsq/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ def __init__(
self.backoff_block = False
self.backoff_block_completed = True

self._closed = False
self.conns = {}
self.connection_attempts = {}
self.http_client = tornado.httpclient.AsyncHTTPClient()
Expand Down Expand Up @@ -264,15 +265,19 @@ def _run(self):

def close(self):
"""
Closes all connections stops all periodic callbacks
Closes all connections and stops all periodic callbacks
"""
self._closed = True

for conn in self.conns.values():
conn.close()

self.redist_periodic.stop()
if self.query_periodic is not None:
self.query_periodic.stop()

super(Reader, self).close()

def set_message_handler(self, message_handler):
"""
Assigns the callback method to be executed for each message received
Expand Down Expand Up @@ -494,8 +499,8 @@ def connect_to_nsqd(self, host, port):
# only attempt to re-connect once every 10s per destination
# this throttles reconnects to failed endpoints
now = time.time()
last_connect_attempt = self.connection_attempts.get(conn.id)
if last_connect_attempt and last_connect_attempt > now - 10:
last_connect_attempt = self.connection_attempts.get(conn.id, 0)
if last_connect_attempt > now - 10:
return
self.connection_attempts[conn.id] = now

Expand Down Expand Up @@ -559,6 +564,9 @@ def _on_connection_close(self, conn, **kwargs):
self.io_loop.remove_timeout(conn.rdy_timeout)
conn.rdy_timeout = None

if self._closed:
return

if not self.lookupd_http_addresses:
# automatically reconnect to nsqd addresses when not using lookupd
logger.info('[%s:%s] attempting to reconnect in 15s', conn.id, self.name)
Expand Down
15 changes: 15 additions & 0 deletions nsq/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ def __init__(self, nsqd_tcp_addresses, reconnect_interval=15.0, name=None, **kwa

self.name = name or nsqd_tcp_addresses[0]
self.nsqd_tcp_addresses = nsqd_tcp_addresses
self._closed = False
self.conns = {}

# Verify keyword arguments
Expand Down Expand Up @@ -245,6 +246,9 @@ def _on_connection_close(self, conn, **kwargs):
logger.exception('[%s] uncaught exception in callback', conn.id)

logger.warning('[%s] connection closed', conn.id)
if self._closed:
return

logger.info('[%s] attempting to reconnect in %0.2fs', conn.id, self.reconnect_interval)
reconnect_callback = functools.partial(self.connect_to_nsqd,
host=conn.host, port=conn.port)
Expand All @@ -254,3 +258,14 @@ def _finish_pub(self, conn, data, command, topic, msg):
if isinstance(data, protocol.Error):
logger.error('[%s] failed to %s (%s, %s), data is %s',
conn.id if conn else 'NA', command, topic, msg, data)

def close(self):
"""
Closes all connections and stops all periodic callbacks
"""
self._closed = True

for conn in self.conns.values():
conn.close()

super(Writer, self).close()

0 comments on commit 1ec059c

Please sign in to comment.