Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ENHANCE: Change creating callback objects multiple times in pipe operation. #727

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
211 changes: 90 additions & 121 deletions src/main/java/net/spy/memcached/ArcusClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -815,41 +815,40 @@ <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncCollectionPip
final PipedCollectionFuture<Integer, CollectionOperationStatus> rv =
new PipedCollectionFuture<Integer, CollectionOperationStatus>(latch, operationTimeout);

for (int i = 0; i < updateList.size(); i++) {
final CollectionPipedUpdate<T> update = updateList.get(i);
final int idx = i;

Operation op = opFact.collectionPipedUpdate(key, update,
new CollectionPipedUpdateOperation.Callback() {
// each result status
public void receivedStatus(OperationStatus status) {
CollectionOperationStatus cstatus;

if (status instanceof CollectionOperationStatus) {
cstatus = (CollectionOperationStatus) status;
} else {
getLogger().warn("Unhandled state: " + status);
cstatus = new CollectionOperationStatus(status);
}
rv.setOperationStatus(cstatus);
}
CollectionPipedUpdateOperation.Callback callback = new CollectionPipedUpdateOperation.Callback() {

// complete
public void complete() {
latch.countDown();
}
@Override
public void receivedStatus(OperationStatus status) {
CollectionOperationStatus cstatus;

// got status
public void gotStatus(Integer index, OperationStatus status) {
if (status instanceof CollectionOperationStatus) {
rv.addEachResult(index + (idx * CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT),
(CollectionOperationStatus) status);
} else {
rv.addEachResult(index + (idx * CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT),
new CollectionOperationStatus(status));
}
}
});
if (status instanceof CollectionOperationStatus) {
cstatus = (CollectionOperationStatus) status;
} else {
getLogger().warn("Unhandled state: " + status);
cstatus = new CollectionOperationStatus(status);
}
rv.setOperationStatus(cstatus);
}

@Override
public void complete() {
latch.countDown();
}

@Override
public void gotStatus(Integer index, int opIdx, OperationStatus status) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

인자 타입이 Integer, int가 혼용 사용되고 있는 데, int 타입으로 통일하면 될 것 같습니다.

참고로, 아래와 같이 구현할 수 있습니다.

      public void gotStatus(int opIdx, int elemIdx, OperationStatus status) {
        int index = (opIdx * CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT) + elemIdx;
        if (status instanceof CollectionOperationStatus) {
          rv.addEachResult(index, (CollectionOperationStatus) status);
        } else {
          rv.addEachResult(index, new CollectionOperationStatus(status));
        }
      }

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

아니면, gotStatus()은 index 인자를 받고,
해당되는 index 값은 gotStatus() 호출하는 쪽에서 계산하여 넘기는 것이 나을 것 같습니다.

      public void gotStatus(int index, OperationStatus status) {
        if (status instanceof CollectionOperationStatus) {
          rv.addEachResult(index, (CollectionOperationStatus) status);
        } else {
          rv.addEachResult(index, new CollectionOperationStatus(status));
        }
      }

if (status instanceof CollectionOperationStatus) {
rv.addEachResult(index + (opIdx * CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT),
(CollectionOperationStatus) status);
} else {
rv.addEachResult(index + (opIdx * CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT),
new CollectionOperationStatus(status));
}
}
};

for (CollectionPipedUpdate<T> pipedUpdate : updateList) {
Operation op = opFact.collectionPipedUpdate(key, pipedUpdate, callback);
jhpark816 marked this conversation as resolved.
Show resolved Hide resolved
rv.addOperation(op);
addOp(key, op);
brido4125 marked this conversation as resolved.
Show resolved Hide resolved
}
Expand Down Expand Up @@ -1778,15 +1777,11 @@ public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncBopPip
}

List<CollectionPipedInsert<T>> insertList = new ArrayList<CollectionPipedInsert<T>>();
PartitionedMap<Long, T> list = new PartitionedMap<Long, T>(elements,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PartitionedMap 구현은 주어진 map 크기가 주어진 size 보다 작거나 같을 경우, 해당 map을 그대로 리턴하도록 최적화합시다.

CollectionPipedInsert.MAX_PIPED_ITEM_COUNT);

if (elements.size() <= CollectionPipedInsert.MAX_PIPED_ITEM_COUNT) {
insertList.add(new BTreePipedInsert<T>(key, elements, attributesForCreate, tc));
} else {
PartitionedMap<Long, T> list = new PartitionedMap<Long, T>(
elements, CollectionPipedInsert.MAX_PIPED_ITEM_COUNT);
for (Map<Long, T> elementMap : list) {
insertList.add(new BTreePipedInsert<T>(key, elementMap, attributesForCreate, tc));
}
for (int i = 0; i < list.size(); i++) {
insertList.add(new BTreePipedInsert<T>(key, list.get(i), i, attributesForCreate, tc));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

인자 순서 검토 바람: list.get(i), i => i, list.get(i)

}
return asyncCollectionPipedInsert(key, insertList);
}
Expand All @@ -1802,15 +1797,11 @@ public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncBopPip
}

List<CollectionPipedInsert<T>> insertList = new ArrayList<CollectionPipedInsert<T>>();
PartitionedList<Element<T>> list = new PartitionedList<Element<T>>(elements,
CollectionPipedInsert.MAX_PIPED_ITEM_COUNT);

if (elements.size() <= CollectionPipedInsert.MAX_PIPED_ITEM_COUNT) {
insertList.add(new ByteArraysBTreePipedInsert<T>(key, elements, attributesForCreate, tc));
} else {
PartitionedList<Element<T>> list = new PartitionedList<Element<T>>(
elements, CollectionPipedInsert.MAX_PIPED_ITEM_COUNT);
for (List<Element<T>> elementList : list) {
insertList.add(new ByteArraysBTreePipedInsert<T>(key, elementList, attributesForCreate, tc));
}
for (int i = 0; i < list.size(); i++) {
insertList.add(new ByteArraysBTreePipedInsert<T>(key, list.get(i), i, attributesForCreate, tc));
jhpark816 marked this conversation as resolved.
Show resolved Hide resolved
}
return asyncCollectionPipedInsert(key, insertList);
}
Expand All @@ -1830,15 +1821,11 @@ public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncMopPip
}

List<CollectionPipedInsert<T>> insertList = new ArrayList<CollectionPipedInsert<T>>();
PartitionedMap<String, T> list = new PartitionedMap<String, T>(elements,
CollectionPipedInsert.MAX_PIPED_ITEM_COUNT);

if (elements.size() <= CollectionPipedInsert.MAX_PIPED_ITEM_COUNT) {
insertList.add(new MapPipedInsert<T>(key, elements, attributesForCreate, tc));
} else {
PartitionedMap<String, T> list = new PartitionedMap<String, T>(
elements, CollectionPipedInsert.MAX_PIPED_ITEM_COUNT);
for (Map<String, T> elementMap : list) {
insertList.add(new MapPipedInsert<T>(key, elementMap, attributesForCreate, tc));
}
for (int i = 0 ; i < list.size(); i++) {
insertList.add(new MapPipedInsert<T>(key, list.get(i), i, attributesForCreate, tc));
}
return asyncCollectionPipedInsert(key, insertList);
}
Expand All @@ -1854,15 +1841,11 @@ public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncLopPip
}

List<CollectionPipedInsert<T>> insertList = new ArrayList<CollectionPipedInsert<T>>();
PartitionedList<T> list = new PartitionedList<T>(valueList,
CollectionPipedInsert.MAX_PIPED_ITEM_COUNT);

if (valueList.size() <= CollectionPipedInsert.MAX_PIPED_ITEM_COUNT) {
insertList.add(new ListPipedInsert<T>(key, index, valueList, attributesForCreate, tc));
} else {
PartitionedList<T> list = new PartitionedList<T>(valueList,
CollectionPipedInsert.MAX_PIPED_ITEM_COUNT);
for (List<T> elementList : list) {
insertList.add(new ListPipedInsert<T>(key, index, elementList, attributesForCreate, tc));
}
for (int i = 0 ; i < list.size(); i++) {
insertList.add(new ListPipedInsert<T>(key, index, list.get(i), i, attributesForCreate, tc));
}
return asyncCollectionPipedInsert(key, insertList);
}
Expand All @@ -1878,15 +1861,11 @@ public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncSopPip
}

List<CollectionPipedInsert<T>> insertList = new ArrayList<CollectionPipedInsert<T>>();
PartitionedList<T> list = new PartitionedList<T>(valueList,
CollectionPipedInsert.MAX_PIPED_ITEM_COUNT);

if (valueList.size() <= CollectionPipedInsert.MAX_PIPED_ITEM_COUNT) {
insertList.add(new SetPipedInsert<T>(key, valueList, attributesForCreate, tc));
} else {
PartitionedList<T> list = new PartitionedList<T>(valueList,
CollectionPipedInsert.MAX_PIPED_ITEM_COUNT);
for (List<T> elementList : list) {
insertList.add(new SetPipedInsert<T>(key, elementList, attributesForCreate, tc));
}
for (int i = 0 ; i < list.size(); i++) {
insertList.add(new SetPipedInsert<T>(key, list.get(i), i, attributesForCreate, tc));
}
return asyncCollectionPipedInsert(key, insertList);
}
Expand Down Expand Up @@ -2326,15 +2305,11 @@ public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncBopPip
}

List<CollectionPipedUpdate<T>> updateList = new ArrayList<CollectionPipedUpdate<T>>();
PartitionedList<Element<T>> list = new PartitionedList<Element<T>>(elements,
CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT);

if (elements.size() <= CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT) {
updateList.add(new BTreePipedUpdate<T>(key, elements, tc));
} else {
PartitionedList<Element<T>> list = new PartitionedList<Element<T>>(
elements, CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT);
for (List<Element<T>> elementList : list) {
updateList.add(new BTreePipedUpdate<T>(key, elementList, tc));
}
for (int i = 0; i < list.size(); i++) {
updateList.add(new BTreePipedUpdate<T>(key, list.get(i), i, tc));
}
return asyncCollectionPipedUpdate(key, updateList);
}
Expand All @@ -2359,16 +2334,11 @@ public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncMopPip
}

List<CollectionPipedUpdate<T>> updateList = new ArrayList<CollectionPipedUpdate<T>>();
PartitionedMap<String, T> list = new PartitionedMap<String, T>(elements,
CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT);

if (elements.size() <= CollectionPipedInsert.MAX_PIPED_ITEM_COUNT) {
updateList.add(new MapPipedUpdate<T>(key, elements, tc));
} else {
PartitionedMap<String, T> list = new PartitionedMap<String, T>(
elements, CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT);

for (Map<String, T> elementMap : list) {
updateList.add(new MapPipedUpdate<T>(key, elementMap, tc));
}
for (int i = 0 ; i < list.size(); i++) {
updateList.add(new MapPipedUpdate<T>(key, list.get(i), i, tc));
}
return asyncCollectionPipedUpdate(key, updateList);
}
Expand Down Expand Up @@ -3199,41 +3169,40 @@ <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncCollectionPip
final PipedCollectionFuture<Integer, CollectionOperationStatus> rv =
new PipedCollectionFuture<Integer, CollectionOperationStatus>(latch, operationTimeout);

for (int i = 0; i < insertList.size(); i++) {
final CollectionPipedInsert<T> insert = insertList.get(i);
final int idx = i;
CollectionPipedInsertOperation.Callback callback = new CollectionPipedInsertOperation.Callback() {

Operation op = opFact.collectionPipedInsert(key, insert,
new CollectionPipedInsertOperation.Callback() {
// each result status
public void receivedStatus(OperationStatus status) {
CollectionOperationStatus cstatus;
@Override
public void receivedStatus(OperationStatus status) {
CollectionOperationStatus cstatus;

if (status instanceof CollectionOperationStatus) {
cstatus = (CollectionOperationStatus) status;
} else {
getLogger().warn("Unhandled state: " + status);
cstatus = new CollectionOperationStatus(status);
}
rv.setOperationStatus(cstatus);
}
if (status instanceof CollectionOperationStatus) {
cstatus = (CollectionOperationStatus) status;
} else {
getLogger().warn("Unhandled state: " + status);
cstatus = new CollectionOperationStatus(status);
}
rv.setOperationStatus(cstatus);
}

// complete
public void complete() {
latch.countDown();
}
@Override
public void complete() {
latch.countDown();
}

// got status
public void gotStatus(Integer index, OperationStatus status) {
if (status instanceof CollectionOperationStatus) {
rv.addEachResult(index + (idx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
(CollectionOperationStatus) status);
} else {
rv.addEachResult(index + (idx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
new CollectionOperationStatus(status));
}
}
});
@Override
public void gotStatus(Integer index, int opIdx, OperationStatus status) {
if (status instanceof CollectionOperationStatus) {
rv.addEachResult(index + (opIdx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
(CollectionOperationStatus) status);
} else {
rv.addEachResult(index + (opIdx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
new CollectionOperationStatus(status));
}
}
};

for (CollectionPipedInsert<T> pipedInsert : insertList) {
Operation op = opFact.collectionPipedInsert(key, pipedInsert, callback);
rv.addOperation(op);
addOp(key, op);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,21 @@ public abstract class CollectionPipedInsert<T> extends CollectionPipe {
protected final CollectionAttributes attribute;
protected final Transcoder<T> tc;

protected CollectionPipedInsert(String key, CollectionAttributes attribute,
private final int opIdx;

protected CollectionPipedInsert(String key, int opIdx, CollectionAttributes attribute,
Transcoder<T> tc, int itemCount) {
super(itemCount);
this.key = key;
this.opIdx = opIdx;
this.attribute = attribute;
this.tc = tc;
}

public int getOpIdx() {
return opIdx;
}

/**
*
*/
Expand All @@ -53,9 +60,9 @@ public static class ListPipedInsert<T> extends CollectionPipedInsert<T> {
private final Collection<T> list;
private final int index;

public ListPipedInsert(String key, int index, Collection<T> list,
public ListPipedInsert(String key, int index, Collection<T> list, int opIdx,
CollectionAttributes attr, Transcoder<T> tc) {
super(key, attr, tc, list.size());
super(key, opIdx, attr, tc, list.size());
if (attr != null) { /* item creation option */
CollectionCreate.checkOverflowAction(CollectionType.list, attr.getOverflowAction());
}
Expand Down Expand Up @@ -115,9 +122,9 @@ public static class SetPipedInsert<T> extends CollectionPipedInsert<T> {
private static final String COMMAND = "sop insert";
private final Collection<T> set;

public SetPipedInsert(String key, Collection<T> set,
public SetPipedInsert(String key, Collection<T> set, int opIdx,
CollectionAttributes attr, Transcoder<T> tc) {
super(key, attr, tc, set.size());
super(key, opIdx, attr, tc, set.size());
if (attr != null) { /* item creation option */
CollectionCreate.checkOverflowAction(CollectionType.set, attr.getOverflowAction());
}
Expand Down Expand Up @@ -172,9 +179,9 @@ public static class BTreePipedInsert<T> extends CollectionPipedInsert<T> {
private static final String COMMAND = "bop insert";
private final Map<Long, T> map;

public BTreePipedInsert(String key, Map<Long, T> map,
public BTreePipedInsert(String key, Map<Long, T> map, int opIdx,
CollectionAttributes attr, Transcoder<T> tc) {
super(key, attr, tc, map.size());
super(key, opIdx, attr, tc, map.size());
if (attr != null) { /* item creation option */
CollectionCreate.checkOverflowAction(CollectionType.btree, attr.getOverflowAction());
}
Expand Down Expand Up @@ -234,9 +241,9 @@ public static class ByteArraysBTreePipedInsert<T> extends
private static final String COMMAND = "bop insert";
private final List<Element<T>> elements;

public ByteArraysBTreePipedInsert(String key, List<Element<T>> elements,
public ByteArraysBTreePipedInsert(String key, List<Element<T>> elements, int opIdx,
CollectionAttributes attr, Transcoder<T> tc) {
super(key, attr, tc, elements.size());
super(key, opIdx, attr, tc, elements.size());
if (attr != null) { /* item creation option */
CollectionCreate.checkOverflowAction(CollectionType.btree, attr.getOverflowAction());
}
Expand Down Expand Up @@ -296,9 +303,9 @@ public static class MapPipedInsert<T> extends CollectionPipedInsert<T> {
private static final String COMMAND = "mop insert";
private final Map<String, T> map;

public MapPipedInsert(String key, Map<String, T> map,
public MapPipedInsert(String key, Map<String, T> map, int opIdx,
CollectionAttributes attr, Transcoder<T> tc) {
super(key, attr, tc, map.size());
super(key, opIdx, attr, tc, map.size());
if (attr != null) { /* item creation option */
CollectionCreate.checkOverflowAction(CollectionType.map, attr.getOverflowAction());
}
Expand Down
Loading
Loading