diff --git a/Cargo.lock b/Cargo.lock index 8c7be4e69..ebd3e286a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4861,6 +4861,7 @@ dependencies = [ "vortex-runend-bool", "vortex-sampling-compressor", "vortex-scalar", + "vortex-scan", "vortex-zigzag", ] diff --git a/bench-vortex/src/reader.rs b/bench-vortex/src/reader.rs index d213fc35e..f9e701230 100644 --- a/bench-vortex/src/reader.rs +++ b/bench-vortex/src/reader.rs @@ -27,9 +27,10 @@ use vortex::arrow::FromArrowType; use vortex::compress::CompressionStrategy; use vortex::dtype::DType; use vortex::error::VortexResult; -use vortex::file::v2::{Scan, VortexOpenOptions, VortexWriteOptions}; +use vortex::file::v2::{VortexOpenOptions, VortexWriteOptions}; use vortex::io::{ObjectStoreReadAt, TokioFile, VortexReadAt, VortexWrite}; use vortex::sampling_compressor::{SamplingCompressor, ALL_ENCODINGS_CONTEXT}; +use vortex::scan::Scan; use vortex::stream::ArrayStreamExt; use vortex::{ArrayData, IntoArrayData, IntoCanonical}; @@ -105,12 +106,14 @@ pub fn write_csv_as_parquet(csv_path: PathBuf, output_path: &Path) -> VortexResu async fn take_vortex( reader: T, - indices: &[u64], + _indices: &[u64], ) -> VortexResult { VortexOpenOptions::new(ALL_ENCODINGS_CONTEXT.clone()) .open(reader) .await? - .scan_rows(Scan::all(), indices.iter().copied())? + // FIXME(ngates): support row indices + // .scan_rows(Scan::all(), indices.iter().copied())? + .scan(Scan::all())? .into_array_data() .await? // For equivalence.... we decompress to make sure we're not cheating too much. diff --git a/vortex-file/src/v2/file.rs b/vortex-file/src/v2/file.rs index 9df7e4519..8cf5f97f5 100644 --- a/vortex-file/src/v2/file.rs +++ b/vortex-file/src/v2/file.rs @@ -29,20 +29,6 @@ pub struct VortexFile { impl VortexFile {} -/// When the underling `R` is `Clone`, we can clone the `VortexFile`. -// TODO(ngates): remove Clone from VortexReadAt? -impl Clone for VortexFile { - fn clone(&self) -> Self { - Self { - ctx: self.ctx.clone(), - file_layout: self.file_layout.clone(), - segments: self.segments.clone(), - splits: self.splits.clone(), - thread_pool: self.thread_pool.clone(), - } - } -} - /// Async implementation of Vortex File. impl VortexFile { /// Returns the number of rows in the file. diff --git a/vortex-file/src/v2/open/mod.rs b/vortex-file/src/v2/open/mod.rs index 862a344af..c48b6bd8e 100644 --- a/vortex-file/src/v2/open/mod.rs +++ b/vortex-file/src/v2/open/mod.rs @@ -102,6 +102,8 @@ impl VortexOpenOptions { file_layout, segments, splits, + // FIXME(ngates): we need some way to share the underlying I/O system. This + // segment cache + thread pool. thread_pool: Arc::new( rayon::ThreadPoolBuilder::new() .build() diff --git a/vortex/Cargo.toml b/vortex/Cargo.toml index b97d516b4..849f34fc9 100644 --- a/vortex/Cargo.toml +++ b/vortex/Cargo.toml @@ -43,6 +43,7 @@ vortex-runend = { workspace = true } vortex-runend-bool = { workspace = true } vortex-sampling-compressor = { workspace = true } vortex-scalar = { workspace = true, default-features = true } +vortex-scan = { workspace = true, default-features = true } vortex-zigzag = { workspace = true } [target.'cfg(not(target_arch = "wasm32"))'.dependencies] diff --git a/vortex/src/lib.rs b/vortex/src/lib.rs index 52c0c86e1..df7c858c5 100644 --- a/vortex/src/lib.rs +++ b/vortex/src/lib.rs @@ -4,6 +4,7 @@ pub use { vortex_error as error, vortex_expr as expr, vortex_file as file, vortex_flatbuffers as flatbuffers, vortex_io as io, vortex_ipc as ipc, vortex_proto as proto, vortex_sampling_compressor as sampling_compressor, vortex_scalar as scalar, + vortex_scan as scan, }; pub mod encodings {