diff --git a/libsql/src/hrana/stream.rs b/libsql/src/hrana/stream.rs index b27e48145e..536764a13c 100644 --- a/libsql/src/hrana/stream.rs +++ b/libsql/src/hrana/stream.rs @@ -81,7 +81,12 @@ where tracing::trace!("baton not found - skipping finalize for {:?}", req); return Ok(false); } - let (resp, is_autocommit) = client.finalize(req).await?; + let (resp, is_autocommit) = client.finalize(req).await.map_err(|e| match e { + HranaError::Api(e) if e == "Received an invalid baton" && !self.is_autocommit() => { + HranaError::Api("Received an invalid baton: Transaction rolled back".into()) + } + e => e, + })?; self.inner .is_autocommit .store(is_autocommit, Ordering::SeqCst); @@ -94,7 +99,9 @@ where (0, 0) }; - self.inner.total_changes.fetch_add(affected_row_count, Ordering::SeqCst); + self.inner + .total_changes + .fetch_add(affected_row_count, Ordering::SeqCst); self.inner .affected_row_count .store(affected_row_count, Ordering::SeqCst); @@ -132,7 +139,15 @@ where StreamRequest::GetAutocommit(GetAutocommitStreamReq {}), StreamRequest::Close(CloseStreamReq {}), ]) - .await?; + .await + .map_err(|e| match e { + HranaError::Api(e) + if e == "Received an invalid baton" && !self.is_autocommit() => + { + HranaError::Api("Received an invalid baton: Transaction rolled back".into()) + } + e => e, + })?; client.reset(); (resp, get_autocommit) } else { @@ -142,7 +157,15 @@ where StreamRequest::Batch(BatchStreamReq { batch }), StreamRequest::GetAutocommit(GetAutocommitStreamReq {}), ]) - .await?; + .await + .map_err(|e| match e { + HranaError::Api(e) + if e == "Received an invalid baton" && !self.is_autocommit() => + { + HranaError::Api("Received an invalid baton: Transaction rolled back".into()) + } + e => e, + })?; (resp, get_autocommit) }; drop(client);