feat: avoid some vector-array conversions on flat projection (#7804)

* perf(mito2): optimize flat projection conversion

* shrink the diff size

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* apply gemini's sugg

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* nit

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2026-03-24 08:11:37 +08:00
committed by GitHub
parent 7874282089
commit f999d5e70e
3 changed files with 89 additions and 17 deletions

View File

@@ -18,18 +18,21 @@ use std::sync::Arc;
use api::v1::SemanticType;
use common_error::ext::BoxedError;
use common_recordbatch::error::{ArrowComputeSnafu, ExternalSnafu};
use common_recordbatch::error::{ArrowComputeSnafu, ExternalSnafu, NewDfRecordBatchSnafu};
use common_recordbatch::{DfRecordBatch, RecordBatch};
use datatypes::arrow::datatypes::Field;
use datatypes::arrow::array::Array;
use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field};
use datatypes::prelude::{ConcreteDataType, DataType};
use datatypes::schema::{Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::Helper;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::ColumnId;
use crate::cache::CacheStrategy;
use crate::error::{InvalidRequestSnafu, RecordBatchSnafu, Result};
use crate::read::projection::read_column_ids_from_projection;
use crate::read::projection::{read_column_ids_from_projection, repeated_vector_with_cache};
use crate::sst::parquet::flat_format::sst_column_id_indices;
use crate::sst::parquet::format::FormatProjection;
use crate::sst::{
@@ -248,12 +251,55 @@ impl FlatProjectionMapper {
/// Converts an Arrow record batch into a [`RecordBatch`] that uses `output_schema`.
///
/// For an empty projection only the row count of `batch` is carried over.
/// Otherwise each entry of `batch_indices` selects one input column. Dictionary
/// columns are either replaced by a repeated vector obtained through
/// `cache_strategy` (single-value string dictionaries) or cast to their
/// dictionary value type; all other columns are passed through unchanged.
///
/// # Errors
///
/// Returns an error if the Arrow cast fails, if `repeated_vector_with_cache`
/// fails, or if the final `DfRecordBatch` cannot be built from the arrays.
pub(crate) fn convert(
&self,
batch: &datatypes::arrow::record_batch::RecordBatch,
cache_strategy: &CacheStrategy,
) -> common_recordbatch::error::Result<RecordBatch> {
// An empty projection yields a batch that only records the number of rows.
if self.is_empty_projection {
return RecordBatch::new_with_count(self.output_schema.clone(), batch.num_rows());
}
// NOTE(review): the next two lines look like the pre-change implementation
// retained by the diff rendering — confirm before treating them as live code.
let columns = self.project_vectors(batch)?;
RecordBatch::new(self.output_schema.clone(), columns)
// Construct output record batch directly from Arrow arrays to avoid
// Arrow -> Vector -> Arrow roundtrips in the hot path.
let mut arrays = Vec::with_capacity(self.output_schema.num_columns());
// `output_idx` indexes the columns of `output_schema`; `index` indexes the
// columns of the input `batch`.
for (output_idx, index) in self.batch_indices.iter().enumerate() {
// `array` is an `Arc<dyn Array>`, so this clone does not copy column data.
let mut array = batch.column(*index).clone();
// Cast dictionary values to the target type.
if let ArrowDataType::Dictionary(_key_type, value_type) = array.data_type() {
// When a string dictionary column contains only a single value, reuse a cached
// repeated vector to avoid repeatedly expanding the dictionary.
if let Some(dict_array) = single_value_string_dictionary(
&array,
&self.output_schema.column_schemas()[output_idx].data_type,
value_type.as_ref(),
) {
let dict_values = dict_array.values();
// The single dictionary entry may itself be null, so map it to
// `Value::Null` instead of reading a string from it.
let value = if dict_values.is_null(0) {
Value::Null
} else {
Value::from(datatypes::arrow_array::string_array_value(dict_values, 0))
};
// Build (or fetch) a vector holding `value` repeated once per row.
let repeated = repeated_vector_with_cache(
&self.output_schema.column_schemas()[output_idx].data_type,
&value,
batch.num_rows(),
cache_strategy,
)?;
array = repeated.to_arrow_array();
} else {
// Fallback: cast the dictionary column to its value type.
let casted = datatypes::arrow::compute::cast(&array, value_type)
.context(ArrowComputeSnafu)?;
array = casted;
}
}
arrays.push(array);
}
// Assemble the arrays against the Arrow form of the output schema.
let df_record_batch =
DfRecordBatch::try_new(self.output_schema.arrow_schema().clone(), arrays)
.context(NewDfRecordBatchSnafu)?;
Ok(RecordBatch::from_df_record_batch(
self.output_schema.clone(),
df_record_batch,
))
}
/// Projects columns from the input batch and converts them into vectors.
@@ -281,6 +327,28 @@ impl FlatProjectionMapper {
}
}
/// Returns `array` as a `UInt32`-keyed dictionary array when it qualifies for
/// the single-value string fast path, or `None` otherwise.
///
/// The array qualifies when all of the following hold:
/// - `value_type` is `Utf8`, `LargeUtf8` or `Utf8View`;
/// - `output_type` is a string type;
/// - `array` downcasts to `DictionaryArray<UInt32Type>`;
/// - the dictionary holds exactly one value and reports a null count of zero.
///
/// This function does not check whether the single dictionary value is itself
/// null; callers that need the value must check that separately.
fn single_value_string_dictionary<'a>(
    array: &'a Arc<dyn Array>,
    output_type: &ConcreteDataType,
    value_type: &ArrowDataType,
) -> Option<&'a datatypes::arrow::array::DictionaryArray<datatypes::arrow::datatypes::UInt32Type>> {
    let has_string_values = matches!(
        value_type,
        ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 | ArrowDataType::Utf8View
    );
    // Both the dictionary values and the output column must be strings.
    if !(has_string_values && output_type.is_string()) {
        return None;
    }

    array
        .as_any()
        .downcast_ref::<datatypes::arrow::array::DictionaryArray<
            datatypes::arrow::datatypes::UInt32Type,
        >>()
        .filter(|dict| dict.values().len() == 1 && dict.null_count() == 0)
}
/// Returns ids and datatypes of columns of the output batch after applying the `projection`.
///
/// It adds the time index column if it isn't present in the projection.

View File

@@ -21,7 +21,7 @@ use std::sync::Arc;
use api::v1::SemanticType;
use common_error::ext::BoxedError;
use common_recordbatch::RecordBatch;
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::error::{DataTypesSnafu, ExternalSnafu};
use datatypes::prelude::{ConcreteDataType, DataType};
use datatypes::schema::{Schema, SchemaRef};
use datatypes::value::Value;
@@ -37,7 +37,7 @@ use crate::read::Batch;
use crate::read::flat_projection::FlatProjectionMapper;
/// Only cache vector when its length `<=` this value.
const MAX_VECTOR_LENGTH_TO_CACHE: usize = 16384;
pub(crate) const MAX_VECTOR_LENGTH_TO_CACHE: usize = 16384;
/// Wrapper enum for different projection mapper implementations.
pub enum ProjectionMapper {
@@ -423,7 +423,7 @@ enum BatchIndex {
}
/// Gets a vector with repeated values from specific cache or creates a new one.
fn repeated_vector_with_cache(
pub(crate) fn repeated_vector_with_cache(
data_type: &ConcreteDataType,
value: &Value,
num_rows: usize,
@@ -450,7 +450,7 @@ fn repeated_vector_with_cache(
}
/// Returns a vector with repeated values.
fn new_repeated_vector(
pub(crate) fn new_repeated_vector(
data_type: &ConcreteDataType,
value: &Value,
num_rows: usize,
@@ -458,8 +458,7 @@ fn new_repeated_vector(
let mut mutable_vector = data_type.create_mutable_vector(1);
mutable_vector
.try_push_value_ref(&value.as_value_ref())
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
.context(DataTypesSnafu)?;
// This requires an additional allocation.
let base_vector = mutable_vector.to_vector();
Ok(base_vector.replicate(&[num_rows]))
@@ -809,6 +808,7 @@ mod tests {
.num_fields(2)
.build(),
);
let cache = CacheStrategy::Disabled;
let mapper = ProjectionMapper::all(&metadata, true).unwrap();
assert_eq!([0, 1, 2, 3, 4], mapper.column_ids());
assert_eq!(
@@ -823,7 +823,7 @@ mod tests {
);
let batch = new_flat_batch(Some(0), &[(1, 1), (2, 2)], &[(3, 3), (4, 4)], 3);
let record_batch = mapper.as_flat().unwrap().convert(&batch).unwrap();
let record_batch = mapper.as_flat().unwrap().convert(&batch, &cache).unwrap();
let expect = "\
+---------------------+----+----+----+----+
| ts | k0 | k1 | v0 | v1 |
@@ -843,6 +843,7 @@ mod tests {
.num_fields(2)
.build(),
);
let cache = CacheStrategy::Disabled;
// Columns v1, k0
let mapper = ProjectionMapper::new(&metadata, [4, 1].into_iter(), true).unwrap();
assert_eq!([4, 1], mapper.column_ids());
@@ -856,7 +857,7 @@ mod tests {
);
let batch = new_flat_batch(None, &[(1, 1)], &[(4, 4)], 3);
let record_batch = mapper.as_flat().unwrap().convert(&batch).unwrap();
let record_batch = mapper.as_flat().unwrap().convert(&batch, &cache).unwrap();
let expect = "\
+----+----+
| v1 | k0 |
@@ -876,6 +877,7 @@ mod tests {
.num_fields(2)
.build(),
);
let cache = CacheStrategy::Disabled;
// Output columns v1, k0. Read also includes v0.
let mapper = ProjectionMapper::new_with_read_columns(
&metadata,
@@ -887,7 +889,7 @@ mod tests {
assert_eq!([4, 1, 3], mapper.column_ids());
let batch = new_flat_batch(None, &[(1, 1)], &[(3, 3), (4, 4)], 3);
let record_batch = mapper.as_flat().unwrap().convert(&batch).unwrap();
let record_batch = mapper.as_flat().unwrap().convert(&batch, &cache).unwrap();
let expect = "\
+----+----+
| v1 | k0 |
@@ -907,6 +909,7 @@ mod tests {
.num_fields(2)
.build(),
);
let cache = CacheStrategy::Disabled;
// Empty projection
let mapper = ProjectionMapper::new(&metadata, [].into_iter(), true).unwrap();
assert_eq!([0], mapper.column_ids()); // Should still read the time index column
@@ -918,7 +921,7 @@ mod tests {
);
let batch = new_flat_batch(Some(0), &[], &[], 3);
let record_batch = flat_mapper.convert(&batch).unwrap();
let record_batch = flat_mapper.convert(&batch, &cache).unwrap();
assert_eq!(3, record_batch.num_rows());
assert_eq!(0, record_batch.num_columns());
assert!(record_batch.schema.is_empty());

View File

@@ -99,7 +99,8 @@ impl ConvertBatchStream {
let mapper = self.projection_mapper.as_flat().unwrap();
for batch in flat_batch.batches {
self.pending.push_back(mapper.convert(&batch)?);
self.pending
.push_back(mapper.convert(&batch, &self.cache_strategy)?);
}
}
}
@@ -114,7 +115,7 @@ impl ConvertBatchStream {
// Safety: Only flat format returns this batch.
let mapper = self.projection_mapper.as_flat().unwrap();
mapper.convert(&df_record_batch)
mapper.convert(&df_record_batch, &self.cache_strategy)
}
}
}