Skip to content

Commit

Permalink
feat(Backend + SDK): Update kfp backend and kubernetes sdk to support…
Browse files Browse the repository at this point in the history
… tolerations (kubeflow#10471)

* feat(Backend + SDK): Update kfp backend and kubernetes sdk to support tolerations

Signed-off-by: droctothorpe <[email protected]>
Co-authored-by: edmondop <[email protected]>
Co-authored-by: tarat44 <[email protected]>

* Address PR review 1

Signed-off-by: droctothorpe <[email protected]>
Co-authored-by: edmondop <[email protected]>
Co-authored-by: tarat44 <[email protected]>

* Refactor add_toleration to use Python primitives

Signed-off-by: droctothorpe <[email protected]>
Co-authored-by: edmondop <[email protected]>
Co-authored-by: tarat44 <[email protected]>

* Update go.mod to pull in latest kubernetes_platform package

Signed-off-by: droctothorpe <[email protected]>
Co-authored-by: edmondop <[email protected]>
Co-authored-by: tarat44 <[email protected]>

---------

Signed-off-by: droctothorpe <[email protected]>
Co-authored-by: edmondop <[email protected]>
Co-authored-by: tarat44 <[email protected]>
  • Loading branch information
3 people authored and rimolive committed Mar 28, 2024
1 parent ebe49d0 commit 7a1f94d
Show file tree
Hide file tree
Showing 7 changed files with 390 additions and 2 deletions.
22 changes: 22 additions & 0 deletions backend/src/v2/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,28 @@ func extendPodSpecPatch(
podSpec.NodeSelector = kubernetesExecutorConfig.GetNodeSelector().GetLabels()
}

if tolerations := kubernetesExecutorConfig.GetTolerations(); tolerations != nil {
var k8sTolerations []k8score.Toleration

glog.Infof("Tolerations passed: %+v", tolerations)

for _, toleration := range tolerations {
if toleration != nil {
k8sToleration := k8score.Toleration{
Key: toleration.Key,
Operator: k8score.TolerationOperator(toleration.Operator),
Value: toleration.Value,
Effect: k8score.TaintEffect(toleration.Effect),
TolerationSeconds: toleration.TolerationSeconds,
}

k8sTolerations = append(k8sTolerations, k8sToleration)
}
}

podSpec.Tolerations = k8sTolerations
}

// Get secret mount information
for _, secretAsVolume := range kubernetesExecutorConfig.GetSecretAsVolume() {
secretVolume := k8score.Volume{
Expand Down
2 changes: 1 addition & 1 deletion backend/third_party_licenses/driver.csv
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ github.com/josharian/intern,https://github.com/josharian/intern/blob/v1.0.0/lice
github.com/json-iterator/go,https://github.com/json-iterator/go/blob/v1.1.12/LICENSE,MIT
github.com/kubeflow/pipelines/api/v2alpha1/go,https://github.com/kubeflow/pipelines/blob/758c91f76784/api/LICENSE,Apache-2.0
github.com/kubeflow/pipelines/backend,https://github.com/kubeflow/pipelines/blob/HEAD/LICENSE,Apache-2.0
github.com/kubeflow/pipelines/kubernetes_platform/go/kubernetesplatform,https://github.com/kubeflow/pipelines/blob/f51dc39614e4/kubernetes_platform/LICENSE,Apache-2.0
github.com/kubeflow/pipelines/kubernetes_platform/go/kubernetesplatform,https://github.com/kubeflow/pipelines/blob/e129b0501379/kubernetes_platform/LICENSE,Apache-2.0
github.com/kubeflow/pipelines/third_party/ml-metadata/go/ml_metadata,https://github.com/kubeflow/pipelines/blob/e1f0c010f800/third_party/ml-metadata/LICENSE,Apache-2.0
github.com/mailru/easyjson,https://github.com/mailru/easyjson/blob/v0.7.7/LICENSE,MIT
github.com/modern-go/concurrent,https://github.com/modern-go/concurrent/blob/bacd9c7ef1dd/LICENSE,Apache-2.0
Expand Down
8 changes: 7 additions & 1 deletion kubernetes_platform/python/kfp/kubernetes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
__version__ = '1.0.0'

__all__ = [
'add_node_selector',
'add_pod_annotation',
'add_pod_label',
'add_toleration',
'CreatePVC',
'DeletePVC',
'mount_pvc',
Expand Down Expand Up @@ -42,9 +46,11 @@
from kfp.kubernetes.pod_metadata import add_pod_label
from kfp.kubernetes.secret import use_secret_as_env
from kfp.kubernetes.secret import use_secret_as_volume
<<<<<<< HEAD
from kfp.kubernetes.timeout import set_timeout
=======
>>>>>>> 2983a7d49 (feat(Backend + SDK): Update kfp backend and kubernetes sdk to support tolerations (#10471))
from kfp.kubernetes.toleration import add_toleration
from kfp.kubernetes.volume import CreatePVC
from kfp.kubernetes.volume import DeletePVC
from kfp.kubernetes.volume import mount_pvc
from kfp.kubernetes.image import set_image_pull_secrets
81 changes: 81 additions & 0 deletions kubernetes_platform/python/kfp/kubernetes/toleration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# Copyright 2024 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Optional

from google.protobuf import json_format
from kfp.dsl import PipelineTask
from kfp.kubernetes import common
from kfp.kubernetes import kubernetes_executor_config_pb2 as pb

try:
from typing import Literal
except ImportError:
from typing_extensions import Literal


def add_toleration(
task: PipelineTask,
key: Optional[str] = None,
operator: Optional[Literal["Equal", "Exists"]] = None,
value: Optional[str] = None,
effect: Optional[Literal["NoExecute", "NoSchedule", "PreferNoSchedule"]] = None,
toleration_seconds: Optional[int] = None,
):
"""Add a `toleration<https://kubernetes.io/docs/concepts/scheduling-eviction/taint-and-toleration/>`_. to a task.
Args:
task:
Pipeline task.
key:
key is the taint key that the toleration applies to. Empty means
match all taint keys. If the key is empty, operator must be Exists;
this combination means to match all values and all keys.
operator:
operator represents a key's relationship to the value. Valid
operators are Exists and Equal. Defaults to Equal. Exists is
equivalent to wildcard for value, so that a pod can tolerate all
taints of a particular category.
value:
value is the taint value the toleration matches to. If the operator
is Exists, the value should be empty, otherwise just a regular
string.
effect:
effect indicates the taint effect to match. Empty means match all
taint effects. When specified, allowed values are NoSchedule,
PreferNoSchedule and NoExecute.
toleration_seconds:
toleration_seconds represents the period of time the toleration
(which must be of effect NoExecute, otherwise this field is ignored)
tolerates the taint. By default, it is not set, which means tolerate
the taint forever (do not evict). Zero and negative values will be
treated as 0 (evict immediately) by the system.
Returns:
Task object with added toleration.
"""

msg = common.get_existing_kubernetes_config_as_message(task)
msg.tolerations.append(
pb.Toleration(
key=key,
operator=operator,
value=value,
effect=effect,
toleration_seconds=toleration_seconds,
)
)
task.platform_config["kubernetes"] = json_format.MessageToDict(msg)

return task
41 changes: 41 additions & 0 deletions kubernetes_platform/python/test/snapshot/data/toleration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Copyright 2024 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from kfp import dsl
from kfp import kubernetes
from kubernetes.client import V1Toleration


@dsl.component
def comp():
pass


@dsl.pipeline
def my_pipeline():
task = comp()
kubernetes.add_toleration(
task,
key="key1",
operator="Equal",
value="value1",
effect="NoExecute",
toleration_seconds=10,
)


if __name__ == "__main__":
from kfp import compiler

compiler.Compiler().compile(my_pipeline, __file__.replace(".py", ".yaml"))
61 changes: 61 additions & 0 deletions kubernetes_platform/python/test/snapshot/data/toleration.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# PIPELINE DEFINITION
# Name: my-pipeline
components:
comp-comp:
executorLabel: exec-comp
deploymentSpec:
executors:
exec-comp:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- comp
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.6.0'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef comp():\n pass\n\n"
image: python:3.7
pipelineInfo:
name: my-pipeline
root:
dag:
tasks:
comp:
cachingOptions:
enableCache: true
componentRef:
name: comp-comp
taskInfo:
name: comp
schemaVersion: 2.1.0
sdkVersion: kfp-2.6.0
---
platforms:
kubernetes:
deploymentSpec:
executors:
exec-comp:
tolerations:
- effect: NoExecute
key: key1
operator: Equal
tolerationSeconds: '10'
value: value1
Loading

0 comments on commit 7a1f94d

Please sign in to comment.