From b22b948b24004e78089c0069b1b710814bef9637 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Wed, 1 Jan 2025 20:55:50 +0800 Subject: [PATCH] rest_api support fn query_row_batch --- Cargo.toml | 1 - core/src/response.rs | 4 +-- driver/src/rest_api.rs | 67 ++++++++++++++++++++++++++++++++++++++---- 3 files changed, 64 insertions(+), 8 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 4c4b721e..e3731a2b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,6 @@ members = [ "cli", "bindings/python", "bindings/nodejs", - "bindings/java", ] resolver = "2" diff --git a/core/src/response.rs b/core/src/response.rs index 048964f5..e7268ea8 100644 --- a/core/src/response.rs +++ b/core/src/response.rs @@ -14,7 +14,7 @@ use crate::error_code::ErrorCode; use crate::session::SessionState; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; #[derive(Deserialize, Debug)] pub struct QueryStats { @@ -53,7 +53,7 @@ pub struct ProgressValues { pub bytes: usize, } -#[derive(Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct SchemaField { pub name: String, #[serde(rename = "type")] diff --git a/driver/src/rest_api.rs b/driver/src/rest_api.rs index 39ef627c..cbfb7198 100644 --- a/driver/src/rest_api.rs +++ b/driver/src/rest_api.rs @@ -27,9 +27,9 @@ use tokio::fs::File; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio_stream::Stream; -use databend_client::APIClient; use databend_client::PresignedResponse; use databend_client::QueryResponse; +use databend_client::{APIClient, SchemaField}; use databend_driver_core::error::{Error, Result}; use databend_driver_core::rows::{Row, RowIterator, RowStatsIterator, RowWithStats, ServerStats}; use databend_driver_core::schema::{Schema, SchemaRef}; @@ -82,7 +82,7 @@ impl Connection for RestAPIConnection { async fn query_iter_ext(&self, sql: &str) -> Result { info!("query iter ext: {}", sql); let resp = self.client.start_query(sql).await?; - let resp = self.wait_for_schema(resp).await?; + let resp = self.wait_for_schema(resp, true).await?; let (schema, rows) = RestAPIRows::from_response(self.client.clone(), resp)?; Ok(RowStatsIterator::new(Arc::new(schema), Box::pin(rows))) } @@ -209,8 +209,14 @@ impl<'o> RestAPIConnection { }) } - async fn wait_for_schema(&self, resp: QueryResponse) -> Result { - if !resp.data.is_empty() || !resp.schema.is_empty() || resp.stats.progresses.has_progress() + async fn wait_for_schema( + &self, + resp: QueryResponse, + return_on_progress: bool, + ) -> Result { + if !resp.data.is_empty() + || !resp.schema.is_empty() + || (return_on_progress && resp.stats.progresses.has_progress()) { return Ok(resp); } @@ -228,7 +234,7 @@ impl<'o> RestAPIConnection { if !result.data.is_empty() || !result.schema.is_empty() - || result.stats.progresses.has_progress() + || (return_on_progress && result.stats.progresses.has_progress()) { break; } @@ -250,6 +256,12 @@ impl<'o> RestAPIConnection { fn default_copy_options() -> BTreeMap<&'o str, &'o str> { vec![("purge", "true")].into_iter().collect() } + + pub async fn query_row_batch(&self, sql: &str) -> Result { + let resp = self.client.start_query(sql).await?; + let resp = self.wait_for_schema(resp, false).await?; + Ok(RowBatch::from_response(self.client.clone(), resp)?) + } } type PageFut = Pin> + Send>>; @@ -341,3 +353,48 @@ impl Stream for RestAPIRows { } } } + +pub struct RowBatch { + schema: Vec, + client: Arc, + query_id: String, + node_id: Option, + + next_uri: Option, + data: Vec>>, +} + +impl RowBatch { + pub fn schema(&self) -> Vec { + self.schema.clone() + } + + fn from_response(client: Arc, mut resp: QueryResponse) -> Result { + Ok(Self { + schema: std::mem::take(&mut resp.schema), + client, + query_id: resp.id, + node_id: resp.node_id, + next_uri: resp.next_uri, + data: resp.data, + }) + } + + pub async fn fetch_next_page(&mut self) -> Result>>> { + if !self.data.is_empty() { + return Ok(std::mem::take(&mut self.data)); + } + while let Some(next_uri) = &self.next_uri { + let resp = self + .client + .query_page(&self.query_id, &next_uri, &self.node_id) + .await?; + + self.next_uri = resp.next_uri; + if !resp.data.is_empty() { + return Ok(resp.data); + } + } + Ok(vec![]) + } +}