Skip to content

Commit

Permalink
fix(producer): send messages to leader
Browse files Browse the repository at this point in the history
  • Loading branch information
iamazy committed Mar 22, 2024
1 parent 56110d8 commit c1c66d4
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 17 deletions.
2 changes: 1 addition & 1 deletion examples/simple_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ async fn main() -> Result<(), Box<Error>> {
let record = TestData::new(&format!("hello - kafka {i}"));
let ret = producer.send(&topic, record).await?;
let _ = tx.send(ret).await;
// tokio::time::sleep(Duration::from_millis(100)).await;
tokio::time::sleep(Duration::from_millis(100)).await;
}
info!("elapsed: {:?}", now.elapsed());
// wait till all cached records send to kafka
Expand Down
18 changes: 13 additions & 5 deletions src/coordinator/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -562,17 +562,25 @@ impl<Exe: Executor> CoordinatorInner<Exe> {
}
}
self.state = MemberState::Stable;

let group_id = self.group_meta.group_id.as_str();
info!(
"Sync group [{}] success, leader = {}, member_id = {}, generation_id \
= {}, protocol_type = {}, protocol_name = {}, assignments = <{}>",
self.group_meta.group_id.as_str(),
"Sync group [{group_id}] success, leader = {}, member_id = {}, \
generation_id = {}, assignments = <{}>",
self.group_meta.leader.as_str(),
self.group_meta.member_id.as_str(),
self.group_meta.generation_id,
self.group_meta.protocol_type.as_ref().unwrap().as_str(),
self.group_meta.protocol_name.as_ref().unwrap().as_str(),
crate::array_display(self.subscriptions.assignments.keys()),
);

if let Some(protocol_type) = self.group_meta.protocol_type.as_ref() {
info!(
"Sync group [{group_id}] success, protocol_type = {}, \
protocol_name = {}",
protocol_type.as_str(),
self.group_meta.protocol_name.as_ref().unwrap().as_str()
);
}
Ok(())
}
Some(error) => Err(error.into()),
Expand Down
20 changes: 11 additions & 9 deletions src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,17 +299,19 @@ impl Cluster {
if !self.nodes.contains_key(&node) {
return Err(Error::NodeNotAvailable { node });
}
return if let Some(node_entry) = self.partitions_by_nodes.get(&node) {
Ok(node_entry)
return if let Some(node_ref) = self.partitions_by_nodes.get(&node) {
Ok(node_ref)
} else {
let mut topic_partitions = Vec::new();
for topic_entry in self.topics.iter() {
for partition in topic_entry.partitions.iter() {
let tp = TopicPartition {
topic: topic_entry.key().clone(),
partition: partition.partition,
};
topic_partitions.push(tp);
for topic in self.topics.iter() {
for partition in topic.partitions.iter() {
if partition.leader == node {
let tp = TopicPartition {
topic: topic.key().clone(),
partition: partition.partition,
};
topic_partitions.push(tp);
}
}
}
self.partitions_by_nodes.insert(node, topic_partitions);
Expand Down
8 changes: 6 additions & 2 deletions src/producer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,8 +333,12 @@ impl<Exe: Executor> Producer<Exe> {
) -> Result<Vec<(Node, FlushResult)>> {
let mut result = Vec::new();
for node_entry in self.client.cluster_meta.nodes.iter() {
if let Ok(node_topology) = self.client.cluster_meta.drain_node(node_entry.value().id) {
let partitions = node_topology.value();
if let Ok(node) = self.client.cluster_meta.drain_node(node_entry.value().id) {
let partitions = node.value();
if partitions.is_empty() {
continue;
}

let mut topic_data = IndexMap::new();
let mut topics_thunks = BTreeMap::new();

Expand Down

0 comments on commit c1c66d4

Please sign in to comment.