From 166686f9d92a0111729f3287524fab8f95118569 Mon Sep 17 00:00:00 2001 From: brido4125 Date: Thu, 22 Jun 2023 14:58:43 +0900 Subject: [PATCH] ENHANCE: Change asyncSetPipedExist method logic. --- .../java/net/spy/memcached/ArcusClient.java | 145 +++++++++--------- .../collection/set/SopPipedExistTest.java | 31 ++++ 2 files changed, 104 insertions(+), 72 deletions(-) diff --git a/src/main/java/net/spy/memcached/ArcusClient.java b/src/main/java/net/spy/memcached/ArcusClient.java index a8d872215..ad445c676 100644 --- a/src/main/java/net/spy/memcached/ArcusClient.java +++ b/src/main/java/net/spy/memcached/ArcusClient.java @@ -3119,99 +3119,100 @@ public CollectionFuture asyncBopGetItemCount(String key, } @Override - public CollectionFuture> asyncSopPipedExistBulk(String key, - List values) { - SetPipedExist exist = new SetPipedExist(key, values, collectionTranscoder); - return asyncSetPipedExist(key, exist); + public CollectionFuture> asyncSopPipedExistBulk(String key, List values) { + return asyncSopPipedExistBulk(key, values, collectionTranscoder); } @Override public CollectionFuture> asyncSopPipedExistBulk(String key, List values, Transcoder tc) { - SetPipedExist exist = new SetPipedExist(key, values, tc); - return asyncSetPipedExist(key, exist); + if (values.size() == 0) { + throw new IllegalArgumentException( + "The number of piped operations must be larger than 0."); + } + + List> existList = new ArrayList>(); + if (values.size() <= SetPipedExist.MAX_PIPED_ITEM_COUNT) { + existList.add(new SetPipedExist(key, values, tc)); + } else { + PartitionedList partitionedList = new PartitionedList(values, SetPipedExist.MAX_PIPED_ITEM_COUNT); + for (List partition : partitionedList) { + existList.add(new SetPipedExist(key, partition, tc)); + } + } + return asyncSetPipedExist(key, existList); } /** * Generic pipelined existence operation for set items. Public methods call this method. * * @param key collection item's key - * @param exist operation parameters (element values) + * @param existList list of operation parameters (element values) * @return future holding the map of elements and their existence results */ CollectionFuture> asyncSetPipedExist( - final String key, final SetPipedExist exist) { - - if (exist.getItemCount() == 0) { - throw new IllegalArgumentException( - "The number of piped operations must be larger than 0."); - } - if (exist.getItemCount() > CollectionPipedInsert.MAX_PIPED_ITEM_COUNT) { - throw new IllegalArgumentException( - "The number of piped operations must not exceed a maximum of " - + CollectionPipedInsert.MAX_PIPED_ITEM_COUNT + "."); - } - - final CountDownLatch latch = new CountDownLatch(1); - final CollectionFuture> rv = new CollectionFuture>( - latch, operationTimeout); - - Operation op = opFact.collectionPipedExist(key, exist, - new CollectionPipedExistOperation.Callback() { - - private final Map result = new HashMap(); - private boolean hasAnError = false; - - public void receivedStatus(OperationStatus status) { - if (hasAnError) { - return; - } - - CollectionOperationStatus cstatus; - if (status instanceof CollectionOperationStatus) { - cstatus = (CollectionOperationStatus) status; - } else { + final String key, final List> existList) { + final CountDownLatch latch = new CountDownLatch(existList.size()); + final PipedCollectionFuture rv + = new PipedCollectionFuture(latch, operationTimeout); + + for (final SetPipedExist exist : existList) { + Operation op = opFact.collectionPipedExist(key, exist, new CollectionPipedExistOperation.Callback() { + private CollectionOperationStatus failedStatus = null; + private int failStatusCount = 0; + public void gotStatus(Integer index, OperationStatus status) { + CollectionOperationStatus cstatus; + if (status instanceof CollectionOperationStatus) { + cstatus = (CollectionOperationStatus) status; + } else { + getLogger().warn("Unhandled state: " + status); + cstatus = new CollectionOperationStatus(status); + } + switch (cstatus.getResponse()) { + case EXIST: + rv.addEachResult(exist.getValues().get(index), true); + break; + case NOT_EXIST: + rv.addEachResult(exist.getValues().get(index), false); + break; + case UNREADABLE: + case TYPE_MISMATCH: + case NOT_FOUND: + if (failedStatus == null) { + failedStatus = cstatus; + failStatusCount++; + } else if (failedStatus.equals(cstatus)) { + failStatusCount++; + } + break; + default: getLogger().warn("Unhandled state: " + status); - cstatus = new CollectionOperationStatus(status); - } - rv.set(result, cstatus); } + } - public void complete() { - latch.countDown(); + public void receivedStatus(OperationStatus status) { + CollectionOperationStatus cstatus; + if (status instanceof CollectionOperationStatus) { + cstatus = (CollectionOperationStatus) status; + } else { + getLogger().warn("Unhandled state: " + status); + cstatus = new CollectionOperationStatus(status); } - - public void gotStatus(Integer index, OperationStatus status) { - CollectionOperationStatus cstatus; - if (status instanceof CollectionOperationStatus) { - cstatus = (CollectionOperationStatus) status; - } else { - cstatus = new CollectionOperationStatus(status); - } - - switch (cstatus.getResponse()) { - case EXIST: - case NOT_EXIST: - result.put(exist.getValues().get(index), - (CollectionResponse.EXIST.equals(cstatus - .getResponse()))); - break; - case UNREADABLE: - case TYPE_MISMATCH: - case NOT_FOUND: - hasAnError = true; - rv.set(new HashMap(0), - (CollectionOperationStatus) status); - break; - default: - getLogger().warn("Unhandled state: " + status); - } + if (failedStatus != null && exist.getItemCount() == failStatusCount) { + rv.setOperationStatus(failedStatus); + } else { + rv.setOperationStatus(cstatus); } - }); + } - rv.setOperation(op); - addOp(key, op); + public void complete() { + latch.countDown(); + } + }); + rv.addOperation(op); + addOp(key, op); + } return rv; } diff --git a/src/test/manual/net/spy/memcached/collection/set/SopPipedExistTest.java b/src/test/manual/net/spy/memcached/collection/set/SopPipedExistTest.java index 201847137..41ee1e4d0 100644 --- a/src/test/manual/net/spy/memcached/collection/set/SopPipedExistTest.java +++ b/src/test/manual/net/spy/memcached/collection/set/SopPipedExistTest.java @@ -167,6 +167,37 @@ public void testMaxPipedExist() { } } + public void testMaxOverPipedExist() { + int OVER_COUNT = 1000; + + try { + List findValues = new ArrayList(); + + // insert items + for (int i = 0; i < OVER_COUNT; i++) { + findValues.add("VALUE" + i); + + Assert.assertTrue(mc.asyncSopInsert(KEY, "VALUE" + i, new CollectionAttributes()).get()); + } + + // exist bulk + CollectionFuture> future = mc + .asyncSopPipedExistBulk(KEY, findValues); + + Map map = future.get(); + + Assert.assertTrue(future.getOperationStatus().isSuccess()); + + for (int i = 0; i < OVER_COUNT; i++) { + Assert.assertTrue(map.get("VALUE" + i)); + } + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + public void testPipedExistNotExistsKey() { try { List findValues = new ArrayList();