Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ENHANCE: Change asyncSetPipedExist method logic. #628

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 73 additions & 72 deletions src/main/java/net/spy/memcached/ArcusClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -3119,99 +3119,100 @@ public CollectionFuture<Integer> asyncBopGetItemCount(String key,
}

@Override
public CollectionFuture<Map<Object, Boolean>> asyncSopPipedExistBulk(String key,
List<Object> values) {
SetPipedExist<Object> exist = new SetPipedExist<Object>(key, values, collectionTranscoder);
return asyncSetPipedExist(key, exist);
public CollectionFuture<Map<Object, Boolean>> asyncSopPipedExistBulk(String key, List<Object> values) {
return asyncSopPipedExistBulk(key, values, collectionTranscoder);
}

@Override
public <T> CollectionFuture<Map<T, Boolean>> asyncSopPipedExistBulk(String key,
List<T> values,
Transcoder<T> tc) {
SetPipedExist<T> exist = new SetPipedExist<T>(key, values, tc);
return asyncSetPipedExist(key, exist);
if (values.size() == 0) {
throw new IllegalArgumentException(
"The number of piped operations must be larger than 0.");
}

List<SetPipedExist<T>> existList = new ArrayList<SetPipedExist<T>>();
if (values.size() <= SetPipedExist.MAX_PIPED_ITEM_COUNT) {
existList.add(new SetPipedExist<T>(key, values, tc));
} else {
PartitionedList<T> partitionedList = new PartitionedList<T>(values, SetPipedExist.MAX_PIPED_ITEM_COUNT);
for (List<T> partition : partitionedList) {
existList.add(new SetPipedExist<T>(key, partition, tc));
}
}
brido4125 marked this conversation as resolved.
Show resolved Hide resolved
brido4125 marked this conversation as resolved.
Show resolved Hide resolved
return asyncSetPipedExist(key, existList);
}

/**
* Generic pipelined existence operation for set items. Public methods call this method.
*
* @param key collection item's key
* @param exist operation parameters (element values)
* @param existList list of operation parameters (element values)
* @return future holding the map of elements and their existence results
*/
<T> CollectionFuture<Map<T, Boolean>> asyncSetPipedExist(
final String key, final SetPipedExist<T> exist) {

if (exist.getItemCount() == 0) {
throw new IllegalArgumentException(
"The number of piped operations must be larger than 0.");
}
if (exist.getItemCount() > CollectionPipedInsert.MAX_PIPED_ITEM_COUNT) {
throw new IllegalArgumentException(
"The number of piped operations must not exceed a maximum of "
+ CollectionPipedInsert.MAX_PIPED_ITEM_COUNT + ".");
}

final CountDownLatch latch = new CountDownLatch(1);
final CollectionFuture<Map<T, Boolean>> rv = new CollectionFuture<Map<T, Boolean>>(
latch, operationTimeout);

Operation op = opFact.collectionPipedExist(key, exist,
new CollectionPipedExistOperation.Callback() {

private final Map<T, Boolean> result = new HashMap<T, Boolean>();
private boolean hasAnError = false;

public void receivedStatus(OperationStatus status) {
if (hasAnError) {
brido4125 marked this conversation as resolved.
Show resolved Hide resolved
return;
}

CollectionOperationStatus cstatus;
if (status instanceof CollectionOperationStatus) {
cstatus = (CollectionOperationStatus) status;
} else {
final String key, final List<SetPipedExist<T>> existList) {
final CountDownLatch latch = new CountDownLatch(existList.size());
final PipedCollectionFuture<T, Boolean> rv
= new PipedCollectionFuture<T, Boolean>(latch, operationTimeout);

for (final SetPipedExist<T> exist : existList) {
Operation op = opFact.collectionPipedExist(key, exist, new CollectionPipedExistOperation.Callback() {
private CollectionOperationStatus failedStatus = null;
private int failStatusCount = 0;
public void gotStatus(Integer index, OperationStatus status) {
CollectionOperationStatus cstatus;
if (status instanceof CollectionOperationStatus) {
cstatus = (CollectionOperationStatus) status;
} else {
getLogger().warn("Unhandled state: " + status);
cstatus = new CollectionOperationStatus(status);
}
switch (cstatus.getResponse()) {
case EXIST:
rv.addEachResult(exist.getValues().get(index), true);
break;
case NOT_EXIST:
rv.addEachResult(exist.getValues().get(index), false);
break;
brido4125 marked this conversation as resolved.
Show resolved Hide resolved
case UNREADABLE:
case TYPE_MISMATCH:
case NOT_FOUND:
if (failedStatus == null) {
failedStatus = cstatus;
failStatusCount++;
} else if (failedStatus.equals(cstatus)) {
failStatusCount++;
}
break;
default:
brido4125 marked this conversation as resolved.
Show resolved Hide resolved
getLogger().warn("Unhandled state: " + status);
cstatus = new CollectionOperationStatus(status);
}
rv.set(result, cstatus);
}
}

public void complete() {
latch.countDown();
public void receivedStatus(OperationStatus status) {
CollectionOperationStatus cstatus;
if (status instanceof CollectionOperationStatus) {
cstatus = (CollectionOperationStatus) status;
} else {
getLogger().warn("Unhandled state: " + status);
cstatus = new CollectionOperationStatus(status);
}

public void gotStatus(Integer index, OperationStatus status) {
CollectionOperationStatus cstatus;
if (status instanceof CollectionOperationStatus) {
cstatus = (CollectionOperationStatus) status;
} else {
cstatus = new CollectionOperationStatus(status);
}

switch (cstatus.getResponse()) {
case EXIST:
case NOT_EXIST:
result.put(exist.getValues().get(index),
(CollectionResponse.EXIST.equals(cstatus
.getResponse())));
break;
case UNREADABLE:
case TYPE_MISMATCH:
case NOT_FOUND:
hasAnError = true;
rv.set(new HashMap<T, Boolean>(0),
(CollectionOperationStatus) status);
break;
default:
getLogger().warn("Unhandled state: " + status);
}
if (failedStatus != null && exist.getItemCount() == failStatusCount) {
rv.setOperationStatus(failedStatus);
} else {
rv.setOperationStatus(cstatus);
}
});
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

아래 경우에 setOperationStatus 부분이 문제가 될 것 같습니다.

  • 2개 연산으로 pipeling 처리
  • 처음 연산에서는 모두 EXIST or NOT_EXIST 응답이 옴.
  • 둘째 연산에서는 NOT_FOUND 응답이 옴.

이 경우, 최종 OperationStatus는 NOT_FOUND가 됩니다.
맞는 동작인 지 검토 바랍니다.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#628 (comment)
위 코멘트대로, 여러 Op 중 하나라도 FAILED_END 라면,
Future에서 반환하는 응답값은 FAILED_END로 처리하는게 맞지 않나요?

반대의 상황에서도 FAILED_END는 유지가 됩니다.

다만, PR 내의 failStatusCount는 삭제해도 될 것 같습니다.

Copy link
Collaborator

@jhpark816 jhpark816 Mar 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

바로 위의 코멘트 (https://github.com/naver/arcus-java-client/pull/628/files#r1530238965) 에 보인 예시에서는 FAILED_END가 아닌 NOT_FOUND가 최종 OperationStatus로 되지 않나요?

Copy link
Collaborator Author

@brido4125 brido4125 Mar 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

맞는 동작인 지 검토 바랍니다.

최종 OperationStatus실패 / 성공의 여부에 대해 맞는 동작인지 검토하는것으로 이해했습니다.
말씀하신 부분은 수정되어야 합니다.

@jhpark816
Docs를 확인해보니 단순히 FAILED_END가 아닌 구체적인 이유가 들어간 OperationStatus를 반환합니다.
https://github.com/naver/arcus-java-client/blob/develop/docs/05-set-API.md#set-element-%EC%9D%BC%EA%B4%84-%EC%A1%B4%EC%9E%AC%EC%97%AC%EB%B6%80-%ED%99%95%EC%9D%B8

기존 동작으로 수행해야 하위 호환성 문제가 없을 것 같네요.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

현재 구현은 count를 통해서 해당 pipe Op의 모든 연산들이 동일한 원인인 경우,
구체적인 NOT_FOUND, TYPE_MISMATCH, UNREADABLE 을 리턴합니다.

만약 연산 중 일부는 성공 / 일부는 실패일 경우는 FAILED_END를 리턴하네요.

하지만 FAILED_END라는 의미가 디버깅에도 별 도움이 되지 않기 때문에,
하나라도 NOT_FOUND, TYPE_MISMATCH, UNREADABLE 중 설정 된다면
해당하는 Status를 리턴시켜 주는게 도움이 될 것 같습니다.

Copy link
Collaborator

@jhpark816 jhpark816 Mar 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@brido4125
코멘트에 있는 대로 처리하면 안 될 것 같습니다.

N개 pipe 연산에서 i번째 연산이 실패하면 그 뒤에 있는 모든 연산도 함께 실패 처리하는 것이 나을 것 같습니다.
이렇게 구현하는 것이 가능할까요?

@uhm0311 @oliviarla
위의 사항 검토 바랍니다.
위의 사항 때문에 기존에 piped 연산은 1개 연산으로만 제공했던 것은 아닌지 ?

Copy link
Collaborator

@uhm0311 uhm0311 Mar 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

N개 pipe 연산에서 i번째 연산이 실패하면 그 뒤에 있는 모든 연산도 함께 실패 처리하는 것이 나을 것 같습니다.
이렇게 구현하는 것이 가능할까요?

그러면 N개 pipe 연산을 보낼 때 1개를 보내고 응답을 받아서 실패하지 않았으면 다시 다음걸 보내는 방식으로 해야 합니다.
JDK 8에서는 CompletableFuture가 있으므로 1개 pipe 연산에 대한 CompletableFuture의 성공 Callback에 다음 pipe 연산을 보내는 식으로 구성하여 실패 시 다음 연산을 보내지 않도록 할 수 있습니다.
JDK 6에서는 ListenableFuture를 구현하지 않는 이상 비동기 방식은 불가능할 것으로 보입니다.
단, N개의 연산을 보낼 때 1개를 보내고 응답을 받아서 다음걸 보내는 방식은 N개 연산을 다 처리하는 데에는 지연 시간이 더 걸리게 되어 있으므로 Timeout 에러가 발생할 확률이 높아집니다.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

그러면 N개 pipe 연산을 보낼 때 1개를 보내고 응답을 받아서 실패하지 않았으면 다시 다음걸 보내는 방식으로 해야 합니다.

N개 pipe 연산을 보낼 때, 중간에 에러 발생 등으로 pipeline이 중단되는 경우가 아니면 마지막 N번째 연산을 보내기 전까지는 응답이 돌아오지 않을텐데요.
1개를 보내고 응답을 받은 뒤 다음 pipe 연산을 보낸다는 것의 의미가 제가 이해한 것과 다른 동작일까요?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

500개의 연산을 1개의 pipe 연산으로 지칭한 것입니다.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jhpark816

위의 사항 때문에 기존에 piped 연산은 1개 연산으로만 제공했던 것은 아닌지 ?

exists 파이프만 1개 연산 지원이고, 다른 pipe는 초기 커밋부터 n개를 허용하였기에 그렇다고 보긴 힘들것 같네요


rv.setOperation(op);
addOp(key, op);
public void complete() {
latch.countDown();
}
});
rv.addOperation(op);
addOp(key, op);
}
return rv;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,37 @@ public void testMaxPipedExist() {
}
}

public void testMaxOverPipedExist() {
brido4125 marked this conversation as resolved.
Show resolved Hide resolved
int OVER_COUNT = 1000;

try {
List<Object> findValues = new ArrayList<Object>();

// insert items
for (int i = 0; i < OVER_COUNT; i++) {
findValues.add("VALUE" + i);

Assert.assertTrue(mc.asyncSopInsert(KEY, "VALUE" + i, new CollectionAttributes()).get());
}

// exist bulk
CollectionFuture<Map<Object, Boolean>> future = mc
.asyncSopPipedExistBulk(KEY, findValues);

Map<Object, Boolean> map = future.get();

Assert.assertTrue(future.getOperationStatus().isSuccess());

for (int i = 0; i < OVER_COUNT; i++) {
Assert.assertTrue(map.get("VALUE" + i));
}

} catch (Exception e) {
e.printStackTrace();
Assert.fail(e.getMessage());
}
}

public void testPipedExistNotExistsKey() {
try {
List<Object> findValues = new ArrayList<Object>();
Expand Down
Loading