From f999d5e70e3076d7c45223613a7d6465bfa07c3e Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Tue, 24 Mar 2026 08:11:37 +0800 Subject: [PATCH] feat: avoid some vector-array conversions on flat projection (#7804) * perf(mito2): optimize flat projection conversion * shrink the diff size Signed-off-by: Ruihang Xia * apply gemini's sugg Signed-off-by: Ruihang Xia * nit Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- src/mito2/src/read/flat_projection.rs | 78 +++++++++++++++++++++++++-- src/mito2/src/read/projection.rs | 23 ++++---- src/mito2/src/read/stream.rs | 5 +- 3 files changed, 89 insertions(+), 17 deletions(-) diff --git a/src/mito2/src/read/flat_projection.rs b/src/mito2/src/read/flat_projection.rs index 3e0f1169df..02b4c6b3c1 100644 --- a/src/mito2/src/read/flat_projection.rs +++ b/src/mito2/src/read/flat_projection.rs @@ -18,18 +18,21 @@ use std::sync::Arc; use api::v1::SemanticType; use common_error::ext::BoxedError; -use common_recordbatch::error::{ArrowComputeSnafu, ExternalSnafu}; +use common_recordbatch::error::{ArrowComputeSnafu, ExternalSnafu, NewDfRecordBatchSnafu}; use common_recordbatch::{DfRecordBatch, RecordBatch}; -use datatypes::arrow::datatypes::Field; +use datatypes::arrow::array::Array; +use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field}; use datatypes::prelude::{ConcreteDataType, DataType}; use datatypes::schema::{Schema, SchemaRef}; +use datatypes::value::Value; use datatypes::vectors::Helper; use snafu::{OptionExt, ResultExt}; use store_api::metadata::{RegionMetadata, RegionMetadataRef}; use store_api::storage::ColumnId; +use crate::cache::CacheStrategy; use crate::error::{InvalidRequestSnafu, RecordBatchSnafu, Result}; -use crate::read::projection::read_column_ids_from_projection; +use crate::read::projection::{read_column_ids_from_projection, repeated_vector_with_cache}; use crate::sst::parquet::flat_format::sst_column_id_indices; use crate::sst::parquet::format::FormatProjection; use crate::sst::{ @@ -248,12 +251,55 @@ impl FlatProjectionMapper { pub(crate) fn convert( &self, batch: &datatypes::arrow::record_batch::RecordBatch, + cache_strategy: &CacheStrategy, ) -> common_recordbatch::error::Result { if self.is_empty_projection { return RecordBatch::new_with_count(self.output_schema.clone(), batch.num_rows()); } - let columns = self.project_vectors(batch)?; - RecordBatch::new(self.output_schema.clone(), columns) + // Construct output record batch directly from Arrow arrays to avoid + // Arrow -> Vector -> Arrow roundtrips in the hot path. + let mut arrays = Vec::with_capacity(self.output_schema.num_columns()); + for (output_idx, index) in self.batch_indices.iter().enumerate() { + let mut array = batch.column(*index).clone(); + // Cast dictionary values to the target type. + if let ArrowDataType::Dictionary(_key_type, value_type) = array.data_type() { + // When a string dictionary column contains only a single value, reuse a cached + // repeated vector to avoid repeatedly expanding the dictionary. + if let Some(dict_array) = single_value_string_dictionary( + &array, + &self.output_schema.column_schemas()[output_idx].data_type, + value_type.as_ref(), + ) { + let dict_values = dict_array.values(); + let value = if dict_values.is_null(0) { + Value::Null + } else { + Value::from(datatypes::arrow_array::string_array_value(dict_values, 0)) + }; + + let repeated = repeated_vector_with_cache( + &self.output_schema.column_schemas()[output_idx].data_type, + &value, + batch.num_rows(), + cache_strategy, + )?; + array = repeated.to_arrow_array(); + } else { + let casted = datatypes::arrow::compute::cast(&array, value_type) + .context(ArrowComputeSnafu)?; + array = casted; + } + } + arrays.push(array); + } + + let df_record_batch = + DfRecordBatch::try_new(self.output_schema.arrow_schema().clone(), arrays) + .context(NewDfRecordBatchSnafu)?; + Ok(RecordBatch::from_df_record_batch( + self.output_schema.clone(), + df_record_batch, + )) } /// Projects columns from the input batch and converts them into vectors. @@ -281,6 +327,28 @@ impl FlatProjectionMapper { } } +fn single_value_string_dictionary<'a>( + array: &'a Arc, + output_type: &ConcreteDataType, + value_type: &ArrowDataType, +) -> Option<&'a datatypes::arrow::array::DictionaryArray> { + if !matches!( + value_type, + ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 | ArrowDataType::Utf8View + ) || !output_type.is_string() + { + return None; + } + + let dict_array = array + .as_any() + .downcast_ref::>()?; + + (dict_array.values().len() == 1 && dict_array.null_count() == 0).then_some(dict_array) +} + /// Returns ids and datatypes of columns of the output batch after applying the `projection`. /// /// It adds the time index column if it doesn't present in the projection. diff --git a/src/mito2/src/read/projection.rs b/src/mito2/src/read/projection.rs index 2c000e7bdc..b5b6904521 100644 --- a/src/mito2/src/read/projection.rs +++ b/src/mito2/src/read/projection.rs @@ -21,7 +21,7 @@ use std::sync::Arc; use api::v1::SemanticType; use common_error::ext::BoxedError; use common_recordbatch::RecordBatch; -use common_recordbatch::error::ExternalSnafu; +use common_recordbatch::error::{DataTypesSnafu, ExternalSnafu}; use datatypes::prelude::{ConcreteDataType, DataType}; use datatypes::schema::{Schema, SchemaRef}; use datatypes::value::Value; @@ -37,7 +37,7 @@ use crate::read::Batch; use crate::read::flat_projection::FlatProjectionMapper; /// Only cache vector when its length `<=` this value. -const MAX_VECTOR_LENGTH_TO_CACHE: usize = 16384; +pub(crate) const MAX_VECTOR_LENGTH_TO_CACHE: usize = 16384; /// Wrapper enum for different projection mapper implementations. pub enum ProjectionMapper { @@ -423,7 +423,7 @@ enum BatchIndex { } /// Gets a vector with repeated values from specific cache or creates a new one. -fn repeated_vector_with_cache( +pub(crate) fn repeated_vector_with_cache( data_type: &ConcreteDataType, value: &Value, num_rows: usize, @@ -450,7 +450,7 @@ fn repeated_vector_with_cache( } /// Returns a vector with repeated values. -fn new_repeated_vector( +pub(crate) fn new_repeated_vector( data_type: &ConcreteDataType, value: &Value, num_rows: usize, @@ -458,8 +458,7 @@ fn new_repeated_vector( let mut mutable_vector = data_type.create_mutable_vector(1); mutable_vector .try_push_value_ref(&value.as_value_ref()) - .map_err(BoxedError::new) - .context(ExternalSnafu)?; + .context(DataTypesSnafu)?; // This requires an additional allocation. let base_vector = mutable_vector.to_vector(); Ok(base_vector.replicate(&[num_rows])) @@ -809,6 +808,7 @@ mod tests { .num_fields(2) .build(), ); + let cache = CacheStrategy::Disabled; let mapper = ProjectionMapper::all(&metadata, true).unwrap(); assert_eq!([0, 1, 2, 3, 4], mapper.column_ids()); assert_eq!( @@ -823,7 +823,7 @@ mod tests { ); let batch = new_flat_batch(Some(0), &[(1, 1), (2, 2)], &[(3, 3), (4, 4)], 3); - let record_batch = mapper.as_flat().unwrap().convert(&batch).unwrap(); + let record_batch = mapper.as_flat().unwrap().convert(&batch, &cache).unwrap(); let expect = "\ +---------------------+----+----+----+----+ | ts | k0 | k1 | v0 | v1 | @@ -843,6 +843,7 @@ mod tests { .num_fields(2) .build(), ); + let cache = CacheStrategy::Disabled; // Columns v1, k0 let mapper = ProjectionMapper::new(&metadata, [4, 1].into_iter(), true).unwrap(); assert_eq!([4, 1], mapper.column_ids()); @@ -856,7 +857,7 @@ mod tests { ); let batch = new_flat_batch(None, &[(1, 1)], &[(4, 4)], 3); - let record_batch = mapper.as_flat().unwrap().convert(&batch).unwrap(); + let record_batch = mapper.as_flat().unwrap().convert(&batch, &cache).unwrap(); let expect = "\ +----+----+ | v1 | k0 | @@ -876,6 +877,7 @@ mod tests { .num_fields(2) .build(), ); + let cache = CacheStrategy::Disabled; // Output columns v1, k0. Read also includes v0. let mapper = ProjectionMapper::new_with_read_columns( &metadata, @@ -887,7 +889,7 @@ mod tests { assert_eq!([4, 1, 3], mapper.column_ids()); let batch = new_flat_batch(None, &[(1, 1)], &[(3, 3), (4, 4)], 3); - let record_batch = mapper.as_flat().unwrap().convert(&batch).unwrap(); + let record_batch = mapper.as_flat().unwrap().convert(&batch, &cache).unwrap(); let expect = "\ +----+----+ | v1 | k0 | @@ -907,6 +909,7 @@ mod tests { .num_fields(2) .build(), ); + let cache = CacheStrategy::Disabled; // Empty projection let mapper = ProjectionMapper::new(&metadata, [].into_iter(), true).unwrap(); assert_eq!([0], mapper.column_ids()); // Should still read the time index column @@ -918,7 +921,7 @@ mod tests { ); let batch = new_flat_batch(Some(0), &[], &[], 3); - let record_batch = flat_mapper.convert(&batch).unwrap(); + let record_batch = flat_mapper.convert(&batch, &cache).unwrap(); assert_eq!(3, record_batch.num_rows()); assert_eq!(0, record_batch.num_columns()); assert!(record_batch.schema.is_empty()); diff --git a/src/mito2/src/read/stream.rs b/src/mito2/src/read/stream.rs index dd85616241..80002147ea 100644 --- a/src/mito2/src/read/stream.rs +++ b/src/mito2/src/read/stream.rs @@ -99,7 +99,8 @@ impl ConvertBatchStream { let mapper = self.projection_mapper.as_flat().unwrap(); for batch in flat_batch.batches { - self.pending.push_back(mapper.convert(&batch)?); + self.pending + .push_back(mapper.convert(&batch, &self.cache_strategy)?); } } } @@ -114,7 +115,7 @@ impl ConvertBatchStream { // Safety: Only flat format returns this batch. let mapper = self.projection_mapper.as_flat().unwrap(); - mapper.convert(&df_record_batch) + mapper.convert(&df_record_batch, &self.cache_strategy) } } }