From 56110d84083269bbdfb422b617c4214af64434cd Mon Sep 17 00:00:00 2001 From: iamazy Date: Thu, 21 Mar 2024 19:20:44 +0800 Subject: [PATCH] chore(deps): bump `kafka-protocol` to 0.10 (#31) * Chore(deps): bump `kafka-protocol` to 0.10 * Chore: make clippy happy * Chore: make fmt happy * chore: make fmt happy * chore: make fmt happy * fix: test --- .github/workflows/build.yaml | 8 ++-- Cargo.toml | 20 ++++---- rust-toolchain | 2 +- rustfmt.toml | 1 + src/consumer/fetch_session.rs | 76 +++++++++++++++--------------- src/consumer/fetcher.rs | 19 +++++--- src/consumer/mod.rs | 2 +- src/consumer/partition_assignor.rs | 2 +- src/consumer/subscription_state.rs | 15 ++++-- src/coordinator/consumer.rs | 56 ++++++++++++---------- src/error.rs | 24 ++++------ src/lib.rs | 2 +- src/metadata.rs | 11 +++-- src/producer/mod.rs | 9 +++- src/protocol.rs | 39 +++++++++++---- 15 files changed, 165 insertions(+), 121 deletions(-) diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index d3e1043..497c634 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -10,9 +10,11 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 - - uses: actions-rs/toolchain@v1 + - uses: dtolnay/rust-toolchain@nightly + with: + components: rustfmt, clippy + - uses: dtolnay/rust-toolchain@stable with: - profile: minimal components: rustfmt, clippy - uses: actions/cache@v3 with: @@ -22,7 +24,7 @@ jobs: ~/.cargo/git/db/ key: ${{ runner.os }}-cargo - name: Check code format - run: cargo fmt --all -- --check + run: cargo +nightly fmt --all -- --check - name: Clippy run: cargo clippy --all-targets --all-features -- -D warnings diff --git a/Cargo.toml b/Cargo.toml index cf9b225..05264fb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ name = "kafkas" version = "0.1.0" edition = "2021" authors = ["iamazy "] -keywords = ["kafka", "message queue", "async", "tokio", "async-std"] +keywords = ["kafka", "async", "tokio", "async-std"] license-file = "LICENSE" readme = "README.md" repository = "https://github.com/iamazy/kafkas" @@ -12,10 +12,10 @@ description = "async kafka client for rust" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -asynchronous-codec = { version = "0.6", optional = true } -async-io = { version = "1", optional = true} -async-native-tls = { version = "0.4", optional = true } -async-recursion = "1.0.0" +asynchronous-codec = { version = "0.7", optional = true } +async-io = { version = "2", optional = true } +async-native-tls = { version = "0.5", optional = true } +async-recursion = "1" async-std = { version = "1", features = ["attributes", "unstable"], optional = true } async-stream = "0.3" bit-vec = "0.6" @@ -25,19 +25,19 @@ dashmap = "5" fnv = "1" futures = "0.3" fxhash = "0.2" -indexmap = "1" -kafka-protocol = { git = "https://github.com/iamazy/kafka-protocol-rs", rev = "d8a289bbdebd71f89d52838810303902a7368773"} +indexmap = "2" +kafka-protocol = "0.10" native-tls = "0.2" pin-project-lite = "0.2" rand = "0.8" -regex = "1.1.7" +regex = "1" thiserror = "1" tokio = { version = "1", features = ["full"], optional = true } tokio-util = { version = "0.7", features = ["codec"], optional = true } tokio-native-tls = { version = "0.3", optional = true } tracing = "0.1" -url = "2.1" -uuid = "1.3" +url = "2" +uuid = "1" [dev-dependencies] rand = "0.8" diff --git a/rust-toolchain b/rust-toolchain index 07ade69..870bbe4 100644 --- a/rust-toolchain +++ b/rust-toolchain @@ -1 +1 @@ -nightly \ No newline at end of file +stable \ No newline at end of file diff --git a/rustfmt.toml b/rustfmt.toml index 532b89b..148229d 100644 --- a/rustfmt.toml +++ b/rustfmt.toml @@ -1,3 +1,4 @@ +# cargo +nightly fmt comment_width = 100 edition = "2021" format_code_in_doc_comments = true diff --git a/src/consumer/fetch_session.rs b/src/consumer/fetch_session.rs index 40cbe11..12646b8 100644 --- a/src/consumer/fetch_session.rs +++ b/src/consumer/fetch_session.rs @@ -423,7 +423,7 @@ impl FetchRequestDataBuilder { let mut session_remove = Vec::new(); for (tp, prev_data) in session.session_partitions.iter_mut() { - match self.next.remove(tp) { + match self.next.swap_remove(tp) { Some(next_data) => { // We basically check if the new partition had the same topic ID. If not, // we add it to the "replaced" set. If the request is version 13 or higher, the @@ -470,7 +470,7 @@ impl FetchRequestDataBuilder { } for tp in session_remove.iter() { - session.session_partitions.remove(tp); + session.session_partitions.swap_remove(tp); } // Add any new partitions to the session. @@ -768,10 +768,10 @@ mod tests { add_topic_id( &mut topic_ids, &mut topic_names, - StrBytes::from_str("foo"), + StrBytes::from_static_str("foo"), version, ); - let foo_id = match topic_ids.get("foo") { + let foo_id = match topic_ids.get(&StrBytes::from_static_str("foo")) { Some(id) => *id, None => Uuid::nil(), }; @@ -803,8 +803,8 @@ mod tests { assert_maps_equals( &req_map(vec![ - ReqEntry::new(StrBytes::from_str("foo"), foo_id, 0, 0, 100, 200), - ReqEntry::new(StrBytes::from_str("foo"), foo_id, 1, 10, 110, 210), + ReqEntry::new(StrBytes::from_static_str("foo"), foo_id, 0, 0, 100, 200), + ReqEntry::new(StrBytes::from_static_str("foo"), foo_id, 1, 10, 110, 210), ]), vec![&data.to_send, &data.session_partitions], ); @@ -813,8 +813,8 @@ mod tests { assert_eq!(INITIAL_EPOCH, data.metadata.epoch); let resp_map = resp_map(vec![ - RespEntry::new(StrBytes::from_str("foo"), 0, foo_id, 0, 0), - RespEntry::new(StrBytes::from_str("foo"), 1, foo_id, 0, 0), + RespEntry::new(StrBytes::from_static_str("foo"), 0, foo_id, 0, 0), + RespEntry::new(StrBytes::from_static_str("foo"), 1, foo_id, 0, 0), ]); let resp = to_fetch_response(0, 0, INVALID_SESSION_ID, resp_map); @@ -840,7 +840,7 @@ mod tests { assert_maps_equals( &req_map(vec![ReqEntry::new( - StrBytes::from_str("foo"), + StrBytes::from_static_str("foo"), foo_id, 0, 0, @@ -869,10 +869,10 @@ mod tests { add_topic_id( &mut topic_ids, &mut topic_names, - StrBytes::from_str("foo"), + StrBytes::from_static_str("foo"), version, ); - let foo_id = match topic_ids.get("foo") { + let foo_id = match topic_ids.get(&StrBytes::from_static_str("foo")) { Some(id) => *id, None => Uuid::nil(), }; @@ -908,8 +908,8 @@ mod tests { assert_maps_equals( &req_map(vec![ - ReqEntry::new(StrBytes::from_str("foo"), foo_id, 0, 0, 100, 200), - ReqEntry::new(StrBytes::from_str("foo"), foo_id, 1, 10, 110, 210), + ReqEntry::new(StrBytes::from_static_str("foo"), foo_id, 0, 0, 100, 200), + ReqEntry::new(StrBytes::from_static_str("foo"), foo_id, 1, 10, 110, 210), ]), vec![&data.to_send, &data.session_partitions], ); @@ -922,8 +922,8 @@ mod tests { 0, 123, resp_map(vec![ - RespEntry::new(StrBytes::from_str("foo"), 0, foo_id, 10, 20), - RespEntry::new(StrBytes::from_str("foo"), 1, foo_id, 10, 20), + RespEntry::new(StrBytes::from_static_str("foo"), 0, foo_id, 10, 20), + RespEntry::new(StrBytes::from_static_str("foo"), 1, foo_id, 10, 20), ]), ); @@ -934,10 +934,10 @@ mod tests { add_topic_id( &mut topic_ids, &mut topic_names, - StrBytes::from_str("bar"), + StrBytes::from_static_str("bar"), version, ); - let bar_id = match topic_ids.get("bar") { + let bar_id = match topic_ids.get(&StrBytes::from_static_str("bar")) { Some(id) => *id, None => Uuid::nil(), }; @@ -985,16 +985,16 @@ mod tests { assert_map_equals( &req_map(vec![ - ReqEntry::new(StrBytes::from_str("foo"), foo_id, 0, 0, 100, 200), - ReqEntry::new(StrBytes::from_str("foo"), foo_id, 1, 10, 120, 210), - ReqEntry::new(StrBytes::from_str("bar"), bar_id, 0, 20, 200, 200), + ReqEntry::new(StrBytes::from_static_str("foo"), foo_id, 0, 0, 100, 200), + ReqEntry::new(StrBytes::from_static_str("foo"), foo_id, 1, 10, 120, 210), + ReqEntry::new(StrBytes::from_static_str("bar"), bar_id, 0, 20, 200, 200), ]), &data2.session_partitions, ); assert_map_equals( &req_map(vec![ - ReqEntry::new(StrBytes::from_str("bar"), bar_id, 0, 20, 200, 200), - ReqEntry::new(StrBytes::from_str("foo"), foo_id, 1, 10, 120, 210), + ReqEntry::new(StrBytes::from_static_str("bar"), bar_id, 0, 20, 200, 200), + ReqEntry::new(StrBytes::from_static_str("foo"), foo_id, 1, 10, 120, 210), ]), &data2.to_send, ); @@ -1004,7 +1004,7 @@ mod tests { 0, 123, resp_map(vec![RespEntry::new( - StrBytes::from_str("foo"), + StrBytes::from_static_str("foo"), 1, foo_id, 20, @@ -1066,9 +1066,9 @@ mod tests { assert_maps_equals( &req_map(vec![ - ReqEntry::new(StrBytes::from_str("foo"), foo_id, 0, 0, 100, 200), - ReqEntry::new(StrBytes::from_str("foo"), foo_id, 1, 10, 120, 210), - ReqEntry::new(StrBytes::from_str("bar"), bar_id, 0, 20, 200, 200), + ReqEntry::new(StrBytes::from_static_str("foo"), foo_id, 0, 0, 100, 200), + ReqEntry::new(StrBytes::from_static_str("foo"), foo_id, 1, 10, 120, 210), + ReqEntry::new(StrBytes::from_static_str("bar"), bar_id, 0, 20, 200, 200), ]), vec![&data4.session_partitions, &data4.to_send], ); @@ -1091,21 +1091,21 @@ mod tests { add_topic_id( &mut topic_ids, &mut topic_names, - StrBytes::from_str("foo"), + StrBytes::from_static_str("foo"), version, ); add_topic_id( &mut topic_ids, &mut topic_names, - StrBytes::from_str("bar"), + StrBytes::from_static_str("bar"), version, ); - let foo_id = match topic_ids.get("foo") { + let foo_id = match topic_ids.get(&StrBytes::from_static_str("foo")) { Some(id) => *id, None => Uuid::nil(), }; - let bar_id = match topic_ids.get("bar") { + let bar_id = match topic_ids.get(&StrBytes::from_static_str("bar")) { Some(id) => *id, None => Uuid::nil(), }; @@ -1150,9 +1150,9 @@ mod tests { let data = fetch_session_builder.build(&mut fetch_session); assert_maps_equals( &req_map(vec![ - ReqEntry::new(StrBytes::from_str("foo"), foo_id, 0, 0, 100, 200), - ReqEntry::new(StrBytes::from_str("foo"), foo_id, 1, 10, 110, 210), - ReqEntry::new(StrBytes::from_str("bar"), bar_id, 0, 20, 120, 220), + ReqEntry::new(StrBytes::from_static_str("foo"), foo_id, 0, 0, 100, 200), + ReqEntry::new(StrBytes::from_static_str("foo"), foo_id, 1, 10, 110, 210), + ReqEntry::new(StrBytes::from_static_str("bar"), bar_id, 0, 20, 120, 220), ]), vec![&data.to_send, &data.session_partitions], ); @@ -1163,9 +1163,9 @@ mod tests { 0, 123, resp_map(vec![ - RespEntry::new(StrBytes::from_str("foo"), 0, foo_id, 10, 20), - RespEntry::new(StrBytes::from_str("foo"), 1, foo_id, 10, 20), - RespEntry::new(StrBytes::from_str("bar"), 0, bar_id, 10, 20), + RespEntry::new(StrBytes::from_static_str("foo"), 0, foo_id, 10, 20), + RespEntry::new(StrBytes::from_static_str("foo"), 1, foo_id, 10, 20), + RespEntry::new(StrBytes::from_static_str("bar"), 0, bar_id, 10, 20), ]), ); @@ -1190,7 +1190,7 @@ mod tests { assert_eq!(1, data.metadata.epoch); assert_map_equals( &req_map(vec![ReqEntry::new( - StrBytes::from_str("foo"), + StrBytes::from_static_str("foo"), foo_id, 1, 10, @@ -1239,7 +1239,7 @@ mod tests { assert_eq!(INITIAL_EPOCH, data.metadata.epoch); assert_maps_equals( &req_map(vec![ReqEntry::new( - StrBytes::from_str("foo"), + StrBytes::from_static_str("foo"), foo_id, 0, 0, diff --git a/src/consumer/fetcher.rs b/src/consumer/fetcher.rs index 97161d8..c803f83 100644 --- a/src/consumer/fetcher.rs +++ b/src/consumer/fetcher.rs @@ -467,7 +467,8 @@ impl Fetcher { debug!( "Handing v0 ListOffsetResponse response for [{} - {}]. Fetched \ offset {offset}", - &topic.name.0, partition + topic.name.as_str(), + partition ); if offset != UNKNOWN_OFFSET { let tp = TopicPartition { @@ -488,7 +489,7 @@ impl Fetcher { debug!( "Handling ListOffsetResponse response for [{} - {}], Fetched \ offset {}, timestamp {}", - topic.name.0, + topic.name.as_str(), partition, partition_response.offset, partition_response.timestamp @@ -525,7 +526,8 @@ impl Fetcher { debug!( "Cannot search by timestamp for [{} - {}] because the message format \ version is before 0.10.0", - topic.name.0, partition + topic.name.as_str(), + partition ); break; } @@ -540,7 +542,9 @@ impl Fetcher { ) => { debug!( "Attempt to fetch offsets for [{} - {}] failed due to {}, retrying.", - topic.name.0, partition, error + topic.name.as_str(), + partition, + error ); let tp = TopicPartition { topic: topic.name.clone(), @@ -552,7 +556,8 @@ impl Fetcher { warn!( "Received unknown topic or partition error in ListOffset request for \ partition [{} - {}]", - topic.name.0, partition + topic.name.as_str(), + partition ); let tp = TopicPartition { topic: topic.name.clone(), @@ -567,7 +572,9 @@ impl Fetcher { warn!( "Attempt to fetch offsets for [{} - {}] failed due to unexpected \ exception: {}, retrying.", - topic.name.0, partition, error + topic.name.as_str(), + partition, + error ); let tp = TopicPartition { topic: topic.name.clone(), diff --git a/src/consumer/mod.rs b/src/consumer/mod.rs index c5e2cad..d38231d 100644 --- a/src/consumer/mod.rs +++ b/src/consumer/mod.rs @@ -340,7 +340,7 @@ impl Consumer { pub async fn unsubscribe(&mut self) -> Result<()> { self.coordinator - .maybe_leave_group(StrBytes::from_str( + .maybe_leave_group(StrBytes::from_static_str( "the consumer unsubscribed from all topics", )) .await?; diff --git a/src/consumer/partition_assignor.rs b/src/consumer/partition_assignor.rs index d197bdd..fdacbc4 100644 --- a/src/consumer/partition_assignor.rs +++ b/src/consumer/partition_assignor.rs @@ -61,7 +61,7 @@ pub trait PartitionAssigner { } else { debug!( "skipping assignment for topic {} since no metadata is available", - topic.0 + topic.as_str() ); } } diff --git a/src/consumer/subscription_state.rs b/src/consumer/subscription_state.rs index f89e09a..aa8c815 100644 --- a/src/consumer/subscription_state.rs +++ b/src/consumer/subscription_state.rs @@ -40,7 +40,7 @@ impl SubscriptionState { OffsetMetadata { committed_offset: tp_state.position.offset, committed_leader_epoch: tp_state.position.current_leader.epoch, - metadata: Some(StrBytes::from_str("")), + metadata: Some(StrBytes::default()), }, ); } @@ -150,25 +150,30 @@ impl SubscriptionState { if !matches!(partition_state.fetch_state, FetchState::AwaitReset) { debug!( "Skipping reset of [{} - {}] since it is no longer needed", - partition.topic.0, partition.partition + partition.topic.as_str(), + partition.partition ); } else if partition_state.offset_strategy != offset_strategy { debug!( "Skipping reset of topic [{} - {}] since an alternative reset has been \ requested", - partition.topic.0, partition.partition + partition.topic.as_str(), + partition.partition ); } else { info!( "Resetting offset for topic [{} - {}] to position {}.", - partition.topic.0, partition.partition, position.offset + partition.topic.as_str(), + partition.partition, + position.offset ); partition_state.seek_unvalidated(position)?; } } else { debug!( "Skipping reset of [{} - {}] since it is no longer assigned", - partition.topic.0, partition.partition + partition.topic.as_str(), + partition.partition ); } diff --git a/src/coordinator/consumer.rs b/src/coordinator/consumer.rs index 3a69fda..7c69efa 100644 --- a/src/coordinator/consumer.rs +++ b/src/coordinator/consumer.rs @@ -331,7 +331,7 @@ impl CoordinatorInner { let node = find_coordinator(&client, group_id.clone(), CoordinatorType::Group).await?; info!( "Find coordinator success, group {}, node: {}", - group_id, + group_id.as_str(), node.address() ); @@ -399,7 +399,7 @@ impl CoordinatorInner { .await?; for group in describe_groups_response.groups { if group.error_code.is_err() { - error!("Describe group [{}] failed", group.group_id.0); + error!("Describe group [{}] failed", group.group_id.as_str()); } } Ok(()) @@ -424,7 +424,8 @@ impl CoordinatorInner { warn!( "Join group with unknown member id, will rejoin group [{}] with \ member id: {}", - self.group_meta.group_id.0, self.group_meta.member_id + self.group_meta.group_id.as_str(), + self.group_meta.member_id.as_str() ); self.join_group().await } @@ -443,9 +444,9 @@ impl CoordinatorInner { info!( "Join group [{}] success, leader = {}, member_id = {}, generation_id \ = {}", - self.group_meta.group_id.0, - self.group_meta.leader, - self.group_meta.member_id, + self.group_meta.group_id.as_str(), + self.group_meta.leader.as_str(), + self.group_meta.member_id.as_str(), self.group_meta.generation_id ); Ok(()) @@ -460,9 +461,10 @@ impl CoordinatorInner { match self.client.version_range(ApiKey::LeaveGroupKey) { Some(version_range) => { debug!( - "Member {} send LeaveGroup request to coordinator {} due to {reason}", - self.group_meta.member_id, + "Member {} send LeaveGroup request to coordinator {} due to {}", + self.group_meta.member_id.as_str(), self.node.address(), + reason.as_str() ); let leave_group_request = self.leave_group_builder(version_range.max, reason)?; @@ -478,25 +480,28 @@ impl CoordinatorInner { if member.error_code.is_ok() { debug!( "Member {} leave group {} success.", - member.member_id, self.group_meta.group_id.0 + member.member_id.as_str(), + self.group_meta.group_id.as_str() ); } else { error!( "Member {} leave group {} failed.", - member.member_id, self.group_meta.group_id.0 + member.member_id.as_str(), + self.group_meta.group_id.as_str() ); } } info!( "Leave group [{}] success, member: {}", - self.group_meta.group_id.0, self.group_meta.member_id + self.group_meta.group_id.as_str(), + self.group_meta.member_id.as_str() ); Ok(()) } Some(error) => { error!( "Leave group [{}] failed, error: {error}", - self.group_meta.group_id.0 + self.group_meta.group_id.as_str() ); Err(error.into()) } @@ -528,8 +533,8 @@ impl CoordinatorInner { error!( "JoinGroup failed: Inconsistent Protocol Type, received {} but \ expected {}", - sync_group_response.protocol_type.unwrap(), - self.group_meta.protocol_type.as_ref().unwrap() + sync_group_response.protocol_type.unwrap().as_str(), + self.group_meta.protocol_type.as_ref().unwrap().as_str() ); return Err(ResponseError::InconsistentGroupProtocol.into()); } @@ -560,12 +565,12 @@ impl CoordinatorInner { info!( "Sync group [{}] success, leader = {}, member_id = {}, generation_id \ = {}, protocol_type = {}, protocol_name = {}, assignments = <{}>", - self.group_meta.group_id.0, - self.group_meta.leader, - self.group_meta.member_id, + self.group_meta.group_id.as_str(), + self.group_meta.leader.as_str(), + self.group_meta.member_id.as_str(), self.group_meta.generation_id, - self.group_meta.protocol_type.as_ref().unwrap(), - self.group_meta.protocol_name.as_ref().unwrap(), + self.group_meta.protocol_type.as_ref().unwrap().as_str(), + self.group_meta.protocol_name.as_ref().unwrap().as_str(), crate::array_display(self.subscriptions.assignments.keys()), ); Ok(()) @@ -643,7 +648,8 @@ impl CoordinatorInner { None => { debug!( "Heartbeat success, group: {}, member: {}", - self.group_meta.group_id.0, self.group_meta.member_id + self.group_meta.group_id.as_str(), + self.group_meta.member_id.as_str() ); Ok(()) } @@ -663,7 +669,7 @@ impl CoordinatorInner { // this case and ignore the REBALANCE_IN_PROGRESS error warn!( "Group [{}] is rebalance in progress.", - self.group_meta.group_id.0 + self.group_meta.group_id.as_str() ); if matches!(self.state, MemberState::Stable) { self.rejoin_group().await?; @@ -780,7 +786,7 @@ impl CoordinatorInner { let mut request = JoinGroupRequest::default(); request.group_id = self.group_meta.group_id.clone(); request.member_id = self.group_meta.member_id.clone(); - request.protocol_type = StrBytes::from_str(CONSUMER_PROTOCOL_TYPE); + request.protocol_type = StrBytes::from_static_str(CONSUMER_PROTOCOL_TYPE); request.protocols = self.join_group_protocol()?; request.session_timeout_ms = self.consumer_options.rebalance_options.session_timeout_ms; if version >= 1 { @@ -830,7 +836,7 @@ impl CoordinatorInner { None => { return Err(Error::Custom(format!( "Group leader {} has no partition assignor protocol", - self.group_meta.leader + self.group_meta.leader.as_str() ))); } } @@ -945,11 +951,11 @@ impl CoordinatorInner { } } else { generation = DEFAULT_GENERATION_ID; - member = StrBytes::from_str(""); + member = StrBytes::default(); } request.group_id = self.group_meta.group_id.clone(); - request.generation_id = generation; + request.generation_id_or_member_epoch = generation; request.member_id = member; request.group_instance_id = self.group_meta.group_instance_id.clone(); request.retention_time_ms = -1; diff --git a/src/error.rs b/src/error.rs index 8bad68b..c4f6df7 100644 --- a/src/error.rs +++ b/src/error.rs @@ -12,7 +12,7 @@ use futures::channel::{ }; use kafka_protocol::{ messages::{ApiKey, TopicName}, - protocol::{buf::NotEnoughBytesError, DecodeError, EncodeError}, + protocol::{buf::NotEnoughBytesError, EncodeError}, records::Record, ResponseError, }; @@ -108,12 +108,6 @@ impl From for Error { } } -impl From for Error { - fn from(value: DecodeError) -> Self { - Error::Connection(ConnectionError::Decoding(value.to_string())) - } -} - impl From> for Error { fn from(value: TrySendError) -> Self { Error::Custom(format!("{value}")) @@ -141,16 +135,20 @@ impl std::fmt::Display for Error { Error::Produce(e) => write!(f, "Produce error: {e}"), Error::Consume(e) => write!(f, "Consume error: {e}"), Error::PartitionNotAvailable { topic, partition } => { - write!(f, "Partition {partition} not available, topic: {}", topic.0) + write!( + f, + "Partition {partition} not available, topic: {}", + topic.as_str() + ) } Error::TopicNotAvailable { topic } => { - write!(f, "Topic not available, topic: {}", topic.0) + write!(f, "Topic not available, topic: {}", topic.as_str()) } Error::TopicAuthorizationError { topics } => { write!( f, "Topic Authorization Error, topics: <{}>", - array_display(topics.iter().map(|topic| &**topic)) + array_display(topics.iter().map(|topic| topic.as_str())) ) } Error::NodeNotAvailable { node } => { @@ -188,12 +186,6 @@ impl From for ConnectionError { } } -impl From for ConnectionError { - fn from(value: DecodeError) -> Self { - ConnectionError::Decoding(value.to_string()) - } -} - impl From for ConnectionError { fn from(e: std::io::Error) -> Self { ConnectionError::Io(e) diff --git a/src/lib.rs b/src/lib.rs index 4bfb662..8ac0e0c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -63,7 +63,7 @@ pub trait ToStrBytes { impl ToStrBytes for String { fn to_str_bytes(self) -> StrBytes { - unsafe { StrBytes::from_utf8_unchecked(Bytes::from(self)) } + StrBytes::from_string(self) } } diff --git a/src/metadata.rs b/src/metadata.rs index 9c9137f..80fd713 100644 --- a/src/metadata.rs +++ b/src/metadata.rs @@ -132,7 +132,12 @@ impl Debug for TopicPartition { impl Display for TopicPartition { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "partition [{} - {}]", self.topic.0, self.partition) + write!( + f, + "partition [{} - {}]", + self.topic.as_str(), + self.partition + ) } } @@ -146,7 +151,7 @@ impl Node { pub fn new(id: BrokerId, host: StrBytes, port: i32) -> Self { Self { id: id.0, - address: format!("{host}:{port}"), + address: format!("{}:{port}", host.as_str()), } } @@ -180,7 +185,7 @@ impl Cluster { let cluster_id = other.cluster_id; { let mut lock = self.id.lock()?; - if matches!(*lock, None) { + if lock.is_none() { *lock = cluster_id; } else if *lock != cluster_id { return Err(Error::Custom(format!( diff --git a/src/producer/mod.rs b/src/producer/mod.rs index 425f0f5..b292e13 100644 --- a/src/producer/mod.rs +++ b/src/producer/mod.rs @@ -272,7 +272,10 @@ impl Producer { } } Err(err) => { - error!("failed to push record, topic: {}, err: {err}", topic.0,); + error!( + "failed to push record, topic: {}, err: {err}", + topic.as_str(), + ); Err(err) } }; @@ -348,7 +351,9 @@ impl Producer { { error!( "failed to flush topic produce data, topic: [{} - {}], error: {}", - partition.topic.0, partition.partition, e + partition.topic.as_str(), + partition.partition, + e ); } } diff --git a/src/protocol.rs b/src/protocol.rs index 637ab06..9971d08 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -56,10 +56,10 @@ impl tokio_util::codec::Decoder for KafkaCodec { #[cfg(feature = "async-std-runtime")] impl asynchronous_codec::Encoder for KafkaCodec { - type Item = Command; + type Item<'a> = Command; type Error = ConnectionError; - fn encode(&mut self, cmd: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { + fn encode(&mut self, cmd: Self::Item<'_>, dst: &mut BytesMut) -> Result<(), Self::Error> { let mut bytes = BytesMut::new(); self.encode0(cmd, &mut bytes)?; self.length_codec @@ -184,8 +184,7 @@ impl KafkaCodec { header.request_api_key = api_key; header.request_api_version = api_version; - let header_version = Req::header_version(api_version); - header.encode(dst, header_version)?; + header.encode(dst, Req::header_version(api_version))?; self.active_requests.insert(header.correlation_id, header); @@ -213,16 +212,14 @@ impl KafkaCodec { ConnectionError::UnexpectedResponse(format!("correlation_id: {correlation_id}")) })?; - let response_header_version = self.response_header_version( - request_header.request_api_key, - request_header.request_api_version, - ); + let api_key = ApiKey::try_from(request_header.request_api_key)?; + let response_header_version = + api_key.response_header_version(request_header.request_api_version); // decode response header let response_header = ResponseHeader::decode(src, response_header_version)?; let header_version = request_header.request_api_version; - let api_key = ApiKey::try_from(request_header.request_api_key)?; let response_kind = match api_key { ApiKey::ProduceKey => { let res = ProduceResponse::decode(src, header_version)?; @@ -505,6 +502,30 @@ impl KafkaCodec { let res = AllocateProducerIdsResponse::decode(src, header_version)?; ResponseKind::AllocateProducerIdsResponse(res) } + ApiKey::ConsumerGroupHeartbeatKey => { + let res = ConsumerGroupHeartbeatResponse::decode(src, header_version)?; + ResponseKind::ConsumerGroupHeartbeatResponse(res) + } + ApiKey::ControllerRegistrationKey => { + let res = ControllerRegistrationResponse::decode(src, header_version)?; + ResponseKind::ControllerRegistrationResponse(res) + } + ApiKey::GetTelemetrySubscriptionsKey => { + let res = GetTelemetrySubscriptionsResponse::decode(src, header_version)?; + ResponseKind::GetTelemetrySubscriptionsResponse(res) + } + ApiKey::PushTelemetryKey => { + let res = PushTelemetryResponse::decode(src, header_version)?; + ResponseKind::PushTelemetryResponse(res) + } + ApiKey::AssignReplicasToDirsKey => { + let res = AssignReplicasToDirsResponse::decode(src, header_version)?; + ResponseKind::AssignReplicasToDirsResponse(res) + } + ApiKey::ListClientMetricsResourcesKey => { + let res = ListClientMetricsResourcesResponse::decode(src, header_version)?; + ResponseKind::ListClientMetricsResourcesResponse(res) + } }; let response = KafkaResponse::new(response_header, response_kind); Ok(Some(Command::Response(response)))