diff --git a/src/mito2/src/read/scan_util.rs b/src/mito2/src/read/scan_util.rs index 674a4fab4b..de8875c4f6 100644 --- a/src/mito2/src/read/scan_util.rs +++ b/src/mito2/src/read/scan_util.rs @@ -14,13 +14,17 @@ //! Utilities for scanners. +use std::collections::VecDeque; use std::fmt; +use std::pin::Pin; use std::sync::{Arc, Mutex}; +use std::task::{Context, Poll}; use std::time::{Duration, Instant}; use async_stream::try_stream; use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time}; use datatypes::arrow::record_batch::RecordBatch; +use datatypes::timestamp::timestamp_array_to_primitive; use futures::Stream; use prometheus::IntGauge; use smallvec::SmallVec; @@ -33,11 +37,13 @@ use crate::metrics::{ IN_PROGRESS_SCAN, PRECISE_FILTER_ROWS_TOTAL, READ_BATCHES_RETURN, READ_ROW_GROUPS_TOTAL, READ_ROWS_IN_ROW_GROUP_TOTAL, READ_ROWS_RETURN, READ_STAGE_ELAPSED, }; -use crate::read::range::{RangeBuilderList, RowGroupIndex}; +use crate::read::range::{RangeBuilderList, RangeMeta, RowGroupIndex}; use crate::read::scan_region::StreamContext; use crate::read::{Batch, BoxedBatchStream, BoxedRecordBatchStream, ScannerMetrics, Source}; use crate::sst::file::FileTimeRange; +use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE; use crate::sst::parquet::file_range::FileRange; +use crate::sst::parquet::flat_format::time_index_column_index; use crate::sst::parquet::reader::{ReaderFilterMetrics, ReaderMetrics}; /// Verbose scan metrics for a partition. @@ -697,6 +703,71 @@ pub(crate) fn scan_flat_mem_ranges( } } +/// Files with row count greater than this threshold can contribute to the estimation. +const SPLIT_ROW_THRESHOLD: u64 = DEFAULT_ROW_GROUP_SIZE as u64; +/// Number of series threshold for splitting batches. +const NUM_SERIES_THRESHOLD: u64 = 10240; +/// Minimum batch size after splitting. The batch size is less than 60 because a series may only have +/// 60 samples per hour. +const BATCH_SIZE_THRESHOLD: u64 = 50; + +/// Returns true if splitting flat record batches may improve merge performance. +pub(crate) fn should_split_flat_batches_for_merge( + stream_ctx: &Arc, + range_meta: &RangeMeta, +) -> bool { + // Number of files to split and scan. + let mut num_files_to_split = 0; + let mut num_mem_rows = 0; + let mut num_mem_series = 0; + // Checks each file range, returns early if any range is not splittable. + // For mem ranges, we collect the total number of rows and series because the number of rows in a + // mem range may be too small. + for index in &range_meta.row_group_indices { + if stream_ctx.is_mem_range_index(*index) { + let memtable = &stream_ctx.input.memtables[index.index]; + // Is mem range + let stats = memtable.stats(); + num_mem_rows += stats.num_rows(); + num_mem_series += stats.series_count(); + } else if stream_ctx.is_file_range_index(*index) { + // This is a file range. + let file_index = index.index - stream_ctx.input.num_memtables(); + let file = &stream_ctx.input.files[file_index]; + if file.meta_ref().num_rows < SPLIT_ROW_THRESHOLD || file.meta_ref().num_series == 0 { + // If the file doesn't have enough rows, or the number of series is unavailable, skips it. + continue; + } + debug_assert!(file.meta_ref().num_rows > 0); + if !can_split_series(file.meta_ref().num_rows, file.meta_ref().num_series) { + // We can't split batches in a file. + return false; + } else { + num_files_to_split += 1; + } + } + // Skips non-file and non-mem ranges. + } + + if num_files_to_split > 0 { + // We mainly consider file ranges because they have enough data for sampling. + true + } else if num_mem_series > 0 && num_mem_rows > 0 { + // If we don't have files to scan, we check whether to split by the memtable. + can_split_series(num_mem_rows as u64, num_mem_series as u64) + } else { + false + } +} + +fn can_split_series(num_rows: u64, num_series: u64) -> bool { + assert!(num_series > 0); + assert!(num_rows > 0); + + // It doesn't have too many series or it will have enough rows for each batch. + num_series < NUM_SERIES_THRESHOLD || num_rows / num_series >= BATCH_SIZE_THRESHOLD +} + /// Scans file ranges at `index`. pub(crate) async fn scan_file_ranges( stream_ctx: Arc, @@ -876,3 +947,83 @@ pub(crate) async fn maybe_scan_flat_other_ranges( } .fail() } + +/// A stream wrapper that splits record batches from an inner stream. +pub(crate) struct SplitRecordBatchStream { + /// The inner stream that yields record batches. + inner: S, + /// Buffer for split batches. + batches: VecDeque, +} + +impl SplitRecordBatchStream { + /// Creates a new splitting stream wrapper. + pub(crate) fn new(inner: S) -> Self { + Self { + inner, + batches: VecDeque::new(), + } + } +} + +impl Stream for SplitRecordBatchStream +where + S: Stream> + Unpin, +{ + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + loop { + // First, check if we have buffered split batches + if let Some(batch) = self.batches.pop_front() { + return Poll::Ready(Some(Ok(batch))); + } + + // Poll the inner stream for the next batch + let record_batch = match futures::ready!(Pin::new(&mut self.inner).poll_next(cx)) { + Some(Ok(batch)) => batch, + Some(Err(e)) => return Poll::Ready(Some(Err(e))), + None => return Poll::Ready(None), + }; + + // Split the batch and buffer the results + split_record_batch(record_batch, &mut self.batches); + // Continue the loop to return the first split batch + } + } +} + +/// Splits the batch by timestamps. +/// +/// # Panics +/// Panics if the timestamp array is invalid. +pub(crate) fn split_record_batch(record_batch: RecordBatch, batches: &mut VecDeque) { + let batch_rows = record_batch.num_rows(); + if batch_rows == 0 { + return; + } + if batch_rows < 2 { + batches.push_back(record_batch); + return; + } + + let time_index_pos = time_index_column_index(record_batch.num_columns()); + let timestamps = record_batch.column(time_index_pos); + let (ts_values, _unit) = timestamp_array_to_primitive(timestamps).unwrap(); + let mut offsets = Vec::with_capacity(16); + offsets.push(0); + let values = ts_values.values(); + for (i, &value) in values.iter().take(batch_rows - 1).enumerate() { + if value > values[i + 1] { + offsets.push(i + 1); + } + } + offsets.push(values.len()); + + // Splits the batch by offsets. + for (i, &start) in offsets[..offsets.len() - 1].iter().enumerate() { + let end = offsets[i + 1]; + let rows_in_batch = end - start; + batches.push_back(record_batch.slice(start, rows_in_batch)); + } +} diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index aad883c51a..99073f2cf7 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -44,8 +44,9 @@ use crate::read::merge::MergeReaderBuilder; use crate::read::range::{RangeBuilderList, RangeMeta}; use crate::read::scan_region::{ScanInput, StreamContext}; use crate::read::scan_util::{ - PartitionMetrics, PartitionMetricsList, scan_file_ranges, scan_flat_file_ranges, - scan_flat_mem_ranges, scan_mem_ranges, + PartitionMetrics, PartitionMetricsList, SplitRecordBatchStream, scan_file_ranges, + scan_flat_file_ranges, scan_flat_mem_ranges, scan_mem_ranges, + should_split_flat_batches_for_merge, }; use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream}; use crate::read::{ @@ -818,6 +819,7 @@ pub(crate) async fn build_flat_sources( return Ok(()); } + let should_split = should_split_flat_batches_for_merge(stream_ctx, range_meta); sources.reserve(num_indices); let mut ordered_sources = Vec::with_capacity(num_indices); ordered_sources.resize_with(num_indices, || None); @@ -874,8 +876,22 @@ pub(crate) async fn build_flat_sources( } for stream in ordered_sources.into_iter().flatten() { - sources.push(stream); + if should_split { + sources.push(Box::pin(SplitRecordBatchStream::new(stream))); + } else { + sources.push(stream); + } } + + if should_split { + common_telemetry::debug!( + "Splitting record batches, region: {}, sources: {}, part_range: {:?}", + stream_ctx.input.region_metadata().region_id, + sources.len(), + part_range, + ); + } + Ok(()) }