From 0d1a98914a4d6df8734973db9a94a75d7b10543d Mon Sep 17 00:00:00 2001 From: Kenneth Kehl <@kkehl@flexion.us> Date: Wed, 8 Jan 2025 08:44:49 -0800 Subject: [PATCH] cleanup pending notifications --- app/celery/scheduled_tasks.py | 8 ++++++++ app/config.py | 5 +++++ app/dao/jobs_dao.py | 2 +- app/dao/notifications_dao.py | 17 +++++++++++++++++ .../notification_dao/test_notification_dao.py | 18 ++++++++++++++++++ 5 files changed, 49 insertions(+), 1 deletion(-) diff --git a/app/celery/scheduled_tasks.py b/app/celery/scheduled_tasks.py index 2dcd570cc..cb0e0886e 100644 --- a/app/celery/scheduled_tasks.py +++ b/app/celery/scheduled_tasks.py @@ -24,6 +24,7 @@ find_missing_row_for_job, ) from app.dao.notifications_dao import ( + dao_close_out_delivery_receipts, dao_update_delivery_receipts, notifications_not_yet_sent, ) @@ -278,3 +279,10 @@ def process_delivery_receipts(self): current_app.logger.error( "Failed process delivery receipts after max retries" ) + + +@notify_celery.task( + bind=True, max_retries=2, default_retry_delay=3600, name="cleanup-delivery-receipts" +) +def cleanup_delivery_receipts(self): + dao_close_out_delivery_receipts() diff --git a/app/config.py b/app/config.py index d3f2a5197..580495731 100644 --- a/app/config.py +++ b/app/config.py @@ -203,6 +203,11 @@ class Config(object): "schedule": timedelta(minutes=2), "options": {"queue": QueueNames.PERIODIC}, }, + "cleanup-delivery-receipts": { + "task": "cleanup-delivery-receipts", + "schedule": timedelta(minutes=82), + "options": {"queue": QueueNames.PERIODIC}, + }, "expire-or-delete-invitations": { "task": "expire-or-delete-invitations", "schedule": timedelta(minutes=66), diff --git a/app/dao/jobs_dao.py b/app/dao/jobs_dao.py index ddec26956..c969c4b53 100644 --- a/app/dao/jobs_dao.py +++ b/app/dao/jobs_dao.py @@ -45,7 +45,7 @@ def dao_get_job_by_service_id_and_job_id(service_id, job_id): def dao_get_unfinished_jobs(): stmt = select(Job).filter(Job.processing_finished.is_(None)) - return db.session.execute(stmt).all() + return db.session.execute(stmt).scalars().all() def dao_get_jobs_by_service_id( diff --git a/app/dao/notifications_dao.py b/app/dao/notifications_dao.py index 139f7ae8a..36eeafa92 100644 --- a/app/dao/notifications_dao.py +++ b/app/dao/notifications_dao.py @@ -780,3 +780,20 @@ def dao_update_delivery_receipts(receipts, delivered): f"#loadtestperformance batch update query time: \ updated {len(receipts)} notification in {elapsed_time} ms" ) + + +def dao_close_out_delivery_receipts(): + THREE_DAYS_AGO = utc_now() - timedelta(minutes=3) + stmt = ( + update(Notification) + .where( + Notification.status == NotificationStatus.PENDING, + Notification.sent_at < THREE_DAYS_AGO, + ) + .values(status=NotificationStatus.FAILED, provider_response="Technical Failure") + ) + result = db.session.execute(stmt) + current_app.logger.info( + f"Marked {result.rowcount} notifications as technical failures" + ) + db.session.commit() diff --git a/tests/app/dao/notification_dao/test_notification_dao.py b/tests/app/dao/notification_dao/test_notification_dao.py index 6e09f182a..f6905a749 100644 --- a/tests/app/dao/notification_dao/test_notification_dao.py +++ b/tests/app/dao/notification_dao/test_notification_dao.py @@ -11,6 +11,7 @@ from app import db from app.dao.notifications_dao import ( + dao_close_out_delivery_receipts, dao_create_notification, dao_delete_notifications_by_id, dao_get_last_notification_added_for_job_id, @@ -2026,6 +2027,23 @@ def test_update_delivery_receipts(mocker): assert "provider_response" in kwargs +def test_close_out_delivery_receipts(mocker): + mock_session = mocker.patch("app.dao.notifications_dao.db.session") + mock_update = MagicMock() + mock_where = MagicMock() + mock_values = MagicMock() + mock_update.where.return_value = mock_where + mock_where.values.return_value = mock_values + + mock_session.execute.return_value = None + with patch("app.dao.notifications_dao.update", return_value=mock_update): + dao_close_out_delivery_receipts() + mock_update.where.assert_called_once() + mock_where.values.assert_called_once() + mock_session.execute.assert_called_once_with(mock_values) + mock_session.commit.assert_called_once() + + @pytest.mark.parametrize( "created_at_utc,date_to_check,expected_count", [