From c90e4147de7e77afda2d9715889428fe2824be44 Mon Sep 17 00:00:00 2001 From: fys <40801205+fengys1996@users.noreply.github.com> Date: Tue, 14 Apr 2026 17:29:33 +0800 Subject: [PATCH] refactor: introduce the ProjectInput structure (#7908) * refactor: introduce the ProjectInput structure * remove unused import * fix: cr * fix: cr * fix: code review * add more unit test * avoid clone of input.projection --- src/catalog/src/system_schema.rs | 8 +- .../system_schema/information_schema/ssts.rs | 6 +- src/cmd/src/datanode/scanbench.rs | 7 +- src/file-engine/src/query.rs | 11 +-- src/metric-engine/src/engine/read.rs | 36 ++++++--- src/metric-engine/src/metadata_region.rs | 9 ++- src/mito2/src/engine/partition_filter_test.rs | 6 +- src/mito2/src/engine/projection_test.rs | 6 +- src/mito2/src/read/scan_region.rs | 20 ++--- src/query/src/dist_plan/analyzer/test.rs | 2 +- src/query/src/dummy_catalog.rs | 2 +- src/store-api/src/sst_entry.rs | 13 ++-- src/store-api/src/storage.rs | 2 + src/store-api/src/storage/projection.rs | 78 +++++++++++++++++++ src/store-api/src/storage/requests.rs | 43 +++++++--- src/table/src/table/adapter.rs | 3 +- src/table/src/table/numbers.rs | 5 +- src/table/src/test_util/memtable.rs | 7 +- 18 files changed, 202 insertions(+), 62 deletions(-) create mode 100644 src/store-api/src/storage/projection.rs diff --git a/src/catalog/src/system_schema.rs b/src/catalog/src/system_schema.rs index 00b96e5292..6c6a5fc202 100644 --- a/src/catalog/src/system_schema.rs +++ b/src/catalog/src/system_schema.rs @@ -139,12 +139,16 @@ impl DataSource for SystemTableDataSource { &self, request: ScanRequest, ) -> std::result::Result { - let projected_schema = match &request.projection { + let projection = request + .projection_input + .as_ref() + .map(|input| input.projection.clone()); + + let projected_schema = match projection.as_ref() { Some(projection) => self.try_project(projection)?, None => self.table.schema(), }; - let projection = request.projection.clone(); let stream = self .table .to_stream(request) diff --git a/src/catalog/src/system_schema/information_schema/ssts.rs b/src/catalog/src/system_schema/information_schema/ssts.rs index 1c0d507a29..520fc6d485 100644 --- a/src/catalog/src/system_schema/information_schema/ssts.rs +++ b/src/catalog/src/system_schema/information_schema/ssts.rs @@ -63,7 +63,7 @@ impl InformationTable for InformationSchemaSstsManifest { } fn to_stream(&self, request: ScanRequest) -> Result { - let schema = if let Some(p) = &request.projection { + let schema = if let Some(p) = request.projection_indices() { Arc::new(self.schema.try_project(p).context(ProjectSchemaSnafu)?) } else { self.schema.clone() @@ -117,7 +117,7 @@ impl InformationTable for InformationSchemaSstsStorage { } fn to_stream(&self, request: ScanRequest) -> Result { - let schema = if let Some(p) = &request.projection { + let schema = if let Some(p) = request.projection_indices() { Arc::new(self.schema.try_project(p).context(ProjectSchemaSnafu)?) } else { self.schema.clone() @@ -172,7 +172,7 @@ impl InformationTable for InformationSchemaSstsIndexMeta { } fn to_stream(&self, request: ScanRequest) -> Result { - let schema = if let Some(p) = &request.projection { + let schema = if let Some(p) = request.projection_indices() { Arc::new(self.schema.try_project(p).context(ProjectSchemaSnafu)?) } else { self.schema.clone() diff --git a/src/cmd/src/datanode/scanbench.rs b/src/cmd/src/datanode/scanbench.rs index a93aca430a..b26705991c 100644 --- a/src/cmd/src/datanode/scanbench.rs +++ b/src/cmd/src/datanode/scanbench.rs @@ -53,7 +53,9 @@ use store_api::metadata::RegionMetadata; use store_api::path_utils::WAL_DIR; use store_api::region_engine::{PrepareRequest, QueryScanContext, RegionEngine}; use store_api::region_request::{PathType, RegionOpenRequest, RegionRequest}; -use store_api::storage::{RegionId, ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector}; +use store_api::storage::{ + ProjectionInput, RegionId, ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector, +}; use tokio::fs; use crate::datanode::objbench::{build_object_store, parse_config}; @@ -615,9 +617,10 @@ impl ScanbenchCommand { let mut total_rows_all = 0u64; let mut total_elapsed_all = std::time::Duration::ZERO; + let projection_input = projection.map(ProjectionInput::new); for iteration in 0..self.iterations { let request = ScanRequest { - projection: projection.clone(), + projection_input: projection_input.clone(), filters: filters.clone(), series_row_selector, distribution, diff --git a/src/file-engine/src/query.rs b/src/file-engine/src/query.rs index 2182978631..0ac563606a 100644 --- a/src/file-engine/src/query.rs +++ b/src/file-engine/src/query.rs @@ -44,7 +44,8 @@ impl FileRegion { pub fn query(&self, request: ScanRequest) -> Result { let store = build_backend(&self.url, &self.options).context(BuildBackendSnafu)?; - let file_projection = self.projection_pushdown_to_file(&request.projection)?; + let projection = request.projection_indices(); + let file_projection = self.projection_pushdown_to_file(projection)?; let file_filters = self.filters_pushdown_to_file(&request.filters)?; let file_schema = Arc::new(Schema::new(self.file_options.file_column_schemas.clone())); @@ -70,7 +71,7 @@ impl FileRegion { }, )?; - let scan_schema = self.scan_schema(&request.projection)?; + let scan_schema = self.scan_schema(projection)?; Ok(Box::pin(FileToScanRegionStream::new( scan_schema, @@ -81,9 +82,9 @@ impl FileRegion { fn projection_pushdown_to_file( &self, - req_projection: &Option>, + req_projection: Option<&[usize]>, ) -> Result>> { - let Some(scan_projection) = req_projection.as_ref() else { + let Some(scan_projection) = req_projection else { return Ok(None); }; @@ -136,7 +137,7 @@ impl FileRegion { Ok(file_filters) } - fn scan_schema(&self, req_projection: &Option>) -> Result { + fn scan_schema(&self, req_projection: Option<&[usize]>) -> Result { let schema = if let Some(indices) = req_projection { Arc::new( self.metadata diff --git a/src/metric-engine/src/engine/read.rs b/src/metric-engine/src/engine/read.rs index 13ae461db1..178c2e5b92 100644 --- a/src/metric-engine/src/engine/read.rs +++ b/src/metric-engine/src/engine/read.rs @@ -143,17 +143,26 @@ impl MetricEngineInner { mut request: ScanRequest, ) -> Result { // transform projection - let physical_projection = if let Some(projection) = &request.projection { - self.transform_projection(physical_region_id, logical_region_id, projection) - .await? - } else { - self.default_projection(physical_region_id, logical_region_id) + let physical_projection = match request.projection_input.as_ref() { + Some(projection_input) => { + self.transform_projection( + physical_region_id, + logical_region_id, + &projection_input.projection, + ) .await? + } + None => { + self.default_projection(physical_region_id, logical_region_id) + .await? + } }; - request.projection = Some(physical_projection); + // Rewrite the top-level projection from logical-region schema indices to + // physical-region schema indices. `nested_paths` are left unchanged because + // they are expressed by column name rather than schema index. + request.projection_input.get_or_insert_default().projection = physical_projection; - // add table filter request .filters .push(self.table_id_filter(logical_region_id)); @@ -325,8 +334,9 @@ mod test { .unwrap(); // check explicit projection + let projection_input = Some(vec![0, 1, 2, 3, 4, 5, 6].into()); let scan_req = ScanRequest { - projection: Some(vec![0, 1, 2, 3, 4, 5, 6]), + projection_input, filters: vec![], ..Default::default() }; @@ -338,7 +348,10 @@ mod test { .await .unwrap(); - assert_eq!(scan_req.projection.unwrap(), vec![11, 10, 9, 8, 0, 1, 4]); + assert_eq!( + scan_req.projection_indices().unwrap(), + &[11, 10, 9, 8, 0, 1, 4] + ); assert_eq!(scan_req.filters.len(), 1); assert_eq!( scan_req.filters[0], @@ -354,6 +367,9 @@ mod test { .transform_request(physical_region_id, logical_region_id, scan_req) .await .unwrap(); - assert_eq!(scan_req.projection.unwrap(), vec![11, 10, 9, 8, 0, 1, 4]); + assert_eq!( + scan_req.projection_indices().unwrap(), + &[11, 10, 9, 8, 0, 1, 4] + ); } } diff --git a/src/metric-engine/src/metadata_region.rs b/src/metric-engine/src/metadata_region.rs index d84cbad946..76abaacf10 100644 --- a/src/metric-engine/src/metadata_region.rs +++ b/src/metric-engine/src/metadata_region.rs @@ -349,8 +349,9 @@ impl MetadataRegion { METADATA_SCHEMA_VALUE_COLUMN_INDEX, ] }; + let projection_input = Some(projection.into()); ScanRequest { - projection: Some(projection), + projection_input, filters: vec![filter_expr], ..Default::default() } @@ -361,8 +362,9 @@ impl MetadataRegion { METADATA_SCHEMA_KEY_COLUMN_INDEX, METADATA_SCHEMA_VALUE_COLUMN_INDEX, ]; + let projection_input = Some(projection.into()); ScanRequest { - projection: Some(projection), + projection_input, ..Default::default() } } @@ -679,8 +681,9 @@ impl MetadataRegion { let filter_expr = datafusion::prelude::col(METADATA_SCHEMA_KEY_COLUMN_NAME) .eq(datafusion::prelude::lit(key)); + let projection_input = Some(vec![METADATA_SCHEMA_VALUE_COLUMN_INDEX].into()); let scan_req = ScanRequest { - projection: Some(vec![METADATA_SCHEMA_VALUE_COLUMN_INDEX]), + projection_input, filters: vec![filter_expr], ..Default::default() }; diff --git a/src/mito2/src/engine/partition_filter_test.rs b/src/mito2/src/engine/partition_filter_test.rs index 61db52484e..ff247d0a21 100644 --- a/src/mito2/src/engine/partition_filter_test.rs +++ b/src/mito2/src/engine/partition_filter_test.rs @@ -130,8 +130,9 @@ async fn test_partition_filter_basic_with_format(flat_format: bool) { .unwrap(); // Scan data in staging mode - should only see initial 5 rows (staging SST not visible) + let projection_input = Some(vec![1].into()); let request = ScanRequest { - projection: Some(vec![1]), + projection_input, ..Default::default() }; let scanner = engine.scanner(region_id, request).await.unwrap(); @@ -153,8 +154,9 @@ async fn test_partition_filter_basic_with_format(flat_format: bool) { // Scan after exiting staging - the old SST (tag_0 = "0".."4") should have // rows filtered by partition expr (tag_0 >= "5"), which means none of them pass. // But the staging SST (tag_0 = "5".."10") satisfies the partition expr. + let projection_input = Some(vec![1].into()); let request = ScanRequest { - projection: Some(vec![1]), + projection_input, ..Default::default() }; let scanner = engine.scanner(region_id, request).await.unwrap(); diff --git a/src/mito2/src/engine/projection_test.rs b/src/mito2/src/engine/projection_test.rs index afa505a3ee..3bc449d07b 100644 --- a/src/mito2/src/engine/projection_test.rs +++ b/src/mito2/src/engine/projection_test.rs @@ -106,8 +106,9 @@ async fn test_scan_projection_with_format(flat_format: bool) { put_rows(&engine, region_id, rows).await; // Scans tag_1, field_1, ts + let projection_input = Some(vec![1, 3, 4].into()); let request = ScanRequest { - projection: Some(vec![1, 3, 4]), + projection_input, filters: Vec::new(), ..Default::default() }; @@ -183,8 +184,9 @@ async fn test_scan_projection_without_primary_key_with_format(flat_format: bool) put_rows(&engine, region_id, rows).await; // Scan with projection on field_0 and field_1, filter ts >= 2s + let projection_input = Some(vec![0, 1].into()); let request = ScanRequest { - projection: Some(vec![0, 1]), // field_0 and field_1 (not ts) + projection_input, // field_0 and field_1 (not ts) filters: vec![col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(2000), None)))], ..Default::default() }; diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index f645e3dc26..99b59ae577 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -390,7 +390,7 @@ impl ScanRegion { let time_range = self.build_time_range_predicate(); let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?; - let read_column_ids = match &self.request.projection { + let read_column_ids = match self.request.projection_indices() { Some(p) => self.build_read_column_ids(p, &predicate)?, None => self .version @@ -402,7 +402,7 @@ impl ScanRegion { }; // The mapper always computes projected column ids as the schema of SSTs may change. - let mapper = match &self.request.projection { + let mapper = match self.request.projection_indices() { Some(p) => ProjectionMapper::new_with_read_columns( &self.version.metadata, p.iter().copied(), @@ -1756,7 +1756,9 @@ mod tests { use datatypes::value::Value; use partition::expr::col as partition_col; use store_api::metadata::RegionMetadataBuilder; - use store_api::storage::{ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector}; + use store_api::storage::{ + ProjectionInput, ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector, + }; use super::*; use crate::cache::CacheManager; @@ -1801,7 +1803,7 @@ mod tests { let version = new_version(metadata.clone()); let env = SchedulerEnv::new().await; let request = ScanRequest { - projection: Some(vec![4]), + projection_input: Some(vec![4].into()), filters: vec![ col("v0").gt(lit(1)), col("ts").gt(lit(0)), @@ -1817,7 +1819,7 @@ mod tests { ); let predicate = PredicateGroup::new(metadata.as_ref(), &scan_region.request.filters).unwrap(); - let projection = scan_region.request.projection.as_ref().unwrap(); + let projection = &scan_region.request.projection_indices().unwrap(); let read_ids = scan_region .build_read_column_ids(projection, &predicate) .unwrap(); @@ -1830,7 +1832,7 @@ mod tests { let version = new_version(metadata.clone()); let env = SchedulerEnv::new().await; let request = ScanRequest { - projection: Some(vec![]), + projection_input: Some(ProjectionInput::default()), ..Default::default() }; let scan_region = ScanRegion::new( @@ -1841,7 +1843,7 @@ mod tests { ); let predicate = PredicateGroup::new(metadata.as_ref(), &scan_region.request.filters).unwrap(); - let projection = scan_region.request.projection.as_ref().unwrap(); + let projection = &scan_region.request.projection_indices().unwrap(); let read_ids = scan_region .build_read_column_ids(projection, &predicate) .unwrap(); @@ -1855,7 +1857,7 @@ mod tests { let version = new_version(metadata.clone()); let env = SchedulerEnv::new().await; let request = ScanRequest { - projection: Some(vec![4, 1]), + projection_input: Some(vec![4, 1].into()), filters: vec![col("v0").gt(lit(1))], ..Default::default() }; @@ -1867,7 +1869,7 @@ mod tests { ); let predicate = PredicateGroup::new(metadata.as_ref(), &scan_region.request.filters).unwrap(); - let projection = scan_region.request.projection.as_ref().unwrap(); + let projection = &scan_region.request.projection_indices().unwrap(); let read_ids = scan_region .build_read_column_ids(projection, &predicate) .unwrap(); diff --git a/src/query/src/dist_plan/analyzer/test.rs b/src/query/src/dist_plan/analyzer/test.rs index 47848c9ceb..9f72df55a5 100644 --- a/src/query/src/dist_plan/analyzer/test.rs +++ b/src/query/src/dist_plan/analyzer/test.rs @@ -126,7 +126,7 @@ impl TestDataSource { impl DataSource for TestDataSource { fn get_stream(&self, request: ScanRequest) -> Result { - let projected_schema = match &request.projection { + let projected_schema = match request.projection_indices() { Some(projection) => Arc::new(self.schema.try_project(projection).unwrap()), None => self.schema.clone(), }; diff --git a/src/query/src/dummy_catalog.rs b/src/query/src/dummy_catalog.rs index 7ce85afbbb..591d068f52 100644 --- a/src/query/src/dummy_catalog.rs +++ b/src/query/src/dummy_catalog.rs @@ -178,7 +178,7 @@ impl TableProvider for DummyTableProvider { limit: Option, ) -> datafusion::error::Result> { let mut request = self.scan_request.lock().unwrap().clone(); - request.projection = projection.cloned(); + request.projection_input = projection.map(|p| p.clone().into()); request.filters = filters.to_vec(); request.limit = limit; diff --git a/src/store-api/src/sst_entry.rs b/src/store-api/src/sst_entry.rs index 832bfc1155..840fef2268 100644 --- a/src/store-api/src/sst_entry.rs +++ b/src/store-api/src/sst_entry.rs @@ -384,11 +384,8 @@ fn build_plan_helper( ) -> Result { let table_source = LogicalTableSource::new(schema.arrow_schema().clone()); - let mut builder = LogicalPlanBuilder::scan( - table_name, - Arc::new(table_source), - scan_request.projection.clone(), - )?; + let projection = scan_request.projection_input.map(|input| input.projection); + let mut builder = LogicalPlanBuilder::scan(table_name, Arc::new(table_source), projection)?; for filter in scan_request.filters { builder = builder.filter(filter)?; @@ -910,8 +907,9 @@ mod tests { #[test] fn test_manifest_build_plan() { // Note: filter must reference a column in the projected schema + let projection_input = Some(vec![0, 1, 2].into()); let request = ScanRequest { - projection: Some(vec![0, 1, 2]), + projection_input, filters: vec![binary_expr(col("table_id"), Operator::Gt, lit(0))], limit: Some(5), ..Default::default() @@ -941,8 +939,9 @@ mod tests { #[test] fn test_storage_build_plan() { + let projection_input = Some(vec![0, 2].into()); let request = ScanRequest { - projection: Some(vec![0, 2]), + projection_input, filters: vec![binary_expr(col("file_path"), Operator::Eq, lit("/a"))], limit: Some(1), ..Default::default() diff --git a/src/store-api/src/storage.rs b/src/store-api/src/storage.rs index b97fe0b3ad..33aa3a7b41 100644 --- a/src/store-api/src/storage.rs +++ b/src/store-api/src/storage.rs @@ -17,6 +17,7 @@ pub mod consts; mod descriptors; mod file; +mod projection; mod requests; mod types; @@ -27,6 +28,7 @@ pub use datatypes::schema::{ pub use self::descriptors::*; pub use self::file::{FileId, FileRef, FileRefsManifest, GcReport, IndexVersion, ParseIdError}; +pub use self::projection::{NestedPath, ProjectionInput}; pub use self::requests::{ ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector, VectorDistanceMetric, VectorIndexEngine, VectorIndexEngineType, VectorSearchMatches, VectorSearchRequest, diff --git a/src/store-api/src/storage/projection.rs b/src/store-api/src/storage/projection.rs new file mode 100644 index 0000000000..f8f27fe5a4 --- /dev/null +++ b/src/store-api/src/storage/projection.rs @@ -0,0 +1,78 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::{Display, Formatter}; + +/// A nested field access path. +/// +/// Each path represents a field access on a nested column. +/// +/// Example: +/// - `j.a.b` -> `["j", "a", "b"]` +pub type NestedPath = Vec; + +/// Projection information for a table scan. +#[derive(Default, Debug, Clone, PartialEq)] +pub struct ProjectionInput { + /// Top-level column projection. + /// + /// The indices are based on the schema exposed by the table scan input, + /// such as the schema passed to `TableProvider::scan`. + /// + /// Only the root columns with the specified schema indices are needed. + pub projection: Vec, + /// Nested field access paths used for sub-field projection. + /// + /// It extends and refines the top-level projection by specifying nested + /// field accesses inside complex columns such as JSON or struct columns. + /// + /// In other words: + /// - `projection` determines **which root columns are needed** + /// - `nested_paths` further determines **which sub-fields inside those + /// columns are required** + /// + /// Each path starts with the root column name and continues with + /// nested field names. + pub nested_paths: Vec, +} + +impl ProjectionInput { + pub fn new(projection: Vec) -> Self { + Self { + projection, + nested_paths: Vec::new(), + } + } + + pub fn with_nested_paths(mut self, nested_paths: Vec) -> Self { + self.nested_paths = nested_paths; + self + } +} + +impl From> for ProjectionInput { + fn from(projection: Vec) -> Self { + Self::new(projection) + } +} + +impl Display for ProjectionInput { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "ProjectionInput {{ projection: {:?}, nested_paths: {:?} }}", + self.projection, self.nested_paths + ) + } +} diff --git a/src/store-api/src/storage/requests.rs b/src/store-api/src/storage/requests.rs index db3fb0388a..9d2f1f2006 100644 --- a/src/store-api/src/storage/requests.rs +++ b/src/store-api/src/storage/requests.rs @@ -21,7 +21,7 @@ use datafusion_expr::expr::Expr; pub use datatypes::schema::{VectorDistanceMetric, VectorIndexEngineType}; use strum::Display; -use crate::storage::{ColumnId, SequenceNumber}; +use crate::storage::{ColumnId, ProjectionInput, SequenceNumber}; /// A hint for KNN vector search. #[derive(Debug, Clone, PartialEq)] @@ -95,9 +95,9 @@ pub enum TimeSeriesDistribution { #[derive(Default, Clone, Debug, PartialEq)] pub struct ScanRequest { - /// Indices of columns to read, `None` to read all columns. This indices is - /// based on table schema. - pub projection: Option>, + /// Optional projection information for the scan. `None` reads all root + /// columns. + pub projection_input: Option, /// Filters pushed down pub filters: Vec, /// Expected output ordering. This is only a hint and isn't guaranteed. @@ -130,6 +130,15 @@ pub struct ScanRequest { pub vector_search: Option, } +impl ScanRequest { + /// Returns the top-level projected column indices. + pub fn projection_indices(&self) -> Option<&[usize]> { + self.projection_input + .as_ref() + .map(|projection_input| projection_input.projection.as_slice()) + } +} + impl Display for ScanRequest { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { enum Delimiter { @@ -152,7 +161,7 @@ impl Display for ScanRequest { let mut delimiter = Delimiter::None; write!(f, "ScanRequest {{ ")?; - if let Some(projection) = &self.projection { + if let Some(projection) = &self.projection_input { write!(f, "{}projection: {:?}", delimiter.as_str(), projection)?; } if !self.filters.is_empty() { @@ -235,8 +244,9 @@ mod tests { }; assert_eq!(request.to_string(), "ScanRequest { }"); + let projection_input = Some(vec![1, 2].into()); let request = ScanRequest { - projection: Some(vec![1, 2]), + projection_input, filters: vec![ binary_expr(col("i"), Operator::Gt, lit(1)), binary_expr(col("s"), Operator::Eq, lit("x")), @@ -246,7 +256,7 @@ mod tests { }; assert_eq!( request.to_string(), - r#"ScanRequest { projection: [1, 2], filters: [i > Int32(1), s = Utf8("x")], limit: 10 }"# + r#"ScanRequest { projection: ProjectionInput { projection: [1, 2], nested_paths: [] }, filters: [i > Int32(1), s = Utf8("x")], limit: 10 }"# ); let request = ScanRequest { @@ -262,14 +272,29 @@ mod tests { r#"ScanRequest { filters: [i > Int32(1), s = Utf8("x")], limit: 10 }"# ); + let projection_input = Some(vec![1, 2].into()); let request = ScanRequest { - projection: Some(vec![1, 2]), + projection_input, limit: Some(10), ..Default::default() }; assert_eq!( request.to_string(), - "ScanRequest { projection: [1, 2], limit: 10 }" + "ScanRequest { projection: ProjectionInput { projection: [1, 2], nested_paths: [] }, limit: 10 }" + ); + + let projection_input = Some(ProjectionInput::new(vec![1, 2]).with_nested_paths(vec![ + vec!["j".to_string(), "a".to_string(), "b".to_string()], + vec!["s".to_string(), "x".to_string()], + ])); + let request = ScanRequest { + projection_input, + limit: Some(10), + ..Default::default() + }; + assert_eq!( + request.to_string(), + r#"ScanRequest { projection: ProjectionInput { projection: [1, 2], nested_paths: [["j", "a", "b"], ["s", "x"]] }, limit: 10 }"# ); let request = ScanRequest { diff --git a/src/table/src/table/adapter.rs b/src/table/src/table/adapter.rs index e33bf53c84..55cd5ba017 100644 --- a/src/table/src/table/adapter.rs +++ b/src/table/src/table/adapter.rs @@ -104,10 +104,11 @@ impl TableProvider for DfTableProviderAdapter { limit: Option, ) -> DfResult> { let filters: Vec = filters.iter().map(Clone::clone).collect(); + let projection_input = projection.map(|p| p.clone().into()); let request = { let mut request = self.scan_req.lock().unwrap(); request.filters = filters; - request.projection = projection.cloned(); + request.projection_input = projection_input; request.limit = limit; request.clone() }; diff --git a/src/table/src/table/numbers.rs b/src/table/src/table/numbers.rs index 6dd40fe747..cb4b8ce317 100644 --- a/src/table/src/table/numbers.rs +++ b/src/table/src/table/numbers.rs @@ -110,7 +110,8 @@ impl NumbersDataSource { impl DataSource for NumbersDataSource { fn get_stream(&self, request: ScanRequest) -> Result { - let projected_schema = match &request.projection { + let projection = request.projection_input.map(|input| input.projection); + let projected_schema = match &projection { Some(projection) => Arc::new(self.schema.try_project(projection).unwrap()), None => self.schema.clone(), }; @@ -118,7 +119,7 @@ impl DataSource for NumbersDataSource { limit: request.limit.unwrap_or(100) as u32, schema: self.schema.clone(), already_run: false, - projection: request.projection, + projection, projected_schema, })) } diff --git a/src/table/src/test_util/memtable.rs b/src/table/src/test_util/memtable.rs index f1108bc533..ee73123aca 100644 --- a/src/table/src/test_util/memtable.rs +++ b/src/table/src/test_util/memtable.rs @@ -121,10 +121,10 @@ impl DataSource for MemtableDataSource { &self, request: ScanRequest, ) -> std::result::Result { - let df_recordbatch = if let Some(indices) = request.projection { + let df_recordbatch = if let Some(indices) = request.projection_indices() { self.recordbatch .df_record_batch() - .project(&indices) + .project(indices) .context(TableProjectionSnafu) .map_err(BoxedError::new)? } else { @@ -198,8 +198,9 @@ mod test { async fn test_scan_with_projection() { let table = build_testing_table(); + let projection_input = Some(vec![1].into()); let scan_req = ScanRequest { - projection: Some(vec![1]), + projection_input, ..Default::default() }; let stream = table.scan_to_stream(scan_req).await.unwrap();