diff --git a/src/main/java/net/spy/memcached/MemcachedConnection.java b/src/main/java/net/spy/memcached/MemcachedConnection.java index b8cd743d9..0d91a4043 100644 --- a/src/main/java/net/spy/memcached/MemcachedConnection.java +++ b/src/main/java/net/spy/memcached/MemcachedConnection.java @@ -471,6 +471,17 @@ private void updateReplConnections(List addrs) throws IOExcep Set newSlaveAddrs = getSlaveAddrsFromGroupAddrs(newGroupAddrs); if (oldMasterAddr.isSameAddress(newMasterAddr)) { + if (oldGroup.masterCandidate != null) { + /** + * Handling the below case. + * old group : [oldMaster, oldSlave] + * old group after switchover response : [oldMaster, oldSlave-masterCandidate] + * new group from zk cache list: [oldMaster, X] + */ + oldGroup.masterCandidate = null; + taskList.add(new MoveOperationTask( + oldGroup.getMasterCandidate(), oldMasterNode, false)); + } // add newly added slave node for (ArcusReplNodeAddress newSlaveAddr : newSlaveAddrs) { if (!oldSlaveAddrs.contains(newSlaveAddr)) { @@ -490,9 +501,25 @@ private void updateReplConnections(List addrs) throws IOExcep } else if (oldSlaveAddrs.contains(newMasterAddr)) { if (newSlaveAddrs.contains(oldMasterAddr)) { // Switchover - if (oldGroup.getMasterCandidate() != null) { + ArcusReplNodeAddress fromZk = (ArcusReplNodeAddress) oldGroup + .getMasterCandidateZk(newMasterAddr.getIPPort()).getSocketAddress(); + MemcachedNode masterCandidate = oldGroup.getMasterCandidate(); + if (masterCandidate != null && fromZk.isSameAddress( + ((ArcusReplNodeAddress) masterCandidate.getSocketAddress()))) { changeRoleGroups.add(oldGroup); } else { + if (masterCandidate != null) { + /** + * Moves ops from masterCandidate set by cache server to oldMasterNode + * before setting master candidate from zk cache list. + * Handling the below case. + * old group : [oldMaster, oldSlave1, oldSlave2] + * old group after switchover response : [oldMaster, oldSlave1-masterCandidate, oldSlave2] + * new group from zk cache list: [slave1, X, newMaster] + */ + taskList.add(new MoveOperationTask( + masterCandidate, oldMasterNode, false)); + } // ZK event occurs before cache server response. oldGroup.setMasterCandidateByAddr(newMasterAddr.getIPPort()); if (oldMasterNode.hasNonIdempotentOperationInReadQ()) { diff --git a/src/main/java/net/spy/memcached/MemcachedReplicaGroup.java b/src/main/java/net/spy/memcached/MemcachedReplicaGroup.java index a6dedc4d4..809d837ad 100644 --- a/src/main/java/net/spy/memcached/MemcachedReplicaGroup.java +++ b/src/main/java/net/spy/memcached/MemcachedReplicaGroup.java @@ -181,5 +181,14 @@ private MemcachedNode getNextActiveSlaveNodeNoRotate() { public static String getGroupNameFromNode(final MemcachedNode node) { return ((ArcusReplNodeAddress) node.getSocketAddress()).getGroupName(); } + + public MemcachedNode getMasterCandidateZk(String address) { + for (MemcachedNode node : this.getSlaveNodes()) { + if (address.equals(((ArcusReplNodeAddress) node.getSocketAddress()).getIPPort())) { + return node; + } + } + return null; + } } /* ENABLE_REPLICATION end */