From 78794ad2ad729df5d4004b0bf266112a8ac199a1 Mon Sep 17 00:00:00 2001
From: schwarzam
Date: Sun, 9 Jun 2024 04:26:31 +0000
Subject: [PATCH] Redo all loaders with the parquet crate instead of polars

---
 Cargo.toml                          |   5 +-
 src/loaders/mod.rs                  |   3 +-
 src/loaders/parquet.rs              | 197 ----------------------------
 src/loaders/parquet/helpers.rs      |  96 ++++++++++++++
 src/loaders/parquet/mod.rs          |   3 +
 src/loaders/parquet/parquet.rs      | 125 ++++++++++++++++++
 src/loaders/parquet/parse_params.rs |  46 +++++++
 src/loaders/parsers/helpers.rs      |  91 -------------
 src/loaders/parsers/mod.rs          |   2 -
 src/loaders/parsers/parse_params.rs |  94 -------------
 src/routes.rs                       |   2 +-
 tests/parquet.rs                    |   5 +-
 tests/parsers.rs                    |  17 ---
 13 files changed, 275 insertions(+), 411 deletions(-)
 delete mode 100644 src/loaders/parquet.rs
 create mode 100644 src/loaders/parquet/helpers.rs
 create mode 100644 src/loaders/parquet/mod.rs
 create mode 100644 src/loaders/parquet/parquet.rs
 create mode 100644 src/loaders/parquet/parse_params.rs
 delete mode 100644 src/loaders/parsers/helpers.rs
 delete mode 100644 src/loaders/parsers/mod.rs
 delete mode 100644 src/loaders/parsers/parse_params.rs

diff --git a/Cargo.toml b/Cargo.toml
index ffb37ef..12c4056 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -18,10 +18,9 @@ path = "src/bin.rs"
 
 [dependencies]
 futures-util = "0.3.30"
-arrow = "51.0.0"
-parquet = { version = "51.0.0", features = ["arrow", "async"] }
+arrow = "52.0.0"
+parquet = { version = "52.0.0", features = ["arrow", "async"] }
 axum = "0.7.5"
-polars = { version = "0.40.0", features = ["lazy", "parquet", "dtype-u8", "dtype-full"] }
 tokio = { version = "1.37.0", features = ["full"] }
 hyper = { version="1.3.1", features = ["full"] }
 tower = "0.4.13"
diff --git a/src/loaders/mod.rs b/src/loaders/mod.rs
index aee2043..4f83702 100644
--- a/src/loaders/mod.rs
+++ b/src/loaders/mod.rs
@@ -1,2 +1 @@
-pub mod parquet;
-pub mod parsers;
\ No newline at end of file
+pub mod parquet;
\ No newline at end of file
diff --git a/src/loaders/parquet.rs b/src/loaders/parquet.rs
deleted file mode 100644
index ae87f11..0000000
--- a/src/loaders/parquet.rs
+++ /dev/null
@@ -1,197 +0,0 @@
-
-use std::collections::HashMap;
-use std::os::fd::FromRawFd;
-
-use polars::prelude::*;
-use polars::io::HiveOptions;
-use crate::loaders::parsers::parse_params;
-
-
-pub async fn process_and_return_parquet_file_lazy(
-    file_path: &str,
-    params: &HashMap<String, String>
-) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
-    let mut args = ScanArgsParquet::default();
-
-    // TODO: fix the parquet reader hive_options with _hipscat_index
-    args.hive_options = HiveOptions{enabled:false, schema: None};
-
-    let lf = LazyFrame::scan_parquet(file_path, args).unwrap();
-
-    // Retrieve the schema of the LazyFrame
-    let schema = lf.schema()?;
-    let all_columns: Vec<(String, DataType)> = schema
-        .iter_fields()
-        .map(|field| (field.name().to_string(), field.data_type().clone()))
-        .collect();
-
-    let mut selected_cols = parse_params::parse_columns_from_params(&params).unwrap_or(Vec::new());
-    selected_cols = parse_params::parse_exclude_columns_from_params(&params, &lf).unwrap_or(selected_cols);
-
-    //println!("{:?}", &params.get("filters").unwrap());
-    let filters = parse_params::parse_filters_from_params(&params);
-
-    // HACK: Find a better way to handle each combination of selected params
-    let mut df;
-    // In case we have selected columns and filters
-    if filters.is_ok() && selected_cols.len() > 0 {
-        df = lf
-            .drop(["_hipscat_index"])
-            .filter(
-                // only if combined_condition is not empty
-                filters?
-            )
-            .select(selected_cols)
-            .collect()?;
-    }
-    // In case we have only filters
-    else if filters.is_ok() {
-        df = lf
-            // TODO: fix the parquet reader hive_options with _hipscat_index
-            .drop(["_hipscat_index"])
-            .filter(
-                // only if combined_condition is not empty
-                filters?
-            )
-            .collect()?;
-    }
-    // In case we have only selected columns
-    else if selected_cols.len() > 0 {
-        df = lf
-            .select(selected_cols)
-            .collect()?;
-    }
-    // In case we have no selected columns or filters, return whole dataframe
-    else {
-        df = lf.drop(["_hipscat_index"]).collect()?;
-    }
-
-    for (col, dtype) in &all_columns {
-        if !df.get_column_names().contains(&col.as_str()) {
-            let series = Series::full_null(col, df.height(), &dtype);
-            df.with_column(series)?;
-        }
-    }
-    df = df.select(&all_columns.iter().map(|(col, _)| col.as_str()).collect::<Vec<_>>())?;
-
-    let col_names = df.get_column_names_owned();
-    for (index, name) in col_names.iter().enumerate() {
-        let col = df.column(&name).unwrap();
-        if col.dtype() == &ArrowDataType::LargeUtf8 {
-            // modifying the column to categorical
-            // modify the schema to be categorical
-            // df.try_apply(name, |s| s.categorical().cloned())?;
-        }
-    }
-    //println!("{:?}", df.schema());
-
-    // Checking if anything changed
-    // for (index, name) in col_names.iter().enumerate() {
-    //     let col = df.column(&name).unwrap();
-    //     if col.dtype() == &ArrowDataType::LargeUtf8 {
-    //         println!("Column in LargeUtf8 {:?}", name);
-    //     }
-    // }
-
-    let mut buf = Vec::new();
-    ParquetWriter::new(&mut buf)
-        .finish(&mut df)?;
-    Ok(buf)
-}
-
-
-use arrow::array::{ArrayRef, Float64Array, NullArray};
-use arrow::array::{BooleanArray, Float32Array, new_null_array};
-use arrow::array::{make_array, ArrayDataBuilder};
-use arrow::record_batch::RecordBatch;
-use futures_util::stream::StreamExt;
-use parquet::arrow::async_reader::ParquetRecordBatchStreamBuilder;
-use parquet::arrow::arrow_reader::ArrowReaderMetadata;
-use parquet::arrow::arrow_writer::ArrowWriter;
-use std::error::Error;
-use std::sync::Arc;
-use tokio::fs::File;
-use parquet::file::properties::WriterProperties;
-
-fn create_boolean_mask(batch: &RecordBatch, original_schema: &Arc<Schema>) -> arrow::error::Result<Arc<BooleanArray>> {
-    // Extract the "PROB_GAL" column and downcast it to Float32Array
-    let prob_gal = batch.column(original_schema.index_of("PROB_GAL")?);
-    // Downcast to original schema type
-    let prob_gal = prob_gal.as_any().downcast_ref::<Float32Array>().unwrap();
-
-    // Create a boolean mask where true is prob_gal > 0.8
-    let mut builder = BooleanArray::builder(prob_gal.len());
-    for value in prob_gal.iter() {
-        builder.append_value(value.map_or(false, |v| v > 0.8));
-    }
-
-    let filter_mask = builder.finish();
-    Ok(Arc::new(filter_mask))
-}
-
-
-pub async fn process_and_return_parquet_file(
-    file_path: &str,
-    params: &HashMap<String, String>
-) -> Result<Vec<u8>, Box<dyn Error>> {
-    // Open async file containing parquet data
-    let std_file = std::fs::File::open(file_path)?;
-    let mut file = File::from_std(std_file);
-
-    // Parse selected columns from params
-    let selected_cols = parse_params::parse_columns_from_params_to_str(&params).unwrap_or(Vec::new());
-
-    let meta = ArrowReaderMetadata::load_async(&mut file, Default::default()).await?;
-
-    let stream_builder = ParquetRecordBatchStreamBuilder::new_with_metadata(
-        file.try_clone().await?,
-        meta.clone()
-    );
-    let original_metadata = meta.metadata();
-    let metadata_keys = original_metadata.file_metadata().key_value_metadata().unwrap().clone();
-    let original_schema = stream_builder.schema().clone();
-
-    // Construct the reader
-    let mut stream = stream_builder
-        .with_batch_size(8192)
-        .build()?;
-
-    let mut buf = Vec::new();
-
-    // Set writer properties with the original metadata
-    let writer_properties = WriterProperties::builder()
-        .set_key_value_metadata(Some(metadata_keys))
-        .build();
-
-    let mut writer = ArrowWriter::try_new(&mut buf, original_schema.clone(), Some(writer_properties))?;
-
-    // Collect all batches and write them to the buffer
-    while let Some(batch) = stream.next().await {
-        let mut batch = batch?;
-
-        //let predicate = arrow::compute::FilterBuilder::new(&batch, &projection)?;
-        batch = arrow::compute::filter_record_batch(&batch, &create_boolean_mask(&batch, &original_schema).unwrap())?;
-
-        let selected_arrays = original_schema.fields().iter()
-            .map(|field| {
-                if let Ok(index) = batch.schema().index_of(field.name()) {
-                    if selected_cols.contains(&field.name().to_string()) || &field.name().to_string() == "_hipscat_index" {
-                        batch.column(index).clone()
-                    } else {
-                        new_null_array(field.data_type(), batch.num_rows())
-                    }
-                } else {
-                    Arc::new(NullArray::new(batch.num_rows())) as ArrayRef
-                }
-            })
-            .collect::<Vec<_>>();
-
-        let selected_batch = RecordBatch::try_new(original_schema.clone(), selected_arrays)?;
-        writer.write(&selected_batch)?;
-    }
-
-    writer.finish()?;
-    let _ = writer.close();
-    Ok(buf)
-}
diff --git a/src/loaders/parquet/helpers.rs b/src/loaders/parquet/helpers.rs
new file mode 100644
index 0000000..6770c10
--- /dev/null
+++ b/src/loaders/parquet/helpers.rs
@@ -0,0 +1,96 @@
+use arrow::array::{Array, Float64Array, Float32Array, Int16Array, Int32Array, Int64Array, Int8Array, BooleanArray};
+use arrow::record_batch::RecordBatch;
+use arrow::array::BooleanBuilder;
+use arrow::datatypes::Schema;
+use std::sync::Arc;
+
+/// Create a boolean mask based on the filters provided.
+///
+/// # Arguments
+///
+/// * `batch` - A reference to a RecordBatch that will be filtered.
+/// * `original_schema` - A reference to the original schema of the RecordBatch.
+/// * `filters` - A vector of tuples containing the column name, the comparison operator and the value to compare.
+///
+/// # Returns
+///
+/// This function returns an Arrow Result with the boolean mask.
+pub fn create_boolean_mask(batch: &RecordBatch, original_schema: &Arc<Schema>, filters: Vec<(&str, &str, &str)>) -> arrow::error::Result<Arc<BooleanArray>> {
+    let num_rows = batch.num_rows();
+    let mut boolean_builder = BooleanBuilder::new();
+
+    // Initialize all rows as true
+    for _ in 0..num_rows {
+        boolean_builder.append_value(true);
+    }
+    let mut boolean_mask = boolean_builder.finish();
+
+    for filter in filters.iter() {
+        let column = batch.column(original_schema.index_of(filter.0).unwrap());
+
+        if column.data_type() == &arrow::datatypes::DataType::Float32 {
+            let column = column.as_any().downcast_ref::<Float32Array>().unwrap();
+            apply_filter(&mut boolean_mask, column, filter)?;
+        } else if column.data_type() == &arrow::datatypes::DataType::Float64 {
+            let column = column.as_any().downcast_ref::<Float64Array>().unwrap();
+            apply_filter(&mut boolean_mask, column, filter)?;
+        } else if column.data_type() == &arrow::datatypes::DataType::Int16 {
+            let column = column.as_any().downcast_ref::<Int16Array>().unwrap();
+            apply_filter(&mut boolean_mask, column, filter)?;
+        } else if column.data_type() == &arrow::datatypes::DataType::Int32 {
+            let column = column.as_any().downcast_ref::<Int32Array>().unwrap();
+            apply_filter(&mut boolean_mask, column, filter)?;
+        } else if column.data_type() == &arrow::datatypes::DataType::Int64 {
+            let column = column.as_any().downcast_ref::<Int64Array>().unwrap();
+            apply_filter(&mut boolean_mask, column, filter)?;
+        } else if column.data_type() == &arrow::datatypes::DataType::Int8 {
+            let column = column.as_any().downcast_ref::<Int8Array>().unwrap();
+            apply_filter(&mut boolean_mask, column, filter)?;
+        } else if column.data_type() == &arrow::datatypes::DataType::Boolean {
+            // BooleanArray is not a PrimitiveArray, so it cannot go through
+            // apply_filter; only equality-style comparisons make sense here.
+            let column = column.as_any().downcast_ref::<BooleanArray>().unwrap();
+            let filter_value: bool = filter.2.parse().map_err(|_| {
+                arrow::error::ArrowError::ParseError(format!("invalid boolean literal {:?}", filter.2))
+            })?;
+            let values: Vec<bool> = (0..num_rows).map(|i| {
+                let keep = match filter.1 {
+                    "=" | "==" => column.is_valid(i) && column.value(i) == filter_value,
+                    "!=" => column.is_valid(i) && column.value(i) != filter_value,
+                    _ => false,
+                };
+                boolean_mask.value(i) && keep
+            }).collect();
+            boolean_mask = BooleanArray::from(values);
+        } else {
+            return Err(arrow::error::ArrowError::NotYetImplemented(format!("Data type {:?} not yet implemented", column.data_type())));
+        }
+    }
+    Ok(Arc::new(boolean_mask))
+}
+
+/// Apply a filter to a column and update the boolean mask.
+///
+/// # Arguments
+///
+/// * `boolean_mask` - A mutable reference to a BooleanArray that will be updated with the filter results.
+/// * `column` - A reference to a PrimitiveArray that will be filtered.
+/// * `filter` - A tuple containing the column name, the comparison operator and the value to compare.
+///
+/// # Returns
+///
+/// This function returns an Arrow Result.
+fn apply_filter<T>(boolean_mask: &mut BooleanArray, column: &arrow::array::PrimitiveArray<T>, filter: &(&str, &str, &str)) -> arrow::error::Result<()>
+where
+    T: arrow::datatypes::ArrowPrimitiveType,
+    T::Native: std::cmp::PartialOrd + std::str::FromStr,
+    <T::Native as std::str::FromStr>::Err: std::fmt::Debug,
+{
+    let filter_value = filter.2.parse::<T::Native>().unwrap();
+    let mut new_mask = BooleanBuilder::new();
+
+    for (index, value) in column.iter().enumerate() {
+        let current_mask = boolean_mask.value(index);
+        let result = match filter.1 {
+            ">" => value.map_or(false, |v| v > filter_value),
+            "<" => value.map_or(false, |v| v < filter_value),
+            "=" | "==" => value.map_or(false, |v| v == filter_value),
+            "!=" => value.map_or(false, |v| v != filter_value),
+            ">=" => value.map_or(false, |v| v >= filter_value),
+            "<=" => value.map_or(false, |v| v <= filter_value),
+            _ => false,
+        };
+        new_mask.append_value(current_mask && result);
+    }
+
+    *boolean_mask = new_mask.finish();
+    Ok(())
+}
\ No newline at end of file
diff --git a/src/loaders/parquet/mod.rs b/src/loaders/parquet/mod.rs
new file mode 100644
index 0000000..aecf1a5
--- /dev/null
+++ b/src/loaders/parquet/mod.rs
@@ -0,0 +1,3 @@
+pub mod parse_params;
+pub mod helpers;
+pub mod parquet;
\ No newline at end of file
diff --git a/src/loaders/parquet/parquet.rs b/src/loaders/parquet/parquet.rs
new file mode 100644
index 0000000..17f632f
--- /dev/null
+++ b/src/loaders/parquet/parquet.rs
@@ -0,0 +1,125 @@
+
+use std::collections::HashMap;
+use std::error::Error;
+use std::sync::Arc;
+
+use arrow::array::{ArrayRef, NullArray};
+use arrow::array::new_null_array;
+use arrow::record_batch::RecordBatch;
+
+use parquet::arrow::async_reader::ParquetRecordBatchStreamBuilder;
+use parquet::arrow::arrow_reader::ArrowReaderMetadata;
+use parquet::arrow::arrow_writer::ArrowWriter;
+use parquet::file::properties::WriterProperties;
+
+use futures_util::stream::StreamExt;
+use tokio::fs::File;
+
+use crate::loaders::parquet::parse_params;
+use crate::loaders::parquet::helpers::create_boolean_mask;
+
+/// Process a Parquet file and return the content as a byte stream.
+///
+/// # Arguments
+///
+/// * `file_path` - A reference to a string containing the path to the Parquet file.
+/// * `params` - A reference to a HashMap of parameters containing 'columns' and 'filters' keys.
+///
+/// # Returns
+///
+/// This function returns a byte stream that can be directly used as an HTTP response body.
+pub async fn process_and_return_parquet_file(
+    file_path: &str,
+    params: &HashMap<String, String>
+) -> Result<Vec<u8>, Box<dyn Error>> {
+    // Open async file containing parquet data
+    let std_file = std::fs::File::open(file_path)?;
+    let mut file = File::from_std(std_file);
+
+    let meta = ArrowReaderMetadata::load_async(&mut file, Default::default()).await?;
+    let stream_builder = ParquetRecordBatchStreamBuilder::new_with_metadata(
+        file.try_clone().await?,
+        meta.clone()
+    );
+    let original_metadata = meta.metadata();
+    let metadata_keys = original_metadata
+        .file_metadata()
+        .key_value_metadata()
+        .unwrap()
+        .clone();
+
+    let original_schema = stream_builder
+        .schema()
+        .clone();
+
+    let all_columns = original_schema
+        .fields()
+        .iter()
+        .map(|field| field.name().to_string())
+        .collect::<Vec<String>>();
+
+    // Parse selected columns from params; default to every column
+    let columns = parse_params::parse_columns_from_params_to_str(&params)
+        .unwrap_or(all_columns);
+
+    let filters = parse_params::parse_filters(&params);
+
+    // Construct the reader stream
+    let mut stream = stream_builder
+        .with_batch_size(8192)
+        .build()?;
+
+    // Set writer properties with the original metadata
+    let writer_properties = WriterProperties::builder()
+        .set_key_value_metadata(Some(metadata_keys))
+        .build();
+
+    let mut out_buffer = Vec::new();
+    let mut writer = ArrowWriter::try_new(
+        &mut out_buffer,
+        original_schema.clone(),
+        Some(writer_properties)
+    )?;
+
+    // Collect all batches and write them to the buffer
+    while let Some(batch) = stream.next().await {
+        let mut batch = batch?;
+
+        // Apply the requested filters, if any, as a boolean mask
+        if let Some(filters) = &filters {
+            let filter_mask = create_boolean_mask(
+                &batch,
+                &original_schema,
+                filters.clone()
+            )?;
+            batch = arrow::compute::filter_record_batch(
+                &batch,
+                &filter_mask
+            )?;
+        }
+
+        // Keep selected columns; fill the rest with nulls so the schema is stable
+        let selected_arrays = original_schema.fields().iter()
+            .map(|field| {
+                if let Ok(index) = batch.schema().index_of(field.name()) {
+                    if columns.contains(&field.name().to_string()) || &field.name().to_string() == "_hipscat_index" {
+                        batch.column(index).clone()
+                    } else {
+                        new_null_array(
+                            field.data_type(),
+                            batch.num_rows()
+                        )
+                    }
+                } else {
+                    Arc::new(NullArray::new(batch.num_rows())) as ArrayRef
+                }
+            })
+            .collect::<Vec<_>>();
+
+        let selected_batch = RecordBatch::try_new(original_schema.clone(), selected_arrays)?;
+        writer.write(&selected_batch)?;
+    }
+
+    writer.close()?;
+    Ok(out_buffer)
+}
diff --git a/src/loaders/parquet/parse_params.rs b/src/loaders/parquet/parse_params.rs
new file mode 100644
index 0000000..8c59302
--- /dev/null
+++ b/src/loaders/parquet/parse_params.rs
@@ -0,0 +1,46 @@
+use std::collections::HashMap;
+use regex::Regex;
+
+/// # Arguments
+///
+/// * `params` - A reference to a HashMap of parameters containing 'columns' key.
+///
+/// # Returns
+///
+/// A vector of strings with the selected column names.
+pub fn parse_columns_from_params_to_str(params: &HashMap<String, String>) -> Option<Vec<String>> {
+    // Parse columns from params
+    if let Some(cols) = params.get("columns") {
+        let cols = cols.split(",").collect::<Vec<&str>>();
+        let select_cols = cols.iter().map(|x| x.to_string()).collect::<Vec<String>>();
+        return Some(select_cols);
+    }
+    None
+}
+
+/// # Arguments
+///
+/// * `params` - A reference to a HashMap of parameters containing 'filters' key.
+///
+/// # Returns
+///
+/// A vector of tuples containing the column name, the comparison operator and the value to compare.
+pub fn parse_filters(params: &HashMap<String, String>) -> Option<Vec<(&str, &str, &str)>> {
+    let mut filters = Vec::new();
+    if let Some(query) = params.get("filters") {
+        filters = query.split(",").collect::<Vec<&str>>();
+    }
+
+    if filters.is_empty() {
+        return None
+    }
+
+    // '!' is included in the operator class so filters like "col!=3" parse too
+    let re = Regex::new(r"([a-zA-Z_]+)([<>=!]+)([-+]?[0-9]*\.?[0-9]*)").unwrap();
+    let mut filter_vec = Vec::new();
+    for filter in filters {
+        let f_vec = re.captures(filter).unwrap();
+        filter_vec.push((f_vec.get(1).unwrap().as_str(), f_vec.get(2).unwrap().as_str(), f_vec.get(3).unwrap().as_str()));
+    }
+
+    Some(filter_vec)
+}
\ No newline at end of file
diff --git a/src/loaders/parsers/helpers.rs b/src/loaders/parsers/helpers.rs
deleted file mode 100644
index 89f8541..0000000
--- a/src/loaders/parsers/helpers.rs
+++ /dev/null
@@ -1,91 +0,0 @@
-use polars::prelude::*;
-
-/// Returns the column names of a LazyFrame.
-///
-/// # Arguments
-///
-/// * `lf` - A reference to a LazyFrame.
-///
-/// # Returns
-///
-/// A vector of strings representing the column names of the DataFrame.
-pub fn get_lazyframe_column_names(lf : &LazyFrame) -> Vec<String> {
-    let df = lf.clone().first().collect().unwrap();
-    df.get_column_names().iter().map(|x| x.to_string()).collect()
-}
-
-/// Parses a filter condition from a string into a Polars expression.
-///
-/// The expected format for `condition` is "{column_name} {operator} {value}", where:
-/// - `column_name` identifies a DataFrame column.
-/// - `operator` is one of `<`, `<=`, `>`, `>=`, or `=`.
-/// - `value` is a number compared against the column's values.
-///
-/// # Parameters
-/// * `condition` - A string slice representing the filter condition.
-///
-/// # Returns
-/// A `Result` containing either:
-/// - `Ok(Expr)`: A Polars expression if the parsing succeeds.
-/// - `Err(Box<dyn Error>)`: An error if the format is incorrect or parsing fails.
-pub fn str_filter_to_expr(condition: &str) -> Result<Expr, Box<dyn std::error::Error>> {
-    use regex::Regex;
-
-    // Regex to catch "{column_name} {operator} {value}"
-    let re = Regex::new(r"([a-zA-Z_]+)([<>=]+)([-+]?[0-9]*\.?[0-9]*)").unwrap();
-    let parts = re.captures(condition).unwrap();
-
-    if parts.len() == 4 {
-        let column = parts.get(1).unwrap().as_str();
-        let operator = parts.get(2).unwrap().as_str();
-        let value = parts.get(3).unwrap().as_str();
-
-        match operator {
-            "<" => Ok(col(column).lt(lit(value.parse::<f64>()?))),
-            "<=" => Ok(col(column).lt_eq(lit(value.parse::<f64>()?))),
-            ">" => Ok(col(column).gt(lit(value.parse::<f64>()?))),
-            ">=" => Ok(col(column).gt_eq(lit(value.parse::<f64>()?))),
-            "=" => Ok(col(column).eq(lit(value.parse::<f64>()?))),
-            _ => Err("Unsupported operator".into()),
-        }
-    } else {
-        Err("Invalid condition format".into())
-    }
-}
-
-
-/// Parses filter conditions from a list of tuples into Polars expressions.
-///
-/// The expected format for each tuple in `filters` is (column_name, operator, value), where:
-/// - `column_name` identifies a DataFrame column.
-/// - `operator` is one of "==", "=", ">", ">=", "<", "<=", "!=", "in", "not in".
-/// - `value` is a number or a list of values compared against the column's values.
-///
-/// # Parameters
-/// * `filters` - An optional vector of tuples representing the filter conditions.
-///
-/// # Returns
-/// A `Result` containing either:
-/// - `Ok(Vec<Expr>)`: A vector of Polars expressions if parsing succeeds.
-/// - `Err(Box<dyn Error>)`: An error if the format is incorrect or parsing fails.
-pub fn filters_to_expr(filters: Option<Vec<(String, String, Vec<String>)>>) -> Result<Vec<Expr>, Box<dyn std::error::Error>> {
-    let mut expressions = Vec::new();
-
-    if let Some(conditions) = filters {
-        for (column, operator, values) in conditions {
-            let expression = match operator.as_str() {
-                "=" | "==" => col(&column).eq(lit(values[0])),
-                "!=" => col(&column).neq(lit(values[0])),
-                ">" => col(&column).gt(lit(values[0])),
-                ">=" => col(&column).gt_eq(lit(values[0])),
-                "<" => col(&column).lt(lit(values[0])),
-                "<=" => col(&column).lt_eq(lit(values[0])),
-                _ => return Err("Unsupported operator".into()),
-            };
-            expressions.push(expression);
-        }
-    }
-
-    Ok(expressions)
-}
-
diff --git a/src/loaders/parsers/mod.rs b/src/loaders/parsers/mod.rs
deleted file mode 100644
index 7f23e7e..0000000
--- a/src/loaders/parsers/mod.rs
+++ /dev/null
@@ -1,2 +0,0 @@
-pub mod parse_params;
-pub mod helpers;
\ No newline at end of file
diff --git a/src/loaders/parsers/parse_params.rs b/src/loaders/parsers/parse_params.rs
deleted file mode 100644
index 53aca7e..0000000
--- a/src/loaders/parsers/parse_params.rs
+++ /dev/null
@@ -1,94 +0,0 @@
-use polars::{lazy::dsl::col, prelude::*};
-use std::collections::HashMap;
-use crate::loaders::parsers::helpers;
-
-/// # Arguments
-///
-/// * `params` - A reference to a HashMap of parameters containing 'columns' key.
-///
-/// # Returns
-///
-/// A vector of Polars with the selected columns.
-pub fn parse_columns_from_params( params: &HashMap<String, String> ) -> Option<Vec<Expr>> {
-    // Parse columns from params
-    if let Some(cols) = params.get("columns") {
-        let cols = cols.split(",").collect::<Vec<&str>>();
-        let select_cols = cols.iter().map(|x| col(x)).collect::<Vec<Expr>>();
-        return Some(select_cols);
-    }
-    None
-}
-
-/// # Arguments
-///
-/// * `params` - A reference to a HashMap of parameters containing 'columns' key.
-///
-/// # Returns
-///
-/// A vector of Polars with the selected columns.
-pub fn parse_columns_from_params_to_str( params: &HashMap<String, String> ) -> Option<Vec<String>> {
-    // Parse columns from params
-    if let Some(cols) = params.get("columns") {
-        let cols = cols.split(",").collect::<Vec<&str>>();
-        let select_cols = cols.iter().map(|x| x.to_string()).collect::<Vec<String>>();
-        return Some(select_cols);
-    }
-    None
-}
-
-/// Parses a list of filter conditions from query parameter of hashmap.
-///
-/// # Arguments
-///
-/// * `params` - A reference to a HashMap of parameters.
-///
-/// # Returns
-///
-/// A Polars expression representing the combined filter conditions.
-pub fn parse_filters_from_params(params: &HashMap<String, String>) -> Result<Expr, Box<dyn std::error::Error>> {
-    let mut filters = Vec::new();
-    if let Some(query) = params.get("filters") {
-        filters = query.split(",").collect::<Vec<&str>>();
-    }
-
-    //TODO: DEPRECATE
-    let conditions: Result<Vec<Expr>, _> = filters.iter()
-        .map(|condition: &&str| helpers::str_filter_to_expr(*condition))
-        .collect();
-
-    let combined_condition = conditions?.into_iter()
-        .reduce(|acc, cond| acc.and(cond))
-        .ok_or(""); // Handle case where no conditions are provided
-
-    match combined_condition {
-        Ok(_) => { Ok(combined_condition.unwrap()) },
-        Err(_) => { Err( "Couldnt parse queries".into() ) },
-    }
-}
-
-
-/// # Arguments
-///
-/// * `params` - The client request HashMap of parameters.
-/// * `lf` - A reference to a LazyFrame.
-///
-/// # Returns
-///
-/// A vector of Polars expressions representing the columns to exclude.
-pub fn parse_exclude_columns_from_params( params: &HashMap<String, String>, lf : &LazyFrame ) -> Option<Vec<Expr>> {
-    // Parse columns from params
-    if let Some(exclude_cols) = params.get("exclude_cols") {
-        let exclude_cols = exclude_cols.split(",").collect::<Vec<&str>>();
-        let exclude_cols = exclude_cols.iter().map(|&x| x).collect::<Vec<&str>>();
-
-        let cols = helpers::get_lazyframe_column_names(&lf);
-
-        let select_cols = cols.iter()
-            .filter(|&x| !exclude_cols.contains( &x.as_str() ))
-            .map(|x| col(x))
-            .collect::<Vec<Expr>>();
-
-        return Some(select_cols);
-    }
-    None
-}
diff --git a/src/routes.rs b/src/routes.rs
index a24f173..d0c27a9 100644
--- a/src/routes.rs
+++ b/src/routes.rs
@@ -25,7 +25,7 @@ pub async fn entry_route(uri: OriginalUri, Query(params): Query<HashMap<String, String>>)
-    let result = parquet::process_and_return_parquet_file_lazy(&file_path, &params).await;
+    let result = parquet::parquet::process_and_return_parquet_file(&file_path, &params).await;
diff --git a/tests/parquet.rs b/tests/parquet.rs
--- a/tests/parquet.rs
+++ b/tests/parquet.rs
@@ ... @@
     params.insert("filters".to_string(), "RA>=30.1241,DEC<=-30.3".to_string());
 
-    let result = parquet::process_and_return_parquet_file_lazy(
+    let result = parquet::parquet::process_and_return_parquet_file(
         file_path.to_str().unwrap(),
         &params
     ).await;
-
-    // Add assertions here to verify the result
 }
diff --git a/tests/parsers.rs b/tests/parsers.rs
index 1f2ccbe..fba86a4 100644
--- a/tests/parsers.rs
+++ b/tests/parsers.rs
@@ -1,22 +1,5 @@
 #[cfg(test)]
 mod parser {
-    use lsdb_server::loaders::parsers::helpers;
-    use polars::{lazy::dsl::col, prelude::*};
 
-    #[test]
-    fn test_parse_condition() {
-        let expr = helpers::str_filter_to_expr("ra>=30.1241").unwrap();
-        assert_eq!(expr, col("ra").gt_eq(lit(30.1241)));
-
-        let expr = helpers::str_filter_to_expr("dec<=-30.3").unwrap();
-        assert_eq!(expr, col("dec").lt_eq(lit(-30.3)));
-
-        let expr = helpers::str_filter_to_expr("dec>4").unwrap();
-        assert_eq!(expr, col("dec").gt(lit(4.0)));
-
-        let expr = helpers::str_filter_to_expr("dec=4").unwrap();
-        assert_eq!(expr, col("dec").eq(lit(4.0)));
-    }
-
 }
\ No newline at end of file
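
A minimal usage sketch of the new entry point (not part of the commit; the catalog path is made up, and the parameter shapes mirror the test above):

    use std::collections::HashMap;
    use lsdb_server::loaders::parquet::parquet::process_and_return_parquet_file;

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        let mut params = HashMap::new();
        params.insert("columns".to_string(), "RA,DEC".to_string());
        params.insert("filters".to_string(), "RA>=30.1241,DEC<=-30.3".to_string());

        // Returns a complete parquet file as bytes, ready to serve as an HTTP
        // body; unselected columns come back as all-null arrays, so clients
        // always see the original schema.
        let buf = process_and_return_parquet_file("catalogs/Npix=0.parquet", &params).await?;
        println!("{} bytes of parquet", buf.len());
        Ok(())
    }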
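The filter grammar accepted by parse_filters is a comma-separated list of column/operator/number clauses. A sketch of what the parser yields, with invented column names:

    use std::collections::HashMap;
    use lsdb_server::loaders::parquet::parse_params::parse_filters;

    fn main() {
        let mut params = HashMap::new();
        params.insert("filters".to_string(), "mag<18.5,PROB_GAL>=0.8".to_string());

        // Each comma-separated clause becomes a (column, operator, value)
        // triple of string slices; create_boolean_mask later parses the value
        // according to the column's arrow type.
        let parsed = parse_filters(&params);
        assert_eq!(
            parsed,
            Some(vec![("mag", "<", "18.5"), ("PROB_GAL", ">=", "0.8")])
        );
    }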
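And create_boolean_mask can be exercised in isolation on a hand-built batch (schema and values invented for the example). Note that null entries never pass a filter, since every comparison maps None to false:

    use std::sync::Arc;
    use arrow::array::Float32Array;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;
    use lsdb_server::loaders::parquet::helpers::create_boolean_mask;

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("PROB_GAL", DataType::Float32, true),
        ]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Float32Array::from(vec![Some(0.95), Some(0.2), None]))],
        )?;

        // Keep rows where PROB_GAL > 0.8; the 0.2 row and the null row are dropped.
        let mask = create_boolean_mask(&batch, &schema, vec![("PROB_GAL", ">", "0.8")])?;
        let filtered = arrow::compute::filter_record_batch(&batch, &mask)?;
        assert_eq!(filtered.num_rows(), 1);
        Ok(())
    }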