Skip to content

Commit

Permalink
INTERNAL: read while END/PIPE_ERROR received in the pipe operation
Browse files Browse the repository at this point in the history
  • Loading branch information
oliviarla committed Dec 24, 2024
1 parent d84094a commit de7cd25
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -90,16 +91,21 @@ public Map<String, T> get(long duration,
MemcachedConnection.opsSucceeded(ops);
}

List<Exception> exceptions = new ArrayList<>();
for (Operation op : ops) {
if (op != null && op.hasErrored()) {
throw new ExecutionException(op.getException());
exceptions.add(op.getException());
}

if (op != null && op.isCancelled()) {
throw new ExecutionException(new RuntimeException(op.getCancelCause()));
exceptions.add(new RuntimeException(op.getCancelCause()));
}
}

if (!exceptions.isEmpty()) {
throw new CompositeException(exceptions);
}

return failedResult;
}

Expand Down
38 changes: 17 additions & 21 deletions src/main/java/net/spy/memcached/protocol/BaseOperationImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import net.spy.memcached.ops.OperationType;
import net.spy.memcached.ops.StatusCode;

import static net.spy.memcached.ops.OperationErrorType.CLIENT;

/**
* Base class for protocol-specific operation implementations.
*/
Expand All @@ -51,7 +53,7 @@ public abstract class BaseOperationImpl extends SpyObject {
private boolean cancelled = false;
private final AtomicBoolean callbacked = new AtomicBoolean(false);
private String cancelCause = null;
private OperationException exception = null;
protected OperationException exception = null;
private OperationCallback callback = null;
private volatile MemcachedNode handlingNode = null;

Expand Down Expand Up @@ -239,31 +241,25 @@ public final void writeComplete() {
public abstract void readFromBuffer(ByteBuffer data) throws IOException;

protected void handleError(OperationErrorType eType, String line)
throws IOException {
throws OperationException {
getLogger().error("Error: %s by %s", line, this);
switch (eType) {
case GENERAL:
case SERVER:
exception = new OperationException(eType, line + " @ " + handlingNode.getNodeName());
break;
case CLIENT:
if (line.contains("bad command line format")) {
initialize();
byte[] bytes = new byte[cmd.remaining()];
cmd.get(bytes);

String[] cmdLines = new String(bytes).split("\r\n");
getLogger().error("Bad command: %s", cmdLines[0]);
}
exception = new OperationException(eType, line + " @ " + handlingNode.getNodeName());
break;
default:
assert false;
}
exception = createException(eType, line);
transitionState(OperationState.COMPLETE);
throw exception;
}

protected final OperationException createException(OperationErrorType eType, String line) {
if (eType == CLIENT && line.contains("bad command line format")) {
initialize();
byte[] bytes = new byte[cmd.remaining()];
cmd.get(bytes);

String[] cmdLines = new String(bytes).split("\r\n");
getLogger().error("Bad command: %s", cmdLines[0]);
}
return new OperationException(eType, line + " @ " + handlingNode.getNodeName());
}

public void handleRead(ByteBuffer data) {
assert false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import net.spy.memcached.ops.CollectionOperationStatus;
import net.spy.memcached.ops.CollectionPipedInsertOperation;
import net.spy.memcached.ops.OperationCallback;
import net.spy.memcached.ops.OperationErrorType;
import net.spy.memcached.ops.OperationState;
import net.spy.memcached.ops.OperationStatus;
import net.spy.memcached.ops.OperationType;
Expand Down Expand Up @@ -89,33 +90,29 @@ public CollectionPipedInsertOperationImpl(String key,
}
setOperationType(OperationType.WRITE);
}

@Override
public void handleLine(String line) {
assert getState() == OperationState.READING
: "Read ``" + line + "'' when in " + getState() + " state";

/* ENABLE_REPLICATION if */
if (hasSwitchedOver(line)) {
this.insert.setNextOpIndex(index);
prepareSwitchover(line);
return;
}
/* ENABLE_REPLICATION end */
/* ENABLE_MIGRATION if */
if (hasNotMyKey(line)) {
// Only one NOT_MY_KEY is provided in response of single key piped operation when redirection.
addRedirectSingleKeyOperation(line, key);
if (insert.isNotPiped()) {
if (insert.isNotPiped()) {
// insert object contains only one command.

/* ENABLE_REPLICATION if */
if (hasSwitchedOver(line)) {
prepareSwitchover(line);
return;
}
/* ENABLE_REPLICATION end */

/* ENABLE_MIGRATION if */
if (hasNotMyKey(line)) {
addRedirectSingleKeyOperation(line, key);
transitionState(OperationState.REDIRECT);
} else {
insert.setNextOpIndex(index);
return;
}
return;
}
/* ENABLE_MIGRATION end */
/* ENABLE_MIGRATION end */

if (insert.isNotPiped()) {
OperationStatus status = matchStatus(line, STORED, CREATED_STORED,
NOT_FOUND, ELEMENT_EXISTS, OVERFLOWED, OUT_OF_RANGE,
TYPE_MISMATCH, BKEY_MISMATCH);
Expand All @@ -126,26 +123,8 @@ assert getState() == OperationState.READING
cb.receivedStatus(FAILED_END);
}
transitionState(OperationState.COMPLETE);
return;
}

/*
RESPONSE <count>\r\n
<status of the 1st pipelined command>\r\n
[ ... ]
<status of the last pipelined command>\r\n
END|PIPE_ERROR <error_string>\r\n
*/
if (line.startsWith("END") || line.startsWith("PIPE_ERROR ")) {
/* ENABLE_MIGRATION if */
if (needRedirect()) {
transitionState(OperationState.REDIRECT);
return;
}
/* ENABLE_MIGRATION end */
cb.receivedStatus((successAll) ? END : FAILED_END);
transitionState(OperationState.COMPLETE);
} else if (line.startsWith("RESPONSE ")) {
// insert object contains multiple commands
getLogger().debug("Got line %s", line);

// TODO server should be fixed
Expand All @@ -155,16 +134,71 @@ assert getState() == OperationState.READING
String[] stuff = line.split(" ");
assert "RESPONSE".equals(stuff[0]);
count = Integer.parseInt(stuff[1]);
} else {
OperationStatus status = matchStatus(line, STORED, CREATED_STORED,
NOT_FOUND, ELEMENT_EXISTS, OVERFLOWED, OUT_OF_RANGE,
TYPE_MISMATCH, BKEY_MISMATCH);
setReadType(OperationReadType.DATA);
}
}

if (!status.isSuccess()) {
cb.gotStatus(index, status);
successAll = false;
@Override
public void handleRead(ByteBuffer bb) {
while (bb.remaining() > 0) {
try {
String line = getLineFromBuffer(bb);
if (line == null) {
break;
}
OperationErrorType eType = classifyError(line);
if (eType != null) {
this.exception = createException(eType, line);
continue;
}

/* ENABLE_REPLICATION if */
if (hasSwitchedOver(line)) {
this.insert.setNextOpIndex(index);
prepareSwitchover(line);
return;
}
/* ENABLE_REPLICATION end */

/* ENABLE_MIGRATION if */
if (hasNotMyKey(line)) {
// Only one NOT_MY_KEY is provided in response of single key piped operation when redirection.
addRedirectSingleKeyOperation(line, key);
if (insert.isNotPiped()) {
transitionState(OperationState.REDIRECT);
} else {
insert.setNextOpIndex(index);
}
return;
}
/* ENABLE_MIGRATION end */

if (line.startsWith("END") || line.startsWith("PIPE_ERROR ")) {
/* ENABLE_MIGRATION if */
if (needRedirect()) {
transitionState(OperationState.REDIRECT);
return;
}
/* ENABLE_MIGRATION end */
cb.receivedStatus((successAll) ? END : FAILED_END);
transitionState(OperationState.COMPLETE);
return;
} else {
OperationStatus status = matchStatus(line, STORED, CREATED_STORED,
NOT_FOUND, ELEMENT_EXISTS, OVERFLOWED, OUT_OF_RANGE,
TYPE_MISMATCH, BKEY_MISMATCH);

if (!status.isSuccess()) {
cb.gotStatus(index, status);
successAll = false;
}
index++;
}
} catch (Exception e) {
getLogger().error("Failed to parse line: %s", e.getMessage());
transitionState(OperationState.COMPLETE);
return;
}
index++;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ protected final void setArguments(ByteBuffer bb, Object... args) {
bb.put(CRLF);
}

private String getLineFromBuffer(ByteBuffer data) throws UnsupportedEncodingException {
protected final String getLineFromBuffer(ByteBuffer data) throws UnsupportedEncodingException {
boolean lineFound = false;
while (data.remaining() > 0) {
byte b = data.get();
Expand All @@ -128,7 +128,7 @@ private String getLineFromBuffer(ByteBuffer data) throws UnsupportedEncodingExce
return null;
}

private OperationErrorType classifyError(String line) {
protected final OperationErrorType classifyError(String line) {
OperationErrorType rv = null;
if (line.startsWith("ERROR")) {
rv = OperationErrorType.GENERAL;
Expand Down Expand Up @@ -163,6 +163,9 @@ public void readFromBuffer(ByteBuffer data) throws IOException {
} else { // OperationReadType.DATA
handleRead(data);
}
if (hasErrored() && isPipeOperation() && getState() == OperationState.COMPLETE) {
throw getException();
}
}
}

Expand Down
55 changes: 55 additions & 0 deletions src/test/java/net/spy/memcached/protocol/ascii/BaseOpTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,24 @@

package net.spy.memcached.protocol.ascii;

import java.net.InetSocketAddress;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

import net.spy.memcached.collection.CollectionPipedInsert;
import net.spy.memcached.ops.CollectionPipedInsertOperation;
import net.spy.memcached.ops.Operation;
import net.spy.memcached.ops.OperationCallback;
import net.spy.memcached.ops.OperationException;
import net.spy.memcached.ops.OperationStatus;

import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
Expand Down Expand Up @@ -99,6 +109,51 @@ void testPartialLine() throws Exception {
assertEquals("this is a test", op.getCurrentLine());
}

@Test
void throwExceptionAfterReadingEndOrPipeError() throws Exception {
String key = "testPipeLine";
CollectionPipedInsert.ListPipedInsert<String> insert =
new CollectionPipedInsert.ListPipedInsert<>(key, 0,
Arrays.asList("a", "b"), null, null);
OperationCallback cb = new CollectionPipedInsertOperation.Callback() {
@Override
public void receivedStatus(OperationStatus status) {
}

@Override
public void complete() {
}

@Override
public void gotStatus(Integer index, OperationStatus status) {
}
};
CollectionPipedInsertOperationImpl op =
new CollectionPipedInsertOperationImpl("test", insert, cb);
LinkedBlockingQueue<Operation> queue = new LinkedBlockingQueue<>();
op.setHandlingNode(new AsciiMemcachedNodeImpl("testnode", new InetSocketAddress(11211),
60, queue, queue, queue, 0L));

ByteBuffer b = ByteBuffer.allocate(40);
String line1 = "RESPONSE 2\r\n";
op.writeComplete();
b.put(line1.getBytes());
b.flip();
assertDoesNotThrow(() -> op.readFromBuffer(b));
b.clear();

String line2 = "SERVER_ERROR out of memory\r\n";
b.put(line2.getBytes());
b.flip();
assertDoesNotThrow(() -> op.readFromBuffer(b));
b.clear();

String line4 = "PIPE_ERROR failed\r\n";
b.put(line4.getBytes());
b.flip();
assertThrows(OperationException.class, () -> op.readFromBuffer(b));
}

private static class SimpleOp extends OperationImpl {

private final LinkedList<String> lines = new LinkedList<>();
Expand Down

0 comments on commit de7cd25

Please sign in to comment.