From 7f081560007f7fa554e79c09bc14f108797033c8 Mon Sep 17 00:00:00 2001 From: schwarzam Date: Sat, 25 May 2024 18:55:38 +0000 Subject: [PATCH] refactored code, started to work on lsdb syntax --- Cargo.toml | 2 +- README.md | 12 ++++ src/bin.rs | 4 +- src/lib.rs | 1 - src/loaders/mod.rs | 3 +- src/loaders/parquet.rs | 39 ++++++------ src/loaders/parsers/helpers.rs | 92 +++++++++++++++++++++++++++++ src/loaders/parsers/mod.rs | 2 + src/loaders/parsers/parse_params.rs | 77 ++++++++++++++++++++++++ src/parsers/helpers.rs | 66 --------------------- src/parsers/mod.rs | 2 - src/parsers/parse_filters.rs | 72 ---------------------- src/routes.rs | 2 +- tests/parquet.rs | 1 + tests/parsers.rs | 10 ++-- 15 files changed, 215 insertions(+), 170 deletions(-) create mode 100644 src/loaders/parsers/helpers.rs create mode 100644 src/loaders/parsers/mod.rs create mode 100644 src/loaders/parsers/parse_params.rs delete mode 100644 src/parsers/helpers.rs delete mode 100644 src/parsers/mod.rs delete mode 100644 src/parsers/parse_filters.rs diff --git a/Cargo.toml b/Cargo.toml index 4628b3d..d34eaa4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,7 +18,7 @@ path = "src/bin.rs" [dependencies] axum = "0.7.5" -polars = { version = "0.39.2", features = ["lazy", "parquet", "dtype-u8"] } +polars = { version = "0.40.0", features = ["lazy", "parquet", "dtype-u8"] } tokio = { version = "1.37.0", features = ["full"] } hyper = { version="1.3.1", features = ["full"] } tower = "0.4.13" diff --git a/README.md b/README.md index 01c9c08..4d58198 100644 --- a/README.md +++ b/README.md @@ -48,3 +48,15 @@ server { ``` --- + +### Parameters + +- `columns` : The columns to return in the response. Default is all columns. Default is all columns. +- `exclude_cols`: The columns to exclude from the response. Default is no columns. +- `filters`: A list of filters to apply ["r_auto < 18", ...]. Default is no filters. + +### Caveats + +- The server only works with lsdb while using dtype_backend = "numpy_nullable" + +This is because polars writes strings as LargeStrings or LargeUtf8, which causes issues with the metadata read in lsdb. \ No newline at end of file diff --git a/src/bin.rs b/src/bin.rs index bd00c8f..2f900a5 100644 --- a/src/bin.rs +++ b/src/bin.rs @@ -4,7 +4,7 @@ use axum::{ routing::{any, get}, Router }; -use lsdb_server::routes::catch_all; +use lsdb_server::routes::entry_route; #[tokio::main] async fn main() { @@ -12,7 +12,7 @@ async fn main() { let app = Router::new() .route("/", get(|| async { "online" })) - .fallback(any(catch_all)); // This will catch all other paths + .fallback(any(entry_route)); // This will catch all other paths let listener = tokio::net::TcpListener::bind("0.0.0.0:5000").await.unwrap(); axum::serve(listener, app).await.unwrap(); diff --git a/src/lib.rs b/src/lib.rs index c4e5b61..0d91dde 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,2 @@ pub mod loaders; -pub mod parsers; pub mod routes; \ No newline at end of file diff --git a/src/loaders/mod.rs b/src/loaders/mod.rs index 4f83702..aee2043 100644 --- a/src/loaders/mod.rs +++ b/src/loaders/mod.rs @@ -1 +1,2 @@ -pub mod parquet; \ No newline at end of file +pub mod parquet; +pub mod parsers; \ No newline at end of file diff --git a/src/loaders/parquet.rs b/src/loaders/parquet.rs index 78ca34c..8e9f0a1 100644 --- a/src/loaders/parquet.rs +++ b/src/loaders/parquet.rs @@ -1,11 +1,10 @@ use std::collections::HashMap; -use polars::{lazy::dsl::col, prelude::*}; +use polars::prelude::*; use polars::io::HiveOptions; +use crate::loaders::parsers::parse_params; -use crate::parsers::helpers; -use crate::parsers::parse_filters; pub async fn process_and_return_parquet_file_lazy( file_path: &str, @@ -13,35 +12,37 @@ pub async fn process_and_return_parquet_file_lazy( ) -> Result, Box> { let mut args = ScanArgsParquet::default(); + // TODO: fix the parquet reader hive_options with _hipscat_index args.hive_options = HiveOptions{enabled:false, schema: None}; let lf = LazyFrame::scan_parquet(file_path, args).unwrap(); + let mut selected_cols = parse_params::parse_columns_from_params(¶ms).unwrap_or(Vec::new()); + selected_cols = parse_params::parse_exclude_columns_from_params(¶ms, &lf).unwrap_or(selected_cols); - let mut selected_cols = helpers::parse_columns_from_params(¶ms).unwrap_or(Vec::new()); - selected_cols = helpers::parse_exclude_columns_from_params(¶ms, &lf).unwrap_or(selected_cols); - - let combined_condition = parse_filters::parse_querie_from_params(¶ms); - + //println!("{:?}", ¶ms.get("filters").unwrap()); + let filters = parse_params::parse_filters_from_params(¶ms); + // HACK: Find a better way to handle each combination of selected params let mut df; - //In case we have selected columns and a combined condition - if combined_condition.is_ok() && selected_cols.len() > 0{ + //In case we have selected columns and filters + if filters.is_ok() && selected_cols.len() > 0{ df = lf + .drop(["_hipscat_index"]) .select(selected_cols) .filter( // only if combined_condition is not empty - combined_condition? + filters? ) .collect()?; } - // In case we have only a combined condition - else if combined_condition.is_ok() { + // In case we have only filters + else if filters.is_ok() { df = lf - //TODO: Remove later + //TODO: fix the parquet reader hive_options with _hipscat_index .drop(["_hipscat_index"]) .filter( // only if combined_condition is not empty - combined_condition? + filters? ) .collect()?; } @@ -51,13 +52,13 @@ pub async fn process_and_return_parquet_file_lazy( .select(selected_cols) .collect()?; } - // In case we have no selected columns or combined condition + // In case we have no selected columns or filters, return whole dataframe else { - df = lf.collect()?; + df = lf.drop(["_hipscat_index"]).collect()?; } let mut buf = Vec::new(); - ParquetWriter::new(&mut buf).finish(&mut df)?; - + ParquetWriter::new(&mut buf) + .finish(&mut df)?; Ok(buf) } \ No newline at end of file diff --git a/src/loaders/parsers/helpers.rs b/src/loaders/parsers/helpers.rs new file mode 100644 index 0000000..f50dc16 --- /dev/null +++ b/src/loaders/parsers/helpers.rs @@ -0,0 +1,92 @@ +use polars::prelude::*; +use std::collections::HashMap; + +/// Returns the column names of a LazyFrame. +/// +/// # Arguments +/// +/// * `lf` - A reference to a LazyFrame. +/// +/// # Returns +/// +/// A vector of strings representing the column names of the DataFrame. +pub fn get_lazyframe_column_names(lf : &LazyFrame) -> Vec { + let df = lf.clone().first().collect().unwrap(); + df.get_column_names().iter().map(|x| x.to_string()).collect() +} + +/// Parses a filter condition from a string into a Polars expression. +/// +/// The expected format for `condition` is "{column_name} {operator} {value}", where: +/// - `column_name` identifies a DataFrame column. +/// - `operator` is one of `<`, `<=`, `>`, `>=`, or `=`. +/// - `value` is a number compared against the column's values. +/// +/// # Parameters +/// * `condition` - A string slice representing the filter condition. +/// +/// # Returns +/// A `Result` containing either: +/// - `Ok(Expr)`: A Polars expression if the parsing succeeds. +/// - `Err(Box)`: An error if the format is incorrect or parsing fails. +pub fn str_filter_to_expr(condition: &str) -> Result> { + use regex::Regex; + + // Regex to catch "{column_name} {operator} {value}" + let re = Regex::new(r"([a-zA-Z_]+)([<>=]+)([-+]?[0-9]*\.?[0-9]*)").unwrap(); + let parts = re.captures(condition).unwrap(); + + if parts.len() == 4 { + let column = parts.get(1).unwrap().as_str(); + let operator = parts.get(2).unwrap().as_str(); + let value = parts.get(3).unwrap().as_str(); + + match operator { + "<" => Ok(col(column).lt(lit(value.parse::()?))), + "<=" => Ok(col(column).lt_eq(lit(value.parse::()?))), + ">" => Ok(col(column).gt(lit(value.parse::()?))), + ">=" => Ok(col(column).gt_eq(lit(value.parse::()?))), + "=" => Ok(col(column).eq(lit(value.parse::()?))), + _ => Err("Unsupported operator".into()), + } + } else { + Err("Invalid condition format".into()) + } +} + + +/// Parses filter conditions from a list of tuples into Polars expressions. +/// +/// The expected format for each tuple in `filters` is (column_name, operator, value), where: +/// - `column_name` identifies a DataFrame column. +/// - `operator` is one of "==", "=", ">", ">=", "<", "<=", "!=", "in", "not in". +/// - `value` is a number or a list of values compared against the column's values. +/// +/// # Parameters +/// * `filters` - An optional vector of tuples representing the filter conditions. +/// +/// # Returns +/// A `Result` containing either: +/// - `Ok(Vec)`: A vector of Polars expressions if parsing succeeds. +/// - `Err(Box)`: An error if the format is incorrect or parsing fails. +pub fn filters_to_expr(filters: Option)>>) -> Result, Box> { + let mut expressions = Vec::new(); + + if let Some(conditions) = filters { + for (column, operator, values) in conditions { + let expression = match operator.as_str() { + "=" | "==" => col(&column).eq(lit(values[0])), + "!=" => col(&column).neq(lit(values[0])), + ">" => col(&column).gt(lit(values[0])), + ">=" => col(&column).gt_eq(lit(values[0])), + "<" => col(&column).lt(lit(values[0])), + "<=" => col(&column).lt_eq(lit(values[0])), + _ => return Err("Unsupported operator".into()), + }; + expressions.push(expression); + } + } + + Ok(expressions) +} + diff --git a/src/loaders/parsers/mod.rs b/src/loaders/parsers/mod.rs new file mode 100644 index 0000000..7f23e7e --- /dev/null +++ b/src/loaders/parsers/mod.rs @@ -0,0 +1,2 @@ +pub mod parse_params; +pub mod helpers; \ No newline at end of file diff --git a/src/loaders/parsers/parse_params.rs b/src/loaders/parsers/parse_params.rs new file mode 100644 index 0000000..14b37c1 --- /dev/null +++ b/src/loaders/parsers/parse_params.rs @@ -0,0 +1,77 @@ +use polars::{lazy::dsl::col, prelude::*}; +use std::collections::HashMap; +use crate::loaders::parsers::helpers; + +/// # Arguments +/// +/// * `params` - A reference to a HashMap of parameters containing 'columns' key. +/// +/// # Returns +/// +/// A vector of Polars with the selected columns. +pub fn parse_columns_from_params( params: &HashMap ) -> Option> { + // Parse columns from params + if let Some(cols) = params.get("columns") { + let cols = cols.split(",").collect::>(); + let select_cols = cols.iter().map(|x| col(x)).collect::>(); + return Some(select_cols); + } + None +} + +/// Parses a list of filter conditions from query parameter of hashmap. +/// +/// # Arguments +/// +/// * `params` - A reference to a HashMap of parameters. +/// +/// # Returns +/// +/// A Polars expression representing the combined filter conditions. +pub fn parse_filters_from_params(params: &HashMap) -> Result> { + let mut filters = Vec::new(); + if let Some(query) = params.get("filters") { + filters = query.split(",").collect::>(); + } + + //TODO: DEPRECATE + let conditions: Result, _> = filters.iter() + .map(|condition: &&str| helpers::str_filter_to_expr(*condition)) + .collect(); + + let combined_condition = conditions?.into_iter() + .reduce(|acc, cond| acc.and(cond)) + .ok_or(""); // Handle case where no conditions are provided + + match combined_condition { + Ok(_) => { Ok(combined_condition.unwrap()) }, + Err(_) => { Err( "Couldnt parse queries".into() ) }, + } +} + + +/// # Arguments +/// +/// * `params` - The client request HashMap of parameters. +/// * `lf` - A reference to a LazyFrame. +/// +/// # Returns +/// +/// A vector of Polars expressions representing the columns to exclude. +pub fn parse_exclude_columns_from_params( params: &HashMap, lf : &LazyFrame ) -> Option> { + // Parse columns from params + if let Some(exclude_cols) = params.get("exclude_cols") { + let exclude_cols = exclude_cols.split(",").collect::>(); + let exclude_cols = exclude_cols.iter().map(|&x| x).collect::>(); + + let cols = helpers::get_lazyframe_column_names(&lf); + + let select_cols = cols.iter() + .filter(|&x| !exclude_cols.contains( &x.as_str() )) + .map(|x| col(x)) + .collect::>(); + + return Some(select_cols); + } + None +} diff --git a/src/parsers/helpers.rs b/src/parsers/helpers.rs deleted file mode 100644 index 69788de..0000000 --- a/src/parsers/helpers.rs +++ /dev/null @@ -1,66 +0,0 @@ -use polars::prelude::*; -use std::collections::HashMap; - -/// Returns the column names of a LazyFrame. -/// -/// # Arguments -/// -/// * `lf` - A reference to a LazyFrame. -/// -/// # Returns -/// -/// A vector of strings representing the column names of the DataFrame. -pub fn get_lazyframe_column_names(lf : &LazyFrame) -> Vec { - let df = lf.clone().first().collect().unwrap(); - df.get_column_names().iter().map(|x| x.to_string()).collect() -} - -/// Parses a list of columns from a HashMap of parameters. -/// -/// # Arguments -/// -/// * `params` - A reference to a HashMap of parameters. -/// -/// # Returns -/// -/// A vector of Polars expressions representing the columns to select. -pub fn parse_columns_from_params( params: &HashMap ) -> Option> { - // Parse columns from params - if let Some(cols) = params.get("cols") { - let cols = cols.split(",").collect::>(); - let select_cols = cols.iter().map(|x| col(x)).collect::>(); - return Some(select_cols); - } - None -} - -/// Parses a list of columns to exclude from a HashMap of parameters. -/// -/// # Arguments -/// -/// * `params` - A reference to a HashMap of parameters. -/// * `lf` - A reference to a LazyFrame. -/// -/// # Returns -/// -/// A vector of Polars expressions representing the columns to exclude. -pub fn parse_exclude_columns_from_params( params: &HashMap, lf : &LazyFrame ) -> Option> { - // Parse columns from params - if let Some(exclude_cols) = params.get("exclude_cols") { - let exclude_cols = exclude_cols.split(",").collect::>(); - let exclude_cols = exclude_cols.iter().map(|&x| x).collect::>(); - - let cols = get_lazyframe_column_names(&lf); - - let select_cols = cols.iter() - .filter(|&x| !exclude_cols.contains( &x.as_str() )) - .map(|x| col(x)) - .collect::>(); - - return Some(select_cols); - } - None -} - - - diff --git a/src/parsers/mod.rs b/src/parsers/mod.rs deleted file mode 100644 index 5597450..0000000 --- a/src/parsers/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub mod parse_filters; -pub mod helpers; \ No newline at end of file diff --git a/src/parsers/parse_filters.rs b/src/parsers/parse_filters.rs deleted file mode 100644 index 362faae..0000000 --- a/src/parsers/parse_filters.rs +++ /dev/null @@ -1,72 +0,0 @@ -use polars::{lazy::dsl::col, prelude::*}; -use std::collections::HashMap; - -/// Parses a list of filter conditions from query parameter of hashmap. -/// -/// # Arguments -/// -/// * `params` - A reference to a HashMap of parameters. -/// -/// # Returns -/// -/// A Polars expression representing the combined filter conditions. -pub fn parse_querie_from_params(params: &HashMap) -> Result> { - let mut queries = Vec::new(); - if let Some(query) = params.get("query") { - queries = query.split(",").collect::>(); - } - - let conditions: Result, _> = - queries.iter() - .map(|condition: &&str| parse_condition(*condition)) - .collect(); - - let combined_condition = conditions?.into_iter() - .reduce(|acc, cond| acc.and(cond)) - .ok_or(""); // Handle case where no conditions are provided - - match combined_condition { - Ok(_) => { Ok(combined_condition.unwrap()) }, - Err(_) => { Err( "Couldnt parse queries".into() ) }, - } -} - -/// Parses a filter condition from a string into a Polars expression. -/// -/// The expected format for `condition` is "{column_name} {operator} {value}", where: -/// - `column_name` identifies a DataFrame column. -/// - `operator` is one of `<`, `<=`, `>`, `>=`, or `=`. -/// - `value` is a number compared against the column's values. -/// -/// # Parameters -/// * `condition` - A string slice representing the filter condition. -/// -/// # Returns -/// A `Result` containing either: -/// - `Ok(Expr)`: A Polars expression if the parsing succeeds. -/// - `Err(Box)`: An error if the format is incorrect or parsing fails. -pub fn parse_condition(condition: &str) -> Result> { - use regex::Regex; - - // Regex to catch "{column_name} {operator} {value}" - let re = Regex::new(r"([a-zA-Z_]+)([<>=]+)([-+]?[0-9]*\.?[0-9]*)").unwrap(); - - let parts = re.captures(condition).unwrap(); - - if parts.len() == 4 { - let column = parts.get(1).unwrap().as_str(); - let operator = parts.get(2).unwrap().as_str(); - let value = parts.get(3).unwrap().as_str(); - - match operator { - "<" => Ok(col(column).lt(lit(value.parse::()?))), - "<=" => Ok(col(column).lt_eq(lit(value.parse::()?))), - ">" => Ok(col(column).gt(lit(value.parse::()?))), - ">=" => Ok(col(column).gt_eq(lit(value.parse::()?))), - "=" => Ok(col(column).eq(lit(value.parse::()?))), - _ => Err("Unsupported operator".into()), - } - } else { - Err("Invalid condition format".into()) - } -} \ No newline at end of file diff --git a/src/routes.rs b/src/routes.rs index 0abc929..2dd53eb 100644 --- a/src/routes.rs +++ b/src/routes.rs @@ -20,7 +20,7 @@ use crate::loaders; /// /// # Returns /// This function returns a byte stream that can be directly used as an HTTP response body. -pub async fn catch_all(uri: OriginalUri, Query(params): Query>) -> impl IntoResponse { +pub async fn entry_route(uri: OriginalUri, Query(params): Query>) -> impl IntoResponse { let path = uri.0.path().trim_start_matches("/"); let base_path = PathBuf::from("/storage2/splus"); diff --git a/tests/parquet.rs b/tests/parquet.rs index 9cae96a..f697003 100644 --- a/tests/parquet.rs +++ b/tests/parquet.rs @@ -21,6 +21,7 @@ mod parser { ¶ms ).await; + // Add assertions here to verify the result } diff --git a/tests/parsers.rs b/tests/parsers.rs index 1d71c17..1f2ccbe 100644 --- a/tests/parsers.rs +++ b/tests/parsers.rs @@ -1,20 +1,20 @@ #[cfg(test)] mod parser { - use lsdb_server::parsers::parse_filters::*; + use lsdb_server::loaders::parsers::helpers; use polars::{lazy::dsl::col, prelude::*}; #[test] fn test_parse_condition() { - let expr = parse_condition("ra>=30.1241").unwrap(); + let expr = helpers::str_filter_to_expr("ra>=30.1241").unwrap(); assert_eq!(expr, col("ra").gt_eq(lit(30.1241))); - let expr = parse_condition("dec<=-30.3").unwrap(); + let expr = helpers::str_filter_to_expr("dec<=-30.3").unwrap(); assert_eq!(expr, col("dec").lt_eq(lit(-30.3))); - let expr = parse_condition("dec>4").unwrap(); + let expr = helpers::str_filter_to_expr("dec>4").unwrap(); assert_eq!(expr, col("dec").gt(lit(4.0))); - let expr = parse_condition("dec=4").unwrap(); + let expr = helpers::str_filter_to_expr("dec=4").unwrap(); assert_eq!(expr, col("dec").eq(lit(4.0))); }