From e501361387a38ece54f486520275378d9a1e4e86 Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Wed, 8 May 2019 16:27:18 +0800 Subject: [PATCH] Bump to v0.4.3 (#231) * raft: leader respond to learner read index message (#220) Signed-off-by: nolouch * Bump to v0.4.3 Signed-off-by: Neil Shen --- CHANGELOG.md | 4 ++ Cargo.toml | 2 +- benches/benches.rs | 1 - src/lib.rs | 1 - src/raft.rs | 30 ++++++++---- src/raw_node.rs | 8 +-- tests/integration_cases/test_raft.rs | 73 ++++++++++++++++++++++++++++ 7 files changed, 103 insertions(+), 16 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d7a4159fe..577f3af17 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.4.3 - 2019-05-08 + +- Leader responds to learner read index message. (https://github.com/pingcap/raft-rs/pull/220) + # 0.4.2 - 2019-04-29 - Fix potential two leaders at the same term when transferring leader. (https://github.com/pingcap/raft-rs/pull/225) diff --git a/Cargo.toml b/Cargo.toml index 5510c51a0..ec98653b9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "raft" -version = "0.4.2" +version = "0.4.3" authors = ["The TiKV Project Developers"] license = "Apache-2.0" keywords = ["raft", "distributed-systems", "ha"] diff --git a/benches/benches.rs b/benches/benches.rs index 6b034e269..63ca57fa2 100644 --- a/benches/benches.rs +++ b/benches/benches.rs @@ -12,7 +12,6 @@ mod suites; pub const DEFAULT_RAFT_SETS: [(usize, usize); 4] = [(0, 0), (3, 1), (5, 2), (7, 3)]; fn main() { - criterion::init_logging(); let mut c = Criterion::default() // Configure defaults before overriding with args. .warm_up_time(Duration::from_millis(500)) diff --git a/src/lib.rs b/src/lib.rs index 412fca4cd..29707f0ec 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -261,7 +261,6 @@ For more information, check out an [example](examples/single_mem_node/main.rs#L1 */ #![cfg_attr(not(feature = "cargo-clippy"), allow(unknown_lints))] -#![cfg_attr(feature = "cargo-clippy", feature(tool_lints))] #![deny(missing_docs)] #[cfg(feature = "failpoint")] diff --git a/src/raft.rs b/src/raft.rs index 3928d0ec2..713f9a497 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -679,7 +679,7 @@ impl Raft { self.election_elapsed = 0; let m = new_message(INVALID_ID, MessageType::MsgHup, Some(self.id)); - self.step(m).is_ok(); + let _ = self.step(m); true } @@ -695,7 +695,7 @@ impl Raft { if self.check_quorum { let m = new_message(INVALID_ID, MessageType::MsgCheckQuorum, Some(self.id)); has_ready = true; - self.step(m).is_ok(); + let _ = self.step(m); } if self.state == StateRole::Leader && self.lead_transferee.is_some() { self.abort_leader_transfer() @@ -710,7 +710,7 @@ impl Raft { self.heartbeat_elapsed = 0; has_ready = true; let m = new_message(INVALID_ID, MessageType::MsgBeat, Some(self.id)); - self.step(m).is_ok(); + let _ = self.step(m); } has_ready } @@ -794,7 +794,7 @@ impl Raft { } fn num_pending_conf(&self, ents: &[Entry]) -> usize { - ents.into_iter() + ents.iter() .filter(|e| e.get_entry_type() == EntryType::EntryConfChange) .count() } @@ -1486,11 +1486,23 @@ impl Raft { } } } else { - let rs = ReadState { - index: self.raft_log.committed, - request_ctx: m.take_entries()[0].take_data(), - }; - self.read_states.push(rs); + // there is only one voting member (the leader) in the cluster + if m.get_from() == INVALID_ID || m.get_from() == self.id { + // from leader itself + let rs = ReadState { + index: self.raft_log.committed, + request_ctx: m.take_entries()[0].take_data(), + }; + self.read_states.push(rs); + } else { + // from learner member + let mut to_send = Message::default(); + to_send.set_to(m.get_from()); + to_send.set_msg_type(MessageType::MsgReadIndexResp); + to_send.set_index(self.raft_log.committed); + to_send.set_entries(m.take_entries()); + self.send(to_send); + } } return Ok(()); } diff --git a/src/raw_node.rs b/src/raw_node.rs index 94da81d85..350812b11 100644 --- a/src/raw_node.rs +++ b/src/raw_node.rs @@ -443,7 +443,7 @@ impl RawNode { m.set_msg_type(MessageType::MsgUnreachable); m.set_from(id); // we don't care if it is ok actually - self.raft.step(m).is_ok(); + let _ = self.raft.step(m); } /// ReportSnapshot reports the status of the sent snapshot. @@ -454,7 +454,7 @@ impl RawNode { m.set_from(id); m.set_reject(rej); // we don't care if it is ok actually - self.raft.step(m).is_ok(); + let _ = self.raft.step(m); } /// TransferLeader tries to transfer leadership to the given transferee. @@ -462,7 +462,7 @@ impl RawNode { let mut m = Message::new(); m.set_msg_type(MessageType::MsgTransferLeader); m.set_from(transferee); - self.raft.step(m).is_ok(); + let _ = self.raft.step(m); } /// ReadIndex requests a read state. The read state will be set in ready. @@ -475,7 +475,7 @@ impl RawNode { let mut e = Entry::new(); e.set_data(rctx); m.set_entries(RepeatedField::from_vec(vec![e])); - self.raft.step(m).is_ok(); + let _ = self.raft.step(m); } /// Returns the store as an immutable reference. diff --git a/tests/integration_cases/test_raft.rs b/tests/integration_cases/test_raft.rs index 480347c72..f1a80f53a 100644 --- a/tests/integration_cases/test_raft.rs +++ b/tests/integration_cases/test_raft.rs @@ -2205,6 +2205,79 @@ fn test_read_only_option_safe() { } } +#[test] +fn test_read_only_with_learner() { + setup_for_test(); + let a = new_test_learner_raft(1, vec![1], vec![2], 10, 1, new_storage()); + let b = new_test_learner_raft(2, vec![1], vec![2], 10, 1, new_storage()); + + let mut nt = Network::new(vec![Some(a), Some(b)]); + + // we can not let system choose the value of randomizedElectionTimeout + // otherwise it will introduce some uncertainty into this test case + // we need to ensure randomizedElectionTimeout > electionTimeout here + let b_election_timeout = nt.peers[&2].get_election_timeout(); + nt.peers + .get_mut(&2) + .unwrap() + .set_randomized_election_timeout(b_election_timeout + 1); + + for _ in 0..b_election_timeout { + nt.peers.get_mut(&2).unwrap().tick(); + } + nt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]); + + assert_eq!(nt.peers[&1].state, StateRole::Leader); + assert_eq!(nt.peers[&2].state, StateRole::Follower); + + let mut tests = vec![ + (1, 10, 11, "ctx1"), + (2, 10, 21, "ctx2"), + (1, 10, 31, "ctx3"), + (2, 10, 41, "ctx4"), + ]; + + for (i, (id, proposals, wri, wctx)) in tests.drain(..).enumerate() { + for _ in 0..proposals { + nt.send(vec![new_message(1, 1, MessageType::MsgPropose, 1)]); + } + + let e = new_entry(0, 0, Some(wctx)); + nt.send(vec![new_message_with_entries( + id, + id, + MessageType::MsgReadIndex, + vec![e], + )]); + + let read_states: Vec = nt + .peers + .get_mut(&id) + .unwrap() + .read_states + .drain(..) + .collect(); + assert_eq!( + read_states.is_empty(), + false, + "#{}: read_states is empty, want non-empty", + i + ); + let rs = &read_states[0]; + assert_eq!( + rs.index, wri, + "#{}: read_index = {}, want {}", + i, rs.index, wri + ); + let vec_wctx = wctx.as_bytes().to_vec(); + assert_eq!( + rs.request_ctx, vec_wctx, + "#{}: request_ctx = {:?}, want {:?}", + i, rs.request_ctx, vec_wctx + ); + } +} + #[test] fn test_read_only_option_lease() { setup_for_test();