Skip to content

Commit

Permalink
INTERNAL: read while END/PIPE_ERROR received in the pipe operation
Browse files Browse the repository at this point in the history
  • Loading branch information
oliviarla committed Nov 19, 2024
1 parent 132254c commit 20cf3d9
Show file tree
Hide file tree
Showing 9 changed files with 75 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -90,16 +91,21 @@ public Map<String, T> get(long duration,
MemcachedConnection.opsSucceeded(ops);
}

List<Exception> exceptions = new ArrayList<>();
for (Operation op : ops) {
if (op != null && op.hasErrored()) {
throw new ExecutionException(op.getException());
exceptions.addAll(op.getExceptions());
}

if (op != null && op.isCancelled()) {
throw new ExecutionException(new RuntimeException(op.getCancelCause()));
exceptions.add(new RuntimeException(op.getCancelCause()));
}
}

if (!exceptions.isEmpty()) {
throw new CompositeException(exceptions);
}

return failedResult;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public class CompositeException extends ExecutionException {
private static final long serialVersionUID = -599478797582490012L;
private final ArrayList<Exception> exceptions = new ArrayList<>();

CompositeException(List<Exception> exceptions) {
public CompositeException(List<Exception> exceptions) {
super(ExceptionMessageFactory.createCompositeMessage(exceptions));
this.exceptions.addAll(exceptions);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -83,16 +84,21 @@ public Map<K, V> get(long duration, TimeUnit unit)
MemcachedConnection.opSucceeded(ops.iterator().next());
}

List<Exception> exceptions = new ArrayList<>();
for (Operation op : ops) {
if (op != null && op.hasErrored()) {
throw new ExecutionException(op.getException());
exceptions.addAll(op.getExceptions());
}

if (op != null && op.isCancelled()) {
throw new ExecutionException(new RuntimeException(op.getCancelCause()));
exceptions.add(new RuntimeException(op.getCancelCause()));
}
}

if (!exceptions.isEmpty()) {
throw new CompositeException(exceptions);
}

return failedResult;
}

Expand Down
6 changes: 6 additions & 0 deletions src/main/java/net/spy/memcached/ops/Operation.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;

import net.spy.memcached.MemcachedNode;
import net.spy.memcached.RedirectHandler;
Expand All @@ -44,6 +45,11 @@ public interface Operation {
*/
OperationException getException();

/**
* Get the all exceptions that occurred (or empty if no exception occurred).
*/
List<OperationException> getExceptions();

/**
* Get the callback for this get operation.
*/
Expand Down
33 changes: 25 additions & 8 deletions src/main/java/net/spy/memcached/protocol/BaseOperationImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import java.io.IOException;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import net.spy.memcached.MemcachedNode;
Expand All @@ -46,12 +49,12 @@ public abstract class BaseOperationImpl extends SpyObject {
*/
public static final OperationStatus CANCELLED =
new CancelledOperationStatus();
private final AtomicBoolean callbacked = new AtomicBoolean(false);
private final List<OperationException> exceptions = new ArrayList<>();
private OperationState state = OperationState.WRITE_QUEUED;
private ByteBuffer cmd = null;
private boolean cancelled = false;
private final AtomicBoolean callbacked = new AtomicBoolean(false);
private String cancelCause = null;
private OperationException exception = null;
protected OperationCallback callback = null;
private volatile MemcachedNode handlingNode = null;

Expand Down Expand Up @@ -85,11 +88,22 @@ public final boolean isCancelled() {
}

public final boolean hasErrored() {
return exception != null;
return !exceptions.isEmpty();
}

public final OperationException getException() {
return exception;
if (exceptions.isEmpty()) {
return null;
}
return exceptions.get(0);
}

public final List<OperationException> getExceptions() {
return Collections.unmodifiableList(exceptions);
}

protected final void addException(OperationException e) {
exceptions.add(e);
}

public final boolean cancel(String cause) {
Expand Down Expand Up @@ -244,7 +258,7 @@ protected void handleError(OperationErrorType eType, String line)
switch (eType) {
case GENERAL:
case SERVER:
exception = new OperationException(eType, line + " @ " + handlingNode.getNodeName());
exceptions.add(new OperationException(eType, line + " @ " + handlingNode.getNodeName()));
break;
case CLIENT:
if (line.contains("bad command line format")) {
Expand All @@ -255,13 +269,16 @@ protected void handleError(OperationErrorType eType, String line)
String[] cmdLines = new String(bytes).split("\r\n");
getLogger().error("Bad command: %s", cmdLines[0]);
}
exception = new OperationException(eType, line + " @ " + handlingNode.getNodeName());
exceptions.add(new OperationException(eType, line + " @ " + handlingNode.getNodeName()));
break;
default:
assert false;
}
transitionState(OperationState.COMPLETE);
throw exception;

if (!this.isPipeOperation()) {
transitionState(OperationState.COMPLETE);
throw exceptions.get(0);
}
}

public void handleRead(ByteBuffer data) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import net.spy.memcached.ops.CollectionBulkInsertOperation;
import net.spy.memcached.ops.CollectionOperationStatus;
import net.spy.memcached.ops.OperationCallback;
import net.spy.memcached.ops.OperationErrorType;
import net.spy.memcached.ops.OperationException;
import net.spy.memcached.ops.OperationState;
import net.spy.memcached.ops.OperationStatus;
import net.spy.memcached.ops.OperationType;
Expand Down Expand Up @@ -135,6 +137,12 @@ assert getState() == OperationState.READING
/* ENABLE_MIGRATION end */
cb.receivedStatus((successAll) ? END : FAILED_END);
transitionState(OperationState.COMPLETE);
} else if (count > 0 && index + getExceptions().size() >= count) {
// If END|PIPE_ERROR not received after reading all responses,
// then throw exception to reconnect.
addException(new OperationException(
OperationErrorType.SERVER, "END|PIPE_ERROR not received"));
transitionState(OperationState.COMPLETE);
} else if (line.startsWith("RESPONSE ")) {
getLogger().debug("Got line %s", line);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import net.spy.memcached.ops.CollectionOperationStatus;
import net.spy.memcached.ops.CollectionPipedInsertOperation;
import net.spy.memcached.ops.OperationCallback;
import net.spy.memcached.ops.OperationErrorType;
import net.spy.memcached.ops.OperationException;
import net.spy.memcached.ops.OperationState;
import net.spy.memcached.ops.OperationStatus;
import net.spy.memcached.ops.OperationType;
Expand Down Expand Up @@ -145,6 +147,12 @@ assert getState() == OperationState.READING
/* ENABLE_MIGRATION end */
cb.receivedStatus((successAll) ? END : FAILED_END);
transitionState(OperationState.COMPLETE);
} else if (count > 0 && index + getExceptions().size() >= count) {
// If END|PIPE_ERROR not received after reading all responses,
// then throw exception to reconnect.
addException(new OperationException(
OperationErrorType.SERVER, "END|PIPE_ERROR not received"));
transitionState(OperationState.COMPLETE);
} else if (line.startsWith("RESPONSE ")) {
getLogger().debug("Got line %s", line);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import net.spy.memcached.ops.CollectionOperationStatus;
import net.spy.memcached.ops.CollectionPipedUpdateOperation;
import net.spy.memcached.ops.OperationCallback;
import net.spy.memcached.ops.OperationErrorType;
import net.spy.memcached.ops.OperationException;
import net.spy.memcached.ops.OperationState;
import net.spy.memcached.ops.OperationStatus;
import net.spy.memcached.ops.OperationType;
Expand Down Expand Up @@ -141,6 +143,12 @@ assert getState() == OperationState.READING : "Read ``" + line
/* ENABLE_MIGRATION end */
cb.receivedStatus((successAll) ? END : FAILED_END);
transitionState(OperationState.COMPLETE);
} else if (count > 0 && index + getExceptions().size() >= count) {
// If END|PIPE_ERROR not received after reading all responses,
// then throw exception to reconnect.
addException(new OperationException(
OperationErrorType.SERVER, "END|PIPE_ERROR not received"));
transitionState(OperationState.COMPLETE);
} else if (line.startsWith("RESPONSE ")) {
getLogger().debug("Got line %s", line);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ private OperationErrorType classifyError(String line) {
public void readFromBuffer(ByteBuffer data) throws IOException {
// Loop while there's data remaining to get it all drained.
while (data.remaining() > 0) {
if (isPipeOperation() && hasErrored() && getState() == OperationState.COMPLETE) {
throw getException();
}
if (getState() == OperationState.COMPLETE ||
getState() == OperationState.MOVING || // ENABLE_REPLICATION
getState() == OperationState.REDIRECT) { // ENABLE_MIGRATION
Expand Down

0 comments on commit 20cf3d9

Please sign in to comment.