Skip to content

Commit

Permalink
Merge pull request #290 from vipyrsec/revert-238-jobs-cte
Browse files Browse the repository at this point in the history
Revert "Use a CTE for the `/POST jobs` endpoint"
  • Loading branch information
jonathan-d-zhang authored Jul 19, 2024
2 parents 642f40b + 852f44a commit 34aa964
Showing 1 changed file with 23 additions and 28 deletions.
51 changes: 23 additions & 28 deletions src/mainframe/endpoints/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@

import structlog
from fastapi import APIRouter, Depends
from sqlalchemy import and_, or_, select, update
from sqlalchemy.orm import Session, joinedload, aliased
from sqlalchemy import and_, or_, select
from sqlalchemy.orm import Session, joinedload

from mainframe.constants import mainframe_settings
from mainframe.database import get_db
Expand Down Expand Up @@ -41,38 +41,33 @@ def get_jobs(
"""

with session, session.begin():
# Use a CTE to limit the number of rows we fetch
cte = (
select(Scan)
.where(
or_(
Scan.status == Status.QUEUED,
and_(
Scan.pending_at
< datetime.now(timezone.utc) - timedelta(seconds=mainframe_settings.job_timeout),
Scan.status == Status.PENDING,
),
scans = (
session.scalars(
select(Scan)
.where(
or_(
Scan.status == Status.QUEUED,
and_(
Scan.pending_at
< datetime.now(timezone.utc) - timedelta(seconds=mainframe_settings.job_timeout),
Scan.status == Status.PENDING,
),
)
)
.order_by(Scan.pending_at.nulls_first(), Scan.queued_at)
.limit(batch)
.options(joinedload(Scan.download_urls))
)
.order_by(Scan.pending_at.nulls_first(), Scan.queued_at)
.limit(batch)
.options(joinedload(Scan.download_urls))
.with_for_update(skip_locked=True)
.cte()
)

scan_cte = aliased(Scan, cte)

# Uses a Postgres `UPDATE .. FROM`. https://docs.sqlalchemy.org/en/20/tutorial/data_update.html#update-from
scans = session.scalars(
update(Scan)
.where(Scan.scan_id == scan_cte.scan_id)
.values(status=Status.PENDING, pending_at=datetime.now(timezone.utc), pending_by=auth.subject)
.returning(Scan)
.unique()
.all()
)

response_body: list[JobResult] = []
for scan in scans:
scan.status = Status.PENDING
scan.pending_at = datetime.now(timezone.utc)
scan.pending_by = auth.subject

logger.info(
"Job given and status set to pending in database",
package={
Expand Down

0 comments on commit 34aa964

Please sign in to comment.