Skip to content

Commit

Permalink
chore(deps): bump kafka-protocol to 0.10 (#31)
Browse files Browse the repository at this point in the history
* Chore(deps): bump `kafka-protocol` to 0.10

* Chore: make clippy happy

* Chore: make fmt happy

* chore: make fmt happy

* chore: make fmt happy

* fix: test
  • Loading branch information
iamazy authored Mar 21, 2024
1 parent 4aaf42b commit 56110d8
Show file tree
Hide file tree
Showing 15 changed files with 165 additions and 121 deletions.
8 changes: 5 additions & 3 deletions .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions-rs/toolchain@v1
- uses: dtolnay/rust-toolchain@nightly
with:
components: rustfmt, clippy
- uses: dtolnay/rust-toolchain@stable
with:
profile: minimal
components: rustfmt, clippy
- uses: actions/cache@v3
with:
Expand All @@ -22,7 +24,7 @@ jobs:
~/.cargo/git/db/
key: ${{ runner.os }}-cargo
- name: Check code format
run: cargo fmt --all -- --check
run: cargo +nightly fmt --all -- --check
- name: Clippy
run: cargo clippy --all-targets --all-features -- -D warnings

Expand Down
20 changes: 10 additions & 10 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "kafkas"
version = "0.1.0"
edition = "2021"
authors = ["iamazy <[email protected]>"]
keywords = ["kafka", "message queue", "async", "tokio", "async-std"]
keywords = ["kafka", "async", "tokio", "async-std"]
license-file = "LICENSE"
readme = "README.md"
repository = "https://github.com/iamazy/kafkas"
Expand All @@ -12,10 +12,10 @@ description = "async kafka client for rust"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
asynchronous-codec = { version = "0.6", optional = true }
async-io = { version = "1", optional = true}
async-native-tls = { version = "0.4", optional = true }
async-recursion = "1.0.0"
asynchronous-codec = { version = "0.7", optional = true }
async-io = { version = "2", optional = true }
async-native-tls = { version = "0.5", optional = true }
async-recursion = "1"
async-std = { version = "1", features = ["attributes", "unstable"], optional = true }
async-stream = "0.3"
bit-vec = "0.6"
Expand All @@ -25,19 +25,19 @@ dashmap = "5"
fnv = "1"
futures = "0.3"
fxhash = "0.2"
indexmap = "1"
kafka-protocol = { git = "https://github.com/iamazy/kafka-protocol-rs", rev = "d8a289bbdebd71f89d52838810303902a7368773"}
indexmap = "2"
kafka-protocol = "0.10"
native-tls = "0.2"
pin-project-lite = "0.2"
rand = "0.8"
regex = "1.1.7"
regex = "1"
thiserror = "1"
tokio = { version = "1", features = ["full"], optional = true }
tokio-util = { version = "0.7", features = ["codec"], optional = true }
tokio-native-tls = { version = "0.3", optional = true }
tracing = "0.1"
url = "2.1"
uuid = "1.3"
url = "2"
uuid = "1"

[dev-dependencies]
rand = "0.8"
Expand Down
2 changes: 1 addition & 1 deletion rust-toolchain
Original file line number Diff line number Diff line change
@@ -1 +1 @@
nightly
stable
1 change: 1 addition & 0 deletions rustfmt.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# cargo +nightly fmt
comment_width = 100
edition = "2021"
format_code_in_doc_comments = true
Expand Down
76 changes: 38 additions & 38 deletions src/consumer/fetch_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ impl FetchRequestDataBuilder {

let mut session_remove = Vec::new();
for (tp, prev_data) in session.session_partitions.iter_mut() {
match self.next.remove(tp) {
match self.next.swap_remove(tp) {
Some(next_data) => {
// We basically check if the new partition had the same topic ID. If not,
// we add it to the "replaced" set. If the request is version 13 or higher, the
Expand Down Expand Up @@ -470,7 +470,7 @@ impl FetchRequestDataBuilder {
}

for tp in session_remove.iter() {
session.session_partitions.remove(tp);
session.session_partitions.swap_remove(tp);
}

// Add any new partitions to the session.
Expand Down Expand Up @@ -768,10 +768,10 @@ mod tests {
add_topic_id(
&mut topic_ids,
&mut topic_names,
StrBytes::from_str("foo"),
StrBytes::from_static_str("foo"),
version,
);
let foo_id = match topic_ids.get("foo") {
let foo_id = match topic_ids.get(&StrBytes::from_static_str("foo")) {
Some(id) => *id,
None => Uuid::nil(),
};
Expand Down Expand Up @@ -803,8 +803,8 @@ mod tests {

assert_maps_equals(
&req_map(vec![
ReqEntry::new(StrBytes::from_str("foo"), foo_id, 0, 0, 100, 200),
ReqEntry::new(StrBytes::from_str("foo"), foo_id, 1, 10, 110, 210),
ReqEntry::new(StrBytes::from_static_str("foo"), foo_id, 0, 0, 100, 200),
ReqEntry::new(StrBytes::from_static_str("foo"), foo_id, 1, 10, 110, 210),
]),
vec![&data.to_send, &data.session_partitions],
);
Expand All @@ -813,8 +813,8 @@ mod tests {
assert_eq!(INITIAL_EPOCH, data.metadata.epoch);

let resp_map = resp_map(vec![
RespEntry::new(StrBytes::from_str("foo"), 0, foo_id, 0, 0),
RespEntry::new(StrBytes::from_str("foo"), 1, foo_id, 0, 0),
RespEntry::new(StrBytes::from_static_str("foo"), 0, foo_id, 0, 0),
RespEntry::new(StrBytes::from_static_str("foo"), 1, foo_id, 0, 0),
]);
let resp = to_fetch_response(0, 0, INVALID_SESSION_ID, resp_map);

Expand All @@ -840,7 +840,7 @@ mod tests {

assert_maps_equals(
&req_map(vec![ReqEntry::new(
StrBytes::from_str("foo"),
StrBytes::from_static_str("foo"),
foo_id,
0,
0,
Expand Down Expand Up @@ -869,10 +869,10 @@ mod tests {
add_topic_id(
&mut topic_ids,
&mut topic_names,
StrBytes::from_str("foo"),
StrBytes::from_static_str("foo"),
version,
);
let foo_id = match topic_ids.get("foo") {
let foo_id = match topic_ids.get(&StrBytes::from_static_str("foo")) {
Some(id) => *id,
None => Uuid::nil(),
};
Expand Down Expand Up @@ -908,8 +908,8 @@ mod tests {

assert_maps_equals(
&req_map(vec![
ReqEntry::new(StrBytes::from_str("foo"), foo_id, 0, 0, 100, 200),
ReqEntry::new(StrBytes::from_str("foo"), foo_id, 1, 10, 110, 210),
ReqEntry::new(StrBytes::from_static_str("foo"), foo_id, 0, 0, 100, 200),
ReqEntry::new(StrBytes::from_static_str("foo"), foo_id, 1, 10, 110, 210),
]),
vec![&data.to_send, &data.session_partitions],
);
Expand All @@ -922,8 +922,8 @@ mod tests {
0,
123,
resp_map(vec![
RespEntry::new(StrBytes::from_str("foo"), 0, foo_id, 10, 20),
RespEntry::new(StrBytes::from_str("foo"), 1, foo_id, 10, 20),
RespEntry::new(StrBytes::from_static_str("foo"), 0, foo_id, 10, 20),
RespEntry::new(StrBytes::from_static_str("foo"), 1, foo_id, 10, 20),
]),
);

Expand All @@ -934,10 +934,10 @@ mod tests {
add_topic_id(
&mut topic_ids,
&mut topic_names,
StrBytes::from_str("bar"),
StrBytes::from_static_str("bar"),
version,
);
let bar_id = match topic_ids.get("bar") {
let bar_id = match topic_ids.get(&StrBytes::from_static_str("bar")) {
Some(id) => *id,
None => Uuid::nil(),
};
Expand Down Expand Up @@ -985,16 +985,16 @@ mod tests {

assert_map_equals(
&req_map(vec![
ReqEntry::new(StrBytes::from_str("foo"), foo_id, 0, 0, 100, 200),
ReqEntry::new(StrBytes::from_str("foo"), foo_id, 1, 10, 120, 210),
ReqEntry::new(StrBytes::from_str("bar"), bar_id, 0, 20, 200, 200),
ReqEntry::new(StrBytes::from_static_str("foo"), foo_id, 0, 0, 100, 200),
ReqEntry::new(StrBytes::from_static_str("foo"), foo_id, 1, 10, 120, 210),
ReqEntry::new(StrBytes::from_static_str("bar"), bar_id, 0, 20, 200, 200),
]),
&data2.session_partitions,
);
assert_map_equals(
&req_map(vec![
ReqEntry::new(StrBytes::from_str("bar"), bar_id, 0, 20, 200, 200),
ReqEntry::new(StrBytes::from_str("foo"), foo_id, 1, 10, 120, 210),
ReqEntry::new(StrBytes::from_static_str("bar"), bar_id, 0, 20, 200, 200),
ReqEntry::new(StrBytes::from_static_str("foo"), foo_id, 1, 10, 120, 210),
]),
&data2.to_send,
);
Expand All @@ -1004,7 +1004,7 @@ mod tests {
0,
123,
resp_map(vec![RespEntry::new(
StrBytes::from_str("foo"),
StrBytes::from_static_str("foo"),
1,
foo_id,
20,
Expand Down Expand Up @@ -1066,9 +1066,9 @@ mod tests {

assert_maps_equals(
&req_map(vec![
ReqEntry::new(StrBytes::from_str("foo"), foo_id, 0, 0, 100, 200),
ReqEntry::new(StrBytes::from_str("foo"), foo_id, 1, 10, 120, 210),
ReqEntry::new(StrBytes::from_str("bar"), bar_id, 0, 20, 200, 200),
ReqEntry::new(StrBytes::from_static_str("foo"), foo_id, 0, 0, 100, 200),
ReqEntry::new(StrBytes::from_static_str("foo"), foo_id, 1, 10, 120, 210),
ReqEntry::new(StrBytes::from_static_str("bar"), bar_id, 0, 20, 200, 200),
]),
vec![&data4.session_partitions, &data4.to_send],
);
Expand All @@ -1091,21 +1091,21 @@ mod tests {
add_topic_id(
&mut topic_ids,
&mut topic_names,
StrBytes::from_str("foo"),
StrBytes::from_static_str("foo"),
version,
);
add_topic_id(
&mut topic_ids,
&mut topic_names,
StrBytes::from_str("bar"),
StrBytes::from_static_str("bar"),
version,
);

let foo_id = match topic_ids.get("foo") {
let foo_id = match topic_ids.get(&StrBytes::from_static_str("foo")) {
Some(id) => *id,
None => Uuid::nil(),
};
let bar_id = match topic_ids.get("bar") {
let bar_id = match topic_ids.get(&StrBytes::from_static_str("bar")) {
Some(id) => *id,
None => Uuid::nil(),
};
Expand Down Expand Up @@ -1150,9 +1150,9 @@ mod tests {
let data = fetch_session_builder.build(&mut fetch_session);
assert_maps_equals(
&req_map(vec![
ReqEntry::new(StrBytes::from_str("foo"), foo_id, 0, 0, 100, 200),
ReqEntry::new(StrBytes::from_str("foo"), foo_id, 1, 10, 110, 210),
ReqEntry::new(StrBytes::from_str("bar"), bar_id, 0, 20, 120, 220),
ReqEntry::new(StrBytes::from_static_str("foo"), foo_id, 0, 0, 100, 200),
ReqEntry::new(StrBytes::from_static_str("foo"), foo_id, 1, 10, 110, 210),
ReqEntry::new(StrBytes::from_static_str("bar"), bar_id, 0, 20, 120, 220),
]),
vec![&data.to_send, &data.session_partitions],
);
Expand All @@ -1163,9 +1163,9 @@ mod tests {
0,
123,
resp_map(vec![
RespEntry::new(StrBytes::from_str("foo"), 0, foo_id, 10, 20),
RespEntry::new(StrBytes::from_str("foo"), 1, foo_id, 10, 20),
RespEntry::new(StrBytes::from_str("bar"), 0, bar_id, 10, 20),
RespEntry::new(StrBytes::from_static_str("foo"), 0, foo_id, 10, 20),
RespEntry::new(StrBytes::from_static_str("foo"), 1, foo_id, 10, 20),
RespEntry::new(StrBytes::from_static_str("bar"), 0, bar_id, 10, 20),
]),
);

Expand All @@ -1190,7 +1190,7 @@ mod tests {
assert_eq!(1, data.metadata.epoch);
assert_map_equals(
&req_map(vec![ReqEntry::new(
StrBytes::from_str("foo"),
StrBytes::from_static_str("foo"),
foo_id,
1,
10,
Expand Down Expand Up @@ -1239,7 +1239,7 @@ mod tests {
assert_eq!(INITIAL_EPOCH, data.metadata.epoch);
assert_maps_equals(
&req_map(vec![ReqEntry::new(
StrBytes::from_str("foo"),
StrBytes::from_static_str("foo"),
foo_id,
0,
0,
Expand Down
19 changes: 13 additions & 6 deletions src/consumer/fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,8 @@ impl<Exe: Executor> Fetcher<Exe> {
debug!(
"Handing v0 ListOffsetResponse response for [{} - {}]. Fetched \
offset {offset}",
&topic.name.0, partition
topic.name.as_str(),
partition
);
if offset != UNKNOWN_OFFSET {
let tp = TopicPartition {
Expand All @@ -488,7 +489,7 @@ impl<Exe: Executor> Fetcher<Exe> {
debug!(
"Handling ListOffsetResponse response for [{} - {}], Fetched \
offset {}, timestamp {}",
topic.name.0,
topic.name.as_str(),
partition,
partition_response.offset,
partition_response.timestamp
Expand Down Expand Up @@ -525,7 +526,8 @@ impl<Exe: Executor> Fetcher<Exe> {
debug!(
"Cannot search by timestamp for [{} - {}] because the message format \
version is before 0.10.0",
topic.name.0, partition
topic.name.as_str(),
partition
);
break;
}
Expand All @@ -540,7 +542,9 @@ impl<Exe: Executor> Fetcher<Exe> {
) => {
debug!(
"Attempt to fetch offsets for [{} - {}] failed due to {}, retrying.",
topic.name.0, partition, error
topic.name.as_str(),
partition,
error
);
let tp = TopicPartition {
topic: topic.name.clone(),
Expand All @@ -552,7 +556,8 @@ impl<Exe: Executor> Fetcher<Exe> {
warn!(
"Received unknown topic or partition error in ListOffset request for \
partition [{} - {}]",
topic.name.0, partition
topic.name.as_str(),
partition
);
let tp = TopicPartition {
topic: topic.name.clone(),
Expand All @@ -567,7 +572,9 @@ impl<Exe: Executor> Fetcher<Exe> {
warn!(
"Attempt to fetch offsets for [{} - {}] failed due to unexpected \
exception: {}, retrying.",
topic.name.0, partition, error
topic.name.as_str(),
partition,
error
);
let tp = TopicPartition {
topic: topic.name.clone(),
Expand Down
2 changes: 1 addition & 1 deletion src/consumer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ impl<Exe: Executor> Consumer<Exe> {

pub async fn unsubscribe(&mut self) -> Result<()> {
self.coordinator
.maybe_leave_group(StrBytes::from_str(
.maybe_leave_group(StrBytes::from_static_str(
"the consumer unsubscribed from all topics",
))
.await?;
Expand Down
2 changes: 1 addition & 1 deletion src/consumer/partition_assignor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ pub trait PartitionAssigner {
} else {
debug!(
"skipping assignment for topic {} since no metadata is available",
topic.0
topic.as_str()
);
}
}
Expand Down
Loading

0 comments on commit 56110d8

Please sign in to comment.