Skip to content

Commit

Permalink
Prevent crashing when job was deleted externally (#147)
Browse files Browse the repository at this point in the history
Signed-off-by: shuheng <[email protected]>

Co-authored-by: shuheng <[email protected]>
  • Loading branch information
khorshuheng and khorshuheng authored May 23, 2022
1 parent e579a01 commit 7324f17
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 4 deletions.
10 changes: 9 additions & 1 deletion python/feast_spark/job_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
start_stream_to_online_ingestion,
unschedule_offline_to_online_ingestion,
)
from feast_spark.pyspark.launchers.k8s.k8s import JobNotFoundException
from feast_spark.third_party.grpc.health.v1.HealthService_pb2 import (
HealthCheckResponse,
ServingStatus,
Expand Down Expand Up @@ -437,9 +438,16 @@ def ensure_stream_ingestion_jobs(client: Client, all_projects: bool):
opt.JOB_SERVICE_RETRY_FAILED_JOBS
)
):
status = None
try:
status = job.get_status()
except JobNotFoundException:
logger.warning(f"{job.get_id()} was already removed")

if (
isinstance(job, StreamIngestionJob)
and job.get_status() != SparkJobStatus.COMPLETED
and status is not None
and status != SparkJobStatus.COMPLETED
):
jobs_by_hash[job.get_hash()] = job

Expand Down
13 changes: 10 additions & 3 deletions python/feast_spark/pyspark/launchers/k8s/k8s.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ def _truncate_label(label: str) -> str:
return label[:63]


class JobNotFoundException(Exception):
    """Signal that a job lookup by id returned nothing.

    Raised by the job accessors when ``_get_job_by_id`` yields ``None``,
    for example because the job was deleted externally. Callers can catch
    this type to skip the job instead of crashing.
    """


class KubernetesJobMixin:
def __init__(self, api: CustomObjectsApi, namespace: str, job_id: str):
self._api = api
Expand All @@ -85,17 +89,20 @@ def get_id(self) -> str:

def get_error_message(self) -> str:
    """Return the error message recorded on this job.

    Returns:
        The ``job_error_message`` attribute of the job looked up by id.

    Raises:
        JobNotFoundException: if the lookup returns ``None``, e.g. the job
            was deleted externally.
    """
    job = _get_job_by_id(self._api, self._namespace, self._job_id)
    if job is None:
        # Raise a catchable, descriptive error instead of asserting: an
        # assert would surface as AssertionError (or vanish under -O) and
        # bypass callers that handle JobNotFoundException.
        raise JobNotFoundException(
            f"Job {self._job_id} not found in namespace {self._namespace}"
        )
    return job.job_error_message

def get_status(self) -> SparkJobStatus:
    """Return the current state of this job.

    Returns:
        The ``state`` attribute of the job looked up by id.

    Raises:
        JobNotFoundException: if the lookup returns ``None``, e.g. the job
            was deleted externally.
    """
    job = _get_job_by_id(self._api, self._namespace, self._job_id)
    if job is None:
        # Raise a catchable, descriptive error instead of asserting: an
        # assert would surface as AssertionError (or vanish under -O) and
        # bypass callers that handle JobNotFoundException.
        raise JobNotFoundException(
            f"Job {self._job_id} not found in namespace {self._namespace}"
        )
    return job.state

def get_start_time(self) -> datetime:
    """Return the start time of this job.

    Returns:
        The ``start_time`` attribute of the job looked up by id.

    Raises:
        JobNotFoundException: if the lookup returns ``None``, e.g. the job
            was deleted externally.
    """
    job = _get_job_by_id(self._api, self._namespace, self._job_id)
    if job is None:
        # Raise a catchable, descriptive error instead of asserting: an
        # assert would surface as AssertionError (or vanish under -O) and
        # bypass callers that handle JobNotFoundException.
        raise JobNotFoundException(
            f"Job {self._job_id} not found in namespace {self._namespace}"
        )
    return job.start_time

def cancel(self):
Expand Down

0 comments on commit 7324f17

Please sign in to comment.