Skip to content

Commit

Permalink
fix(ctrl): monitor Pod ready condition
Browse files Browse the repository at this point in the history
When the user uses a startup probe, the Integration won't turn as running until the condition is reached

Closes #4977
  • Loading branch information
squakez committed Jan 8, 2024
1 parent 75596a5 commit 0e24b3d
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 119 deletions.
161 changes: 46 additions & 115 deletions pkg/controller/integration/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func getIntegrationSecretsAndConfigmaps(ctx context.Context, client client.Clien
type controller interface {
checkReadyCondition(ctx context.Context) (bool, error)
getPodSpec() corev1.PodSpec
updateReadyCondition(readyPods []corev1.Pod) bool
updateReadyCondition(readyPods int) bool
}

func (action *monitorAction) newController(env *trait.Environment, integration *v1.Integration) (controller, error) {
Expand Down Expand Up @@ -318,33 +318,32 @@ func (action *monitorAction) updateIntegrationPhaseAndReadyCondition(
if err != nil {
return err
}

readyPods, unreadyPods := filterPodsByReadyStatus(environment, runningPods, controller.getPodSpec())

if done, err := controller.checkReadyCondition(ctx); done || err != nil {
if len(readyPods) > 0 || len(unreadyPods) > 0 {
// There may be pods that are not ready but still probable for getting error messages.
// Ignore returned error from probing as it's expected when the ctrl obj is not ready.
_ = action.probeReadiness(ctx, environment, integration, unreadyPods, readyPods)
}
// There may be pods that are not ready but still probable for getting error messages.
// Ignore returned error from probing as it's expected when the ctrl obj is not ready.
action.probeReadiness(ctx, environment, integration, runningPods)
return err
}
if done := checkPodStatuses(integration, pendingPods, runningPods); done {
if arePodsFailingStatuses(integration, pendingPods, runningPods) {
return nil
}
integration.Status.Phase = v1.IntegrationPhaseRunning

if done := controller.updateReadyCondition(readyPods); done {
readyPods, probeOk, err := action.probeReadiness(ctx, environment, integration, runningPods)
if err != nil {
return err
}
if !probeOk {
integration.Status.Phase = v1.IntegrationPhaseError
return nil
}
if err := action.probeReadiness(ctx, environment, integration, unreadyPods, readyPods); err != nil {
return err
if done := controller.updateReadyCondition(readyPods); done {
integration.Status.Phase = v1.IntegrationPhaseRunning
return nil
}

return nil
}

func checkPodStatuses(integration *v1.Integration, pendingPods []corev1.Pod, runningPods []corev1.Pod) bool {
func arePodsFailingStatuses(integration *v1.Integration, pendingPods []corev1.Pod, runningPods []corev1.Pod) bool {
// Check Pods statuses
for _, pod := range pendingPods {
// Check the scheduled condition
Expand Down Expand Up @@ -396,114 +395,44 @@ func checkPodStatuses(integration *v1.Integration, pendingPods []corev1.Pod, run
return false
}

func filterPodsByReadyStatus(environment *trait.Environment, runningPods []corev1.Pod, podSpec corev1.PodSpec) ([]corev1.Pod, []corev1.Pod) {
var readyPods []corev1.Pod
var unreadyPods []corev1.Pod

integrationContainerName := environment.GetIntegrationContainerName()
for _, pod := range runningPods {
// We compare the Integration PodSpec to that of the Pod in order to make
// sure we account for up-to-date version.
if !comparePodSpec(integrationContainerName, podSpec, pod.Spec) {
continue
}
ready := kubernetes.GetPodCondition(pod, corev1.PodReady)
if ready == nil {
continue
}
switch ready.Status {
case corev1.ConditionTrue:
// We still account terminating Pods to handle rolling deployments
readyPods = append(readyPods, pod)
case corev1.ConditionFalse:
if pod.DeletionTimestamp != nil {
continue
}
unreadyPods = append(unreadyPods, pod)
}
}

return readyPods, unreadyPods
}

// comparePodSpec compares given pod spec according to integration specific information (e.g. digest, container image).
func comparePodSpec(integrationContainerName string, runningPodSpec corev1.PodSpec, referencePod corev1.PodSpec) bool {
runningPodContainer := findIntegrationContainer(integrationContainerName, runningPodSpec)
referencePodContainer := findIntegrationContainer(integrationContainerName, referencePod)

if runningPodContainer == nil || referencePodContainer == nil {
return false
}

// integration digest must be the same
if getIntegrationDigest(runningPodContainer.Env) != getIntegrationDigest(referencePodContainer.Env) {
return false
}

// integration container image must be the same (same integration kit)
if runningPodContainer.Image != referencePodContainer.Image {
return false
}

return true
}

func getIntegrationDigest(envs []corev1.EnvVar) string {
for _, env := range envs {
if env.Name == digest.IntegrationDigestEnvVar {
return env.Value
}
}

return ""
}

// findIntegrationContainer find if present the integration container in the pod spec using the integration specifications.
func findIntegrationContainer(integrationContainerName string, spec corev1.PodSpec) *corev1.Container {
for _, c := range spec.Containers {
if c.Name == integrationContainerName {
return &c
}
}

return nil
}

// probeReadiness calls the readiness probes of the non-ready Pods directly to retrieve insights from the Camel runtime.
func (action *monitorAction) probeReadiness(
ctx context.Context, environment *trait.Environment, integration *v1.Integration,
unreadyPods []corev1.Pod, readyPods []corev1.Pod,
) error {
// The func return the number of readyPods, the success of the probe and any error may have happened during its execution.
func (action *monitorAction) probeReadiness(ctx context.Context, environment *trait.Environment, integration *v1.Integration, pods []corev1.Pod) (int, bool, error) {
// as a default we assume the Integration is Ready
readyCondition := v1.IntegrationCondition{
Type: v1.IntegrationConditionReady,
Status: corev1.ConditionFalse,
Pods: make([]v1.PodCondition, len(unreadyPods)),
Status: corev1.ConditionTrue,
Pods: make([]v1.PodCondition, len(pods)),
}

readyPods := 0
unreadyPods := 0

runtimeReady := true
runtimeFailed := false
probeReadinessOk := true

for i := range unreadyPods {
pod := &unreadyPods[i]
if ready := kubernetes.GetPodCondition(*pod, corev1.PodReady); ready.Reason != "ContainersNotReady" {
continue
}
container := getIntegrationContainer(environment, pod)
if container == nil {
return fmt.Errorf("integration container not found in Pod %s/%s", pod.Namespace, pod.Name)
}

for i := range pods {
pod := &pods[i]
readyCondition.Pods[i].Name = pod.Name

for p := range pod.Status.Conditions {
if pod.Status.Conditions[p].Type == corev1.PodReady {
readyCondition.Pods[i].Condition = pod.Status.Conditions[p]
break
}
}
// If it's in ready status, then we don't care to probe.
if ready := kubernetes.GetPodCondition(*pod, corev1.PodReady); ready.Status == corev1.ConditionTrue {
readyPods++
continue
}
unreadyPods++
container := getIntegrationContainer(environment, pod)
if container == nil {
return readyPods, false, fmt.Errorf("integration container not found in Pod %s/%s", pod.Namespace, pod.Name)
}
if probe := container.ReadinessProbe; probe != nil && probe.HTTPGet != nil {
body, err := proxyGetHTTPProbe(ctx, action.client, probe, pod, container)

// When invoking the HTTP probe, the kubernetes client exposes a very
// specific behavior:
//
Expand Down Expand Up @@ -559,7 +488,7 @@ func (action *monitorAction) probeReadiness(

health, err := NewHealthCheck(body)
if err != nil {
return err
return readyPods, false, err
}
for _, check := range health.Checks {
if check.Status == v1.HealthCheckStatusUp {
Expand All @@ -575,19 +504,21 @@ func (action *monitorAction) probeReadiness(
}

if runtimeFailed {
integration.Status.Phase = v1.IntegrationPhaseError
probeReadinessOk = false
readyCondition.Reason = v1.IntegrationConditionErrorReason
readyCondition.Message = fmt.Sprintf("%d/%d pods are not ready", len(unreadyPods), len(unreadyPods)+len(readyPods))
readyCondition.Status = corev1.ConditionFalse
readyCondition.Message = fmt.Sprintf("%d/%d pods are not ready", unreadyPods, unreadyPods+readyPods)
integration.Status.SetConditions(readyCondition)
}
if !runtimeReady {
integration.Status.Phase = v1.IntegrationPhaseError
probeReadinessOk = false
readyCondition.Reason = v1.IntegrationConditionRuntimeNotReadyReason
readyCondition.Message = fmt.Sprintf("%d/%d pods are not ready", len(unreadyPods), len(unreadyPods)+len(readyPods))
readyCondition.Status = corev1.ConditionFalse
readyCondition.Message = fmt.Sprintf("%d/%d pods are not ready", unreadyPods, unreadyPods+readyPods)
integration.Status.SetConditions(readyCondition)
}

integration.Status.SetConditions(readyCondition)

return nil
return readyPods, probeReadinessOk, nil
}

func findHighestPriorityReadyKit(kits []v1.IntegrationKit) (*v1.IntegrationKit, error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/integration/monitor_cronjob.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (c *cronJobController) getPodSpec() corev1.PodSpec {
return c.obj.Spec.JobTemplate.Spec.Template.Spec
}

func (c *cronJobController) updateReadyCondition(readyPods []corev1.Pod) bool {
func (c *cronJobController) updateReadyCondition(readyPods int) bool {
switch {
case c.obj.Status.LastScheduleTime == nil:
c.integration.SetReadyCondition(corev1.ConditionTrue,
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/integration/monitor_deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,14 @@ func (c *deploymentController) getPodSpec() corev1.PodSpec {
return c.obj.Spec.Template.Spec
}

func (c *deploymentController) updateReadyCondition(readyPods []corev1.Pod) bool {
func (c *deploymentController) updateReadyCondition(readyPods int) bool {
replicas := int32(1)
if r := c.integration.Spec.Replicas; r != nil {
replicas = *r
}
// The Deployment status reports updated and ready replicas separately,
// so that the number of ready replicas also accounts for older versions.
readyReplicas := int32(len(readyPods))
readyReplicas := int32(readyPods)
switch {
case readyReplicas >= replicas:
// The Integration is considered ready when the number of replicas
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/integration/monitor_knative.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (c *knativeServiceController) getPodSpec() corev1.PodSpec {
return c.obj.Spec.Template.Spec.PodSpec
}

func (c *knativeServiceController) updateReadyCondition(readyPods []corev1.Pod) bool {
func (c *knativeServiceController) updateReadyCondition(readyPods int) bool {
ready := kubernetes.GetKnativeServiceCondition(*c.obj, servingv1.ServiceConditionReady)
if ready.IsTrue() {
c.integration.SetReadyCondition(corev1.ConditionTrue,
Expand Down

0 comments on commit 0e24b3d

Please sign in to comment.