From 4bbf56b3c941e4070e2d1e74c0ecd5ae793161c8 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 11 Mar 2026 07:35:30 +0800 Subject: [PATCH] perf(mito2): optimize flat projection conversion --- src/common/recordbatch/src/adapter.rs | 11 +-- src/mito2/src/read/flat_projection.rs | 109 +++++++++++++++++++++++++- src/mito2/src/read/projection.rs | 12 ++- src/mito2/src/read/stream.rs | 63 ++++++++------- 4 files changed, 150 insertions(+), 45 deletions(-) diff --git a/src/common/recordbatch/src/adapter.rs b/src/common/recordbatch/src/adapter.rs index fc12d87dcf..8f9045d3ad 100644 --- a/src/common/recordbatch/src/adapter.rs +++ b/src/common/recordbatch/src/adapter.rs @@ -21,7 +21,7 @@ use std::sync::Arc; use std::task::{Context, Poll}; use common_base::readable_size::ReadableSize; -use common_telemetry::tracing::{Span, info_span}; +use common_telemetry::tracing::{Span, debug_span}; use common_time::util::format_nanoseconds_human_readable; use datafusion::arrow::compute::cast; use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef; @@ -247,7 +247,7 @@ impl RecordBatchStreamAdapter { pub fn try_new_with_span(stream: DfSendableRecordBatchStream, span: Span) -> Result { let schema = Arc::new(Schema::try_from(stream.schema()).context(error::SchemaConversionSnafu)?); - let subspan = info_span!(parent: &span, "RecordBatchStreamAdapter"); + let subspan = debug_span!(parent: &span, "RecordBatchStreamAdapter"); Ok(Self { schema, stream, @@ -301,15 +301,13 @@ impl Stream for RecordBatchStreamAdapter { .map(|m| m.elapsed_compute().clone()) .unwrap_or_default(); let _guard = timer.timer(); - let poll_span = info_span!(parent: &self.span, "poll_next"); + let poll_span = debug_span!(parent: &self.span, "poll_next"); let _entered = poll_span.enter(); match Pin::new(&mut self.stream).poll_next(cx) { Poll::Pending => Poll::Pending, Poll::Ready(Some(df_record_batch)) => { let df_record_batch = df_record_batch?; - if let Metrics::Unresolved(df_plan) | Metrics::PartialResolved(df_plan, _) = - &self.metrics_2 - { + if let Metrics::Unresolved(df_plan) = &self.metrics_2 { let mut metric_collector = MetricCollector::new(self.explain_verbose); accept(df_plan.as_ref(), &mut metric_collector).unwrap(); self.metrics_2 = Metrics::PartialResolved( @@ -462,7 +460,6 @@ fn format_bytes_human_readable(bytes: usize) -> String { format!("{}", ReadableSize(bytes as u64)) } -/// Only display `plan_metrics` with indent ` ` (2 spaces). impl Display for RecordBatchMetrics { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { for metric in &self.plan_metrics { diff --git a/src/mito2/src/read/flat_projection.rs b/src/mito2/src/read/flat_projection.rs index 3e0f1169df..36ad162af9 100644 --- a/src/mito2/src/read/flat_projection.rs +++ b/src/mito2/src/read/flat_projection.rs @@ -20,14 +20,17 @@ use api::v1::SemanticType; use common_error::ext::BoxedError; use common_recordbatch::error::{ArrowComputeSnafu, ExternalSnafu}; use common_recordbatch::{DfRecordBatch, RecordBatch}; -use datatypes::arrow::datatypes::Field; +use datatypes::arrow::array::Array; +use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field}; use datatypes::prelude::{ConcreteDataType, DataType}; use datatypes::schema::{Schema, SchemaRef}; -use datatypes::vectors::Helper; +use datatypes::value::Value; +use datatypes::vectors::{Helper, VectorRef}; use snafu::{OptionExt, ResultExt}; use store_api::metadata::{RegionMetadata, RegionMetadataRef}; use store_api::storage::ColumnId; +use crate::cache::CacheStrategy; use crate::error::{InvalidRequestSnafu, RecordBatchSnafu, Result}; use crate::read::projection::read_column_ids_from_projection; use crate::sst::parquet::flat_format::sst_column_id_indices; @@ -63,6 +66,9 @@ pub struct FlatProjectionMapper { input_arrow_schema: datatypes::arrow::datatypes::SchemaRef, } +/// Max length of a repeated vector we will store in cache. +const MAX_VECTOR_LENGTH_TO_CACHE: usize = 16384; + impl FlatProjectionMapper { /// Returns a new mapper with projection. /// If `projection` is empty, it outputs [RecordBatch] without any column but only a row count. @@ -248,12 +254,70 @@ impl FlatProjectionMapper { pub(crate) fn convert( &self, batch: &datatypes::arrow::record_batch::RecordBatch, + cache_strategy: &CacheStrategy, ) -> common_recordbatch::error::Result { if self.is_empty_projection { return RecordBatch::new_with_count(self.output_schema.clone(), batch.num_rows()); } - let columns = self.project_vectors(batch)?; - RecordBatch::new(self.output_schema.clone(), columns) + // Construct output record batch directly from Arrow arrays to avoid + // Arrow -> Vector -> Arrow roundtrips in the hot path. + let mut arrays = Vec::with_capacity(self.output_schema.num_columns()); + for (output_idx, index) in self.batch_indices.iter().enumerate() { + let mut array = batch.column(*index).clone(); + // Cast dictionary values to the target type. + if let ArrowDataType::Dictionary(_key_type, value_type) = array.data_type() { + // When a string dictionary column contains only a single value, reuse a cached + // repeated vector to avoid repeatedly expanding the dictionary. + if matches!( + value_type.as_ref(), + ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 | ArrowDataType::Utf8View + ) && self.output_schema.column_schemas()[output_idx] + .data_type + .is_string() + && let Some(dict_array) = array + .as_any() + .downcast_ref::>() + && dict_array.values().len() == 1 + && dict_array.null_count() == 0 + { + let dict_values = dict_array.values(); + let value = if dict_values.is_null(0) { + Value::Null + } else { + Value::from(datatypes::arrow_array::string_array_value(dict_values, 0)) + }; + + let repeated = repeated_vector_with_cache( + &self.output_schema.column_schemas()[output_idx].data_type, + &value, + batch.num_rows(), + cache_strategy, + )?; + array = repeated.to_arrow_array(); + } else { + let casted = datatypes::arrow::compute::cast(&array, value_type) + .context(ArrowComputeSnafu)?; + array = casted; + } + } + arrays.push(array); + } + + let df_record_batch = + DfRecordBatch::try_new(self.output_schema.arrow_schema().clone(), arrays) + .map_err(|e| { + BoxedError::new(common_error::ext::PlainError::new( + e.to_string(), + common_error::status_code::StatusCode::Internal, + )) + }) + .context(ExternalSnafu)?; + Ok(RecordBatch::from_df_record_batch( + self.output_schema.clone(), + df_record_batch, + )) } /// Projects columns from the input batch and converts them into vectors. @@ -281,6 +345,43 @@ impl FlatProjectionMapper { } } +fn repeated_vector_with_cache( + data_type: &ConcreteDataType, + value: &Value, + num_rows: usize, + cache_strategy: &CacheStrategy, +) -> common_recordbatch::error::Result { + if let Some(vector) = cache_strategy.get_repeated_vector(data_type, value) { + // If the cached vector doesn't have enough length, create a new one. + match vector.len().cmp(&num_rows) { + std::cmp::Ordering::Less => {} + std::cmp::Ordering::Equal => return Ok(vector), + std::cmp::Ordering::Greater => return Ok(vector.slice(0, num_rows)), + } + } + + let vector = new_repeated_vector(data_type, value, num_rows)?; + if vector.len() <= MAX_VECTOR_LENGTH_TO_CACHE { + cache_strategy.put_repeated_vector(value.clone(), vector.clone()); + } + + Ok(vector) +} + +fn new_repeated_vector( + data_type: &ConcreteDataType, + value: &Value, + num_rows: usize, +) -> common_recordbatch::error::Result { + let mut mutable_vector = data_type.create_mutable_vector(1); + mutable_vector + .try_push_value_ref(&value.as_value_ref()) + .map_err(BoxedError::new) + .context(ExternalSnafu)?; + let base_vector = mutable_vector.to_vector(); + Ok(base_vector.replicate(&[num_rows])) +} + /// Returns ids and datatypes of columns of the output batch after applying the `projection`. /// /// It adds the time index column if it doesn't present in the projection. diff --git a/src/mito2/src/read/projection.rs b/src/mito2/src/read/projection.rs index 2c000e7bdc..c7d5c4384c 100644 --- a/src/mito2/src/read/projection.rs +++ b/src/mito2/src/read/projection.rs @@ -809,6 +809,7 @@ mod tests { .num_fields(2) .build(), ); + let cache = CacheStrategy::Disabled; let mapper = ProjectionMapper::all(&metadata, true).unwrap(); assert_eq!([0, 1, 2, 3, 4], mapper.column_ids()); assert_eq!( @@ -823,7 +824,7 @@ mod tests { ); let batch = new_flat_batch(Some(0), &[(1, 1), (2, 2)], &[(3, 3), (4, 4)], 3); - let record_batch = mapper.as_flat().unwrap().convert(&batch).unwrap(); + let record_batch = mapper.as_flat().unwrap().convert(&batch, &cache).unwrap(); let expect = "\ +---------------------+----+----+----+----+ | ts | k0 | k1 | v0 | v1 | @@ -843,6 +844,7 @@ mod tests { .num_fields(2) .build(), ); + let cache = CacheStrategy::Disabled; // Columns v1, k0 let mapper = ProjectionMapper::new(&metadata, [4, 1].into_iter(), true).unwrap(); assert_eq!([4, 1], mapper.column_ids()); @@ -856,7 +858,7 @@ mod tests { ); let batch = new_flat_batch(None, &[(1, 1)], &[(4, 4)], 3); - let record_batch = mapper.as_flat().unwrap().convert(&batch).unwrap(); + let record_batch = mapper.as_flat().unwrap().convert(&batch, &cache).unwrap(); let expect = "\ +----+----+ | v1 | k0 | @@ -876,6 +878,7 @@ mod tests { .num_fields(2) .build(), ); + let cache = CacheStrategy::Disabled; // Output columns v1, k0. Read also includes v0. let mapper = ProjectionMapper::new_with_read_columns( &metadata, @@ -887,7 +890,7 @@ mod tests { assert_eq!([4, 1, 3], mapper.column_ids()); let batch = new_flat_batch(None, &[(1, 1)], &[(3, 3), (4, 4)], 3); - let record_batch = mapper.as_flat().unwrap().convert(&batch).unwrap(); + let record_batch = mapper.as_flat().unwrap().convert(&batch, &cache).unwrap(); let expect = "\ +----+----+ | v1 | k0 | @@ -907,6 +910,7 @@ mod tests { .num_fields(2) .build(), ); + let cache = CacheStrategy::Disabled; // Empty projection let mapper = ProjectionMapper::new(&metadata, [].into_iter(), true).unwrap(); assert_eq!([0], mapper.column_ids()); // Should still read the time index column @@ -918,7 +922,7 @@ mod tests { ); let batch = new_flat_batch(Some(0), &[], &[], 3); - let record_batch = flat_mapper.convert(&batch).unwrap(); + let record_batch = flat_mapper.convert(&batch, &cache).unwrap(); assert_eq!(3, record_batch.num_rows()); assert_eq!(0, record_batch.num_columns()); assert!(record_batch.schema.is_empty()); diff --git a/src/mito2/src/read/stream.rs b/src/mito2/src/read/stream.rs index dd85616241..7b74da2eab 100644 --- a/src/mito2/src/read/stream.rs +++ b/src/mito2/src/read/stream.rs @@ -66,24 +66,20 @@ impl ConvertBatchStream { } } - fn convert(&mut self, batch: ScanBatch) -> common_recordbatch::error::Result { + fn convert_into_pending(&mut self, batch: ScanBatch) -> common_recordbatch::error::Result<()> { match batch { ScanBatch::Normal(batch) => { // Safety: Only primary key format returns this batch. let mapper = self.projection_mapper.as_primary_key().unwrap(); if batch.is_empty() { - Ok(mapper.empty_record_batch()) + self.pending.push_back(mapper.empty_record_batch()); } else { - mapper.convert(&batch, &self.cache_strategy) + self.pending + .push_back(mapper.convert(&batch, &self.cache_strategy)?); } } ScanBatch::Series(series) => { - debug_assert!( - self.pending.is_empty(), - "ConvertBatchStream should not convert a new SeriesBatch when pending batches exist" - ); - match series { SeriesBatch::PrimaryKey(primary_key_batch) => { // Safety: Only primary key format returns this batch. @@ -99,24 +95,22 @@ impl ConvertBatchStream { let mapper = self.projection_mapper.as_flat().unwrap(); for batch in flat_batch.batches { - self.pending.push_back(mapper.convert(&batch)?); + self.pending + .push_back(mapper.convert(&batch, &self.cache_strategy)?); } } } - - let output_schema = self.projection_mapper.output_schema(); - Ok(self - .pending - .pop_front() - .unwrap_or_else(|| RecordBatch::new_empty(output_schema))) } ScanBatch::RecordBatch(df_record_batch) => { // Safety: Only flat format returns this batch. let mapper = self.projection_mapper.as_flat().unwrap(); - mapper.convert(&df_record_batch) + self.pending + .push_back(mapper.convert(&df_record_batch, &self.cache_strategy)?); } } + + Ok(()) } } @@ -128,21 +122,30 @@ impl Stream for ConvertBatchStream { return Poll::Ready(Some(Ok(batch))); } - let batch = futures::ready!(self.inner.poll_next_unpin(cx)); - let Some(batch) = batch else { - return Poll::Ready(None); - }; + loop { + let batch = futures::ready!(self.inner.poll_next_unpin(cx)); + let Some(batch) = batch else { + return Poll::Ready(None); + }; - let record_batch = match batch { - Ok(batch) => { - let start = Instant::now(); - let record_batch = self.convert(batch); - self.partition_metrics - .inc_convert_batch_cost(start.elapsed()); - record_batch + let result = match batch { + Ok(batch) => { + let start = Instant::now(); + let result = self.convert_into_pending(batch); + self.partition_metrics + .inc_convert_batch_cost(start.elapsed()); + result + } + Err(e) => Err(BoxedError::new(e)).context(ExternalSnafu), + }; + + if let Err(e) = result { + return Poll::Ready(Some(Err(e))); } - Err(e) => Err(BoxedError::new(e)).context(ExternalSnafu), - }; - Poll::Ready(Some(record_batch)) + + if let Some(batch) = self.pending.pop_front() { + return Poll::Ready(Some(Ok(batch))); + } + } } }