Skip to content

Commit

Permalink
Add quote_style option to csv IO functions (#1049)
Browse files Browse the repository at this point in the history
* Create datatype for quote style and trait implementations

* Add quote style to CSV IO functions

* Add quote_style option to polars backend

* Add quote_style type and options to backend

* Add quote_style

* Add tests

* Add documentation on supported quote_style values
  • Loading branch information
jdbarillas authored Jan 10, 2025
1 parent a31b629 commit d2dfe83
Show file tree
Hide file tree
Showing 9 changed files with 185 additions and 21 deletions.
9 changes: 8 additions & 1 deletion lib/explorer/backend/data_frame.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ defmodule Explorer.Backend.DataFrame do
@type ok_result() :: :ok | {:error, Exception.t()}
@type io_result(t) :: {:ok, t} | {:error, Exception.t()}

@type quote_style :: :necessary | :always | :non_numeric | :never
# Generic result
@type result(t) :: {:ok, t} | {:error, term()}

Expand Down Expand Up @@ -65,10 +66,16 @@ defmodule Explorer.Backend.DataFrame do
entry :: fs_entry(),
header? :: boolean(),
delimiter :: String.t(),
quote_style :: quote_style,
streaming :: boolean()
) ::
ok_result()
@callback dump_csv(df, header? :: boolean(), delimiter :: String.t()) :: io_result(binary())
@callback dump_csv(
df,
header? :: boolean(),
delimiter :: String.t(),
quote_style :: quote_style
) :: io_result(binary())

@callback load_csv(
contents :: String.t(),
Expand Down
25 changes: 22 additions & 3 deletions lib/explorer/data_frame.ex
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,9 @@ defmodule Explorer.DataFrame do
* `:delimiter` - A single character used to separate fields within a record. (default: `","`)
* `:quote_style` - The quoting style to use. Possible values are `:necessary`, `:always`, `:non_numeric`, and `:never`.
(default: `:necessary`)
* `:config` - An optional struct, keyword list or map, normally associated with remote
file systems. See [IO section](#module-io-operations) for more details. (default: `nil`)
Expand All @@ -664,13 +667,21 @@ defmodule Explorer.DataFrame do
@spec to_csv(df :: DataFrame.t(), filename :: fs_entry() | String.t(), opts :: Keyword.t()) ::
:ok | {:error, Exception.t()}
def to_csv(df, filename, opts \\ []) do
opts = Keyword.validate!(opts, header: true, delimiter: ",", streaming: true, config: nil)
opts =
Keyword.validate!(opts,
header: true,
delimiter: ",",
quote_style: :necessary,
streaming: true,
config: nil
)

with {:ok, entry} <- normalise_entry(filename, opts[:config]) do
Shared.apply_dataframe(df, :to_csv, [
entry,
opts[:header],
opts[:delimiter],
opts[:quote_style],
opts[:streaming]
])
end
Expand Down Expand Up @@ -702,6 +713,8 @@ defmodule Explorer.DataFrame do
* `:header` - Should the column names be written as the first line of the file? (default: `true`)
* `:delimiter` - A single character used to separate fields within a record. (default: `","`)
* `:quote_style` - The quoting style to use. Possible values are `:necessary`, `:always`, `:non_numeric`, and `:never`.
(default: `:necessary`)
## Examples
Expand All @@ -713,8 +726,14 @@ defmodule Explorer.DataFrame do
@spec dump_csv(df :: DataFrame.t(), opts :: Keyword.t()) ::
{:ok, String.t()} | {:error, Exception.t()}
def dump_csv(df, opts \\ []) do
opts = Keyword.validate!(opts, header: true, delimiter: ",")
Shared.apply_dataframe(df, :dump_csv, [opts[:header], opts[:delimiter]], false)
opts = Keyword.validate!(opts, header: true, delimiter: ",", quote_style: :necessary)

Shared.apply_dataframe(
df,
:dump_csv,
[opts[:header], opts[:delimiter], opts[:quote_style]],
false
)
end

@doc """
Expand Down
26 changes: 20 additions & 6 deletions lib/explorer/polars_backend/data_frame.ex
Original file line number Diff line number Diff line change
Expand Up @@ -145,28 +145,42 @@ defmodule Explorer.PolarsBackend.DataFrame do
end

@impl true
def to_csv(%DataFrame{data: df}, %Local.Entry{} = entry, header?, delimiter, _streaming) do
def to_csv(
%DataFrame{data: df},
%Local.Entry{} = entry,
header?,
delimiter,
quote_style,
_streaming
) do
<<delimiter::utf8>> = delimiter

case Native.df_to_csv(df, entry.path, header?, delimiter) do
case Native.df_to_csv(df, entry.path, header?, delimiter, quote_style) do
{:ok, _} -> :ok
{:error, error} -> {:error, RuntimeError.exception(error)}
end
end

@impl true
def to_csv(%DataFrame{data: df}, %S3.Entry{} = entry, header?, delimiter, _streaming) do
def to_csv(
%DataFrame{data: df},
%S3.Entry{} = entry,
header?,
delimiter,
quote_style,
_streaming
) do
<<delimiter::utf8>> = delimiter

case Native.df_to_csv_cloud(df, entry, header?, delimiter) do
case Native.df_to_csv_cloud(df, entry, header?, delimiter, quote_style) do
{:ok, _} -> :ok
{:error, error} -> {:error, RuntimeError.exception(error)}
end
end

@impl true
def dump_csv(%DataFrame{} = df, header?, <<delimiter::utf8>>) do
case Native.df_dump_csv(df.data, header?, delimiter) do
def dump_csv(%DataFrame{} = df, header?, <<delimiter::utf8>>, quote_style) do
case Native.df_dump_csv(df.data, header?, delimiter, quote_style) do
{:ok, string} -> {:ok, string}
{:error, error} -> {:error, RuntimeError.exception(error)}
end
Expand Down
10 changes: 5 additions & 5 deletions lib/explorer/polars_backend/lazy_frame.ex
Original file line number Diff line number Diff line change
Expand Up @@ -351,20 +351,20 @@ defmodule Explorer.PolarsBackend.LazyFrame do
end

@impl true
def to_csv(%DF{} = ldf, %Local.Entry{} = entry, header?, delimiter, streaming) do
def to_csv(%DF{} = ldf, %Local.Entry{} = entry, header?, delimiter, quote_style, streaming) do
<<delimiter::utf8>> = delimiter

case Native.lf_to_csv(ldf.data, entry.path, header?, delimiter, streaming) do
case Native.lf_to_csv(ldf.data, entry.path, header?, delimiter, quote_style, streaming) do
{:ok, _} -> :ok
{:error, error} -> {:error, RuntimeError.exception(error)}
end
end

@impl true
def to_csv(%DF{} = ldf, %S3.Entry{} = entry, header?, delimiter, _streaming) do
def to_csv(%DF{} = ldf, %S3.Entry{} = entry, header?, delimiter, quote_style, _streaming) do
eager_df = compute(ldf)

Eager.to_csv(eager_df, entry, header?, delimiter, false)
Eager.to_csv(eager_df, entry, header?, delimiter, quote_style, false)
end

@impl true
Expand Down Expand Up @@ -639,7 +639,7 @@ defmodule Explorer.PolarsBackend.LazyFrame do
covariance: 3,
nil_count: 1,
dummies: 3,
dump_csv: 3,
dump_csv: 4,
dump_ipc: 2,
dump_ipc_stream: 2,
dump_ndjson: 1,
Expand Down
8 changes: 4 additions & 4 deletions lib/explorer/polars_backend/native.ex
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ defmodule Explorer.PolarsBackend.Native do
def df_concat_columns(_dfs), do: err()
def df_drop(_df, _name), do: err()
def df_dtypes(_df), do: err()
def df_dump_csv(_df, _has_headers, _delimiter), do: err()
def df_dump_csv(_df, _has_headers, _delimiter, _quote_style), do: err()
def df_dump_ndjson(_df), do: err()
def df_dump_parquet(_df, _compression), do: err()
def df_dump_ipc(_df, _compression), do: err()
Expand Down Expand Up @@ -156,8 +156,8 @@ defmodule Explorer.PolarsBackend.Native do
def df_slice_by_indices(_df, _indices, _groups), do: err()
def df_slice_by_series(_df, _series, _groups), do: err()
def df_transpose(_df, _keep_names_as, _new_col_names), do: err()
def df_to_csv(_df, _filename, _has_headers, _delimiter), do: err()
def df_to_csv_cloud(_df, _ex_entry, _has_headers, _delimiter), do: err()
def df_to_csv(_df, _filename, _has_headers, _delimiter, _quote_style), do: err()
def df_to_csv_cloud(_df, _ex_entry, _has_headers, _delimiter, _quote_style), do: err()
def df_to_dummies(_df, _columns), do: err()
def df_to_ipc(_df, _filename, _compression), do: err()
def df_to_ipc_cloud(_df, _ex_entry, _compression), do: err()
Expand Down Expand Up @@ -274,7 +274,7 @@ defmodule Explorer.PolarsBackend.Native do
def lf_to_parquet_cloud(_df, _filename, _compression), do: err()
def lf_to_ipc(_df, _filename, _compression, _streaming), do: err()
def lf_to_ipc_cloud(_df, _cloud_entry, _compression), do: err()
def lf_to_csv(_df, _filename, _header, _delimiter, _streaming), do: err()
def lf_to_csv(_df, _filename, _header, _delimiter, _quote_style, _streaming), do: err()
def lf_sql(_df, _sql_string, _table_name), do: err()

# Series
Expand Down
9 changes: 8 additions & 1 deletion native/explorer/src/dataframe/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use rustler::{Binary, Env, NewBinary};
use std::fs::File;
use std::io::{BufReader, BufWriter, Cursor};

use crate::datatypes::{ExParquetCompression, ExS3Entry, ExSeriesDtype};
use crate::datatypes::{ExParquetCompression, ExQuoteStyle, ExS3Entry, ExSeriesDtype};
use crate::{ExDataFrame, ExplorerError};

#[cfg(feature = "cloud")]
Expand Down Expand Up @@ -100,12 +100,14 @@ pub fn df_to_csv(
filename: &str,
include_headers: bool,
delimiter: u8,
quote_style: ExQuoteStyle,
) -> Result<(), ExplorerError> {
let file = File::create(filename)?;
let mut buf_writer = BufWriter::new(file);
CsvWriter::new(&mut buf_writer)
.include_header(include_headers)
.with_separator(delimiter)
.with_quote_style(quote_style.into())
.finish(&mut data.clone())?;
Ok(())
}
Expand All @@ -117,12 +119,14 @@ pub fn df_to_csv_cloud(
ex_entry: ExS3Entry,
include_headers: bool,
delimiter: u8,
quote_style: ExQuoteStyle,
) -> Result<(), ExplorerError> {
let mut cloud_writer = build_aws_s3_cloud_writer(ex_entry)?;

CsvWriter::new(&mut cloud_writer)
.include_header(include_headers)
.with_separator(delimiter)
.with_quote_style(quote_style.into())
.finish(&mut data.clone())?;

let _ = cloud_writer.finish()?;
Expand All @@ -136,12 +140,14 @@ pub fn df_dump_csv(
data: ExDataFrame,
include_headers: bool,
delimiter: u8,
quote_style: ExQuoteStyle,
) -> Result<Binary, ExplorerError> {
let mut buf = vec![];

CsvWriter::new(&mut buf)
.include_header(include_headers)
.with_separator(delimiter)
.with_quote_style(quote_style.into())
.finish(&mut data.clone())?;

let mut values_binary = NewBinary::new(env, buf.len());
Expand Down Expand Up @@ -675,6 +681,7 @@ pub fn df_to_csv_cloud(
_ex_entry: ExS3Entry,
_has_headers: bool,
_delimiter: u8,
_quote_style: ExQuoteStyle,
) -> Result<(), ExplorerError> {
Err(ExplorerError::Other("Explorer was compiled without the \"aws\" feature enabled. \
This is mostly due to this feature being incompatible with your computer's architecture. \
Expand Down
40 changes: 40 additions & 0 deletions native/explorer/src/datatypes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -785,6 +785,14 @@ impl TryFrom<ExParquetCompression> for ParquetCompression {
}
}

#[derive(NifTaggedEnum)]
pub enum QuoteStyle {
Necessary,
Always,
NonNumeric,
Never,
}

// =========================
// ====== FSS Structs ======
// =========================
Expand Down Expand Up @@ -849,3 +857,35 @@ impl From<ExExpr> for Expr {
ex_expr.clone_inner()
}
}

use polars::prelude::QuoteStyle as PolarsQuoteStyle;

#[derive(NifTaggedEnum)]
pub enum ExQuoteStyle {
Necessary,
Always,
NonNumeric,
Never,
}

impl From<ExQuoteStyle> for PolarsQuoteStyle {
fn from(style: ExQuoteStyle) -> Self {
match style {
ExQuoteStyle::Necessary => PolarsQuoteStyle::Necessary,
ExQuoteStyle::Always => PolarsQuoteStyle::Always,
ExQuoteStyle::NonNumeric => PolarsQuoteStyle::NonNumeric,
ExQuoteStyle::Never => PolarsQuoteStyle::Never,
}
}
}

impl From<PolarsQuoteStyle> for ExQuoteStyle {
fn from(style: PolarsQuoteStyle) -> Self {
match style {
PolarsQuoteStyle::Necessary => ExQuoteStyle::Necessary,
PolarsQuoteStyle::Always => ExQuoteStyle::Always,
PolarsQuoteStyle::NonNumeric => ExQuoteStyle::NonNumeric,
PolarsQuoteStyle::Never => ExQuoteStyle::Never,
}
}
}
4 changes: 3 additions & 1 deletion native/explorer/src/lazyframe/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::io::BufWriter;
use std::num::NonZeroUsize;

use crate::dataframe::io::schema_from_dtypes_pairs;
use crate::datatypes::{ExParquetCompression, ExS3Entry, ExSeriesDtype};
use crate::datatypes::{ExParquetCompression, ExQuoteStyle, ExS3Entry, ExSeriesDtype};
use crate::{ExLazyFrame, ExplorerError};

#[rustler::nif]
Expand Down Expand Up @@ -256,6 +256,7 @@ pub fn lf_to_csv(
filename: &str,
include_headers: bool,
delimiter: u8,
quote_style: ExQuoteStyle,
streaming: bool,
) -> Result<(), ExplorerError> {
let lf = data.clone_inner();
Expand Down Expand Up @@ -283,6 +284,7 @@ pub fn lf_to_csv(
CsvWriter::new(&mut buf_writer)
.include_header(include_headers)
.with_separator(delimiter)
.with_quote_style(quote_style.into())
.finish(&mut df.clone())?;
Ok(())
}
Expand Down
Loading

0 comments on commit d2dfe83

Please sign in to comment.