Skip to content

Commit

Permalink
FIX: Send flush op to master node only.
Browse files Browse the repository at this point in the history
  • Loading branch information
brido4125 authored and jhpark816 committed Oct 30, 2024
1 parent d79a8d1 commit 132254c
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 6 deletions.
3 changes: 2 additions & 1 deletion src/main/java/net/spy/memcached/ArcusClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -1902,7 +1902,8 @@ public OperationFuture<Boolean> flush(final String prefix) {

@Override
public OperationFuture<Boolean> flush(final String prefix, final int delay) {
Collection<MemcachedNode> nodes = getAllNodes();
Collection<MemcachedNode> nodes = getFlushNodes();

final BroadcastFuture<Boolean> rv
= new BroadcastFuture<>(operationTimeout, Boolean.TRUE, nodes.size());
final Map<MemcachedNode, Operation> opsMap = new HashMap<>();
Expand Down
10 changes: 6 additions & 4 deletions src/main/java/net/spy/memcached/ArcusReplKetamaNodeLocator.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

Expand All @@ -41,7 +42,7 @@
public final class ArcusReplKetamaNodeLocator extends SpyObject implements NodeLocator {

private final TreeMap<Long, SortedSet<MemcachedReplicaGroup>> ketamaGroups;
private final HashMap<String, MemcachedReplicaGroup> allGroups;
private final ConcurrentHashMap<String, MemcachedReplicaGroup> allGroups;
private final Collection<MemcachedNode> allNodes;

/* ENABLE_MIGRATION if */
Expand All @@ -67,7 +68,7 @@ public ArcusReplKetamaNodeLocator(List<MemcachedNode> nodes) {
super();
allNodes = nodes;
ketamaGroups = new TreeMap<>();
allGroups = new HashMap<>();
allGroups = new ConcurrentHashMap<>();

// create all memcached replica group
for (MemcachedNode node : nodes) {
Expand Down Expand Up @@ -103,7 +104,7 @@ public ArcusReplKetamaNodeLocator(List<MemcachedNode> nodes) {
}

private ArcusReplKetamaNodeLocator(TreeMap<Long, SortedSet<MemcachedReplicaGroup>> kg,
HashMap<String, MemcachedReplicaGroup> ag,
ConcurrentHashMap<String, MemcachedReplicaGroup> ag,
Collection<MemcachedNode> an) {
super();
ketamaGroups = kg;
Expand Down Expand Up @@ -208,7 +209,8 @@ public NodeLocator getReadonlyCopy() {
lock.lock();
try {
TreeMap<Long, SortedSet<MemcachedReplicaGroup>> ketamaCopy = new TreeMap<>();
HashMap<String, MemcachedReplicaGroup> groupsCopy = new HashMap<>(allGroups.size());
ConcurrentHashMap<String, MemcachedReplicaGroup> groupsCopy
= new ConcurrentHashMap<>(allGroups.size());
Collection<MemcachedNode> nodesCopy = new ArrayList<>(allNodes.size());

// Rewrite the values a copy of the map
Expand Down
16 changes: 15 additions & 1 deletion src/main/java/net/spy/memcached/MemcachedClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import net.spy.memcached.auth.AuthDescriptor;
import net.spy.memcached.auth.AuthThreadMonitor;
Expand Down Expand Up @@ -1880,7 +1881,8 @@ public void complete() {
* is too full to accept any more requests
*/
public Future<Boolean> flush(final int delay) {
Collection<MemcachedNode> nodes = getAllNodes();
Collection<MemcachedNode> nodes = getFlushNodes();

final BroadcastFuture<Boolean> rv
= new BroadcastFuture<>(operationTimeout, Boolean.TRUE, nodes.size());
final Map<MemcachedNode, Operation> opsMap = new HashMap<>();
Expand Down Expand Up @@ -2148,6 +2150,18 @@ protected Collection<MemcachedNode> getAllNodes() {
return conn.getLocator().getAll();
}

protected Collection<MemcachedNode> getFlushNodes() {
/* ENABLE_REPLICATION if */
if (conn.getArcusReplEnabled()) {
return ((ArcusReplKetamaNodeLocator) getNodeLocator()).getAllGroups().values()
.stream()
.map(MemcachedReplicaGroup::getMasterNode)
.collect(Collectors.toList());
}
/* ENABLE_REPLICATION end */
return conn.getLocator().getAll();
}

/**
* Turn the list of keys into groups of keys.
* All keys in a group belong to the same memcached server.
Expand Down

0 comments on commit 132254c

Please sign in to comment.