diff --git a/nsq/reader.py b/nsq/reader.py index da1a205..c7e0016 100644 --- a/nsq/reader.py +++ b/nsq/reader.py @@ -9,6 +9,8 @@ import cgi import warnings import inspect +import datetime +import os.path try: import simplejson as json @@ -131,6 +133,8 @@ def process_message(message): :param max_backoff_duration: the maximum time we will allow a backoff state to last in seconds + :param data_path: the directory failed messages will be archived to after ``max_tries`` + :param \*\*kwargs: passed to :class:`nsq.AsyncConn` initialization """ def __init__( @@ -149,6 +153,7 @@ def __init__( lookupd_poll_jitter=0.3, lookupd_connect_timeout=1, lookupd_request_timeout=2, + data_path=None, **kwargs): super(Reader, self).__init__(**kwargs) @@ -161,6 +166,7 @@ def __init__( assert isinstance(lookupd_poll_jitter, float) assert isinstance(lookupd_connect_timeout, int) assert isinstance(lookupd_request_timeout, int) + assert isinstance(data_path, (str, unicode, None.__class__)) assert lookupd_poll_jitter >= 0 and lookupd_poll_jitter <= 1 @@ -222,6 +228,7 @@ def __init__( self.redist_periodic = None self.query_periodic = None + self.data_path = data_path def _run(self): assert self.message_handler, "you must specify the Reader's message_handler" @@ -702,14 +709,31 @@ def process_message(self, message): def giving_up(self, message): """ - Called when a message has been received where ``msg.attempts > max_tries`` + Called when a message has been received where ``msg.attempts > max_tries``. + + Failed messages will be archived to ``$data_path/$channel_$topic/%Y%m%d-%H%M%S-%f_$sequence.failed.msg`` - This is useful to subclass and override to perform a task (such as writing to disk, etc.) + This is useful to subclass and override to perform a custom task (such as writing to disk, etc.) :param message: the :class:`nsq.Message` received """ - logger.warning('[%s] giving up on message %s after %d tries (max:%d) %r', - self.name, message.id, message.attempts, self.max_tries, message.body) + + self.failed_count += 1 + + path = os.path.join(self.data_path or "", self.topic + '_' + self.channel) + if not os.path.exists(path): + os.makedirs(path) + + date_str = datetime.datetime.utcnow().strftime("%Y%m%d-%H%M%S-%f") + filename = "%s_%d.failed.msg" % (date_str, self.failed_count) + filename = os.path.join(path, filename) + + logging.warning('[%s] giving up on message %s after %d tries (max:%d). Archived to %s %r', + self.name, message.id, message.attempts, self.max_tries, filename, message.body) + + f = open(filename, 'wb') + f.write(message.body + '\n') + f.close() def _on_connection_identify_response(self, conn, data, **kwargs):