mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-27 10:20:38 +00:00
perf(mito2): optimize flat projection conversion
This commit is contained in:
@@ -21,7 +21,7 @@ use std::sync::Arc;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
use common_base::readable_size::ReadableSize;
|
||||
use common_telemetry::tracing::{Span, info_span};
|
||||
use common_telemetry::tracing::{Span, debug_span};
|
||||
use common_time::util::format_nanoseconds_human_readable;
|
||||
use datafusion::arrow::compute::cast;
|
||||
use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef;
|
||||
@@ -247,7 +247,7 @@ impl RecordBatchStreamAdapter {
|
||||
pub fn try_new_with_span(stream: DfSendableRecordBatchStream, span: Span) -> Result<Self> {
|
||||
let schema =
|
||||
Arc::new(Schema::try_from(stream.schema()).context(error::SchemaConversionSnafu)?);
|
||||
let subspan = info_span!(parent: &span, "RecordBatchStreamAdapter");
|
||||
let subspan = debug_span!(parent: &span, "RecordBatchStreamAdapter");
|
||||
Ok(Self {
|
||||
schema,
|
||||
stream,
|
||||
@@ -301,15 +301,13 @@ impl Stream for RecordBatchStreamAdapter {
|
||||
.map(|m| m.elapsed_compute().clone())
|
||||
.unwrap_or_default();
|
||||
let _guard = timer.timer();
|
||||
let poll_span = info_span!(parent: &self.span, "poll_next");
|
||||
let poll_span = debug_span!(parent: &self.span, "poll_next");
|
||||
let _entered = poll_span.enter();
|
||||
match Pin::new(&mut self.stream).poll_next(cx) {
|
||||
Poll::Pending => Poll::Pending,
|
||||
Poll::Ready(Some(df_record_batch)) => {
|
||||
let df_record_batch = df_record_batch?;
|
||||
if let Metrics::Unresolved(df_plan) | Metrics::PartialResolved(df_plan, _) =
|
||||
&self.metrics_2
|
||||
{
|
||||
if let Metrics::Unresolved(df_plan) = &self.metrics_2 {
|
||||
let mut metric_collector = MetricCollector::new(self.explain_verbose);
|
||||
accept(df_plan.as_ref(), &mut metric_collector).unwrap();
|
||||
self.metrics_2 = Metrics::PartialResolved(
|
||||
@@ -462,7 +460,6 @@ fn format_bytes_human_readable(bytes: usize) -> String {
|
||||
format!("{}", ReadableSize(bytes as u64))
|
||||
}
|
||||
|
||||
/// Only display `plan_metrics` with indent `  ` (2 spaces).
|
||||
impl Display for RecordBatchMetrics {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
for metric in &self.plan_metrics {
|
||||
|
||||
@@ -20,14 +20,17 @@ use api::v1::SemanticType;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_recordbatch::error::{ArrowComputeSnafu, ExternalSnafu};
|
||||
use common_recordbatch::{DfRecordBatch, RecordBatch};
|
||||
use datatypes::arrow::datatypes::Field;
|
||||
use datatypes::arrow::array::Array;
|
||||
use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field};
|
||||
use datatypes::prelude::{ConcreteDataType, DataType};
|
||||
use datatypes::schema::{Schema, SchemaRef};
|
||||
use datatypes::vectors::Helper;
|
||||
use datatypes::value::Value;
|
||||
use datatypes::vectors::{Helper, VectorRef};
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
|
||||
use store_api::storage::ColumnId;
|
||||
|
||||
use crate::cache::CacheStrategy;
|
||||
use crate::error::{InvalidRequestSnafu, RecordBatchSnafu, Result};
|
||||
use crate::read::projection::read_column_ids_from_projection;
|
||||
use crate::sst::parquet::flat_format::sst_column_id_indices;
|
||||
@@ -63,6 +66,9 @@ pub struct FlatProjectionMapper {
|
||||
input_arrow_schema: datatypes::arrow::datatypes::SchemaRef,
|
||||
}
|
||||
|
||||
/// Max length of a repeated vector we will store in cache.
|
||||
const MAX_VECTOR_LENGTH_TO_CACHE: usize = 16384;
|
||||
|
||||
impl FlatProjectionMapper {
|
||||
/// Returns a new mapper with projection.
|
||||
/// If `projection` is empty, it outputs [RecordBatch] without any column but only a row count.
|
||||
@@ -248,12 +254,70 @@ impl FlatProjectionMapper {
|
||||
pub(crate) fn convert(
|
||||
&self,
|
||||
batch: &datatypes::arrow::record_batch::RecordBatch,
|
||||
cache_strategy: &CacheStrategy,
|
||||
) -> common_recordbatch::error::Result<RecordBatch> {
|
||||
if self.is_empty_projection {
|
||||
return RecordBatch::new_with_count(self.output_schema.clone(), batch.num_rows());
|
||||
}
|
||||
let columns = self.project_vectors(batch)?;
|
||||
RecordBatch::new(self.output_schema.clone(), columns)
|
||||
// Construct output record batch directly from Arrow arrays to avoid
|
||||
// Arrow -> Vector -> Arrow roundtrips in the hot path.
|
||||
let mut arrays = Vec::with_capacity(self.output_schema.num_columns());
|
||||
for (output_idx, index) in self.batch_indices.iter().enumerate() {
|
||||
let mut array = batch.column(*index).clone();
|
||||
// Cast dictionary values to the target type.
|
||||
if let ArrowDataType::Dictionary(_key_type, value_type) = array.data_type() {
|
||||
// When a string dictionary column contains only a single value, reuse a cached
|
||||
// repeated vector to avoid repeatedly expanding the dictionary.
|
||||
if matches!(
|
||||
value_type.as_ref(),
|
||||
ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 | ArrowDataType::Utf8View
|
||||
) && self.output_schema.column_schemas()[output_idx]
|
||||
.data_type
|
||||
.is_string()
|
||||
&& let Some(dict_array) = array
|
||||
.as_any()
|
||||
.downcast_ref::<datatypes::arrow::array::DictionaryArray<
|
||||
datatypes::arrow::datatypes::UInt32Type,
|
||||
>>()
|
||||
&& dict_array.values().len() == 1
|
||||
&& dict_array.null_count() == 0
|
||||
{
|
||||
let dict_values = dict_array.values();
|
||||
let value = if dict_values.is_null(0) {
|
||||
Value::Null
|
||||
} else {
|
||||
Value::from(datatypes::arrow_array::string_array_value(dict_values, 0))
|
||||
};
|
||||
|
||||
let repeated = repeated_vector_with_cache(
|
||||
&self.output_schema.column_schemas()[output_idx].data_type,
|
||||
&value,
|
||||
batch.num_rows(),
|
||||
cache_strategy,
|
||||
)?;
|
||||
array = repeated.to_arrow_array();
|
||||
} else {
|
||||
let casted = datatypes::arrow::compute::cast(&array, value_type)
|
||||
.context(ArrowComputeSnafu)?;
|
||||
array = casted;
|
||||
}
|
||||
}
|
||||
arrays.push(array);
|
||||
}
|
||||
|
||||
let df_record_batch =
|
||||
DfRecordBatch::try_new(self.output_schema.arrow_schema().clone(), arrays)
|
||||
.map_err(|e| {
|
||||
BoxedError::new(common_error::ext::PlainError::new(
|
||||
e.to_string(),
|
||||
common_error::status_code::StatusCode::Internal,
|
||||
))
|
||||
})
|
||||
.context(ExternalSnafu)?;
|
||||
Ok(RecordBatch::from_df_record_batch(
|
||||
self.output_schema.clone(),
|
||||
df_record_batch,
|
||||
))
|
||||
}
|
||||
|
||||
/// Projects columns from the input batch and converts them into vectors.
|
||||
@@ -281,6 +345,43 @@ impl FlatProjectionMapper {
|
||||
}
|
||||
}
|
||||
|
||||
fn repeated_vector_with_cache(
|
||||
data_type: &ConcreteDataType,
|
||||
value: &Value,
|
||||
num_rows: usize,
|
||||
cache_strategy: &CacheStrategy,
|
||||
) -> common_recordbatch::error::Result<VectorRef> {
|
||||
if let Some(vector) = cache_strategy.get_repeated_vector(data_type, value) {
|
||||
// If the cached vector doesn't have enough length, create a new one.
|
||||
match vector.len().cmp(&num_rows) {
|
||||
std::cmp::Ordering::Less => {}
|
||||
std::cmp::Ordering::Equal => return Ok(vector),
|
||||
std::cmp::Ordering::Greater => return Ok(vector.slice(0, num_rows)),
|
||||
}
|
||||
}
|
||||
|
||||
let vector = new_repeated_vector(data_type, value, num_rows)?;
|
||||
if vector.len() <= MAX_VECTOR_LENGTH_TO_CACHE {
|
||||
cache_strategy.put_repeated_vector(value.clone(), vector.clone());
|
||||
}
|
||||
|
||||
Ok(vector)
|
||||
}
|
||||
|
||||
fn new_repeated_vector(
|
||||
data_type: &ConcreteDataType,
|
||||
value: &Value,
|
||||
num_rows: usize,
|
||||
) -> common_recordbatch::error::Result<VectorRef> {
|
||||
let mut mutable_vector = data_type.create_mutable_vector(1);
|
||||
mutable_vector
|
||||
.try_push_value_ref(&value.as_value_ref())
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?;
|
||||
let base_vector = mutable_vector.to_vector();
|
||||
Ok(base_vector.replicate(&[num_rows]))
|
||||
}
|
||||
|
||||
/// Returns ids and datatypes of columns of the output batch after applying the `projection`.
|
||||
///
|
||||
/// It adds the time index column if it is not present in the projection.
|
||||
|
||||
@@ -809,6 +809,7 @@ mod tests {
|
||||
.num_fields(2)
|
||||
.build(),
|
||||
);
|
||||
let cache = CacheStrategy::Disabled;
|
||||
let mapper = ProjectionMapper::all(&metadata, true).unwrap();
|
||||
assert_eq!([0, 1, 2, 3, 4], mapper.column_ids());
|
||||
assert_eq!(
|
||||
@@ -823,7 +824,7 @@ mod tests {
|
||||
);
|
||||
|
||||
let batch = new_flat_batch(Some(0), &[(1, 1), (2, 2)], &[(3, 3), (4, 4)], 3);
|
||||
let record_batch = mapper.as_flat().unwrap().convert(&batch).unwrap();
|
||||
let record_batch = mapper.as_flat().unwrap().convert(&batch, &cache).unwrap();
|
||||
let expect = "\
|
||||
+---------------------+----+----+----+----+
|
||||
| ts | k0 | k1 | v0 | v1 |
|
||||
@@ -843,6 +844,7 @@ mod tests {
|
||||
.num_fields(2)
|
||||
.build(),
|
||||
);
|
||||
let cache = CacheStrategy::Disabled;
|
||||
// Columns v1, k0
|
||||
let mapper = ProjectionMapper::new(&metadata, [4, 1].into_iter(), true).unwrap();
|
||||
assert_eq!([4, 1], mapper.column_ids());
|
||||
@@ -856,7 +858,7 @@ mod tests {
|
||||
);
|
||||
|
||||
let batch = new_flat_batch(None, &[(1, 1)], &[(4, 4)], 3);
|
||||
let record_batch = mapper.as_flat().unwrap().convert(&batch).unwrap();
|
||||
let record_batch = mapper.as_flat().unwrap().convert(&batch, &cache).unwrap();
|
||||
let expect = "\
|
||||
+----+----+
|
||||
| v1 | k0 |
|
||||
@@ -876,6 +878,7 @@ mod tests {
|
||||
.num_fields(2)
|
||||
.build(),
|
||||
);
|
||||
let cache = CacheStrategy::Disabled;
|
||||
// Output columns v1, k0. Read also includes v0.
|
||||
let mapper = ProjectionMapper::new_with_read_columns(
|
||||
&metadata,
|
||||
@@ -887,7 +890,7 @@ mod tests {
|
||||
assert_eq!([4, 1, 3], mapper.column_ids());
|
||||
|
||||
let batch = new_flat_batch(None, &[(1, 1)], &[(3, 3), (4, 4)], 3);
|
||||
let record_batch = mapper.as_flat().unwrap().convert(&batch).unwrap();
|
||||
let record_batch = mapper.as_flat().unwrap().convert(&batch, &cache).unwrap();
|
||||
let expect = "\
|
||||
+----+----+
|
||||
| v1 | k0 |
|
||||
@@ -907,6 +910,7 @@ mod tests {
|
||||
.num_fields(2)
|
||||
.build(),
|
||||
);
|
||||
let cache = CacheStrategy::Disabled;
|
||||
// Empty projection
|
||||
let mapper = ProjectionMapper::new(&metadata, [].into_iter(), true).unwrap();
|
||||
assert_eq!([0], mapper.column_ids()); // Should still read the time index column
|
||||
@@ -918,7 +922,7 @@ mod tests {
|
||||
);
|
||||
|
||||
let batch = new_flat_batch(Some(0), &[], &[], 3);
|
||||
let record_batch = flat_mapper.convert(&batch).unwrap();
|
||||
let record_batch = flat_mapper.convert(&batch, &cache).unwrap();
|
||||
assert_eq!(3, record_batch.num_rows());
|
||||
assert_eq!(0, record_batch.num_columns());
|
||||
assert!(record_batch.schema.is_empty());
|
||||
|
||||
@@ -66,24 +66,20 @@ impl ConvertBatchStream {
|
||||
}
|
||||
}
|
||||
|
||||
fn convert(&mut self, batch: ScanBatch) -> common_recordbatch::error::Result<RecordBatch> {
|
||||
fn convert_into_pending(&mut self, batch: ScanBatch) -> common_recordbatch::error::Result<()> {
|
||||
match batch {
|
||||
ScanBatch::Normal(batch) => {
|
||||
// Safety: Only primary key format returns this batch.
|
||||
let mapper = self.projection_mapper.as_primary_key().unwrap();
|
||||
|
||||
if batch.is_empty() {
|
||||
Ok(mapper.empty_record_batch())
|
||||
self.pending.push_back(mapper.empty_record_batch());
|
||||
} else {
|
||||
mapper.convert(&batch, &self.cache_strategy)
|
||||
self.pending
|
||||
.push_back(mapper.convert(&batch, &self.cache_strategy)?);
|
||||
}
|
||||
}
|
||||
ScanBatch::Series(series) => {
|
||||
debug_assert!(
|
||||
self.pending.is_empty(),
|
||||
"ConvertBatchStream should not convert a new SeriesBatch when pending batches exist"
|
||||
);
|
||||
|
||||
match series {
|
||||
SeriesBatch::PrimaryKey(primary_key_batch) => {
|
||||
// Safety: Only primary key format returns this batch.
|
||||
@@ -99,24 +95,22 @@ impl ConvertBatchStream {
|
||||
let mapper = self.projection_mapper.as_flat().unwrap();
|
||||
|
||||
for batch in flat_batch.batches {
|
||||
self.pending.push_back(mapper.convert(&batch)?);
|
||||
self.pending
|
||||
.push_back(mapper.convert(&batch, &self.cache_strategy)?);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let output_schema = self.projection_mapper.output_schema();
|
||||
Ok(self
|
||||
.pending
|
||||
.pop_front()
|
||||
.unwrap_or_else(|| RecordBatch::new_empty(output_schema)))
|
||||
}
|
||||
ScanBatch::RecordBatch(df_record_batch) => {
|
||||
// Safety: Only flat format returns this batch.
|
||||
let mapper = self.projection_mapper.as_flat().unwrap();
|
||||
|
||||
mapper.convert(&df_record_batch)
|
||||
self.pending
|
||||
.push_back(mapper.convert(&df_record_batch, &self.cache_strategy)?);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -128,21 +122,30 @@ impl Stream for ConvertBatchStream {
|
||||
return Poll::Ready(Some(Ok(batch)));
|
||||
}
|
||||
|
||||
let batch = futures::ready!(self.inner.poll_next_unpin(cx));
|
||||
let Some(batch) = batch else {
|
||||
return Poll::Ready(None);
|
||||
};
|
||||
loop {
|
||||
let batch = futures::ready!(self.inner.poll_next_unpin(cx));
|
||||
let Some(batch) = batch else {
|
||||
return Poll::Ready(None);
|
||||
};
|
||||
|
||||
let record_batch = match batch {
|
||||
Ok(batch) => {
|
||||
let start = Instant::now();
|
||||
let record_batch = self.convert(batch);
|
||||
self.partition_metrics
|
||||
.inc_convert_batch_cost(start.elapsed());
|
||||
record_batch
|
||||
let result = match batch {
|
||||
Ok(batch) => {
|
||||
let start = Instant::now();
|
||||
let result = self.convert_into_pending(batch);
|
||||
self.partition_metrics
|
||||
.inc_convert_batch_cost(start.elapsed());
|
||||
result
|
||||
}
|
||||
Err(e) => Err(BoxedError::new(e)).context(ExternalSnafu),
|
||||
};
|
||||
|
||||
if let Err(e) = result {
|
||||
return Poll::Ready(Some(Err(e)));
|
||||
}
|
||||
Err(e) => Err(BoxedError::new(e)).context(ExternalSnafu),
|
||||
};
|
||||
Poll::Ready(Some(record_batch))
|
||||
|
||||
if let Some(batch) = self.pending.pop_front() {
|
||||
return Poll::Ready(Some(Ok(batch)));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user