From 56ac7b714fe29edebb970843bf5e9820f91fe999 Mon Sep 17 00:00:00 2001 From: uhm0311 Date: Fri, 17 Jun 2022 20:15:57 +0900 Subject: [PATCH] INTERNAL: Add classes that implements BTreeSortMergeGetOperation.Callback. --- .../java/net/spy/memcached/ArcusClient.java | 350 ++---------------- .../callback/BTreeSMGetOperationCallback.java | 197 ++++++++++ .../BTreeSMGetOperationOldCallback.java | 147 ++++++++ .../BaseBTreeSMGetOperationCallback.java | 110 ++++++ .../ascii/callback/BaseOperationCallback.java | 21 ++ 5 files changed, 511 insertions(+), 314 deletions(-) create mode 100644 src/main/java/net/spy/memcached/protocol/ascii/callback/BTreeSMGetOperationCallback.java create mode 100644 src/main/java/net/spy/memcached/protocol/ascii/callback/BTreeSMGetOperationOldCallback.java create mode 100644 src/main/java/net/spy/memcached/protocol/ascii/callback/BaseBTreeSMGetOperationCallback.java create mode 100644 src/main/java/net/spy/memcached/protocol/ascii/callback/BaseOperationCallback.java diff --git a/src/main/java/net/spy/memcached/ArcusClient.java b/src/main/java/net/spy/memcached/ArcusClient.java index a30ee8c0b..fae4a58c1 100644 --- a/src/main/java/net/spy/memcached/ArcusClient.java +++ b/src/main/java/net/spy/memcached/ArcusClient.java @@ -130,8 +130,6 @@ import net.spy.memcached.ops.BTreeFindPositionWithGetOperation; import net.spy.memcached.ops.BTreeGetBulkOperation; import net.spy.memcached.ops.BTreeGetByPositionOperation; -import net.spy.memcached.ops.BTreeSortMergeGetOperation; -import net.spy.memcached.ops.BTreeSortMergeGetOperationOld; import net.spy.memcached.ops.BTreeInsertAndGetOperation; import net.spy.memcached.ops.CollectionBulkInsertOperation; import net.spy.memcached.ops.CollectionGetOperation; @@ -147,6 +145,8 @@ import net.spy.memcached.ops.OperationStatus; import net.spy.memcached.ops.StoreType; import net.spy.memcached.plugin.FrontCacheMemcachedClient; +import net.spy.memcached.protocol.ascii.callback.BTreeSMGetOperationCallback; +import net.spy.memcached.protocol.ascii.callback.BTreeSMGetOperationOldCallback; import net.spy.memcached.transcoders.CollectionTranscoder; import net.spy.memcached.transcoders.Transcoder; import net.spy.memcached.util.BTreeUtil; @@ -2224,157 +2224,42 @@ private SMGetFuture>> smget( final List> smGetList, final int offset, final int count, final boolean reverse, final Transcoder tc) { - final String END = "END"; - final String TRIMMED = "TRIMMED"; - final String DUPLICATED = "DUPLICATED"; - final String DUPLICATED_TRIMMED = "DUPLICATED_TRIMMED"; - final CountDownLatch blatch = new CountDownLatch(smGetList.size()); + final ReentrantLock lock = new ReentrantLock(); final ConcurrentLinkedQueue ops = new ConcurrentLinkedQueue(); + final List missedKeyList = Collections.synchronizedList(new ArrayList()); final Map missedKeys = Collections.synchronizedMap(new HashMap()); - final List mergedTrimmedKeys = - Collections.synchronizedList(new ArrayList()); - final int totalResultElementCount = count + offset; final List> mergedResult = - Collections.synchronizedList(new ArrayList>(totalResultElementCount)); - - final ReentrantLock lock = new ReentrantLock(); + Collections.synchronizedList(new ArrayList>(count + offset)); + final List mergedTrimmedKeys = + Collections.synchronizedList(new ArrayList()); final List resultOperationStatus = Collections.synchronizedList(new ArrayList(1)); - final List failedOperationStatus = Collections.synchronizedList(new ArrayList(1)); // if processedSMGetCount is 0, then all smget is done. final AtomicInteger processedSMGetCount = new AtomicInteger(smGetList.size()); - final AtomicBoolean mergedTrim = new AtomicBoolean(false); final AtomicBoolean stopCollect = new AtomicBoolean(false); + final AtomicBoolean mergedTrim = new AtomicBoolean(false); - for (BTreeSMGet smGet : smGetList) { - Operation op = opFact.bopsmget(smGet, new BTreeSortMergeGetOperationOld.Callback() { - private final List> eachResult = new ArrayList>(); - - @Override - public void receivedStatus(OperationStatus status) { - processedSMGetCount.decrementAndGet(); - - if (!status.isSuccess()) { - getLogger().warn("SMGetFailed. status=%s", status); - if (!stopCollect.get()) { - stopCollect.set(true); - failedOperationStatus.add(status); - } - mergedResult.clear(); - return; - } - - boolean isTrimmed = (TRIMMED.equals(status.getMessage()) || - DUPLICATED_TRIMMED.equals(status.getMessage())) - ? true : false; - lock.lock(); - try { - if (mergedResult.size() == 0) { - // merged result is empty, add all. - mergedResult.addAll(eachResult); - mergedTrim.set(isTrimmed); - } else { - boolean addAll = true; - int pos = 0; - for (SMGetElement result : eachResult) { - for (; pos < mergedResult.size(); pos++) { - if ((reverse) ? (0 < result.compareTo(mergedResult.get(pos))) - : (0 > result.compareTo(mergedResult.get(pos)))) { - break; - } - } - if (pos >= totalResultElementCount) { - addAll = false; - break; - } - if (pos >= mergedResult.size() && mergedTrim.get() && - result.compareBkeyTo(mergedResult.get(pos - 1)) != 0) { - addAll = false; - break; - } - mergedResult.add(pos, result); - if (mergedResult.size() > totalResultElementCount) { - mergedResult.remove(totalResultElementCount); - } - pos += 1; - } - if (isTrimmed && addAll) { - while (pos < mergedResult.size()) { - if (mergedResult.get(pos).compareBkeyTo(mergedResult.get(pos - 1)) == 0) { - pos += 1; - } else { - mergedResult.remove(pos); - } - } - mergedTrim.set(true); - } - if (mergedResult.size() >= totalResultElementCount) { - mergedTrim.set(false); - } - } - - if (processedSMGetCount.get() == 0) { - boolean isDuplicated = false; - for (int i = 1; i < mergedResult.size(); i++) { - if (mergedResult.get(i).compareBkeyTo(mergedResult.get(i - 1)) == 0) { - isDuplicated = true; - break; - } - } - if (mergedTrim.get()) { - if (isDuplicated) { - resultOperationStatus.add(new OperationStatus(true, "DUPLICATED_TRIMMED")); - } else { - resultOperationStatus.add(new OperationStatus(true, "TRIMMED")); - } - } else { - if (isDuplicated) { - resultOperationStatus.add(new OperationStatus(true, "DUPLICATED")); - } else { - resultOperationStatus.add(new OperationStatus(true, "END")); - } - } - } - } finally { - lock.unlock(); - } - } - - @Override - public void complete() { - blatch.countDown(); - } - - @Override - public void gotData(String key, int flags, Object subkey, byte[] eflag, byte[] data) { - if (stopCollect.get()) { - return; - } - - if (subkey instanceof Long) { - eachResult.add(new SMGetElement(key, (Long) subkey, eflag, - tc.decode(new CachedData(flags, data, tc.getMaxSize())))); - } else if (subkey instanceof byte[]) { - eachResult.add(new SMGetElement(key, (byte[]) subkey, eflag, - tc.decode(new CachedData(flags, data, tc.getMaxSize())))); - } - } + final BTreeSMGetOperationOldCallback.GlobalParams globalParams = + new BTreeSMGetOperationOldCallback.GlobalParams( + blatch, lock, + missedKeyList, missedKeys, + mergedResult, mergedTrimmedKeys, + resultOperationStatus, failedOperationStatus, + processedSMGetCount, + Arrays.asList(stopCollect, mergedTrim) + ); - @Override - public void gotMissedKey(byte[] data) { - missedKeyList.add(new String(data)); - OperationStatus cause = new OperationStatus(false, "UNDEFINED"); - missedKeys.put(new String(data), new CollectionOperationStatus(cause)); - } - }); + for (BTreeSMGet smGet : smGetList) { + Operation op = opFact.bopsmget(smGet, + new BTreeSMGetOperationOldCallback(offset, count, reverse, tc, globalParams)); ops.add(op); addOp(smGet.getMemcachedNode(), op); } @@ -2448,206 +2333,43 @@ private SMGetFuture>> smget( final List> smGetList, final int count, final boolean reverse, final Transcoder tc, final SMGetMode smgetMode) { - final String END = "END"; - final String TRIMMED = "TRIMMED"; - final String DUPLICATED = "DUPLICATED"; - final String DUPLICATED_TRIMMED = "DUPLICATED_TRIMMED"; - final CountDownLatch blatch = new CountDownLatch(smGetList.size()); + final ReentrantLock lock = new ReentrantLock(); final ConcurrentLinkedQueue ops = new ConcurrentLinkedQueue(); + final List missedKeyList = Collections.synchronizedList(new ArrayList()); final Map missedKeys = Collections.synchronizedMap(new HashMap()); - final int totalResultElementCount = count; final List> mergedResult = - Collections.synchronizedList(new ArrayList>(totalResultElementCount)); + Collections.synchronizedList(new ArrayList>(count)); final List mergedTrimmedKeys = Collections.synchronizedList(new ArrayList()); - final ReentrantLock lock = new ReentrantLock(); final List resultOperationStatus = Collections.synchronizedList(new ArrayList(1)); - final List failedOperationStatus = Collections.synchronizedList(new ArrayList(1)); - final AtomicBoolean stopCollect = new AtomicBoolean(false); // if processedSMGetCount is 0, then all smget is done. final AtomicInteger processedSMGetCount = new AtomicInteger(smGetList.size()); + final AtomicBoolean stopCollect = new AtomicBoolean(false); - for (BTreeSMGet smGet : smGetList) { - Operation op = opFact.bopsmget(smGet, new BTreeSortMergeGetOperation.Callback() { - private final List> eachResult = new ArrayList>(); - private final List eachTrimmedResult = new ArrayList(); - - @Override - public void receivedStatus(OperationStatus status) { - processedSMGetCount.decrementAndGet(); - - if (!status.isSuccess()) { - getLogger().warn("SMGetFailed. status=%s", status); - if (!stopCollect.get()) { - stopCollect.set(true); - failedOperationStatus.add(status); - } - mergedResult.clear(); - mergedTrimmedKeys.clear(); - return; - } - - lock.lock(); - try { - if (mergedResult.size() == 0) { - // merged result is empty, add all. - mergedResult.addAll(eachResult); - } else { - // do sort merge - boolean duplicated; - int comp, pos = 0; - for (SMGetElement result : eachResult) { - duplicated = false; - for (; pos < mergedResult.size(); pos++) { - // compare b+tree key - comp = result.compareBkeyTo(mergedResult.get(pos)); - if ((reverse) ? (0 < comp) : (0 > comp)) { - break; - } - if (comp == 0) { // compare key string - comp = result.compareKeyTo(mergedResult.get(pos)); - if ((reverse) ? (0 < comp) : (0 > comp)) { - if (smgetMode == SMGetMode.UNIQUE) { - mergedResult.remove(pos); // remove dup bkey - } - break; - } else { - if (smgetMode == SMGetMode.UNIQUE) { - duplicated = true; - break; - } - } - } - } - if (duplicated) { // UNIQUE - continue; - } - if (pos >= totalResultElementCount) { - // At this point, following conditions are met. - // - mergedResult.size() == totalResultElementCount && - // - The current of eachResult is - // behind of the last of mergedResult. - // Then, all the next elements of eachResult are - // definitely behind of the last of mergedResult. - // So, stop the current sort-merge. - break; - } - - mergedResult.add(pos, result); - if (mergedResult.size() > totalResultElementCount) { - mergedResult.remove(totalResultElementCount); - } - pos += 1; - } - } - - if (eachTrimmedResult.size() > 0) { - if (mergedTrimmedKeys.size() == 0) { - mergedTrimmedKeys.addAll(eachTrimmedResult); - } else { - // do sort merge trimmed list - int pos = 0; - for (SMGetTrimKey result : eachTrimmedResult) { - for (; pos < mergedTrimmedKeys.size(); pos++) { - if ((reverse) ? (0 < result.compareTo(mergedTrimmedKeys.get(pos))) - : (0 > result.compareTo(mergedTrimmedKeys.get(pos)))) { - break; - } - } - mergedTrimmedKeys.add(pos, result); - pos += 1; - } - } - } - - if (processedSMGetCount.get() == 0) { - if (mergedTrimmedKeys.size() > 0 && count <= mergedResult.size()) { - // remove trimed keys whose bkeys are behind of the last element. - SMGetElement lastElement = mergedResult.get(mergedResult.size() - 1); - SMGetTrimKey lastTrimKey = new SMGetTrimKey(lastElement.getKey(), - lastElement.getBkeyObject()); - for (int i = mergedTrimmedKeys.size() - 1; i >= 0; i--) { - SMGetTrimKey me = mergedTrimmedKeys.get(i); - if ((reverse) ? (0 >= me.compareTo(lastTrimKey)) - : (0 <= me.compareTo(lastTrimKey))) { - mergedTrimmedKeys.remove(i); - } else { - break; - } - } - } - if (smgetMode == SMGetMode.UNIQUE) { - resultOperationStatus.add(new OperationStatus(true, "END")); - } else { - boolean isDuplicated = false; - for (int i = 1; i < mergedResult.size(); i++) { - if (mergedResult.get(i).compareBkeyTo(mergedResult.get(i - 1)) == 0) { - isDuplicated = true; - break; - } - } - if (isDuplicated) { - resultOperationStatus.add(new OperationStatus(true, "DUPLICATED")); - } else { - resultOperationStatus.add(new OperationStatus(true, "END")); - } - } - } - } finally { - lock.unlock(); - } - } - - @Override - public void complete() { - blatch.countDown(); - } - - @Override - public void gotData(String key, int flags, Object subkey, byte[] eflag, byte[] data) { - if (stopCollect.get()) { - return; - } - - if (subkey instanceof Long) { - eachResult.add(new SMGetElement(key, (Long) subkey, eflag, - tc.decode(new CachedData(flags, data, tc.getMaxSize())))); - } else if (subkey instanceof byte[]) { - eachResult.add(new SMGetElement(key, (byte[]) subkey, eflag, - tc.decode(new CachedData(flags, data, tc.getMaxSize())))); - } - } - - @Override - public void gotMissedKey(String key, OperationStatus cause) { - missedKeyList.add(key); - missedKeys.put(key, new CollectionOperationStatus(cause)); - } - - @Override - public void gotTrimmedKey(String key, Object subkey) { - if (stopCollect.get()) { - return; - } + final BTreeSMGetOperationCallback.GlobalParams globalParams = + new BTreeSMGetOperationCallback.GlobalParams( + blatch, lock, + missedKeyList, missedKeys, + mergedResult, mergedTrimmedKeys, + resultOperationStatus, failedOperationStatus, + processedSMGetCount, + Collections.singletonList(stopCollect) + ); - if (subkey instanceof Long) { - eachTrimmedResult.add(new SMGetTrimKey(key, (Long) subkey)); - } else if (subkey instanceof byte[]) { - eachTrimmedResult.add(new SMGetTrimKey(key, (byte[]) subkey)); - } - } - }); + for (BTreeSMGet smGet : smGetList) { + Operation op = opFact.bopsmget(smGet, + new BTreeSMGetOperationCallback(count, reverse, tc, smgetMode, globalParams)); ops.add(op); addOp(smGet.getMemcachedNode(), op); } diff --git a/src/main/java/net/spy/memcached/protocol/ascii/callback/BTreeSMGetOperationCallback.java b/src/main/java/net/spy/memcached/protocol/ascii/callback/BTreeSMGetOperationCallback.java new file mode 100644 index 000000000..99525891e --- /dev/null +++ b/src/main/java/net/spy/memcached/protocol/ascii/callback/BTreeSMGetOperationCallback.java @@ -0,0 +1,197 @@ +package net.spy.memcached.protocol.ascii.callback; + +import net.spy.memcached.collection.SMGetElement; +import net.spy.memcached.collection.SMGetMode; +import net.spy.memcached.collection.SMGetTrimKey; +import net.spy.memcached.ops.BTreeSortMergeGetOperation; +import net.spy.memcached.ops.CollectionOperationStatus; +import net.spy.memcached.ops.OperationStatus; +import net.spy.memcached.transcoders.Transcoder; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; + +public class BTreeSMGetOperationCallback + extends BaseBTreeSMGetOperationCallback> + implements BTreeSortMergeGetOperation.Callback { + + private final SMGetMode smGetMode; + + public BTreeSMGetOperationCallback(final int count, + final boolean reverse, + final Transcoder tc, + final SMGetMode smGetMode, + final BTreeSMGetOperationCallback.GlobalParams gp) { + super(count, count, reverse, tc, gp); + this.smGetMode = smGetMode; + } + + @Override + public void receivedStatus(OperationStatus status) { + gp.processedSMGetCount.decrementAndGet(); + + if (!status.isSuccess()) { + getLogger().warn("SMGetFailed. status=%s", status); + if (!gp.stopCollect.get()) { + gp.stopCollect.set(true); + gp.failedOperationStatus.add(status); + } + gp.mergedResult.clear(); + gp.mergedTrimmedKeys.clear(); + return; + } + + gp.lock.lock(); + try { + if (gp.mergedResult.size() == 0) { + // merged result is empty, add all. + gp.mergedResult.addAll(eachResult); + } else { + // do sort merge + boolean duplicated; + int comp, pos = 0; + for (SMGetElement result : eachResult) { + duplicated = false; + for (; pos < gp.mergedResult.size(); pos++) { + // compare b+tree key + comp = result.compareBkeyTo(gp.mergedResult.get(pos)); + if ((reverse) ? (0 < comp) : (0 > comp)) { + break; + } + if (comp == 0) { // compare key string + comp = result.compareKeyTo(gp.mergedResult.get(pos)); + if ((reverse) ? (0 < comp) : (0 > comp)) { + if (smGetMode == SMGetMode.UNIQUE) { + gp.mergedResult.remove(pos); // remove dup bkey + } + break; + } else { + if (smGetMode == SMGetMode.UNIQUE) { + duplicated = true; + break; + } + } + } + } + if (duplicated) { // UNIQUE + continue; + } + if (pos >= totalResultElementCount) { + // At this point, following conditions are met. + // - mergedResult.size() == totalResultElementCount && + // - The current of eachResult is + // behind of the last of mergedResult. + // Then, all the next elements of eachResult are + // definitely behind of the last of mergedResult. + // So, stop the current sort-merge. + break; + } + + gp.mergedResult.add(pos, result); + if (gp.mergedResult.size() > totalResultElementCount) { + gp.mergedResult.remove(totalResultElementCount); + } + pos += 1; + } + } + + if (eachTrimmedResult.size() > 0) { + if (gp.mergedTrimmedKeys.size() == 0) { + gp.mergedTrimmedKeys.addAll(eachTrimmedResult); + } else { + // do sort merge trimmed list + int pos = 0; + for (SMGetTrimKey result : eachTrimmedResult) { + for (; pos < gp.mergedTrimmedKeys.size(); pos++) { + if ((reverse) ? + (0 < result.compareTo(gp.mergedTrimmedKeys.get(pos))) : + (0 > result.compareTo(gp.mergedTrimmedKeys.get(pos)))) { + break; + } + } + gp.mergedTrimmedKeys.add(pos, result); + pos += 1; + } + } + } + + if (gp.processedSMGetCount.get() == 0) { + if (gp.mergedTrimmedKeys.size() > 0 && count <= gp.mergedResult.size()) { + // remove trimed keys whose bkeys are behind of the last element. + SMGetElement lastElement = gp.mergedResult.get(gp.mergedResult.size() - 1); + SMGetTrimKey lastTrimKey = new SMGetTrimKey(lastElement.getKey(), + lastElement.getBkeyObject()); + for (int i = gp.mergedTrimmedKeys.size() - 1; i >= 0; i--) { + SMGetTrimKey me = gp.mergedTrimmedKeys.get(i); + if ((reverse) ? + (0 >= me.compareTo(lastTrimKey)) : + (0 <= me.compareTo(lastTrimKey))) { + gp.mergedTrimmedKeys.remove(i); + } else { + break; + } + } + } + if (smGetMode == SMGetMode.UNIQUE) { + gp.resultOperationStatus.add(new OperationStatus(true, "END")); + } else { + boolean isDuplicated = false; + for (int i = 1; i < gp.mergedResult.size(); i++) { + if (gp.mergedResult.get(i).compareBkeyTo(gp.mergedResult.get(i - 1)) == 0) { + isDuplicated = true; + break; + } + } + if (isDuplicated) { + gp.resultOperationStatus.add(new OperationStatus(true, "DUPLICATED")); + } else { + gp.resultOperationStatus.add(new OperationStatus(true, "END")); + } + } + } + } finally { + gp.lock.unlock(); + } + } + + @Override + public void gotMissedKey(String key, OperationStatus cause) { + gp.missedKeyList.add(key); + gp.missedKeys.put(key, new CollectionOperationStatus(cause)); + } + + @Override + public void gotTrimmedKey(String key, Object bkey) { + if (gp.stopCollect.get()) { + return; + } + + if (bkey instanceof Long) { + eachTrimmedResult.add(new SMGetTrimKey(key, (Long) bkey)); + } else if (bkey instanceof byte[]) { + eachTrimmedResult.add(new SMGetTrimKey(key, (byte[]) bkey)); + } + } + + public static class GlobalParams + extends BaseBTreeSMGetOperationCallback.GlobalParams { + + public GlobalParams(final CountDownLatch blatch, + final ReentrantLock lock, + final List missedKeyList, + final Map missedKeys, + final List> mergedResult, + final List mergedTrimmedKeys, + final List resultOperationStatus, + final List failedOperationStatus, + final AtomicInteger processedSMGetCount, + final List atomicBooleans) { + super(blatch, lock, missedKeyList, missedKeys, mergedResult, mergedTrimmedKeys, + resultOperationStatus, failedOperationStatus, processedSMGetCount, atomicBooleans); + } + } +} diff --git a/src/main/java/net/spy/memcached/protocol/ascii/callback/BTreeSMGetOperationOldCallback.java b/src/main/java/net/spy/memcached/protocol/ascii/callback/BTreeSMGetOperationOldCallback.java new file mode 100644 index 000000000..00aae9c06 --- /dev/null +++ b/src/main/java/net/spy/memcached/protocol/ascii/callback/BTreeSMGetOperationOldCallback.java @@ -0,0 +1,147 @@ +package net.spy.memcached.protocol.ascii.callback; + +import net.spy.memcached.collection.SMGetElement; +import net.spy.memcached.collection.SMGetTrimKey; +import net.spy.memcached.ops.BTreeSortMergeGetOperationOld; +import net.spy.memcached.ops.CollectionOperationStatus; +import net.spy.memcached.ops.OperationStatus; +import net.spy.memcached.transcoders.Transcoder; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; + +public class BTreeSMGetOperationOldCallback + extends BaseBTreeSMGetOperationCallback> + implements BTreeSortMergeGetOperationOld.Callback { + + public BTreeSMGetOperationOldCallback(final int offset, + final int count, + final boolean reverse, + final Transcoder tc, + final BTreeSMGetOperationOldCallback.GlobalParams gp) { + super(count, offset + count, reverse, tc, gp); + } + + @Override + public void receivedStatus(OperationStatus status) { + gp.processedSMGetCount.decrementAndGet(); + + if (!status.isSuccess()) { + getLogger().warn("SMGetFailed. status=%s", status); + if (!gp.stopCollect.get()) { + gp.stopCollect.set(true); + gp.failedOperationStatus.add(status); + } + gp.mergedResult.clear(); + return; + } + + boolean isTrimmed = "TRIMMED".equals(status.getMessage()) || + "DUPLICATED_TRIMMED".equals(status.getMessage()); + gp.lock.lock(); + try { + if (gp.mergedResult.size() == 0) { + // merged result is empty, add all. + gp.mergedResult.addAll(eachResult); + gp.mergedTrim.set(isTrimmed); + } else { + boolean addAll = true; + int pos = 0; + for (SMGetElement result : eachResult) { + for (; pos < gp.mergedResult.size(); pos++) { + if (reverse && 0 < result.compareTo(gp.mergedResult.get(pos))) { + break; + } else if (0 > result.compareTo(gp.mergedResult.get(pos))) { + break; + } + } + if (pos >= totalResultElementCount) { + addAll = false; + break; + } + if (pos >= gp.mergedResult.size() && gp.mergedTrim.get() && + result.compareBkeyTo(gp.mergedResult.get(pos - 1)) != 0) { + addAll = false; + break; + } + gp.mergedResult.add(pos, result); + if (gp.mergedResult.size() > totalResultElementCount) { + gp.mergedResult.remove(totalResultElementCount); + } + pos += 1; + } + if (isTrimmed && addAll) { + while (pos < gp.mergedResult.size()) { + if (gp.mergedResult.get(pos).compareBkeyTo(gp.mergedResult.get(pos - 1)) == 0) { + pos += 1; + } else { + gp.mergedResult.remove(pos); + } + } + gp.mergedTrim.set(true); + } + if (gp.mergedResult.size() >= totalResultElementCount) { + gp.mergedTrim.set(false); + } + } + + if (gp.processedSMGetCount.get() == 0) { + boolean isDuplicated = false; + for (int i = 1; i < gp.mergedResult.size(); i++) { + if (gp.mergedResult.get(i).compareBkeyTo(gp.mergedResult.get(i - 1)) == 0) { + isDuplicated = true; + break; + } + } + if (gp.mergedTrim.get()) { + if (isDuplicated) { + gp.resultOperationStatus.add(new OperationStatus(true, "DUPLICATED_TRIMMED")); + } else { + gp.resultOperationStatus.add(new OperationStatus(true, "TRIMMED")); + } + } else { + if (isDuplicated) { + gp.resultOperationStatus.add(new OperationStatus(true, "DUPLICATED")); + } else { + gp.resultOperationStatus.add(new OperationStatus(true, "END")); + } + } + } + } finally { + gp.lock.unlock(); + } + } + + @Override + public void gotMissedKey(byte[] data) { + gp.missedKeyList.add(new String(data)); + OperationStatus cause = new OperationStatus(false, "UNDEFINED"); + gp.missedKeys.put(new String(data), new CollectionOperationStatus(cause)); + } + + public static class GlobalParams + extends BaseBTreeSMGetOperationCallback.GlobalParams { + + private final AtomicBoolean mergedTrim; + + public GlobalParams(final CountDownLatch blatch, + final ReentrantLock lock, + final List missedKeyList, + final Map missedKeys, + final List> mergedResult, + final List mergedTrimmedKeys, + final List resultOperationStatus, + final List failedOperationStatus, + final AtomicInteger processedSMGetCount, + final List atomicBooleans) { + super(blatch, lock, missedKeyList, missedKeys, mergedResult, mergedTrimmedKeys, + resultOperationStatus, failedOperationStatus, processedSMGetCount, atomicBooleans); + + this.mergedTrim = atomicBooleans.get(1); + } + } +} diff --git a/src/main/java/net/spy/memcached/protocol/ascii/callback/BaseBTreeSMGetOperationCallback.java b/src/main/java/net/spy/memcached/protocol/ascii/callback/BaseBTreeSMGetOperationCallback.java new file mode 100644 index 000000000..417ad20a0 --- /dev/null +++ b/src/main/java/net/spy/memcached/protocol/ascii/callback/BaseBTreeSMGetOperationCallback.java @@ -0,0 +1,110 @@ +package net.spy.memcached.protocol.ascii.callback; + +import net.spy.memcached.CachedData; +import net.spy.memcached.collection.SMGetElement; +import net.spy.memcached.collection.SMGetTrimKey; +import net.spy.memcached.ops.CollectionOperationStatus; +import net.spy.memcached.ops.OperationStatus; +import net.spy.memcached.transcoders.Transcoder; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; + +public abstract class BaseBTreeSMGetOperationCallback> + extends BaseOperationCallback { + + protected final int count; + protected final int totalResultElementCount; + + protected final boolean reverse; + protected final Transcoder tc; + + protected final List> eachResult = new ArrayList>(); + protected final List eachTrimmedResult = new ArrayList(); + + protected final G gp; + + public BaseBTreeSMGetOperationCallback(final int count, + final int totalResultElementCount, + final boolean reverse, + final Transcoder tc, + final G globalParams) { + this.count = count; + this.totalResultElementCount = totalResultElementCount; + + this.reverse = reverse; + this.tc = tc; + + this.gp = globalParams; + } + + public void complete() { + gp.blatch.countDown(); + } + + public void gotData(String key, int flags, Object bkey, byte[] eflag, byte[] data) { + if (gp.stopCollect.get()) { + return; + } + + if (bkey instanceof Long) { + eachResult.add(new SMGetElement(key, (Long) bkey, eflag, + tc.decode(new CachedData(flags, data, tc.getMaxSize())))); + } else if (bkey instanceof byte[]) { + eachResult.add(new SMGetElement(key, (byte[]) bkey, eflag, + tc.decode(new CachedData(flags, data, tc.getMaxSize())))); + } + } + + public abstract void receivedStatus(OperationStatus status); + + public abstract static class GlobalParams { + protected final CountDownLatch blatch; + protected final ReentrantLock lock; + + protected final List missedKeyList; + protected final Map missedKeys; + + protected final List> mergedResult; + protected final List mergedTrimmedKeys; + + protected final List resultOperationStatus; + protected final List failedOperationStatus; + + // if processedSMGetCount is 0, then all smget is done. + protected final AtomicInteger processedSMGetCount; + protected final AtomicBoolean stopCollect; + + public GlobalParams(final CountDownLatch blatch, + final ReentrantLock lock, + final List missedKeyList, + final Map missedKeys, + final List> mergedResult, + final List mergedTrimmedKeys, + final List resultOperationStatus, + final List failedOperationStatus, + final AtomicInteger processedSMGetCount, + final List atomicBooleans) { + this.blatch = blatch; + this.lock = lock; + + this.missedKeyList = missedKeyList; + this.missedKeys = missedKeys; + + this.mergedResult = mergedResult; + this.mergedTrimmedKeys = mergedTrimmedKeys; + + this.resultOperationStatus = resultOperationStatus; + this.failedOperationStatus = failedOperationStatus; + + this.stopCollect = atomicBooleans.get(0); + this.processedSMGetCount = processedSMGetCount; + } + } +} diff --git a/src/main/java/net/spy/memcached/protocol/ascii/callback/BaseOperationCallback.java b/src/main/java/net/spy/memcached/protocol/ascii/callback/BaseOperationCallback.java new file mode 100644 index 000000000..cea8109be --- /dev/null +++ b/src/main/java/net/spy/memcached/protocol/ascii/callback/BaseOperationCallback.java @@ -0,0 +1,21 @@ +package net.spy.memcached.protocol.ascii.callback; + +import net.spy.memcached.compat.log.Logger; +import net.spy.memcached.compat.log.LoggerFactory; + +public abstract class BaseOperationCallback { + + private transient Logger logger = null; + + /** + * Get a Logger instance for this class. + * + * @return an appropriate logger instance. + */ + protected Logger getLogger() { + if (logger == null) { + logger = LoggerFactory.getLogger(getClass()); + } + return (logger); + } +}