Skip to content

Commit

Permalink
refactored code, started to work on lsdb syntax
Browse files Browse the repository at this point in the history
  • Loading branch information
Schwarzam committed May 25, 2024
1 parent a52a7f1 commit 7f08156
Show file tree
Hide file tree
Showing 15 changed files with 215 additions and 170 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ path = "src/bin.rs"

[dependencies]
axum = "0.7.5"
polars = { version = "0.39.2", features = ["lazy", "parquet", "dtype-u8"] }
polars = { version = "0.40.0", features = ["lazy", "parquet", "dtype-u8"] }
tokio = { version = "1.37.0", features = ["full"] }
hyper = { version="1.3.1", features = ["full"] }
tower = "0.4.13"
Expand Down
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,15 @@ server {
```
---

### Parameters

- `columns`: The columns to return in the response. Default is all columns.
- `exclude_cols`: The columns to exclude from the response. Default is no columns.
- `filters`: A list of filters to apply ["r_auto < 18", ...]. Default is no filters.

### Caveats

- The server only works with lsdb while using dtype_backend = "numpy_nullable"

This is because polars writes strings as LargeStrings or LargeUtf8, which causes issues with the metadata read in lsdb.
4 changes: 2 additions & 2 deletions src/bin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ use axum::{
routing::{any, get}, Router
};

use lsdb_server::routes::catch_all;
use lsdb_server::routes::entry_route;

#[tokio::main]
async fn main() {
println!("Starting Server");

let app = Router::new()
.route("/", get(|| async { "online" }))
.fallback(any(catch_all)); // This will catch all other paths
.fallback(any(entry_route)); // This will catch all other paths

let listener = tokio::net::TcpListener::bind("0.0.0.0:5000").await.unwrap();
axum::serve(listener, app).await.unwrap();
Expand Down
1 change: 0 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
pub mod loaders;
pub mod parsers;
pub mod routes;
3 changes: 2 additions & 1 deletion src/loaders/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod parquet;
pub mod parquet;
pub mod parsers;
39 changes: 20 additions & 19 deletions src/loaders/parquet.rs
Original file line number Diff line number Diff line change
@@ -1,47 +1,48 @@

use std::collections::HashMap;

use polars::{lazy::dsl::col, prelude::*};
use polars::prelude::*;
use polars::io::HiveOptions;
use crate::loaders::parsers::parse_params;

use crate::parsers::helpers;
use crate::parsers::parse_filters;

pub async fn process_and_return_parquet_file_lazy(
file_path: &str,
params: &HashMap<String, String>
) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
let mut args = ScanArgsParquet::default();

// TODO: fix the parquet reader hive_options with _hipscat_index
args.hive_options = HiveOptions{enabled:false, schema: None};

let lf = LazyFrame::scan_parquet(file_path, args).unwrap();
let mut selected_cols = parse_params::parse_columns_from_params(&params).unwrap_or(Vec::new());
selected_cols = parse_params::parse_exclude_columns_from_params(&params, &lf).unwrap_or(selected_cols);

let mut selected_cols = helpers::parse_columns_from_params(&params).unwrap_or(Vec::new());
selected_cols = helpers::parse_exclude_columns_from_params(&params, &lf).unwrap_or(selected_cols);

let combined_condition = parse_filters::parse_querie_from_params(&params);

//println!("{:?}", &params.get("filters").unwrap());
let filters = parse_params::parse_filters_from_params(&params);

// HACK: Find a better way to handle each combination of selected params
let mut df;
//In case we have selected columns and a combined condition
if combined_condition.is_ok() && selected_cols.len() > 0{
//In case we have selected columns and filters
if filters.is_ok() && selected_cols.len() > 0{
df = lf
.drop(["_hipscat_index"])
.select(selected_cols)
.filter(
// only if combined_condition is not empty
combined_condition?
filters?
)
.collect()?;
}
// In case we have only a combined condition
else if combined_condition.is_ok() {
// In case we have only filters
else if filters.is_ok() {
df = lf
//TODO: Remove later
//TODO: fix the parquet reader hive_options with _hipscat_index
.drop(["_hipscat_index"])
.filter(
// only if combined_condition is not empty
combined_condition?
filters?
)
.collect()?;
}
Expand All @@ -51,13 +52,13 @@ pub async fn process_and_return_parquet_file_lazy(
.select(selected_cols)
.collect()?;
}
// In case we have no selected columns or combined condition
// In case we have no selected columns or filters, return whole dataframe
else {
df = lf.collect()?;
df = lf.drop(["_hipscat_index"]).collect()?;
}

let mut buf = Vec::new();
ParquetWriter::new(&mut buf).finish(&mut df)?;

ParquetWriter::new(&mut buf)
.finish(&mut df)?;
Ok(buf)
}
92 changes: 92 additions & 0 deletions src/loaders/parsers/helpers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
use polars::prelude::*;
use std::collections::HashMap;

/// Lists the column names exposed by a LazyFrame.
///
/// The names are read from the frame obtained by collecting the first row of
/// a clone of `lf`; the caller's LazyFrame is left untouched.
///
/// # Arguments
///
/// * `lf` - A reference to a LazyFrame.
///
/// # Returns
///
/// A vector of owned strings, one per column of the collected frame.
///
/// # Panics
///
/// Panics if collecting the first row fails.
pub fn get_lazyframe_column_names(lf : &LazyFrame) -> Vec<String> {
    let first_row = lf.clone().first().collect().unwrap();

    let mut names = Vec::new();
    for name in first_row.get_column_names() {
        names.push(name.to_string());
    }
    names
}

/// Parses a filter condition from a string into a Polars expression.
///
/// The expected format for `condition` is "{column_name} {operator} {value}", where:
/// - `column_name` identifies a DataFrame column (letters, digits and underscores,
///   not starting with a digit).
/// - `operator` is one of `<`, `<=`, `>`, `>=`, or `=`.
/// - `value` is a decimal number compared against the column's values.
///
/// Whitespace around each of the three parts is optional, so both `r_auto<18`
/// and `r_auto < 18` are accepted. The whole string must match the format;
/// trailing or leading garbage is rejected instead of being silently ignored.
///
/// # Parameters
/// * `condition` - A string slice representing the filter condition.
///
/// # Returns
/// A `Result` containing either:
/// - `Ok(Expr)`: A Polars expression if the parsing succeeds.
/// - `Err(Box<dyn Error>)`: An error if the format is incorrect, the operator is
///   not supported, or the value cannot be parsed as a number.
pub fn str_filter_to_expr(condition: &str) -> Result<Expr, Box<dyn std::error::Error>> {
    use regex::Regex;
    use std::sync::OnceLock;

    // Compiled once and reused: this function runs once per filter of every request.
    static CONDITION_RE: OnceLock<Regex> = OnceLock::new();
    let re = CONDITION_RE.get_or_init(|| {
        // Anchored so that a partially matching string (e.g. "a<1e5") is an
        // error rather than being truncated to the part that matches ("a<1").
        Regex::new(r"^\s*([a-zA-Z_][a-zA-Z0-9_]*)\s*([<>=]+)\s*([-+]?[0-9]*\.?[0-9]*)\s*$")
            .expect("filter condition regex is a valid pattern")
    });

    // The condition comes from the client request, so a mismatch must be
    // reported as an error instead of panicking.
    let parts = re.captures(condition).ok_or("Invalid condition format")?;

    // None of the three groups is optional, so they are all present on a match.
    let column = &parts[1];
    let operator = &parts[2];
    let value = &parts[3];

    // The numeric pattern also matches "" and "."; `parse` rejects those.
    let threshold = lit(value.parse::<f64>()?);

    match operator {
        "<" => Ok(col(column).lt(threshold)),
        "<=" => Ok(col(column).lt_eq(threshold)),
        ">" => Ok(col(column).gt(threshold)),
        ">=" => Ok(col(column).gt_eq(threshold)),
        "=" => Ok(col(column).eq(threshold)),
        _ => Err("Unsupported operator".into()),
    }
}


/// Parses filter conditions from a list of tuples into Polars expressions.
///
/// The expected format for each tuple in `filters` is (column_name, operator, values), where:
/// - `column_name` identifies a DataFrame column.
/// - `operator` is one of "==", "=", "!=", ">", ">=", "<", "<=".
/// - `values` holds the number compared against the column's values; only the
///   first element is used.
///
/// Any other operator (including "in" and "not in") is rejected with an error.
///
/// # Parameters
/// * `filters` - An optional vector of tuples representing the filter conditions.
///   `None` yields an empty vector of expressions.
///
/// # Returns
/// A `Result` containing either:
/// - `Ok(Vec<Expr>)`: A vector of Polars expressions, one per filter, in input order.
/// - `Err(Box<dyn Error>)`: An error if a filter has no value or uses an
///   unsupported operator.
pub fn filters_to_expr(filters: Option<Vec<(String, String, Vec<f64>)>>) -> Result<Vec<Expr>, Box<dyn std::error::Error>> {
    let conditions = filters.unwrap_or_default();
    let mut expressions = Vec::with_capacity(conditions.len());

    for (column, operator, values) in conditions {
        // An empty value list is a malformed filter: report it rather than
        // panicking on an out-of-bounds index.
        let value = *values
            .first()
            .ok_or_else(|| format!("Filter on column '{column}' has no value"))?;

        let expression = match operator.as_str() {
            "=" | "==" => col(&column).eq(lit(value)),
            "!=" => col(&column).neq(lit(value)),
            ">" => col(&column).gt(lit(value)),
            ">=" => col(&column).gt_eq(lit(value)),
            "<" => col(&column).lt(lit(value)),
            "<=" => col(&column).lt_eq(lit(value)),
            _ => return Err("Unsupported operator".into()),
        };
        expressions.push(expression);
    }

    Ok(expressions)
}

2 changes: 2 additions & 0 deletions src/loaders/parsers/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod parse_params;
pub mod helpers;
77 changes: 77 additions & 0 deletions src/loaders/parsers/parse_params.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
use polars::{lazy::dsl::col, prelude::*};
use std::collections::HashMap;
use crate::loaders::parsers::helpers;

/// Builds the column selection from the `columns` request parameter.
///
/// The parameter is a comma-separated list of column names. Whitespace around
/// each name is ignored and empty entries (e.g. from a trailing comma) are
/// skipped, so `"ra, dec,"` selects `ra` and `dec`.
///
/// # Arguments
///
/// * `params` - A reference to a HashMap of parameters containing 'columns' key.
///
/// # Returns
///
/// `Some` vector of Polars expressions, one per requested column, or `None`
/// when the `columns` key is absent. The vector is empty when the parameter
/// contains no column names.
pub fn parse_columns_from_params( params: &HashMap<String, String> ) -> Option<Vec<Expr>> {
    let requested = params.get("columns")?;

    let select_cols = requested
        .split(',')
        .map(str::trim)
        // A blank entry would otherwise become a selection of a column named "".
        .filter(|name| !name.is_empty())
        .map(|name| col(name))
        .collect::<Vec<_>>();

    Some(select_cols)
}

/// Parses the `filters` request parameter into a single combined Polars expression.
///
/// The parameter is a comma-separated list of conditions. Each condition is
/// parsed with `helpers::str_filter_to_expr` and the results are combined
/// with a logical AND. Whitespace around each condition is ignored and blank
/// entries (e.g. from `filters=` or a trailing comma) are skipped.
///
/// # Arguments
///
/// * `params` - A reference to a HashMap of parameters.
///
/// # Returns
///
/// - `Ok(Expr)`: the conjunction of all conditions.
/// - `Err(..)`: if any condition fails to parse, or if there is no condition
///   at all (the `filters` key is absent or contains only blank entries).
pub fn parse_filters_from_params(params: &HashMap<String, String>) -> Result<Expr, Box<dyn std::error::Error>> {
    let raw_filters = params.get("filters").map(String::as_str).unwrap_or("");

    //TODO: DEPRECATE
    let conditions = raw_filters
        .split(',')
        .map(str::trim)
        // Blank entries carry no condition; skip them instead of handing an
        // unparsable empty string to the helper.
        .filter(|condition| !condition.is_empty())
        .map(|condition| helpers::str_filter_to_expr(condition))
        .collect::<Result<Vec<Expr>, _>>()?;

    // `reduce` yields `None` when no conditions were provided.
    conditions
        .into_iter()
        .reduce(|acc, cond| acc.and(cond))
        .ok_or_else(|| "Couldnt parse queries".into())
}


/// Builds the column selection implied by the `exclude_cols` request parameter.
///
/// The parameter is a comma-separated list of column names to leave out.
/// Whitespace around each name is ignored and empty entries are skipped. The
/// result selects every column reported by
/// `helpers::get_lazyframe_column_names` whose name is not in that list.
///
/// # Arguments
///
/// * `params` - The client request HashMap of parameters.
/// * `lf` - A reference to a LazyFrame.
///
/// # Returns
///
/// `Some` vector of Polars expressions for the columns that remain after the
/// exclusion, or `None` when the `exclude_cols` key is absent.
pub fn parse_exclude_columns_from_params( params: &HashMap<String, String>, lf : &LazyFrame ) -> Option<Vec<Expr>> {
    let requested = params.get("exclude_cols")?;

    let excluded = requested
        .split(',')
        .map(str::trim)
        .filter(|name| !name.is_empty())
        .collect::<Vec<_>>();

    let all_cols = helpers::get_lazyframe_column_names(lf);

    let select_cols = all_cols
        .iter()
        .filter(|name| !excluded.contains(&name.as_str()))
        .map(|name| col(name))
        .collect::<Vec<_>>();

    Some(select_cols)
}
66 changes: 0 additions & 66 deletions src/parsers/helpers.rs

This file was deleted.

2 changes: 0 additions & 2 deletions src/parsers/mod.rs

This file was deleted.

Loading

0 comments on commit 7f08156

Please sign in to comment.