Skip to content

Commit

Permalink
iterator: return NextRowError from next() methods
Browse files Browse the repository at this point in the history
  • Loading branch information
muzarski committed Jan 23, 2025
1 parent a140f7f commit 04c7eec
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 8 deletions.
14 changes: 7 additions & 7 deletions scylla/src/client/pager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::cluster::{ClusterState, NodeRef};
#[allow(deprecated)]
use crate::cql_to_rust::{FromRow, FromRowError};
use crate::deserialize::DeserializeOwnedRow;
use crate::errors::{QueryError, RequestAttemptError, RequestError};
use crate::errors::{RequestAttemptError, RequestError};
use crate::frame::response::result;
use crate::network::Connection;
use crate::observability::driver_tracing::RequestSpan;
Expand Down Expand Up @@ -584,11 +584,11 @@ impl QueryPager {
/// borrows from self.
///
/// This is cancel-safe.
async fn next(&mut self) -> Option<Result<ColumnIterator, QueryError>> {
async fn next(&mut self) -> Option<Result<ColumnIterator, NextRowError>> {
let res = std::future::poll_fn(|cx| Pin::new(&mut *self).poll_fill_page(cx)).await;
match res {
Some(Ok(())) => {}
Some(Err(err)) => return Some(Err(err.into())),
Some(Err(err)) => return Some(Err(err)),
None => return None,
}

Expand All @@ -597,7 +597,7 @@ impl QueryPager {
self.current_page
.next()
.unwrap()
.map_err(|err| NextRowError::RowDeserializationError(err).into()),
.map_err(NextRowError::RowDeserializationError),
)
}

Expand Down Expand Up @@ -1035,14 +1035,14 @@ impl<RowT> Stream for TypedRowStream<RowT>
where
RowT: DeserializeOwnedRow,
{
type Item = Result<RowT, QueryError>;
type Item = Result<RowT, NextRowError>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let next_fut = async {
self.raw_row_lending_stream.next().await.map(|res| {
res.and_then(|column_iterator| {
<RowT as DeserializeRow>::deserialize(column_iterator)
.map_err(|err| NextRowError::RowDeserializationError(err).into())
.map_err(NextRowError::RowDeserializationError)
})
})
};
Expand Down Expand Up @@ -1177,7 +1177,7 @@ mod legacy {
pub enum LegacyNextRowError {
/// Query to fetch next page has failed
#[error(transparent)]
QueryError(#[from] QueryError),
NextRowError(#[from] NextRowError),

/// Parsing values in row as given types failed
#[error(transparent)]
Expand Down
7 changes: 6 additions & 1 deletion scylla/src/cluster/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -847,6 +847,7 @@ async fn query_peers(conn: &Arc<Connection>, connect_port: u16) -> Result<Vec<Pe
Ok::<_, QueryError>(rows_stream)
})
.into_stream()
.map(|result| result.map(|stream| stream.map_err(QueryError::from)))
.try_flatten()
.and_then(|row_result| future::ok((NodeInfoSource::Peer, row_result)));

Expand All @@ -864,6 +865,7 @@ async fn query_peers(conn: &Arc<Connection>, connect_port: u16) -> Result<Vec<Pe
Ok::<_, QueryError>(rows_stream)
})
.into_stream()
.map(|result| result.map(|stream| stream.map_err(QueryError::from)))
.try_flatten()
.and_then(|row_result| future::ok((NodeInfoSource::Local, row_result)));

Expand Down Expand Up @@ -1007,7 +1009,9 @@ where
pager.rows_stream::<R>().map_err(convert_typecheck_error)?;
Ok::<_, QueryError>(stream)
};
fut.into_stream().try_flatten()
fut.into_stream()
.map(|result| result.map(|stream| stream.map_err(QueryError::from)))
.try_flatten()
}

async fn query_keyspaces(
Expand Down Expand Up @@ -1861,6 +1865,7 @@ async fn query_table_partitioners(
Ok::<_, QueryError>(stream)
})
.into_stream()
.map(|result| result.map(|stream| stream.map_err(QueryError::from)))
.try_flatten();

let result = rows
Expand Down

0 comments on commit 04c7eec

Please sign in to comment.