diff --git a/app/celery/scheduled_tasks.py b/app/celery/scheduled_tasks.py index 9fcfeeb04..12c721114 100644 --- a/app/celery/scheduled_tasks.py +++ b/app/celery/scheduled_tasks.py @@ -292,39 +292,42 @@ def cleanup_delivery_receipts(self): @notify_celery.task(bind=True, name="batch-insert-notifications") def batch_insert_notifications(self): - current_app.logger.info("ENTER SCHEDULED TASK") batch = [] + + # TODO We probably need some way to clear the list if + # things go haywire. A command? + # with redis_store.pipeline(): # while redis_store.llen("message_queue") > 0: # redis_store.lpop("message_queue") # current_app.logger.info("EMPTY!") # return + current_len = redis_store.llen("message_queue") with redis_store.pipeline(): - current_app.logger.info("PIPELINE") - # since this list is always growing, just grab what is available when + # since this list is being fed by other processes, just grab what is available when # this call is made and process that. - current_len = redis_store.llen("message_queue") + count = 0 while count < current_len: count = count + 1 notification_bytes = redis_store.lpop("message_queue") notification_dict = json.loads(notification_bytes.decode("utf-8")) notification_dict["status"] = notification_dict.pop("notification_status") - notification_dict["created_at"] = utc_now() + if not notification_dict.get("created_at"): + notification_dict["created_at"] = utc_now() notification = Notification(**notification_dict) - current_app.logger.info( - f"WHAT IS THIS NOTIFICATION {type(notification)} {notification}" - ) if notification is not None: - current_app.logger.info( - f"SCHEDULED adding notification {notification.id} to batch" - ) batch.append(notification) try: - current_app.logger.info("GOING TO DO BATCH INSERT") dao_batch_insert_notifications(batch) except Exception as e: current_app.logger.exception(f"Notification batch insert failed {e}") - - for msg in batch: - redis_store.rpush("notification_queue", json.dumps(msg)) + for n in batch: + # Use 'created_at' as a TTL so we don't retry infinitely + if n.created_at < utc_now() - timedelta(minutes=1): + current_app.logger.warning( + f"Abandoning stale data, could not write to db: {n.serialize_for_redis(n)}" + ) + continue + else: + redis_store.rpush("message_queue", json.dumps(n.serialize_for_redis(n))) diff --git a/app/dao/notifications_dao.py b/app/dao/notifications_dao.py index 92dcc234c..fece5b3d2 100644 --- a/app/dao/notifications_dao.py +++ b/app/dao/notifications_dao.py @@ -2,7 +2,6 @@ from datetime import timedelta from time import time -import sqlalchemy from flask import current_app from sqlalchemy import ( TIMESTAMP, @@ -803,11 +802,8 @@ def dao_close_out_delivery_receipts(): def dao_batch_insert_notifications(batch): - current_app.logger.info("DOING BATCH INSERT IN DAO") - try: - db.session.bulk_save_objects(batch) - db.session.commit() - current_app.logger.info(f"SUCCESSFULLY INSERTED: {len(batch)}") - return len(batch) - except sqlalchemy.exc.SQLAlchemyError as e: - current_app.logger.exception(f"Error during batch insert {e}") + + db.session.bulk_save_objects(batch) + db.session.commit() + current_app.logger.info(f"Batch inserted notifications: {len(batch)}") + return len(batch) diff --git a/tests/app/celery/test_scheduled_tasks.py b/tests/app/celery/test_scheduled_tasks.py index 0c285ea94..8b5fc6be9 100644 --- a/tests/app/celery/test_scheduled_tasks.py +++ b/tests/app/celery/test_scheduled_tasks.py @@ -1,12 +1,14 @@ +import json from collections import namedtuple from datetime import timedelta from unittest import mock -from unittest.mock import ANY, call +from unittest.mock import ANY, MagicMock, call import pytest from app.celery import scheduled_tasks from app.celery.scheduled_tasks import ( + batch_insert_notifications, check_for_missing_rows_in_completed_jobs, check_for_services_with_high_failure_rates_or_sending_to_tv_numbers, check_job_status, @@ -523,3 +525,25 @@ def test_check_for_services_with_high_failure_rates_or_sending_to_tv_numbers( technical_ticket=True, ) mock_send_ticket_to_zendesk.assert_called_once() + + +def test_batch_insert_with_valid_notifications(mocker): + mocker.patch("app.celery.scheduled_tasks.dao_batch_insert_notifications") + rs = MagicMock() + mocker.patch("app.celery.scheduled_tasks.redis_store", rs) + notifications = [ + {"id": 1, "notification_status": "pending"}, + {"id": 2, "notification_status": "pending"}, + ] + serialized_notifications = [json.dumps(n).encode("utf-8") for n in notifications] + + pipeline_mock = MagicMock() + + rs.pipeline.return_value.__enter__.return_value = pipeline_mock + rs.llen.return_value = len(notifications) + rs.lpop.side_effect = serialized_notifications + + batch_insert_notifications() + + rs.llen.assert_called_once_with("message_queue") + rs.lpop.assert_called_with("message_queue")