Skip to content

Commit

Permalink
INTERNAL: read while END/PIPE_ERROR received in the pipe operation
Browse files Browse the repository at this point in the history
  • Loading branch information
oliviarla committed Nov 28, 2024
1 parent 381b25a commit a752f86
Show file tree
Hide file tree
Showing 11 changed files with 147 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ private static boolean isBulkOperation(Operation op, Collection<Operation> ops)
return op.isBulkOperation();
}

public static String createCompositeMessage(List<Exception> exceptions) {
public static String createCompositeMessage(List<? extends Exception> exceptions) {
if (exceptions == null || exceptions.isEmpty()) {
throw new IllegalArgumentException("At least one exception must be specified");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -90,16 +91,21 @@ public Map<String, T> get(long duration,
MemcachedConnection.opsSucceeded(ops);
}

List<Exception> exceptions = new ArrayList<>();
for (Operation op : ops) {
if (op != null && op.hasErrored()) {
throw new ExecutionException(op.getException());
exceptions.addAll(op.getExceptions());
}

if (op != null && op.isCancelled()) {
throw new ExecutionException(new RuntimeException(op.getCancelCause()));
exceptions.add(new RuntimeException(op.getCancelCause()));
}
}

if (!exceptions.isEmpty()) {
throw new CompositeException(exceptions);
}

return failedResult;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public class CompositeException extends ExecutionException {
private final ArrayList<Exception> exceptions = new ArrayList<>();
private final Throwable cause;

public CompositeException(List<Exception> exceptions) {
public CompositeException(List<? extends Exception> exceptions) {
super(ExceptionMessageFactory.createCompositeMessage(exceptions));

if (exceptions.size() > 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -83,16 +84,21 @@ public Map<K, V> get(long duration, TimeUnit unit)
MemcachedConnection.opSucceeded(ops.iterator().next());
}

List<Exception> exceptions = new ArrayList<>();
for (Operation op : ops) {
if (op != null && op.hasErrored()) {
throw new ExecutionException(op.getException());
exceptions.addAll(op.getExceptions());
}

if (op != null && op.isCancelled()) {
throw new ExecutionException(new RuntimeException(op.getCancelCause()));
exceptions.add(new RuntimeException(op.getCancelCause()));
}
}

if (!exceptions.isEmpty()) {
throw new CompositeException(exceptions);
}

return failedResult;
}

Expand Down
6 changes: 6 additions & 0 deletions src/main/java/net/spy/memcached/ops/Operation.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;

import net.spy.memcached.MemcachedNode;
import net.spy.memcached.RedirectHandler;
Expand All @@ -44,6 +45,11 @@ public interface Operation {
*/
OperationException getException();

/**
* Get the all exceptions that occurred (or empty if no exception occurred).
*/
List<OperationException> getExceptions();

/**
* Get the callback for this get operation.
*/
Expand Down
33 changes: 25 additions & 8 deletions src/main/java/net/spy/memcached/protocol/BaseOperationImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import java.io.IOException;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import net.spy.memcached.MemcachedNode;
Expand All @@ -46,12 +49,12 @@ public abstract class BaseOperationImpl extends SpyObject {
*/
public static final OperationStatus CANCELLED =
new CancelledOperationStatus();
private final AtomicBoolean callbacked = new AtomicBoolean(false);
private final List<OperationException> exceptions = new ArrayList<>();
private OperationState state = OperationState.WRITE_QUEUED;
private ByteBuffer cmd = null;
private boolean cancelled = false;
private final AtomicBoolean callbacked = new AtomicBoolean(false);
private String cancelCause = null;
private OperationException exception = null;
protected OperationCallback callback = null;
private volatile MemcachedNode handlingNode = null;

Expand Down Expand Up @@ -85,11 +88,22 @@ public final boolean isCancelled() {
}

public final boolean hasErrored() {
return exception != null;
return !exceptions.isEmpty();
}

public final OperationException getException() {
return exception;
if (exceptions.isEmpty()) {
return null;
}
return exceptions.get(0);
}

public final List<OperationException> getExceptions() {
return Collections.unmodifiableList(exceptions);
}

protected final void addException(OperationException e) {
exceptions.add(e);
}

public final boolean cancel(String cause) {
Expand Down Expand Up @@ -244,7 +258,7 @@ protected void handleError(OperationErrorType eType, String line)
switch (eType) {
case GENERAL:
case SERVER:
exception = new OperationException(eType, line + " @ " + handlingNode.getNodeName());
exceptions.add(new OperationException(eType, line + " @ " + handlingNode.getNodeName()));
break;
case CLIENT:
if (line.contains("bad command line format")) {
Expand All @@ -255,13 +269,16 @@ protected void handleError(OperationErrorType eType, String line)
String[] cmdLines = new String(bytes).split("\r\n");
getLogger().error("Bad command: %s", cmdLines[0]);
}
exception = new OperationException(eType, line + " @ " + handlingNode.getNodeName());
exceptions.add(new OperationException(eType, line + " @ " + handlingNode.getNodeName()));
break;
default:
assert false;
}
transitionState(OperationState.COMPLETE);
throw exception;

if (!isPipeOperation()) {
transitionState(OperationState.COMPLETE);
throw exceptions.get(0);
}
}

public void handleRead(ByteBuffer data) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import net.spy.memcached.ops.CollectionBulkInsertOperation;
import net.spy.memcached.ops.CollectionOperationStatus;
import net.spy.memcached.ops.OperationCallback;
import net.spy.memcached.ops.OperationErrorType;
import net.spy.memcached.ops.OperationException;
import net.spy.memcached.ops.OperationState;
import net.spy.memcached.ops.OperationStatus;
import net.spy.memcached.ops.OperationType;
Expand Down Expand Up @@ -135,6 +137,12 @@ assert getState() == OperationState.READING
/* ENABLE_MIGRATION end */
cb.receivedStatus((successAll) ? END : FAILED_END);
transitionState(OperationState.COMPLETE);
} else if (count > 0 && index + getExceptions().size() >= count) {
// If END|PIPE_ERROR not received after reading all responses,
// then throw exception to reconnect.
addException(new OperationException(
OperationErrorType.SERVER, "END|PIPE_ERROR not received"));
transitionState(OperationState.COMPLETE);
} else if (line.startsWith("RESPONSE ")) {
getLogger().debug("Got line %s", line);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import net.spy.memcached.ops.CollectionOperationStatus;
import net.spy.memcached.ops.CollectionPipedInsertOperation;
import net.spy.memcached.ops.OperationCallback;
import net.spy.memcached.ops.OperationErrorType;
import net.spy.memcached.ops.OperationException;
import net.spy.memcached.ops.OperationState;
import net.spy.memcached.ops.OperationStatus;
import net.spy.memcached.ops.OperationType;
Expand Down Expand Up @@ -145,6 +147,12 @@ assert getState() == OperationState.READING
/* ENABLE_MIGRATION end */
cb.receivedStatus((successAll) ? END : FAILED_END);
transitionState(OperationState.COMPLETE);
} else if (count > 0 && index + getExceptions().size() >= count) {
// If END|PIPE_ERROR not received after reading all responses,
// then throw exception to reconnect.
addException(new OperationException(
OperationErrorType.SERVER, "END|PIPE_ERROR not received"));
transitionState(OperationState.COMPLETE);
} else if (line.startsWith("RESPONSE ")) {
getLogger().debug("Got line %s", line);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import net.spy.memcached.ops.CollectionOperationStatus;
import net.spy.memcached.ops.CollectionPipedUpdateOperation;
import net.spy.memcached.ops.OperationCallback;
import net.spy.memcached.ops.OperationErrorType;
import net.spy.memcached.ops.OperationException;
import net.spy.memcached.ops.OperationState;
import net.spy.memcached.ops.OperationStatus;
import net.spy.memcached.ops.OperationType;
Expand Down Expand Up @@ -141,6 +143,12 @@ assert getState() == OperationState.READING : "Read ``" + line
/* ENABLE_MIGRATION end */
cb.receivedStatus((successAll) ? END : FAILED_END);
transitionState(OperationState.COMPLETE);
} else if (count > 0 && index + getExceptions().size() >= count) {
// If END|PIPE_ERROR not received after reading all responses,
// then throw exception to reconnect.
addException(new OperationException(
OperationErrorType.SERVER, "END|PIPE_ERROR not received"));
transitionState(OperationState.COMPLETE);
} else if (line.startsWith("RESPONSE ")) {
getLogger().debug("Got line %s", line);

Expand Down
13 changes: 13 additions & 0 deletions src/main/java/net/spy/memcached/protocol/ascii/OperationImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@
import java.nio.ByteBuffer;

import net.spy.memcached.KeyUtil;
import net.spy.memcached.internal.CompositeException;
import net.spy.memcached.ops.Operation;
import net.spy.memcached.ops.OperationCallback;
import net.spy.memcached.ops.OperationErrorType;
import net.spy.memcached.ops.OperationException;
import net.spy.memcached.ops.OperationState;
import net.spy.memcached.ops.OperationStatus;
import net.spy.memcached.ops.StatusCode;
Expand Down Expand Up @@ -160,12 +162,23 @@ public void readFromBuffer(ByteBuffer data) throws IOException {
} else {
handleLine(line);
}

if (isPipeOperation()) {
validatePipeEnd();
}
} else { // OperationReadType.DATA
handleRead(data);
}
}
}

private void validatePipeEnd() throws OperationException {
if (hasErrored() && getState() == OperationState.COMPLETE) {
throw new OperationException(OperationErrorType.SERVER,
new CompositeException(getExceptions()).getMessage());
}
}

public abstract void handleLine(String line);

protected boolean hasSwitchedOver(String line) {
Expand Down
61 changes: 61 additions & 0 deletions src/test/java/net/spy/memcached/protocol/ascii/BaseOpTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,24 @@

package net.spy.memcached.protocol.ascii;

import java.net.InetSocketAddress;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

import net.spy.memcached.collection.CollectionPipedInsert;
import net.spy.memcached.ops.CollectionPipedInsertOperation;
import net.spy.memcached.ops.Operation;
import net.spy.memcached.ops.OperationCallback;
import net.spy.memcached.ops.OperationException;
import net.spy.memcached.ops.OperationStatus;

import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
Expand Down Expand Up @@ -99,6 +109,57 @@ void testPartialLine() throws Exception {
assertEquals("this is a test", op.getCurrentLine());
}

@Test
void throwExceptionIfEndOrPipeErrorNotReceived() throws Exception {
String key = "testPipeLine";
CollectionPipedInsert.ListPipedInsert<String> insert =
new CollectionPipedInsert.ListPipedInsert<>(key, 0,
Arrays.asList("a", "b"), null, null);
OperationCallback cb = new CollectionPipedInsertOperation.Callback() {
@Override
public void receivedStatus(OperationStatus status) {
}

@Override
public void complete() {
}

@Override
public void gotStatus(Integer index, OperationStatus status) {
}
};
CollectionPipedInsertOperationImpl op =
new CollectionPipedInsertOperationImpl("test", insert, cb);
LinkedBlockingQueue<Operation> queue = new LinkedBlockingQueue<>();
op.setHandlingNode(new AsciiMemcachedNodeImpl("testnode", new InetSocketAddress(11211),
60, queue, queue, queue, 0L));

ByteBuffer b = ByteBuffer.allocate(40);
String line1 = "RESPONSE 2\r\n";
op.writeComplete();
b.put(line1.getBytes());
b.flip();
assertDoesNotThrow(() -> op.readFromBuffer(b));
b.clear();

String line2 = "SERVER_ERROR out of memory\r\n";
b.put(line2.getBytes());
b.flip();
assertDoesNotThrow(() -> op.readFromBuffer(b));
b.clear();

String line3 = "CLIENT_ERROR too large value\r\n";
b.put(line3.getBytes());
b.flip();
assertDoesNotThrow(() -> op.readFromBuffer(b));
b.clear();

String line4 = "RESPONSE 2\r\n";
b.put(line4.getBytes());
b.flip();
assertThrows(OperationException.class, () -> op.readFromBuffer(b));
}

private static class SimpleOp extends OperationImpl {

private final LinkedList<String> lines = new LinkedList<>();
Expand Down

0 comments on commit a752f86

Please sign in to comment.