Skip to content

Commit

Permalink
INTERNAL: Change asyncGets return future type.
Browse files Browse the repository at this point in the history
  • Loading branch information
brido4125 committed Jan 9, 2024
1 parent eb5c939 commit f0c91c6
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 46 deletions.
4 changes: 2 additions & 2 deletions src/main/java/net/spy/memcached/ArcusClientPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -234,12 +234,12 @@ public GetFuture<Object> asyncGet(String key) {
}

@Override
public <T> OperationFuture<CASValue<T>> asyncGets(String key, Transcoder<T> tc) {
public <T> GetFuture<CASValue<T>> asyncGets(String key, Transcoder<T> tc) {
return this.getClient().asyncGets(key, tc);
}

@Override
public OperationFuture<CASValue<Object>> asyncGets(String key) {
public GetFuture<CASValue<Object>> asyncGets(String key) {
return this.getClient().asyncGets(key);
}

Expand Down
15 changes: 7 additions & 8 deletions src/main/java/net/spy/memcached/MemcachedClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import net.spy.memcached.internal.GetFuture;
import net.spy.memcached.internal.OperationFuture;
import net.spy.memcached.internal.SingleElementInfiniteIterator;
import net.spy.memcached.internal.result.GetsResultImpl;
import net.spy.memcached.internal.result.GetResult;
import net.spy.memcached.internal.result.GetResultImpl;
import net.spy.memcached.ops.CASOperationStatus;
Expand Down Expand Up @@ -936,16 +937,15 @@ public GetFuture<Object> asyncGet(final String key) {
* @throws IllegalStateException in the rare circumstance where queue
* is too full to accept any more requests
*/
public <T> OperationFuture<CASValue<T>> asyncGets(final String key,
public <T> GetFuture<CASValue<T>> asyncGets(final String key,
final Transcoder<T> tc) {

final CountDownLatch latch = new CountDownLatch(1);
final OperationFuture<CASValue<T>> rv =
new OperationFuture<CASValue<T>>(latch, operationTimeout);
final GetFuture<CASValue<T>> rv = new GetFuture<CASValue<T>>(latch, operationTimeout);

Operation op = opFact.gets(key,
new GetsOperation.Callback() {
private CASValue<T> val = null;
private GetResult<CASValue<T>> val = null;

public void receivedStatus(OperationStatus status) {
rv.set(val, status);
Expand All @@ -954,8 +954,7 @@ public void receivedStatus(OperationStatus status) {
public void gotData(String k, int flags, long cas, byte[] data) {
assert key.equals(k) : "Wrong key returned";
assert cas > 0 : "CAS was less than zero: " + cas;
val = new CASValue<T>(cas, tc.decode(
new CachedData(flags, data, tc.getMaxSize())));
val = new GetsResultImpl<T>(cas, new CachedData(flags, data, tc.getMaxSize()), tc);
}

public void complete() {
Expand All @@ -976,7 +975,7 @@ public void complete() {
* @throws IllegalStateException in the rare circumstance where queue
* is too full to accept any more requests
*/
public OperationFuture<CASValue<Object>> asyncGets(final String key) {
public GetFuture<CASValue<Object>> asyncGets(final String key) {
return asyncGets(key, transcoder);
}

Expand All @@ -993,7 +992,7 @@ public OperationFuture<CASValue<Object>> asyncGets(final String key) {
* is too full to accept any more requests
*/
public <T> CASValue<T> gets(String key, Transcoder<T> tc) {
OperationFuture<CASValue<T>> future = asyncGets(key, tc);
GetFuture<CASValue<T>> future = asyncGets(key, tc);
try {
return future.get(operationTimeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Expand Down
61 changes: 25 additions & 36 deletions src/main/java/net/spy/memcached/internal/GetFuture.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

import net.spy.memcached.MemcachedConnection;
import net.spy.memcached.internal.result.GetResult;
import net.spy.memcached.ops.Operation;
import net.spy.memcached.ops.OperationStatus;

/**
Expand All @@ -17,51 +17,40 @@
*
* @param <T> Type of object returned from the get
*/
public class GetFuture<T> implements Future<T> {

private final OperationFuture<GetResult<T>> rv;
public class GetFuture<T> extends OperationFuture<T> {
private final AtomicReference<GetResult<T>> result;

public GetFuture(CountDownLatch l, long opTimeout) {
this.rv = new OperationFuture<GetResult<T>>(l, opTimeout);
super(l, opTimeout);
result = new AtomicReference<GetResult<T>>();
}

public GetFuture(GetFuture<T> parent) {
this.rv = parent.rv;
}

public boolean cancel(boolean ign) {
return rv.cancel(ign);
}

public T get() throws InterruptedException, ExecutionException {
GetResult<T> result = rv.get();
return result == null ? null : result.getDecodedValue();
super(parent.latch, parent.timeout);
this.result = parent.result;
}

public T get(long duration, TimeUnit units)
throws InterruptedException, TimeoutException, ExecutionException {
GetResult<T> result = rv.get(duration, units);
return result == null ? null : result.getDecodedValue();
}

public OperationStatus getStatus() {
return rv.getStatus();
if (!latch.await(duration, units)) {
// whenever timeout occurs, continuous timeout counter will increase by 1.
MemcachedConnection.opTimedOut(op);
throw new CheckedOperationTimeoutException(duration, units, op);
} else {
// continuous timeout counter will be reset
MemcachedConnection.opSucceeded(op);
}
if (op != null && op.hasErrored()) {
throw new ExecutionException(op.getException());
}
if (op != null && op.isCancelled()) {
throw new ExecutionException(new RuntimeException(op.getCancelCause()));
}
return result.get() == null ? null : result.get().getDecodedValue();
}

public void set(GetResult<T> result, OperationStatus status) {
rv.set(result, status);
this.result.set(result);
this.status = status;
}

public void setOperation(Operation to) {
rv.setOperation(to);
}

public boolean isCancelled() {
return rv.isCancelled();
}

public boolean isDone() {
return rv.isDone();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package net.spy.memcached.internal.result;

import net.spy.memcached.CASValue;
import net.spy.memcached.CachedData;
import net.spy.memcached.transcoders.Transcoder;

public class GetsResultImpl<T> implements GetResult<CASValue<T>> {
private final long cas;
private final CachedData cachedData;
private final Transcoder<T> transcoder;
private volatile CASValue<T> decodedValue = null;

public GetsResultImpl(long cas, CachedData cachedData, Transcoder<T> transcoder) {
this.cas = cas;
this.cachedData = cachedData;
this.transcoder = transcoder;
}

@Override
public CASValue<T> getDecodedValue() {
if (decodedValue == null) {
decodedValue = new CASValue<T>(cas, transcoder.decode(cachedData));
}
return decodedValue;
}
}

0 comments on commit f0c91c6

Please sign in to comment.