Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support ANY field in Protobuf descriptors #543

Open
wants to merge 1 commit into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 119 additions & 7 deletions src/main/java/org/akhq/utils/ProtobufToJsonDeserializer.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package org.akhq.utils;

import com.google.protobuf.*;
import com.google.protobuf.DescriptorProtos.FileDescriptorProto;
import com.google.protobuf.DescriptorProtos.FileDescriptorSet;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import com.google.protobuf.Descriptors.FileDescriptor;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.util.JsonFormat;
import lombok.extern.slf4j.Slf4j;
import org.akhq.configs.Connection;
Expand All @@ -28,14 +28,17 @@ public class ProtobufToJsonDeserializer {
private final Map<String, List<Descriptor>> descriptors;
private final List<TopicsMapping> topicsMapping;
private final String protobufDescriptorsFolder;
private final Map<String, List<Descriptor>> descriptorByTypeName;

public ProtobufToJsonDeserializer(Connection.Deserialization.ProtobufDeserializationTopicsMapping protobufDeserializationTopicsMapping) {
if (protobufDeserializationTopicsMapping == null) {
this.descriptors = new HashMap<>();
this.topicsMapping = new ArrayList<>();
this.protobufDescriptorsFolder = null;
this.descriptorByTypeName = null;
} else {
this.protobufDescriptorsFolder = protobufDeserializationTopicsMapping.getDescriptorsFolder();
this.descriptorByTypeName = new HashMap<>();
this.topicsMapping = protobufDeserializationTopicsMapping.getTopicsMapping();
this.descriptors = buildAllDescriptors();
}
Expand Down Expand Up @@ -63,9 +66,62 @@ private Map<String, List<Descriptor>> buildAllDescriptors() {
throw new RuntimeException(String.format("Cannot build Protobuf descriptors for the topics regex [%s]", mapping.getTopicRegex()), e);
}
}
buildAdditionalDescriptors();
return allDescriptors;
}

/**
 * Builds descriptors for the files of the Protobuf descriptors folder that are not referenced
 * by any topics mapping, then registers the Protobuf well-known types.
 * These descriptors can be used for deserializing dynamic messages with an {@code Any} field.
 * <p>
 * Loading is best effort: a file that cannot be read or is not a valid descriptor set is
 * logged and skipped so that the remaining files are still processed.
 */
private void buildAdditionalDescriptors() {
    // A set gives constant-time lookups while filtering the folder content.
    Set<String> filesFromTopicsMapping = topicsMapping.stream()
        .map(TopicsMapping::getDescriptorFile)
        .collect(Collectors.toSet());

    for (String fileName : getDescriptorFiles()) {
        if (filesFromTopicsMapping.contains(fileName)) {
            // Already handled while building the descriptors of the topics mapping.
            continue;
        }
        Path file = Path.of(protobufDescriptorsFolder + File.separator + fileName);
        if (!Files.isRegularFile(file)) {
            // The folder listing also contains sub-directories, which cannot be read as descriptors.
            continue;
        }
        try {
            buildAllDescriptorsForDescriptorFile(Files.readAllBytes(file));
        } catch (IOException | DescriptorValidationException e) {
            log.error("Cannot build Protobuf descriptors for the additional descriptor file [{}]", file, e);
        }
    }
    buildDescriptorsForWellKnownTypes();
}

/**
 * Indexes, by type name, the message types declared in the Protobuf files listed below
 * (wrappers, {@code Any}, API, duration, descriptor, empty, field mask, source context,
 * struct, timestamp and type).
 */
private void buildDescriptorsForWellKnownTypes() {
    List<FileDescriptor> wellKnownTypeFiles = List.of(
        WrappersProto.getDescriptor(),
        AnyProto.getDescriptor(),
        ApiProto.getDescriptor(),
        DurationProto.getDescriptor(),
        DescriptorProtos.getDescriptor(),
        EmptyProto.getDescriptor(),
        FieldMaskProto.getDescriptor(),
        SourceContextProto.getDescriptor(),
        StructProto.getDescriptor(),
        TimestampProto.getDescriptor(),
        TypeProto.getDescriptor()
    );
    buildDescriptorsByTypeName(wellKnownTypeFiles);
}

/**
 * Lists the names of the entries found in the Protobuf descriptors folder.
 *
 * @return the entry names, or an empty list when no folder is configured, the folder does not
 *         exist or its content cannot be listed
 */
private List<String> getDescriptorFiles() {
    if (protobufDescriptorsFolder == null || !Files.exists(Path.of(protobufDescriptorsFolder))) {
        return Collections.emptyList();
    }
    // File#list returns null when the path is not a directory or an I/O error occurs.
    String[] entryNames = new File(protobufDescriptorsFolder).list();
    if (entryNames == null) {
        return Collections.emptyList();
    }
    return Arrays.asList(entryNames);
}

byte[] getDescriptorFileAsBytes(TopicsMapping mapping) throws IOException {
if (protobufDescriptorsFolder != null && Files.exists(Path.of(protobufDescriptorsFolder))) {
String descriptorFile = mapping.getDescriptorFile();
Expand Down Expand Up @@ -96,9 +152,18 @@ private List<Descriptor> buildAllDescriptorsForDescriptorFile(byte[] descriptorF
fileDescriptorsWithDependencies.add(fd);
}

return fileDescriptorsWithDependencies
.stream().flatMap(desc -> desc.getMessageTypes().stream())
.collect(Collectors.toList());
return buildDescriptorsByTypeName(fileDescriptorsWithDependencies);
}

/**
 * Indexes every message type of the given files by its full name in {@code descriptorByTypeName}
 * and collects the top-level message types.
 * <p>
 * Nested message types are indexed as well, so a payload whose type is declared inside
 * another message (for example {@code pkg.Outer.Inner}) can be looked up by name too.
 *
 * @param fileDescriptorsWithDependencies the file descriptors to index
 * @return the top-level message types of all given files, in iteration order
 */
private List<Descriptor> buildDescriptorsByTypeName(List<FileDescriptor> fileDescriptorsWithDependencies) {
    List<Descriptor> result = new ArrayList<>();
    for (FileDescriptor fd : fileDescriptorsWithDependencies) {
        List<Descriptor> fileMessageTypes = fd.getMessageTypes();
        for (Descriptor messageType : fileMessageTypes) {
            indexTypeNameWithNestedTypes(messageType, fileMessageTypes);
            result.add(messageType);
        }
    }
    return result;
}

/**
 * Maps the full name of the message type, and recursively of its nested types, to the
 * top-level message types of the file that declares it.
 */
private void indexTypeNameWithNestedTypes(Descriptor messageType, List<Descriptor> fileMessageTypes) {
    descriptorByTypeName.put(messageType.getFullName(), fileMessageTypes);
    for (Descriptor nestedType : messageType.getNestedTypes()) {
        indexTypeNameWithNestedTypes(nestedType, fileMessageTypes);
    }
}

/**
Expand Down Expand Up @@ -136,8 +201,10 @@ public String deserialize(String topic, byte[] buffer, boolean isKey) {
try {
result = tryToDeserializeWithMessageType(buffer, matchingConfig.getTopicRegex(), messageType);
} catch (Exception e) {
throw new SerializationException(String.format("Cannot deserialize message with Protobuf deserializer " +
"for topic [%s] and message type [%s]", topic, messageType), e);
String cannotSeserializeMessage = String.format("Cannot deserialize message with Protobuf deserializer " +
"for topic [%s] and message type [%s]", topic, messageType);
log.error(cannotSeserializeMessage + ". Raw message bytes [{}]", buffer, e);
throw new SerializationException(cannotSeserializeMessage, e);
}
return result;
}
Expand Down Expand Up @@ -176,8 +243,53 @@ private String tryToDeserializeWithMessageType(byte[] buffer, String topicRegex,

/**
 * Parses the buffer as a message of the given descriptor and prints it as JSON.
 * <p>
 * The type registry used for printing contains the given dependencies plus the descriptors
 * found for the {@code Any} fields of the parsed message.
 *
 * @param buffer          the raw message bytes
 * @param descriptor      the descriptor used to parse the buffer
 * @param allDependencies the descriptors always added to the type registry
 * @return the JSON representation of the message
 * @throws IOException declared by the Protobuf parsing and printing calls
 */
private String tryToParseDataToJsonWithDescriptor(byte[] buffer, Descriptor descriptor, List<Descriptor> allDependencies) throws IOException {
    DynamicMessage message = DynamicMessage.parseFrom(descriptor, buffer);

    // The registry is declared once; adding an empty set of descriptors is a no-op,
    // so no emptiness check is needed.
    JsonFormat.TypeRegistry typeRegistry = JsonFormat.TypeRegistry.newBuilder()
        .add(allDependencies)
        .add(getDescriptorsForFieldsWithTypeAny(message))
        .build();

    JsonFormat.Printer printer = JsonFormat.printer().usingTypeRegistry(typeRegistry);
    return printer.print(message);
}

/**
 * Collects the descriptors needed to print the {@code Any} fields of a message.
 * <p>
 * For every {@code Any} value the actual type name is read at runtime from its type URL and
 * looked up in {@code descriptorByTypeName}. Other message values are inspected recursively,
 * and repeated fields are inspected element by element. The payload packed inside an
 * {@code Any} value is not unpacked.
 *
 * @param message the message to inspect
 * @return the descriptors found for the {@code Any} fields; empty when there are none
 */
private Set<Descriptor> getDescriptorsForFieldsWithTypeAny(DynamicMessage message) {
    Set<Descriptor> result = new HashSet<>();
    for (Object fieldValue : message.getAllFields().values()) {
        // A repeated field is exposed as a List of values, a singular field as the value itself.
        List<?> values = fieldValue instanceof List
            ? (List<?>) fieldValue
            : Collections.singletonList(fieldValue);
        for (Object value : values) {
            if (isFieldTypeAny(value)) {
                addDescriptorsForAnyValue((DynamicMessage) value, result);
            } else if (isDynamicMessage(value)) {
                result.addAll(getDescriptorsForFieldsWithTypeAny((DynamicMessage) value));
            }
        }
    }
    return result;
}

/**
 * Adds to {@code result} the descriptors registered for the type named by the type URL of the
 * given {@code Any} value. An unknown type is skipped instead of failing with a
 * {@link NullPointerException}.
 */
private void addDescriptorsForAnyValue(DynamicMessage anyValue, Set<Descriptor> result) {
    for (var anyField : anyValue.getAllFields().entrySet()) {
        if (!isFieldTypeAnyTypeUrl(anyField.getKey())) {
            continue;
        }
        String typeUrl = anyField.getValue().toString();
        // The type name is the last path segment of the URL; a URL without any slash is used as is.
        String typeName = typeUrl.substring(typeUrl.lastIndexOf('/') + 1);
        List<Descriptor> knownDescriptors = descriptorByTypeName == null
            ? null
            : descriptorByTypeName.get(typeName);
        if (knownDescriptors == null) {
            log.debug("No Protobuf descriptor found for the type [{}] of an Any field", typeName);
            continue;
        }
        result.addAll(knownDescriptors);
    }
}

/**
 * Tells whether the given field value is a {@link DynamicMessage}.
 *
 * @param fieldDescriptor the field value to test, may be {@code null}
 * @return {@code true} when the value is a {@code DynamicMessage} instance
 */
private boolean isDynamicMessage(Object fieldDescriptor) {
    return DynamicMessage.class.isInstance(fieldDescriptor);
}

/**
 * Tells whether the given field value is a dynamic message of type {@code google.protobuf.Any}.
 *
 * @param fieldDescriptor the field value to test
 * @return {@code true} only for a {@link DynamicMessage} whose type full name is
 *         {@code google.protobuf.Any}
 */
private boolean isFieldTypeAny(Object fieldDescriptor) {
    if (!(fieldDescriptor instanceof DynamicMessage)) {
        return false;
    }
    String typeFullName = ((DynamicMessage) fieldDescriptor).getDescriptorForType().getFullName();
    return "google.protobuf.Any".equals(typeFullName);
}

/**
 * Tells whether the given field is the {@code type_url} field of {@code google.protobuf.Any}.
 *
 * @param field the field descriptor to test
 * @return {@code true} when the field full name is {@code google.protobuf.Any.type_url}
 */
private boolean isFieldTypeAnyTypeUrl(Descriptors.FieldDescriptor field) {
    String fieldFullName = field.getFullName();
    return "google.protobuf.Any.type_url".equals(fieldFullName);
}
}
Loading