Skip to content

Commit

Permalink
Merge pull request #17 from jam2in/whchoi83/forReadingStateOP
Browse files Browse the repository at this point in the history
move queueReconnect line because of switchover
  • Loading branch information
jhpark816 authored Sep 8, 2016
2 parents 2750d97 + 2a61d7e commit 4c3d199
Showing 1 changed file with 80 additions and 30 deletions.
110 changes: 80 additions & 30 deletions src/main/java/net/spy/memcached/MemcachedConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,8 @@ public void updateConnections(List<InetSocketAddress> addrs) throws IOException
/* ENABLE_REPLICATION if */
if (arcusReplEnabled) {
List<MemcachedReplicaGroup> changeRoleGroups = new ArrayList<MemcachedReplicaGroup>();
List<MoveOperationTask> taskList = new ArrayList<MoveOperationTask>();
// will do task after locator update
List<Task> taskList = new ArrayList<Task>();
Map<String, List<ArcusReplNodeAddress>> newAllGroups =
ArcusReplNodeAddress.makeGroupAddrsList(addrs);

Expand Down Expand Up @@ -300,24 +301,25 @@ public void updateConnections(List<InetSocketAddress> addrs) throws IOException
attachNodes.add(newMasterNode = attachMemcachedNode(newGroupAddrs.get(0)));

/* move operation old master -> new master */
oldGroup.getMasterNode().setupResend(false, "Discarded all pending reading state operation to move operations.");
taskList.add(new SetupResendTask(oldGroup.getMasterNode(), false, "Discarded all pending reading state operation to move operations."));
taskList.add(new MoveOperationTask(oldGroup.getMasterNode(), newMasterNode));
}
} else { /* Old group has both a master node and a slave node. */
if (newGroupAddrs.get(0).getIPPort().equals(oldMasterAddr.getIPPort())) {
/* The old slave has disappeared. */
removeNodes.add(oldGroup.getSlaveNode());

/* move operation slave -> master */
oldGroup.getSlaveNode().setupResend(false, "Discarded all pending reading state operation to move operations.");
/* move operation slave -> master
* Slave node have only read operations, then don't need call setupResend
*/
taskList.add(new MoveOperationTask(oldGroup.getSlaveNode(), oldGroup.getMasterNode()));
} else if (newGroupAddrs.get(0).getIPPort().equals(oldSlaveAddr.getIPPort())) {
/* The old slave has failovered to the master with new slave */
removeNodes.add(oldGroup.getMasterNode());
changeRoleGroups.add(oldGroup);

/* move operation master -> slave */
oldGroup.getMasterNode().setupResend(false, "Discarded all pending reading state operation to move operations.");
taskList.add(new SetupResendTask(oldGroup.getMasterNode(), false, "Discarded all pending reading state operation to move operations."));
taskList.add(new MoveOperationTask(oldGroup.getMasterNode(), oldGroup.getSlaveNode()));
} else {
MemcachedNode newMasterNode;
Expand All @@ -327,10 +329,12 @@ public void updateConnections(List<InetSocketAddress> addrs) throws IOException
attachNodes.add(newMasterNode = attachMemcachedNode(newGroupAddrs.get(0)));

/* move operation old master -> new master */
oldGroup.getMasterNode().setupResend(false, "Discarded all pending reading state operation to move operations.");
taskList.add(new SetupResendTask(oldGroup.getMasterNode(), false, "Discarded all pending reading state operation to move operations."));
taskList.add(new MoveOperationTask(oldGroup.getMasterNode(), newMasterNode));
/* move operation old slave -> new master */
oldGroup.getSlaveNode().setupResend(false, "Discarded all pending reading state operation to move operations.");

/* move operation old slave -> new master
* Slave node have only read operations, then don't need call setupResend
*/
taskList.add(new MoveOperationTask(oldGroup.getSlaveNode(), newMasterNode));
}
}
Expand All @@ -346,7 +350,7 @@ public void updateConnections(List<InetSocketAddress> addrs) throws IOException
attachNodes.add(newMasterNode = attachMemcachedNode(newGroupAddrs.get(0)));

/* move operation old master -> new master */
queueReconnect(oldGroup.getMasterNode(), ReconnDelay.IMMEDIATE, "Discarded all pending reading state operation to move operations.");
taskList.add(new QueueReconnectTask(oldGroup.getMasterNode(), ReconnDelay.IMMEDIATE, "Discarded all pending reading state operation to move operations."));
taskList.add(new MoveOperationTask(oldGroup.getMasterNode(), newMasterNode));
} else {
MemcachedNode newMasterNode;
Expand All @@ -356,7 +360,7 @@ public void updateConnections(List<InetSocketAddress> addrs) throws IOException
attachNodes.add(attachMemcachedNode(newGroupAddrs.get(1)));

/* move operation old master -> new master */
oldGroup.getMasterNode().setupResend(false, "Discarded all pending reading state operation to move operations.");
taskList.add(new SetupResendTask(oldGroup.getMasterNode(), false, "Discarded all pending reading state operation to move operations."));
taskList.add(new MoveOperationTask(oldGroup.getMasterNode(), newMasterNode));
}
} else { /* Old group has both a master node and a slave node. */
Expand All @@ -369,26 +373,33 @@ public void updateConnections(List<InetSocketAddress> addrs) throws IOException
removeNodes.add(oldGroup.getSlaveNode());
attachNodes.add(newSlaveNode = attachMemcachedNode(newGroupAddrs.get(1)));

/* move operation old slave -> new slave */
oldGroup.getSlaveNode().setupResend(false, "Discarded all pending reading state operation to move operations.");
/* move operation old slave -> new slave
* Slave node have only read operations, then don't need call setupResend
*/
taskList.add(new MoveOperationTask(oldGroup.getSlaveNode(), newSlaveNode));
}
} else if (newGroupAddrs.get(0).getIPPort().equals(oldSlaveAddr.getIPPort())) {
if (newGroupAddrs.get(1).getIPPort().equals(oldMasterAddr.getIPPort())) {
/* Switchover */
changeRoleGroups.add(oldGroup);

/* move operation master -> slave */
queueReconnect(oldGroup.getMasterNode(), ReconnDelay.IMMEDIATE, "Discarded all pending reading state operation to move operations.");
/* move operation master -> slave
* must keep the following execution order when switchover
* - first moveOperations
* - second, queueReconnect
*
* because moves all operations
*/
taskList.add(new MoveOperationTask(oldGroup.getMasterNode(), oldGroup.getSlaveNode()));
taskList.add(new QueueReconnectTask(oldGroup.getMasterNode(), ReconnDelay.IMMEDIATE, "Discarded all pending reading state operation to move operations."));
} else {
/* Failover. And, new slave has appeared */
removeNodes.add(oldGroup.getMasterNode());
changeRoleGroups.add(oldGroup);
attachNodes.add(attachMemcachedNode(newGroupAddrs.get(1)));

/* move operation old master -> old slave */
oldGroup.getMasterNode().setupResend(false, "Discarded all pending reading state operation to move operations.");
taskList.add(new SetupResendTask(oldGroup.getMasterNode(), false, "Discarded all pending reading state operation to move operations."));
taskList.add(new MoveOperationTask(oldGroup.getMasterNode(), oldGroup.getSlaveNode()));
}
} else {
Expand All @@ -401,11 +412,12 @@ public void updateConnections(List<InetSocketAddress> addrs) throws IOException
attachNodes.add(newMasterNode = attachMemcachedNode(newGroupAddrs.get(0)));

/* move operation old master -> new master */
queueReconnect(oldGroup.getMasterNode(), ReconnDelay.IMMEDIATE, "Discarded all pending reading state operation to move operations.");
taskList.add(new QueueReconnectTask(oldGroup.getMasterNode(), ReconnDelay.IMMEDIATE, "Discarded all pending reading state operation to move operations."));
taskList.add(new MoveOperationTask(oldGroup.getMasterNode(), newMasterNode));

/* move operation old slave -> old master(slave) */
oldGroup.getSlaveNode().setupResend(false, "Discarded all pending reading state operation to move operations.");
/* move operation old slave -> old master(slave)
* Slave node have only read operations, then don't need call setupResend
*/
taskList.add(new MoveOperationTask(oldGroup.getSlaveNode(), oldGroup.getMasterNode()));
} else if (newGroupAddrs.get(1).getIPPort().equals(oldSlaveAddr.getIPPort())) {
MemcachedNode newMasterNode;
Expand All @@ -414,7 +426,7 @@ public void updateConnections(List<InetSocketAddress> addrs) throws IOException
attachNodes.add(newMasterNode = attachMemcachedNode(newGroupAddrs.get(0)));

/* move operation old master -> new master */
oldGroup.getMasterNode().setupResend(false, "Discarded all pending reading state operation to move operations.");
taskList.add(new SetupResendTask(oldGroup.getMasterNode(), false, "Discarded all pending reading state operation to move operations."));
taskList.add(new MoveOperationTask(oldGroup.getMasterNode(), newMasterNode));
} else {
MemcachedNode newMasterNode;
Expand All @@ -426,11 +438,12 @@ public void updateConnections(List<InetSocketAddress> addrs) throws IOException
attachNodes.add(newSlaveNode = attachMemcachedNode(newGroupAddrs.get(1)));

/* move operation old master -> new master */
oldGroup.getMasterNode().setupResend(false, "Discarded all pending reading state operation to move operations.");
taskList.add(new SetupResendTask(oldGroup.getMasterNode(), false, "Discarded all pending reading state operation to move operations."));
taskList.add(new MoveOperationTask(oldGroup.getMasterNode(), newMasterNode));

/* move operation old slave -> new slave */
oldGroup.getSlaveNode().setupResend(false, "Discarded all pending reading state operation to move operations.");
/* move operation old slave -> new slave
* Slave node have only read operations, then don't need call setupResend
*/
taskList.add(new MoveOperationTask(oldGroup.getSlaveNode(), newSlaveNode));
}
}
Expand All @@ -449,11 +462,12 @@ public void updateConnections(List<InetSocketAddress> addrs) throws IOException
attachNodes.add(attachMemcachedNode(newGroupAddrs.get(1)));
}
}

// Update the hash.
((ArcusReplKetamaNodeLocator)locator).update(attachNodes, removeNodes, changeRoleGroups);
for (MoveOperationTask task : taskList)
task.moveOperations();
// do task after locator update
for (Task t : taskList)
t.doTask();
} else {
for (MemcachedNode node : locator.getAll()) {
if (addrs.contains((InetSocketAddress) node.getSocketAddress())) {
Expand Down Expand Up @@ -1249,17 +1263,53 @@ public int getAddedQueueSize() {
return addedQueue.size();
}
/* ENABLE_REPLICATION if */

private interface Task {
void doTask();
}

private class SetupResendTask implements Task {
private MemcachedNode node;
private boolean cancelWrite;
private String cause;

public SetupResendTask(MemcachedNode node, boolean cancelWrite, String cause) {
this.node = node;
this.cancelWrite = cancelWrite;
this.cause = cause;
}

public void doTask() {
node.setupResend(cancelWrite, cause);
}
}

private class QueueReconnectTask implements Task {
private MemcachedNode node;
private ReconnDelay delay;
private String cause;

public QueueReconnectTask(MemcachedNode node, ReconnDelay delay, String cause) {
this.node = node;
this.delay = delay;
this.cause = cause;
}

public void doTask() {
queueReconnect(node, delay, cause);
}
}

private class MoveOperationTask {
MemcachedNode fromNode;
MemcachedNode toNode;
private class MoveOperationTask implements Task {
private MemcachedNode fromNode;
private MemcachedNode toNode;

public MoveOperationTask(MemcachedNode from, MemcachedNode to) {
fromNode = from;
toNode = to;
}

public void moveOperations() {
public void doTask() {
if (fromNode.moveOperations(toNode) > 0)
addedQueue.offer(toNode);
}
Expand Down

0 comments on commit 4c3d199

Please sign in to comment.