Replace AsyncRead impl of Object with Stream of Bytes #1205
+114
−112
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
This replaces the
AsyncRead
implementation ofObject
with aStream<Item = io::Result<Bytes>>
.The reason for doing this breaking change is that because the
AsyncRead
impl is a wrapper of a Consumer (which in turn uses a Subscriber internally) the data already came in the form of aBytes
Stream
, so it was the wrong abstraction to force the user to go through the naiveAsyncRead
implementation, at times probably needing to reconvert that back to aStream
because of many async crates preferring them to read/write style APIs.This removes 2
memcpy
s from the stack (copying into theVecDeque
and copying out of theVecDeque
).Users who need the
AsyncRead
implementation can usetokio_util::io::StreamReader
, which has an even more efficient implementation than the nats.rs one by doing only onememcpy
.As a side note, this also replaces what I thought was a messy implementation (and in some places unexpected, like throwing an error if a second read had been launched after exhausting the stream, or the use of
unwrap
in creating the Consumer) with a more structured and idiomatic one.