From 5e65581f94d35fe16b77e1b549f4bc7d674e8f77 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Thu, 11 Sep 2025 14:12:53 +0800 Subject: [PATCH] feat: support flat format for SeriesScan (#6938) * feat: Support flat format for SeriesScan Signed-off-by: evenyag * test: simplify tests Signed-off-by: evenyag * chore: update comment Signed-off-by: evenyag * chore: only accumulate fetch time to scan_cost in SeriesDistributor of the SeriesScan Signed-off-by: evenyag * chore: update comment Signed-off-by: evenyag --------- Signed-off-by: evenyag --- src/mito2/src/read/series_scan.rs | 494 +++++++++++++++++++++++++++++- src/mito2/src/read/stream.rs | 55 ++-- 2 files changed, 513 insertions(+), 36 deletions(-) diff --git a/src/mito2/src/read/series_scan.rs b/src/mito2/src/read/series_scan.rs index b8081bb2e9..3de81b6d0b 100644 --- a/src/mito2/src/read/series_scan.rs +++ b/src/mito2/src/read/series_scan.rs @@ -24,8 +24,10 @@ use common_recordbatch::util::ChainedRecordBatchStream; use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream}; use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; use datafusion::physical_plan::{DisplayAs, DisplayFormatType}; +use datatypes::arrow::array::BinaryArray; +use datatypes::arrow::record_batch::RecordBatch; use datatypes::schema::SchemaRef; -use futures::StreamExt; +use futures::{StreamExt, TryStreamExt}; use smallvec::{SmallVec, smallvec}; use snafu::{OptionExt, ResultExt, ensure}; use store_api::metadata::RegionMetadataRef; @@ -43,9 +45,11 @@ use crate::error::{ use crate::read::range::RangeBuilderList; use crate::read::scan_region::{ScanInput, StreamContext}; use crate::read::scan_util::{PartitionMetrics, PartitionMetricsList, SeriesDistributorMetrics}; -use crate::read::seq_scan::{SeqScan, build_sources}; +use crate::read::seq_scan::{SeqScan, build_flat_sources, build_sources}; use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream}; use crate::read::{Batch, ScannerMetrics}; +use crate::sst::parquet::flat_format::primary_key_column_index; +use crate::sst::parquet::format::PrimaryKeyArray; /// Timeout to send a batch to a sender. const SEND_TIMEOUT: Duration = Duration::from_millis(10); @@ -155,8 +159,8 @@ impl SeriesScan { metrics.scan_cost += fetch_start.elapsed(); fetch_start = Instant::now(); - metrics.num_batches += series.batches.len(); - metrics.num_rows += series.batches.iter().map(|x| x.num_rows()).sum::(); + metrics.num_batches += series.num_batches(); + metrics.num_rows += series.num_rows(); let yield_start = Instant::now(); yield ScanBatch::Series(series); @@ -377,11 +381,101 @@ struct SeriesDistributor { impl SeriesDistributor { /// Executes the distributor. async fn execute(&mut self) { - if let Err(e) = self.scan_partitions().await { + let result = if self.stream_ctx.input.flat_format { + self.scan_partitions_flat().await + } else { + self.scan_partitions().await + }; + + if let Err(e) = result { self.senders.send_error(e).await; } } + /// Scans all parts in flat format using FlatSeriesBatchDivider. + async fn scan_partitions_flat(&mut self) -> Result<()> { + let part_metrics = new_partition_metrics( + &self.stream_ctx, + false, + &self.metrics_set, + self.partitions.len(), + &self.metrics_list, + ); + part_metrics.on_first_poll(); + + let range_builder_list = Arc::new(RangeBuilderList::new( + self.stream_ctx.input.num_memtables(), + self.stream_ctx.input.num_files(), + )); + // Scans all parts. + let mut sources = Vec::with_capacity(self.partitions.len()); + for partition in &self.partitions { + sources.reserve(partition.len()); + for part_range in partition { + build_flat_sources( + &self.stream_ctx, + part_range, + false, + &part_metrics, + range_builder_list.clone(), + &mut sources, + ) + .await?; + } + } + + // Builds a flat reader that merge sources from all parts. + let mut reader = SeqScan::build_flat_reader_from_sources( + &self.stream_ctx, + sources, + self.semaphore.clone(), + ) + .await?; + let mut metrics = SeriesDistributorMetrics::default(); + let mut fetch_start = Instant::now(); + + let mut divider = FlatSeriesBatchDivider::default(); + while let Some(record_batch) = reader.try_next().await? { + metrics.scan_cost += fetch_start.elapsed(); + metrics.num_batches += 1; + metrics.num_rows += record_batch.num_rows(); + + debug_assert!(record_batch.num_rows() > 0); + if record_batch.num_rows() == 0 { + fetch_start = Instant::now(); + continue; + } + + // Use divider to split series + if let Some(series_batch) = divider.push(record_batch) { + let yield_start = Instant::now(); + self.senders + .send_batch(SeriesBatch::Flat(series_batch)) + .await?; + metrics.yield_cost += yield_start.elapsed(); + } + fetch_start = Instant::now(); + } + + // Send any remaining batch in the divider + if let Some(series_batch) = divider.finish() { + let yield_start = Instant::now(); + self.senders + .send_batch(SeriesBatch::Flat(series_batch)) + .await?; + metrics.yield_cost += yield_start.elapsed(); + } + + metrics.scan_cost += fetch_start.elapsed(); + metrics.num_series_send_timeout = self.senders.num_timeout; + metrics.num_series_send_full = self.senders.num_full; + part_metrics.set_distributor_metrics(&metrics); + + part_metrics.on_finish(); + + Ok(()) + } + /// Scans all parts. async fn scan_partitions(&mut self) -> Result<()> { let part_metrics = new_partition_metrics( @@ -421,38 +515,46 @@ impl SeriesDistributor { let mut metrics = SeriesDistributorMetrics::default(); let mut fetch_start = Instant::now(); - let mut current_series = SeriesBatch::default(); + let mut current_series = PrimaryKeySeriesBatch::default(); while let Some(batch) = reader.next_batch().await? { metrics.scan_cost += fetch_start.elapsed(); - fetch_start = Instant::now(); metrics.num_batches += 1; metrics.num_rows += batch.num_rows(); debug_assert!(!batch.is_empty()); if batch.is_empty() { + fetch_start = Instant::now(); continue; } let Some(last_key) = current_series.current_key() else { current_series.push(batch); + fetch_start = Instant::now(); continue; }; if last_key == batch.primary_key() { current_series.push(batch); + fetch_start = Instant::now(); continue; } // We find a new series, send the current one. - let to_send = std::mem::replace(&mut current_series, SeriesBatch::single(batch)); + let to_send = + std::mem::replace(&mut current_series, PrimaryKeySeriesBatch::single(batch)); let yield_start = Instant::now(); - self.senders.send_batch(to_send).await?; + self.senders + .send_batch(SeriesBatch::PrimaryKey(to_send)) + .await?; metrics.yield_cost += yield_start.elapsed(); + fetch_start = Instant::now(); } if !current_series.is_empty() { let yield_start = Instant::now(); - self.senders.send_batch(current_series).await?; + self.senders + .send_batch(SeriesBatch::PrimaryKey(current_series)) + .await?; metrics.yield_cost += yield_start.elapsed(); } @@ -467,14 +569,14 @@ impl SeriesDistributor { } } -/// Batches of the same series. -#[derive(Default)] -pub struct SeriesBatch { +/// Batches of the same series in primary key format. +#[derive(Default, Debug)] +pub struct PrimaryKeySeriesBatch { pub batches: SmallVec<[Batch; 4]>, } -impl SeriesBatch { - /// Creates a new [SeriesBatch] from a single [Batch]. +impl PrimaryKeySeriesBatch { + /// Creates a new [PrimaryKeySeriesBatch] from a single [Batch]. fn single(batch: Batch) -> Self { Self { batches: smallvec![batch], @@ -495,6 +597,39 @@ impl SeriesBatch { } } +/// Batches of the same series. +#[derive(Debug)] +pub enum SeriesBatch { + PrimaryKey(PrimaryKeySeriesBatch), + Flat(FlatSeriesBatch), +} + +impl SeriesBatch { + /// Returns the number of batches. + pub fn num_batches(&self) -> usize { + match self { + SeriesBatch::PrimaryKey(primary_key_batch) => primary_key_batch.batches.len(), + SeriesBatch::Flat(flat_batch) => flat_batch.batches.len(), + } + } + + /// Returns the total number of rows across all batches. + pub fn num_rows(&self) -> usize { + match self { + SeriesBatch::PrimaryKey(primary_key_batch) => { + primary_key_batch.batches.iter().map(|x| x.num_rows()).sum() + } + SeriesBatch::Flat(flat_batch) => flat_batch.batches.iter().map(|x| x.num_rows()).sum(), + } + } +} + +/// Batches of the same series in flat format. +#[derive(Default, Debug)] +pub struct FlatSeriesBatch { + pub batches: SmallVec<[RecordBatch; 4]>, +} + /// List of senders. struct SenderList { senders: Vec>>>, @@ -627,3 +762,332 @@ fn new_partition_metrics( metrics_list.set(partition, metrics.clone()); metrics } + +/// A divider to split flat record batches by time series. +/// +/// It only ensures rows of the same series are returned in the same [FlatSeriesBatch]. +/// However, a [FlatSeriesBatch] may contain rows from multiple series. +#[derive(Default)] +struct FlatSeriesBatchDivider { + buffer: FlatSeriesBatch, +} + +impl FlatSeriesBatchDivider { + /// Pushes a record batch into the divider. + /// + /// Returns a [FlatSeriesBatch] if we ensure the batch contains all rows of the series in it. + fn push(&mut self, batch: RecordBatch) -> Option { + // If buffer is empty + if self.buffer.batches.is_empty() { + self.buffer.batches.push(batch); + return None; + } + + // Gets the primary key column from the incoming batch. + let pk_column_idx = primary_key_column_index(batch.num_columns()); + let batch_pk_column = batch.column(pk_column_idx); + let batch_pk_array = batch_pk_column + .as_any() + .downcast_ref::() + .unwrap(); + let batch_pk_values = batch_pk_array + .values() + .as_any() + .downcast_ref::() + .unwrap(); + // Gets the last primary key of the incoming batch. + let batch_last_pk = + primary_key_at(batch_pk_array, batch_pk_values, batch_pk_array.len() - 1); + // Gets the last primary key of the buffer. + // Safety: the buffer is not empty. + let buffer_last_batch = self.buffer.batches.last().unwrap(); + let buffer_pk_column = buffer_last_batch.column(pk_column_idx); + let buffer_pk_array = buffer_pk_column + .as_any() + .downcast_ref::() + .unwrap(); + let buffer_pk_values = buffer_pk_array + .values() + .as_any() + .downcast_ref::() + .unwrap(); + let buffer_last_pk = + primary_key_at(buffer_pk_array, buffer_pk_values, buffer_pk_array.len() - 1); + + // If last primary key in the batch is the same as last primary key in the buffer. + if batch_last_pk == buffer_last_pk { + self.buffer.batches.push(batch); + return None; + } + // Otherwise, the batch must have a different primary key, we find the first offset of the + // changed primary key. + let batch_pk_keys = batch_pk_array.keys(); + let pk_indices = batch_pk_keys.values(); + let mut change_offset = 0; + for (i, &key) in pk_indices.iter().enumerate() { + let batch_pk = batch_pk_values.value(key as usize); + + if buffer_last_pk != batch_pk { + change_offset = i; + break; + } + } + + // Splits the batch at the change offset + let (first_part, remaining_part) = if change_offset > 0 { + let first_part = batch.slice(0, change_offset); + let remaining_part = batch.slice(change_offset, batch.num_rows() - change_offset); + (Some(first_part), Some(remaining_part)) + } else { + (None, Some(batch)) + }; + + // Creates the result from current buffer + first part of new batch + let mut result = std::mem::take(&mut self.buffer); + if let Some(first_part) = first_part { + result.batches.push(first_part); + } + + // Pushes remaining part to the buffer if it exists + if let Some(remaining_part) = remaining_part { + self.buffer.batches.push(remaining_part); + } + + Some(result) + } + + /// Returns the final [FlatSeriesBatch]. + fn finish(&mut self) -> Option { + if self.buffer.batches.is_empty() { + None + } else { + Some(std::mem::take(&mut self.buffer)) + } + } +} + +/// Helper function to extract primary key bytes at a specific index from [PrimaryKeyArray]. +fn primary_key_at<'a>( + primary_key: &PrimaryKeyArray, + primary_key_values: &'a BinaryArray, + index: usize, +) -> &'a [u8] { + let key = primary_key.keys().value(index); + primary_key_values.value(key as usize) +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use api::v1::OpType; + use datatypes::arrow::array::{ + ArrayRef, BinaryDictionaryBuilder, Int64Array, StringDictionaryBuilder, + TimestampMillisecondArray, UInt8Array, UInt64Array, + }; + use datatypes::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit, UInt32Type}; + use datatypes::arrow::record_batch::RecordBatch; + + use super::*; + + fn new_test_record_batch( + primary_keys: &[&[u8]], + timestamps: &[i64], + sequences: &[u64], + op_types: &[OpType], + fields: &[u64], + ) -> RecordBatch { + let num_rows = timestamps.len(); + debug_assert_eq!(sequences.len(), num_rows); + debug_assert_eq!(op_types.len(), num_rows); + debug_assert_eq!(fields.len(), num_rows); + debug_assert_eq!(primary_keys.len(), num_rows); + + let columns: Vec = vec![ + build_test_pk_string_dict_array(primary_keys), + Arc::new(Int64Array::from_iter( + fields.iter().map(|v| Some(*v as i64)), + )), + Arc::new(TimestampMillisecondArray::from_iter_values( + timestamps.iter().copied(), + )), + build_test_pk_array(primary_keys), + Arc::new(UInt64Array::from_iter_values(sequences.iter().copied())), + Arc::new(UInt8Array::from_iter_values( + op_types.iter().map(|v| *v as u8), + )), + ]; + + RecordBatch::try_new(build_test_flat_schema(), columns).unwrap() + } + + fn build_test_pk_string_dict_array(primary_keys: &[&[u8]]) -> ArrayRef { + let mut builder = StringDictionaryBuilder::::new(); + for &pk in primary_keys { + let pk_str = std::str::from_utf8(pk).unwrap(); + builder.append(pk_str).unwrap(); + } + Arc::new(builder.finish()) + } + + fn build_test_pk_array(primary_keys: &[&[u8]]) -> ArrayRef { + let mut builder = BinaryDictionaryBuilder::::new(); + for &pk in primary_keys { + builder.append(pk).unwrap(); + } + Arc::new(builder.finish()) + } + + fn build_test_flat_schema() -> SchemaRef { + let fields = vec![ + Field::new( + "k0", + DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)), + false, + ), + Field::new("field0", DataType::Int64, true), + Field::new( + "ts", + DataType::Timestamp(TimeUnit::Millisecond, None), + false, + ), + Field::new( + "__primary_key", + DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)), + false, + ), + Field::new("__sequence", DataType::UInt64, false), + Field::new("__op_type", DataType::UInt8, false), + ]; + Arc::new(Schema::new(fields)) + } + + #[test] + fn test_empty_buffer_first_push() { + let mut divider = FlatSeriesBatchDivider::default(); + let result = divider.finish(); + assert!(result.is_none()); + + let mut divider = FlatSeriesBatchDivider::default(); + let batch = new_test_record_batch( + &[b"series1", b"series1"], + &[1000, 2000], + &[1, 2], + &[OpType::Put, OpType::Put], + &[10, 20], + ); + let result = divider.push(batch); + assert!(result.is_none()); + assert_eq!(divider.buffer.batches.len(), 1); + } + + #[test] + fn test_same_series_accumulation() { + let mut divider = FlatSeriesBatchDivider::default(); + + let batch1 = new_test_record_batch( + &[b"series1", b"series1"], + &[1000, 2000], + &[1, 2], + &[OpType::Put, OpType::Put], + &[10, 20], + ); + + let batch2 = new_test_record_batch( + &[b"series1", b"series1"], + &[3000, 4000], + &[3, 4], + &[OpType::Put, OpType::Put], + &[30, 40], + ); + + divider.push(batch1); + let result = divider.push(batch2); + assert!(result.is_none()); + let series_batch = divider.finish().unwrap(); + assert_eq!(series_batch.batches.len(), 2); + } + + #[test] + fn test_series_boundary_detection() { + let mut divider = FlatSeriesBatchDivider::default(); + + let batch1 = new_test_record_batch( + &[b"series1", b"series1"], + &[1000, 2000], + &[1, 2], + &[OpType::Put, OpType::Put], + &[10, 20], + ); + + let batch2 = new_test_record_batch( + &[b"series2", b"series2"], + &[3000, 4000], + &[3, 4], + &[OpType::Put, OpType::Put], + &[30, 40], + ); + + divider.push(batch1); + let series_batch = divider.push(batch2).unwrap(); + assert_eq!(series_batch.batches.len(), 1); + + assert_eq!(divider.buffer.batches.len(), 1); + } + + #[test] + fn test_series_boundary_within_batch() { + let mut divider = FlatSeriesBatchDivider::default(); + + let batch1 = new_test_record_batch( + &[b"series1", b"series1"], + &[1000, 2000], + &[1, 2], + &[OpType::Put, OpType::Put], + &[10, 20], + ); + + let batch2 = new_test_record_batch( + &[b"series1", b"series2"], + &[3000, 4000], + &[3, 4], + &[OpType::Put, OpType::Put], + &[30, 40], + ); + + divider.push(batch1); + let series_batch = divider.push(batch2).unwrap(); + assert_eq!(series_batch.batches.len(), 2); + assert_eq!(series_batch.batches[0].num_rows(), 2); + assert_eq!(series_batch.batches[1].num_rows(), 1); + + assert_eq!(divider.buffer.batches.len(), 1); + assert_eq!(divider.buffer.batches[0].num_rows(), 1); + } + + #[test] + fn test_series_splitting() { + let mut divider = FlatSeriesBatchDivider::default(); + + let batch1 = new_test_record_batch(&[b"series1"], &[1000], &[1], &[OpType::Put], &[10]); + + let batch2 = new_test_record_batch( + &[b"series1", b"series2", b"series2", b"series3"], + &[2000, 3000, 4000, 5000], + &[2, 3, 4, 5], + &[OpType::Put, OpType::Put, OpType::Put, OpType::Put], + &[20, 30, 40, 50], + ); + + divider.push(batch1); + let series_batch = divider.push(batch2).unwrap(); + assert_eq!(series_batch.batches.len(), 2); + + let total_rows: usize = series_batch.batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 2); + + let final_batch = divider.finish().unwrap(); + assert_eq!(final_batch.batches.len(), 1); + assert_eq!(final_batch.batches[0].num_rows(), 3); + } +} diff --git a/src/mito2/src/read/stream.rs b/src/mito2/src/read/stream.rs index f149b8f6e7..086a92cb16 100644 --- a/src/mito2/src/read/stream.rs +++ b/src/mito2/src/read/stream.rs @@ -23,10 +23,10 @@ use common_recordbatch::{DfRecordBatch, RecordBatch}; use datatypes::compute; use futures::stream::BoxStream; use futures::{Stream, StreamExt}; -use snafu::{OptionExt, ResultExt}; +use snafu::ResultExt; use crate::cache::CacheStrategy; -use crate::error::{Result, UnexpectedSnafu}; +use crate::error::Result; use crate::read::Batch; use crate::read::projection::ProjectionMapper; use crate::read::scan_util::PartitionMetrics; @@ -67,17 +67,11 @@ impl ConvertBatchStream { } fn convert(&mut self, batch: ScanBatch) -> common_recordbatch::error::Result { - let mapper = self - .projection_mapper - .as_primary_key() - .context(UnexpectedSnafu { - reason: "Unexpected format", - }) - .map_err(|e| BoxedError::new(e) as _) - .context(ExternalSnafu)?; - match batch { ScanBatch::Normal(batch) => { + // Safety: Only primary key format returns this batch. + let mapper = self.projection_mapper.as_primary_key().unwrap(); + if batch.is_empty() { Ok(mapper.empty_record_batch()) } else { @@ -85,20 +79,39 @@ impl ConvertBatchStream { } } ScanBatch::Series(series) => { - self.buffer.clear(); - self.buffer.reserve(series.batches.len()); + match series { + SeriesBatch::PrimaryKey(primary_key_batch) => { + self.buffer.clear(); + self.buffer.reserve(primary_key_batch.batches.len()); + // Safety: Only primary key format returns this batch. + let mapper = self.projection_mapper.as_primary_key().unwrap(); - for batch in series.batches { - let record_batch = mapper.convert(&batch, &self.cache_strategy)?; - self.buffer.push(record_batch.into_df_record_batch()); - } + for batch in primary_key_batch.batches { + let record_batch = mapper.convert(&batch, &self.cache_strategy)?; + self.buffer.push(record_batch.into_df_record_batch()); + } - let output_schema = mapper.output_schema(); - let record_batch = - compute::concat_batches(output_schema.arrow_schema(), &self.buffer) + let output_schema = mapper.output_schema(); + let record_batch = + compute::concat_batches(output_schema.arrow_schema(), &self.buffer) + .context(ArrowComputeSnafu)?; + + RecordBatch::try_from_df_record_batch(output_schema, record_batch) + } + SeriesBatch::Flat(flat_batch) => { + // Safety: Only flat format returns this batch. + let mapper = self.projection_mapper.as_flat().unwrap(); + + let output_schema = mapper.output_schema(); + let record_batch = compute::concat_batches( + output_schema.arrow_schema(), + &flat_batch.batches, + ) .context(ArrowComputeSnafu)?; - RecordBatch::try_from_df_record_batch(output_schema, record_batch) + mapper.convert(&record_batch) + } + } } ScanBatch::RecordBatch(df_record_batch) => { // Safety: Only flat format returns this batch.