From ac6aeb319ece2cdff25ece3faeb7016b3f6d6a5b Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Fri, 20 Mar 2026 06:25:02 +0800 Subject: [PATCH] apply gemini's sugg Signed-off-by: Ruihang Xia --- src/mito2/src/read/flat_projection.rs | 54 ++------------------------- src/mito2/src/read/projection.rs | 11 +++--- 2 files changed, 9 insertions(+), 56 deletions(-) diff --git a/src/mito2/src/read/flat_projection.rs b/src/mito2/src/read/flat_projection.rs index 36ad162af9..93831e870f 100644 --- a/src/mito2/src/read/flat_projection.rs +++ b/src/mito2/src/read/flat_projection.rs @@ -18,21 +18,21 @@ use std::sync::Arc; use api::v1::SemanticType; use common_error::ext::BoxedError; -use common_recordbatch::error::{ArrowComputeSnafu, ExternalSnafu}; +use common_recordbatch::error::{ArrowComputeSnafu, ExternalSnafu, NewDfRecordBatchSnafu}; use common_recordbatch::{DfRecordBatch, RecordBatch}; use datatypes::arrow::array::Array; use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field}; use datatypes::prelude::{ConcreteDataType, DataType}; use datatypes::schema::{Schema, SchemaRef}; use datatypes::value::Value; -use datatypes::vectors::{Helper, VectorRef}; +use datatypes::vectors::Helper; use snafu::{OptionExt, ResultExt}; use store_api::metadata::{RegionMetadata, RegionMetadataRef}; use store_api::storage::ColumnId; use crate::cache::CacheStrategy; use crate::error::{InvalidRequestSnafu, RecordBatchSnafu, Result}; -use crate::read::projection::read_column_ids_from_projection; +use crate::read::projection::{read_column_ids_from_projection, repeated_vector_with_cache}; use crate::sst::parquet::flat_format::sst_column_id_indices; use crate::sst::parquet::format::FormatProjection; use crate::sst::{ @@ -66,9 +66,6 @@ pub struct FlatProjectionMapper { input_arrow_schema: datatypes::arrow::datatypes::SchemaRef, } -/// Max length of a repeated vector we will store in cache. -const MAX_VECTOR_LENGTH_TO_CACHE: usize = 16384; - impl FlatProjectionMapper { /// Returns a new mapper with projection. /// If `projection` is empty, it outputs [RecordBatch] without any column but only a row count. @@ -307,13 +304,7 @@ impl FlatProjectionMapper { let df_record_batch = DfRecordBatch::try_new(self.output_schema.arrow_schema().clone(), arrays) - .map_err(|e| { - BoxedError::new(common_error::ext::PlainError::new( - e.to_string(), - common_error::status_code::StatusCode::Internal, - )) - }) - .context(ExternalSnafu)?; + .context(NewDfRecordBatchSnafu)?; Ok(RecordBatch::from_df_record_batch( self.output_schema.clone(), df_record_batch, @@ -345,43 +336,6 @@ impl FlatProjectionMapper { } } -fn repeated_vector_with_cache( - data_type: &ConcreteDataType, - value: &Value, - num_rows: usize, - cache_strategy: &CacheStrategy, -) -> common_recordbatch::error::Result { - if let Some(vector) = cache_strategy.get_repeated_vector(data_type, value) { - // If the cached vector doesn't have enough length, create a new one. - match vector.len().cmp(&num_rows) { - std::cmp::Ordering::Less => {} - std::cmp::Ordering::Equal => return Ok(vector), - std::cmp::Ordering::Greater => return Ok(vector.slice(0, num_rows)), - } - } - - let vector = new_repeated_vector(data_type, value, num_rows)?; - if vector.len() <= MAX_VECTOR_LENGTH_TO_CACHE { - cache_strategy.put_repeated_vector(value.clone(), vector.clone()); - } - - Ok(vector) -} - -fn new_repeated_vector( - data_type: &ConcreteDataType, - value: &Value, - num_rows: usize, -) -> common_recordbatch::error::Result { - let mut mutable_vector = data_type.create_mutable_vector(1); - mutable_vector - .try_push_value_ref(&value.as_value_ref()) - .map_err(BoxedError::new) - .context(ExternalSnafu)?; - let base_vector = mutable_vector.to_vector(); - Ok(base_vector.replicate(&[num_rows])) -} - /// Returns ids and datatypes of columns of the output batch after applying the `projection`. /// /// It adds the time index column if it doesn't present in the projection. diff --git a/src/mito2/src/read/projection.rs b/src/mito2/src/read/projection.rs index c7d5c4384c..b5b6904521 100644 --- a/src/mito2/src/read/projection.rs +++ b/src/mito2/src/read/projection.rs @@ -21,7 +21,7 @@ use std::sync::Arc; use api::v1::SemanticType; use common_error::ext::BoxedError; use common_recordbatch::RecordBatch; -use common_recordbatch::error::ExternalSnafu; +use common_recordbatch::error::{DataTypesSnafu, ExternalSnafu}; use datatypes::prelude::{ConcreteDataType, DataType}; use datatypes::schema::{Schema, SchemaRef}; use datatypes::value::Value; @@ -37,7 +37,7 @@ use crate::read::Batch; use crate::read::flat_projection::FlatProjectionMapper; /// Only cache vector when its length `<=` this value. -const MAX_VECTOR_LENGTH_TO_CACHE: usize = 16384; +pub(crate) const MAX_VECTOR_LENGTH_TO_CACHE: usize = 16384; /// Wrapper enum for different projection mapper implementations. pub enum ProjectionMapper { @@ -423,7 +423,7 @@ enum BatchIndex { } /// Gets a vector with repeated values from specific cache or creates a new one. -fn repeated_vector_with_cache( +pub(crate) fn repeated_vector_with_cache( data_type: &ConcreteDataType, value: &Value, num_rows: usize, @@ -450,7 +450,7 @@ fn repeated_vector_with_cache( } /// Returns a vector with repeated values. -fn new_repeated_vector( +pub(crate) fn new_repeated_vector( data_type: &ConcreteDataType, value: &Value, num_rows: usize, @@ -458,8 +458,7 @@ fn new_repeated_vector( let mut mutable_vector = data_type.create_mutable_vector(1); mutable_vector .try_push_value_ref(&value.as_value_ref()) - .map_err(BoxedError::new) - .context(ExternalSnafu)?; + .context(DataTypesSnafu)?; // This requires an additional allocation. let base_vector = mutable_vector.to_vector(); Ok(base_vector.replicate(&[num_rows]))