Skip to content

Commit

Permalink
REDONE all with parquet, not polars
Browse files Browse the repository at this point in the history
  • Loading branch information
Schwarzam committed Jun 9, 2024
1 parent 808b9b2 commit 78794ad
Show file tree
Hide file tree
Showing 13 changed files with 275 additions and 411 deletions.
5 changes: 2 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@ path = "src/bin.rs"

[dependencies]
futures-util = "0.3.30"
arrow = "51.0.0"
parquet = { version = "51.0.0", features = ["arrow", "async"] }
arrow = "52.0.0"
parquet = { version = "52.0.0", features = ["arrow", "async"] }
axum = "0.7.5"
polars = { version = "0.40.0", features = ["lazy", "parquet", "dtype-u8", "dtype-full"] }
tokio = { version = "1.37.0", features = ["full"] }
hyper = { version="1.3.1", features = ["full"] }
tower = "0.4.13"
Expand Down
3 changes: 1 addition & 2 deletions src/loaders/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
pub mod parquet;
pub mod parsers;
pub mod parquet;
197 changes: 0 additions & 197 deletions src/loaders/parquet.rs

This file was deleted.

96 changes: 96 additions & 0 deletions src/loaders/parquet/helpers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
use arrow::array::{Float64Array, Float32Array, Int16Array, Int32Array, Int64Array, Int8Array, BooleanArray};
use arrow::record_batch::RecordBatch;
use arrow::array::BooleanBuilder;
use arrow::datatypes::Schema;
use std::sync::Arc;

/// Create a boolean mask based on the filters provided.
///
/// # Arguments
///
/// * `batch` - A reference to a RecordBatch that will be filtered.
/// * `original_schema` - A reference to the original schema of the RecordBatch.
/// * `filters` - A vector of tuples containing the column name, the comparison operator and the value to compare.
///
/// # Returns
///
/// This function returns an Arrow Result with the boolean mask.
pub fn create_boolean_mask(batch: &RecordBatch, original_schema: &Arc<Schema>, filters: Vec<(&str, &str, &str)>) -> arrow::error::Result<Arc<BooleanArray>> {
let num_rows = batch.num_rows();
let mut boolean_builder = BooleanBuilder::new();

// Initialize all rows as true
for _ in 0..num_rows {
boolean_builder.append_value(true);
}
let mut boolean_mask = boolean_builder.finish();

for filter in filters.iter() {
let column = batch.column(original_schema.index_of(filter.0).unwrap());

if column.data_type() == &arrow::datatypes::DataType::Float32 {
let column = column.as_any().downcast_ref::<Float32Array>().unwrap();
apply_filter(&mut boolean_mask, column, filter)?;
} else if column.data_type() == &arrow::datatypes::DataType::Float64 {
let column = column.as_any().downcast_ref::<Float64Array>().unwrap();
apply_filter(&mut boolean_mask, column, filter)?;
} else if column.data_type() == &arrow::datatypes::DataType::Int16 {
let column = column.as_any().downcast_ref::<Int16Array>().unwrap();
apply_filter(&mut boolean_mask, column, filter)?;
} else if column.data_type() == &arrow::datatypes::DataType::Int32 {
let column = column.as_any().downcast_ref::<Int32Array>().unwrap();
apply_filter(&mut boolean_mask, column, filter)?;
} else if column.data_type() == &arrow::datatypes::DataType::Int64 {
let column = column.as_any().downcast_ref::<Int64Array>().unwrap();
apply_filter(&mut boolean_mask, column, filter)?;
} else if column.data_type() == &arrow::datatypes::DataType::Int8 {
let column = column.as_any().downcast_ref::<Int8Array>().unwrap();
apply_filter(&mut boolean_mask, column, filter)?;
} else if column.data_type() == &arrow::datatypes::DataType::Boolean {
let column = column.as_any().downcast_ref::<Int16Array>().unwrap();
apply_filter(&mut boolean_mask, column, filter)?;
} else {
return Err(arrow::error::ArrowError::NotYetImplemented(format!("Data type {:?} not yet implemented", column.data_type())));
}
}
Ok(Arc::new(boolean_mask))
}

/// Apply a filter to a column and update the boolean mask.
///
/// # Arguments
///
/// * `boolean_mask` - A mutable reference to a BooleanArray that will be updated with the filter results.
/// * `column` - A reference to a PrimitiveArray that will be filtered.
/// * `filter` - A tuple containing the column name, the comparison operator and the value to compare.
///
/// # Returns
///
/// This function returns an Arrow Result.
fn apply_filter<T>(boolean_mask: &mut BooleanArray, column: &arrow::array::PrimitiveArray<T>, filter: &(&str, &str, &str)) -> arrow::error::Result<()>
where
T: arrow::datatypes::ArrowPrimitiveType,
T::Native: std::cmp::PartialOrd + std::str::FromStr,
<T::Native as std::str::FromStr>::Err: std::fmt::Debug,
{
let filter_value = filter.2.parse::<T::Native>().unwrap();
let mut new_mask = BooleanBuilder::new();

for (index, value) in column.iter().enumerate() {
let current_mask = boolean_mask.value(index);
let result = match filter.1 {
">" => value.map_or(false, |v| v > filter_value),
"<" => value.map_or(false, |v| v < filter_value),
"=" => value.map_or(false, |v| v == filter_value),
"!=" => value.map_or(false, |v| v != filter_value),
">=" => value.map_or(false, |v| v >= filter_value),
"<=" => value.map_or(false, |v| v <= filter_value),
"==" => value.map_or(false, |v| v == filter_value),
_ => false,
};
new_mask.append_value(current_mask && result);
}

*boolean_mask = new_mask.finish();
Ok(())
}
3 changes: 3 additions & 0 deletions src/loaders/parquet/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
// Submodules of the parquet loader.
pub mod parse_params;
// Boolean-mask helpers used to filter record batches.
pub mod helpers;
pub mod parquet;
Loading

0 comments on commit 78794ad

Please sign in to comment.