From 53599efc5afbb013630528f6d4324cc883e95f34 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Tue, 7 Jan 2025 06:45:17 +0000 Subject: [PATCH 1/9] DataFusion Layouts --- Cargo.lock | 1 + vortex-datafusion/Cargo.toml | 1 + vortex-datafusion/src/persistent/cache.rs | 22 ++++++---- vortex-datafusion/src/persistent/execution.rs | 6 +-- vortex-datafusion/src/persistent/format.rs | 43 ++++++++++++------- vortex-datafusion/src/persistent/opener.rs | 4 +- vortex-file/src/v2/file.rs | 21 +++++++-- vortex-file/src/v2/footer/file_layout.rs | 17 +++++++- vortex-file/src/v2/footer/mod.rs | 2 +- vortex-file/src/v2/mod.rs | 2 +- vortex-file/src/v2/open.rs | 17 ++++++++ 11 files changed, 98 insertions(+), 38 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 707e9d7d75..37ba55c815 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4966,6 +4966,7 @@ dependencies = [ "vortex-expr", "vortex-file", "vortex-io", + "vortex-layout", ] [[package]] diff --git a/vortex-datafusion/Cargo.toml b/vortex-datafusion/Cargo.toml index 29e26bc1b3..2c40ae189e 100644 --- a/vortex-datafusion/Cargo.toml +++ b/vortex-datafusion/Cargo.toml @@ -42,6 +42,7 @@ vortex-error = { workspace = true, features = ["datafusion"] } vortex-expr = { workspace = true, features = ["datafusion"] } vortex-file = { workspace = true, features = ["object_store"] } vortex-io = { workspace = true, features = ["object_store", "tokio"] } +vortex-layout = { workspace = true } [dev-dependencies] anyhow = { workspace = true } diff --git a/vortex-datafusion/src/persistent/cache.rs b/vortex-datafusion/src/persistent/cache.rs index 7a6c174f22..7b5a497ce2 100644 --- a/vortex-datafusion/src/persistent/cache.rs +++ b/vortex-datafusion/src/persistent/cache.rs @@ -4,13 +4,15 @@ use chrono::{DateTime, Utc}; use moka::future::Cache; use object_store::path::Path; use object_store::{ObjectMeta, ObjectStore}; +use vortex_array::ContextRef; use vortex_error::{vortex_err, VortexError, VortexResult}; -use vortex_file::{read_initial_bytes, InitialRead}; +use vortex_file::v2::footer::FileLayout; +use vortex_file::v2::OpenOptions; use vortex_io::ObjectStoreReadAt; #[derive(Debug, Clone)] -pub struct InitialReadCache { - inner: Cache, +pub struct FileLayoutCache { + inner: Cache, } #[derive(Hash, Eq, PartialEq, Debug)] @@ -28,28 +30,30 @@ impl From<&ObjectMeta> for Key { } } -impl InitialReadCache { +impl FileLayoutCache { pub fn new(size_mb: usize) -> Self { let inner = Cache::builder() - .weigher(|k: &Key, v: &InitialRead| (k.location.as_ref().len() + v.buf.len()) as u32) .max_capacity(size_mb as u64 * (2 << 20)) - .eviction_listener(|k, _v, cause| { + .eviction_listener(|k: Arc, _v, cause| { log::trace!("Removed {} due to {:?}", k.location, cause); }) .build(); Self { inner } } + pub async fn try_get( &self, object: &ObjectMeta, store: Arc, - ) -> VortexResult { + ) -> VortexResult { self.inner .try_get_with(Key::from(object), async { let os_read_at = ObjectStoreReadAt::new(store.clone(), object.location.clone()); - let initial_read = read_initial_bytes(&os_read_at, object.size as u64).await?; - VortexResult::Ok(initial_read) + let vxf = OpenOptions::new(ContextRef::default()) + .open(os_read_at) + .await?; + VortexResult::Ok(vxf.file_layout()) }) .await .map_err(|e: Arc| match Arc::try_unwrap(e) { diff --git a/vortex-datafusion/src/persistent/execution.rs b/vortex-datafusion/src/persistent/execution.rs index 40514c1b90..94e28bdc60 100644 --- a/vortex-datafusion/src/persistent/execution.rs +++ b/vortex-datafusion/src/persistent/execution.rs @@ -13,7 +13,7 @@ use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, Plan use itertools::Itertools; use vortex_array::ContextRef; -use super::cache::InitialReadCache; +use super::cache::FileLayoutCache; use crate::persistent::opener::VortexFileOpener; #[derive(Debug, Clone)] @@ -24,7 +24,7 @@ pub struct VortexExec { plan_properties: PlanProperties, projected_statistics: Statistics, ctx: ContextRef, - initial_read_cache: InitialReadCache, + initial_read_cache: FileLayoutCache, } impl VortexExec { @@ -33,7 +33,7 @@ impl VortexExec { metrics: ExecutionPlanMetricsSet, predicate: Option>, ctx: ContextRef, - initial_read_cache: InitialReadCache, + initial_read_cache: FileLayoutCache, ) -> DFResult { let projected_schema = project_schema( &file_scan_config.file_schema, diff --git a/vortex-datafusion/src/persistent/format.rs b/vortex-datafusion/src/persistent/format.rs index 8ab8d72248..50d5cc9b86 100644 --- a/vortex-datafusion/src/persistent/format.rs +++ b/vortex-datafusion/src/persistent/format.rs @@ -23,12 +23,13 @@ use vortex_array::arrow::infer_schema; use vortex_array::ContextRef; use vortex_error::VortexResult; use vortex_file::metadata::fetch_metadata; +use vortex_file::v2::OpenOptions; use vortex_file::{ LayoutContext, LayoutDeserializer, LayoutMessageCache, LayoutPath, Scan, VORTEX_FILE_EXTENSION, }; use vortex_io::{IoDispatcher, ObjectStoreReadAt}; -use super::cache::InitialReadCache; +use super::cache::FileLayoutCache; use super::execution::VortexExec; use super::statistics::{array_to_col_statistics, uncompressed_col_size}; use crate::can_be_pushed_down; @@ -36,7 +37,7 @@ use crate::can_be_pushed_down; #[derive(Debug)] pub struct VortexFormat { context: ContextRef, - initial_read_cache: InitialReadCache, + file_layout_cache: FileLayoutCache, opts: VortexFormatOptions, } @@ -61,7 +62,7 @@ impl Default for VortexFormat { Self { context: Default::default(), - initial_read_cache: InitialReadCache::new(opts.cache_size_mb), + file_layout_cache: FileLayoutCache::new(opts.cache_size_mb), opts, } } @@ -72,7 +73,7 @@ impl VortexFormat { let opts = VortexFormatOptions::default(); Self { context, - initial_read_cache: InitialReadCache::new(opts.cache_size_mb), + file_layout_cache: FileLayoutCache::new(opts.cache_size_mb), opts, } } @@ -109,12 +110,10 @@ impl FileFormat for VortexFormat { let file_schemas = stream::iter(objects.iter().cloned()) .map(|o| { let store = store.clone(); - let cache = self.initial_read_cache.clone(); + let cache = self.file_layout_cache.clone(); async move { - let initial_read = cache.try_get(&o, store).await?; - let lazy_dtype = initial_read.lazy_dtype(); - let s = infer_schema(lazy_dtype.value()?)?; - + let file_layout = cache.try_get(&o, store).await?; + let s = infer_schema(file_layout.dtype())?; VortexResult::Ok(s) } }) @@ -134,22 +133,34 @@ impl FileFormat for VortexFormat { table_schema: SchemaRef, object: &ObjectMeta, ) -> DFResult { - let initial_read = self - .initial_read_cache + let file_layout = self + .file_layout_cache .try_get(object, store.clone()) .await?; - let layout = initial_read.fb_layout(); - let row_count = layout.row_count(); + // Re-open the vortex file using the cached file layout + let vxf = OpenOptions::new(self.context.clone()) + .with_file_layout(file_layout) + .open(ObjectStoreReadAt::new( + store.clone(), + object.location.clone(), + )) + .await?; + + // Now we have to compute the column statistics for the table. + // If we assume a top-level struct DType (which is true for a DataFusion/Vortex + // integration), then we need some way to ask the layouts to fetch and compute the stats. + + let row_count = file_layout.row_count(); let layout_deserializer = LayoutDeserializer::new(self.context.clone(), LayoutContext::default().into()); let root_layout = layout_deserializer.read_layout( LayoutPath::default(), - initial_read.fb_layout(), + file_layout.fb_layout(), Scan::empty(), - initial_read.lazy_dtype().into(), + file_layout.lazy_dtype().into(), )?; let os_read_at = ObjectStoreReadAt::new(store.clone(), object.location.clone()); @@ -199,7 +210,7 @@ impl FileFormat for VortexFormat { metrics, filters.cloned(), self.context.clone(), - self.initial_read_cache.clone(), + self.file_layout_cache.clone(), )? .into_arc(); diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index b5ce6bfe96..8cf286ef76 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -13,7 +13,7 @@ use vortex_expr::RowFilter; use vortex_file::{LayoutContext, LayoutDeserializer, Projection, VortexReadBuilder}; use vortex_io::{IoDispatcher, ObjectStoreReadAt}; -use super::cache::InitialReadCache; +use super::cache::FileLayoutCache; /// Share an IO dispatcher across all DataFusion instances. static IO_DISPATCHER: LazyLock> = @@ -26,7 +26,7 @@ pub struct VortexFileOpener { pub projection: Option>, pub predicate: Option>, pub arrow_schema: SchemaRef, - pub(crate) initial_read_cache: InitialReadCache, + pub(crate) initial_read_cache: FileLayoutCache, } impl FileOpener for VortexFileOpener { diff --git a/vortex-file/src/v2/file.rs b/vortex-file/src/v2/file.rs index 039b21f2d8..3ddce8011f 100644 --- a/vortex-file/src/v2/file.rs +++ b/vortex-file/src/v2/file.rs @@ -9,7 +9,7 @@ use vortex_io::VortexReadAt; use vortex_layout::scanner::{Poll, Scan}; use vortex_layout::{LayoutData, RowMask}; -use crate::v2::footer::Segment; +use crate::v2::footer::{FileLayout, Segment}; use crate::v2::segments::SegmentCache; pub struct VortexFile { @@ -20,18 +20,31 @@ pub struct VortexFile { pub(crate) segment_cache: SegmentCache, } -/// Async implementation of Vortex File. -impl VortexFile { +impl VortexFile { /// Returns the number of rows in the file. pub fn row_count(&self) -> u64 { self.layout.row_count() } - /// Returns the DType of the file. + /// Returns the [`DType`] of the file. pub fn dtype(&self) -> &DType { self.layout.dtype() } + /// Returns the [`FileLayout`] of the file. + /// + /// This can be passed to [`vortex_file::v2::VortexOpenOptions`] to reconstruct a + /// [`VortexFile`] without re-reading the footer. + pub fn file_layout(&self) -> FileLayout { + FileLayout { + root_layout: self.layout.clone(), + segments: self.segments.clone(), + } + } +} + +/// Async implementation of Vortex File. +impl VortexFile { /// Performs a scan operation over the file. pub fn scan(&self, scan: Scan) -> VortexResult { let layout_scan = self.layout.new_scan(scan, self.ctx.clone())?; diff --git a/vortex-file/src/v2/footer/file_layout.rs b/vortex-file/src/v2/footer/file_layout.rs index 03bb5cb984..32ab6b5793 100644 --- a/vortex-file/src/v2/footer/file_layout.rs +++ b/vortex-file/src/v2/footer/file_layout.rs @@ -1,15 +1,28 @@ +use vortex_dtype::DType; use vortex_flatbuffers::{footer2 as fb, FlatBufferRoot, WriteFlatBuffer}; use vortex_layout::LayoutData; use crate::v2::footer::segment::Segment; /// Captures the layout information of a Vortex file. -#[derive(Clone)] -pub(crate) struct FileLayout { +#[derive(Clone, Debug)] +pub struct FileLayout { pub(crate) root_layout: LayoutData, pub(crate) segments: Vec, } +impl FileLayout { + /// The [`DType`] of the file. + pub fn dtype(&self) -> &DType { + &self.root_layout.dtype() + } + + /// The row count of the file. + pub fn row_count(&self) -> u64 { + self.root_layout.row_count() + } +} + impl FlatBufferRoot for FileLayout {} impl WriteFlatBuffer for FileLayout { diff --git a/vortex-file/src/v2/footer/mod.rs b/vortex-file/src/v2/footer/mod.rs index a1448e0b5e..3e020e680f 100644 --- a/vortex-file/src/v2/footer/mod.rs +++ b/vortex-file/src/v2/footer/mod.rs @@ -2,6 +2,6 @@ mod file_layout; mod postscript; mod segment; -pub(crate) use file_layout::*; +pub use file_layout::*; pub(crate) use postscript::*; pub(crate) use segment::*; diff --git a/vortex-file/src/v2/mod.rs b/vortex-file/src/v2/mod.rs index a380f37e2f..93f635d6b0 100644 --- a/vortex-file/src/v2/mod.rs +++ b/vortex-file/src/v2/mod.rs @@ -1,5 +1,5 @@ mod file; -mod footer; +pub mod footer; mod open; mod segments; mod strategy; diff --git a/vortex-file/src/v2/open.rs b/vortex-file/src/v2/open.rs index 48906da71b..e821fcac6f 100644 --- a/vortex-file/src/v2/open.rs +++ b/vortex-file/src/v2/open.rs @@ -53,6 +53,12 @@ impl OpenOptions { self.initial_read_size = initial_read_size; Ok(self) } + + /// Configure a pre-existing file layout for the Vortex file. + pub fn with_file_layout(mut self, file_layout: FileLayout) -> Self { + self.file_layout = Some(file_layout); + self + } } impl OpenOptions { @@ -63,6 +69,17 @@ impl OpenOptions { /// Open the Vortex file using asynchronous IO. pub async fn open(self, read: R) -> VortexResult> { + // If we already have the file layout, we can skip the initial read entirely. + if let Some(file_layout) = self.file_layout { + return Ok(VortexFile { + read, + ctx: self.ctx.clone(), + layout: file_layout.root_layout, + segments: file_layout.segments, + segment_cache: Default::default(), + }); + } + // Fetch the file size and perform the initial read. let file_size = read.size().await?; let initial_read_size = self.initial_read_size.min(file_size); From e01cedcf42876c7a6de97e1ce7d28d5db05093d2 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Tue, 7 Jan 2025 07:34:33 +0000 Subject: [PATCH 2/9] DataFusion Layouts --- .history | 11 ++ vortex-datafusion/src/persistent/execution.rs | 2 +- vortex-datafusion/src/persistent/format.rs | 143 ++++++++---------- vortex-datafusion/src/persistent/opener.rs | 87 +++++------ vortex-file/src/v2/file.rs | 10 +- vortex-file/src/v2/open.rs | 14 +- vortex-file/src/v2/tests.rs | 9 +- vortex-layout/src/segments/mod.rs | 1 + 8 files changed, 138 insertions(+), 139 deletions(-) create mode 100644 .history diff --git a/.history b/.history new file mode 100644 index 0000000000..f95c77bcde --- /dev/null +++ b/.history @@ -0,0 +1,11 @@ +#V2 +copy (select arrow_cast(1, 'Int8') as x) to '/tmp/foo.parquet'; +describe '/tmp/foo.parquet'; +explain select x = arrow_cast(10000, 'Int32') from '/tmp/foo.parquet'; +select x + 10000 from '/tmp/foo.parquet'; +explain select x + 10000 from '/tmp/foo.parquet'; +explain select x = 10000 from '/tmp/foo.parquet'; +explain select x = arrow_cast(10000, "Int32") from '/tmp/foo.parquet'; +explain select x = cast(10000 AS Int32) from '/tmp/foo.parquet'; +explain select x = cast(10000 AS Int) from '/tmp/foo.parquet'; +explain select x = cast(10000 AS int) from '/tmp/foo.parquet'; diff --git a/vortex-datafusion/src/persistent/execution.rs b/vortex-datafusion/src/persistent/execution.rs index 94e28bdc60..233eb10182 100644 --- a/vortex-datafusion/src/persistent/execution.rs +++ b/vortex-datafusion/src/persistent/execution.rs @@ -122,7 +122,7 @@ impl ExecutionPlan for VortexExec { object_store, projection: self.file_scan_config.projection.clone(), predicate: self.predicate.clone(), - initial_read_cache: self.initial_read_cache.clone(), + file_layout_cache: self.initial_read_cache.clone(), arrow_schema, }; let stream = FileStream::new(&self.file_scan_config, partition, opener, &self.metrics)?; diff --git a/vortex-datafusion/src/persistent/format.rs b/vortex-datafusion/src/persistent/format.rs index 50d5cc9b86..1f06a22a08 100644 --- a/vortex-datafusion/src/persistent/format.rs +++ b/vortex-datafusion/src/persistent/format.rs @@ -8,30 +8,20 @@ use datafusion::datasource::file_format::{FileFormat, FilePushdownSupport}; use datafusion::datasource::physical_plan::{FileScanConfig, FileSinkConfig}; use datafusion::execution::SessionState; use datafusion_common::parsers::CompressionTypeVariant; -use datafusion_common::stats::Precision; -use datafusion_common::{ - not_impl_err, ColumnStatistics, DataFusionError, Result as DFResult, Statistics, -}; +use datafusion_common::{not_impl_err, DataFusionError, Result as DFResult, Statistics}; use datafusion_expr::Expr; use datafusion_physical_expr::{LexRequirement, PhysicalExpr}; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use datafusion_physical_plan::ExecutionPlan; use futures::{stream, StreamExt as _, TryStreamExt as _}; use object_store::{ObjectMeta, ObjectStore}; -use vortex_array::array::StructArray; use vortex_array::arrow::infer_schema; use vortex_array::ContextRef; use vortex_error::VortexResult; -use vortex_file::metadata::fetch_metadata; -use vortex_file::v2::OpenOptions; -use vortex_file::{ - LayoutContext, LayoutDeserializer, LayoutMessageCache, LayoutPath, Scan, VORTEX_FILE_EXTENSION, -}; -use vortex_io::{IoDispatcher, ObjectStoreReadAt}; +use vortex_file::VORTEX_FILE_EXTENSION; use super::cache::FileLayoutCache; use super::execution::VortexExec; -use super::statistics::{array_to_col_statistics, uncompressed_col_size}; use crate::can_be_pushed_down; #[derive(Debug)] @@ -129,72 +119,73 @@ impl FileFormat for VortexFormat { async fn infer_stats( &self, _state: &SessionState, - store: &Arc, + _store: &Arc, table_schema: SchemaRef, - object: &ObjectMeta, + _object: &ObjectMeta, ) -> DFResult { - let file_layout = self - .file_layout_cache - .try_get(object, store.clone()) - .await?; - - // Re-open the vortex file using the cached file layout - let vxf = OpenOptions::new(self.context.clone()) - .with_file_layout(file_layout) - .open(ObjectStoreReadAt::new( - store.clone(), - object.location.clone(), - )) - .await?; - - // Now we have to compute the column statistics for the table. - // If we assume a top-level struct DType (which is true for a DataFusion/Vortex - // integration), then we need some way to ask the layouts to fetch and compute the stats. - - let row_count = file_layout.row_count(); - - let layout_deserializer = - LayoutDeserializer::new(self.context.clone(), LayoutContext::default().into()); - - let root_layout = layout_deserializer.read_layout( - LayoutPath::default(), - file_layout.fb_layout(), - Scan::empty(), - file_layout.lazy_dtype().into(), - )?; - - let os_read_at = ObjectStoreReadAt::new(store.clone(), object.location.clone()); - let io = IoDispatcher::default(); - let mut stats = Statistics::new_unknown(&table_schema); - stats.num_rows = Precision::Exact(row_count as usize); - - let msgs = LayoutMessageCache::default(); - - if let Some(metadata_table) = - fetch_metadata(os_read_at, io.into(), root_layout, msgs).await? - { - let mut column_statistics = Vec::with_capacity(table_schema.fields().len()); - let mut total_size = 0_u64; - - for col_stats in metadata_table.into_iter() { - let col_stats = match col_stats { - Some(array) => { - let col_metadata_array = StructArray::try_from(array)?; - let col_stats = array_to_col_statistics(&col_metadata_array)?; - - total_size += - uncompressed_col_size(&col_metadata_array)?.unwrap_or_default(); - col_stats - } - None => ColumnStatistics::new_unknown(), - }; - column_statistics.push(col_stats); - } - stats.column_statistics = column_statistics; - stats.total_byte_size = Precision::Inexact(total_size as usize); - } - - Ok(stats) + Ok(Statistics::new_unknown(&table_schema)) + // let file_layout = self + // .file_layout_cache + // .try_get(object, store.clone()) + // .await?; + // + // // Re-open the vortex file using the cached file layout + // let vxf = OpenOptions::new(self.context.clone()) + // .with_file_layout(file_layout) + // .open(ObjectStoreReadAt::new( + // store.clone(), + // object.location.clone(), + // )) + // .await?; + // + // // Now we have to compute the column statistics for the table. + // // If we assume a top-level struct DType (which is true for a DataFusion/Vortex + // // integration), then we need some way to ask the layouts to fetch and compute the stats. + // + // let row_count = file_layout.row_count(); + // + // let layout_deserializer = + // LayoutDeserializer::new(self.context.clone(), LayoutContext::default().into()); + // + // let root_layout = layout_deserializer.read_layout( + // LayoutPath::default(), + // file_layout.fb_layout(), + // Scan::empty(), + // file_layout.lazy_dtype().into(), + // )?; + // + // let os_read_at = ObjectStoreReadAt::new(store.clone(), object.location.clone()); + // let io = IoDispatcher::default(); + // let mut stats = Statistics::new_unknown(&table_schema); + // stats.num_rows = Precision::Exact(row_count as usize); + // + // let msgs = LayoutMessageCache::default(); + // + // if let Some(metadata_table) = + // fetch_metadata(os_read_at, io.into(), root_layout, msgs).await? + // { + // let mut column_statistics = Vec::with_capacity(table_schema.fields().len()); + // let mut total_size = 0_u64; + // + // for col_stats in metadata_table.into_iter() { + // let col_stats = match col_stats { + // Some(array) => { + // let col_metadata_array = StructArray::try_from(array)?; + // let col_stats = array_to_col_statistics(&col_metadata_array)?; + // + // total_size += + // uncompressed_col_size(&col_metadata_array)?.unwrap_or_default(); + // col_stats + // } + // None => ColumnStatistics::new_unknown(), + // }; + // column_statistics.push(col_stats); + // } + // stats.column_statistics = column_statistics; + // stats.total_byte_size = Precision::Inexact(total_size as usize); + // } + // + // Ok(stats) } async fn create_physical_plan( diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index 8cf286ef76..bf3ed79eb8 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -1,24 +1,21 @@ -use std::sync::{Arc, LazyLock}; +use std::sync::Arc; use arrow_array::RecordBatch; use arrow_schema::SchemaRef; use datafusion::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener}; use datafusion_common::Result as DFResult; -use datafusion_physical_expr::{split_conjunction, PhysicalExpr}; +use datafusion_physical_expr::PhysicalExpr; use futures::{FutureExt as _, StreamExt, TryStreamExt}; use object_store::ObjectStore; use vortex_array::ContextRef; use vortex_expr::datafusion::convert_expr_to_vortex; -use vortex_expr::RowFilter; -use vortex_file::{LayoutContext, LayoutDeserializer, Projection, VortexReadBuilder}; -use vortex_io::{IoDispatcher, ObjectStoreReadAt}; +use vortex_expr::Identity; +use vortex_file::v2::OpenOptions; +use vortex_io::ObjectStoreReadAt; +use vortex_layout::scanner::Scan; use super::cache::FileLayoutCache; -/// Share an IO dispatcher across all DataFusion instances. -static IO_DISPATCHER: LazyLock> = - LazyLock::new(|| Arc::new(IoDispatcher::default())); - #[derive(Clone)] pub struct VortexFileOpener { pub ctx: ContextRef, @@ -26,61 +23,45 @@ pub struct VortexFileOpener { pub projection: Option>, pub predicate: Option>, pub arrow_schema: SchemaRef, - pub(crate) initial_read_cache: FileLayoutCache, + pub(crate) file_layout_cache: FileLayoutCache, } impl FileOpener for VortexFileOpener { fn open(&self, file_meta: FileMeta) -> DFResult { let this = self.clone(); - let f = async move { - let read_at = - ObjectStoreReadAt::new(this.object_store.clone(), file_meta.location().clone()); - let initial_read = this - .initial_read_cache - .try_get(&file_meta.object_meta, this.object_store.clone()) - .await?; - - let mut builder = VortexReadBuilder::new( - read_at, - LayoutDeserializer::new(this.ctx.clone(), Arc::new(LayoutContext::default())), - ) - .with_io_dispatcher(IO_DISPATCHER.clone()) - .with_file_size(file_meta.object_meta.size as u64) - .with_initial_read(initial_read); - // We split the predicate and filter out the conjunction members that we can't push down - let row_filter = this + // TODO(ngates): figure out how to map the column index projection into a projection expression. + let projection = Identity::new_expr(); + let scan = Scan { + projection, + filter: self .predicate .as_ref() - .map(|filter_expr| { - split_conjunction(filter_expr) - .into_iter() - .filter_map(|e| convert_expr_to_vortex(e.clone()).ok()) - .collect::>() - }) - .filter(|conjunction| !conjunction.is_empty()) - .map(RowFilter::from_conjunction); + .map(|expr| convert_expr_to_vortex(expr.clone())) + .transpose()?, + }; - if let Some(row_filter) = row_filter { - builder = builder.with_row_filter(row_filter); - } + Ok(async move { + let read_at = + ObjectStoreReadAt::new(this.object_store.clone(), file_meta.location().clone()); - if let Some(projection) = this.projection.as_ref() { - builder = builder.with_projection(Projection::new(projection)); - } + let vxf = OpenOptions::new(this.ctx.clone()) + .with_file_size(file_meta.object_meta.size as u64) + .with_file_layout( + this.file_layout_cache + .try_get(&file_meta.object_meta, this.object_store.clone()) + .await?, + ) + .open(read_at) + .await?; - Ok(Box::pin( - builder - .build() - .await? - .into_stream() - .map_ok(RecordBatch::try_from) - .map(|r| r.and_then(|inner| inner)) - .map_err(|e| e.into()), - ) as _) + Ok(vxf + .scan(scan)? + .map_ok(RecordBatch::try_from) + .map(|r| r.and_then(|inner| inner)) + .map_err(|e| e.into()) + .boxed()) } - .boxed(); - - Ok(f) + .boxed()) } } diff --git a/vortex-file/src/v2/file.rs b/vortex-file/src/v2/file.rs index 3ddce8011f..7ab7a6e59d 100644 --- a/vortex-file/src/v2/file.rs +++ b/vortex-file/src/v2/file.rs @@ -46,7 +46,7 @@ impl VortexFile { /// Async implementation of Vortex File. impl VortexFile { /// Performs a scan operation over the file. - pub fn scan(&self, scan: Scan) -> VortexResult { + pub fn scan(self, scan: Scan) -> VortexResult { let layout_scan = self.layout.new_scan(scan, self.ctx.clone())?; let scan_dtype = layout_scan.dtype().clone(); @@ -57,17 +57,19 @@ impl VortexFile { let mut scanner = layout_scan.create_scanner(row_mask)?; let stream = stream::once(async move { + let segment_cache = self.segment_cache.clone(); + let segments = self.segments.clone(); loop { - match scanner.poll(&self.segment_cache)? { + match scanner.poll(&segment_cache)? { Poll::Some(array) => return Ok(array), Poll::NeedMore(segment_ids) => { for segment_id in segment_ids { - let segment = &self.segments[*segment_id as usize]; + let segment = &segments[*segment_id as usize]; let bytes = self .read .read_byte_range(segment.offset, segment.length as u64) .await?; - self.segment_cache.set(segment_id, bytes); + segment_cache.set(segment_id, bytes); } } } diff --git a/vortex-file/src/v2/open.rs b/vortex-file/src/v2/open.rs index e821fcac6f..351dca1957 100644 --- a/vortex-file/src/v2/open.rs +++ b/vortex-file/src/v2/open.rs @@ -24,6 +24,8 @@ pub struct OpenOptions { ctx: ContextRef, /// The Vortex Layout encoding context. layout_ctx: LayoutContextRef, + /// An optional, externally provided, file size. + file_size: Option, /// An optional, externally provided, file layout. file_layout: Option, /// An optional, externally provided, dtype. @@ -39,6 +41,7 @@ impl OpenOptions { Self { ctx, layout_ctx: LayoutContextRef::default(), + file_size: None, file_layout: None, dtype: None, initial_read_size: INITIAL_READ_SIZE, @@ -54,6 +57,12 @@ impl OpenOptions { Ok(self) } + /// Configure a known file size for the Vortex file. + pub fn with_file_size(mut self, file_size: u64) -> Self { + self.file_size = Some(file_size); + self + } + /// Configure a pre-existing file layout for the Vortex file. pub fn with_file_layout(mut self, file_layout: FileLayout) -> Self { self.file_layout = Some(file_layout); @@ -81,7 +90,10 @@ impl OpenOptions { } // Fetch the file size and perform the initial read. - let file_size = read.size().await?; + let file_size = match self.file_size { + None => read.size().await?, + Some(file_size) => file_size, + }; let initial_read_size = self.initial_read_size.min(file_size); let initial_offset = file_size - initial_read_size; let initial_read: ByteBuffer = read diff --git a/vortex-file/src/v2/tests.rs b/vortex-file/src/v2/tests.rs index 4718e33251..70d301644b 100644 --- a/vortex-file/src/v2/tests.rs +++ b/vortex-file/src/v2/tests.rs @@ -15,14 +15,15 @@ async fn write_read() { ]) .into_array(); - let written = WriteOptions::default() + let written: Bytes = WriteOptions::default() .write_async(vec![], arr.into_array_stream()) .await - .unwrap(); + .unwrap() + // TODO(ngates): no need to wrap Vec in Bytes if VortexReadAt doesn't require clone. + .into(); - // TODO(ngates): no need to wrap Vec in Bytes if VortexReadAt doesn't require clone. let vxf = OpenOptions::new(ContextRef::default()) - .open(Bytes::from(written)) + .open(written) .await .unwrap(); diff --git a/vortex-layout/src/segments/mod.rs b/vortex-layout/src/segments/mod.rs index 8b205e2632..276abe7886 100644 --- a/vortex-layout/src/segments/mod.rs +++ b/vortex-layout/src/segments/mod.rs @@ -27,6 +27,7 @@ pub trait SegmentReader { /// Attempt to get the data associated with a given segment ID. /// /// If the segment ID is not found, `None` is returned. + // TODO(ngates): we should probably take Alignment and return ByteBuffer here. fn get(&self, id: SegmentId) -> Option; } From 3c9f28594ce4648c90ccd3acb7254a430494f58a Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Tue, 7 Jan 2025 08:49:00 +0000 Subject: [PATCH 3/9] DataFusion Layouts --- bench-vortex/src/clickbench.rs | 23 ++++---- bench-vortex/src/reader.rs | 63 ++++++++-------------- bench-vortex/src/tpch/mod.rs | 9 ++-- vortex-datafusion/src/persistent/cache.rs | 4 +- vortex-datafusion/src/persistent/format.rs | 10 ++++ vortex-datafusion/src/persistent/opener.rs | 22 ++++++-- vortex-file/src/v2/file.rs | 28 +++++++++- vortex-file/src/v2/mod.rs | 2 + vortex-file/src/v2/open.rs | 6 +-- vortex-file/src/v2/tests.rs | 8 +-- vortex-file/src/v2/writer.rs | 10 ++-- 11 files changed, 111 insertions(+), 74 deletions(-) diff --git a/bench-vortex/src/clickbench.rs b/bench-vortex/src/clickbench.rs index ad73f0056e..e1a51c8076 100644 --- a/bench-vortex/src/clickbench.rs +++ b/bench-vortex/src/clickbench.rs @@ -7,13 +7,16 @@ use datafusion::datasource::listing::{ ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl, }; use datafusion::prelude::{ParquetReadOptions, SessionContext}; -use futures::{stream, StreamExt, TryStreamExt}; +use futures::executor::block_on; +use itertools::Itertools; +use rayon::prelude::*; use tokio::fs::{create_dir_all, OpenOptions}; use vortex::aliases::hash_map::HashMap; use vortex::array::{ChunkedArray, StructArray}; use vortex::dtype::DType; use vortex::error::vortex_err; -use vortex::file::{VortexFileWriter, VORTEX_FILE_EXTENSION}; +use vortex::file::v2::VortexWriteOptions; +use vortex::file::VORTEX_FILE_EXTENSION; use vortex::sampling_compressor::SamplingCompressor; use vortex::variants::StructArrayTrait; use vortex::{ArrayDType, ArrayData, IntoArrayData, IntoArrayVariant}; @@ -149,7 +152,9 @@ pub async fn register_vortex_files( let vortex_dir = input_path.join("vortex"); create_dir_all(&vortex_dir).await?; - stream::iter(0..100) + (0..100) + .collect_vec() + .par_iter() .map(|idx| { let parquet_file_path = input_path .join("parquet") @@ -158,7 +163,7 @@ pub async fn register_vortex_files( let session = session.clone(); let schema = schema.clone(); - tokio::spawn(async move { + block_on(async move { let output_path = output_path.clone(); idempotent_async(&output_path, move |vtx_file| async move { eprintln!("Processing file {idx}"); @@ -219,9 +224,9 @@ pub async fn register_vortex_files( .open(&vtx_file) .await?; - let mut writer = VortexFileWriter::new(f); - writer = writer.write_array_columns(data).await?; - writer.finalize().await?; + VortexWriteOptions::default() + .write(f, data.into_array_stream()) + .await?; anyhow::Ok(()) }) @@ -229,9 +234,7 @@ pub async fn register_vortex_files( .expect("Failed to write Vortex file") }) }) - .buffered(16) - .try_collect::>() - .await?; + .collect::>(); let format = Arc::new(VortexFormat::new(CTX.clone())); let table_path = vortex_dir diff --git a/bench-vortex/src/reader.rs b/bench-vortex/src/reader.rs index 9fc1e4441c..d213fc35e5 100644 --- a/bench-vortex/src/reader.rs +++ b/bench-vortex/src/reader.rs @@ -2,7 +2,7 @@ use std::fs::File; use std::ops::Range; use std::path::{Path, PathBuf}; use std::process::Command; -use std::sync::{Arc, LazyLock}; +use std::sync::Arc; use arrow_array::types::Int64Type; use arrow_array::{ @@ -24,18 +24,15 @@ use stream::StreamExt; use vortex::aliases::hash_map::HashMap; use vortex::array::ChunkedArray; use vortex::arrow::FromArrowType; -use vortex::buffer::Buffer; use vortex::compress::CompressionStrategy; use vortex::dtype::DType; use vortex::error::VortexResult; -use vortex::file::{LayoutContext, LayoutDeserializer, VortexFileWriter, VortexReadBuilder}; -use vortex::io::{IoDispatcher, ObjectStoreReadAt, TokioFile, VortexReadAt, VortexWrite}; +use vortex::file::v2::{Scan, VortexOpenOptions, VortexWriteOptions}; +use vortex::io::{ObjectStoreReadAt, TokioFile, VortexReadAt, VortexWrite}; use vortex::sampling_compressor::{SamplingCompressor, ALL_ENCODINGS_CONTEXT}; +use vortex::stream::ArrayStreamExt; use vortex::{ArrayData, IntoArrayData, IntoCanonical}; -static DISPATCHER: LazyLock> = - LazyLock::new(|| Arc::new(IoDispatcher::default())); - pub const BATCH_SIZE: usize = 65_536; #[derive(Debug, Clone, Serialize, Deserialize)] @@ -48,19 +45,12 @@ pub struct VortexFooter { pub async fn open_vortex(path: &Path) -> VortexResult { let file = TokioFile::open(path).unwrap(); - VortexReadBuilder::new( - file, - LayoutDeserializer::new( - ALL_ENCODINGS_CONTEXT.clone(), - LayoutContext::default().into(), - ), - ) - .with_io_dispatcher(DISPATCHER.clone()) - .build() - .await? - .into_stream() - .read_all() - .await + VortexOpenOptions::new(ALL_ENCODINGS_CONTEXT.clone()) + .open(file) + .await? + .scan(Scan::all())? + .into_array_data() + .await } pub async fn rewrite_parquet_as_vortex( @@ -69,11 +59,10 @@ pub async fn rewrite_parquet_as_vortex( ) -> VortexResult<()> { let chunked = compress_parquet_to_vortex(parquet_path.as_path())?; - VortexFileWriter::new(write) - .write_array_columns(chunked) - .await? - .finalize() + VortexWriteOptions::default() + .write(write, chunked.into_array_stream()) .await?; + Ok(()) } @@ -118,23 +107,15 @@ async fn take_vortex( reader: T, indices: &[u64], ) -> VortexResult { - VortexReadBuilder::new( - reader, - LayoutDeserializer::new( - ALL_ENCODINGS_CONTEXT.clone(), - LayoutContext::default().into(), - ), - ) - .with_io_dispatcher(DISPATCHER.clone()) - .with_indices(Buffer::copy_from(indices).into_array()) - .build() - .await? - .into_stream() - .read_all() - .await - // For equivalence.... we decompress to make sure we're not cheating too much. - .and_then(IntoCanonical::into_canonical) - .map(ArrayData::from) + VortexOpenOptions::new(ALL_ENCODINGS_CONTEXT.clone()) + .open(reader) + .await? + .scan_rows(Scan::all(), indices.iter().copied())? + .into_array_data() + .await? + // For equivalence.... we decompress to make sure we're not cheating too much. + .into_canonical() + .map(ArrayData::from) } pub async fn take_vortex_object_store( diff --git a/bench-vortex/src/tpch/mod.rs b/bench-vortex/src/tpch/mod.rs index 3dabb7bb97..bbd4b4d5e7 100644 --- a/bench-vortex/src/tpch/mod.rs +++ b/bench-vortex/src/tpch/mod.rs @@ -16,7 +16,7 @@ use vortex::aliases::hash_map::HashMap; use vortex::array::{ChunkedArray, StructArray}; use vortex::arrow::FromArrowArray; use vortex::dtype::DType; -use vortex::file::{VortexFileWriter, VORTEX_FILE_EXTENSION}; +use vortex::file::VORTEX_FILE_EXTENSION; use vortex::sampling_compressor::SamplingCompressor; use vortex::variants::StructArrayTrait; use vortex::{ArrayDType, ArrayData, IntoArrayData, IntoArrayVariant}; @@ -31,6 +31,7 @@ mod execute; pub mod schema; pub use execute::*; +use vortex::file::v2::VortexWriteOptions; pub const EXPECTED_ROW_COUNTS: [usize; 23] = [ 0, 4, 460, 11620, 5, 5, 1, 4, 2, 175, 37967, 1048, 2, 42, 1, 1, 18314, 1, 57, 1, 186, 411, 7, @@ -275,9 +276,9 @@ async fn register_vortex_file( .open(&vtx_file) .await?; - let mut writer = VortexFileWriter::new(f); - writer = writer.write_array_columns(data).await?; - writer.finalize().await?; + VortexWriteOptions::default() + .write(f, data.into_array_stream()) + .await?; anyhow::Ok(()) }) diff --git a/vortex-datafusion/src/persistent/cache.rs b/vortex-datafusion/src/persistent/cache.rs index 7b5a497ce2..4057afd9dd 100644 --- a/vortex-datafusion/src/persistent/cache.rs +++ b/vortex-datafusion/src/persistent/cache.rs @@ -7,7 +7,7 @@ use object_store::{ObjectMeta, ObjectStore}; use vortex_array::ContextRef; use vortex_error::{vortex_err, VortexError, VortexResult}; use vortex_file::v2::footer::FileLayout; -use vortex_file::v2::OpenOptions; +use vortex_file::v2::VortexOpenOptions; use vortex_io::ObjectStoreReadAt; #[derive(Debug, Clone)] @@ -50,7 +50,7 @@ impl FileLayoutCache { self.inner .try_get_with(Key::from(object), async { let os_read_at = ObjectStoreReadAt::new(store.clone(), object.location.clone()); - let vxf = OpenOptions::new(ContextRef::default()) + let vxf = VortexOpenOptions::new(ContextRef::default()) .open(os_read_at) .await?; VortexResult::Ok(vxf.file_layout()) diff --git a/vortex-datafusion/src/persistent/format.rs b/vortex-datafusion/src/persistent/format.rs index 1f06a22a08..bc312d1968 100644 --- a/vortex-datafusion/src/persistent/format.rs +++ b/vortex-datafusion/src/persistent/format.rs @@ -194,6 +194,8 @@ impl FileFormat for VortexFormat { file_scan_config: FileScanConfig, filters: Option<&Arc>, ) -> DFResult> { + println!("PHYSICAL PLAN FILTERS {:?}", filters); + let _filters = format!("{:?}", filters); let metrics = ExecutionPlanMetricsSet::new(); let exec = VortexExec::try_new( @@ -224,6 +226,14 @@ impl FileFormat for VortexFormat { table_schema: &Schema, filters: &[&Expr], ) -> DFResult { + for filter in filters { + println!( + "FILTER {} {}", + filter, + can_be_pushed_down(filter, table_schema) + ); + } + let is_pushdown = filters .iter() .all(|expr| can_be_pushed_down(expr, table_schema)); diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index bf3ed79eb8..6d5f204ee6 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -7,10 +7,11 @@ use datafusion_common::Result as DFResult; use datafusion_physical_expr::PhysicalExpr; use futures::{FutureExt as _, StreamExt, TryStreamExt}; use object_store::ObjectStore; -use vortex_array::ContextRef; +use vortex_array::{ContextRef, IntoArrayData, IntoArrayVariant}; +use vortex_dtype::field::Field; use vortex_expr::datafusion::convert_expr_to_vortex; use vortex_expr::Identity; -use vortex_file::v2::OpenOptions; +use vortex_file::v2::VortexOpenOptions; use vortex_io::ObjectStoreReadAt; use vortex_layout::scanner::Scan; @@ -30,7 +31,8 @@ impl FileOpener for VortexFileOpener { fn open(&self, file_meta: FileMeta) -> DFResult { let this = self.clone(); - // TODO(ngates): figure out how to map the column index projection into a projection expression. + // FIXME(ngates): figure out how to map the column index projection into a projection expression. + // For now, we select columns later. let projection = Identity::new_expr(); let scan = Scan { projection, @@ -45,7 +47,7 @@ impl FileOpener for VortexFileOpener { let read_at = ObjectStoreReadAt::new(this.object_store.clone(), file_meta.location().clone()); - let vxf = OpenOptions::new(this.ctx.clone()) + let vxf = VortexOpenOptions::new(this.ctx.clone()) .with_file_size(file_meta.object_meta.size as u64) .with_file_layout( this.file_layout_cache @@ -55,8 +57,20 @@ impl FileOpener for VortexFileOpener { .open(read_at) .await?; + let vortex_projection: Option> = this + .projection + .map(|p| p.iter().map(|idx| Field::Index(*idx)).collect()); + Ok(vxf .scan(scan)? + .map_ok(move |array| { + if let Some(projection) = &vortex_projection { + Ok(array.into_struct()?.project(&projection)?.into_array()) + } else { + Ok(array) + } + }) + .map(|r| r.and_then(|inner| inner)) .map_ok(RecordBatch::try_from) .map(|r| r.and_then(|inner| inner)) .map_err(|e| e.into()) diff --git a/vortex-file/src/v2/file.rs b/vortex-file/src/v2/file.rs index 7ab7a6e59d..217513684f 100644 --- a/vortex-file/src/v2/file.rs +++ b/vortex-file/src/v2/file.rs @@ -1,6 +1,7 @@ use std::io::Read; use futures_util::stream; +use vortex_array::compute::FilterMask; use vortex_array::stream::{ArrayStream, ArrayStreamAdapter}; use vortex_array::ContextRef; use vortex_dtype::DType; @@ -47,13 +48,38 @@ impl VortexFile { impl VortexFile { /// Performs a scan operation over the file. pub fn scan(self, scan: Scan) -> VortexResult { + let row_count = self.row_count(); + self.scan_range(scan, RowMask::new_valid_between(0, row_count)) + } + + /// Performs a scan operation over the file. + pub fn scan_rows>( + self, + scan: Scan, + indices: I, + ) -> VortexResult { + let row_count = self.row_count(); + + // TODO(ngates): do we only support "take" over usize rows? + let filter_mask = FilterMask::from_indices( + usize::try_from(row_count).expect("row count is too large for usize"), + indices + .into_iter() + .map(|i| usize::try_from(i).expect("row index is too large for usize")), + ); + let row_mask = RowMask::try_new(filter_mask, 0, row_count)?; + + self.scan_range(scan, row_mask) + } + + /// Performs a scan operation over a [`RowMask`] of the file. + fn scan_range(self, scan: Scan, row_mask: RowMask) -> VortexResult { let layout_scan = self.layout.new_scan(scan, self.ctx.clone())?; let scan_dtype = layout_scan.dtype().clone(); // TODO(ngates): we could query the layout for splits and then process them in parallel. // For now, we just scan the entire layout with one mask. // Note that to implement this we would use stream::try_unfold - let row_mask = RowMask::new_valid_between(0, layout_scan.layout().row_count()); let mut scanner = layout_scan.create_scanner(row_mask)?; let stream = stream::once(async move { diff --git a/vortex-file/src/v2/mod.rs b/vortex-file/src/v2/mod.rs index 93f635d6b0..571191ddd7 100644 --- a/vortex-file/src/v2/mod.rs +++ b/vortex-file/src/v2/mod.rs @@ -9,4 +9,6 @@ mod writer; pub use file::*; pub use open::*; +// TODO(ngates): probably can separate these APIs? For now, re-export the Scan. +pub use vortex_layout::scanner::Scan; pub use writer::*; diff --git a/vortex-file/src/v2/open.rs b/vortex-file/src/v2/open.rs index 351dca1957..35baa00714 100644 --- a/vortex-file/src/v2/open.rs +++ b/vortex-file/src/v2/open.rs @@ -19,7 +19,7 @@ use crate::{EOF_SIZE, MAGIC_BYTES, VERSION}; const INITIAL_READ_SIZE: u64 = 1 << 20; // 1 MB /// Open options for a Vortex file reader. -pub struct OpenOptions { +pub struct VortexOpenOptions { /// The Vortex Array encoding context. ctx: ContextRef, /// The Vortex Layout encoding context. @@ -36,7 +36,7 @@ pub struct OpenOptions { initial_read_size: u64, } -impl OpenOptions { +impl VortexOpenOptions { pub fn new(ctx: ContextRef) -> Self { Self { ctx, @@ -70,7 +70,7 @@ impl OpenOptions { } } -impl OpenOptions { +impl VortexOpenOptions { /// Open the Vortex file using synchronous IO. pub fn open_sync(self, _read: R) -> VortexResult> { todo!() diff --git a/vortex-file/src/v2/tests.rs b/vortex-file/src/v2/tests.rs index 70d301644b..734e265a11 100644 --- a/vortex-file/src/v2/tests.rs +++ b/vortex-file/src/v2/tests.rs @@ -5,7 +5,7 @@ use vortex_array::{ContextRef, IntoArrayData, IntoArrayVariant}; use vortex_buffer::buffer; use vortex_layout::scanner::Scan; -use crate::v2::{OpenOptions, WriteOptions}; +use crate::v2::{VortexOpenOptions, VortexWriteOptions}; #[tokio::test] async fn write_read() { @@ -15,14 +15,14 @@ async fn write_read() { ]) .into_array(); - let written: Bytes = WriteOptions::default() - .write_async(vec![], arr.into_array_stream()) + let written: Bytes = VortexWriteOptions::default() + .write(vec![], arr.into_array_stream()) .await .unwrap() // TODO(ngates): no need to wrap Vec in Bytes if VortexReadAt doesn't require clone. .into(); - let vxf = OpenOptions::new(ContextRef::default()) + let vxf = VortexOpenOptions::new(ContextRef::default()) .open(written) .await .unwrap(); diff --git a/vortex-file/src/v2/writer.rs b/vortex-file/src/v2/writer.rs index b1706c7ebc..1ba763cf1d 100644 --- a/vortex-file/src/v2/writer.rs +++ b/vortex-file/src/v2/writer.rs @@ -13,11 +13,11 @@ use crate::v2::segments::BufferedSegmentWriter; use crate::v2::strategy::VortexLayoutStrategy; use crate::{EOF_SIZE, MAGIC_BYTES, MAX_FOOTER_SIZE, VERSION}; -pub struct WriteOptions { +pub struct VortexWriteOptions { strategy: Box, } -impl Default for WriteOptions { +impl Default for VortexWriteOptions { fn default() -> Self { Self { strategy: Box::new(VortexLayoutStrategy), @@ -25,7 +25,7 @@ impl Default for WriteOptions { } } -impl WriteOptions { +impl VortexWriteOptions { /// Replace the default layout strategy with the provided one. pub fn with_strategy(mut self, strategy: Box) -> Self { self.strategy = strategy; @@ -33,14 +33,14 @@ impl WriteOptions { } } -impl WriteOptions { +impl VortexWriteOptions { /// Perform a blocking write of the provided iterator of `ArrayData`. pub fn write_sync(self, _write: W, _iter: I) -> VortexResult<()> { todo!() } /// Perform an async write of the provided stream of `ArrayData`. - pub async fn write_async( + pub async fn write( self, write: W, mut stream: S, From 6f4b711718539514c8093d61bd1236ee983a76e5 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Tue, 7 Jan 2025 09:15:23 +0000 Subject: [PATCH 4/9] DataFusion Layouts --- vortex-file/src/v2/file.rs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/vortex-file/src/v2/file.rs b/vortex-file/src/v2/file.rs index 217513684f..138c45b8ed 100644 --- a/vortex-file/src/v2/file.rs +++ b/vortex-file/src/v2/file.rs @@ -69,6 +69,20 @@ impl VortexFile { ); let row_mask = RowMask::try_new(filter_mask, 0, row_count)?; + // TODO(ngates): support thread pool execution. + // The plan is to have OpenOptions configure a Rayon ThreadPool for reading. We would + // par_iter each of the row masks (based on configured split by size or row count), + // launching their `poll` operation onto the thread pool. If a task returns NeedMore, + // then the segment IDs are handed to the IO dispatcher and a synchronous latch is + // returned. The IO dispatcher has visibility into all requested segments and can perform + // coalescing over ranges. Once a coalesced read returns, the dispatcher updates the + // segment cache with all read segments (including those that were incidentally read by + // in-between the coalesced ranges). A map of segment IDs -> set then provides + // a way for the dispatcher to notify the waiting tasks that their data is ready. When + // finished, the tasks push their results in order onto a channel that acts as the + // ArrayStream. + // This keeps I/O on the current thread (using the caller's existing runtime), while still + // enabling a CPU pool for decompression and filtering. self.scan_range(scan, row_mask) } From f1e11d1b677dcbc7cf2a1df4d800ee46955d8ff0 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Tue, 7 Jan 2025 09:20:40 +0000 Subject: [PATCH 5/9] DataFusion Layouts --- vortex-file/src/v2/file.rs | 30 ++++++++++++++++-------------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/vortex-file/src/v2/file.rs b/vortex-file/src/v2/file.rs index 138c45b8ed..d75e7d2cfe 100644 --- a/vortex-file/src/v2/file.rs +++ b/vortex-file/src/v2/file.rs @@ -49,6 +49,22 @@ impl VortexFile { /// Performs a scan operation over the file. pub fn scan(self, scan: Scan) -> VortexResult { let row_count = self.row_count(); + + // TODO(ngates): support thread pool execution. + // The plan is to have OpenOptions configure a Rayon ThreadPool for reading. We would + // par_iter each of the row masks (based on configured split by size or row count), + // launching their `poll` operation onto the thread pool. If a task returns NeedMore, + // then the segment IDs are handed to the IO dispatcher and a synchronous latch is + // returned. The IO dispatcher has visibility into all requested segments and can perform + // coalescing over ranges. Once a coalesced read returns, the dispatcher updates the + // segment cache with all read segments (including those that were incidentally read by + // in-between the coalesced ranges). A map of segment IDs -> set then provides + // a way for the dispatcher to notify the waiting tasks that their data is ready. When + // finished, the tasks push their results in order onto a channel that acts as the + // ArrayStream. + // This keeps I/O on the current thread (using the caller's existing runtime), while still + // enabling a CPU pool for decompression and filtering. + self.scan_range(scan, RowMask::new_valid_between(0, row_count)) } @@ -69,20 +85,6 @@ impl VortexFile { ); let row_mask = RowMask::try_new(filter_mask, 0, row_count)?; - // TODO(ngates): support thread pool execution. - // The plan is to have OpenOptions configure a Rayon ThreadPool for reading. We would - // par_iter each of the row masks (based on configured split by size or row count), - // launching their `poll` operation onto the thread pool. If a task returns NeedMore, - // then the segment IDs are handed to the IO dispatcher and a synchronous latch is - // returned. The IO dispatcher has visibility into all requested segments and can perform - // coalescing over ranges. Once a coalesced read returns, the dispatcher updates the - // segment cache with all read segments (including those that were incidentally read by - // in-between the coalesced ranges). A map of segment IDs -> set then provides - // a way for the dispatcher to notify the waiting tasks that their data is ready. When - // finished, the tasks push their results in order onto a channel that acts as the - // ArrayStream. - // This keeps I/O on the current thread (using the caller's existing runtime), while still - // enabling a CPU pool for decompression and filtering. self.scan_range(scan, row_mask) } From 42c1373331a0ca1c944b481f8b942f5d8c0d56bf Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Fri, 10 Jan 2025 15:43:03 +0000 Subject: [PATCH 6/9] merge --- Cargo.lock | 2 +- vortex-datafusion/Cargo.toml | 2 +- vortex-datafusion/src/persistent/cache.rs | 2 +- vortex-datafusion/src/persistent/opener.rs | 50 +++++++++++----------- vortex-expr/src/identity.rs | 6 ++- vortex-file/src/v2/file.rs | 41 ++++++++++++++---- vortex-file/src/v2/mod.rs | 2 - vortex-file/src/v2/open/mod.rs | 12 +++--- vortex-file/src/v2/tests.rs | 2 +- 9 files changed, 72 insertions(+), 47 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4d22a70689..f34ef806f7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4979,7 +4979,7 @@ dependencies = [ "vortex-expr", "vortex-file", "vortex-io", - "vortex-layout", + "vortex-scan", ] [[package]] diff --git a/vortex-datafusion/Cargo.toml b/vortex-datafusion/Cargo.toml index 2c40ae189e..7347474a15 100644 --- a/vortex-datafusion/Cargo.toml +++ b/vortex-datafusion/Cargo.toml @@ -42,7 +42,7 @@ vortex-error = { workspace = true, features = ["datafusion"] } vortex-expr = { workspace = true, features = ["datafusion"] } vortex-file = { workspace = true, features = ["object_store"] } vortex-io = { workspace = true, features = ["object_store", "tokio"] } -vortex-layout = { workspace = true } +vortex-scan = { workspace = true } [dev-dependencies] anyhow = { workspace = true } diff --git a/vortex-datafusion/src/persistent/cache.rs b/vortex-datafusion/src/persistent/cache.rs index 4057afd9dd..099dd960d2 100644 --- a/vortex-datafusion/src/persistent/cache.rs +++ b/vortex-datafusion/src/persistent/cache.rs @@ -53,7 +53,7 @@ impl FileLayoutCache { let vxf = VortexOpenOptions::new(ContextRef::default()) .open(os_read_at) .await?; - VortexResult::Ok(vxf.file_layout()) + VortexResult::Ok(vxf.file_layout().clone()) }) .await .map_err(|e: Arc| match Arc::try_unwrap(e) { diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index 6d5f204ee6..f81a3465ad 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -7,13 +7,13 @@ use datafusion_common::Result as DFResult; use datafusion_physical_expr::PhysicalExpr; use futures::{FutureExt as _, StreamExt, TryStreamExt}; use object_store::ObjectStore; -use vortex_array::{ContextRef, IntoArrayData, IntoArrayVariant}; -use vortex_dtype::field::Field; +use vortex_array::ContextRef; +use vortex_dtype::Field; use vortex_expr::datafusion::convert_expr_to_vortex; -use vortex_expr::Identity; +use vortex_expr::{Identity, Select, SelectField}; use vortex_file::v2::VortexOpenOptions; use vortex_io::ObjectStoreReadAt; -use vortex_layout::scanner::Scan; +use vortex_scan::Scan; use super::cache::FileLayoutCache; @@ -31,22 +31,32 @@ impl FileOpener for VortexFileOpener { fn open(&self, file_meta: FileMeta) -> DFResult { let this = self.clone(); - // FIXME(ngates): figure out how to map the column index projection into a projection expression. - // For now, we select columns later. - let projection = Identity::new_expr(); - let scan = Scan { + // Construct the projection expression based on the DataFusion projection mask. + // Each index in the mask corresponds to the field position of the root DType. + let projection = self + .projection + .as_ref() + .map(|fields| { + Select::new_expr( + SelectField::Include(fields.iter().map(|idx| Field::Index(*idx)).collect()), + Identity::new_expr(), + ) + }) + .unwrap_or_else(|| Identity::new_expr()); + + let scan = Scan::new( projection, - filter: self - .predicate + self.predicate .as_ref() .map(|expr| convert_expr_to_vortex(expr.clone())) .transpose()?, - }; + ) + .into_arc(); - Ok(async move { - let read_at = - ObjectStoreReadAt::new(this.object_store.clone(), file_meta.location().clone()); + let read_at = + ObjectStoreReadAt::new(this.object_store.clone(), file_meta.location().clone()); + Ok(async move { let vxf = VortexOpenOptions::new(this.ctx.clone()) .with_file_size(file_meta.object_meta.size as u64) .with_file_layout( @@ -57,20 +67,8 @@ impl FileOpener for VortexFileOpener { .open(read_at) .await?; - let vortex_projection: Option> = this - .projection - .map(|p| p.iter().map(|idx| Field::Index(*idx)).collect()); - Ok(vxf .scan(scan)? - .map_ok(move |array| { - if let Some(projection) = &vortex_projection { - Ok(array.into_struct()?.project(&projection)?.into_array()) - } else { - Ok(array) - } - }) - .map(|r| r.and_then(|inner| inner)) .map_ok(RecordBatch::try_from) .map(|r| r.and_then(|inner| inner)) .map_err(|e| e.into()) diff --git a/vortex-expr/src/identity.rs b/vortex-expr/src/identity.rs index 72aa4bc32a..933fb1eca4 100644 --- a/vortex-expr/src/identity.rs +++ b/vortex-expr/src/identity.rs @@ -1,18 +1,20 @@ use std::any::Any; use std::fmt::Display; -use std::sync::Arc; +use std::sync::{Arc, LazyLock}; use vortex_array::ArrayData; use vortex_error::VortexResult; use crate::{ExprRef, VortexExpr}; +static IDENTITY: LazyLock = LazyLock::new(|| Identity::new_expr()); + #[derive(Debug, PartialEq, Eq, Hash)] pub struct Identity; impl Identity { pub fn new_expr() -> ExprRef { - Arc::new(Identity) + IDENTITY.clone() } } diff --git a/vortex-file/src/v2/file.rs b/vortex-file/src/v2/file.rs index cdb8c5f10b..4850088f39 100644 --- a/vortex-file/src/v2/file.rs +++ b/vortex-file/src/v2/file.rs @@ -10,14 +10,15 @@ use vortex_array::ContextRef; use vortex_dtype::DType; use vortex_error::VortexResult; use vortex_io::VortexReadAt; -use vortex_layout::{ExprEvaluator, LayoutData, LayoutReader}; +use vortex_layout::{ExprEvaluator, LayoutReader}; use vortex_scan::Scan; +use crate::v2::footer::FileLayout; use crate::v2::segments::cache::SegmentCache; pub struct VortexFile { pub(crate) ctx: ContextRef, - pub(crate) layout: LayoutData, + pub(crate) file_layout: FileLayout, pub(crate) segments: Arc>, // TODO(ngates): not yet used by the file reader #[allow(dead_code)] @@ -26,23 +27,42 @@ pub struct VortexFile { impl VortexFile {} +/// When the underling `R` is `Clone`, we can clone the `VortexFile`. +// TODO(ngates): remove Clone from VortexReadAt? +impl Clone for VortexFile { + fn clone(&self) -> Self { + Self { + ctx: self.ctx.clone(), + file_layout: self.file_layout.clone(), + segments: self.segments.clone(), + splits: self.splits.clone(), + } + } +} + /// Async implementation of Vortex File. impl VortexFile { /// Returns the number of rows in the file. pub fn row_count(&self) -> u64 { - self.layout.row_count() + self.file_layout.row_count() } /// Returns the DType of the file. pub fn dtype(&self) -> &DType { - self.layout.dtype() + self.file_layout.dtype() } - /// Performs a scan operation over the file. - pub fn scan(&self, scan: Arc) -> VortexResult { + /// Returns the [`FileLayout`] of the file. + pub fn file_layout(&self) -> &FileLayout { + &self.file_layout + } + + /// Performs a scan operation over the file. (TODO(ngates): remove the `Send`) + pub fn scan(&self, scan: Arc) -> VortexResult { // Create a shared reader for the scan. let reader: Arc = self - .layout + .file_layout + .root_layout .reader(self.segments.clone(), self.ctx.clone())?; let result_dtype = scan.result_dtype(self.dtype())?; // For each row-group, we set up a future that will evaluate the scan and post its. @@ -52,12 +72,13 @@ impl VortexFile { // Note that to implement this we would use stream::try_unfold let stream = stream::once(async move { // TODO(ngates): we should launch the evaluate_async onto a worker thread pool. - let row_range = 0..self.layout.row_count(); + let row_range = 0..self.row_count(); let eval = scan .range_scan(row_range)? .evaluate_async(|row_mask, expr| reader.evaluate_expr(row_mask, expr)); pin_mut!(eval); + send(eval); poll_fn(|cx| { // Now we alternate between polling the eval task and driving the I/O. @@ -79,3 +100,7 @@ impl VortexFile { Ok(ArrayStreamAdapter::new(result_dtype, stream)) } } + +fn send(t: T) -> T { + t +} diff --git a/vortex-file/src/v2/mod.rs b/vortex-file/src/v2/mod.rs index 571191ddd7..93f635d6b0 100644 --- a/vortex-file/src/v2/mod.rs +++ b/vortex-file/src/v2/mod.rs @@ -9,6 +9,4 @@ mod writer; pub use file::*; pub use open::*; -// TODO(ngates): probably can separate these APIs? For now, re-export the Scan. -pub use vortex_layout::scanner::Scan; pub use writer::*; diff --git a/vortex-file/src/v2/open/mod.rs b/vortex-file/src/v2/open/mod.rs index 8e8e2bc916..de90676019 100644 --- a/vortex-file/src/v2/open/mod.rs +++ b/vortex-file/src/v2/open/mod.rs @@ -38,6 +38,7 @@ pub struct VortexOpenOptions { // additional caching, metrics, or other intercepts, etc. It should support synchronous // read + write of Map or similar. initial_read_size: u64, + /// Configure how to split the file into batches for reading. split_by: SplitBy, } @@ -94,12 +95,13 @@ impl VortexOpenOptions { pub async fn open(self, read: R) -> VortexResult> { // If we already have the file layout, we can skip the initial read entirely. if let Some(file_layout) = self.file_layout { + let segments = Arc::new(SegmentCache::::new(read, file_layout.segments.clone())); + let splits = self.split_by.splits(&file_layout.root_layout)?.into(); return Ok(VortexFile { - read, ctx: self.ctx.clone(), - layout: file_layout.root_layout, - segments: file_layout.segments, - segment_cache: Default::default(), + file_layout, + segments, + splits, }); } @@ -169,7 +171,7 @@ impl VortexOpenOptions { // Finally, create the VortexFile. Ok(VortexFile { ctx: self.ctx.clone(), - layout: file_layout.root_layout, + file_layout, segments: Arc::new(segment_cache), splits, }) diff --git a/vortex-file/src/v2/tests.rs b/vortex-file/src/v2/tests.rs index 303519be21..c1cf92abd6 100644 --- a/vortex-file/src/v2/tests.rs +++ b/vortex-file/src/v2/tests.rs @@ -20,7 +20,7 @@ fn basic_file_roundtrip() -> VortexResult<()> { .into_array(); let buffer: Bytes = VortexWriteOptions::default() - .write_async(vec![], array.into_array_stream()) + .write(vec![], array.into_array_stream()) .await? .into(); From 71cc5551d914ee65f2e7a4bb87d94ef9b78d841d Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Fri, 10 Jan 2025 20:52:45 +0000 Subject: [PATCH 7/9] I/O driver --- Cargo.lock | 1 + bench-vortex/src/reader.rs | 9 ++++++--- vortex-file/src/v2/file.rs | 14 -------------- vortex-file/src/v2/open/mod.rs | 2 ++ vortex/Cargo.toml | 1 + vortex/src/lib.rs | 1 + 6 files changed, 11 insertions(+), 17 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8c7be4e69a..ebd3e286a3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4861,6 +4861,7 @@ dependencies = [ "vortex-runend-bool", "vortex-sampling-compressor", "vortex-scalar", + "vortex-scan", "vortex-zigzag", ] diff --git a/bench-vortex/src/reader.rs b/bench-vortex/src/reader.rs index d213fc35e5..f9e701230e 100644 --- a/bench-vortex/src/reader.rs +++ b/bench-vortex/src/reader.rs @@ -27,9 +27,10 @@ use vortex::arrow::FromArrowType; use vortex::compress::CompressionStrategy; use vortex::dtype::DType; use vortex::error::VortexResult; -use vortex::file::v2::{Scan, VortexOpenOptions, VortexWriteOptions}; +use vortex::file::v2::{VortexOpenOptions, VortexWriteOptions}; use vortex::io::{ObjectStoreReadAt, TokioFile, VortexReadAt, VortexWrite}; use vortex::sampling_compressor::{SamplingCompressor, ALL_ENCODINGS_CONTEXT}; +use vortex::scan::Scan; use vortex::stream::ArrayStreamExt; use vortex::{ArrayData, IntoArrayData, IntoCanonical}; @@ -105,12 +106,14 @@ pub fn write_csv_as_parquet(csv_path: PathBuf, output_path: &Path) -> VortexResu async fn take_vortex( reader: T, - indices: &[u64], + _indices: &[u64], ) -> VortexResult { VortexOpenOptions::new(ALL_ENCODINGS_CONTEXT.clone()) .open(reader) .await? - .scan_rows(Scan::all(), indices.iter().copied())? + // FIXME(ngates): support row indices + // .scan_rows(Scan::all(), indices.iter().copied())? + .scan(Scan::all())? .into_array_data() .await? // For equivalence.... we decompress to make sure we're not cheating too much. diff --git a/vortex-file/src/v2/file.rs b/vortex-file/src/v2/file.rs index 9df7e45190..8cf5f97f56 100644 --- a/vortex-file/src/v2/file.rs +++ b/vortex-file/src/v2/file.rs @@ -29,20 +29,6 @@ pub struct VortexFile { impl VortexFile {} -/// When the underling `R` is `Clone`, we can clone the `VortexFile`. -// TODO(ngates): remove Clone from VortexReadAt? -impl Clone for VortexFile { - fn clone(&self) -> Self { - Self { - ctx: self.ctx.clone(), - file_layout: self.file_layout.clone(), - segments: self.segments.clone(), - splits: self.splits.clone(), - thread_pool: self.thread_pool.clone(), - } - } -} - /// Async implementation of Vortex File. impl VortexFile { /// Returns the number of rows in the file. diff --git a/vortex-file/src/v2/open/mod.rs b/vortex-file/src/v2/open/mod.rs index 862a344aff..c48b6bd8ef 100644 --- a/vortex-file/src/v2/open/mod.rs +++ b/vortex-file/src/v2/open/mod.rs @@ -102,6 +102,8 @@ impl VortexOpenOptions { file_layout, segments, splits, + // FIXME(ngates): we need some way to share the underlying I/O system. This + // segment cache + thread pool. thread_pool: Arc::new( rayon::ThreadPoolBuilder::new() .build() diff --git a/vortex/Cargo.toml b/vortex/Cargo.toml index b97d516b4f..849f34fc9a 100644 --- a/vortex/Cargo.toml +++ b/vortex/Cargo.toml @@ -43,6 +43,7 @@ vortex-runend = { workspace = true } vortex-runend-bool = { workspace = true } vortex-sampling-compressor = { workspace = true } vortex-scalar = { workspace = true, default-features = true } +vortex-scan = { workspace = true, default-features = true } vortex-zigzag = { workspace = true } [target.'cfg(not(target_arch = "wasm32"))'.dependencies] diff --git a/vortex/src/lib.rs b/vortex/src/lib.rs index 52c0c86e14..df7c858c55 100644 --- a/vortex/src/lib.rs +++ b/vortex/src/lib.rs @@ -4,6 +4,7 @@ pub use { vortex_error as error, vortex_expr as expr, vortex_file as file, vortex_flatbuffers as flatbuffers, vortex_io as io, vortex_ipc as ipc, vortex_proto as proto, vortex_sampling_compressor as sampling_compressor, vortex_scalar as scalar, + vortex_scan as scan, }; pub mod encodings { From 545108dc42c55a500a223f72d6527b5969f84a56 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Fri, 10 Jan 2025 21:17:28 +0000 Subject: [PATCH 8/9] I/O driver --- .history | 11 ----------- vortex-expr/src/identity.rs | 2 +- 2 files changed, 1 insertion(+), 12 deletions(-) delete mode 100644 .history diff --git a/.history b/.history deleted file mode 100644 index f95c77bcde..0000000000 --- a/.history +++ /dev/null @@ -1,11 +0,0 @@ -#V2 -copy (select arrow_cast(1, 'Int8') as x) to '/tmp/foo.parquet'; -describe '/tmp/foo.parquet'; -explain select x = arrow_cast(10000, 'Int32') from '/tmp/foo.parquet'; -select x + 10000 from '/tmp/foo.parquet'; -explain select x + 10000 from '/tmp/foo.parquet'; -explain select x = 10000 from '/tmp/foo.parquet'; -explain select x = arrow_cast(10000, "Int32") from '/tmp/foo.parquet'; -explain select x = cast(10000 AS Int32) from '/tmp/foo.parquet'; -explain select x = cast(10000 AS Int) from '/tmp/foo.parquet'; -explain select x = cast(10000 AS int) from '/tmp/foo.parquet'; diff --git a/vortex-expr/src/identity.rs b/vortex-expr/src/identity.rs index 5ae0289433..08df539576 100644 --- a/vortex-expr/src/identity.rs +++ b/vortex-expr/src/identity.rs @@ -7,7 +7,7 @@ use vortex_error::VortexResult; use crate::{ExprRef, VortexExpr}; -static IDENTITY: LazyLock = LazyLock::new(|| Identity::new_expr()); +static IDENTITY: LazyLock = LazyLock::new(|| Arc::new(Identity)); #[derive(Debug, PartialEq, Eq, Hash)] pub struct Identity; From 625aae74a5f45d5647a86191d53f3ac260680c01 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Fri, 10 Jan 2025 22:29:50 +0000 Subject: [PATCH 9/9] Better buffer debug --- vortex-buffer/src/buffer.rs | 29 +++++++++++++++++++--------- vortex-buffer/src/lib.rs | 1 + vortex-file/src/v2/file.rs | 2 ++ vortex-file/src/v2/segments/cache.rs | 2 ++ vortex-layout/src/segments/mod.rs | 7 +++++++ 5 files changed, 32 insertions(+), 9 deletions(-) diff --git a/vortex-buffer/src/buffer.rs b/vortex-buffer/src/buffer.rs index 74530f55b6..13ad4f874d 100644 --- a/vortex-buffer/src/buffer.rs +++ b/vortex-buffer/src/buffer.rs @@ -1,3 +1,4 @@ +use std::any::type_name; use std::collections::Bound; use std::fmt::{Debug, Formatter}; use std::ops::{Deref, RangeBounds}; @@ -281,21 +282,31 @@ impl Buffer { } } -impl Debug for Buffer { +impl Debug for Buffer +where + T: Debug, +{ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - const TRUNC_SIZE: usize = 512; - let mut binding = f.debug_struct("Buffer"); - let mut fields = binding + let mut binding = f.debug_struct(&format!("Buffer<{}>", type_name::())); + let fields = binding .field("length", &self.length) .field("alignment", &self.alignment); - let mut bytes = self.bytes.clone(); - if bytes.len() > TRUNC_SIZE { - fields = fields.field("truncated", &true); + const TRUNC_SIZE: usize = 16; + if self.len() <= TRUNC_SIZE { + fields.field("as_slice", &self.as_slice()); + } else { + fields.field_with(&"as_slice", |f| { + write!(f, "[")?; + for elem in self.as_slice().iter().take(TRUNC_SIZE) { + write!(f, "{:?}, ", *elem)?; + } + write!(f, "...")?; + write!(f, "]") + }); } - bytes.truncate(TRUNC_SIZE); - fields.field("bytes", &bytes).finish() + fields.finish() } } diff --git a/vortex-buffer/src/lib.rs b/vortex-buffer/src/lib.rs index 641bcd4104..e2263c4fd0 100644 --- a/vortex-buffer/src/lib.rs +++ b/vortex-buffer/src/lib.rs @@ -1,4 +1,5 @@ #![feature(unsigned_is_multiple_of)] +#![feature(debug_closure_helpers)] #![deny(missing_docs)] //! A library for working with custom aligned buffers of sized values. diff --git a/vortex-file/src/v2/file.rs b/vortex-file/src/v2/file.rs index 8cf5f97f56..fe65ffc65a 100644 --- a/vortex-file/src/v2/file.rs +++ b/vortex-file/src/v2/file.rs @@ -48,6 +48,8 @@ impl VortexFile { /// Performs a scan operation over the file. pub fn scan(self, scan: Arc) -> VortexResult { + println!("File Layout: {:#?}", self.file_layout); + // Create a shared reader for the scan. let reader: Arc = self .file_layout() diff --git a/vortex-file/src/v2/segments/cache.rs b/vortex-file/src/v2/segments/cache.rs index bb04b6b5ae..0fc75b478a 100644 --- a/vortex-file/src/v2/segments/cache.rs +++ b/vortex-file/src/v2/segments/cache.rs @@ -69,10 +69,12 @@ impl SegmentCache { let segments = self.segments.clone(); async move { let segment = &segments[*request.id as usize]; + println!("Reading segment {} {:?}", *request.id, segment); let bytes = read .read_byte_range(segment.offset, segment.length as u64) .map_ok(|bytes| ByteBuffer::from(bytes).aligned(segment.alignment)) .await?; + println!("Completed segment {} read", *request.id); request .callback .send(bytes) diff --git a/vortex-layout/src/segments/mod.rs b/vortex-layout/src/segments/mod.rs index 143b0fa47d..7b66f4c8d5 100644 --- a/vortex-layout/src/segments/mod.rs +++ b/vortex-layout/src/segments/mod.rs @@ -1,3 +1,4 @@ +use std::fmt::Display; use std::ops::Deref; use async_trait::async_trait; @@ -9,6 +10,12 @@ use vortex_error::VortexResult; #[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct SegmentId(u32); +impl Display for SegmentId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "SegmentId({})", self.0) + } +} + impl From for SegmentId { fn from(value: u32) -> Self { Self(value)