Skip to content

Commit

Permalink
FIX: Handling abnormal swithover from zk.
Browse files Browse the repository at this point in the history
  • Loading branch information
brido4125 committed Jan 8, 2025
1 parent db1c636 commit 1286a78
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 1 deletion.
29 changes: 28 additions & 1 deletion src/main/java/net/spy/memcached/MemcachedConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,17 @@ private void updateReplConnections(List<InetSocketAddress> addrs) throws IOExcep
Set<ArcusReplNodeAddress> newSlaveAddrs = getSlaveAddrsFromGroupAddrs(newGroupAddrs);

if (oldMasterAddr.isSameAddress(newMasterAddr)) {
if (oldGroup.masterCandidate != null) {
/**
* Handling the below case.
* old group : [oldMaster, oldSlave]
* old group after switchover response : [oldMaster, oldSlave-masterCandidate]
* new group from zk cache list: [oldMaster, X]
*/
oldGroup.masterCandidate = null;
taskList.add(new MoveOperationTask(
oldGroup.getMasterCandidate(), oldMasterNode, false));
}
// add newly added slave node
for (ArcusReplNodeAddress newSlaveAddr : newSlaveAddrs) {
if (!oldSlaveAddrs.contains(newSlaveAddr)) {
Expand All @@ -490,9 +501,25 @@ private void updateReplConnections(List<InetSocketAddress> addrs) throws IOExcep
} else if (oldSlaveAddrs.contains(newMasterAddr)) {
if (newSlaveAddrs.contains(oldMasterAddr)) {
// Switchover
if (oldGroup.getMasterCandidate() != null) {
ArcusReplNodeAddress fromZk = (ArcusReplNodeAddress) oldGroup
.getMasterCandidateZk(newMasterAddr.getIPPort()).getSocketAddress();
MemcachedNode masterCandidate = oldGroup.getMasterCandidate();
if (masterCandidate != null && fromZk.isSameAddress(
((ArcusReplNodeAddress) masterCandidate.getSocketAddress()))) {
changeRoleGroups.add(oldGroup);
} else {
if (masterCandidate != null) {
/**
* Moves ops from masterCandidate set by cache server to oldMasterNode
* before setting master candidate from zk cache list.
* Handling the below case.
* old group : [oldMaster, oldSlave1, oldSlave2]
* old group after switchover response : [oldMaster, oldSlave1-masterCandidate, oldSlave2]
* new group from zk cache list: [slave1, X, newMaster]
*/
taskList.add(new MoveOperationTask(
masterCandidate, oldMasterNode, false));
}
// ZK event occurs before cache server response.
oldGroup.setMasterCandidateByAddr(newMasterAddr.getIPPort());
if (oldMasterNode.hasNonIdempotentOperationInReadQ()) {
Expand Down
9 changes: 9 additions & 0 deletions src/main/java/net/spy/memcached/MemcachedReplicaGroup.java
Original file line number Diff line number Diff line change
Expand Up @@ -181,5 +181,14 @@ private MemcachedNode getNextActiveSlaveNodeNoRotate() {
public static String getGroupNameFromNode(final MemcachedNode node) {
return ((ArcusReplNodeAddress) node.getSocketAddress()).getGroupName();
}

public MemcachedNode getMasterCandidateZk(String address) {
for (MemcachedNode node : this.getSlaveNodes()) {
if (address.equals(((ArcusReplNodeAddress) node.getSocketAddress()).getIPPort())) {
return node;
}
}
return null;
}
}
/* ENABLE_REPLICATION end */

0 comments on commit 1286a78

Please sign in to comment.