Skip to content

Commit

Permalink
Celery distributed jobs (#79)
Browse files Browse the repository at this point in the history
* Remove support for "second" level schedules

* Install celery and packages

* Add custom celery OpenSearch backend

* Configure celery and celery worker

* Add celery tasks endpoints to frontend

* Change validate and suggest response format

* Remove env load from models task

* Fix validate and suggest tests

* ESLint fix

* bump actions/setup-python@v4

* PyCurl

* Install curl dependencies

* Install packages

* Update apt-get

* Update ap_scheduler to use celery

* Remove return value from suggestions celery task

* Frontend error handling

* Fix _get_object_from_dict and add Task model

* update manual_integration_test, add defaults

* Remove task ready endpoint

* Remove dependency on AsyncResult

* Remove username/password from result backend

* Remove python serializers

* Add tasks tests

* Use opensearch_save_meta_as_text=False

* Add task tests and TaskResult pre

* Swiple worker --loglevel=warning
  • Loading branch information
KentonParton authored Apr 22, 2023
1 parent 1f8778d commit ffc02d5
Show file tree
Hide file tree
Showing 34 changed files with 1,279 additions and 194 deletions.
7 changes: 6 additions & 1 deletion .github/actions/test-backend/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,14 @@ runs:

steps:
- name: Set up Python 3.9
uses: actions/setup-python@v2
uses: actions/setup-python@v4
with:
python-version: "3.9"
- name: Install packages
shell: bash
run: |
sudo apt-get update
sudo apt-get install libcurl4-gnutls-dev libgnutls28-dev
- name: Install dependencies
shell: bash
working-directory: backend
Expand Down
10 changes: 8 additions & 2 deletions backend/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ FROM python-base as builder-base
RUN apt-get update \
&& apt-get install --no-install-recommends -y \
curl \
build-essential
build-essential \
libcurl4-openssl-dev \
libssl-dev

# install poetry - respects $POETRY_VERSION & $POETRY_HOME
RUN curl -sSL https://install.python-poetry.org | python3 -
Expand All @@ -43,6 +45,10 @@ RUN poetry install --with postgres,redshift,mysql,trino,athena,snowflake,bigquer
FROM python-base as production
COPY --from=builder-base $PYSETUP_PATH $PYSETUP_PATH

RUN apt-get update \
&& apt-get install --no-install-recommends -y \
curl

WORKDIR $PYSETUP_PATH
COPY app $PYSETUP_PATH/app
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
54 changes: 30 additions & 24 deletions backend/app/api/api_v1/endpoints/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,19 @@
from app.api.shortcuts import delete_by_key_or_404, get_by_key_or_404
from app.core.sample import GetSampleException, get_dataset_sample
from app.core.users import current_active_user
from app.db.client import get_client
from app.models.dataset import BaseDataset, Dataset, DatasetCreate, DatasetUpdate, Sample
from app.models.validation import Validation
from app.repositories.base import NotFoundError
from app.models.task import TaskStatus, TaskIdResponse, TaskResultResponse
from app.repositories.dataset import DatasetRepository, get_dataset_repository
from app.repositories.datasource import DatasourceRepository, get_datasource_repository
from app.repositories.expectation import ExpectationRepository, get_expectation_repository
from app.repositories.task import get_task_repository, TaskRepository
from app.repositories.validation import get_validation_repository, ValidationRepository
from app.settings import settings
from app.models.users import UserDB
from app.core.runner import create_dataset_suggestions, run_dataset_validation
from opensearchpy import OpenSearch, RequestError
from opensearchpy import RequestError
import requests
from app.worker.tasks.validation import run_validation
from app.worker.tasks.suggestions import run_suggestions

router = APIRouter(
dependencies=[Depends(current_active_user)]
Expand Down Expand Up @@ -190,32 +190,38 @@ def update_sample(
return dataset


@router.post("/{key}/validate", response_model=Validation)
def validate_dataset(key: str, client: OpenSearch = Depends(get_client)):
try:
validation: Validation = run_dataset_validation(key, client)
except NotFoundError as e:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) from e
@router.post("/{key}/validate", response_model=TaskIdResponse)
def validate_dataset(
key: str,
repository: DatasetRepository = Depends(get_dataset_repository),
):
get_by_key_or_404(key, repository)
task = run_validation.delay(dataset_id=key)
return {"task_id": task.id}

return validation

@router.get("/{key}/tasks", response_model=list[TaskResultResponse])
def get_tasks_by_dataset_id(
key: str,
status: Optional[TaskStatus] = None,
dataset_repository: DatasetRepository = Depends(get_dataset_repository),
repository: TaskRepository = Depends(get_task_repository),
):
get_by_key_or_404(key, dataset_repository)
return repository.query_by_dataset_id(
key,
status=status,
)

@router.post("/{key}/suggest")

@router.post("/{key}/suggest", response_model=TaskIdResponse)
def create_suggestions(
key: str,
client: OpenSearch = Depends(get_client),
repository: DatasetRepository = Depends(get_dataset_repository),
expectation_repository: ExpectationRepository = Depends(get_expectation_repository),
):
dataset = get_by_key_or_404(key, repository)

results = create_dataset_suggestions(key, client)
expectations = [expectation_repository._get_object_from_dict(e) for e in results]

expectation_repository.delete_by_filter(dataset_id=dataset.key, suggested=True, enabled=False)
expectation_repository.bulk_create(expectations)

return expectations
get_by_key_or_404(key, repository)
task = run_suggestions.delay(dataset_id=key)
return {"task_id": task.id}


def should_update_sample(dataset: Dataset, dataset_update: DatasetUpdate):
Expand Down
18 changes: 18 additions & 0 deletions backend/app/api/api_v1/endpoints/task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from fastapi import APIRouter
from fastapi.params import Depends
from app.core.users import current_active_user
from app.models.task import TaskResultResponse
from app.repositories.task import TaskRepository, get_task_repository


router = APIRouter(
dependencies=[Depends(current_active_user)]
)


@router.get("/{task_id}", response_model=TaskResultResponse)
def get_task(
task_id: str,
repository: TaskRepository = Depends(get_task_repository),
):
return repository.get(task_id)
2 changes: 2 additions & 0 deletions backend/app/api/api_v1/swiple_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
destination,
action,
user,
task,
)

router = APIRouter()
Expand All @@ -27,3 +28,4 @@
router.include_router(destination.router, prefix="/destinations", tags=["Destinations"])
router.include_router(action.router, prefix="/actions", tags=["Actions"])
router.include_router(user.router, prefix="/user", tags=["Users"])
router.include_router(task.router, prefix="/tasks", tags=["Tasks"])
13 changes: 13 additions & 0 deletions backend/app/celery_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Use for local development only: boots a Celery worker with env vars taken
# from the repo's docker/.env-local file instead of a real deployment env.
import os
from dotenv import load_dotenv


if __name__ == '__main__':
    # Load environment variables from .env file
    dotenv_path = os.path.join(os.path.dirname(__file__), '../../docker/.env-local')
    load_dotenv(dotenv_path)

    # Start the Celery worker.
    # The import is deliberately deferred until after load_dotenv so that
    # any settings read from the environment at import time see the
    # .env-local values — do not hoist it to the top of the file.
    from app.worker.app import celery_app
    # -l info: log level; -c 4: four worker processes; -Ofair: fair task
    # scheduling; the --without-* flags disable cluster coordination
    # features that are unnecessary for a single local worker.
    celery_app.worker_main(['--app', 'app.worker.app.celery_app', 'worker', '-l', 'info', '-c', '4', '-Ofair', '--without-heartbeat', '--without-gossip', '--without-mingle'])
2 changes: 0 additions & 2 deletions backend/app/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@
END_DATE = "Latest possible date/time to trigger on (inclusive)"

# CronTrigger
SECOND = "second (0-59)"
MINUTE = "minute (0-59)"
HOUR = "hour (0-23)"
DAY_OF_WEEK = "Number or name of weekday (0-5 or mon,tue,wed,thu,fri,sat,sun)"
Expand All @@ -75,7 +74,6 @@
YEAR = "4-digit year"

# IntervalTrigger
SECONDS = "Number of seconds to wait"
MINUTES = "Number of minutes to wait"
HOURS = "Number of hours to wait"
DAYS = "Number of days to wait"
Expand Down
12 changes: 3 additions & 9 deletions backend/app/core/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ def validate(self) -> Validation:
validation = validator.validate().to_json_dict()
validation["meta"]["run_id"]["run_time"] = utils.remove_t_from_date_string(
validation["meta"]["run_id"]["run_time"])
validation["meta"]["run_id"]["run_name"] = str(uuid.uuid4())
validation["meta"]["run_id"]["run_name"] = self.identifiers.pop("task_id")
validation["meta"].update(self.identifiers)

for result in validation["results"]:
Expand Down Expand Up @@ -258,14 +258,15 @@ def _get_status(success: bool) -> Literal["success", "failure"]:
return action_status


def run_dataset_validation(dataset_id: str, client: OpenSearch = os_client):
def run_dataset_validation(dataset_id: str, task_id: str, client: OpenSearch = os_client):
dataset = DatasetRepository(client).get(dataset_id)
datasource = DatasourceRepository(client).get(dataset.datasource_id)
expectations = ExpectationRepository(client).query_by_filter(dataset_id=dataset.key, enabled=True)

identifiers = {
"datasource_id": datasource.key,
"dataset_id": dataset.key,
"task_id": task_id,
}

meta = {
Expand All @@ -287,13 +288,6 @@ def run_dataset_validation(dataset_id: str, client: OpenSearch = os_client):
identifiers=identifiers,
).validate()

client.index(
index=settings.VALIDATION_INDEX,
id=str(uuid.uuid4()),
body=validation.dict(),
refresh="wait_for",
)

return validation


Expand Down
9 changes: 4 additions & 5 deletions backend/app/core/schedulers/ap_scheduler.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from typing import List, Dict

from app.core.runner import run_dataset_validation
from app.core.schedulers.scheduler_interface import SchedulerInterface
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger
Expand All @@ -12,7 +11,7 @@
from pytz import utc
from app.settings import settings
import app.constants as c

from app.worker.tasks.validation import run_validation
from app.models.schedule import Schedule
import uuid
import datetime
Expand Down Expand Up @@ -55,7 +54,7 @@ def shutdown(self):
def add_schedule(self, schedule: Schedule, datasource_id: str, dataset_id: str):
return self.ap_scheduler.add_job(
id=f"{datasource_id}__{dataset_id}__{uuid.uuid4()}",
func=run_dataset_validation,
func=run_validation.delay,
kwargs={"dataset_id": dataset_id},
misfire_grace_time=schedule.misfire_grace_time,
max_instances=schedule.max_instances,
Expand Down Expand Up @@ -114,11 +113,9 @@ def to_dict(self, job: Job):
"trigger": c.INTERVAL,
"start_date": job.trigger.start_date,
"end_date": job.trigger.end_date,
"seconds": seconds,
"minutes": minutes,
"hours": hours,
"days": days,
"weeks": weeks,
}
if isinstance(job.trigger, DateTrigger):
trigger_fields["trigger"] = c.DATE
Expand All @@ -128,6 +125,8 @@ def to_dict(self, job: Job):
trigger_fields["start_date"] = job.trigger.start_date
trigger_fields["end_date"] = job.trigger.end_date
for field in job.trigger.fields:
if field.name == "second":
continue
trigger_fields[field.name] = str(field)

job_state["trigger"] = trigger_fields
Expand Down
6 changes: 1 addition & 5 deletions backend/app/models/schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@ class Config:
trigger: Literal["interval"]
start_date: Optional[datetime] = Field(description=c.START_DATE)
end_date: Optional[datetime] = Field(description=c.END_DATE)
seconds: Optional[int] = Field(0, description=c.SECONDS, const=True)
minutes: Optional[int] = Field(description=c.MINUTES)
hours: Optional[int] = Field(description=c.HOURS)
days: Optional[int] = Field(description=c.DAYS)
weeks: Optional[int] = Field(description=c.WEEKS)

@validator("end_date")
def start_date_before_end_date(cls, v, values):
Expand All @@ -39,7 +37,6 @@ class Config:
trigger: Literal["cron"]
start_date: Optional[datetime] = Field(description=c.START_DATE)
end_date: Optional[datetime] = Field(description=c.END_DATE)
second: Optional[str] = Field("0", description=c.SECONDS, const=True)
minute: Optional[str] = Field(description=c.MINUTE)
hour: Optional[str] = Field(description=c.HOUR)
day: Optional[str] = Field(description=c.DAY)
Expand All @@ -54,7 +51,7 @@ def start_date_before_end_date(cls, v, values):
raise ValueError('end_date should not be before start_date')
return v

@validator("year", "month", "day_of_week", "week", "day", "hour", "minute", "second")
@validator("year", "month", "day_of_week", "week", "day", "hour", "minute")
def valid_expressions(cls, v, field):
fields_map = {
'year': BaseField,
Expand All @@ -64,7 +61,6 @@ def valid_expressions(cls, v, field):
'day_of_week': DayOfWeekField,
'hour': BaseField,
'minute': BaseField,
'second': BaseField
}
field_class = fields_map[field.name]
field_class(field.name, v, False)
Expand Down
73 changes: 73 additions & 0 deletions backend/app/models/task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import json
from datetime import datetime
from enum import Enum
from typing import Optional, Dict, Any
from pydantic import Extra, validator
from app.models.base_model import BaseModel


class TaskStatus(str, Enum):
    """Celery task lifecycle states as stored by the result backend.

    Inherits from ``str`` so members serialize/compare as plain strings
    (e.g. in query params and JSON responses).
    """
    PENDING = "PENDING"
    STARTED = "STARTED"
    RETRY = "RETRY"
    SUCCESS = "SUCCESS"
    FAILURE = "FAILURE"


class Result(BaseModel):
    """Payload stored for a finished task.

    Carries the task's identifiers and/or exception details; which fields
    are present depends on whether the task succeeded or failed, so all
    fields are optional and unknown keys are ignored.
    """
    dataset_id: Optional[str]
    datasource_id: Optional[str]
    exc_message: Optional[list[str]]
    exc_module: Optional[str]
    exc_type: Optional[str]

    def dict(self, *args, **kwargs) -> Dict[str, Any]:
        """Serialize with ``exclude_none`` forced on so absent fields are dropped.

        Any caller-supplied ``exclude_none`` is discarded and replaced.
        The ``None`` default in ``pop`` is required: the original
        ``kwargs.pop('exclude_none')`` raised ``KeyError`` whenever
        ``.dict()`` was called without that keyword.
        """
        kwargs.pop('exclude_none', None)
        return super().dict(*args, exclude_none=True, **kwargs)

    class Config:
        extra = Extra.ignore


class TaskResult(BaseModel):
    """A Celery task's persisted outcome: identity, state, and parsed result."""
    task_id: str
    status: TaskStatus
    kwargs: Optional[dict]
    result: Optional[Result]
    name: Optional[str]
    retries: Optional[int]
    date_done: Optional[datetime]

    @validator('result', pre=True)
    def ensure_result_is_dict(cls, value):
        """Accept ``result`` as None, an already-parsed dict, or a JSON string."""
        if value is None or isinstance(value, dict):
            return value
        if not isinstance(value, str):
            raise ValueError("Invalid value for 'result' field")
        try:
            return json.loads(value)
        except json.JSONDecodeError:
            raise ValueError("Invalid JSON string for 'result' field")

    class Config:
        extra = Extra.ignore


class TaskResultResponse(TaskResult):
    """API response shape for a task result; currently identical to TaskResult."""
    pass


class TaskReadyResponse(BaseModel):
    """Response body indicating whether a task has completed."""
    # NOTE(review): the "task ready" endpoint was removed in this change —
    # this model may now be unused; confirm before keeping it.
    is_ready: bool


class TaskIdResponse(BaseModel):
    """Response returned when a task is enqueued; carries the Celery task id to poll."""
    task_id: str


class Task(BaseModel):
    """Envelope pairing a task result with its write timestamp.

    Presumably the document shape persisted by the custom result backend —
    TODO confirm against the backend implementation.
    """
    result: TaskResult
    # NOTE(review): stored as str rather than datetime — confirm the
    # timestamp format the backend writes.
    timestamp: str
Loading

0 comments on commit ffc02d5

Please sign in to comment.