Merge pull request #3 from Schwarzam/testing-new
redone all code with parquet -- not using polars anymore
Schwarzam authored Jun 9, 2024
2 parents b6ae52b + 78794ad commit 47b0f8c
Showing 13 changed files with 276 additions and 276 deletions.
4 changes: 3 additions & 1 deletion Cargo.toml
@@ -17,8 +17,10 @@ name = "lsdb_server"
 path = "src/bin.rs"
 
 [dependencies]
+futures-util = "0.3.30"
+arrow = "52.0.0"
+parquet = { version = "52.0.0", features = ["arrow", "async"] }
 axum = "0.7.5"
-polars = { version = "0.40.0", features = ["lazy", "parquet", "dtype-u8"] }
 tokio = { version = "1.37.0", features = ["full"] }
 hyper = { version="1.3.1", features = ["full"] }
 tower = "0.4.13"
3 changes: 1 addition & 2 deletions src/loaders/mod.rs
@@ -1,2 +1 @@
-pub mod parquet;
-pub mod parsers;
+pub mod parquet;
81 changes: 0 additions & 81 deletions src/loaders/parquet.rs

This file was deleted.

96 changes: 96 additions & 0 deletions src/loaders/parquet/helpers.rs
@@ -0,0 +1,96 @@
use arrow::array::{Float64Array, Float32Array, Int16Array, Int32Array, Int64Array, Int8Array, BooleanArray};
use arrow::record_batch::RecordBatch;
use arrow::array::BooleanBuilder;
use arrow::datatypes::Schema;
use std::sync::Arc;

/// Create a boolean mask based on the filters provided.
///
/// # Arguments
///
/// * `batch` - A reference to a RecordBatch that will be filtered.
/// * `original_schema` - A reference to the original schema of the RecordBatch.
/// * `filters` - A vector of tuples containing the column name, the comparison operator and the value to compare.
///
/// # Returns
///
/// This function returns an Arrow Result with the boolean mask.
pub fn create_boolean_mask(batch: &RecordBatch, original_schema: &Arc<Schema>, filters: Vec<(&str, &str, &str)>) -> arrow::error::Result<Arc<BooleanArray>> {
    let num_rows = batch.num_rows();
    let mut boolean_builder = BooleanBuilder::new();

    // Initialize all rows as true
    for _ in 0..num_rows {
        boolean_builder.append_value(true);
    }
    let mut boolean_mask = boolean_builder.finish();

    for filter in filters.iter() {
        let column = batch.column(original_schema.index_of(filter.0).unwrap());

        if column.data_type() == &arrow::datatypes::DataType::Float32 {
            let column = column.as_any().downcast_ref::<Float32Array>().unwrap();
            apply_filter(&mut boolean_mask, column, filter)?;
        } else if column.data_type() == &arrow::datatypes::DataType::Float64 {
            let column = column.as_any().downcast_ref::<Float64Array>().unwrap();
            apply_filter(&mut boolean_mask, column, filter)?;
        } else if column.data_type() == &arrow::datatypes::DataType::Int16 {
            let column = column.as_any().downcast_ref::<Int16Array>().unwrap();
            apply_filter(&mut boolean_mask, column, filter)?;
        } else if column.data_type() == &arrow::datatypes::DataType::Int32 {
            let column = column.as_any().downcast_ref::<Int32Array>().unwrap();
            apply_filter(&mut boolean_mask, column, filter)?;
        } else if column.data_type() == &arrow::datatypes::DataType::Int64 {
            let column = column.as_any().downcast_ref::<Int64Array>().unwrap();
            apply_filter(&mut boolean_mask, column, filter)?;
        } else if column.data_type() == &arrow::datatypes::DataType::Int8 {
            let column = column.as_any().downcast_ref::<Int8Array>().unwrap();
            apply_filter(&mut boolean_mask, column, filter)?;
        } else if column.data_type() == &arrow::datatypes::DataType::Boolean {
            // BooleanArray is not a PrimitiveArray, so it cannot go through the generic
            // apply_filter; handle boolean equality comparisons inline instead.
            let column = column.as_any().downcast_ref::<BooleanArray>().unwrap();
            // Filter values other than "true"/"false" fall back to false here.
            let filter_value = filter.2.parse::<bool>().unwrap_or(false);
            let mut new_mask = BooleanBuilder::new();
            for (index, value) in column.iter().enumerate() {
                let current_mask = boolean_mask.value(index);
                let result = match filter.1 {
                    "=" | "==" => value.map_or(false, |v| v == filter_value),
                    "!=" => value.map_or(false, |v| v != filter_value),
                    _ => false,
                };
                new_mask.append_value(current_mask && result);
            }
            boolean_mask = new_mask.finish();
        } else {
            return Err(arrow::error::ArrowError::NotYetImplemented(format!("Data type {:?} not yet implemented", column.data_type())));
        }
    }
    Ok(Arc::new(boolean_mask))
}

/// Apply a filter to a column and update the boolean mask.
///
/// # Arguments
///
/// * `boolean_mask` - A mutable reference to a BooleanArray that will be updated with the filter results.
/// * `column` - A reference to a PrimitiveArray that will be filtered.
/// * `filter` - A tuple containing the column name, the comparison operator and the value to compare.
///
/// # Returns
///
/// This function returns an Arrow Result.
fn apply_filter<T>(boolean_mask: &mut BooleanArray, column: &arrow::array::PrimitiveArray<T>, filter: &(&str, &str, &str)) -> arrow::error::Result<()>
where
    T: arrow::datatypes::ArrowPrimitiveType,
    T::Native: std::cmp::PartialOrd + std::str::FromStr,
    <T::Native as std::str::FromStr>::Err: std::fmt::Debug,
{
    let filter_value = filter.2.parse::<T::Native>().unwrap();
    let mut new_mask = BooleanBuilder::new();

    for (index, value) in column.iter().enumerate() {
        let current_mask = boolean_mask.value(index);
        let result = match filter.1 {
            ">" => value.map_or(false, |v| v > filter_value),
            "<" => value.map_or(false, |v| v < filter_value),
            "=" | "==" => value.map_or(false, |v| v == filter_value),
            "!=" => value.map_or(false, |v| v != filter_value),
            ">=" => value.map_or(false, |v| v >= filter_value),
            "<=" => value.map_or(false, |v| v <= filter_value),
            _ => false,
        };
        new_mask.append_value(current_mask && result);
    }

    *boolean_mask = new_mask.finish();
    Ok(())
}
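
As a quick sanity check of the new helpers, here is a minimal sketch (not part of the commit) of how `create_boolean_mask` could be driven on a small in-memory batch. The column names `mag` and `flux` are made up for illustration, and the function is assumed to be in scope via `crate::loaders::parquet::helpers`:

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Float64Array, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

// Assumes: use crate::loaders::parquet::helpers::create_boolean_mask;
fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical two-column batch; in the server the data comes from the Parquet stream.
    let schema = Arc::new(Schema::new(vec![
        Field::new("mag", DataType::Float64, false),
        Field::new("flux", DataType::Int32, false),
    ]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(Float64Array::from(vec![19.2, 21.5, 20.1])) as ArrayRef,
            Arc::new(Int32Array::from(vec![10, 3, 7])) as ArrayRef,
        ],
    )?;

    // Keep rows with mag < 21 AND flux >= 5, using the same (column, op, value)
    // tuples that parse_filters produces.
    let mask = create_boolean_mask(&batch, &schema, vec![("mag", "<", "21"), ("flux", ">=", "5")])?;
    let filtered = arrow::compute::filter_record_batch(&batch, &mask)?;
    assert_eq!(filtered.num_rows(), 2);
    Ok(())
}
```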
3 changes: 3 additions & 0 deletions src/loaders/parquet/mod.rs
@@ -0,0 +1,3 @@
pub mod parse_params;
pub mod helpers;
pub mod parquet;
125 changes: 125 additions & 0 deletions src/loaders/parquet/parquet.rs
@@ -0,0 +1,125 @@

use std::collections::HashMap;
use std::error::Error;
use std::sync::Arc;

use arrow::array::{ArrayRef, NullArray};
use arrow::array::new_null_array;
use arrow::record_batch::RecordBatch;

use parquet::arrow::async_reader::ParquetRecordBatchStreamBuilder;
use parquet::arrow::arrow_reader::ArrowReaderMetadata;
use parquet::arrow::arrow_writer::ArrowWriter;
use parquet::file::properties::WriterProperties;

use futures_util::stream::StreamExt;
use tokio::fs::File;

use crate::loaders::parquet::parse_params;
use crate::loaders::parquet::helpers::create_boolean_mask;

/// Process a Parquet file and return the content as a byte stream.
///
/// # Arguments
///
/// * `file_path` - A reference to a string containing the path to the Parquet file.
/// * `params` - A reference to a HashMap of parameters containing 'columns' and 'filters' keys.
///
/// # Returns
///
/// This function returns a byte stream that can be directly used as an HTTP response body.
pub async fn process_and_return_parquet_file(
    file_path: &str,
    params: &HashMap<String, String>
) -> Result<Vec<u8>, Box<dyn Error>> {
    // Open async file containing parquet data
    let std_file = std::fs::File::open(file_path)?;
    let mut file = File::from_std(std_file);

    let meta = ArrowReaderMetadata::load_async(&mut file, Default::default()).await?;
    let stream_builder = ParquetRecordBatchStreamBuilder::new_with_metadata(
        file.try_clone().await?,
        meta.clone()
    );
    let original_metadata = meta.metadata();
    let metadata_keys = original_metadata
        .file_metadata()
        .key_value_metadata()
        .unwrap()
        .clone();

    let original_schema = stream_builder
        .schema()
        .clone();

    let all_columns = original_schema
        .fields()
        .iter()
        .map(|field| field.name().to_string())
        .collect::<Vec<_>>();

    // Parse selected columns from params
    let columns = parse_params::parse_columns_from_params_to_str(&params)
        .unwrap_or(all_columns);

    let filters = parse_params::parse_filters(&params);

    // Construct the reader stream
    let mut stream = stream_builder
        .with_batch_size(8192)
        .build()?;

    // Set writer properties with the original metadata
    let writer_properties = WriterProperties::builder()
        .set_key_value_metadata(Some(metadata_keys))
        .build();

    let mut out_buffer = Vec::new();
    let mut writer = ArrowWriter::try_new(
        &mut out_buffer,
        original_schema.clone(),
        Some(writer_properties)
    )?;

    // Collect all batches and write them to the buffer
    while let Some(batch) = stream.next().await {
        let mut batch = batch?;

        // Apply row-level filters, if any were requested
        if filters.is_some() {
            let filter_mask = create_boolean_mask(
                &batch,
                &original_schema,
                filters.clone().unwrap()
            )?;
            batch = arrow::compute::filter_record_batch(
                &batch,
                &filter_mask
            )?;
        }

        // Keep the requested columns (plus the spatial index); replace the rest with nulls
        let selected_arrays = original_schema.fields().iter()
            .map(|field| {
                if let Ok(index) = batch.schema().index_of(field.name()) {
                    if columns.contains(&field.name().to_string()) || field.name() == "_hipscat_index" {
                        batch.column(index).clone()
                    } else {
                        new_null_array(
                            field.data_type(),
                            batch.num_rows()
                        )
                    }
                } else {
                    Arc::new(NullArray::new(batch.num_rows())) as ArrayRef
                }
            })
            .collect::<Vec<_>>();

        let selected_batch = RecordBatch::try_new(original_schema.clone(), selected_arrays)?;
        writer.write(&selected_batch)?;
    }

    // `close` consumes the writer and finalizes the Parquet footer
    writer.close()?;
    Ok(out_buffer)
}
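
A hedged sketch (not part of the commit) of how `process_and_return_parquet_file` might be called from a handler or test. The file path `data/Npix=0.parquet` and the query values are illustrative only:

```rust
use std::collections::HashMap;

// Assumes: use crate::loaders::parquet::parquet::process_and_return_parquet_file;
// and that the path below points at an existing Parquet partition.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut params = HashMap::new();
    // Same shape as the query string a handler would receive:
    // ?columns=ra,dec&filters=mag<21
    params.insert("columns".to_string(), "ra,dec".to_string());
    params.insert("filters".to_string(), "mag<21".to_string());

    let bytes = process_and_return_parquet_file("data/Npix=0.parquet", &params).await?;
    // The returned Vec<u8> is a complete Parquet file, usable directly as an HTTP response body.
    println!("returned {} parquet bytes", bytes.len());
    Ok(())
}
```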
46 changes: 46 additions & 0 deletions src/loaders/parquet/parse_params.rs
@@ -0,0 +1,46 @@
use std::collections::HashMap;
use regex::Regex;

/// # Arguments
///
/// * `params` - A reference to a HashMap of parameters containing 'columns' key.
///
/// # Returns
///
/// A vector of Strings with the selected column names.
pub fn parse_columns_from_params_to_str(params: &HashMap<String, String>) -> Option<Vec<String>> {
    // Parse columns from params
    if let Some(cols) = params.get("columns") {
        let cols = cols.split(",").collect::<Vec<_>>();
        let select_cols = cols.iter().map(|x| x.to_string()).collect::<Vec<_>>();
        return Some(select_cols);
    }
    None
}

/// # Arguments
///
/// * `params` - A reference to a HashMap of parameters containing 'filters' key.
///
/// # Returns
///
/// A vector of tuples containing the column name, the comparison operator and the value to compare.
pub fn parse_filters(params: &HashMap<String, String>) -> Option<Vec<(&str, &str, &str)>> {
    let mut filters = Vec::new();
    if let Some(query) = params.get("filters") {
        filters = query.split(",").collect::<Vec<_>>();
    }

    if filters.is_empty() {
        return None
    }

    // Match a column name, one of the comparison operators understood by apply_filter
    // (including the two-character forms and !=), and a numeric value.
    let re = Regex::new(r"([a-zA-Z_][a-zA-Z0-9_]*)\s*(<=|>=|==|!=|<|>|=)\s*([-+]?[0-9]*\.?[0-9]+)").unwrap();
    let mut filter_vec = Vec::new();
    for filter in filters {
        let f_vec = re.captures(filter).unwrap();
        filter_vec.push((f_vec.get(1).unwrap().as_str(), f_vec.get(2).unwrap().as_str(), f_vec.get(3).unwrap().as_str()));
    }

    Some(filter_vec)
}
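
For reference, a small illustrative example (not part of the commit) of the parameter parsing; the column and filter strings below are made up:

```rust
use std::collections::HashMap;

// Assumes: use crate::loaders::parquet::parse_params::{parse_columns_from_params_to_str, parse_filters};
fn main() {
    let mut params = HashMap::new();
    params.insert("columns".to_string(), "ra,dec,mag_r".to_string());
    params.insert("filters".to_string(), "mag_r<21.5,flux>=3".to_string());

    // Column selection is a plain comma-separated list.
    let cols = parse_columns_from_params_to_str(&params);
    assert_eq!(
        cols,
        Some(vec!["ra".to_string(), "dec".to_string(), "mag_r".to_string()])
    );

    // Filters come back as (column, operator, value) string slices,
    // ready to hand to create_boolean_mask.
    let filters = parse_filters(&params).unwrap();
    assert_eq!(filters, vec![("mag_r", "<", "21.5"), ("flux", ">=", "3")]);
}
```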