Skip to content

Commit

Permalink
Merge branch 'main' into 2199-add-pending-message-data-to-daily-and-u…
Browse files Browse the repository at this point in the history
…ser_daily-stats
  • Loading branch information
Beverly Nguyen authored and Beverly Nguyen committed Jan 22, 2025
2 parents 880237f + e6899ef commit 1fc78d8
Show file tree
Hide file tree
Showing 98 changed files with 1,678 additions and 743 deletions.
8 changes: 4 additions & 4 deletions .ds.baseline
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@
"filename": "tests/app/service/test_rest.py",
"hashed_secret": "5baa61e4c9b93f3f0682250b6cf8331b7ee68fd8",
"is_verified": false,
"line_number": 1284,
"line_number": 1285,
"is_secret": false
}
],
Expand Down Expand Up @@ -341,15 +341,15 @@
"filename": "tests/app/user/test_rest.py",
"hashed_secret": "5baa61e4c9b93f3f0682250b6cf8331b7ee68fd8",
"is_verified": false,
"line_number": 108,
"line_number": 110,
"is_secret": false
},
{
"type": "Secret Keyword",
"filename": "tests/app/user/test_rest.py",
"hashed_secret": "0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33",
"is_verified": false,
"line_number": 826,
"line_number": 864,
"is_secret": false
}
],
Expand Down Expand Up @@ -384,5 +384,5 @@
}
]
},
"generated_at": "2024-10-31T21:25:32Z"
"generated_at": "2024-12-19T19:09:50Z"
}
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ test: export NEW_RELIC_ENVIRONMENT=test
test: ## Run tests and create coverage report
poetry run black .
poetry run flake8 .
poetry run isort --check-only ./app ./tests
poetry run isort ./app ./tests
poetry run coverage run --omit=*/migrations/*,*/tests/* -m pytest --maxfail=10

## TODO set this back to 95 asap
Expand Down
16 changes: 15 additions & 1 deletion app/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from werkzeug.exceptions import HTTPException as WerkzeugHTTPException
from werkzeug.local import LocalProxy

from app import config
from app.clients import NotificationProviderClients
from app.clients.cloudwatch.aws_cloudwatch import AwsCloudwatchClient
from app.clients.document_download import DocumentDownloadClient
Expand Down Expand Up @@ -58,15 +59,28 @@ class SQLAlchemy(_SQLAlchemy):

def apply_driver_hacks(self, app, info, options):
sa_url, options = super().apply_driver_hacks(app, info, options)

if "connect_args" not in options:
options["connect_args"] = {}
options["connect_args"]["options"] = "-c statement_timeout={}".format(
int(app.config["SQLALCHEMY_STATEMENT_TIMEOUT"]) * 1000
)

return (sa_url, options)


db = SQLAlchemy()
# Set db engine settings here for now.
# They were not being set previous (despite environmental variables with appropriate
# sounding names) and were defaulting to low values
db = SQLAlchemy(
engine_options={
"pool_size": config.Config.SQLALCHEMY_POOL_SIZE,
"max_overflow": 10,
"pool_timeout": config.Config.SQLALCHEMY_POOL_TIMEOUT,
"pool_recycle": config.Config.SQLALCHEMY_POOL_RECYCLE,
"pool_pre_ping": True,
}
)
migrate = Migrate()
ma = Marshmallow()
notify_celery = NotifyCelery()
Expand Down
3 changes: 2 additions & 1 deletion app/billing/rest.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from flask import Blueprint, jsonify, request

from app import db
from app.billing.billing_schemas import (
create_or_update_free_sms_fragment_limit_schema,
serialize_ft_billing_remove_emails,
Expand Down Expand Up @@ -60,7 +61,7 @@ def get_free_sms_fragment_limit(service_id):
)

if annual_billing is None:
service = Service.query.get(service_id)
service = db.session.get(Service, service_id)
# An entry does not exist in annual_billing table for that service and year.
# Set the annual billing to the default free allowance based on the organization type of the service.

Expand Down
95 changes: 80 additions & 15 deletions app/celery/scheduled_tasks.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
from datetime import timedelta
import json
from datetime import datetime, timedelta

from flask import current_app
from sqlalchemy import between
from sqlalchemy import between, select, union
from sqlalchemy.exc import SQLAlchemyError

from app import notify_celery, zendesk_client
from app import db, notify_celery, redis_store, zendesk_client
from app.celery.tasks import (
get_recipient_csv_and_template_and_sender_id,
process_incomplete_jobs,
Expand All @@ -19,11 +20,13 @@
from app.dao.invited_user_dao import expire_invitations_created_more_than_two_days_ago
from app.dao.jobs_dao import (
dao_set_scheduled_jobs_to_pending,
dao_update_job,
dao_update_job_status_to_error,
find_jobs_with_missing_rows,
find_missing_row_for_job,
)
from app.dao.notifications_dao import (
dao_batch_insert_notifications,
dao_close_out_delivery_receipts,
dao_update_delivery_receipts,
notifications_not_yet_sent,
)
Expand All @@ -33,7 +36,7 @@
)
from app.dao.users_dao import delete_codes_older_created_more_than_a_day_ago
from app.enums import JobStatus, NotificationType
from app.models import Job
from app.models import Job, Notification
from app.notifications.process_notifications import send_notification_to_queue
from app.utils import utc_now
from notifications_utils import aware_utcnow
Expand Down Expand Up @@ -112,30 +115,34 @@ def check_job_status():
end_minutes_ago = utc_now() - timedelta(minutes=END_MINUTES)
start_minutes_ago = utc_now() - timedelta(minutes=START_MINUTES)

incomplete_in_progress_jobs = Job.query.filter(
incomplete_in_progress_jobs = select(Job).where(
Job.job_status == JobStatus.IN_PROGRESS,
between(Job.processing_started, start_minutes_ago, end_minutes_ago),
)
incomplete_pending_jobs = Job.query.filter(
incomplete_pending_jobs = select(Job).where(
Job.job_status == JobStatus.PENDING,
Job.scheduled_for.isnot(None),
between(Job.scheduled_for, start_minutes_ago, end_minutes_ago),
)

jobs_not_complete_after_allotted_time = (
incomplete_in_progress_jobs.union(incomplete_pending_jobs)
.order_by(Job.processing_started, Job.scheduled_for)
.all()
jobs_not_completed_after_allotted_time = union(
incomplete_in_progress_jobs, incomplete_pending_jobs
)
jobs_not_completed_after_allotted_time = (
jobs_not_completed_after_allotted_time.order_by(
Job.processing_started, Job.scheduled_for
)
)

jobs_not_complete_after_allotted_time = db.session.execute(
jobs_not_completed_after_allotted_time
).all()

# temporarily mark them as ERROR so that they don't get picked up by future check_job_status tasks
# if they haven't been re-processed in time.
job_ids = []
for job in jobs_not_complete_after_allotted_time:
job.job_status = JobStatus.ERROR
dao_update_job(job)
dao_update_job_status_to_error(job)
job_ids.append(str(job.id))

if job_ids:
current_app.logger.info("Job(s) {} have not completed.".format(job_ids))
process_incomplete_jobs.apply_async([job_ids], queue=QueueNames.JOBS)
Expand Down Expand Up @@ -165,6 +172,7 @@ def replay_created_notifications():

@notify_celery.task(name="check-for-missing-rows-in-completed-jobs")
def check_for_missing_rows_in_completed_jobs():

jobs = find_jobs_with_missing_rows()
for job in jobs:
(
Expand Down Expand Up @@ -242,6 +250,8 @@ def check_for_services_with_high_failure_rates_or_sending_to_tv_numbers():
bind=True, max_retries=7, default_retry_delay=3600, name="process-delivery-receipts"
)
def process_delivery_receipts(self):
# If we need to check db settings do it here for convenience
# current_app.logger.info(f"POOL SIZE {app.db.engine.pool.size()}")
"""
Every eight minutes or so (see config.py) we run this task, which searches the last ten
minutes of logs for delivery receipts and batch updates the db with the results. The overlap
Expand Down Expand Up @@ -278,3 +288,58 @@ def process_delivery_receipts(self):
current_app.logger.error(
"Failed process delivery receipts after max retries"
)


@notify_celery.task(
bind=True, max_retries=2, default_retry_delay=3600, name="cleanup-delivery-receipts"
)
def cleanup_delivery_receipts(self):
dao_close_out_delivery_receipts()


@notify_celery.task(bind=True, name="batch-insert-notifications")
def batch_insert_notifications(self):
batch = []

# TODO We probably need some way to clear the list if
# things go haywire. A command?

# with redis_store.pipeline():
# while redis_store.llen("message_queue") > 0:
# redis_store.lpop("message_queue")
# current_app.logger.info("EMPTY!")
# return
current_len = redis_store.llen("message_queue")
with redis_store.pipeline():
# since this list is being fed by other processes, just grab what is available when
# this call is made and process that.

count = 0
while count < current_len:
count = count + 1
notification_bytes = redis_store.lpop("message_queue")
notification_dict = json.loads(notification_bytes.decode("utf-8"))
notification_dict["status"] = notification_dict.pop("notification_status")
if not notification_dict.get("created_at"):
notification_dict["created_at"] = utc_now()
elif isinstance(notification_dict["created_at"], list):
notification_dict["created_at"] = notification_dict["created_at"][0]
notification = Notification(**notification_dict)
if notification is not None:
batch.append(notification)
try:
dao_batch_insert_notifications(batch)
except Exception:
current_app.logger.exception("Notification batch insert failed")
for n in batch:
# Use 'created_at' as a TTL so we don't retry infinitely
notification_time = n.created_at
if isinstance(notification_time, str):
notification_time = datetime.fromisoformat(n.created_at)
if notification_time < utc_now() - timedelta(seconds=50):
current_app.logger.warning(
f"Abandoning stale data, could not write to db: {n.serialize_for_redis(n)}"
)
continue
else:
redis_store.rpush("message_queue", json.dumps(n.serialize_for_redis(n)))
2 changes: 1 addition & 1 deletion app/celery/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ def save_sms(self, service_id, notification_id, encrypted_notification, sender_i
)
)
provider_tasks.deliver_sms.apply_async(
[str(saved_notification.id)], queue=QueueNames.SEND_SMS
[str(saved_notification.id)], queue=QueueNames.SEND_SMS, countdown=60
)

current_app.logger.debug(
Expand Down
3 changes: 1 addition & 2 deletions app/clients/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@
"addressing_style": "virtual",
},
use_fips_endpoint=True,
# This is the default but just for doc sake
max_pool_connections=10,
max_pool_connections=50, # This should be equal or greater than our celery concurrency
)


Expand Down
15 changes: 13 additions & 2 deletions app/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,7 @@ def populate_annual_billing_with_defaults(year, missing_services_only):
AnnualBilling.financial_year_start == year,
),
)
.filter(AnnualBilling.id == None) # noqa
.where(AnnualBilling.id == None) # noqa
)
active_services = db.session.execute(stmt).scalars().all()
else:
Expand All @@ -665,7 +665,7 @@ def populate_annual_billing_with_defaults(year, missing_services_only):
previous_year = year - 1
services_with_zero_free_allowance = (
db.session.query(AnnualBilling.service_id)
.filter(
.where(
AnnualBilling.financial_year_start == previous_year,
AnnualBilling.free_sms_fragment_limit == 0,
)
Expand Down Expand Up @@ -789,6 +789,17 @@ def _update_template(id, name, template_type, content, subject):
db.session.commit()


@notify_command(name="clear-redis-list")
@click.option("-n", "--name_of_list", required=True)
def clear_redis_list(name_of_list):
my_len_before = redis_store.llen(name_of_list)
redis_store.ltrim(name_of_list, 1, 0)
my_len_after = redis_store.llen(name_of_list)
current_app.logger.info(
f"Cleared redis list {name_of_list}. Before: {my_len_before} after {my_len_after}"
)


@notify_command(name="update-templates")
def update_templates():
with open(current_app.config["CONFIG_FILES"] + "/templates.json") as f:
Expand Down
24 changes: 23 additions & 1 deletion app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@
from datetime import datetime, timedelta
from os import getenv, path

from boto3 import Session
from celery.schedules import crontab
from kombu import Exchange, Queue

import notifications_utils
from app.clients import AWS_CLIENT_CONFIG
from app.cloudfoundry_config import cloud_config


Expand Down Expand Up @@ -51,6 +53,13 @@ class TaskNames(object):
SCAN_FILE = "scan-file"


session = Session(
aws_access_key_id=getenv("CSV_AWS_ACCESS_KEY_ID"),
aws_secret_access_key=getenv("CSV_AWS_SECRET_ACCESS_KEY"),
region_name=getenv("CSV_AWS_REGION"),
)


class Config(object):
NOTIFY_APP_NAME = "api"
DEFAULT_REDIS_EXPIRE_TIME = 4 * 24 * 60 * 60
Expand Down Expand Up @@ -81,7 +90,7 @@ class Config(object):
SQLALCHEMY_DATABASE_URI = cloud_config.database_url
SQLALCHEMY_RECORD_QUERIES = False
SQLALCHEMY_TRACK_MODIFICATIONS = False
SQLALCHEMY_POOL_SIZE = int(getenv("SQLALCHEMY_POOL_SIZE", 5))
SQLALCHEMY_POOL_SIZE = int(getenv("SQLALCHEMY_POOL_SIZE", 40))
SQLALCHEMY_POOL_TIMEOUT = 30
SQLALCHEMY_POOL_RECYCLE = 300
SQLALCHEMY_STATEMENT_TIMEOUT = 1200
Expand Down Expand Up @@ -166,6 +175,9 @@ class Config(object):

current_minute = (datetime.now().minute + 1) % 60

S3_CLIENT = session.client("s3")
S3_RESOURCE = session.resource("s3", config=AWS_CLIENT_CONFIG)

CELERY = {
"worker_max_tasks_per_child": 500,
"task_ignore_result": True,
Expand Down Expand Up @@ -203,6 +215,16 @@ class Config(object):
"schedule": timedelta(minutes=2),
"options": {"queue": QueueNames.PERIODIC},
},
"cleanup-delivery-receipts": {
"task": "cleanup-delivery-receipts",
"schedule": timedelta(minutes=82),
"options": {"queue": QueueNames.PERIODIC},
},
"batch-insert-notifications": {
"task": "batch-insert-notifications",
"schedule": 10.0,
"options": {"queue": QueueNames.PERIODIC},
},
"expire-or-delete-invitations": {
"task": "expire-or-delete-invitations",
"schedule": timedelta(minutes=66),
Expand Down
Loading

0 comments on commit 1fc78d8

Please sign in to comment.