refactor: introduce the ProjectInput structure (#7908)

* refactor: introduce the ProjectInput structure

* remove unused import

* fix: cr

* fix: cr

* fix: code review

* add more unit test

* avoid clone of input.projection
This commit is contained in:
fys
2026-04-14 17:29:33 +08:00
committed by GitHub
parent 6fcca6e0d6
commit c90e4147de
18 changed files with 202 additions and 62 deletions

View File

@@ -139,12 +139,16 @@ impl DataSource for SystemTableDataSource {
&self,
request: ScanRequest,
) -> std::result::Result<SendableRecordBatchStream, BoxedError> {
let projected_schema = match &request.projection {
let projection = request
.projection_input
.as_ref()
.map(|input| input.projection.clone());
let projected_schema = match projection.as_ref() {
Some(projection) => self.try_project(projection)?,
None => self.table.schema(),
};
let projection = request.projection.clone();
let stream = self
.table
.to_stream(request)

View File

@@ -63,7 +63,7 @@ impl InformationTable for InformationSchemaSstsManifest {
}
fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
let schema = if let Some(p) = &request.projection {
let schema = if let Some(p) = request.projection_indices() {
Arc::new(self.schema.try_project(p).context(ProjectSchemaSnafu)?)
} else {
self.schema.clone()
@@ -117,7 +117,7 @@ impl InformationTable for InformationSchemaSstsStorage {
}
fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
let schema = if let Some(p) = &request.projection {
let schema = if let Some(p) = request.projection_indices() {
Arc::new(self.schema.try_project(p).context(ProjectSchemaSnafu)?)
} else {
self.schema.clone()
@@ -172,7 +172,7 @@ impl InformationTable for InformationSchemaSstsIndexMeta {
}
fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
let schema = if let Some(p) = &request.projection {
let schema = if let Some(p) = request.projection_indices() {
Arc::new(self.schema.try_project(p).context(ProjectSchemaSnafu)?)
} else {
self.schema.clone()

View File

@@ -53,7 +53,9 @@ use store_api::metadata::RegionMetadata;
use store_api::path_utils::WAL_DIR;
use store_api::region_engine::{PrepareRequest, QueryScanContext, RegionEngine};
use store_api::region_request::{PathType, RegionOpenRequest, RegionRequest};
use store_api::storage::{RegionId, ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector};
use store_api::storage::{
ProjectionInput, RegionId, ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector,
};
use tokio::fs;
use crate::datanode::objbench::{build_object_store, parse_config};
@@ -615,9 +617,10 @@ impl ScanbenchCommand {
let mut total_rows_all = 0u64;
let mut total_elapsed_all = std::time::Duration::ZERO;
let projection_input = projection.map(ProjectionInput::new);
for iteration in 0..self.iterations {
let request = ScanRequest {
projection: projection.clone(),
projection_input: projection_input.clone(),
filters: filters.clone(),
series_row_selector,
distribution,

View File

@@ -44,7 +44,8 @@ impl FileRegion {
pub fn query(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
let store = build_backend(&self.url, &self.options).context(BuildBackendSnafu)?;
let file_projection = self.projection_pushdown_to_file(&request.projection)?;
let projection = request.projection_indices();
let file_projection = self.projection_pushdown_to_file(projection)?;
let file_filters = self.filters_pushdown_to_file(&request.filters)?;
let file_schema = Arc::new(Schema::new(self.file_options.file_column_schemas.clone()));
@@ -70,7 +71,7 @@ impl FileRegion {
},
)?;
let scan_schema = self.scan_schema(&request.projection)?;
let scan_schema = self.scan_schema(projection)?;
Ok(Box::pin(FileToScanRegionStream::new(
scan_schema,
@@ -81,9 +82,9 @@ impl FileRegion {
fn projection_pushdown_to_file(
&self,
req_projection: &Option<Vec<usize>>,
req_projection: Option<&[usize]>,
) -> Result<Option<Vec<usize>>> {
let Some(scan_projection) = req_projection.as_ref() else {
let Some(scan_projection) = req_projection else {
return Ok(None);
};
@@ -136,7 +137,7 @@ impl FileRegion {
Ok(file_filters)
}
fn scan_schema(&self, req_projection: &Option<Vec<usize>>) -> Result<SchemaRef> {
fn scan_schema(&self, req_projection: Option<&[usize]>) -> Result<SchemaRef> {
let schema = if let Some(indices) = req_projection {
Arc::new(
self.metadata

View File

@@ -143,17 +143,26 @@ impl MetricEngineInner {
mut request: ScanRequest,
) -> Result<ScanRequest> {
// transform projection
let physical_projection = if let Some(projection) = &request.projection {
self.transform_projection(physical_region_id, logical_region_id, projection)
.await?
} else {
self.default_projection(physical_region_id, logical_region_id)
let physical_projection = match request.projection_input.as_ref() {
Some(projection_input) => {
self.transform_projection(
physical_region_id,
logical_region_id,
&projection_input.projection,
)
.await?
}
None => {
self.default_projection(physical_region_id, logical_region_id)
.await?
}
};
request.projection = Some(physical_projection);
// Rewrite the top-level projection from logical-region schema indices to
// physical-region schema indices. `nested_paths` are left unchanged because
// they are expressed by column name rather than schema index.
request.projection_input.get_or_insert_default().projection = physical_projection;
// add table filter
request
.filters
.push(self.table_id_filter(logical_region_id));
@@ -325,8 +334,9 @@ mod test {
.unwrap();
// check explicit projection
let projection_input = Some(vec![0, 1, 2, 3, 4, 5, 6].into());
let scan_req = ScanRequest {
projection: Some(vec![0, 1, 2, 3, 4, 5, 6]),
projection_input,
filters: vec![],
..Default::default()
};
@@ -338,7 +348,10 @@ mod test {
.await
.unwrap();
assert_eq!(scan_req.projection.unwrap(), vec![11, 10, 9, 8, 0, 1, 4]);
assert_eq!(
scan_req.projection_indices().unwrap(),
&[11, 10, 9, 8, 0, 1, 4]
);
assert_eq!(scan_req.filters.len(), 1);
assert_eq!(
scan_req.filters[0],
@@ -354,6 +367,9 @@ mod test {
.transform_request(physical_region_id, logical_region_id, scan_req)
.await
.unwrap();
assert_eq!(scan_req.projection.unwrap(), vec![11, 10, 9, 8, 0, 1, 4]);
assert_eq!(
scan_req.projection_indices().unwrap(),
&[11, 10, 9, 8, 0, 1, 4]
);
}
}

View File

@@ -349,8 +349,9 @@ impl MetadataRegion {
METADATA_SCHEMA_VALUE_COLUMN_INDEX,
]
};
let projection_input = Some(projection.into());
ScanRequest {
projection: Some(projection),
projection_input,
filters: vec![filter_expr],
..Default::default()
}
@@ -361,8 +362,9 @@ impl MetadataRegion {
METADATA_SCHEMA_KEY_COLUMN_INDEX,
METADATA_SCHEMA_VALUE_COLUMN_INDEX,
];
let projection_input = Some(projection.into());
ScanRequest {
projection: Some(projection),
projection_input,
..Default::default()
}
}
@@ -679,8 +681,9 @@ impl MetadataRegion {
let filter_expr = datafusion::prelude::col(METADATA_SCHEMA_KEY_COLUMN_NAME)
.eq(datafusion::prelude::lit(key));
let projection_input = Some(vec![METADATA_SCHEMA_VALUE_COLUMN_INDEX].into());
let scan_req = ScanRequest {
projection: Some(vec![METADATA_SCHEMA_VALUE_COLUMN_INDEX]),
projection_input,
filters: vec![filter_expr],
..Default::default()
};

View File

@@ -130,8 +130,9 @@ async fn test_partition_filter_basic_with_format(flat_format: bool) {
.unwrap();
// Scan data in staging mode - should only see initial 5 rows (staging SST not visible)
let projection_input = Some(vec![1].into());
let request = ScanRequest {
projection: Some(vec![1]),
projection_input,
..Default::default()
};
let scanner = engine.scanner(region_id, request).await.unwrap();
@@ -153,8 +154,9 @@ async fn test_partition_filter_basic_with_format(flat_format: bool) {
// Scan after exiting staging - the old SST (tag_0 = "0".."4") should have
// rows filtered by partition expr (tag_0 >= "5"), which means none of them pass.
// But the staging SST (tag_0 = "5".."10") satisfies the partition expr.
let projection_input = Some(vec![1].into());
let request = ScanRequest {
projection: Some(vec![1]),
projection_input,
..Default::default()
};
let scanner = engine.scanner(region_id, request).await.unwrap();

View File

@@ -106,8 +106,9 @@ async fn test_scan_projection_with_format(flat_format: bool) {
put_rows(&engine, region_id, rows).await;
// Scans tag_1, field_1, ts
let projection_input = Some(vec![1, 3, 4].into());
let request = ScanRequest {
projection: Some(vec![1, 3, 4]),
projection_input,
filters: Vec::new(),
..Default::default()
};
@@ -183,8 +184,9 @@ async fn test_scan_projection_without_primary_key_with_format(flat_format: bool)
put_rows(&engine, region_id, rows).await;
// Scan with projection on field_0 and field_1, filter ts >= 2s
let projection_input = Some(vec![0, 1].into());
let request = ScanRequest {
projection: Some(vec![0, 1]), // field_0 and field_1 (not ts)
projection_input, // field_0 and field_1 (not ts)
filters: vec![col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(2000), None)))],
..Default::default()
};

View File

@@ -390,7 +390,7 @@ impl ScanRegion {
let time_range = self.build_time_range_predicate();
let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?;
let read_column_ids = match &self.request.projection {
let read_column_ids = match self.request.projection_indices() {
Some(p) => self.build_read_column_ids(p, &predicate)?,
None => self
.version
@@ -402,7 +402,7 @@ impl ScanRegion {
};
// The mapper always computes projected column ids as the schema of SSTs may change.
let mapper = match &self.request.projection {
let mapper = match self.request.projection_indices() {
Some(p) => ProjectionMapper::new_with_read_columns(
&self.version.metadata,
p.iter().copied(),
@@ -1756,7 +1756,9 @@ mod tests {
use datatypes::value::Value;
use partition::expr::col as partition_col;
use store_api::metadata::RegionMetadataBuilder;
use store_api::storage::{ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector};
use store_api::storage::{
ProjectionInput, ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector,
};
use super::*;
use crate::cache::CacheManager;
@@ -1801,7 +1803,7 @@ mod tests {
let version = new_version(metadata.clone());
let env = SchedulerEnv::new().await;
let request = ScanRequest {
projection: Some(vec![4]),
projection_input: Some(vec![4].into()),
filters: vec![
col("v0").gt(lit(1)),
col("ts").gt(lit(0)),
@@ -1817,7 +1819,7 @@ mod tests {
);
let predicate =
PredicateGroup::new(metadata.as_ref(), &scan_region.request.filters).unwrap();
let projection = scan_region.request.projection.as_ref().unwrap();
let projection = &scan_region.request.projection_indices().unwrap();
let read_ids = scan_region
.build_read_column_ids(projection, &predicate)
.unwrap();
@@ -1830,7 +1832,7 @@ mod tests {
let version = new_version(metadata.clone());
let env = SchedulerEnv::new().await;
let request = ScanRequest {
projection: Some(vec![]),
projection_input: Some(ProjectionInput::default()),
..Default::default()
};
let scan_region = ScanRegion::new(
@@ -1841,7 +1843,7 @@ mod tests {
);
let predicate =
PredicateGroup::new(metadata.as_ref(), &scan_region.request.filters).unwrap();
let projection = scan_region.request.projection.as_ref().unwrap();
let projection = &scan_region.request.projection_indices().unwrap();
let read_ids = scan_region
.build_read_column_ids(projection, &predicate)
.unwrap();
@@ -1855,7 +1857,7 @@ mod tests {
let version = new_version(metadata.clone());
let env = SchedulerEnv::new().await;
let request = ScanRequest {
projection: Some(vec![4, 1]),
projection_input: Some(vec![4, 1].into()),
filters: vec![col("v0").gt(lit(1))],
..Default::default()
};
@@ -1867,7 +1869,7 @@ mod tests {
);
let predicate =
PredicateGroup::new(metadata.as_ref(), &scan_region.request.filters).unwrap();
let projection = scan_region.request.projection.as_ref().unwrap();
let projection = &scan_region.request.projection_indices().unwrap();
let read_ids = scan_region
.build_read_column_ids(projection, &predicate)
.unwrap();

View File

@@ -126,7 +126,7 @@ impl TestDataSource {
impl DataSource for TestDataSource {
fn get_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream, BoxedError> {
let projected_schema = match &request.projection {
let projected_schema = match request.projection_indices() {
Some(projection) => Arc::new(self.schema.try_project(projection).unwrap()),
None => self.schema.clone(),
};

View File

@@ -178,7 +178,7 @@ impl TableProvider for DummyTableProvider {
limit: Option<usize>,
) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
let mut request = self.scan_request.lock().unwrap().clone();
request.projection = projection.cloned();
request.projection_input = projection.map(|p| p.clone().into());
request.filters = filters.to_vec();
request.limit = limit;

View File

@@ -384,11 +384,8 @@ fn build_plan_helper(
) -> Result<LogicalPlan, DataFusionError> {
let table_source = LogicalTableSource::new(schema.arrow_schema().clone());
let mut builder = LogicalPlanBuilder::scan(
table_name,
Arc::new(table_source),
scan_request.projection.clone(),
)?;
let projection = scan_request.projection_input.map(|input| input.projection);
let mut builder = LogicalPlanBuilder::scan(table_name, Arc::new(table_source), projection)?;
for filter in scan_request.filters {
builder = builder.filter(filter)?;
@@ -910,8 +907,9 @@ mod tests {
#[test]
fn test_manifest_build_plan() {
// Note: filter must reference a column in the projected schema
let projection_input = Some(vec![0, 1, 2].into());
let request = ScanRequest {
projection: Some(vec![0, 1, 2]),
projection_input,
filters: vec![binary_expr(col("table_id"), Operator::Gt, lit(0))],
limit: Some(5),
..Default::default()
@@ -941,8 +939,9 @@ mod tests {
#[test]
fn test_storage_build_plan() {
let projection_input = Some(vec![0, 2].into());
let request = ScanRequest {
projection: Some(vec![0, 2]),
projection_input,
filters: vec![binary_expr(col("file_path"), Operator::Eq, lit("/a"))],
limit: Some(1),
..Default::default()

View File

@@ -17,6 +17,7 @@
pub mod consts;
mod descriptors;
mod file;
mod projection;
mod requests;
mod types;
@@ -27,6 +28,7 @@ pub use datatypes::schema::{
pub use self::descriptors::*;
pub use self::file::{FileId, FileRef, FileRefsManifest, GcReport, IndexVersion, ParseIdError};
pub use self::projection::{NestedPath, ProjectionInput};
pub use self::requests::{
ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector, VectorDistanceMetric,
VectorIndexEngine, VectorIndexEngineType, VectorSearchMatches, VectorSearchRequest,

View File

@@ -0,0 +1,78 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::{Display, Formatter};
/// A nested field access path.
///
/// Every entry names one step of a field access on a nested column,
/// starting at the root column name.
///
/// Example:
/// - `j.a.b` -> `["j", "a", "b"]`
pub type NestedPath = Vec<String>;

/// Projection information carried by a table scan.
#[derive(Default, Debug, Clone, PartialEq)]
pub struct ProjectionInput {
    /// Top-level column projection.
    ///
    /// The indices are based on the schema exposed by the table scan input,
    /// such as the schema passed to `TableProvider::scan`.
    ///
    /// Only the root columns with the specified schema indices are needed.
    pub projection: Vec<usize>,
    /// Nested field access paths used for sub-field projection.
    ///
    /// It extends and refines the top-level projection by specifying nested
    /// field accesses inside complex columns such as JSON or struct columns.
    ///
    /// In other words:
    /// - `projection` determines **which root columns are needed**
    /// - `nested_paths` further determines **which sub-fields inside those
    ///   columns are required**
    ///
    /// Each path starts with the root column name and continues with
    /// nested field names.
    pub nested_paths: Vec<NestedPath>,
}

impl ProjectionInput {
    /// Creates a projection over the given root-column indices with no
    /// nested field paths.
    pub fn new(projection: Vec<usize>) -> Self {
        ProjectionInput {
            projection,
            nested_paths: Vec::new(),
        }
    }

    /// Builder-style setter that replaces the nested field access paths.
    pub fn with_nested_paths(mut self, nested_paths: Vec<NestedPath>) -> Self {
        self.nested_paths = nested_paths;
        self
    }
}

impl From<Vec<usize>> for ProjectionInput {
    /// Wraps a plain index list as a projection without nested paths.
    fn from(projection: Vec<usize>) -> Self {
        ProjectionInput {
            projection,
            nested_paths: Vec::new(),
        }
    }
}

impl Display for ProjectionInput {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // Destructure so the format string reads off the fields directly.
        let Self {
            projection,
            nested_paths,
        } = self;
        write!(
            f,
            "ProjectionInput {{ projection: {projection:?}, nested_paths: {nested_paths:?} }}"
        )
    }
}

View File

@@ -21,7 +21,7 @@ use datafusion_expr::expr::Expr;
pub use datatypes::schema::{VectorDistanceMetric, VectorIndexEngineType};
use strum::Display;
use crate::storage::{ColumnId, SequenceNumber};
use crate::storage::{ColumnId, ProjectionInput, SequenceNumber};
/// A hint for KNN vector search.
#[derive(Debug, Clone, PartialEq)]
@@ -95,9 +95,9 @@ pub enum TimeSeriesDistribution {
#[derive(Default, Clone, Debug, PartialEq)]
pub struct ScanRequest {
/// Indices of columns to read, `None` to read all columns. This indices is
/// based on table schema.
pub projection: Option<Vec<usize>>,
/// Optional projection information for the scan. `None` reads all root
/// columns.
pub projection_input: Option<ProjectionInput>,
/// Filters pushed down
pub filters: Vec<Expr>,
/// Expected output ordering. This is only a hint and isn't guaranteed.
@@ -130,6 +130,15 @@ pub struct ScanRequest {
pub vector_search: Option<VectorSearchRequest>,
}
impl ScanRequest {
    /// Returns the top-level projected column indices, or `None` when the
    /// request carries no projection input (i.e. all columns are read).
    pub fn projection_indices(&self) -> Option<&[usize]> {
        match &self.projection_input {
            Some(input) => Some(input.projection.as_slice()),
            None => None,
        }
    }
}
impl Display for ScanRequest {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
enum Delimiter {
@@ -152,7 +161,7 @@ impl Display for ScanRequest {
let mut delimiter = Delimiter::None;
write!(f, "ScanRequest {{ ")?;
if let Some(projection) = &self.projection {
if let Some(projection) = &self.projection_input {
write!(f, "{}projection: {:?}", delimiter.as_str(), projection)?;
}
if !self.filters.is_empty() {
@@ -235,8 +244,9 @@ mod tests {
};
assert_eq!(request.to_string(), "ScanRequest { }");
let projection_input = Some(vec![1, 2].into());
let request = ScanRequest {
projection: Some(vec![1, 2]),
projection_input,
filters: vec![
binary_expr(col("i"), Operator::Gt, lit(1)),
binary_expr(col("s"), Operator::Eq, lit("x")),
@@ -246,7 +256,7 @@ mod tests {
};
assert_eq!(
request.to_string(),
r#"ScanRequest { projection: [1, 2], filters: [i > Int32(1), s = Utf8("x")], limit: 10 }"#
r#"ScanRequest { projection: ProjectionInput { projection: [1, 2], nested_paths: [] }, filters: [i > Int32(1), s = Utf8("x")], limit: 10 }"#
);
let request = ScanRequest {
@@ -262,14 +272,29 @@ mod tests {
r#"ScanRequest { filters: [i > Int32(1), s = Utf8("x")], limit: 10 }"#
);
let projection_input = Some(vec![1, 2].into());
let request = ScanRequest {
projection: Some(vec![1, 2]),
projection_input,
limit: Some(10),
..Default::default()
};
assert_eq!(
request.to_string(),
"ScanRequest { projection: [1, 2], limit: 10 }"
"ScanRequest { projection: ProjectionInput { projection: [1, 2], nested_paths: [] }, limit: 10 }"
);
let projection_input = Some(ProjectionInput::new(vec![1, 2]).with_nested_paths(vec![
vec!["j".to_string(), "a".to_string(), "b".to_string()],
vec!["s".to_string(), "x".to_string()],
]));
let request = ScanRequest {
projection_input,
limit: Some(10),
..Default::default()
};
assert_eq!(
request.to_string(),
r#"ScanRequest { projection: ProjectionInput { projection: [1, 2], nested_paths: [["j", "a", "b"], ["s", "x"]] }, limit: 10 }"#
);
let request = ScanRequest {

View File

@@ -104,10 +104,11 @@ impl TableProvider for DfTableProviderAdapter {
limit: Option<usize>,
) -> DfResult<Arc<dyn ExecutionPlan>> {
let filters: Vec<Expr> = filters.iter().map(Clone::clone).collect();
let projection_input = projection.map(|p| p.clone().into());
let request = {
let mut request = self.scan_req.lock().unwrap();
request.filters = filters;
request.projection = projection.cloned();
request.projection_input = projection_input;
request.limit = limit;
request.clone()
};

View File

@@ -110,7 +110,8 @@ impl NumbersDataSource {
impl DataSource for NumbersDataSource {
fn get_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream, BoxedError> {
let projected_schema = match &request.projection {
let projection = request.projection_input.map(|input| input.projection);
let projected_schema = match &projection {
Some(projection) => Arc::new(self.schema.try_project(projection).unwrap()),
None => self.schema.clone(),
};
@@ -118,7 +119,7 @@ impl DataSource for NumbersDataSource {
limit: request.limit.unwrap_or(100) as u32,
schema: self.schema.clone(),
already_run: false,
projection: request.projection,
projection,
projected_schema,
}))
}

View File

@@ -121,10 +121,10 @@ impl DataSource for MemtableDataSource {
&self,
request: ScanRequest,
) -> std::result::Result<SendableRecordBatchStream, BoxedError> {
let df_recordbatch = if let Some(indices) = request.projection {
let df_recordbatch = if let Some(indices) = request.projection_indices() {
self.recordbatch
.df_record_batch()
.project(&indices)
.project(indices)
.context(TableProjectionSnafu)
.map_err(BoxedError::new)?
} else {
@@ -198,8 +198,9 @@ mod test {
async fn test_scan_with_projection() {
let table = build_testing_table();
let projection_input = Some(vec![1].into());
let scan_req = ScanRequest {
projection: Some(vec![1]),
projection_input,
..Default::default()
};
let stream = table.scan_to_stream(scan_req).await.unwrap();