Skip to content

Commit

Permalink
Support ANY field in Protobuf descriptors
Browse files Browse the repository at this point in the history
  • Loading branch information
Taisiia Goltseva committed Jan 12, 2021
1 parent 58da7ca commit b0615a7
Show file tree
Hide file tree
Showing 15 changed files with 4,030 additions and 125 deletions.
3 changes: 2 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ test.classpath += configurations.developmentOnly
run.jvmArgs('-noverify',
'-XX:TieredStopAtLevel=1',
'-Dmicronaut.environments=dev',
'-Dmicronaut.io.watch.restart=true'
'-Dmicronaut.io.watch.restart=true',
'-Xmx2560m'
)

tasks.withType(JavaCompile){
Expand Down
107 changes: 101 additions & 6 deletions src/main/java/org/akhq/utils/ProtobufToJsonDeserializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.google.protobuf.DescriptorProtos.FileDescriptorProto;
import com.google.protobuf.DescriptorProtos.FileDescriptorSet;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import com.google.protobuf.Descriptors.FileDescriptor;
Expand All @@ -28,14 +29,17 @@ public class ProtobufToJsonDeserializer {
private final Map<String, List<Descriptor>> descriptors;
private final List<TopicsMapping> topicsMapping;
private final String protobufDescriptorsFolder;
private final Map<String, List<Descriptor>> descriptorByTypeName;

public ProtobufToJsonDeserializer(Connection.ProtobufDeserializationTopicsMapping protobufDeserializationTopicsMapping) {
if (protobufDeserializationTopicsMapping == null) {
this.descriptors = new HashMap<>();
this.topicsMapping = new ArrayList<>();
this.protobufDescriptorsFolder = null;
this.descriptorByTypeName = null;
} else {
this.protobufDescriptorsFolder = protobufDeserializationTopicsMapping.getDescriptorsFolder();
this.descriptorByTypeName = new HashMap<>();
this.topicsMapping = protobufDeserializationTopicsMapping.getTopicsMapping();
this.descriptors = buildAllDescriptors();
}
Expand Down Expand Up @@ -63,9 +67,44 @@ private Map<String, List<Descriptor>> buildAllDescriptors() {
throw new RuntimeException(String.format("Cannot build Protobuf descriptors for the topics regex [%s]", mapping.getTopicRegex()), e);
}
}
buildAdditionalDescriptors();
return allDescriptors;
}

/**
* Build descriptors for files from Protobuf descriptor folder, which are not specified in topics mapping.
* These descriptors can be used for deserializing dynamic messages with {@code Any} field.
*/
private void buildAdditionalDescriptors() {
List<String> filesFromTopicsMapping = topicsMapping.stream()
.map(TopicsMapping::getDescriptorFile)
.collect(Collectors.toList());
List<String> additionalFiles = getDescriptorFiles().stream()
.filter(file -> !filesFromTopicsMapping.contains(file))
.map(file -> protobufDescriptorsFolder + File.separator + file)
.collect(Collectors.toList());
for (String file : additionalFiles) {
try {
byte[] fileBytes = Files.readAllBytes(Path.of(file));
buildAllDescriptorsForDescriptorFile(fileBytes);
} catch (IOException | DescriptorValidationException e) {
e.printStackTrace();
}
}
}

/**
* Extracts all descriptor files from Protobuf descriptor folder
*/
private List<String> getDescriptorFiles() {
if (protobufDescriptorsFolder != null && Files.exists(Path.of(protobufDescriptorsFolder))) {
File path = new File(protobufDescriptorsFolder);
String[] fileNames = path.list();
return fileNames == null ? Collections.emptyList() : Arrays.asList(fileNames);
}
return Collections.emptyList();
}

byte[] getDescriptorFileAsBytes(TopicsMapping mapping) throws IOException {
if (protobufDescriptorsFolder != null && Files.exists(Path.of(protobufDescriptorsFolder))) {
String descriptorFile = mapping.getDescriptorFile();
Expand Down Expand Up @@ -96,9 +135,18 @@ private List<Descriptor> buildAllDescriptorsForDescriptorFile(byte[] descriptorF
fileDescriptorsWithDependencies.add(fd);
}

return fileDescriptorsWithDependencies
.stream().flatMap(desc -> desc.getMessageTypes().stream())
.collect(Collectors.toList());
return buildDescriptorsByTypeName(fileDescriptorsWithDependencies);
}

private List<Descriptor> buildDescriptorsByTypeName(List<FileDescriptor> fileDescriptorsWithDependencies) {
List<Descriptor> result = new ArrayList<>();
for (FileDescriptor fd : fileDescriptorsWithDependencies) {
for (Descriptor messageType : fd.getMessageTypes()) {
descriptorByTypeName.put(messageType.getFullName(), fd.getMessageTypes());
result.add(messageType);
}
}
return result;
}

/**
Expand Down Expand Up @@ -131,8 +179,10 @@ public String deserialize(String topic, byte[] buffer, boolean isKey) {
try {
result = tryToDeserializeWithMessageType(buffer, matchingConfig.getTopicRegex(), messageType);
} catch (Exception e) {
throw new SerializationException(String.format("Cannot deserialize message with Protobuf deserializer " +
"for topic [%s] and message type [%s]", topic, messageType), e);
String cannotSeserializeMessage = String.format("Cannot deserialize message with Protobuf deserializer " +
"for topic [%s] and message type [%s]", topic, messageType);
log.error(cannotSeserializeMessage + ". Raw message bytes [{}]", buffer, e);
throw new SerializationException(cannotSeserializeMessage, e);
}
return result;
}
Expand Down Expand Up @@ -171,8 +221,53 @@ private String tryToDeserializeWithMessageType(byte[] buffer, String topicRegex,

private String tryToParseDataToJsonWithDescriptor(byte[] buffer, Descriptor descriptor, List<Descriptor> allDependencies) throws IOException {
DynamicMessage message = DynamicMessage.parseFrom(descriptor, buffer);
JsonFormat.TypeRegistry typeRegistry = JsonFormat.TypeRegistry.newBuilder().add(allDependencies).build();
JsonFormat.TypeRegistry.Builder builder = JsonFormat.TypeRegistry.newBuilder().add(allDependencies);

Set<Descriptor> descriptorsForFieldsWithTypeAny = getDescriptorsForFieldsWithTypeAny(message);
if (!descriptorsForFieldsWithTypeAny.isEmpty()) {
builder.add(descriptorsForFieldsWithTypeAny);
}

JsonFormat.TypeRegistry typeRegistry = builder.build();
JsonFormat.Printer printer = JsonFormat.printer().usingTypeRegistry(typeRegistry);
return printer.print(message);
}

/**
* If a message contains a field with type {@code Any}, we can find out an actual field type
* at the runtime from the message, get descriptors for this field type and add them to TypeRegistry.
* Multiple layer complex objects with {@code Any} fields on any layers are also supported using recursion.
*/
private Set<Descriptor> getDescriptorsForFieldsWithTypeAny(DynamicMessage message) {
Set<Descriptor> result = new HashSet<>();
for (Object fieldDescriptor : message.getAllFields().values()) {
if (isFieldTypeAny(fieldDescriptor)) {
var internalFields = ((DynamicMessage) fieldDescriptor).getAllFields().entrySet();
for (var internalField : internalFields) {
if (isFieldTypeAnyTypeUrl(internalField.getKey())) {
String typeNameFromUrl = internalField.getValue().toString().split("/")[1];
result.addAll(descriptorByTypeName.get(typeNameFromUrl));
}
}
} else if (isDynamicMessage(fieldDescriptor)) {
result.addAll(getDescriptorsForFieldsWithTypeAny((DynamicMessage) fieldDescriptor));
}
}
return result;
}

private boolean isDynamicMessage(Object fieldDescriptor) {
return fieldDescriptor instanceof DynamicMessage;
}

private boolean isFieldTypeAny(Object fieldDescriptor) {
if (fieldDescriptor instanceof DynamicMessage) {
return (((DynamicMessage) fieldDescriptor).getDescriptorForType().getFullName().equals("google.protobuf.Any"));
}
return false;
}

private boolean isFieldTypeAnyTypeUrl(Descriptors.FieldDescriptor field) {
return field.getFullName().equals("google.protobuf.Any.type_url");
}
}
Loading

0 comments on commit b0615a7

Please sign in to comment.