Skip to content

Commit

Permalink
add a test
Browse files Browse the repository at this point in the history
  • Loading branch information
Kenneth Kehl committed Jan 13, 2025
1 parent 2847046 commit a92eb91
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 25 deletions.
33 changes: 18 additions & 15 deletions app/celery/scheduled_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,39 +292,42 @@ def cleanup_delivery_receipts(self):

@notify_celery.task(bind=True, name="batch-insert-notifications")
def batch_insert_notifications(self):
current_app.logger.info("ENTER SCHEDULED TASK")
batch = []

# TODO We probably need some way to clear the list if
# things go haywire. A command?

# with redis_store.pipeline():
# while redis_store.llen("message_queue") > 0:
# redis_store.lpop("message_queue")
# current_app.logger.info("EMPTY!")
# return
current_len = redis_store.llen("message_queue")
with redis_store.pipeline():
current_app.logger.info("PIPELINE")
# since this list is always growing, just grab what is available when
# since this list is being fed by other processes, just grab what is available when
# this call is made and process that.
current_len = redis_store.llen("message_queue")

count = 0
while count < current_len:
count = count + 1
notification_bytes = redis_store.lpop("message_queue")
notification_dict = json.loads(notification_bytes.decode("utf-8"))
notification_dict["status"] = notification_dict.pop("notification_status")
notification_dict["created_at"] = utc_now()
if not notification_dict.get("created_at"):
notification_dict["created_at"] = utc_now()
notification = Notification(**notification_dict)
current_app.logger.info(
f"WHAT IS THIS NOTIFICATION {type(notification)} {notification}"
)
if notification is not None:
current_app.logger.info(
f"SCHEDULED adding notification {notification.id} to batch"
)
batch.append(notification)
try:
current_app.logger.info("GOING TO DO BATCH INSERT")
dao_batch_insert_notifications(batch)
except Exception as e:
current_app.logger.exception(f"Notification batch insert failed {e}")

for msg in batch:
redis_store.rpush("notification_queue", json.dumps(msg))
for n in batch:
# Use 'created_at' as a TTL so we don't retry infinitely
if n.created_at < utc_now() - timedelta(minutes=1):
current_app.logger.warning(
f"Abandoning stale data, could not write to db: {n.serialize_for_redis(n)}"
)
continue
else:
redis_store.rpush("message_queue", json.dumps(n.serialize_for_redis(n)))
14 changes: 5 additions & 9 deletions app/dao/notifications_dao.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from datetime import timedelta
from time import time

import sqlalchemy
from flask import current_app
from sqlalchemy import (
TIMESTAMP,
Expand Down Expand Up @@ -803,11 +802,8 @@ def dao_close_out_delivery_receipts():


def dao_batch_insert_notifications(batch):
current_app.logger.info("DOING BATCH INSERT IN DAO")
try:
db.session.bulk_save_objects(batch)
db.session.commit()
current_app.logger.info(f"SUCCESSFULLY INSERTED: {len(batch)}")
return len(batch)
except sqlalchemy.exc.SQLAlchemyError as e:
current_app.logger.exception(f"Error during batch insert {e}")

db.session.bulk_save_objects(batch)
db.session.commit()
current_app.logger.info(f"Batch inserted notifications: {len(batch)}")
return len(batch)
26 changes: 25 additions & 1 deletion tests/app/celery/test_scheduled_tasks.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import json
from collections import namedtuple
from datetime import timedelta
from unittest import mock
from unittest.mock import ANY, call
from unittest.mock import ANY, MagicMock, call

import pytest

from app.celery import scheduled_tasks
from app.celery.scheduled_tasks import (
batch_insert_notifications,
check_for_missing_rows_in_completed_jobs,
check_for_services_with_high_failure_rates_or_sending_to_tv_numbers,
check_job_status,
Expand Down Expand Up @@ -523,3 +525,25 @@ def test_check_for_services_with_high_failure_rates_or_sending_to_tv_numbers(
technical_ticket=True,
)
mock_send_ticket_to_zendesk.assert_called_once()


def test_batch_insert_with_valid_notifications(mocker):
mocker.patch("app.celery.scheduled_tasks.dao_batch_insert_notifications")
rs = MagicMock()
mocker.patch("app.celery.scheduled_tasks.redis_store", rs)
notifications = [
{"id": 1, "notification_status": "pending"},
{"id": 2, "notification_status": "pending"},
]
serialized_notifications = [json.dumps(n).encode("utf-8") for n in notifications]

pipeline_mock = MagicMock()

rs.pipeline.return_value.__enter__.return_value = pipeline_mock
rs.llen.return_value = len(notifications)
rs.lpop.side_effect = serialized_notifications

batch_insert_notifications()

rs.llen.assert_called_once_with("message_queue")
rs.lpop.assert_called_with("message_queue")

0 comments on commit a92eb91

Please sign in to comment.