Skip to content

Commit

Permalink
I/O driver
Browse files Browse the repository at this point in the history
  • Loading branch information
gatesn committed Jan 10, 2025
1 parent 3bda2c3 commit 71cc555
Show file tree
Hide file tree
Showing 6 changed files with 11 additions and 17 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 6 additions & 3 deletions bench-vortex/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ use vortex::arrow::FromArrowType;
use vortex::compress::CompressionStrategy;
use vortex::dtype::DType;
use vortex::error::VortexResult;
use vortex::file::v2::{Scan, VortexOpenOptions, VortexWriteOptions};
use vortex::file::v2::{VortexOpenOptions, VortexWriteOptions};
use vortex::io::{ObjectStoreReadAt, TokioFile, VortexReadAt, VortexWrite};
use vortex::sampling_compressor::{SamplingCompressor, ALL_ENCODINGS_CONTEXT};
use vortex::scan::Scan;
use vortex::stream::ArrayStreamExt;
use vortex::{ArrayData, IntoArrayData, IntoCanonical};

Expand Down Expand Up @@ -105,12 +106,14 @@ pub fn write_csv_as_parquet(csv_path: PathBuf, output_path: &Path) -> VortexResu

async fn take_vortex<T: VortexReadAt + Unpin + 'static>(
reader: T,
indices: &[u64],
_indices: &[u64],
) -> VortexResult<ArrayData> {
VortexOpenOptions::new(ALL_ENCODINGS_CONTEXT.clone())
.open(reader)
.await?
.scan_rows(Scan::all(), indices.iter().copied())?
// FIXME(ngates): support row indices
// .scan_rows(Scan::all(), indices.iter().copied())?
.scan(Scan::all())?
.into_array_data()
.await?
// For equivalence.... we decompress to make sure we're not cheating too much.
Expand Down
14 changes: 0 additions & 14 deletions vortex-file/src/v2/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,6 @@ pub struct VortexFile<R> {

impl<R> VortexFile<R> {}

/// When the underling `R` is `Clone`, we can clone the `VortexFile`.
// TODO(ngates): remove Clone from VortexReadAt?
impl<R: Clone> Clone for VortexFile<R> {
fn clone(&self) -> Self {
Self {
ctx: self.ctx.clone(),
file_layout: self.file_layout.clone(),
segments: self.segments.clone(),
splits: self.splits.clone(),
thread_pool: self.thread_pool.clone(),
}
}
}

/// Async implementation of Vortex File.
impl<R: VortexReadAt + Unpin> VortexFile<R> {
/// Returns the number of rows in the file.
Expand Down
2 changes: 2 additions & 0 deletions vortex-file/src/v2/open/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ impl VortexOpenOptions {
file_layout,
segments,
splits,
// FIXME(ngates): we need some way to share the underlying I/O system. This
// segment cache + thread pool.
thread_pool: Arc::new(
rayon::ThreadPoolBuilder::new()
.build()
Expand Down
1 change: 1 addition & 0 deletions vortex/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ vortex-runend = { workspace = true }
vortex-runend-bool = { workspace = true }
vortex-sampling-compressor = { workspace = true }
vortex-scalar = { workspace = true, default-features = true }
vortex-scan = { workspace = true, default-features = true }
vortex-zigzag = { workspace = true }

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
Expand Down
1 change: 1 addition & 0 deletions vortex/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ pub use {
vortex_error as error, vortex_expr as expr, vortex_file as file,
vortex_flatbuffers as flatbuffers, vortex_io as io, vortex_ipc as ipc, vortex_proto as proto,
vortex_sampling_compressor as sampling_compressor, vortex_scalar as scalar,
vortex_scan as scan,
};

pub mod encodings {
Expand Down

0 comments on commit 71cc555

Please sign in to comment.