Skip to content

Commit

Permalink
Support super stream creation/deletion
Browse files Browse the repository at this point in the history
  • Loading branch information
acogoluegnes committed Oct 27, 2023
1 parent 4cd9d53 commit 7e73a35
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 71 deletions.
2 changes: 2 additions & 0 deletions src/main/java/com/rabbitmq/stream/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ public final class Constants {
public static final short COMMAND_CONSUMER_UPDATE = 26;
public static final short COMMAND_EXCHANGE_COMMAND_VERSIONS = 27;
public static final short COMMAND_STREAM_STATS = 28;
public static final short COMMAND_CREATE_SUPER_STREAM = 29;
public static final short COMMAND_DELETE_SUPER_STREAM = 30;

public static final short VERSION_1 = 1;
public static final short VERSION_2 = 2;
Expand Down
212 changes: 141 additions & 71 deletions src/main/java/com/rabbitmq/stream/impl/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import static com.rabbitmq.stream.impl.Utils.noOpConsumer;
import static java.lang.String.format;
import static java.lang.String.join;
import static java.util.Arrays.asList;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.StreamSupport.stream;

import com.rabbitmq.stream.AuthenticationFailureException;
import com.rabbitmq.stream.ByteCapacity;
Expand Down Expand Up @@ -83,16 +85,10 @@
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
Expand Down Expand Up @@ -127,6 +123,7 @@
*/
public class Client implements AutoCloseable {

private static final Charset CHARSET = StandardCharsets.UTF_8;
public static final int DEFAULT_PORT = 5552;
public static final int DEFAULT_TLS_PORT = 5551;
static final OutboundEntityWriteCallback OUTBOUND_MESSAGE_WRITE_CALLBACK =
Expand Down Expand Up @@ -446,12 +443,7 @@ int maxFrameSize() {
}

private Map<String, String> peerProperties() {
int clientPropertiesSize = 4; // size of the map, always there
if (!clientProperties.isEmpty()) {
for (Map.Entry<String, String> entry : clientProperties.entrySet()) {
clientPropertiesSize += 2 + entry.getKey().length() + 2 + entry.getValue().length();
}
}
int clientPropertiesSize = mapSize(this.clientProperties);
int length = 2 + 2 + 4 + clientPropertiesSize;
int correlationId = correlationSequence.incrementAndGet();
try {
Expand All @@ -460,13 +452,7 @@ private Map<String, String> peerProperties() {
bb.writeShort(encodeRequestCode(COMMAND_PEER_PROPERTIES));
bb.writeShort(VERSION_1);
bb.writeInt(correlationId);
bb.writeInt(clientProperties.size());
for (Map.Entry<String, String> entry : clientProperties.entrySet()) {
bb.writeShort(entry.getKey().length())
.writeBytes(entry.getKey().getBytes(StandardCharsets.UTF_8))
.writeShort(entry.getValue().length())
.writeBytes(entry.getValue().getBytes(StandardCharsets.UTF_8));
}
writeMap(bb, this.clientProperties);
OutstandingRequest<Map<String, String>> request = outstandingRequest();
outstandingRequests.put(correlationId, request);
channel.writeAndFlush(bb);
Expand Down Expand Up @@ -540,7 +526,7 @@ private SaslAuthenticateResponse sendSaslAuthenticate(
bb.writeShort(VERSION_1);
bb.writeInt(correlationId);
bb.writeShort(saslMechanism.getName().length());
bb.writeBytes(saslMechanism.getName().getBytes(StandardCharsets.UTF_8));
bb.writeBytes(saslMechanism.getName().getBytes(CHARSET));
if (challengeResponse == null) {
bb.writeInt(-1);
} else {
Expand Down Expand Up @@ -570,7 +556,7 @@ private Map<String, String> open(String virtualHost) {
bb.writeShort(VERSION_1);
bb.writeInt(correlationId);
bb.writeShort(virtualHost.length());
bb.writeBytes(virtualHost.getBytes(StandardCharsets.UTF_8));
bb.writeBytes(virtualHost.getBytes(CHARSET));
OutstandingRequest<OpenResponse> request = outstandingRequest();
outstandingRequests.put(correlationId, request);
channel.writeAndFlush(bb);
Expand Down Expand Up @@ -612,7 +598,7 @@ private void sendClose(short code, String reason) {
bb.writeInt(correlationId);
bb.writeShort(code);
bb.writeShort(reason.length());
bb.writeBytes(reason.getBytes(StandardCharsets.UTF_8));
bb.writeBytes(reason.getBytes(CHARSET));
OutstandingRequest<Response> request = outstandingRequest();
outstandingRequests.put(correlationId, request);
channel.writeAndFlush(bb);
Expand Down Expand Up @@ -662,10 +648,7 @@ public Response create(String stream) {
}

public Response create(String stream, Map<String, String> arguments) {
int length = 2 + 2 + 4 + 2 + stream.length() + 4;
for (Map.Entry<String, String> argument : arguments.entrySet()) {
length = length + 2 + argument.getKey().length() + 2 + argument.getValue().length();
}
int length = 2 + 2 + 4 + 2 + stream.length() + mapSize(arguments);
int correlationId = correlationSequence.incrementAndGet();
try {
ByteBuf bb = allocate(length + 4);
Expand All @@ -674,14 +657,8 @@ public Response create(String stream, Map<String, String> arguments) {
bb.writeShort(VERSION_1);
bb.writeInt(correlationId);
bb.writeShort(stream.length());
bb.writeBytes(stream.getBytes(StandardCharsets.UTF_8));
bb.writeInt(arguments.size());
for (Map.Entry<String, String> argument : arguments.entrySet()) {
bb.writeShort(argument.getKey().length());
bb.writeBytes(argument.getKey().getBytes(StandardCharsets.UTF_8));
bb.writeShort(argument.getValue().length());
bb.writeBytes(argument.getValue().getBytes(StandardCharsets.UTF_8));
}
bb.writeBytes(stream.getBytes(CHARSET));
writeMap(bb, arguments);
OutstandingRequest<Response> request = outstandingRequest();
outstandingRequests.put(correlationId, request);
channel.writeAndFlush(bb);
Expand All @@ -696,6 +673,116 @@ public Response create(String stream, Map<String, String> arguments) {
}
}

Response createSuperStream(
String superStream,
List<String> partitions,
List<String> routingKeys,
Map<String, String> arguments) {
if (partitions.isEmpty() || routingKeys.isEmpty()) {
throw new IllegalArgumentException(
"Partitions and routing keys of a super stream cannot be empty");
}
if (partitions.size() != routingKeys.size()) {
throw new IllegalArgumentException(
"Partitions and routing keys of a super stream must have "
+ "the same number of elements");
}
int length =
2
+ 2
+ 4
+ 2
+ superStream.length()
+ collectionSize(partitions)
+ collectionSize(routingKeys)
+ mapSize(arguments);
int correlationId = correlationSequence.incrementAndGet();
try {
ByteBuf bb = allocate(length + 4);
bb.writeInt(length);
bb.writeShort(encodeRequestCode(COMMAND_CREATE_SUPER_STREAM));
bb.writeShort(VERSION_1);
bb.writeInt(correlationId);
bb.writeShort(superStream.length());
bb.writeBytes(superStream.getBytes(CHARSET));
writeCollection(bb, partitions);
writeCollection(bb, routingKeys);
writeMap(bb, arguments);
OutstandingRequest<Response> request = outstandingRequest();
outstandingRequests.put(correlationId, request);
channel.writeAndFlush(bb);
request.block();
return request.response.get();
} catch (StreamException e) {
outstandingRequests.remove(correlationId);
throw e;
} catch (RuntimeException e) {
outstandingRequests.remove(correlationId);
throw new StreamException(format("Error while creating super stream '%s'", superStream), e);
}
}

Response deleteSuperStream(String superStream) {
int length = 2 + 2 + 4 + 2 + superStream.length();
int correlationId = correlationSequence.incrementAndGet();
try {
ByteBuf bb = allocate(length + 4);
bb.writeInt(length);
bb.writeShort(encodeRequestCode(COMMAND_DELETE_SUPER_STREAM));
bb.writeShort(VERSION_1);
bb.writeInt(correlationId);
bb.writeShort(superStream.length());
bb.writeBytes(superStream.getBytes(CHARSET));
OutstandingRequest<Response> request = outstandingRequest();
outstandingRequests.put(correlationId, request);
channel.writeAndFlush(bb);
request.block();
return request.response.get();
} catch (StreamException e) {
outstandingRequests.remove(correlationId);
throw e;
} catch (RuntimeException e) {
outstandingRequests.remove(correlationId);
throw new StreamException(format("Error while deleting stream '%s'", superStream), e);
}
}

private static int collectionSize(Collection<String> elements) {
return 4 + elements.stream().mapToInt(v -> 2 + v.length()).sum();
}

private static int arraySize(String... elements) {
return 4 + collectionSize(asList(elements));
}

private static int mapSize(Map<String, String> elements) {
return 4
+ elements.entrySet().stream()
.mapToInt(e -> 2 + e.getKey().length() + 2 + e.getValue().length())
.sum();
}

private static ByteBuf writeCollection(ByteBuf bb, Collection<String> elements) {
bb.writeInt(elements.size());
elements.forEach(e -> bb.writeShort(e.length()).writeBytes(e.getBytes(CHARSET)));
return bb;
}

private static ByteBuf writeArray(ByteBuf bb, String... elements) {
return writeCollection(bb, asList(elements));
}

private static ByteBuf writeMap(ByteBuf bb, Map<String, String> elements) {
bb.writeInt(elements.size());
elements.forEach(
(key, value) ->
bb.writeShort(key.length())
.writeBytes(key.getBytes(CHARSET))
.writeShort(value.length())
.writeBytes(value.getBytes(CHARSET)));
return bb;
}

ByteBuf allocate(ByteBufAllocator allocator, int capacity) {
if (frameSizeCopped && capacity > this.maxFrameSize()) {
throw new IllegalArgumentException(
Expand Down Expand Up @@ -729,7 +816,7 @@ public Response delete(String stream) {
bb.writeShort(VERSION_1);
bb.writeInt(correlationId);
bb.writeShort(stream.length());
bb.writeBytes(stream.getBytes(StandardCharsets.UTF_8));
bb.writeBytes(stream.getBytes(CHARSET));
OutstandingRequest<Response> request = outstandingRequest();
outstandingRequests.put(correlationId, request);
channel.writeAndFlush(bb);
Expand All @@ -748,23 +835,15 @@ public Map<String, StreamMetadata> metadata(String... streams) {
if (streams == null || streams.length == 0) {
throw new IllegalArgumentException("At least one stream must be specified");
}
int length = 2 + 2 + 4 + 4; // API code, version, correlation ID, size of array
for (String stream : streams) {
length += 2;
length += stream.length();
}
int length = 2 + 2 + 4 + arraySize(streams); // API code, version, correlation ID, array size
int correlationId = correlationSequence.incrementAndGet();
try {
ByteBuf bb = allocate(length + 4);
bb.writeInt(length);
bb.writeShort(encodeRequestCode(COMMAND_METADATA));
bb.writeShort(VERSION_1);
bb.writeInt(correlationId);
bb.writeInt(streams.length);
for (String stream : streams) {
bb.writeShort(stream.length());
bb.writeBytes(stream.getBytes(StandardCharsets.UTF_8));
}
writeArray(bb, streams);
OutstandingRequest<Map<String, StreamMetadata>> request = outstandingRequest();
outstandingRequests.put(correlationId, request);
channel.writeAndFlush(bb);
Expand Down Expand Up @@ -800,10 +879,10 @@ public Response declarePublisher(byte publisherId, String publisherReference, St
bb.writeByte(publisherId);
bb.writeShort(publisherReferenceSize);
if (publisherReferenceSize > 0) {
bb.writeBytes(publisherReference.getBytes(StandardCharsets.UTF_8));
bb.writeBytes(publisherReference.getBytes(CHARSET));
}
bb.writeShort(stream.length());
bb.writeBytes(stream.getBytes(StandardCharsets.UTF_8));
bb.writeBytes(stream.getBytes(CHARSET));
OutstandingRequest<Response> request = outstandingRequest();
outstandingRequests.put(correlationId, request);
channel.writeAndFlush(bb);
Expand Down Expand Up @@ -1142,10 +1221,7 @@ public Response subscribe(
}
int propertiesSize = 0;
if (properties != null && !properties.isEmpty()) {
propertiesSize = 4; // size of the map
for (Map.Entry<String, String> entry : properties.entrySet()) {
propertiesSize += 2 + entry.getKey().length() + 2 + entry.getValue().length();
}
propertiesSize = mapSize(properties);
}
length += propertiesSize;
int correlationId = correlationSequence.getAndIncrement();
Expand All @@ -1157,20 +1233,14 @@ public Response subscribe(
bb.writeInt(correlationId);
bb.writeByte(subscriptionId);
bb.writeShort(stream.length());
bb.writeBytes(stream.getBytes(StandardCharsets.UTF_8));
bb.writeBytes(stream.getBytes(CHARSET));
bb.writeShort(offsetSpecification.getType());
if (offsetSpecification.isOffset() || offsetSpecification.isTimestamp()) {
bb.writeLong(offsetSpecification.getOffset());
}
bb.writeShort(initialCredits);
if (properties != null && !properties.isEmpty()) {
bb.writeInt(properties.size());
for (Map.Entry<String, String> entry : properties.entrySet()) {
bb.writeShort(entry.getKey().length())
.writeBytes(entry.getKey().getBytes(StandardCharsets.UTF_8))
.writeShort(entry.getValue().length())
.writeBytes(entry.getValue().getBytes(StandardCharsets.UTF_8));
}
writeMap(bb, properties);
}
OutstandingRequest<Response> request = outstandingRequest();
outstandingRequests.put(correlationId, request);
Expand Down Expand Up @@ -1205,9 +1275,9 @@ public void storeOffset(String reference, String stream, long offset) {
bb.writeShort(encodeRequestCode(COMMAND_STORE_OFFSET));
bb.writeShort(VERSION_1);
bb.writeShort(reference.length());
bb.writeBytes(reference.getBytes(StandardCharsets.UTF_8));
bb.writeBytes(reference.getBytes(CHARSET));
bb.writeShort(stream.length());
bb.writeBytes(stream.getBytes(StandardCharsets.UTF_8));
bb.writeBytes(stream.getBytes(CHARSET));
bb.writeLong(offset);
channel.writeAndFlush(bb);
}
Expand All @@ -1230,9 +1300,9 @@ public QueryOffsetResponse queryOffset(String reference, String stream) {
bb.writeShort(VERSION_1);
bb.writeInt(correlationId);
bb.writeShort(reference.length());
bb.writeBytes(reference.getBytes(StandardCharsets.UTF_8));
bb.writeBytes(reference.getBytes(CHARSET));
bb.writeShort(stream.length());
bb.writeBytes(stream.getBytes(StandardCharsets.UTF_8));
bb.writeBytes(stream.getBytes(CHARSET));
OutstandingRequest<QueryOffsetResponse> request = outstandingRequest();
outstandingRequests.put(correlationId, request);
channel.writeAndFlush(bb);
Expand Down Expand Up @@ -1271,9 +1341,9 @@ public long queryPublisherSequence(String publisherReference, String stream) {
bb.writeShort(VERSION_1);
bb.writeInt(correlationId);
bb.writeShort(publisherReference.length());
bb.writeBytes(publisherReference.getBytes(StandardCharsets.UTF_8));
bb.writeBytes(publisherReference.getBytes(CHARSET));
bb.writeShort(stream.length());
bb.writeBytes(stream.getBytes(StandardCharsets.UTF_8));
bb.writeBytes(stream.getBytes(CHARSET));
OutstandingRequest<QueryPublisherSequenceResponse> request = outstandingRequest();
outstandingRequests.put(correlationId, request);
channel.writeAndFlush(bb);
Expand Down Expand Up @@ -1436,9 +1506,9 @@ public List<String> route(String routingKey, String superStream) {
bb.writeShort(VERSION_1);
bb.writeInt(correlationId);
bb.writeShort(routingKey.length());
bb.writeBytes(routingKey.getBytes(StandardCharsets.UTF_8));
bb.writeBytes(routingKey.getBytes(CHARSET));
bb.writeShort(superStream.length());
bb.writeBytes(superStream.getBytes(StandardCharsets.UTF_8));
bb.writeBytes(superStream.getBytes(CHARSET));
OutstandingRequest<List<String>> request = outstandingRequest();
outstandingRequests.put(correlationId, request);
channel.writeAndFlush(bb);
Expand Down Expand Up @@ -1471,7 +1541,7 @@ public List<String> partitions(String superStream) {
bb.writeShort(VERSION_1);
bb.writeInt(correlationId);
bb.writeShort(superStream.length());
bb.writeBytes(superStream.getBytes(StandardCharsets.UTF_8));
bb.writeBytes(superStream.getBytes(CHARSET));
OutstandingRequest<List<String>> request = outstandingRequest();
outstandingRequests.put(correlationId, request);
channel.writeAndFlush(bb);
Expand Down Expand Up @@ -1532,7 +1602,7 @@ StreamStatsResponse streamStats(String stream) {
bb.writeShort(VERSION_1);
bb.writeInt(correlationId);
bb.writeShort(stream.length());
bb.writeBytes(stream.getBytes(StandardCharsets.UTF_8));
bb.writeBytes(stream.getBytes(CHARSET));
OutstandingRequest<StreamStatsResponse> request = outstandingRequest();
outstandingRequests.put(correlationId, request);
channel.writeAndFlush(bb);
Expand Down

0 comments on commit 7e73a35

Please sign in to comment.