Skip to content

Commit

Permalink
fix(provider/kafka-connect): fix NPE on KafkaConnectorResourceValidat…
Browse files Browse the repository at this point in the history
…ion (#454)
  • Loading branch information
fhussonnois committed May 28, 2024
1 parent 459f44c commit 64dd232
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import io.streamthoughts.jikkou.common.utils.Strings;
import io.streamthoughts.jikkou.core.annotation.Reflectable;
import java.beans.ConstructorProperties;
import java.io.Serializable;
Expand Down Expand Up @@ -138,6 +139,14 @@ public Optional<NamedValue> findLabelByKey(final String key) {
return Optional.ofNullable(labels.get(key)).map(val -> new NamedValue(key, val));
}

/**
* Checks whether a non-empty name is defined.
* @return {@code true} if the name is present, otherwise {@code false}.
*/
public boolean hasName() {
return !Strings.isBlank(getName());
}

/**
* Checks whether a label exists for the specified key.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.streamthoughts.jikkou.kafka.connect.models.V1KafkaConnectorSpec;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.jetbrains.annotations.NotNull;

@Enabled
Expand All @@ -31,39 +32,40 @@ public class KafkaConnectorResourceValidation implements Validation<V1KafkaConne
@Override
public ValidationResult validate(@NotNull V1KafkaConnector resource) {
List<ValidationError> errors = new ArrayList<>();
ObjectMeta metadata = resource.getMetadata();
if (metadata == null) {
errors.add(newError(resource, "Missing or empty field: 'metadata'"));
return new ValidationResult(errors);
}

String name = metadata.getName();
if (Strings.isBlank(name)) {
errors.add(newError(resource, "Missing or empty field: 'metadata.name'."));
}
Optional<ObjectMeta> optionalObjectMeta = resource.optionalMetadata();

if (!metadata.hasLabel(KAFKA_CONNECT_CLUSTER)) {
errors.add(newError(resource, "Missing or empty field: 'metadata.labels." + KAFKA_CONNECT_CLUSTER + "'."));
}

V1KafkaConnectorSpec spec = resource.getSpec();
if (Strings.isBlank(spec.getConnectorClass())) {
errors.add(newError(resource, "Missing or empty field: 'spec.connectorClass'."));
}

if (spec.getTasksMax() == null) {
errors.add(newError(resource, "Missing or empty field: 'spec.tasksMax'."));
}
//Validate metadata
optionalObjectMeta.ifPresentOrElse(objectMeta -> {
if (!objectMeta.hasName()) {
errors.add(newError(resource, "Missing or empty field: 'metadata.name'."));
}
if (!objectMeta.hasLabel(KAFKA_CONNECT_CLUSTER)) {
errors.add(newError(resource, "Missing or empty field: 'metadata.labels." + KAFKA_CONNECT_CLUSTER + "'."));
}
}, () -> errors.add(newError(resource, "Missing or empty field: 'metadata'"))
);

//Validate spec
Optional<V1KafkaConnectorSpec> optionalSpec = Optional.ofNullable(resource.getSpec());
optionalSpec.ifPresentOrElse(spec -> {
if (Strings.isBlank(spec.getConnectorClass())) {
errors.add(newError(resource, "Missing or empty field: 'spec.connectorClass'."));
}
if (Strings.isBlank(spec.getConnectorClass())) {
errors.add(newError(resource, "Missing or empty field: 'spec.tasksMax'."));
}
}, () -> errors.add(newError(resource, "Missing or empty field: 'spec'"))
);
return new ValidationResult(errors);
}

@NotNull
private ValidationError newError(@NotNull V1KafkaConnector resource, String message) {
return new ValidationError(
getName(),
resource,
message
getName(),
resource,
message
);
}
}

0 comments on commit 64dd232

Please sign in to comment.