diff --git a/src/main/java/net/spy/memcached/ArcusClientPool.java b/src/main/java/net/spy/memcached/ArcusClientPool.java index 5b307a4bf..f2f7ff660 100644 --- a/src/main/java/net/spy/memcached/ArcusClientPool.java +++ b/src/main/java/net/spy/memcached/ArcusClientPool.java @@ -234,12 +234,12 @@ public GetFuture asyncGet(String key) { } @Override - public OperationFuture> asyncGets(String key, Transcoder tc) { + public GetFuture> asyncGets(String key, Transcoder tc) { return this.getClient().asyncGets(key, tc); } @Override - public OperationFuture> asyncGets(String key) { + public GetFuture> asyncGets(String key) { return this.getClient().asyncGets(key); } diff --git a/src/main/java/net/spy/memcached/MemcachedClient.java b/src/main/java/net/spy/memcached/MemcachedClient.java index 638c3d4a4..536176bf0 100644 --- a/src/main/java/net/spy/memcached/MemcachedClient.java +++ b/src/main/java/net/spy/memcached/MemcachedClient.java @@ -51,6 +51,7 @@ import net.spy.memcached.internal.GetFuture; import net.spy.memcached.internal.OperationFuture; import net.spy.memcached.internal.SingleElementInfiniteIterator; +import net.spy.memcached.internal.result.GetsResultImpl; import net.spy.memcached.internal.result.GetResult; import net.spy.memcached.internal.result.GetResultImpl; import net.spy.memcached.ops.CASOperationStatus; @@ -936,16 +937,15 @@ public GetFuture asyncGet(final String key) { * @throws IllegalStateException in the rare circumstance where queue * is too full to accept any more requests */ - public OperationFuture> asyncGets(final String key, + public GetFuture> asyncGets(final String key, final Transcoder tc) { final CountDownLatch latch = new CountDownLatch(1); - final OperationFuture> rv = - new OperationFuture>(latch, operationTimeout); + final GetFuture> rv = new GetFuture>(latch, operationTimeout); Operation op = opFact.gets(key, new GetsOperation.Callback() { - private CASValue val = null; + private GetResult> val = null; public void receivedStatus(OperationStatus status) { rv.set(val, status); @@ -954,8 +954,7 @@ public void receivedStatus(OperationStatus status) { public void gotData(String k, int flags, long cas, byte[] data) { assert key.equals(k) : "Wrong key returned"; assert cas > 0 : "CAS was less than zero: " + cas; - val = new CASValue(cas, tc.decode( - new CachedData(flags, data, tc.getMaxSize()))); + val = new GetsResultImpl(cas, new CachedData(flags, data, tc.getMaxSize()), tc); } public void complete() { @@ -976,7 +975,7 @@ public void complete() { * @throws IllegalStateException in the rare circumstance where queue * is too full to accept any more requests */ - public OperationFuture> asyncGets(final String key) { + public GetFuture> asyncGets(final String key) { return asyncGets(key, transcoder); } @@ -993,7 +992,7 @@ public OperationFuture> asyncGets(final String key) { * is too full to accept any more requests */ public CASValue gets(String key, Transcoder tc) { - OperationFuture> future = asyncGets(key, tc); + GetFuture> future = asyncGets(key, tc); try { return future.get(operationTimeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { diff --git a/src/main/java/net/spy/memcached/internal/GetFuture.java b/src/main/java/net/spy/memcached/internal/GetFuture.java index 874e18bd3..e6dd3e1f3 100644 --- a/src/main/java/net/spy/memcached/internal/GetFuture.java +++ b/src/main/java/net/spy/memcached/internal/GetFuture.java @@ -2,12 +2,12 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; +import net.spy.memcached.MemcachedConnection; import net.spy.memcached.internal.result.GetResult; -import net.spy.memcached.ops.Operation; import net.spy.memcached.ops.OperationStatus; /** @@ -17,51 +17,40 @@ * * @param Type of object returned from the get */ -public class GetFuture implements Future { - - private final OperationFuture> rv; +public class GetFuture extends OperationFuture { + private final AtomicReference> result; public GetFuture(CountDownLatch l, long opTimeout) { - this.rv = new OperationFuture>(l, opTimeout); + super(l, opTimeout); + result = new AtomicReference>(); } public GetFuture(GetFuture parent) { - this.rv = parent.rv; - } - - public boolean cancel(boolean ign) { - return rv.cancel(ign); - } - - public T get() throws InterruptedException, ExecutionException { - GetResult result = rv.get(); - return result == null ? null : result.getDecodedValue(); + super(parent.latch, parent.timeout); + this.result = parent.result; } public T get(long duration, TimeUnit units) throws InterruptedException, TimeoutException, ExecutionException { - GetResult result = rv.get(duration, units); - return result == null ? null : result.getDecodedValue(); - } - - public OperationStatus getStatus() { - return rv.getStatus(); + if (!latch.await(duration, units)) { + // whenever timeout occurs, continuous timeout counter will increase by 1. + MemcachedConnection.opTimedOut(op); + throw new CheckedOperationTimeoutException(duration, units, op); + } else { + // continuous timeout counter will be reset + MemcachedConnection.opSucceeded(op); + } + if (op != null && op.hasErrored()) { + throw new ExecutionException(op.getException()); + } + if (op != null && op.isCancelled()) { + throw new ExecutionException(new RuntimeException(op.getCancelCause())); + } + return result.get() == null ? null : result.get().getDecodedValue(); } public void set(GetResult result, OperationStatus status) { - rv.set(result, status); + this.result.set(result); + this.status = status; } - - public void setOperation(Operation to) { - rv.setOperation(to); - } - - public boolean isCancelled() { - return rv.isCancelled(); - } - - public boolean isDone() { - return rv.isDone(); - } - } diff --git a/src/main/java/net/spy/memcached/internal/result/GetsResultImpl.java b/src/main/java/net/spy/memcached/internal/result/GetsResultImpl.java new file mode 100644 index 000000000..e06fa6ec7 --- /dev/null +++ b/src/main/java/net/spy/memcached/internal/result/GetsResultImpl.java @@ -0,0 +1,26 @@ +package net.spy.memcached.internal.result; + +import net.spy.memcached.CASValue; +import net.spy.memcached.CachedData; +import net.spy.memcached.transcoders.Transcoder; + +public class GetsResultImpl implements GetResult> { + private final long cas; + private final CachedData cachedData; + private final Transcoder transcoder; + private volatile CASValue decodedValue = null; + + public GetsResultImpl(long cas, CachedData cachedData, Transcoder transcoder) { + this.cas = cas; + this.cachedData = cachedData; + this.transcoder = transcoder; + } + + @Override + public CASValue getDecodedValue() { + if (decodedValue == null) { + decodedValue = new CASValue(cas, transcoder.decode(cachedData)); + } + return decodedValue; + } +}