Skip to content

Commit

Permalink
merge from main
Browse files Browse the repository at this point in the history
  • Loading branch information
Kenneth Kehl committed Jan 16, 2025
2 parents fdf6158 + e9206c3 commit 025a10a
Show file tree
Hide file tree
Showing 19 changed files with 309 additions and 49 deletions.
57 changes: 54 additions & 3 deletions app/celery/scheduled_tasks.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
from datetime import timedelta
import json
from datetime import datetime, timedelta

from flask import current_app
from sqlalchemy import between, select, union
from sqlalchemy.exc import SQLAlchemyError

from app import db, notify_celery, zendesk_client
from app import notify_celery, redis_store, zendesk_client

from app.celery.tasks import (
get_recipient_csv_and_template_and_sender_id,
process_incomplete_jobs,
Expand All @@ -24,6 +26,7 @@
find_missing_row_for_job,
)
from app.dao.notifications_dao import (
dao_batch_insert_notifications,
dao_close_out_delivery_receipts,
dao_update_delivery_receipts,
notifications_not_yet_sent,
Expand All @@ -34,7 +37,7 @@
)
from app.dao.users_dao import delete_codes_older_created_more_than_a_day_ago
from app.enums import JobStatus, NotificationType
from app.models import Job
from app.models import Job, Notification
from app.notifications.process_notifications import send_notification_to_queue
from app.utils import utc_now
from notifications_utils import aware_utcnow
Expand Down Expand Up @@ -293,3 +296,51 @@ def process_delivery_receipts(self):
)
def cleanup_delivery_receipts(self):
dao_close_out_delivery_receipts()


@notify_celery.task(bind=True, name="batch-insert-notifications")
def batch_insert_notifications(self):
batch = []

# TODO We probably need some way to clear the list if
# things go haywire. A command?

# with redis_store.pipeline():
# while redis_store.llen("message_queue") > 0:
# redis_store.lpop("message_queue")
# current_app.logger.info("EMPTY!")
# return
current_len = redis_store.llen("message_queue")
with redis_store.pipeline():
# since this list is being fed by other processes, just grab what is available when
# this call is made and process that.

count = 0
while count < current_len:
count = count + 1
notification_bytes = redis_store.lpop("message_queue")
notification_dict = json.loads(notification_bytes.decode("utf-8"))
notification_dict["status"] = notification_dict.pop("notification_status")
if not notification_dict.get("created_at"):
notification_dict["created_at"] = utc_now()
elif isinstance(notification_dict["created_at"], list):
notification_dict["created_at"] = notification_dict["created_at"][0]
notification = Notification(**notification_dict)
if notification is not None:
batch.append(notification)
try:
dao_batch_insert_notifications(batch)
except Exception:
current_app.logger.exception("Notification batch insert failed")
for n in batch:
# Use 'created_at' as a TTL so we don't retry infinitely
notification_time = n.created_at
if isinstance(notification_time, str):
notification_time = datetime.fromisoformat(n.created_at)
if notification_time < utc_now() - timedelta(seconds=50):
current_app.logger.warning(
f"Abandoning stale data, could not write to db: {n.serialize_for_redis(n)}"
)
continue
else:
redis_store.rpush("message_queue", json.dumps(n.serialize_for_redis(n)))
2 changes: 1 addition & 1 deletion app/celery/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ def save_sms(self, service_id, notification_id, encrypted_notification, sender_i
)
)
provider_tasks.deliver_sms.apply_async(
[str(saved_notification.id)], queue=QueueNames.SEND_SMS
[str(saved_notification.id)], queue=QueueNames.SEND_SMS, countdown=60
)

current_app.logger.debug(
Expand Down
11 changes: 11 additions & 0 deletions app/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -789,6 +789,17 @@ def _update_template(id, name, template_type, content, subject):
db.session.commit()


@notify_command(name="clear-redis-list")
@click.option("-n", "--name_of_list", required=True)
def clear_redis_list(name_of_list):
my_len_before = redis_store.llen(name_of_list)
redis_store.ltrim(name_of_list, 1, 0)
my_len_after = redis_store.llen(name_of_list)
current_app.logger.info(
f"Cleared redis list {name_of_list}. Before: {my_len_before} after {my_len_after}"
)


@notify_command(name="update-templates")
def update_templates():
with open(current_app.config["CONFIG_FILES"] + "/templates.json") as f:
Expand Down
5 changes: 5 additions & 0 deletions app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,11 @@ class Config(object):
"schedule": timedelta(minutes=82),
"options": {"queue": QueueNames.PERIODIC},
},
"batch-insert-notifications": {
"task": "batch-insert-notifications",
"schedule": 10.0,
"options": {"queue": QueueNames.PERIODIC},
},
"expire-or-delete-invitations": {
"task": "expire-or-delete-invitations",
"schedule": timedelta(minutes=66),
Expand Down
37 changes: 36 additions & 1 deletion app/dao/notifications_dao.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json
from datetime import timedelta
import os
from datetime import datetime, timedelta
from time import time

from flask import current_app
Expand Down Expand Up @@ -96,6 +97,32 @@ def dao_create_notification(notification):
# notify-api-1454 insert only if it doesn't exist
if not dao_notification_exists(notification.id):
db.session.add(notification)
# There have been issues with invites expiring.
# Ensure the created at value is set and debug.
if notification.notification_type == "email":
orig_time = notification.created_at
now_time = utc_now()
try:
diff_time = now_time - orig_time
except TypeError:
try:
orig_time = datetime.strptime(orig_time, "%Y-%m-%dT%H:%M:%S.%fZ")
except ValueError:
orig_time = datetime.strptime(orig_time, "%Y-%m-%d")
diff_time = now_time - orig_time
current_app.logger.error(
f"dao_create_notification orig created at: {orig_time} and now created at: {now_time}"
)
if diff_time.total_seconds() > 300:
current_app.logger.error(
"Something is wrong with notification.created_at in email!"
)
if os.getenv("NOTIFY_ENVIRONMENT") not in ["test"]:
notification.created_at = now_time
dao_update_notification(notification)
current_app.logger.error(
f"Email notification created_at reset to {notification.created_at}"
)


def country_records_delivery(phone_prefix):
Expand Down Expand Up @@ -817,3 +844,11 @@ def dao_close_out_delivery_receipts():
current_app.logger.info(
f"Marked {result.rowcount} notifications as technical failures"
)


def dao_batch_insert_notifications(batch):

db.session.bulk_save_objects(batch)
db.session.commit()
current_app.logger.info(f"Batch inserted notifications: {len(batch)}")
return len(batch)
29 changes: 28 additions & 1 deletion app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from sqlalchemy import CheckConstraint, Index, UniqueConstraint
from sqlalchemy.dialects.postgresql import JSON, JSONB, UUID
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.ext.declarative import DeclarativeMeta, declared_attr
from sqlalchemy.orm import validates
from sqlalchemy.orm.collections import attribute_mapped_collection

Expand Down Expand Up @@ -1694,6 +1694,33 @@ def get_created_by_email_address(self):
else:
return None

def serialize_for_redis(self, obj):
if isinstance(obj.__class__, DeclarativeMeta):
fields = {}
for column in obj.__table__.columns:
if column.name == "notification_status":
new_name = "status"
value = getattr(obj, new_name)
elif column.name == "created_at":
if isinstance(obj.created_at, str):
value = obj.created_at
else:
value = (obj.created_at.strftime("%Y-%m-%d %H:%M:%S"),)
elif column.name in ["sent_at", "completed_at"]:
value = None
elif column.name.endswith("_id"):
value = getattr(obj, column.name)
value = str(value)
else:
value = getattr(obj, column.name)
if column.name in ["message_id", "api_key_id"]:
pass # do nothing because we don't have the message id yet
else:
fields[column.name] = value

return fields
raise ValueError("Provided object is not a SQLAlchemy instance")

def serialize_for_csv(self):
serialized = {
"row_number": (
Expand Down
26 changes: 14 additions & 12 deletions app/notifications/process_notifications.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import json
import os
import uuid

from flask import current_app
Expand All @@ -11,7 +13,7 @@
dao_notification_exists,
get_notification_by_id,
)
from app.enums import KeyType, NotificationStatus, NotificationType
from app.enums import NotificationStatus, NotificationType
from app.errors import BadRequestError
from app.models import Notification
from app.utils import hilite, utc_now
Expand Down Expand Up @@ -139,18 +141,18 @@ def persist_notification(

# if simulated create a Notification model to return but do not persist the Notification to the dB
if not simulated:
current_app.logger.info("Firing dao_create_notification")
dao_create_notification(notification)
if key_type != KeyType.TEST and current_app.config["REDIS_ENABLED"]:
current_app.logger.info(
"Redis enabled, querying cache key for service id: {}".format(
service.id
if notification.notification_type == NotificationType.SMS:
# it's just too hard with redis and timing to test this here
if os.getenv("NOTIFY_ENVIRONMENT") == "test":
dao_create_notification(notification)
else:
redis_store.rpush(
"message_queue",
json.dumps(notification.serialize_for_redis(notification)),
)
)
else:
dao_create_notification(notification)

current_app.logger.info(
f"{notification_type} {notification_id} created at {notification_created_at}"
)
return notification


Expand All @@ -172,7 +174,7 @@ def send_notification_to_queue_detached(
deliver_task = provider_tasks.deliver_email

try:
deliver_task.apply_async([str(notification_id)], queue=queue)
deliver_task.apply_async([str(notification_id)], queue=queue, countdown=60)
except Exception:
dao_delete_notifications_by_id(notification_id)
raise
Expand Down
3 changes: 2 additions & 1 deletion app/service_invite/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def _create_service_invite(invited_user, nonce, state):
"service_name": invited_user.service.name,
"url": url,
}

created_at = utc_now()
saved_notification = persist_notification(
template_id=template.id,
template_version=template.version,
Expand All @@ -78,6 +78,7 @@ def _create_service_invite(invited_user, nonce, state):
api_key_id=None,
key_type=KeyType.NORMAL,
reply_to_text=invited_user.from_user.email_address,
created_at=created_at,
)
saved_notification.personalisation = personalisation
redis_store.set(
Expand Down
19 changes: 19 additions & 0 deletions notifications_utils/clients/redis/redis_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ class RedisClient:
active = False
scripts = {}

def pipeline(self):
return self.redis_store.pipeline()

def init_app(self, app):
self.active = app.config.get("REDIS_ENABLED")
if self.active:
Expand Down Expand Up @@ -156,6 +159,22 @@ def get(self, key, raise_exception=False):

return None

def rpush(self, key, value):
if self.active:
self.redis_store.rpush(key, value)

def lpop(self, key):
if self.active:
return self.redis_store.lpop(key)

def llen(self, key):
if self.active:
return self.redis_store.llen(key)

def ltrim(self, key, start, end):
if self.active:
return self.redis_store.ltrim(key, start, end)

def delete(self, *keys, raise_exception=False):
keys = [prepare_value(k) for k in keys]
if self.active:
Expand Down
1 change: 0 additions & 1 deletion tests/app/celery/test_reporting_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ def test_create_nightly_notification_status_triggers_relevant_tasks(
mock_celery = mocker.patch(
"app.celery.reporting_tasks.create_nightly_notification_status_for_service_and_day"
).apply_async

for notification_type in NotificationType:
template = create_template(sample_service, template_type=notification_type)
create_notification(template=template, created_at=notification_date)
Expand Down
Loading

0 comments on commit 025a10a

Please sign in to comment.