feat: split batches before merge (#7225)

* feat: split batches by rule in build_flat_sources()

It checks the num_series and splits batches when the series cardinality
is low

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: panic when no num_series available

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: don't subtract file index if checking mem range

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: update comments and control flow

Signed-off-by: evenyag <realevenyag@gmail.com>

* style: fix clippy

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
Yingwen
2025-11-18 16:19:39 +08:00
committed by GitHub
parent 605f3270e5
commit ee35ec0a39
2 changed files with 171 additions and 4 deletions

View File

@@ -14,13 +14,17 @@
//! Utilities for scanners.
use std::collections::VecDeque;
use std::fmt;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use async_stream::try_stream;
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::timestamp::timestamp_array_to_primitive;
use futures::Stream;
use prometheus::IntGauge;
use smallvec::SmallVec;
@@ -33,11 +37,13 @@ use crate::metrics::{
IN_PROGRESS_SCAN, PRECISE_FILTER_ROWS_TOTAL, READ_BATCHES_RETURN, READ_ROW_GROUPS_TOTAL,
READ_ROWS_IN_ROW_GROUP_TOTAL, READ_ROWS_RETURN, READ_STAGE_ELAPSED,
};
use crate::read::range::{RangeBuilderList, RowGroupIndex};
use crate::read::range::{RangeBuilderList, RangeMeta, RowGroupIndex};
use crate::read::scan_region::StreamContext;
use crate::read::{Batch, BoxedBatchStream, BoxedRecordBatchStream, ScannerMetrics, Source};
use crate::sst::file::FileTimeRange;
use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE;
use crate::sst::parquet::file_range::FileRange;
use crate::sst::parquet::flat_format::time_index_column_index;
use crate::sst::parquet::reader::{ReaderFilterMetrics, ReaderMetrics};
/// Verbose scan metrics for a partition.
@@ -697,6 +703,71 @@ pub(crate) fn scan_flat_mem_ranges(
}
}
/// Files with row count greater than this threshold can contribute to the estimation.
const SPLIT_ROW_THRESHOLD: u64 = DEFAULT_ROW_GROUP_SIZE as u64;
/// Number of series threshold for splitting batches.
const NUM_SERIES_THRESHOLD: u64 = 10240;
/// Minimum batch size after splitting. The batch size is less than 60 because a series may only have
/// 60 samples per hour.
const BATCH_SIZE_THRESHOLD: u64 = 50;
/// Returns true if splitting flat record batches may improve merge performance.
pub(crate) fn should_split_flat_batches_for_merge(
stream_ctx: &Arc<StreamContext>,
range_meta: &RangeMeta,
) -> bool {
// Number of files to split and scan.
let mut num_files_to_split = 0;
let mut num_mem_rows = 0;
let mut num_mem_series = 0;
// Checks each file range, returns early if any range is not splittable.
// For mem ranges, we collect the total number of rows and series because the number of rows in a
// mem range may be too small.
for index in &range_meta.row_group_indices {
if stream_ctx.is_mem_range_index(*index) {
let memtable = &stream_ctx.input.memtables[index.index];
// Is mem range
let stats = memtable.stats();
num_mem_rows += stats.num_rows();
num_mem_series += stats.series_count();
} else if stream_ctx.is_file_range_index(*index) {
// This is a file range.
let file_index = index.index - stream_ctx.input.num_memtables();
let file = &stream_ctx.input.files[file_index];
if file.meta_ref().num_rows < SPLIT_ROW_THRESHOLD || file.meta_ref().num_series == 0 {
// If the file doesn't have enough rows, or the number of series is unavailable, skips it.
continue;
}
debug_assert!(file.meta_ref().num_rows > 0);
if !can_split_series(file.meta_ref().num_rows, file.meta_ref().num_series) {
// We can't split batches in a file.
return false;
} else {
num_files_to_split += 1;
}
}
// Skips non-file and non-mem ranges.
}
if num_files_to_split > 0 {
// We mainly consider file ranges because they have enough data for sampling.
true
} else if num_mem_series > 0 && num_mem_rows > 0 {
// If we don't have files to scan, we check whether to split by the memtable.
can_split_series(num_mem_rows as u64, num_mem_series as u64)
} else {
false
}
}
fn can_split_series(num_rows: u64, num_series: u64) -> bool {
assert!(num_series > 0);
assert!(num_rows > 0);
// It doesn't have too many series or it will have enough rows for each batch.
num_series < NUM_SERIES_THRESHOLD || num_rows / num_series >= BATCH_SIZE_THRESHOLD
}
/// Scans file ranges at `index`.
pub(crate) async fn scan_file_ranges(
stream_ctx: Arc<StreamContext>,
@@ -876,3 +947,83 @@ pub(crate) async fn maybe_scan_flat_other_ranges(
}
.fail()
}
/// A stream wrapper that splits record batches from an inner stream.
pub(crate) struct SplitRecordBatchStream<S> {
/// The inner stream that yields record batches.
inner: S,
/// Buffer for split batches.
batches: VecDeque<RecordBatch>,
}
impl<S> SplitRecordBatchStream<S> {
/// Creates a new splitting stream wrapper.
pub(crate) fn new(inner: S) -> Self {
Self {
inner,
batches: VecDeque::new(),
}
}
}
impl<S> Stream for SplitRecordBatchStream<S>
where
S: Stream<Item = Result<RecordBatch>> + Unpin,
{
type Item = Result<RecordBatch>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
// First, check if we have buffered split batches
if let Some(batch) = self.batches.pop_front() {
return Poll::Ready(Some(Ok(batch)));
}
// Poll the inner stream for the next batch
let record_batch = match futures::ready!(Pin::new(&mut self.inner).poll_next(cx)) {
Some(Ok(batch)) => batch,
Some(Err(e)) => return Poll::Ready(Some(Err(e))),
None => return Poll::Ready(None),
};
// Split the batch and buffer the results
split_record_batch(record_batch, &mut self.batches);
// Continue the loop to return the first split batch
}
}
}
/// Splits the batch by timestamps.
///
/// # Panics
/// Panics if the timestamp array is invalid.
pub(crate) fn split_record_batch(record_batch: RecordBatch, batches: &mut VecDeque<RecordBatch>) {
let batch_rows = record_batch.num_rows();
if batch_rows == 0 {
return;
}
if batch_rows < 2 {
batches.push_back(record_batch);
return;
}
let time_index_pos = time_index_column_index(record_batch.num_columns());
let timestamps = record_batch.column(time_index_pos);
let (ts_values, _unit) = timestamp_array_to_primitive(timestamps).unwrap();
let mut offsets = Vec::with_capacity(16);
offsets.push(0);
let values = ts_values.values();
for (i, &value) in values.iter().take(batch_rows - 1).enumerate() {
if value > values[i + 1] {
offsets.push(i + 1);
}
}
offsets.push(values.len());
// Splits the batch by offsets.
for (i, &start) in offsets[..offsets.len() - 1].iter().enumerate() {
let end = offsets[i + 1];
let rows_in_batch = end - start;
batches.push_back(record_batch.slice(start, rows_in_batch));
}
}

View File

@@ -44,8 +44,9 @@ use crate::read::merge::MergeReaderBuilder;
use crate::read::range::{RangeBuilderList, RangeMeta};
use crate::read::scan_region::{ScanInput, StreamContext};
use crate::read::scan_util::{
PartitionMetrics, PartitionMetricsList, scan_file_ranges, scan_flat_file_ranges,
scan_flat_mem_ranges, scan_mem_ranges,
PartitionMetrics, PartitionMetricsList, SplitRecordBatchStream, scan_file_ranges,
scan_flat_file_ranges, scan_flat_mem_ranges, scan_mem_ranges,
should_split_flat_batches_for_merge,
};
use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
use crate::read::{
@@ -818,6 +819,7 @@ pub(crate) async fn build_flat_sources(
return Ok(());
}
let should_split = should_split_flat_batches_for_merge(stream_ctx, range_meta);
sources.reserve(num_indices);
let mut ordered_sources = Vec::with_capacity(num_indices);
ordered_sources.resize_with(num_indices, || None);
@@ -874,8 +876,22 @@ pub(crate) async fn build_flat_sources(
}
for stream in ordered_sources.into_iter().flatten() {
sources.push(stream);
if should_split {
sources.push(Box::pin(SplitRecordBatchStream::new(stream)));
} else {
sources.push(stream);
}
}
if should_split {
common_telemetry::debug!(
"Splitting record batches, region: {}, sources: {}, part_range: {:?}",
stream_ctx.input.region_metadata().region_id,
sources.len(),
part_range,
);
}
Ok(())
}