Skip to content

Commit

Permalink
ENHANCE: Change creating callback objects multiple times in pipe oper…
Browse files Browse the repository at this point in the history
…ation.
  • Loading branch information
brido4125 committed Mar 18, 2024
1 parent 1fcd2d7 commit 00e791f
Showing 1 changed file with 66 additions and 64 deletions.
130 changes: 66 additions & 64 deletions src/main/java/net/spy/memcached/ArcusClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -849,41 +849,42 @@ <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncCollectionPip
final PipedCollectionFuture<Integer, CollectionOperationStatus> rv =
new PipedCollectionFuture<Integer, CollectionOperationStatus>(latch, operationTimeout);

for (int i = 0; i < updateList.size(); i++) {
final CollectionPipedUpdate<T> update = updateList.get(i);
final int idx = i;

Operation op = opFact.collectionPipedUpdate(key, update,
new CollectionPipedUpdateOperation.Callback() {
// each result status
public void receivedStatus(OperationStatus status) {
CollectionOperationStatus cstatus;

if (status instanceof CollectionOperationStatus) {
cstatus = (CollectionOperationStatus) status;
} else {
getLogger().warn("Unhandled state: " + status);
cstatus = new CollectionOperationStatus(status);
}
rv.setOperationStatus(cstatus);
}
CollectionPipedUpdateOperation.Callback callback = new CollectionPipedUpdateOperation.Callback() {
private int opIndex = 0;

// complete
public void complete() {
latch.countDown();
}
@Override
public void receivedStatus(OperationStatus status) {
opIndex += 1;
CollectionOperationStatus cstatus;

// got status
public void gotStatus(Integer index, OperationStatus status) {
if (status instanceof CollectionOperationStatus) {
rv.addEachResult(index + (idx * CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT),
(CollectionOperationStatus) status);
} else {
rv.addEachResult(index + (idx * CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT),
new CollectionOperationStatus(status));
}
}
});
if (status instanceof CollectionOperationStatus) {
cstatus = (CollectionOperationStatus) status;
} else {
getLogger().warn("Unhandled state: " + status);
cstatus = new CollectionOperationStatus(status);
}
rv.setOperationStatus(cstatus);
}

@Override
public void complete() {
latch.countDown();
}

@Override
public void gotStatus(Integer index, OperationStatus status) {
if (status instanceof CollectionOperationStatus) {
rv.addEachResult(index + (opIndex * CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT),
(CollectionOperationStatus) status);
} else {
rv.addEachResult(index + (opIndex * CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT),
new CollectionOperationStatus(status));
}
}
};

for (CollectionPipedUpdate<T> pipedUpdate : updateList) {
Operation op = opFact.collectionPipedUpdate(key, pipedUpdate, callback);
rv.addOperation(op);
addOp(key, op);
}
Expand Down Expand Up @@ -3291,41 +3292,42 @@ <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncCollectionPip
final PipedCollectionFuture<Integer, CollectionOperationStatus> rv =
new PipedCollectionFuture<Integer, CollectionOperationStatus>(latch, operationTimeout);

for (int i = 0; i < insertList.size(); i++) {
final CollectionPipedInsert<T> insert = insertList.get(i);
final int idx = i;
CollectionPipedInsertOperation.Callback callback = new CollectionPipedInsertOperation.Callback() {
private int opIdx = 0;

Operation op = opFact.collectionPipedInsert(key, insert,
new CollectionPipedInsertOperation.Callback() {
// each result status
public void receivedStatus(OperationStatus status) {
CollectionOperationStatus cstatus;
@Override
public void receivedStatus(OperationStatus status) {
opIdx += 1;
CollectionOperationStatus cstatus;

if (status instanceof CollectionOperationStatus) {
cstatus = (CollectionOperationStatus) status;
} else {
getLogger().warn("Unhandled state: " + status);
cstatus = new CollectionOperationStatus(status);
}
rv.setOperationStatus(cstatus);
}
if (status instanceof CollectionOperationStatus) {
cstatus = (CollectionOperationStatus) status;
} else {
getLogger().warn("Unhandled state: " + status);
cstatus = new CollectionOperationStatus(status);
}
rv.setOperationStatus(cstatus);
}

// complete
public void complete() {
latch.countDown();
}
@Override
public void complete() {
latch.countDown();
}

// got status
public void gotStatus(Integer index, OperationStatus status) {
if (status instanceof CollectionOperationStatus) {
rv.addEachResult(index + (idx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
(CollectionOperationStatus) status);
} else {
rv.addEachResult(index + (idx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
new CollectionOperationStatus(status));
}
}
});
@Override
public void gotStatus(Integer index, OperationStatus status) {
if (status instanceof CollectionOperationStatus) {
rv.addEachResult(index + (opIdx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
(CollectionOperationStatus) status);
} else {
rv.addEachResult(index + (opIdx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
new CollectionOperationStatus(status));
}
}
};

for (CollectionPipedInsert<T> pipedInsert : insertList) {
Operation op = opFact.collectionPipedInsert(key, pipedInsert, callback);
rv.addOperation(op);
addOp(key, op);
}
Expand Down

0 comments on commit 00e791f

Please sign in to comment.