From 76cad696c6ce26d9377491ca7ae46e078d6efbea Mon Sep 17 00:00:00 2001 From: fys <40801205+fengys1996@users.noreply.github.com> Date: Fri, 10 Apr 2026 18:41:48 +0800 Subject: [PATCH] feat: add parquet nested leaf projection (#7900) * feat: add parquet nested leaf projection * rename ParquetProjection related struct * add some apis * extract common build schema function for test * remove unsed method * keep only deduped parquet root projection constructor * add more unit tests * fix: typo * fix: cr * fast-path parquet root projection without nested fields * extract a build_projection_mask method * fix: cargo clippy --- src/mito2/src/sst/parquet.rs | 1 + src/mito2/src/sst/parquet/read_columns.rs | 316 ++++++++++++++++++++++ src/mito2/src/sst/parquet/reader.rs | 10 +- 3 files changed, 323 insertions(+), 4 deletions(-) create mode 100644 src/mito2/src/sst/parquet/read_columns.rs diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index 2447824ad9..90395642b6 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -31,6 +31,7 @@ pub mod format; pub(crate) mod helper; pub(crate) mod metadata; pub mod prefilter; +pub mod read_columns; pub mod reader; pub mod row_group; pub mod row_selection; diff --git a/src/mito2/src/sst/parquet/read_columns.rs b/src/mito2/src/sst/parquet/read_columns.rs new file mode 100644 index 0000000000..f0f35a4099 --- /dev/null +++ b/src/mito2/src/sst/parquet/read_columns.rs @@ -0,0 +1,316 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +use parquet::arrow::ProjectionMask; +use parquet::schema::types::SchemaDescriptor; + +/// A nested field access path inside one parquet root column. +pub type ParquetNestedPath = Vec; + +/// The parquet columns to read. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ParquetReadColumns { + cols: Vec, + has_nested: bool, +} + +impl ParquetReadColumns { + /// Builds root-column projections from root indices that are already + /// deduplicated. + /// + /// Note: this constructor does not check for duplicates. + pub fn from_deduped_root_indices(root_indices: impl IntoIterator) -> Self { + let cols = root_indices + .into_iter() + .map(ParquetReadColumn::new) + .collect(); + Self { + cols, + has_nested: false, + } + } + + pub fn columns(&self) -> &[ParquetReadColumn] { + &self.cols + } + + pub fn has_nested(&self) -> bool { + self.has_nested + } + + pub fn root_indices_iter(&self) -> impl Iterator + '_ { + self.cols.iter().map(|col| col.root_index) + } +} + +/// Read requirement for a single parquet root column. +/// +/// `root_index` identifies the root column in the parquet schema. +/// +/// If `nested_paths` is empty, the whole root column is read. Otherwise, only +/// leaves under the specified nested paths are read. +/// +/// To construct a [`ParquetReadColumn`]: +/// - `ParquetReadColumn::new(0)` reads the whole root column at index `0`. +/// - `ParquetReadColumn::new(0).with_nested_paths(vec![vec!["j".into(), "b".into()]])` +/// reads only leaves under `j.b`. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ParquetReadColumn { + /// Root field index in the parquet schema. + root_index: usize, + /// Nested paths to read under this root column. + /// + /// Each path includes the root column itself. For example, for a root + /// column `j`, path `["j", "a", "b"]` refers to `j.a.b`. + /// + /// If empty, the whole root column is read. + nested_paths: Vec, +} + +impl ParquetReadColumn { + pub fn new(root_index: usize) -> Self { + Self { + root_index, + nested_paths: vec![], + } + } + + pub fn with_nested_paths(self, nested_paths: Vec) -> Self { + Self { + nested_paths, + ..self + } + } + + pub fn root_index(&self) -> usize { + self.root_index + } + + pub fn nested_paths(&self) -> &[ParquetNestedPath] { + &self.nested_paths + } +} + +/// Builds a projection mask from parquet read columns. +pub fn build_projection_mask( + parquet_read_cols: &ParquetReadColumns, + parquet_schema_desc: &SchemaDescriptor, +) -> ProjectionMask { + if parquet_read_cols.has_nested() { + let leaf_indices = build_parquet_leaves_indices(parquet_schema_desc, parquet_read_cols); + ProjectionMask::leaves(parquet_schema_desc, leaf_indices) + } else { + ProjectionMask::roots(parquet_schema_desc, parquet_read_cols.root_indices_iter()) + } +} + +/// Builds parquet leaf-column indices from parquet read columns. +fn build_parquet_leaves_indices( + parquet_schema_desc: &SchemaDescriptor, + projection: &ParquetReadColumns, +) -> Vec { + let mut map = HashMap::with_capacity(projection.cols.len()); + for col in &projection.cols { + map.insert(col.root_index, &col.nested_paths); + } + + let mut leaf_indices = Vec::new(); + for (leaf_idx, leaf_col) in parquet_schema_desc.columns().iter().enumerate() { + let root_idx = parquet_schema_desc.get_column_root_idx(leaf_idx); + let Some(nested_paths) = map.get(&root_idx) else { + continue; + }; + if nested_paths.is_empty() { + leaf_indices.push(leaf_idx); + continue; + } + + let leaf_path = leaf_col.path().parts(); + if nested_paths + .iter() + .any(|nested_path| leaf_path.starts_with(nested_path)) + { + leaf_indices.push(leaf_idx); + } + } + leaf_indices +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use parquet::basic::Repetition; + use parquet::schema::types::Type; + + use super::*; + + #[test] + fn test_reads_whole_root() { + let parquet_schema_desc = build_test_nested_parquet_schema(); + + let projection = ParquetReadColumns { + cols: vec![ParquetReadColumn { + root_index: 0, + nested_paths: vec![], + }], + has_nested: false, + }; + + assert_eq!( + vec![0, 1, 2], + build_parquet_leaves_indices(&parquet_schema_desc, &projection) + ); + } + + #[test] + fn test_filters_nested_paths() { + let parquet_schema_desc = build_test_nested_parquet_schema(); + + let projection = ParquetReadColumns { + cols: vec![ + ParquetReadColumn { + root_index: 0, + nested_paths: vec![vec!["j".to_string(), "b".to_string()]], + }, + ParquetReadColumn { + root_index: 1, + nested_paths: vec![], + }, + ], + has_nested: true, + }; + + assert_eq!( + vec![1, 2, 3], + build_parquet_leaves_indices(&parquet_schema_desc, &projection) + ); + } + + #[test] + fn test_reads_middle_level_path() { + let parquet_schema_desc = build_test_nested_parquet_schema(); + + let projection = ParquetReadColumns { + cols: vec![ParquetReadColumn { + root_index: 0, + nested_paths: vec![vec!["j".to_string(), "b".to_string()]], + }], + has_nested: true, + }; + + assert_eq!( + vec![1, 2], + build_parquet_leaves_indices(&parquet_schema_desc, &projection) + ); + } + + #[test] + fn test_reads_leaf_level_path() { + let parquet_schema_desc = build_test_nested_parquet_schema(); + + let projection = ParquetReadColumns { + cols: vec![ParquetReadColumn { + root_index: 0, + nested_paths: vec![vec!["j".to_string(), "b".to_string(), "c".to_string()]], + }], + has_nested: true, + }; + + assert_eq!( + vec![1], + build_parquet_leaves_indices(&parquet_schema_desc, &projection) + ); + } + + #[test] + fn test_merges_mixed_paths() { + let parquet_schema_desc = build_test_nested_parquet_schema(); + + let projection = ParquetReadColumns { + cols: vec![ParquetReadColumn { + root_index: 0, + nested_paths: vec![ + vec!["j".to_string(), "a".to_string()], + vec!["j".to_string(), "b".to_string(), "d".to_string()], + ], + }], + has_nested: true, + }; + + assert_eq!( + vec![0, 2], + build_parquet_leaves_indices(&parquet_schema_desc, &projection) + ); + } + + // Test schema: + // schema + // |- j + // | |- a: INT64 + // | `- b + // | |- c: INT64 + // | `- d: INT64 + // `- k: INT64 + fn build_test_nested_parquet_schema() -> SchemaDescriptor { + let leaf_a = Arc::new( + Type::primitive_type_builder("a", parquet::basic::Type::INT64) + .with_repetition(Repetition::REQUIRED) + .build() + .unwrap(), + ); + let leaf_c = Arc::new( + Type::primitive_type_builder("c", parquet::basic::Type::INT64) + .with_repetition(Repetition::REQUIRED) + .build() + .unwrap(), + ); + let leaf_d = Arc::new( + Type::primitive_type_builder("d", parquet::basic::Type::INT64) + .with_repetition(Repetition::REQUIRED) + .build() + .unwrap(), + ); + let group_b = Arc::new( + Type::group_type_builder("b") + .with_repetition(Repetition::REQUIRED) + .with_fields(vec![leaf_c, leaf_d]) + .build() + .unwrap(), + ); + let root_j = Arc::new( + Type::group_type_builder("j") + .with_repetition(Repetition::REQUIRED) + .with_fields(vec![leaf_a, group_b]) + .build() + .unwrap(), + ); + let root_k = Arc::new( + Type::primitive_type_builder("k", parquet::basic::Type::INT64) + .with_repetition(Repetition::REQUIRED) + .build() + .unwrap(), + ); + let schema = Arc::new( + Type::group_type_builder("schema") + .with_fields(vec![root_j, root_k]) + .build() + .unwrap(), + ); + + SchemaDescriptor::new(schema) + } +} diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 73ca7748e9..6fdbb6f243 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -79,6 +79,7 @@ use crate::sst::parquet::metadata::MetadataLoader; use crate::sst::parquet::prefilter::{ PrefilterContextBuilder, execute_prefilter, is_usable_primary_key_filter, }; +use crate::sst::parquet::read_columns::{ParquetReadColumns, build_projection_mask}; use crate::sst::parquet::row_group::ParquetFetchMetrics; use crate::sst::parquet::row_selection::RowGroupSelection; use crate::sst::parquet::stats::RowGroupPruningStats; @@ -406,10 +407,11 @@ impl ParquetReaderBuilder { // Computes the projection mask. let parquet_schema_desc = parquet_meta.file_metadata().schema_descr(); - let indices = read_format.projection_indices(); - // Now we assumes we don't have nested schemas. - // TODO(yingwen): Revisit this if we introduce nested types such as JSON type. - let projection_mask = ProjectionMask::roots(parquet_schema_desc, indices.iter().copied()); + let parquet_read_cols = ParquetReadColumns::from_deduped_root_indices( + read_format.projection_indices().iter().copied(), + ); + + let projection_mask = build_projection_mask(&parquet_read_cols, parquet_schema_desc); let selection = self .row_groups_to_read(&read_format, &parquet_meta, &mut metrics.filter_metrics) .await;