[WIP] Cutover to Vortex Layouts #1899

Draft · wants to merge 11 commits into develop
2 changes: 2 additions & 0 deletions Cargo.lock

23 changes: 13 additions & 10 deletions bench-vortex/src/clickbench.rs
@@ -7,13 +7,16 @@ use datafusion::datasource::listing::{
     ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
 };
 use datafusion::prelude::{ParquetReadOptions, SessionContext};
-use futures::{stream, StreamExt, TryStreamExt};
+use futures::executor::block_on;
+use itertools::Itertools;
+use rayon::prelude::*;
 use tokio::fs::{create_dir_all, OpenOptions};
 use vortex::aliases::hash_map::HashMap;
 use vortex::array::{ChunkedArray, StructArray};
 use vortex::dtype::DType;
 use vortex::error::vortex_err;
-use vortex::file::{VortexFileWriter, VORTEX_FILE_EXTENSION};
+use vortex::file::v2::VortexWriteOptions;
+use vortex::file::VORTEX_FILE_EXTENSION;
 use vortex::sampling_compressor::SamplingCompressor;
 use vortex::variants::StructArrayTrait;
 use vortex::{ArrayDType, ArrayData, IntoArrayData, IntoArrayVariant};
@@ -149,7 +152,9 @@ pub async fn register_vortex_files(
     let vortex_dir = input_path.join("vortex");
     create_dir_all(&vortex_dir).await?;
 
-    stream::iter(0..100)
+    (0..100)
+        .collect_vec()
+        .par_iter()
         .map(|idx| {
             let parquet_file_path = input_path
                 .join("parquet")
@@ -158,7 +163,7 @@
             let session = session.clone();
             let schema = schema.clone();
 
-            tokio::spawn(async move {
+            block_on(async move {
                 let output_path = output_path.clone();
                 idempotent_async(&output_path, move |vtx_file| async move {
                     eprintln!("Processing file {idx}");
@@ -219,19 +224,17 @@
                         .open(&vtx_file)
                         .await?;
 
-                    let mut writer = VortexFileWriter::new(f);
-                    writer = writer.write_array_columns(data).await?;
-                    writer.finalize().await?;
+                    VortexWriteOptions::default()
+                        .write(f, data.into_array_stream())
+                        .await?;
 
                     anyhow::Ok(())
                 })
                 .await
                 .expect("Failed to write Vortex file")
             })
         })
-        .buffered(16)
-        .try_collect::<Vec<_>>()
-        .await?;
+        .collect::<Vec<_>>();
 
     let format = Arc::new(VortexFormat::new(CTX.clone()));
     let table_path = vortex_dir
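Note on the clickbench change: two things happen at once here. The multi-step VortexFileWriter sequence collapses into a single VortexWriteOptions::default().write(..) call that consumes an array stream, and the tokio stream::iter(..).buffered(16) fan-out is replaced by rayon's par_iter() with block_on inside each worker, so conversion parallelism is now bounded by the rayon pool rather than the buffering factor. Below is a minimal sketch of the new write call in isolation; it assumes the v2 API exactly as imported in this diff (including data.into_array_stream()), and write_one is a hypothetical helper, not code from the PR.

use vortex::error::VortexResult;
use vortex::file::v2::VortexWriteOptions;
use vortex::io::VortexWrite;
use vortex::ArrayData;

// Hypothetical helper isolating the v2 write path. The old sequence was:
//     let mut writer = VortexFileWriter::new(f);
//     writer = writer.write_array_columns(data).await?;
//     writer.finalize().await?;
// The v2 writer instead takes the sink and an ArrayStream in one call.
async fn write_one<W: VortexWrite>(f: W, data: ArrayData) -> VortexResult<()> {
    VortexWriteOptions::default()
        .write(f, data.into_array_stream())
        .await?;
    Ok(())
}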
68 changes: 26 additions & 42 deletions bench-vortex/src/reader.rs
@@ -2,7 +2,7 @@ use std::fs::File
 use std::ops::Range;
 use std::path::{Path, PathBuf};
 use std::process::Command;
-use std::sync::{Arc, LazyLock};
+use std::sync::Arc;
 
 use arrow_array::types::Int64Type;
 use arrow_array::{
@@ -24,18 +24,16 @@ use stream::StreamExt;
 use vortex::aliases::hash_map::HashMap;
 use vortex::array::ChunkedArray;
 use vortex::arrow::FromArrowType;
-use vortex::buffer::Buffer;
 use vortex::compress::CompressionStrategy;
 use vortex::dtype::DType;
 use vortex::error::VortexResult;
-use vortex::file::{LayoutContext, LayoutDeserializer, VortexFileWriter, VortexReadBuilder};
-use vortex::io::{IoDispatcher, ObjectStoreReadAt, TokioFile, VortexReadAt, VortexWrite};
+use vortex::file::v2::{VortexOpenOptions, VortexWriteOptions};
+use vortex::io::{ObjectStoreReadAt, TokioFile, VortexReadAt, VortexWrite};
 use vortex::sampling_compressor::{SamplingCompressor, ALL_ENCODINGS_CONTEXT};
+use vortex::scan::Scan;
+use vortex::stream::ArrayStreamExt;
 use vortex::{ArrayData, IntoArrayData, IntoCanonical};
 
-static DISPATCHER: LazyLock<Arc<IoDispatcher>> =
-    LazyLock::new(|| Arc::new(IoDispatcher::default()));
-
 pub const BATCH_SIZE: usize = 65_536;
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
@@ -48,19 +46,12 @@ pub struct VortexFooter {
 pub async fn open_vortex(path: &Path) -> VortexResult<ArrayData> {
     let file = TokioFile::open(path).unwrap();
 
-    VortexReadBuilder::new(
-        file,
-        LayoutDeserializer::new(
-            ALL_ENCODINGS_CONTEXT.clone(),
-            LayoutContext::default().into(),
-        ),
-    )
-    .with_io_dispatcher(DISPATCHER.clone())
-    .build()
-    .await?
-    .into_stream()
-    .read_all()
-    .await
+    VortexOpenOptions::new(ALL_ENCODINGS_CONTEXT.clone())
+        .open(file)
+        .await?
+        .scan(Scan::all())?
+        .into_array_data()
+        .await
 }
 
 pub async fn rewrite_parquet_as_vortex<W: VortexWrite>(
@@ -69,11 +60,10 @@ pub async fn rewrite_parquet_as_vortex<W: VortexWrite>(
 ) -> VortexResult<()> {
     let chunked = compress_parquet_to_vortex(parquet_path.as_path())?;
 
-    VortexFileWriter::new(write)
-        .write_array_columns(chunked)
-        .await?
-        .finalize()
+    VortexWriteOptions::default()
+        .write(write, chunked.into_array_stream())
         .await?;
 
     Ok(())
 }
@@ -116,25 +106,19 @@ pub fn write_csv_as_parquet(csv_path: PathBuf, output_path: &Path) -> VortexResu
 
 async fn take_vortex<T: VortexReadAt + Unpin + 'static>(
     reader: T,
-    indices: &[u64],
+    _indices: &[u64],
 ) -> VortexResult<ArrayData> {
-    VortexReadBuilder::new(
-        reader,
-        LayoutDeserializer::new(
-            ALL_ENCODINGS_CONTEXT.clone(),
-            LayoutContext::default().into(),
-        ),
-    )
-    .with_io_dispatcher(DISPATCHER.clone())
-    .with_indices(Buffer::copy_from(indices).into_array())
-    .build()
-    .await?
-    .into_stream()
-    .read_all()
-    .await
-    // For equivalence.... we decompress to make sure we're not cheating too much.
-    .and_then(IntoCanonical::into_canonical)
-    .map(ArrayData::from)
+    VortexOpenOptions::new(ALL_ENCODINGS_CONTEXT.clone())
+        .open(reader)
+        .await?
+        // FIXME(ngates): support row indices
+        // .scan_rows(Scan::all(), indices.iter().copied())?
+        .scan(Scan::all())?
+        .into_array_data()
+        .await?
+        // For equivalence.... we decompress to make sure we're not cheating too much.
+        .into_canonical()
+        .map(ArrayData::from)
 }
 
 pub async fn take_vortex_object_store(
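The read side follows the same pattern: the old VortexReadBuilder (with its LayoutDeserializer and shared IoDispatcher static) becomes open-then-scan, and take_vortex temporarily loses row-index pushdown, as the FIXME above notes. A consolidated sketch of the new read path, assuming the v2 signatures exactly as they appear in this diff; scan_all is a hypothetical name, not code from the PR.

use vortex::error::VortexResult;
use vortex::file::v2::VortexOpenOptions;
use vortex::io::VortexReadAt;
use vortex::sampling_compressor::ALL_ENCODINGS_CONTEXT;
use vortex::scan::Scan;
use vortex::{ArrayData, IntoCanonical};

// Open, run a full scan, then decompress to canonical form (mirroring
// take_vortex above, which decompresses "to make sure we're not cheating").
async fn scan_all<T: VortexReadAt + Unpin + 'static>(reader: T) -> VortexResult<ArrayData> {
    VortexOpenOptions::new(ALL_ENCODINGS_CONTEXT.clone())
        .open(reader)
        .await?
        .scan(Scan::all())?
        .into_array_data()
        .await?
        .into_canonical()
        .map(ArrayData::from)
}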
9 changes: 5 additions & 4 deletions bench-vortex/src/tpch/mod.rs
@@ -16,7 +16,7 @@ use vortex::aliases::hash_map::HashMap;
 use vortex::array::{ChunkedArray, StructArray};
 use vortex::arrow::FromArrowArray;
 use vortex::dtype::DType;
-use vortex::file::{VortexFileWriter, VORTEX_FILE_EXTENSION};
+use vortex::file::VORTEX_FILE_EXTENSION;
 use vortex::sampling_compressor::SamplingCompressor;
 use vortex::variants::StructArrayTrait;
 use vortex::{ArrayDType, ArrayData, IntoArrayData, IntoArrayVariant};
@@ -31,6 +31,7 @@ mod execute;
 pub mod schema;
 
 pub use execute::*;
+use vortex::file::v2::VortexWriteOptions;
 
 pub const EXPECTED_ROW_COUNTS: [usize; 23] = [
     0, 4, 460, 11620, 5, 5, 1, 4, 2, 175, 37967, 1048, 2, 42, 1, 1, 18314, 1, 57, 1, 186, 411, 7,
@@ -275,9 +276,9 @@ async fn register_vortex_file(
                 .open(&vtx_file)
                 .await?;
 
-            let mut writer = VortexFileWriter::new(f);
-            writer = writer.write_array_columns(data).await?;
-            writer.finalize().await?;
+            VortexWriteOptions::default()
+                .write(f, data.into_array_stream())
+                .await?;
 
             anyhow::Ok(())
         })
29 changes: 20 additions & 9 deletions vortex-buffer/src/buffer.rs
@@ -1,3 +1,4 @@
+use std::any::type_name;
 use std::collections::Bound;
 use std::fmt::{Debug, Formatter};
 use std::ops::{Deref, RangeBounds};
@@ -281,21 +282,31 @@ impl<T> Buffer<T> {
     }
 }
 
-impl<T> Debug for Buffer<T> {
+impl<T> Debug for Buffer<T>
+where
+    T: Debug,
+{
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        const TRUNC_SIZE: usize = 512;
-        let mut binding = f.debug_struct("Buffer");
-        let mut fields = binding
+        let mut binding = f.debug_struct(&format!("Buffer<{}>", type_name::<T>()));
+        let fields = binding
             .field("length", &self.length)
             .field("alignment", &self.alignment);
 
-        let mut bytes = self.bytes.clone();
-        if bytes.len() > TRUNC_SIZE {
-            fields = fields.field("truncated", &true);
+        const TRUNC_SIZE: usize = 16;
+        if self.len() <= TRUNC_SIZE {
+            fields.field("as_slice", &self.as_slice());
+        } else {
+            fields.field_with(&"as_slice", |f| {
+                write!(f, "[")?;
+                for elem in self.as_slice().iter().take(TRUNC_SIZE) {
+                    write!(f, "{:?}, ", *elem)?;
+                }
+                write!(f, "...")?;
+                write!(f, "]")
+            });
        }
 
-        bytes.truncate(TRUNC_SIZE);
-        fields.field("bytes", &bytes).finish()
+        fields.finish()
     }
 }
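The new Debug impl prints a truncated element view instead of raw bytes, using the nightly debug_closure_helpers feature enabled in lib.rs below. A standalone sketch of the same pattern; Ints is a hypothetical stand-in for Buffer<T>, not a vortex type.

#![feature(debug_closure_helpers)] // nightly, matching the lib.rs change below

use std::fmt::{Debug, Formatter};

struct Ints(Vec<i32>);

impl Debug for Ints {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        const TRUNC_SIZE: usize = 16;
        let mut s = f.debug_struct("Ints");
        if self.0.len() <= TRUNC_SIZE {
            s.field("as_slice", &self.0.as_slice());
        } else {
            // field_with writes a custom rendering straight into the
            // formatter, so the full buffer is never materialized as a string.
            s.field_with("as_slice", |f| {
                write!(f, "[")?;
                for elem in self.0.iter().take(TRUNC_SIZE) {
                    write!(f, "{elem:?}, ")?;
                }
                write!(f, "...]")
            });
        }
        s.finish()
    }
}

fn main() {
    // Prints roughly: Ints { as_slice: [0, 1, 2, ..., 15, ...] }
    println!("{:?}", Ints((0..100).collect()));
}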
1 change: 1 addition & 0 deletions vortex-buffer/src/lib.rs
@@ -1,4 +1,5 @@
 #![feature(unsigned_is_multiple_of)]
+#![feature(debug_closure_helpers)]
 #![deny(missing_docs)]
 
 //! A library for working with custom aligned buffers of sized values.
1 change: 1 addition & 0 deletions vortex-datafusion/Cargo.toml
@@ -42,6 +42,7 @@ vortex-error = { workspace = true, features = ["datafusion"] }
 vortex-expr = { workspace = true, features = ["datafusion"] }
 vortex-file = { workspace = true, features = ["object_store"] }
 vortex-io = { workspace = true, features = ["object_store", "tokio"] }
+vortex-scan = { workspace = true }
 
 [dev-dependencies]
 anyhow = { workspace = true }
22 changes: 13 additions & 9 deletions vortex-datafusion/src/persistent/cache.rs
@@ -4,13 +4,15 @@ use chrono::{DateTime, Utc};
 use moka::future::Cache;
 use object_store::path::Path;
 use object_store::{ObjectMeta, ObjectStore};
+use vortex_array::ContextRef;
 use vortex_error::{vortex_err, VortexError, VortexResult};
-use vortex_file::{read_initial_bytes, InitialRead};
+use vortex_file::v2::footer::FileLayout;
+use vortex_file::v2::VortexOpenOptions;
 use vortex_io::ObjectStoreReadAt;
 
 #[derive(Debug, Clone)]
-pub struct InitialReadCache {
-    inner: Cache<Key, InitialRead>,
+pub struct FileLayoutCache {
+    inner: Cache<Key, FileLayout>,
 }
 
 #[derive(Hash, Eq, PartialEq, Debug)]
@@ -28,28 +30,30 @@ impl From<&ObjectMeta> for Key {
     }
 }
 
-impl InitialReadCache {
+impl FileLayoutCache {
     pub fn new(size_mb: usize) -> Self {
         let inner = Cache::builder()
-            .weigher(|k: &Key, v: &InitialRead| (k.location.as_ref().len() + v.buf.len()) as u32)
             .max_capacity(size_mb as u64 * (2 << 20))
-            .eviction_listener(|k, _v, cause| {
+            .eviction_listener(|k: Arc<Key>, _v, cause| {
                 log::trace!("Removed {} due to {:?}", k.location, cause);
             })
             .build();
 
         Self { inner }
     }
 
     pub async fn try_get(
         &self,
         object: &ObjectMeta,
         store: Arc<dyn ObjectStore>,
-    ) -> VortexResult<InitialRead> {
+    ) -> VortexResult<FileLayout> {
         self.inner
             .try_get_with(Key::from(object), async {
                 let os_read_at = ObjectStoreReadAt::new(store.clone(), object.location.clone());
-                let initial_read = read_initial_bytes(&os_read_at, object.size as u64).await?;
-                VortexResult::Ok(initial_read)
+                let vxf = VortexOpenOptions::new(ContextRef::default())
+                    .open(os_read_at)
+                    .await?;
+                VortexResult::Ok(vxf.file_layout().clone())
             })
             .await
             .map_err(|e: Arc<VortexError>| match Arc::try_unwrap(e) {
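Behavioral note on the cache swap: the old weigher sized entries by v.buf.len(), but FileLayout has no obvious byte length here, so the weigher is dropped. Without a weigher moka weighs every entry as 1, which turns max_capacity(size_mb as u64 * (2 << 20)) into an entry count rather than a byte budget (and 2 << 20 is 2 MiB, not 1). A small self-contained sketch of the moka try_get_with pattern used above, with a String standing in for FileLayout and std::io::Error for VortexError; both stand-ins are hypothetical.

use std::sync::Arc;

use moka::future::Cache;

// Hypothetical key type mirroring the cache key above.
#[derive(Hash, Eq, PartialEq, Debug)]
struct Key {
    location: String,
}

#[tokio::main]
async fn main() {
    let cache: Cache<Key, String> = Cache::builder()
        .max_capacity(1024) // entry count, since no weigher is installed
        .eviction_listener(|k: Arc<Key>, _v, cause| {
            eprintln!("Removed {} due to {:?}", k.location, cause);
        })
        .build();

    // try_get_with runs the initializer only on a miss and deduplicates
    // concurrent loads of the same key; failures come back as Arc<E>.
    let layout = cache
        .try_get_with(Key { location: "file.vortex".into() }, async {
            Ok::<_, std::io::Error>("parsed file layout".to_string())
        })
        .await;
    assert_eq!(layout.unwrap(), "parsed file layout");
}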
8 changes: 4 additions & 4 deletions vortex-datafusion/src/persistent/execution.rs
@@ -13,7 +13,7 @@ use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, Plan
 use itertools::Itertools;
 use vortex_array::ContextRef;
 
-use super::cache::InitialReadCache;
+use super::cache::FileLayoutCache;
 use crate::persistent::opener::VortexFileOpener;
 
 #[derive(Debug, Clone)]
@@ -24,7 +24,7 @@ pub struct VortexExec {
     plan_properties: PlanProperties,
     projected_statistics: Statistics,
     ctx: ContextRef,
-    initial_read_cache: InitialReadCache,
+    initial_read_cache: FileLayoutCache,
 }
 
 impl VortexExec {
@@ -33,7 +33,7 @@
         metrics: ExecutionPlanMetricsSet,
         predicate: Option<Arc<dyn PhysicalExpr>>,
         ctx: ContextRef,
-        initial_read_cache: InitialReadCache,
+        initial_read_cache: FileLayoutCache,
     ) -> DFResult<Self> {
         let projected_schema = project_schema(
             &file_scan_config.file_schema,
@@ -122,7 +122,7 @@ impl ExecutionPlan for VortexExec {
             object_store,
             projection: self.file_scan_config.projection.clone(),
             predicate: self.predicate.clone(),
-            initial_read_cache: self.initial_read_cache.clone(),
+            file_layout_cache: self.initial_read_cache.clone(),
             arrow_schema,
         };
         let stream = FileStream::new(&self.file_scan_config, partition, opener, &self.metrics)?;