diff --git a/examples/datafusion_integration.rs b/examples/datafusion_integration.rs index 78d8fa0..e720c09 100644 --- a/examples/datafusion_integration.rs +++ b/examples/datafusion_integration.rs @@ -25,6 +25,16 @@ async fn main() -> Result<()> { // methods available on SessionContext. With that done, we are able to process // ORC files using SQL or the DataFrame API. let ctx = SessionContext::new(); + + ctx.register_orc( + "table2", + "tests/basic/data/map_list.snappy.orc", + OrcReadOptions::default(), + ) + .await?; + + ctx.sql("select id,m,l from table2").await?.show().await?; + ctx.register_orc( "table1", "tests/basic/data/alltypes.snappy.orc", diff --git a/src/physical_exec.rs b/src/physical_exec.rs index 2a3918f..ffd1ee8 100644 --- a/src/physical_exec.rs +++ b/src/physical_exec.rs @@ -126,7 +126,7 @@ impl ExecutionPlan for OrcExec { projection, batch_size: context.session_config().batch_size(), _limit: self.config.limit, - _table_schema: self.config.file_schema.clone(), + table_schema: self.config.file_schema.clone(), _metrics: self.metrics.clone(), object_store, }; @@ -142,7 +142,7 @@ struct OrcOpener { projection: Vec, batch_size: usize, _limit: Option, - _table_schema: SchemaRef, + table_schema: SchemaRef, _metrics: ExecutionPlanMetricsSet, object_store: Arc, } @@ -152,12 +152,21 @@ impl FileOpener for OrcOpener { let reader = ObjectStoreReader::new(self.object_store.clone(), file_meta.object_meta.clone()); let batch_size = self.batch_size; - // Offset by 1 since index 0 is the root - let projection = self.projection.iter().map(|i| i + 1).collect::>(); + let projected_schema = SchemaRef::from(self.table_schema.project(&self.projection)?); + Ok(Box::pin(async move { let mut builder = ArrowReaderBuilder::try_new_async(reader) .await .map_err(ArrowError::from)?; + // Find complex data type column index as projection + let mut projection = Vec::with_capacity(projected_schema.fields().len()); + for named_column in builder.file_metadata().root_data_type().children() { + if let Some((_table_idx, _table_field)) = + projected_schema.fields().find(named_column.name()) + { + projection.push(named_column.data_type().column_index()); + } + } let projection_mask = ProjectionMask::roots(builder.file_metadata().root_data_type(), projection); if let Some(range) = file_meta.range.clone() { diff --git a/tests/basic/data/map_list.snappy.orc b/tests/basic/data/map_list.snappy.orc new file mode 100644 index 0000000..ee5fcf3 Binary files /dev/null and b/tests/basic/data/map_list.snappy.orc differ