Skip to content

Commit

Permalink
ENHANCE: Change creating callback objects multiple times in pipe oper…
Browse files Browse the repository at this point in the history
…ation.
  • Loading branch information
brido4125 committed Apr 5, 2024
1 parent ae2a82f commit eb87cef
Show file tree
Hide file tree
Showing 8 changed files with 133 additions and 150 deletions.
211 changes: 90 additions & 121 deletions src/main/java/net/spy/memcached/ArcusClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -815,41 +815,40 @@ <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncCollectionPip
final PipedCollectionFuture<Integer, CollectionOperationStatus> rv =
new PipedCollectionFuture<Integer, CollectionOperationStatus>(latch, operationTimeout);

for (int i = 0; i < updateList.size(); i++) {
final CollectionPipedUpdate<T> update = updateList.get(i);
final int idx = i;

Operation op = opFact.collectionPipedUpdate(key, update,
new CollectionPipedUpdateOperation.Callback() {
// each result status
public void receivedStatus(OperationStatus status) {
CollectionOperationStatus cstatus;

if (status instanceof CollectionOperationStatus) {
cstatus = (CollectionOperationStatus) status;
} else {
getLogger().warn("Unhandled state: " + status);
cstatus = new CollectionOperationStatus(status);
}
rv.setOperationStatus(cstatus);
}
CollectionPipedUpdateOperation.Callback callback = new CollectionPipedUpdateOperation.Callback() {

// complete
public void complete() {
latch.countDown();
}
@Override
public void receivedStatus(OperationStatus status) {
CollectionOperationStatus cstatus;

// got status
public void gotStatus(Integer index, OperationStatus status) {
if (status instanceof CollectionOperationStatus) {
rv.addEachResult(index + (idx * CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT),
(CollectionOperationStatus) status);
} else {
rv.addEachResult(index + (idx * CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT),
new CollectionOperationStatus(status));
}
}
});
if (status instanceof CollectionOperationStatus) {
cstatus = (CollectionOperationStatus) status;
} else {
getLogger().warn("Unhandled state: " + status);
cstatus = new CollectionOperationStatus(status);
}
rv.setOperationStatus(cstatus);
}

@Override
public void complete() {
latch.countDown();
}

@Override
public void gotStatus(Integer index, int opIdx, OperationStatus status) {
if (status instanceof CollectionOperationStatus) {
rv.addEachResult(index + (opIdx * CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT),
(CollectionOperationStatus) status);
} else {
rv.addEachResult(index + (opIdx * CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT),
new CollectionOperationStatus(status));
}
}
};

for (CollectionPipedUpdate<T> pipedUpdate : updateList) {
Operation op = opFact.collectionPipedUpdate(key, pipedUpdate, callback);
rv.addOperation(op);
addOp(key, op);
}
Expand Down Expand Up @@ -1778,15 +1777,11 @@ public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncBopPip
}

List<CollectionPipedInsert<T>> insertList = new ArrayList<CollectionPipedInsert<T>>();
PartitionedMap<Long, T> list = new PartitionedMap<Long, T>(elements,
CollectionPipedInsert.MAX_PIPED_ITEM_COUNT);

if (elements.size() <= CollectionPipedInsert.MAX_PIPED_ITEM_COUNT) {
insertList.add(new BTreePipedInsert<T>(key, elements, attributesForCreate, tc));
} else {
PartitionedMap<Long, T> list = new PartitionedMap<Long, T>(
elements, CollectionPipedInsert.MAX_PIPED_ITEM_COUNT);
for (Map<Long, T> elementMap : list) {
insertList.add(new BTreePipedInsert<T>(key, elementMap, attributesForCreate, tc));
}
for (int i = 0; i < list.size(); i++) {
insertList.add(new BTreePipedInsert<T>(key, list.get(i), i, attributesForCreate, tc));
}
return asyncCollectionPipedInsert(key, insertList);
}
Expand All @@ -1802,15 +1797,11 @@ public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncBopPip
}

List<CollectionPipedInsert<T>> insertList = new ArrayList<CollectionPipedInsert<T>>();
PartitionedList<Element<T>> list = new PartitionedList<Element<T>>(elements,
CollectionPipedInsert.MAX_PIPED_ITEM_COUNT);

if (elements.size() <= CollectionPipedInsert.MAX_PIPED_ITEM_COUNT) {
insertList.add(new ByteArraysBTreePipedInsert<T>(key, elements, attributesForCreate, tc));
} else {
PartitionedList<Element<T>> list = new PartitionedList<Element<T>>(
elements, CollectionPipedInsert.MAX_PIPED_ITEM_COUNT);
for (List<Element<T>> elementList : list) {
insertList.add(new ByteArraysBTreePipedInsert<T>(key, elementList, attributesForCreate, tc));
}
for (int i = 0; i < list.size(); i++) {
insertList.add(new ByteArraysBTreePipedInsert<T>(key, list.get(i), i, attributesForCreate, tc));
}
return asyncCollectionPipedInsert(key, insertList);
}
Expand All @@ -1830,15 +1821,11 @@ public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncMopPip
}

List<CollectionPipedInsert<T>> insertList = new ArrayList<CollectionPipedInsert<T>>();
PartitionedMap<String, T> list = new PartitionedMap<String, T>(elements,
CollectionPipedInsert.MAX_PIPED_ITEM_COUNT);

if (elements.size() <= CollectionPipedInsert.MAX_PIPED_ITEM_COUNT) {
insertList.add(new MapPipedInsert<T>(key, elements, attributesForCreate, tc));
} else {
PartitionedMap<String, T> list = new PartitionedMap<String, T>(
elements, CollectionPipedInsert.MAX_PIPED_ITEM_COUNT);
for (Map<String, T> elementMap : list) {
insertList.add(new MapPipedInsert<T>(key, elementMap, attributesForCreate, tc));
}
for (int i = 0 ; i < list.size(); i++) {
insertList.add(new MapPipedInsert<T>(key, list.get(i), i, attributesForCreate, tc));
}
return asyncCollectionPipedInsert(key, insertList);
}
Expand All @@ -1854,15 +1841,11 @@ public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncLopPip
}

List<CollectionPipedInsert<T>> insertList = new ArrayList<CollectionPipedInsert<T>>();
PartitionedList<T> list = new PartitionedList<T>(valueList,
CollectionPipedInsert.MAX_PIPED_ITEM_COUNT);

if (valueList.size() <= CollectionPipedInsert.MAX_PIPED_ITEM_COUNT) {
insertList.add(new ListPipedInsert<T>(key, index, valueList, attributesForCreate, tc));
} else {
PartitionedList<T> list = new PartitionedList<T>(valueList,
CollectionPipedInsert.MAX_PIPED_ITEM_COUNT);
for (List<T> elementList : list) {
insertList.add(new ListPipedInsert<T>(key, index, elementList, attributesForCreate, tc));
}
for (int i = 0 ; i < list.size(); i++) {
insertList.add(new ListPipedInsert<T>(key, index, list.get(i), i, attributesForCreate, tc));
}
return asyncCollectionPipedInsert(key, insertList);
}
Expand All @@ -1878,15 +1861,11 @@ public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncSopPip
}

List<CollectionPipedInsert<T>> insertList = new ArrayList<CollectionPipedInsert<T>>();
PartitionedList<T> list = new PartitionedList<T>(valueList,
CollectionPipedInsert.MAX_PIPED_ITEM_COUNT);

if (valueList.size() <= CollectionPipedInsert.MAX_PIPED_ITEM_COUNT) {
insertList.add(new SetPipedInsert<T>(key, valueList, attributesForCreate, tc));
} else {
PartitionedList<T> list = new PartitionedList<T>(valueList,
CollectionPipedInsert.MAX_PIPED_ITEM_COUNT);
for (List<T> elementList : list) {
insertList.add(new SetPipedInsert<T>(key, elementList, attributesForCreate, tc));
}
for (int i = 0 ; i < list.size(); i++) {
insertList.add(new SetPipedInsert<T>(key, list.get(i), i, attributesForCreate, tc));
}
return asyncCollectionPipedInsert(key, insertList);
}
Expand Down Expand Up @@ -2326,15 +2305,11 @@ public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncBopPip
}

List<CollectionPipedUpdate<T>> updateList = new ArrayList<CollectionPipedUpdate<T>>();
PartitionedList<Element<T>> list = new PartitionedList<Element<T>>(elements,
CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT);

if (elements.size() <= CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT) {
updateList.add(new BTreePipedUpdate<T>(key, elements, tc));
} else {
PartitionedList<Element<T>> list = new PartitionedList<Element<T>>(
elements, CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT);
for (List<Element<T>> elementList : list) {
updateList.add(new BTreePipedUpdate<T>(key, elementList, tc));
}
for (int i = 0; i < list.size(); i++) {
updateList.add(new BTreePipedUpdate<T>(key, list.get(i), i, tc));
}
return asyncCollectionPipedUpdate(key, updateList);
}
Expand All @@ -2359,16 +2334,11 @@ public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncMopPip
}

List<CollectionPipedUpdate<T>> updateList = new ArrayList<CollectionPipedUpdate<T>>();
PartitionedMap<String, T> list = new PartitionedMap<String, T>(elements,
CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT);

if (elements.size() <= CollectionPipedInsert.MAX_PIPED_ITEM_COUNT) {
updateList.add(new MapPipedUpdate<T>(key, elements, tc));
} else {
PartitionedMap<String, T> list = new PartitionedMap<String, T>(
elements, CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT);

for (Map<String, T> elementMap : list) {
updateList.add(new MapPipedUpdate<T>(key, elementMap, tc));
}
for (int i = 0 ; i < list.size(); i++) {
updateList.add(new MapPipedUpdate<T>(key, list.get(i), i, tc));
}
return asyncCollectionPipedUpdate(key, updateList);
}
Expand Down Expand Up @@ -3199,41 +3169,40 @@ <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncCollectionPip
final PipedCollectionFuture<Integer, CollectionOperationStatus> rv =
new PipedCollectionFuture<Integer, CollectionOperationStatus>(latch, operationTimeout);

for (int i = 0; i < insertList.size(); i++) {
final CollectionPipedInsert<T> insert = insertList.get(i);
final int idx = i;
CollectionPipedInsertOperation.Callback callback = new CollectionPipedInsertOperation.Callback() {

Operation op = opFact.collectionPipedInsert(key, insert,
new CollectionPipedInsertOperation.Callback() {
// each result status
public void receivedStatus(OperationStatus status) {
CollectionOperationStatus cstatus;
@Override
public void receivedStatus(OperationStatus status) {
CollectionOperationStatus cstatus;

if (status instanceof CollectionOperationStatus) {
cstatus = (CollectionOperationStatus) status;
} else {
getLogger().warn("Unhandled state: " + status);
cstatus = new CollectionOperationStatus(status);
}
rv.setOperationStatus(cstatus);
}
if (status instanceof CollectionOperationStatus) {
cstatus = (CollectionOperationStatus) status;
} else {
getLogger().warn("Unhandled state: " + status);
cstatus = new CollectionOperationStatus(status);
}
rv.setOperationStatus(cstatus);
}

// complete
public void complete() {
latch.countDown();
}
@Override
public void complete() {
latch.countDown();
}

// got status
public void gotStatus(Integer index, OperationStatus status) {
if (status instanceof CollectionOperationStatus) {
rv.addEachResult(index + (idx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
(CollectionOperationStatus) status);
} else {
rv.addEachResult(index + (idx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
new CollectionOperationStatus(status));
}
}
});
@Override
public void gotStatus(Integer index, int opIdx, OperationStatus status) {
if (status instanceof CollectionOperationStatus) {
rv.addEachResult(index + (opIdx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
(CollectionOperationStatus) status);
} else {
rv.addEachResult(index + (opIdx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
new CollectionOperationStatus(status));
}
}
};

for (CollectionPipedInsert<T> pipedInsert : insertList) {
Operation op = opFact.collectionPipedInsert(key, pipedInsert, callback);
rv.addOperation(op);
addOp(key, op);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,21 @@ public abstract class CollectionPipedInsert<T> extends CollectionPipe {
protected final CollectionAttributes attribute;
protected final Transcoder<T> tc;

protected CollectionPipedInsert(String key, CollectionAttributes attribute,
private final int opIdx;

protected CollectionPipedInsert(String key, int opIdx, CollectionAttributes attribute,
Transcoder<T> tc, int itemCount) {
super(itemCount);
this.key = key;
this.opIdx = opIdx;
this.attribute = attribute;
this.tc = tc;
}

public int getOpIdx() {
return opIdx;
}

/**
*
*/
Expand All @@ -53,9 +60,9 @@ public static class ListPipedInsert<T> extends CollectionPipedInsert<T> {
private final Collection<T> list;
private final int index;

public ListPipedInsert(String key, int index, Collection<T> list,
public ListPipedInsert(String key, int index, Collection<T> list, int opIdx,
CollectionAttributes attr, Transcoder<T> tc) {
super(key, attr, tc, list.size());
super(key, opIdx, attr, tc, list.size());
if (attr != null) { /* item creation option */
CollectionCreate.checkOverflowAction(CollectionType.list, attr.getOverflowAction());
}
Expand Down Expand Up @@ -115,9 +122,9 @@ public static class SetPipedInsert<T> extends CollectionPipedInsert<T> {
private static final String COMMAND = "sop insert";
private final Collection<T> set;

public SetPipedInsert(String key, Collection<T> set,
public SetPipedInsert(String key, Collection<T> set, int opIdx,
CollectionAttributes attr, Transcoder<T> tc) {
super(key, attr, tc, set.size());
super(key, opIdx, attr, tc, set.size());
if (attr != null) { /* item creation option */
CollectionCreate.checkOverflowAction(CollectionType.set, attr.getOverflowAction());
}
Expand Down Expand Up @@ -172,9 +179,9 @@ public static class BTreePipedInsert<T> extends CollectionPipedInsert<T> {
private static final String COMMAND = "bop insert";
private final Map<Long, T> map;

public BTreePipedInsert(String key, Map<Long, T> map,
public BTreePipedInsert(String key, Map<Long, T> map, int opIdx,
CollectionAttributes attr, Transcoder<T> tc) {
super(key, attr, tc, map.size());
super(key, opIdx, attr, tc, map.size());
if (attr != null) { /* item creation option */
CollectionCreate.checkOverflowAction(CollectionType.btree, attr.getOverflowAction());
}
Expand Down Expand Up @@ -234,9 +241,9 @@ public static class ByteArraysBTreePipedInsert<T> extends
private static final String COMMAND = "bop insert";
private final List<Element<T>> elements;

public ByteArraysBTreePipedInsert(String key, List<Element<T>> elements,
public ByteArraysBTreePipedInsert(String key, List<Element<T>> elements, int opIdx,
CollectionAttributes attr, Transcoder<T> tc) {
super(key, attr, tc, elements.size());
super(key, opIdx, attr, tc, elements.size());
if (attr != null) { /* item creation option */
CollectionCreate.checkOverflowAction(CollectionType.btree, attr.getOverflowAction());
}
Expand Down Expand Up @@ -296,9 +303,9 @@ public static class MapPipedInsert<T> extends CollectionPipedInsert<T> {
private static final String COMMAND = "mop insert";
private final Map<String, T> map;

public MapPipedInsert(String key, Map<String, T> map,
public MapPipedInsert(String key, Map<String, T> map, int opIdx,
CollectionAttributes attr, Transcoder<T> tc) {
super(key, attr, tc, map.size());
super(key, opIdx, attr, tc, map.size());
if (attr != null) { /* item creation option */
CollectionCreate.checkOverflowAction(CollectionType.map, attr.getOverflowAction());
}
Expand Down
Loading

0 comments on commit eb87cef

Please sign in to comment.