diff --git a/examples/simple_producer.rs b/examples/simple_producer.rs
index 305a45d..38720b3 100644
--- a/examples/simple_producer.rs
+++ b/examples/simple_producer.rs
@@ -40,7 +40,7 @@ async fn main() -> Result<(), Box> {
         let record = TestData::new(&format!("hello - kafka {i}"));
         let ret = producer.send(&topic, record).await?;
         let _ = tx.send(ret).await;
-        // tokio::time::sleep(Duration::from_millis(100)).await;
+        tokio::time::sleep(Duration::from_millis(100)).await;
     }
     info!("elapsed: {:?}", now.elapsed());
     // wait till all cached records send to kafka
diff --git a/src/client.rs b/src/client.rs
index c9c9b85..59c4db4 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -49,7 +49,7 @@ pub struct Kafka {
     pub manager: Arc>,
     pub operation_retry_options: OperationRetryOptions,
     pub executor: Arc,
-    pub cluster_meta: Arc,
+    pub cluster: Arc,
     supported_versions: Arc>,
 }
 
@@ -122,20 +122,20 @@ impl Kafka {
             manager,
             operation_retry_options,
             executor,
-            cluster_meta: Arc::new(Cluster::default()),
+            cluster: Arc::new(Cluster::default()),
             supported_versions: Arc::new(supported_versions),
         })
     }
 
     pub fn topic_id(&self, topic_name: &TopicName) -> Uuid {
-        match self.cluster_meta.topic_id(topic_name) {
+        match self.cluster.topic_id(topic_name) {
             Some(topic_id) => topic_id,
             None => Uuid::nil(),
         }
     }
 
     pub fn partitions(&self, topic: &TopicName) -> Result {
-        self.cluster_meta.partitions(topic)
+        self.cluster.partitions(topic)
     }
 
     pub fn version_range(&self, key: ApiKey) -> Option {
@@ -284,7 +284,7 @@ impl Kafka {
         let request = RequestKind::MetadataRequest(request);
         let response = self.manager.invoke(&self.manager.url, request).await?;
         if let ResponseKind::MetadataResponse(metadata) = response {
-            self.cluster_meta.update_metadata(metadata)
+            self.cluster.update_metadata(metadata)
         } else {
             Err(Error::Connection(ConnectionError::UnexpectedResponse(
                 format!("{response:?}"),
@@ -293,8 +293,8 @@ impl Kafka {
     pub async fn update_full_metadata(&self) -> Result<()> {
-        let mut topics = Vec::with_capacity(self.cluster_meta.topics.len());
-        for topic in self.cluster_meta.topics.iter() {
+        let mut topics = Vec::with_capacity(self.cluster.topics.len());
+        for topic in self.cluster.topics.iter() {
             topics.push(topic.key().clone());
         }
         self.update_metadata(topics).await
diff --git a/src/consumer/fetcher.rs b/src/consumer/fetcher.rs
index c803f83..90a8e33 100644
--- a/src/consumer/fetcher.rs
+++ b/src/consumer/fetcher.rs
@@ -61,7 +61,7 @@ impl Fetcher {
         completed_fetches_tx: mpsc::UnboundedSender,
     ) -> Self {
         let sessions = DashMap::new();
-        for node in client.cluster_meta.nodes.iter() {
+        for node in client.cluster.nodes.iter() {
             sessions.insert(node.id, FetchSession::new(node.id));
         }
 
@@ -100,7 +100,7 @@ impl Fetcher {
             version = 12;
         }
         let metadata = fetch_request_data.metadata;
-        if let Some(node) = self.client.cluster_meta.nodes.get(&node) {
+        if let Some(node) = self.client.cluster.nodes.get(&node) {
             let fetch_request = self.fetch_builder(&mut fetch_request_data, version).await?;
             trace!("Send fetch request: {:?}", fetch_request);
 
@@ -239,7 +239,7 @@ impl Fetcher {
         match rx.await {
             Ok(partitions) => {
                 for tp in partitions {
-                    let current_leader = self.client.cluster_meta.current_leader(&tp);
+                    let current_leader = self.client.cluster.current_leader(&tp);
                     self.event_tx.unbounded_send(
                         CoordinatorEvent::MaybeValidatePositionForCurrentLeader {
                             partition: tp,
@@ -428,7 +428,7 @@ impl Fetcher {
                 let position = FetchPosition {
                     offset: offset_data.offset - 1,
                     offset_epoch: None,
-                    current_leader: self.client.cluster_meta.current_leader(&partition),
+                    current_leader: self.client.cluster.current_leader(&partition),
                 };
                 // TODO: metadata update last seen epoch if newer
                 self.event_tx
@@ -603,8 +603,8 @@ impl Fetcher {
         offset_reset_timestamps: &mut HashMap,
     ) -> Result> {
         let mut node_request = HashMap::new();
-        for node_entry in self.client.cluster_meta.nodes.iter() {
-            if let Ok(node_topology) = self.client.cluster_meta.drain_node(node_entry.value().id) {
+        for node_entry in self.client.cluster.nodes.iter() {
+            if let Ok(node_topology) = self.client.cluster.drain_node(node_entry.value().id) {
                 let partitions = node_topology.value();
 
                 let mut topics = HashMap::new();
diff --git a/src/coordinator/consumer.rs b/src/coordinator/consumer.rs
index 7c69efa..b4dbf8c 100644
--- a/src/coordinator/consumer.rs
+++ b/src/coordinator/consumer.rs
@@ -53,7 +53,7 @@ use crate::{
 
 const CONSUMER_PROTOCOL_TYPE: &str = "consumer";
 
-macro_rules! offset_fetch_block {
+macro_rules! fetch_offsets_block {
     ($self:ident, $source:ident) => {
         for topic in $source.topics {
             for partition in topic.partitions {
@@ -63,13 +63,22 @@ macro_rules! offset_fetch_block {
                 };
                 if partition.error_code.is_ok() {
                     if let Some(partition_state) = $self.subscriptions.assignments.get_mut(&tp) {
-                        partition_state.position.offset = partition.committed_offset;
-                        partition_state.position.offset_epoch =
-                            Some(partition.committed_leader_epoch);
-                        info!(
-                            "Fetch {tp} offset success, offset: {}",
-                            partition.committed_offset
-                        );
+                        // record the position with the offset (-1 indicates no committed offset to
+                        // fetch)
+                        if partition.committed_offset >= 0 {
+                            partition_state.position.offset = partition.committed_offset;
+                            partition_state.position.offset_epoch =
+                                Some(partition.committed_leader_epoch);
+                            info!(
+                                "Fetch {tp} offset success, offset: {}",
+                                partition.committed_offset
+                            );
+                        } else {
+                            debug!(
+                                "Found no committed offset for partition {}",
+                                partition.partition_index
+                            );
+                        }
                     }
                 } else {
                     error!(
@@ -86,7 +95,7 @@ macro_rules! offset_fetch_block {
         if let Some(tp_state) = $self.subscriptions.assignments.get_mut(&tp) {
             tp_state.position.offset = offset;
             tp_state.position.offset_epoch = None;
-            tp_state.position.current_leader = $self.client.cluster_meta.current_leader(&tp);
+            tp_state.position.current_leader = $self.client.cluster.current_leader(&tp);
             info!("Seek {tp} with offset: {offset}",);
         }
     }
@@ -370,7 +379,7 @@ impl CoordinatorInner {
         self.join_group().await?;
         self.sync_group().await?;
 
-        self.offset_fetch().await?;
+        self.fetch_offsets().await?;
         // resume fetch thread.
         self.fetcher.resume();
@@ -557,22 +566,30 @@ impl CoordinatorInner {
                             };
                             let mut tp_state = TopicPartitionState::new(*partition);
                             tp_state.position.current_leader =
-                                self.client.cluster_meta.current_leader(&tp);
+                                self.client.cluster.current_leader(&tp);
                             self.subscriptions.assignments.insert(tp, tp_state);
                         }
                     }
                     self.state = MemberState::Stable;
+
+                    let group_id = self.group_meta.group_id.as_str();
                     info!(
-                        "Sync group [{}] success, leader = {}, member_id = {}, generation_id \
-                         = {}, protocol_type = {}, protocol_name = {}, assignments = <{}>",
-                        self.group_meta.group_id.as_str(),
+                        "Sync group [{group_id}] success, leader = {}, member_id = {}, \
+                         generation_id = {}, assignments = <{}>",
                         self.group_meta.leader.as_str(),
                         self.group_meta.member_id.as_str(),
                         self.group_meta.generation_id,
-                        self.group_meta.protocol_type.as_ref().unwrap().as_str(),
-                        self.group_meta.protocol_name.as_ref().unwrap().as_str(),
                         crate::array_display(self.subscriptions.assignments.keys()),
                     );
+
+                    if let Some(protocol_type) = self.group_meta.protocol_type.as_ref() {
+                        info!(
+                            "Sync group [{group_id}] success, protocol_type = {}, \
+                             protocol_name = {}",
+                            protocol_type.as_str(),
+                            self.group_meta.protocol_name.as_ref().unwrap().as_str()
+                        );
+                    }
                     Ok(())
                 }
                 Some(error) => Err(error.into()),
@@ -582,7 +599,7 @@ impl CoordinatorInner {
         }
     }
 
-    pub async fn offset_fetch(&mut self) -> Result<()> {
+    pub async fn fetch_offsets(&mut self) -> Result<()> {
         match self.client.version_range(ApiKey::OffsetFetchKey) {
             Some(version_range) => {
                 let mut offset_fetch_response = self
@@ -595,9 +612,9 @@ impl CoordinatorInner {
                 match offset_fetch_response.error_code.err() {
                     None => {
                         if let Some(group) = offset_fetch_response.groups.pop() {
-                            offset_fetch_block!(self, group);
+                            fetch_offsets_block!(self, group);
                         } else {
-                            offset_fetch_block!(self, offset_fetch_response);
+                            fetch_offsets_block!(self, offset_fetch_response);
                         }
                         Ok(())
                     }
@@ -829,7 +846,7 @@ impl CoordinatorInner {
         match self.group_meta.protocol_name {
             Some(ref protocol) => {
                 let assignor = self.look_up_assignor(&protocol.to_string())?;
-                let cluster = self.client.cluster_meta.clone();
+                let cluster = self.client.cluster.clone();
                 request.assignments =
                     serialize_assignments(assignor.assign(cluster, &self.group_subscription)?)?;
             }
@@ -858,7 +875,7 @@ impl CoordinatorInner {
         if version <= 7 {
             let mut topics = Vec::with_capacity(self.subscriptions.topics.len());
             for assign in self.subscriptions.topics.iter() {
-                let partitions = self.client.cluster_meta.partitions(assign)?;
+                let partitions = self.client.cluster.partitions(assign)?;
 
                 let mut topic = OffsetFetchRequestTopic::default();
                 topic.name = assign.clone();
@@ -870,7 +887,7 @@ impl CoordinatorInner {
         } else {
             let mut topics = Vec::with_capacity(self.subscriptions.topics.len());
             for assign in self.subscriptions.topics.iter() {
-                let partitions = self.client.cluster_meta.partitions(assign)?;
+                let partitions = self.client.cluster.partitions(assign)?;
 
                 let mut topic = OffsetFetchRequestTopics::default();
                 topic.name = assign.clone();
@@ -984,7 +1001,7 @@ async fn coordinator_loop(
         CoordinatorEvent::JoinGroup => coordinator.join_group().await,
         CoordinatorEvent::SyncGroup => coordinator.sync_group().await,
         CoordinatorEvent::LeaveGroup(reason) => coordinator.maybe_leave_group(reason).await,
-        CoordinatorEvent::OffsetFetch => coordinator.offset_fetch().await,
+        CoordinatorEvent::OffsetFetch => coordinator.fetch_offsets().await,
         CoordinatorEvent::PauseFetch => {
             coordinator.fetcher.pause();
             Ok(())
diff --git a/src/metadata.rs b/src/metadata.rs
index 80fd713..a0f1a08 100644
--- a/src/metadata.rs
+++ b/src/metadata.rs
@@ -299,17 +299,19 @@ impl Cluster {
         if !self.nodes.contains_key(&node) {
             return Err(Error::NodeNotAvailable { node });
         }
-        return if let Some(node_entry) = self.partitions_by_nodes.get(&node) {
-            Ok(node_entry)
+        return if let Some(node_ref) = self.partitions_by_nodes.get(&node) {
+            Ok(node_ref)
         } else {
             let mut topic_partitions = Vec::new();
-            for topic_entry in self.topics.iter() {
-                for partition in topic_entry.partitions.iter() {
-                    let tp = TopicPartition {
-                        topic: topic_entry.key().clone(),
-                        partition: partition.partition,
-                    };
-                    topic_partitions.push(tp);
+            for topic in self.topics.iter() {
+                for partition in topic.partitions.iter() {
+                    if partition.leader == node {
+                        let tp = TopicPartition {
+                            topic: topic.key().clone(),
+                            partition: partition.partition,
+                        };
+                        topic_partitions.push(tp);
+                    }
                 }
             }
             self.partitions_by_nodes.insert(node, topic_partitions);
diff --git a/src/producer/mod.rs b/src/producer/mod.rs
index b292e13..3a986dd 100644
--- a/src/producer/mod.rs
+++ b/src/producer/mod.rs
@@ -332,9 +332,13 @@ impl Producer {
         encode_options: &RecordEncodeOptions,
     ) -> Result> {
         let mut result = Vec::new();
-        for node_entry in self.client.cluster_meta.nodes.iter() {
-            if let Ok(node_topology) = self.client.cluster_meta.drain_node(node_entry.value().id) {
-                let partitions = node_topology.value();
+        for node_entry in self.client.cluster.nodes.iter() {
+            if let Ok(node) = self.client.cluster.drain_node(node_entry.value().id) {
+                let partitions = node.value();
+                if partitions.is_empty() {
+                    continue;
+                }
+
                 let mut topic_data = IndexMap::new();
                 let mut topics_thunks = BTreeMap::new();
@@ -444,7 +448,7 @@ impl TopicProducer {
     pub async fn new(client: Arc>, topic: TopicName) -> Result>> {
         client.update_metadata(vec![topic.clone()]).await?;
 
-        let partitions = client.cluster_meta.partitions(&topic)?;
+        let partitions = client.cluster.partitions(&topic)?;
         let partitions = partitions.value();
         let num_partitions = partitions.len();
         let batches = DashMap::with_capacity_and_hasher(num_partitions, FxBuildHasher::default());
@@ -473,7 +477,7 @@ impl TopicProducer {
                 &self.topic,
                 record.key(),
                 record.value(),
-                &self.client.cluster_meta,
+                &self.client.cluster,
             )?;
         }
         return match self.batches.get_mut(&partition) {
diff --git a/src/protocol.rs b/src/protocol.rs
index 9971d08..1b93557 100644
--- a/src/protocol.rs
+++ b/src/protocol.rs
@@ -160,6 +160,7 @@ impl KafkaCodec {
         }
         Ok(())
     }
+
     fn encode_request(
         &mut self,
         mut header: RequestHeader,
@@ -192,18 +193,8 @@ impl KafkaCodec {
         Ok(())
     }
 
-    fn response_header_version(&self, api_key: i16, api_version: i16) -> i16 {
-        if let Some(version_range) = self.support_versions.get(&api_key) {
-            if api_version >= version_range.max {
-                return 1;
-            }
-        }
-        0
-    }
-
     fn decode_response(&mut self, src: &mut BytesMut) -> Result, ConnectionError> {
-        let mut correlation_id_bytes = src.try_peek_bytes(0..4)?;
-        let correlation_id = correlation_id_bytes.get_i32();
+        let correlation_id = src.peek_bytes(0..4).get_i32();
 
         let request_header = self
             .active_requests