Fix reading multiple complex data types exception #140

Merged · 1 commit · Jan 3, 2025
10 changes: 10 additions & 0 deletions examples/datafusion_integration.rs
@@ -25,6 +25,16 @@ async fn main() -> Result<()> {
     // methods available on SessionContext. With that done, we are able to process
     // ORC files using SQL or the DataFrame API.
     let ctx = SessionContext::new();
+
+    ctx.register_orc(
+        "table2",
+        "tests/basic/data/map_list.snappy.orc",
+        OrcReadOptions::default(),
+    )
+    .await?;
+
+    ctx.sql("select id,m,l from table2").await?.show().await?;
+
     ctx.register_orc(
         "table1",
         "tests/basic/data/alltypes.snappy.orc",
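The example's comment notes that registered ORC tables can be queried through SQL or the DataFrame API. For completeness, a minimal sketch of the DataFrame-API equivalent of the new query (the crate and extension-trait import paths are assumptions; register_orc and OrcReadOptions come from the diff above):

use datafusion::error::Result;
use datafusion::prelude::SessionContext;
// Assumed import paths; the extension trait that provides register_orc
// may live elsewhere or be named differently in the crate.
use datafusion_orc::{OrcReadOptions, SessionContextOrcExt};

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    ctx.register_orc(
        "table2",
        "tests/basic/data/map_list.snappy.orc",
        OrcReadOptions::default(),
    )
    .await?;

    // Equivalent to `select id,m,l from table2`.
    ctx.table("table2")
        .await?
        .select_columns(&["id", "m", "l"])?
        .show()
        .await?;
    Ok(())
}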
17 changes: 13 additions & 4 deletions src/physical_exec.rs
@@ -126,7 +126,7 @@ impl ExecutionPlan for OrcExec {
             projection,
             batch_size: context.session_config().batch_size(),
             _limit: self.config.limit,
-            _table_schema: self.config.file_schema.clone(),
+            table_schema: self.config.file_schema.clone(),
             _metrics: self.metrics.clone(),
             object_store,
         };
@@ -142,7 +142,7 @@ struct OrcOpener {
     projection: Vec<usize>,
     batch_size: usize,
     _limit: Option<usize>,
-    _table_schema: SchemaRef,
+    table_schema: SchemaRef,
     _metrics: ExecutionPlanMetricsSet,
     object_store: Arc<dyn ObjectStore>,
 }
@@ -152,12 +152,21 @@ impl FileOpener for OrcOpener {
         let reader =
             ObjectStoreReader::new(self.object_store.clone(), file_meta.object_meta.clone());
         let batch_size = self.batch_size;
-        // Offset by 1 since index 0 is the root
-        let projection = self.projection.iter().map(|i| i + 1).collect::<Vec<_>>();
+        let projected_schema = SchemaRef::from(self.table_schema.project(&self.projection)?);
+
         Ok(Box::pin(async move {
            let mut builder = ArrowReaderBuilder::try_new_async(reader)
                .await
                .map_err(ArrowError::from)?;
+            // Find each projected field's column index in the file metadata, since complex types span multiple indexes
+            let mut projection = Vec::with_capacity(projected_schema.fields().len());
+            for named_column in builder.file_metadata().root_data_type().children() {
+                if let Some((_table_idx, _table_field)) =
+                    projected_schema.fields().find(named_column.name())
+                {
+                    projection.push(named_column.data_type().column_index());
+                }
+            }
A collaborator commented on lines +162 to +169:
Suggested change:

-            let mut projection = Vec::with_capacity(projected_schema.fields().len());
-            for named_column in builder.file_metadata().root_data_type().children() {
-                if let Some((_table_idx, _table_field)) =
-                    projected_schema.fields().find(named_column.name())
-                {
-                    projection.push(named_column.data_type().column_index());
-                }
-            }
+            let projection = builder
+                .file_metadata()
+                .root_data_type()
+                .children()
+                .iter()
+                .filter(|named_column| {
+                    projected_schema
+                        .fields()
+                        .find(named_column.name())
+                        .is_some()
+                })
+                .map(|named_column| named_column.data_type().column_index());
Thoughts on using a more iterator-based approach? It could potentially be simplified further.

             let projection_mask =
                 ProjectionMask::roots(builder.file_metadata().root_data_type(), projection);
             if let Some(range) = file_meta.range.clone() {
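Why the old "i + 1" offset broke with several complex columns: ORC assigns column ids in a pre-order walk of the type tree, so a map or list root column consumes multiple consecutive ids, shifting every column after it. A self-contained sketch of that numbering (root_column_ids is a hypothetical helper for illustration, not part of orc-rust):

// Pre-order column ids for struct<id:int, m:map<string,int>, l:array<int>>:
//   0: <root struct>
//   1: id
//   2: m        (map)
//   3: m.key
//   4: m.value
//   5: l        (list)
//   6: l.element
//
// The old offset mapped projected field 2 (l) to id 3 (the map's keys)
// instead of 5, hence the exception; the PR instead reads each root child's
// column_index() from the file metadata.
fn root_column_ids(child_subtree_sizes: &[usize]) -> Vec<usize> {
    let mut ids = Vec::with_capacity(child_subtree_sizes.len());
    let mut next = 1; // id 0 is the root struct itself
    for &size in child_subtree_sizes {
        ids.push(next);
        next += size; // each child's subtree occupies `size` consecutive ids
    }
    ids
}

fn main() {
    // Subtree sizes: id = 1, m = 3 (map, key, value), l = 2 (list, element).
    assert_eq!(root_column_ids(&[1, 3, 2]), vec![1, 2, 5]);
}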
Binary file added tests/basic/data/map_list.snappy.orc
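A possible regression test over the new fixture (a sketch only: the test name, the sync ArrowReaderBuilder::try_new constructor, and the reader's Iterator implementation are assumptions mirroring the async API used in the diff above):

use std::error::Error;
use std::fs::File;
use orc_rust::ArrowReaderBuilder; // assumed crate path

#[test]
fn read_map_and_list_columns() -> Result<(), Box<dyn Error>> {
    // Fixture added by this PR: a file with both a map and a list column.
    let file = File::open("tests/basic/data/map_list.snappy.orc")?;
    let reader = ArrowReaderBuilder::try_new(file)?.build();
    // Decode every batch; before this fix, reading more than one complex
    // column raised an exception.
    let batches = reader.collect::<Result<Vec<_>, _>>()?;
    assert!(!batches.is_empty());
    Ok(())
}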