Merge pull request #1510 from GSA/main
1/2/2025 Production Deploy
ccostino authored Jan 6, 2025
2 parents 1b37fa9 + 5f0dd6a commit 21e61ea
Showing 15 changed files with 10,429 additions and 409 deletions.
103 changes: 3 additions & 100 deletions app/celery/provider_tasks.py
@@ -1,107 +1,19 @@
import json
import os
from datetime import timedelta

from botocore.exceptions import ClientError
from flask import current_app
from sqlalchemy.orm.exc import NoResultFound

from app import aws_cloudwatch_client, notify_celery, redis_store
from app import notify_celery, redis_store
from app.clients.email import EmailClientNonRetryableException
from app.clients.email.aws_ses import AwsSesClientThrottlingSendRateException
from app.clients.sms import SmsClientResponseException
from app.config import Config, QueueNames
from app.dao import notifications_dao
from app.dao.notifications_dao import (
sanitize_successful_notification_by_id,
update_notification_status_by_id,
)
from app.dao.notifications_dao import update_notification_status_by_id
from app.delivery import send_to_providers
from app.enums import NotificationStatus
from app.exceptions import NotificationTechnicalFailureException
from app.utils import utc_now

# This is the amount of time to wait after sending an sms message before we check the aws logs and look for delivery
# receipts
DELIVERY_RECEIPT_DELAY_IN_SECONDS = 30


@notify_celery.task(
bind=True,
name="check_sms_delivery_receipt",
max_retries=48,
default_retry_delay=300,
)
def check_sms_delivery_receipt(self, message_id, notification_id, sent_at):
"""
This is called after deliver_sms to check the status of the message. This uses the same number of
retries and the same delay period as deliver_sms. In addition, this fires five minutes after
deliver_sms initially. So the idea is that most messages will succeed and show up in the logs quickly.
Other messages will resolve successfully after a retry or two. A few will fail but it will take up to
4 hours to know for sure. The call to check_sms will raise an exception if neither a success nor a
failure appears in the cloudwatch logs, so this should keep retrying until the log appears, or until
we run out of retries.
"""
# TODO the localstack cloudwatch doesn't currently have our log groups. Possibly create them with awslocal?
if aws_cloudwatch_client.is_localstack():
status = "success"
provider_response = "this is a fake successful localstack sms message"
carrier = "unknown"
else:
try:
status, provider_response, carrier = aws_cloudwatch_client.check_sms(
message_id, notification_id, sent_at
)
except NotificationTechnicalFailureException as ntfe:
provider_response = "Unable to find carrier response -- still looking"
status = "pending"
carrier = ""
update_notification_status_by_id(
notification_id,
status,
carrier=carrier,
provider_response=provider_response,
)
raise self.retry(exc=ntfe)
except ClientError as err:
# Probably a ThrottlingException but could be something else
error_code = err.response["Error"]["Code"]
provider_response = (
f"{error_code} while checking sms receipt -- still looking"
)
status = "pending"
carrier = ""
update_notification_status_by_id(
notification_id,
status,
carrier=carrier,
provider_response=provider_response,
)
raise self.retry(exc=err)

if status == "success":
status = NotificationStatus.DELIVERED
elif status == "failure":
status = NotificationStatus.FAILED
# if status is not success or failure the client raised an exception and this method will retry

if status == NotificationStatus.DELIVERED:
sanitize_successful_notification_by_id(
notification_id, carrier=carrier, provider_response=provider_response
)
current_app.logger.info(
f"Sanitized notification {notification_id} that was successfully delivered"
)
else:
update_notification_status_by_id(
notification_id,
status,
carrier=carrier,
provider_response=provider_response,
)
current_app.logger.info(
f"Updated notification {notification_id} with response '{provider_response}'"
)


@notify_celery.task(
@@ -127,17 +39,8 @@ def deliver_sms(self, notification_id):
ansi_green + f"AUTHENTICATION CODE: {notification.content}" + ansi_reset
)
# Code branches off to send_to_providers.py
message_id = send_to_providers.send_sms_to_provider(notification)
send_to_providers.send_sms_to_provider(notification)

# DEPRECATED
# We have to put it in UTC. For other timezones, the delay
# will be ignored and it will fire immediately (although this probably only affects developer testing)
my_eta = utc_now() + timedelta(seconds=DELIVERY_RECEIPT_DELAY_IN_SECONDS)
check_sms_delivery_receipt.apply_async(
[message_id, notification_id, notification.created_at],
eta=my_eta,
queue=QueueNames.CHECK_SMS,
)
except Exception as e:
update_notification_status_by_id(
notification_id,
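For context, the removed check_sms_delivery_receipt task above relies on Celery's bound-task retry idiom (bind=True plus self.retry). The following is a minimal, self-contained sketch of that idiom only; the app name and the look_up_receipt helper are illustrative stand-ins, not part of this codebase.

from celery import Celery

app = Celery("example")  # stand-in for the project's notify_celery app


def look_up_receipt(receipt_id):
    # Hypothetical lookup standing in for the CloudWatch check the real task performed.
    return None  # pretend the receipt has not appeared yet


@app.task(bind=True, max_retries=48, default_retry_delay=300)
def poll_until_resolved(self, receipt_id):
    # If the receipt is not available yet, self.retry re-queues this task after
    # default_retry_delay seconds; 48 retries at 300 seconds each allows roughly
    # four hours of polling before the task gives up, as the docstring describes.
    receipt = look_up_receipt(receipt_id)
    if receipt is None:
        raise self.retry(exc=RuntimeError("receipt not available yet"))
    return receipt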
71 changes: 59 additions & 12 deletions app/celery/scheduled_tasks.py
@@ -11,6 +11,7 @@
process_job,
process_row,
)
from app.clients.cloudwatch.aws_cloudwatch import AwsCloudwatchClient
from app.config import QueueNames
from app.dao.invited_org_user_dao import (
delete_org_invitations_created_more_than_two_days_ago,
@@ -22,7 +23,10 @@
find_jobs_with_missing_rows,
find_missing_row_for_job,
)
from app.dao.notifications_dao import notifications_not_yet_sent
from app.dao.notifications_dao import (
dao_update_delivery_receipts,
notifications_not_yet_sent,
)
from app.dao.services_dao import (
dao_find_services_sending_to_tv_numbers,
dao_find_services_with_high_failure_rates,
@@ -32,6 +36,7 @@
from app.models import Job
from app.notifications.process_notifications import send_notification_to_queue
from app.utils import utc_now
from notifications_utils import aware_utcnow
from notifications_utils.clients.zendesk.zendesk_client import NotifySupportTicket

MAX_NOTIFICATION_FAILS = 10000
@@ -95,27 +100,29 @@ def check_job_status():
select
from jobs
where job_status == 'in progress'
and processing started between 30 and 35 minutes ago
and processing started some time ago
OR where the job_status == 'pending'
and the job scheduled_for timestamp is between 30 and 35 minutes ago.
and the job scheduled_for timestamp is some time ago.
if any results then
update the job_status to 'error'
process the rows in the csv that are missing (in another task); just do the check here.
"""
thirty_minutes_ago = utc_now() - timedelta(minutes=30)
thirty_five_minutes_ago = utc_now() - timedelta(minutes=35)
START_MINUTES = 245
END_MINUTES = 240
end_minutes_ago = utc_now() - timedelta(minutes=END_MINUTES)
start_minutes_ago = utc_now() - timedelta(minutes=START_MINUTES)

incomplete_in_progress_jobs = Job.query.filter(
Job.job_status == JobStatus.IN_PROGRESS,
between(Job.processing_started, thirty_five_minutes_ago, thirty_minutes_ago),
between(Job.processing_started, start_minutes_ago, end_minutes_ago),
)
incomplete_pending_jobs = Job.query.filter(
Job.job_status == JobStatus.PENDING,
Job.scheduled_for.isnot(None),
between(Job.scheduled_for, thirty_five_minutes_ago, thirty_minutes_ago),
between(Job.scheduled_for, start_minutes_ago, end_minutes_ago),
)

jobs_not_complete_after_30_minutes = (
jobs_not_complete_after_allotted_time = (
incomplete_in_progress_jobs.union(incomplete_pending_jobs)
.order_by(Job.processing_started, Job.scheduled_for)
.all()
@@ -124,7 +131,7 @@ def check_job_status():
# temporarily mark them as ERROR so that they don't get picked up by future check_job_status tasks
# if they haven't been re-processed in time.
job_ids = []
for job in jobs_not_complete_after_30_minutes:
for job in jobs_not_complete_after_allotted_time:
job.job_status = JobStatus.ERROR
dao_update_job(job)
job_ids.append(str(job.id))
@@ -169,9 +176,7 @@ def check_for_missing_rows_in_completed_jobs():
for row_to_process in missing_rows:
row = recipient_csv[row_to_process.missing_row]
current_app.logger.info(
"Processing missing row: {} for job: {}".format(
row_to_process.missing_row, job.id
)
f"Processing missing row: {row_to_process.missing_row} for job: {job.id}"
)
process_row(row, template, job, job.service, sender_id=sender_id)

@@ -231,3 +236,45 @@ def check_for_services_with_high_failure_rates_or_sending_to_tv_numbers():
technical_ticket=True,
)
zendesk_client.send_ticket_to_zendesk(ticket)


@notify_celery.task(
bind=True, max_retries=7, default_retry_delay=3600, name="process-delivery-receipts"
)
def process_delivery_receipts(self):
"""
Every eight minutes or so (see config.py) we run this task, which searches the last ten
minutes of logs for delivery receipts and batch updates the db with the results. The overlap
is intentional. We don't mind re-updating things; it is better than losing data.
We also set this to retry with exponential backoff in the case of failure. The only way this would
fail is if, for example, the db went down or redis filled up, causing the app to stop processing. But if
it does fail, we need to go back over at some point when things are running again and process those results.
"""
try:
batch_size = 1000 # in theory with postgresql this could be 10k to 20k?

cloudwatch = AwsCloudwatchClient()
cloudwatch.init_app(current_app)
start_time = aware_utcnow() - timedelta(minutes=10)
end_time = aware_utcnow()
delivered_receipts, failed_receipts = cloudwatch.check_delivery_receipts(
start_time, end_time
)
delivered_receipts = list(delivered_receipts)
for i in range(0, len(delivered_receipts), batch_size):
batch = delivered_receipts[i : i + batch_size]
dao_update_delivery_receipts(batch, True)
failed_receipts = list(failed_receipts)
for i in range(0, len(failed_receipts), batch_size):
batch = failed_receipts[i : i + batch_size]
dao_update_delivery_receipts(batch, False)
except Exception as ex:
retry_count = self.request.retries
wait_time = 3600 * 2**retry_count
try:
raise self.retry(exc=ex, countdown=wait_time)
except self.MaxRetriesExceededError:
current_app.logger.error(
"Failed process delivery receipts after max retries"
)
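As a worked example, the manual backoff in process_delivery_receipts (wait_time = 3600 * 2**retry_count) spaces the seven allowed retries at 1, 2, 4, 8, 16, 32, and 64 hours. The short sketch below just prints that schedule; it is illustrative only and not part of the change.

BASE_DELAY_SECONDS = 3600  # one hour, matching the expression in the task above

for retry_count in range(7):  # the task is declared with max_retries=7
    wait_time = BASE_DELAY_SECONDS * 2**retry_count
    print(f"retry {retry_count + 1}: wait {wait_time} seconds ({wait_time // 3600} hour(s))")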
72 changes: 41 additions & 31 deletions app/celery/tasks.py
@@ -1,4 +1,5 @@
import json
from time import sleep

from celery.signals import task_postrun
from flask import current_app
@@ -38,9 +39,7 @@ def process_job(job_id, sender_id=None):
start = utc_now()
job = dao_get_job_by_id(job_id)
current_app.logger.info(
"Starting process-job task for job id {} with status: {}".format(
job_id, job.job_status
)
f"Starting process-job task for job id {job_id} with status: {job.job_status}"
)

if job.job_status != JobStatus.PENDING:
@@ -56,7 +55,7 @@ def process_job(job_id, sender_id=None):
job.job_status = JobStatus.CANCELLED
dao_update_job(job)
current_app.logger.warning(
"Job {} has been cancelled, service {} is inactive".format(
f"Job {job_id} has been cancelled, service {service.id} is inactive".format(
job_id, service.id
)
)
@@ -70,13 +69,21 @@ def process_job(job_id, sender_id=None):
)

current_app.logger.info(
"Starting job {} processing {} notifications".format(
job_id, job.notification_count
)
f"Starting job {job_id} processing {job.notification_count} notifications"
)

# notify-api-1495 we are going to sleep periodically to give other
# jobs running at the same time a chance to get some of their messages
# sent. Sleep for 1 second after every 3 sends, which gives us throughput
# of about 3600*3 per hour and would keep the queue clear assuming only one sender.
# It will also hopefully eliminate the throttling we are currently seeing
# when we send messages.
count = 0
for row in recipient_csv.get_rows():
process_row(row, template, job, service, sender_id=sender_id)
count = count + 1
if count % 3 == 0:
sleep(1)

# End point/Exit point for message send flow.
job_complete(job, start=start)
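The pacing comment in process_job above works out to at most about 3 sends per second, i.e. roughly 3 * 3600 = 10,800 notifications per hour for a single job. A minimal sketch of the same count-and-sleep logic pulled into a reusable generator follows; the paced helper is hypothetical and not something this change adds.

from time import sleep


def paced(rows, batch_size=3, pause_seconds=1):
    # Yield rows, sleeping for pause_seconds after every batch_size items.
    # With the defaults this caps throughput at roughly 3 rows per second,
    # matching the inline count % 3 == 0 check in process_job above.
    for count, row in enumerate(rows, start=1):
        yield row
        if count % batch_size == 0:
            sleep(pause_seconds)


# Usage would mirror the loop in process_job, e.g.:
# for row in paced(recipient_csv.get_rows()):
#     process_row(row, template, job, service, sender_id=sender_id)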
@@ -206,9 +213,7 @@ def save_sms(self, service_id, notification_id, encrypted_notification, sender_i
f"service not allowed to send for job_id {notification.get('job', None)}, aborting"
)
)
current_app.logger.debug(
"SMS {} failed as restricted service".format(notification_id)
)
current_app.logger.debug(f"SMS {notification_id} failed as restricted service")
return

try:
@@ -218,22 +223,30 @@ def save_sms(self, service_id, notification_id, encrypted_notification, sender_i
job = dao_get_job_by_id(job_id)
created_by_id = job.created_by_id

saved_notification = persist_notification(
template_id=notification["template"],
template_version=notification["template_version"],
recipient=notification["to"],
service=service,
personalisation=notification.get("personalisation"),
notification_type=NotificationType.SMS,
api_key_id=None,
key_type=KeyType.NORMAL,
created_at=utc_now(),
created_by_id=created_by_id,
job_id=notification.get("job", None),
job_row_number=notification.get("row_number", None),
notification_id=notification_id,
reply_to_text=reply_to_text,
)
try:
saved_notification = persist_notification(
template_id=notification["template"],
template_version=notification["template_version"],
recipient=notification["to"],
service=service,
personalisation=notification.get("personalisation"),
notification_type=NotificationType.SMS,
api_key_id=None,
key_type=KeyType.NORMAL,
created_at=utc_now(),
created_by_id=created_by_id,
job_id=notification.get("job", None),
job_row_number=notification.get("row_number", None),
notification_id=notification_id,
reply_to_text=reply_to_text,
)
except IntegrityError:
current_app.logger.warning(
f"{NotificationType.SMS}: {notification_id} already exists."
)
# If we don't have the return statement here, we will fall through and end
# up retrying because IntegrityError is a subclass of SQLAlchemyError
return

# Kick off sns process in provider_tasks.py
sn = saved_notification
@@ -247,11 +260,8 @@ def save_sms(self, service_id, notification_id, encrypted_notification, sender_i
)

current_app.logger.debug(
"SMS {} created at {} for job {}".format(
saved_notification.id,
saved_notification.created_at,
notification.get("job", None),
)
f"SMS {saved_notification.id} created at {saved_notification.created_at} "
f"for job {notification.get('job', None)}"
)

except SQLAlchemyError as e:
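The early return in the IntegrityError handler above matters because of SQLAlchemy's exception hierarchy: IntegrityError is a subclass of SQLAlchemyError, so without it a duplicate notification would fall through to the generic SQLAlchemyError handler and be retried. A standalone check (assuming SQLAlchemy is installed) makes the relationship explicit.

from sqlalchemy.exc import IntegrityError, SQLAlchemyError

# A duplicate notification_id raises IntegrityError, which inherits from
# SQLAlchemyError; catching it first and returning prevents the broader
# "except SQLAlchemyError" retry path from re-processing the duplicate.
assert issubclass(IntegrityError, SQLAlchemyError)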