From 7e73a35cd7e93f2809943ee618d35ff7666c0d72 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Fri, 27 Oct 2023 17:02:40 +0200 Subject: [PATCH] Support super stream creation/deletion --- .../java/com/rabbitmq/stream/Constants.java | 2 + .../java/com/rabbitmq/stream/impl/Client.java | 212 ++++++++++++------ 2 files changed, 143 insertions(+), 71 deletions(-) diff --git a/src/main/java/com/rabbitmq/stream/Constants.java b/src/main/java/com/rabbitmq/stream/Constants.java index 99995a0dd1..f3530767dc 100644 --- a/src/main/java/com/rabbitmq/stream/Constants.java +++ b/src/main/java/com/rabbitmq/stream/Constants.java @@ -72,6 +72,8 @@ public final class Constants { public static final short COMMAND_CONSUMER_UPDATE = 26; public static final short COMMAND_EXCHANGE_COMMAND_VERSIONS = 27; public static final short COMMAND_STREAM_STATS = 28; + public static final short COMMAND_CREATE_SUPER_STREAM = 29; + public static final short COMMAND_DELETE_SUPER_STREAM = 30; public static final short VERSION_1 = 1; public static final short VERSION_2 = 2; diff --git a/src/main/java/com/rabbitmq/stream/impl/Client.java b/src/main/java/com/rabbitmq/stream/impl/Client.java index b443bddf51..86fa36a654 100644 --- a/src/main/java/com/rabbitmq/stream/impl/Client.java +++ b/src/main/java/com/rabbitmq/stream/impl/Client.java @@ -22,7 +22,9 @@ import static com.rabbitmq.stream.impl.Utils.noOpConsumer; import static java.lang.String.format; import static java.lang.String.join; +import static java.util.Arrays.asList; import static java.util.concurrent.TimeUnit.SECONDS; +import static java.util.stream.StreamSupport.stream; import com.rabbitmq.stream.AuthenticationFailureException; import com.rabbitmq.stream.ByteCapacity; @@ -83,16 +85,10 @@ import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.time.Duration; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -127,6 +123,7 @@ */ public class Client implements AutoCloseable { + private static final Charset CHARSET = StandardCharsets.UTF_8; public static final int DEFAULT_PORT = 5552; public static final int DEFAULT_TLS_PORT = 5551; static final OutboundEntityWriteCallback OUTBOUND_MESSAGE_WRITE_CALLBACK = @@ -446,12 +443,7 @@ int maxFrameSize() { } private Map peerProperties() { - int clientPropertiesSize = 4; // size of the map, always there - if (!clientProperties.isEmpty()) { - for (Map.Entry entry : clientProperties.entrySet()) { - clientPropertiesSize += 2 + entry.getKey().length() + 2 + entry.getValue().length(); - } - } + int clientPropertiesSize = mapSize(this.clientProperties); int length = 2 + 2 + 4 + clientPropertiesSize; int correlationId = correlationSequence.incrementAndGet(); try { @@ -460,13 +452,7 @@ private Map peerProperties() { bb.writeShort(encodeRequestCode(COMMAND_PEER_PROPERTIES)); bb.writeShort(VERSION_1); bb.writeInt(correlationId); - bb.writeInt(clientProperties.size()); - for (Map.Entry entry : clientProperties.entrySet()) { - bb.writeShort(entry.getKey().length()) - .writeBytes(entry.getKey().getBytes(StandardCharsets.UTF_8)) - .writeShort(entry.getValue().length()) - .writeBytes(entry.getValue().getBytes(StandardCharsets.UTF_8)); - } + writeMap(bb, this.clientProperties); OutstandingRequest> request = outstandingRequest(); outstandingRequests.put(correlationId, request); channel.writeAndFlush(bb); @@ -540,7 +526,7 @@ private SaslAuthenticateResponse sendSaslAuthenticate( bb.writeShort(VERSION_1); bb.writeInt(correlationId); bb.writeShort(saslMechanism.getName().length()); - bb.writeBytes(saslMechanism.getName().getBytes(StandardCharsets.UTF_8)); + bb.writeBytes(saslMechanism.getName().getBytes(CHARSET)); if (challengeResponse == null) { bb.writeInt(-1); } else { @@ -570,7 +556,7 @@ private Map open(String virtualHost) { bb.writeShort(VERSION_1); bb.writeInt(correlationId); bb.writeShort(virtualHost.length()); - bb.writeBytes(virtualHost.getBytes(StandardCharsets.UTF_8)); + bb.writeBytes(virtualHost.getBytes(CHARSET)); OutstandingRequest request = outstandingRequest(); outstandingRequests.put(correlationId, request); channel.writeAndFlush(bb); @@ -612,7 +598,7 @@ private void sendClose(short code, String reason) { bb.writeInt(correlationId); bb.writeShort(code); bb.writeShort(reason.length()); - bb.writeBytes(reason.getBytes(StandardCharsets.UTF_8)); + bb.writeBytes(reason.getBytes(CHARSET)); OutstandingRequest request = outstandingRequest(); outstandingRequests.put(correlationId, request); channel.writeAndFlush(bb); @@ -662,10 +648,7 @@ public Response create(String stream) { } public Response create(String stream, Map arguments) { - int length = 2 + 2 + 4 + 2 + stream.length() + 4; - for (Map.Entry argument : arguments.entrySet()) { - length = length + 2 + argument.getKey().length() + 2 + argument.getValue().length(); - } + int length = 2 + 2 + 4 + 2 + stream.length() + mapSize(arguments); int correlationId = correlationSequence.incrementAndGet(); try { ByteBuf bb = allocate(length + 4); @@ -674,14 +657,8 @@ public Response create(String stream, Map arguments) { bb.writeShort(VERSION_1); bb.writeInt(correlationId); bb.writeShort(stream.length()); - bb.writeBytes(stream.getBytes(StandardCharsets.UTF_8)); - bb.writeInt(arguments.size()); - for (Map.Entry argument : arguments.entrySet()) { - bb.writeShort(argument.getKey().length()); - bb.writeBytes(argument.getKey().getBytes(StandardCharsets.UTF_8)); - bb.writeShort(argument.getValue().length()); - bb.writeBytes(argument.getValue().getBytes(StandardCharsets.UTF_8)); - } + bb.writeBytes(stream.getBytes(CHARSET)); + writeMap(bb, arguments); OutstandingRequest request = outstandingRequest(); outstandingRequests.put(correlationId, request); channel.writeAndFlush(bb); @@ -696,6 +673,116 @@ public Response create(String stream, Map arguments) { } } + Response createSuperStream( + String superStream, + List partitions, + List routingKeys, + Map arguments) { + if (partitions.isEmpty() || routingKeys.isEmpty()) { + throw new IllegalArgumentException( + "Partitions and routing keys of a super stream cannot be empty"); + } + if (partitions.size() != routingKeys.size()) { + throw new IllegalArgumentException( + "Partitions and routing keys of a super stream must have " + + "the same number of elements"); + } + int length = + 2 + + 2 + + 4 + + 2 + + superStream.length() + + collectionSize(partitions) + + collectionSize(routingKeys) + + mapSize(arguments); + int correlationId = correlationSequence.incrementAndGet(); + try { + ByteBuf bb = allocate(length + 4); + bb.writeInt(length); + bb.writeShort(encodeRequestCode(COMMAND_CREATE_SUPER_STREAM)); + bb.writeShort(VERSION_1); + bb.writeInt(correlationId); + bb.writeShort(superStream.length()); + bb.writeBytes(superStream.getBytes(CHARSET)); + writeCollection(bb, partitions); + writeCollection(bb, routingKeys); + writeMap(bb, arguments); + OutstandingRequest request = outstandingRequest(); + outstandingRequests.put(correlationId, request); + channel.writeAndFlush(bb); + request.block(); + return request.response.get(); + } catch (StreamException e) { + outstandingRequests.remove(correlationId); + throw e; + } catch (RuntimeException e) { + outstandingRequests.remove(correlationId); + throw new StreamException(format("Error while creating super stream '%s'", superStream), e); + } + } + + Response deleteSuperStream(String superStream) { + int length = 2 + 2 + 4 + 2 + superStream.length(); + int correlationId = correlationSequence.incrementAndGet(); + try { + ByteBuf bb = allocate(length + 4); + bb.writeInt(length); + bb.writeShort(encodeRequestCode(COMMAND_DELETE_SUPER_STREAM)); + bb.writeShort(VERSION_1); + bb.writeInt(correlationId); + bb.writeShort(superStream.length()); + bb.writeBytes(superStream.getBytes(CHARSET)); + OutstandingRequest request = outstandingRequest(); + outstandingRequests.put(correlationId, request); + channel.writeAndFlush(bb); + request.block(); + return request.response.get(); + } catch (StreamException e) { + outstandingRequests.remove(correlationId); + throw e; + } catch (RuntimeException e) { + outstandingRequests.remove(correlationId); + throw new StreamException(format("Error while deleting stream '%s'", superStream), e); + } + } + + private static int collectionSize(Collection elements) { + return 4 + elements.stream().mapToInt(v -> 2 + v.length()).sum(); + } + + private static int arraySize(String... elements) { + return 4 + collectionSize(asList(elements)); + } + + private static int mapSize(Map elements) { + return 4 + + elements.entrySet().stream() + .mapToInt(e -> 2 + e.getKey().length() + 2 + e.getValue().length()) + .sum(); + } + + private static ByteBuf writeCollection(ByteBuf bb, Collection elements) { + bb.writeInt(elements.size()); + elements.forEach(e -> bb.writeShort(e.length()).writeBytes(e.getBytes(CHARSET))); + return bb; + } + + private static ByteBuf writeArray(ByteBuf bb, String... elements) { + return writeCollection(bb, asList(elements)); + } + + private static ByteBuf writeMap(ByteBuf bb, Map elements) { + bb.writeInt(elements.size()); + elements.forEach( + (key, value) -> + bb.writeShort(key.length()) + .writeBytes(key.getBytes(CHARSET)) + .writeShort(value.length()) + .writeBytes(value.getBytes(CHARSET))); + return bb; + } + ByteBuf allocate(ByteBufAllocator allocator, int capacity) { if (frameSizeCopped && capacity > this.maxFrameSize()) { throw new IllegalArgumentException( @@ -729,7 +816,7 @@ public Response delete(String stream) { bb.writeShort(VERSION_1); bb.writeInt(correlationId); bb.writeShort(stream.length()); - bb.writeBytes(stream.getBytes(StandardCharsets.UTF_8)); + bb.writeBytes(stream.getBytes(CHARSET)); OutstandingRequest request = outstandingRequest(); outstandingRequests.put(correlationId, request); channel.writeAndFlush(bb); @@ -748,11 +835,7 @@ public Map metadata(String... streams) { if (streams == null || streams.length == 0) { throw new IllegalArgumentException("At least one stream must be specified"); } - int length = 2 + 2 + 4 + 4; // API code, version, correlation ID, size of array - for (String stream : streams) { - length += 2; - length += stream.length(); - } + int length = 2 + 2 + 4 + arraySize(streams); // API code, version, correlation ID, array size int correlationId = correlationSequence.incrementAndGet(); try { ByteBuf bb = allocate(length + 4); @@ -760,11 +843,7 @@ public Map metadata(String... streams) { bb.writeShort(encodeRequestCode(COMMAND_METADATA)); bb.writeShort(VERSION_1); bb.writeInt(correlationId); - bb.writeInt(streams.length); - for (String stream : streams) { - bb.writeShort(stream.length()); - bb.writeBytes(stream.getBytes(StandardCharsets.UTF_8)); - } + writeArray(bb, streams); OutstandingRequest> request = outstandingRequest(); outstandingRequests.put(correlationId, request); channel.writeAndFlush(bb); @@ -800,10 +879,10 @@ public Response declarePublisher(byte publisherId, String publisherReference, St bb.writeByte(publisherId); bb.writeShort(publisherReferenceSize); if (publisherReferenceSize > 0) { - bb.writeBytes(publisherReference.getBytes(StandardCharsets.UTF_8)); + bb.writeBytes(publisherReference.getBytes(CHARSET)); } bb.writeShort(stream.length()); - bb.writeBytes(stream.getBytes(StandardCharsets.UTF_8)); + bb.writeBytes(stream.getBytes(CHARSET)); OutstandingRequest request = outstandingRequest(); outstandingRequests.put(correlationId, request); channel.writeAndFlush(bb); @@ -1142,10 +1221,7 @@ public Response subscribe( } int propertiesSize = 0; if (properties != null && !properties.isEmpty()) { - propertiesSize = 4; // size of the map - for (Map.Entry entry : properties.entrySet()) { - propertiesSize += 2 + entry.getKey().length() + 2 + entry.getValue().length(); - } + propertiesSize = mapSize(properties); } length += propertiesSize; int correlationId = correlationSequence.getAndIncrement(); @@ -1157,20 +1233,14 @@ public Response subscribe( bb.writeInt(correlationId); bb.writeByte(subscriptionId); bb.writeShort(stream.length()); - bb.writeBytes(stream.getBytes(StandardCharsets.UTF_8)); + bb.writeBytes(stream.getBytes(CHARSET)); bb.writeShort(offsetSpecification.getType()); if (offsetSpecification.isOffset() || offsetSpecification.isTimestamp()) { bb.writeLong(offsetSpecification.getOffset()); } bb.writeShort(initialCredits); if (properties != null && !properties.isEmpty()) { - bb.writeInt(properties.size()); - for (Map.Entry entry : properties.entrySet()) { - bb.writeShort(entry.getKey().length()) - .writeBytes(entry.getKey().getBytes(StandardCharsets.UTF_8)) - .writeShort(entry.getValue().length()) - .writeBytes(entry.getValue().getBytes(StandardCharsets.UTF_8)); - } + writeMap(bb, properties); } OutstandingRequest request = outstandingRequest(); outstandingRequests.put(correlationId, request); @@ -1205,9 +1275,9 @@ public void storeOffset(String reference, String stream, long offset) { bb.writeShort(encodeRequestCode(COMMAND_STORE_OFFSET)); bb.writeShort(VERSION_1); bb.writeShort(reference.length()); - bb.writeBytes(reference.getBytes(StandardCharsets.UTF_8)); + bb.writeBytes(reference.getBytes(CHARSET)); bb.writeShort(stream.length()); - bb.writeBytes(stream.getBytes(StandardCharsets.UTF_8)); + bb.writeBytes(stream.getBytes(CHARSET)); bb.writeLong(offset); channel.writeAndFlush(bb); } @@ -1230,9 +1300,9 @@ public QueryOffsetResponse queryOffset(String reference, String stream) { bb.writeShort(VERSION_1); bb.writeInt(correlationId); bb.writeShort(reference.length()); - bb.writeBytes(reference.getBytes(StandardCharsets.UTF_8)); + bb.writeBytes(reference.getBytes(CHARSET)); bb.writeShort(stream.length()); - bb.writeBytes(stream.getBytes(StandardCharsets.UTF_8)); + bb.writeBytes(stream.getBytes(CHARSET)); OutstandingRequest request = outstandingRequest(); outstandingRequests.put(correlationId, request); channel.writeAndFlush(bb); @@ -1271,9 +1341,9 @@ public long queryPublisherSequence(String publisherReference, String stream) { bb.writeShort(VERSION_1); bb.writeInt(correlationId); bb.writeShort(publisherReference.length()); - bb.writeBytes(publisherReference.getBytes(StandardCharsets.UTF_8)); + bb.writeBytes(publisherReference.getBytes(CHARSET)); bb.writeShort(stream.length()); - bb.writeBytes(stream.getBytes(StandardCharsets.UTF_8)); + bb.writeBytes(stream.getBytes(CHARSET)); OutstandingRequest request = outstandingRequest(); outstandingRequests.put(correlationId, request); channel.writeAndFlush(bb); @@ -1436,9 +1506,9 @@ public List route(String routingKey, String superStream) { bb.writeShort(VERSION_1); bb.writeInt(correlationId); bb.writeShort(routingKey.length()); - bb.writeBytes(routingKey.getBytes(StandardCharsets.UTF_8)); + bb.writeBytes(routingKey.getBytes(CHARSET)); bb.writeShort(superStream.length()); - bb.writeBytes(superStream.getBytes(StandardCharsets.UTF_8)); + bb.writeBytes(superStream.getBytes(CHARSET)); OutstandingRequest> request = outstandingRequest(); outstandingRequests.put(correlationId, request); channel.writeAndFlush(bb); @@ -1471,7 +1541,7 @@ public List partitions(String superStream) { bb.writeShort(VERSION_1); bb.writeInt(correlationId); bb.writeShort(superStream.length()); - bb.writeBytes(superStream.getBytes(StandardCharsets.UTF_8)); + bb.writeBytes(superStream.getBytes(CHARSET)); OutstandingRequest> request = outstandingRequest(); outstandingRequests.put(correlationId, request); channel.writeAndFlush(bb); @@ -1532,7 +1602,7 @@ StreamStatsResponse streamStats(String stream) { bb.writeShort(VERSION_1); bb.writeInt(correlationId); bb.writeShort(stream.length()); - bb.writeBytes(stream.getBytes(StandardCharsets.UTF_8)); + bb.writeBytes(stream.getBytes(CHARSET)); OutstandingRequest request = outstandingRequest(); outstandingRequests.put(correlationId, request); channel.writeAndFlush(bb);