From 5b805e7123b2e695c3f9ac0f7b6234ece55774a8 Mon Sep 17 00:00:00 2001 From: Harvey Yue Date: Fri, 3 Jan 2025 10:57:14 +0800 Subject: [PATCH] Fix reading multiple complex data types exception (#140) --- examples/datafusion_integration.rs | 10 ++++++++++ src/physical_exec.rs | 17 +++++++++++++---- tests/basic/data/map_list.snappy.orc | Bin 0 -> 804 bytes 3 files changed, 23 insertions(+), 4 deletions(-) create mode 100644 tests/basic/data/map_list.snappy.orc diff --git a/examples/datafusion_integration.rs b/examples/datafusion_integration.rs index 78d8fa0..e720c09 100644 --- a/examples/datafusion_integration.rs +++ b/examples/datafusion_integration.rs @@ -25,6 +25,16 @@ async fn main() -> Result<()> { // methods available on SessionContext. With that done, we are able to process // ORC files using SQL or the DataFrame API. let ctx = SessionContext::new(); + + ctx.register_orc( + "table2", + "tests/basic/data/map_list.snappy.orc", + OrcReadOptions::default(), + ) + .await?; + + ctx.sql("select id,m,l from table2").await?.show().await?; + ctx.register_orc( "table1", "tests/basic/data/alltypes.snappy.orc", diff --git a/src/physical_exec.rs b/src/physical_exec.rs index 2a3918f..ffd1ee8 100644 --- a/src/physical_exec.rs +++ b/src/physical_exec.rs @@ -126,7 +126,7 @@ impl ExecutionPlan for OrcExec { projection, batch_size: context.session_config().batch_size(), _limit: self.config.limit, - _table_schema: self.config.file_schema.clone(), + table_schema: self.config.file_schema.clone(), _metrics: self.metrics.clone(), object_store, }; @@ -142,7 +142,7 @@ struct OrcOpener { projection: Vec, batch_size: usize, _limit: Option, - _table_schema: SchemaRef, + table_schema: SchemaRef, _metrics: ExecutionPlanMetricsSet, object_store: Arc, } @@ -152,12 +152,21 @@ impl FileOpener for OrcOpener { let reader = ObjectStoreReader::new(self.object_store.clone(), file_meta.object_meta.clone()); let batch_size = self.batch_size; - // Offset by 1 since index 0 is the root - let projection = self.projection.iter().map(|i| i + 1).collect::>(); + let projected_schema = SchemaRef::from(self.table_schema.project(&self.projection)?); + Ok(Box::pin(async move { let mut builder = ArrowReaderBuilder::try_new_async(reader) .await .map_err(ArrowError::from)?; + // Find complex data type column index as projection + let mut projection = Vec::with_capacity(projected_schema.fields().len()); + for named_column in builder.file_metadata().root_data_type().children() { + if let Some((_table_idx, _table_field)) = + projected_schema.fields().find(named_column.name()) + { + projection.push(named_column.data_type().column_index()); + } + } let projection_mask = ProjectionMask::roots(builder.file_metadata().root_data_type(), projection); if let Some(range) = file_meta.range.clone() { diff --git a/tests/basic/data/map_list.snappy.orc b/tests/basic/data/map_list.snappy.orc new file mode 100644 index 0000000000000000000000000000000000000000..ee5fcf3d50f50b63824162526dd8d34300624db1 GIT binary patch literal 804 zcmZ{iv2GJV5Qb;=Zuf>g+gY!1G8hYzBZURXvI*5mNG2AD269Su6_NpqL?m%!TNG)q zBIxO9D0u`*I$nYYpah8miH5O_Axc7Vv$MP3&g}o+Zf$qV0M_`BFoPSCR%`$OzCh>= zDW_Ova9P&l`TC5zBlEh|!U0J7Nf*yiwTvAe=v zS+G@Mm#luW*E^DOr5{r^ZR&PZrW~1 z+QCnApaL+hJ!>;V?!^FqXOJm`k65>cj027}5i(0u>?8)fCNbo?CeTGD!`HOyM7%CM zI%FfG7^mqXx`3@k2SL%v0j(W`+R3KYceS@~Oum`~-M|JN6ju z%yU~h`A2eTg3BLdm^yaa{@hvVa?)%Tr6smu~)ZGcSEX_=R67aPNXz zL59!B4_9Af1CuHhe27X%HMRS!hWj<#tKle76efmZuYyqWC_xcS)}u)<{V5!`{=Kesgr#8@_11><>q~g9EQOn(gMT7N#??=_#?PIGZ88KVNKmMr>OCzuNSV n+6$|#>AT5Gowtu3K#ues<+V>HE>TcuylO?qKZ0OuyUV`;UsP?F literal 0 HcmV?d00001