Skip to content

Commit

Permalink
feat: rest_api support fn query_row_batch
Browse files Browse the repository at this point in the history
  • Loading branch information
youngsofun committed Jan 4, 2025
1 parent 987298f commit 925668d
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 9 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
.vscode
.idea
target/
**/target
Cargo.lock
venv/

Expand All @@ -9,3 +9,4 @@ venv/

/dist

**/.DS_Store
4 changes: 2 additions & 2 deletions core/src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use crate::error_code::ErrorCode;
use crate::session::SessionState;
use serde::Deserialize;
use serde::{Deserialize, Serialize};

#[derive(Deserialize, Debug)]
pub struct QueryStats {
Expand Down Expand Up @@ -53,7 +53,7 @@ pub struct ProgressValues {
pub bytes: usize,
}

#[derive(Deserialize, Debug, Clone)]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct SchemaField {
pub name: String,
#[serde(rename = "type")]
Expand Down
69 changes: 63 additions & 6 deletions driver/src/rest_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ use tokio::fs::File;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio_stream::Stream;

use databend_client::APIClient;
use databend_client::PresignedResponse;
use databend_client::QueryResponse;
use databend_client::{APIClient, SchemaField};
use databend_driver_core::error::{Error, Result};
use databend_driver_core::rows::{Row, RowIterator, RowStatsIterator, RowWithStats, ServerStats};
use databend_driver_core::schema::{Schema, SchemaRef};
Expand Down Expand Up @@ -84,7 +84,7 @@ impl Connection for RestAPIConnection {
async fn query_iter_ext(&self, sql: &str) -> Result<RowStatsIterator> {
info!("query iter ext: {}", sql);
let resp = self.client.start_query(sql).await?;
let resp = self.wait_for_schema(resp).await?;
let resp = self.wait_for_schema(resp, true).await?;
let (schema, rows) = RestAPIRows::<RowWithStats>::from_response(self.client.clone(), resp)?;
Ok(RowStatsIterator::new(Arc::new(schema), Box::pin(rows)))
}
Expand All @@ -93,7 +93,7 @@ impl Connection for RestAPIConnection {
async fn query_raw_iter(&self, sql: &str) -> Result<RawRowIterator> {
info!("query raw iter: {}", sql);
let resp = self.client.start_query(sql).await?;
let resp = self.wait_for_schema(resp).await?;
let resp = self.wait_for_schema(resp, true).await?;
let (schema, rows) =
RestAPIRows::<RawRowWithStats>::from_response(self.client.clone(), resp)?;
Ok(RawRowIterator::new(Arc::new(schema), Box::pin(rows)))
Expand Down Expand Up @@ -221,8 +221,14 @@ impl<'o> RestAPIConnection {
})
}

async fn wait_for_schema(&self, resp: QueryResponse) -> Result<QueryResponse> {
if !resp.data.is_empty() || !resp.schema.is_empty() || resp.stats.progresses.has_progress()
async fn wait_for_schema(
&self,
resp: QueryResponse,
return_on_progress: bool,
) -> Result<QueryResponse> {
if !resp.data.is_empty()
|| !resp.schema.is_empty()
|| (return_on_progress && resp.stats.progresses.has_progress())
{
return Ok(resp);
}
Expand All @@ -240,7 +246,7 @@ impl<'o> RestAPIConnection {

if !result.data.is_empty()
|| !result.schema.is_empty()
|| result.stats.progresses.has_progress()
|| (return_on_progress && result.stats.progresses.has_progress())
{
break;
}
Expand All @@ -262,6 +268,12 @@ impl<'o> RestAPIConnection {
fn default_copy_options() -> BTreeMap<&'o str, &'o str> {
vec![("purge", "true")].into_iter().collect()
}

pub async fn query_row_batch(&self, sql: &str) -> Result<RowBatch> {
let resp = self.client.start_query(sql).await?;
let resp = self.wait_for_schema(resp, false).await?;
RowBatch::from_response(self.client.clone(), resp)
}
}

type PageFut = Pin<Box<dyn Future<Output = Result<QueryResponse>> + Send>>;
Expand Down Expand Up @@ -380,3 +392,48 @@ impl FromRowStats for RawRowWithStats {
Ok(RawRowWithStats::Row(RawRow::new(rows, row)))
}
}

pub struct RowBatch {
schema: Vec<SchemaField>,
client: Arc<APIClient>,
query_id: String,
node_id: Option<String>,

next_uri: Option<String>,
data: Vec<Vec<Option<String>>>,
}

impl RowBatch {
pub fn schema(&self) -> Vec<SchemaField> {
self.schema.clone()
}

fn from_response(client: Arc<APIClient>, mut resp: QueryResponse) -> Result<Self> {
Ok(Self {
schema: std::mem::take(&mut resp.schema),
client,
query_id: resp.id,
node_id: resp.node_id,
next_uri: resp.next_uri,
data: resp.data,
})
}

pub async fn fetch_next_page(&mut self) -> Result<Vec<Vec<Option<String>>>> {
if !self.data.is_empty() {
return Ok(std::mem::take(&mut self.data));
}
while let Some(next_uri) = &self.next_uri {
let resp = self
.client
.query_page(&self.query_id, next_uri, &self.node_id)
.await?;

self.next_uri = resp.next_uri;
if !resp.data.is_empty() {
return Ok(resp.data);
}
}
Ok(vec![])
}
}

0 comments on commit 925668d

Please sign in to comment.