feat: support flat format for SeriesScan (#6938)

* feat: Support flat format for SeriesScan

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: simplify tests

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: update comment

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: only accumulate fetch time to scan_cost in SeriesDistributor of
the SeriesScan

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: update comment

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
Yingwen
2025-09-11 14:12:53 +08:00
committed by GitHub
parent e75e5baa63
commit 5e65581f94
2 changed files with 513 additions and 36 deletions

View File

@@ -24,8 +24,10 @@ use common_recordbatch::util::ChainedRecordBatchStream;
use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
use datatypes::arrow::array::BinaryArray;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::schema::SchemaRef;
use futures::StreamExt;
use futures::{StreamExt, TryStreamExt};
use smallvec::{SmallVec, smallvec};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::metadata::RegionMetadataRef;
@@ -43,9 +45,11 @@ use crate::error::{
use crate::read::range::RangeBuilderList;
use crate::read::scan_region::{ScanInput, StreamContext};
use crate::read::scan_util::{PartitionMetrics, PartitionMetricsList, SeriesDistributorMetrics};
use crate::read::seq_scan::{SeqScan, build_sources};
use crate::read::seq_scan::{SeqScan, build_flat_sources, build_sources};
use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
use crate::read::{Batch, ScannerMetrics};
use crate::sst::parquet::flat_format::primary_key_column_index;
use crate::sst::parquet::format::PrimaryKeyArray;
/// Timeout to send a batch to a sender.
const SEND_TIMEOUT: Duration = Duration::from_millis(10);
@@ -155,8 +159,8 @@ impl SeriesScan {
metrics.scan_cost += fetch_start.elapsed();
fetch_start = Instant::now();
metrics.num_batches += series.batches.len();
metrics.num_rows += series.batches.iter().map(|x| x.num_rows()).sum::<usize>();
metrics.num_batches += series.num_batches();
metrics.num_rows += series.num_rows();
let yield_start = Instant::now();
yield ScanBatch::Series(series);
@@ -377,11 +381,101 @@ struct SeriesDistributor {
impl SeriesDistributor {
/// Executes the distributor.
async fn execute(&mut self) {
if let Err(e) = self.scan_partitions().await {
let result = if self.stream_ctx.input.flat_format {
self.scan_partitions_flat().await
} else {
self.scan_partitions().await
};
if let Err(e) = result {
self.senders.send_error(e).await;
}
}
/// Scans all parts in flat format using [FlatSeriesBatchDivider].
///
/// Builds flat sources for every part range of every partition, merges them into a single
/// reader and feeds each record batch the reader yields to the divider. Every
/// [FlatSeriesBatch] the divider emits is sent through `self.senders`.
///
/// Metrics: `scan_cost` only accumulates the time spent waiting for the reader, while
/// `yield_cost` only accumulates the time spent sending batches.
///
/// # Errors
///
/// Returns an error if building the sources or the reader fails, if the reader fails to
/// produce a batch, or if sending a batch fails.
async fn scan_partitions_flat(&mut self) -> Result<()> {
    let part_metrics = new_partition_metrics(
        &self.stream_ctx,
        false,
        &self.metrics_set,
        self.partitions.len(),
        &self.metrics_list,
    );
    part_metrics.on_first_poll();

    let range_builder_list = Arc::new(RangeBuilderList::new(
        self.stream_ctx.input.num_memtables(),
        self.stream_ctx.input.num_files(),
    ));

    // Scans all parts.
    let mut sources = Vec::with_capacity(self.partitions.len());
    for partition in &self.partitions {
        sources.reserve(partition.len());
        for part_range in partition {
            build_flat_sources(
                &self.stream_ctx,
                part_range,
                false,
                &part_metrics,
                range_builder_list.clone(),
                &mut sources,
            )
            .await?;
        }
    }

    // Builds a flat reader that merges sources from all parts.
    let mut reader = SeqScan::build_flat_reader_from_sources(
        &self.stream_ctx,
        sources,
        self.semaphore.clone(),
    )
    .await?;

    let mut metrics = SeriesDistributorMetrics::default();
    let mut fetch_start = Instant::now();
    let mut divider = FlatSeriesBatchDivider::default();

    while let Some(record_batch) = reader.try_next().await? {
        metrics.scan_cost += fetch_start.elapsed();
        metrics.num_batches += 1;
        metrics.num_rows += record_batch.num_rows();

        debug_assert!(record_batch.num_rows() > 0);
        if record_batch.num_rows() == 0 {
            // Nothing to divide, restart the fetch timer and poll the next batch.
            fetch_start = Instant::now();
            continue;
        }

        // Use divider to split series
        if let Some(series_batch) = divider.push(record_batch) {
            let yield_start = Instant::now();
            self.senders
                .send_batch(SeriesBatch::Flat(series_batch))
                .await?;
            metrics.yield_cost += yield_start.elapsed();
        }
        // Restart the timer after yielding so the send time is not counted as fetch time.
        fetch_start = Instant::now();
    }

    // The wait for the reader to report the end of the stream is fetch time. Record it
    // before flushing the divider so the final send below is only counted in `yield_cost`.
    metrics.scan_cost += fetch_start.elapsed();

    // Send any remaining batch in the divider
    if let Some(series_batch) = divider.finish() {
        let yield_start = Instant::now();
        self.senders
            .send_batch(SeriesBatch::Flat(series_batch))
            .await?;
        metrics.yield_cost += yield_start.elapsed();
    }

    metrics.num_series_send_timeout = self.senders.num_timeout;
    metrics.num_series_send_full = self.senders.num_full;
    part_metrics.set_distributor_metrics(&metrics);

    part_metrics.on_finish();

    Ok(())
}
/// Scans all parts.
async fn scan_partitions(&mut self) -> Result<()> {
let part_metrics = new_partition_metrics(
@@ -421,38 +515,46 @@ impl SeriesDistributor {
let mut metrics = SeriesDistributorMetrics::default();
let mut fetch_start = Instant::now();
let mut current_series = SeriesBatch::default();
let mut current_series = PrimaryKeySeriesBatch::default();
while let Some(batch) = reader.next_batch().await? {
metrics.scan_cost += fetch_start.elapsed();
fetch_start = Instant::now();
metrics.num_batches += 1;
metrics.num_rows += batch.num_rows();
debug_assert!(!batch.is_empty());
if batch.is_empty() {
fetch_start = Instant::now();
continue;
}
let Some(last_key) = current_series.current_key() else {
current_series.push(batch);
fetch_start = Instant::now();
continue;
};
if last_key == batch.primary_key() {
current_series.push(batch);
fetch_start = Instant::now();
continue;
}
// We find a new series, send the current one.
let to_send = std::mem::replace(&mut current_series, SeriesBatch::single(batch));
let to_send =
std::mem::replace(&mut current_series, PrimaryKeySeriesBatch::single(batch));
let yield_start = Instant::now();
self.senders.send_batch(to_send).await?;
self.senders
.send_batch(SeriesBatch::PrimaryKey(to_send))
.await?;
metrics.yield_cost += yield_start.elapsed();
fetch_start = Instant::now();
}
if !current_series.is_empty() {
let yield_start = Instant::now();
self.senders.send_batch(current_series).await?;
self.senders
.send_batch(SeriesBatch::PrimaryKey(current_series))
.await?;
metrics.yield_cost += yield_start.elapsed();
}
@@ -467,14 +569,14 @@ impl SeriesDistributor {
}
}
/// Batches of the same series.
#[derive(Default)]
pub struct SeriesBatch {
/// Batches of the same series in primary key format.
#[derive(Default, Debug)]
pub struct PrimaryKeySeriesBatch {
pub batches: SmallVec<[Batch; 4]>,
}
impl SeriesBatch {
/// Creates a new [SeriesBatch] from a single [Batch].
impl PrimaryKeySeriesBatch {
/// Creates a new [PrimaryKeySeriesBatch] from a single [Batch].
fn single(batch: Batch) -> Self {
Self {
batches: smallvec![batch],
@@ -495,6 +597,39 @@ impl SeriesBatch {
}
}
/// Batches of the same series.
#[derive(Debug)]
pub enum SeriesBatch {
PrimaryKey(PrimaryKeySeriesBatch),
Flat(FlatSeriesBatch),
}
impl SeriesBatch {
/// Returns the number of batches.
pub fn num_batches(&self) -> usize {
match self {
SeriesBatch::PrimaryKey(primary_key_batch) => primary_key_batch.batches.len(),
SeriesBatch::Flat(flat_batch) => flat_batch.batches.len(),
}
}
/// Returns the total number of rows across all batches.
pub fn num_rows(&self) -> usize {
match self {
SeriesBatch::PrimaryKey(primary_key_batch) => {
primary_key_batch.batches.iter().map(|x| x.num_rows()).sum()
}
SeriesBatch::Flat(flat_batch) => flat_batch.batches.iter().map(|x| x.num_rows()).sum(),
}
}
}
/// Batches of the same series in flat format.
///
/// NOTE(review): according to the documentation of `FlatSeriesBatchDivider` in this file,
/// a value produced by the divider may contain rows from more than one series; it only
/// guarantees that rows of one series are not split across two values.
#[derive(Default, Debug)]
pub struct FlatSeriesBatch {
/// Record batches that make up this series batch.
pub batches: SmallVec<[RecordBatch; 4]>,
}
/// List of senders.
struct SenderList {
senders: Vec<Option<Sender<Result<SeriesBatch>>>>,
@@ -627,3 +762,332 @@ fn new_partition_metrics(
metrics_list.set(partition, metrics.clone());
metrics
}
/// A divider to split flat record batches by time series.
///
/// It only ensures rows of the same series are returned in the same [FlatSeriesBatch].
/// However, a [FlatSeriesBatch] may contain rows from multiple series.
#[derive(Default)]
struct FlatSeriesBatchDivider {
    /// Batches received so far whose last series may continue in the next pushed batch.
    buffer: FlatSeriesBatch,
}

impl FlatSeriesBatchDivider {
    /// Pushes a record batch into the divider.
    ///
    /// Returns a [FlatSeriesBatch] once the buffered rows are known to be complete, which
    /// happens when `batch` contains a primary key that differs from the last buffered
    /// primary key. The returned value holds the previous buffer plus the leading rows of
    /// `batch` that still carry the last buffered primary key; the rest of `batch` becomes
    /// the new buffer. Returns `None` while the series may still continue.
    ///
    /// Empty batches are ignored because they have no last primary key to compare.
    ///
    /// NOTE(review): the comparison of last keys presumably relies on the input being
    /// ordered by primary key - confirm against the reader that feeds this divider.
    ///
    /// # Panics
    ///
    /// Panics if the primary key column of `batch` or of the buffered batch is not a
    /// [PrimaryKeyArray] with [BinaryArray] values.
    fn push(&mut self, batch: RecordBatch) -> Option<FlatSeriesBatch> {
        // Without this guard `len() - 1` below would underflow for a batch without rows.
        if batch.num_rows() == 0 {
            return None;
        }

        // Nothing to compare with yet, keep the first batch.
        let Some(buffer_last_batch) = self.buffer.batches.last() else {
            self.buffer.batches.push(batch);
            return None;
        };

        let pk_column_idx = primary_key_column_index(batch.num_columns());
        let (batch_pk_array, batch_pk_values) = Self::primary_key_arrays(&batch, pk_column_idx);
        let (buffer_pk_array, buffer_pk_values) =
            Self::primary_key_arrays(buffer_last_batch, pk_column_idx);

        let batch_last_pk =
            primary_key_at(batch_pk_array, batch_pk_values, batch_pk_array.len() - 1);
        let buffer_last_pk =
            primary_key_at(buffer_pk_array, buffer_pk_values, buffer_pk_array.len() - 1);

        // The incoming batch ends with the series the buffer ends with, so that series may
        // still continue in the next batch.
        if batch_last_pk == buffer_last_pk {
            self.buffer.batches.push(batch);
            return None;
        }

        // Otherwise, the batch must contain a different primary key. Finds the offset of the
        // first row whose primary key differs from the last buffered one.
        let change_offset = batch_pk_array
            .keys()
            .values()
            .iter()
            .position(|&key| batch_pk_values.value(key as usize) != buffer_last_pk)
            .unwrap_or(0);

        // The current buffer is complete once the leading rows of the same series are added.
        let mut result = std::mem::take(&mut self.buffer);
        if change_offset > 0 {
            result.batches.push(batch.slice(0, change_offset));
            self.buffer
                .batches
                .push(batch.slice(change_offset, batch.num_rows() - change_offset));
        } else {
            // The whole batch belongs to other series.
            self.buffer.batches.push(batch);
        }

        Some(result)
    }

    /// Returns the final [FlatSeriesBatch].
    ///
    /// Takes whatever is left in the buffer, or returns `None` if the buffer is empty.
    fn finish(&mut self) -> Option<FlatSeriesBatch> {
        if self.buffer.batches.is_empty() {
            None
        } else {
            Some(std::mem::take(&mut self.buffer))
        }
    }

    /// Returns the primary key dictionary array of `batch` and its binary values array.
    fn primary_key_arrays(
        batch: &RecordBatch,
        pk_column_idx: usize,
    ) -> (&PrimaryKeyArray, &BinaryArray) {
        let pk_array = batch
            .column(pk_column_idx)
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .expect("primary key column must be a PrimaryKeyArray");
        let pk_values = pk_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .expect("primary key dictionary values must be a BinaryArray");
        (pk_array, pk_values)
    }
}
/// Looks up the primary key bytes of the row at `index` in a [PrimaryKeyArray].
///
/// Reads the dictionary key stored at `index` and returns the entry of
/// `primary_key_values` that the key points to.
fn primary_key_at<'a>(
    primary_key: &PrimaryKeyArray,
    primary_key_values: &'a BinaryArray,
    index: usize,
) -> &'a [u8] {
    let dict_key = primary_key.keys().value(index) as usize;
    primary_key_values.value(dict_key)
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::OpType;
    use datatypes::arrow::array::{
        ArrayRef, BinaryDictionaryBuilder, Int64Array, StringDictionaryBuilder,
        TimestampMillisecondArray, UInt8Array, UInt64Array,
    };
    use datatypes::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit, UInt32Type};
    use datatypes::arrow::record_batch::RecordBatch;

    use super::*;

    /// Builds a record batch with the schema returned by [build_test_flat_schema].
    ///
    /// All slices must have one entry per row.
    fn new_test_record_batch(
        primary_keys: &[&[u8]],
        timestamps: &[i64],
        sequences: &[u64],
        op_types: &[OpType],
        fields: &[u64],
    ) -> RecordBatch {
        let num_rows = timestamps.len();
        debug_assert_eq!(sequences.len(), num_rows);
        debug_assert_eq!(op_types.len(), num_rows);
        debug_assert_eq!(fields.len(), num_rows);
        debug_assert_eq!(primary_keys.len(), num_rows);

        // Columns are listed in the same order as the fields of the test schema.
        let tag_column = build_test_pk_string_dict_array(primary_keys);
        let field_column: ArrayRef = Arc::new(Int64Array::from_iter(
            fields.iter().map(|&value| Some(value as i64)),
        ));
        let ts_column: ArrayRef = Arc::new(TimestampMillisecondArray::from_iter_values(
            timestamps.iter().copied(),
        ));
        let pk_column = build_test_pk_array(primary_keys);
        let sequence_column: ArrayRef =
            Arc::new(UInt64Array::from_iter_values(sequences.iter().copied()));
        let op_type_column: ArrayRef = Arc::new(UInt8Array::from_iter_values(
            op_types.iter().map(|&op_type| op_type as u8),
        ));

        RecordBatch::try_new(
            build_test_flat_schema(),
            vec![
                tag_column,
                field_column,
                ts_column,
                pk_column,
                sequence_column,
                op_type_column,
            ],
        )
        .unwrap()
    }

    /// Builds a batch of `Put` rows from primary keys and timestamps only.
    ///
    /// The sequence of a row is `ts / 1000` and its field value is `ts / 100`, which is the
    /// pattern every fixture in this module follows.
    fn new_put_batch(primary_keys: &[&[u8]], timestamps: &[i64]) -> RecordBatch {
        let sequences: Vec<u64> = timestamps.iter().map(|&ts| (ts / 1000) as u64).collect();
        let fields: Vec<u64> = timestamps.iter().map(|&ts| (ts / 100) as u64).collect();
        let op_types = vec![OpType::Put; timestamps.len()];
        new_test_record_batch(primary_keys, timestamps, &sequences, &op_types, &fields)
    }

    /// Builds the string dictionary array of the `k0` column from UTF-8 primary keys.
    fn build_test_pk_string_dict_array(primary_keys: &[&[u8]]) -> ArrayRef {
        let mut dict_builder = StringDictionaryBuilder::<UInt32Type>::new();
        for raw_key in primary_keys.iter().copied() {
            let key_str = std::str::from_utf8(raw_key).unwrap();
            dict_builder.append(key_str).unwrap();
        }
        Arc::new(dict_builder.finish())
    }

    /// Builds the binary dictionary array of the `__primary_key` column.
    fn build_test_pk_array(primary_keys: &[&[u8]]) -> ArrayRef {
        let mut dict_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
        for raw_key in primary_keys.iter().copied() {
            dict_builder.append(raw_key).unwrap();
        }
        Arc::new(dict_builder.finish())
    }

    /// Returns the schema used by the test batches: one tag, one field, the time index and
    /// the `__primary_key`, `__sequence` and `__op_type` columns.
    fn build_test_flat_schema() -> SchemaRef {
        let dictionary_of = |value_type: DataType| {
            DataType::Dictionary(Box::new(DataType::UInt32), Box::new(value_type))
        };

        Arc::new(Schema::new(vec![
            Field::new("k0", dictionary_of(DataType::Utf8), false),
            Field::new("field0", DataType::Int64, true),
            Field::new(
                "ts",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("__primary_key", dictionary_of(DataType::Binary), false),
            Field::new("__sequence", DataType::UInt64, false),
            Field::new("__op_type", DataType::UInt8, false),
        ]))
    }

    #[test]
    fn test_empty_buffer_first_push() {
        // A divider that never received a batch has nothing to flush.
        assert!(FlatSeriesBatchDivider::default().finish().is_none());

        // The first batch is always buffered.
        let mut divider = FlatSeriesBatchDivider::default();
        let first = new_put_batch(&[b"series1", b"series1"], &[1000, 2000]);
        assert!(divider.push(first).is_none());
        assert_eq!(divider.buffer.batches.len(), 1);
    }

    #[test]
    fn test_same_series_accumulation() {
        let mut divider = FlatSeriesBatchDivider::default();
        let first = new_put_batch(&[b"series1", b"series1"], &[1000, 2000]);
        let second = new_put_batch(&[b"series1", b"series1"], &[3000, 4000]);

        divider.push(first);
        // Same series, so nothing is emitted until the divider is finished.
        assert!(divider.push(second).is_none());

        let flushed = divider.finish().unwrap();
        assert_eq!(flushed.batches.len(), 2);
    }

    #[test]
    fn test_series_boundary_detection() {
        let mut divider = FlatSeriesBatchDivider::default();
        let first = new_put_batch(&[b"series1", b"series1"], &[1000, 2000]);
        let second = new_put_batch(&[b"series2", b"series2"], &[3000, 4000]);

        divider.push(first);
        // The second batch starts a new series, so the first one is emitted as is.
        let emitted = divider.push(second).unwrap();
        assert_eq!(emitted.batches.len(), 1);

        assert_eq!(divider.buffer.batches.len(), 1);
    }

    #[test]
    fn test_series_boundary_within_batch() {
        let mut divider = FlatSeriesBatchDivider::default();
        let first = new_put_batch(&[b"series1", b"series1"], &[1000, 2000]);
        let second = new_put_batch(&[b"series1", b"series2"], &[3000, 4000]);

        divider.push(first);
        // The second batch is split at the row where the series changes.
        let emitted = divider.push(second).unwrap();
        assert_eq!(emitted.batches.len(), 2);
        assert_eq!(emitted.batches[0].num_rows(), 2);
        assert_eq!(emitted.batches[1].num_rows(), 1);

        assert_eq!(divider.buffer.batches.len(), 1);
        assert_eq!(divider.buffer.batches[0].num_rows(), 1);
    }

    #[test]
    fn test_series_splitting() {
        let mut divider = FlatSeriesBatchDivider::default();
        let first = new_put_batch(&[b"series1"], &[1000]);
        let second = new_put_batch(
            &[b"series1", b"series2", b"series2", b"series3"],
            &[2000, 3000, 4000, 5000],
        );

        divider.push(first);
        let emitted = divider.push(second).unwrap();
        assert_eq!(emitted.batches.len(), 2);
        let emitted_rows: usize = emitted.batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(emitted_rows, 2);

        // The remaining rows of series2 and series3 stay together in the buffer.
        let remaining = divider.finish().unwrap();
        assert_eq!(remaining.batches.len(), 1);
        assert_eq!(remaining.batches[0].num_rows(), 3);
    }
}

View File

@@ -23,10 +23,10 @@ use common_recordbatch::{DfRecordBatch, RecordBatch};
use datatypes::compute;
use futures::stream::BoxStream;
use futures::{Stream, StreamExt};
use snafu::{OptionExt, ResultExt};
use snafu::ResultExt;
use crate::cache::CacheStrategy;
use crate::error::{Result, UnexpectedSnafu};
use crate::error::Result;
use crate::read::Batch;
use crate::read::projection::ProjectionMapper;
use crate::read::scan_util::PartitionMetrics;
@@ -67,17 +67,11 @@ impl ConvertBatchStream {
}
fn convert(&mut self, batch: ScanBatch) -> common_recordbatch::error::Result<RecordBatch> {
let mapper = self
.projection_mapper
.as_primary_key()
.context(UnexpectedSnafu {
reason: "Unexpected format",
})
.map_err(|e| BoxedError::new(e) as _)
.context(ExternalSnafu)?;
match batch {
ScanBatch::Normal(batch) => {
// Safety: Only primary key format returns this batch.
let mapper = self.projection_mapper.as_primary_key().unwrap();
if batch.is_empty() {
Ok(mapper.empty_record_batch())
} else {
@@ -85,20 +79,39 @@ impl ConvertBatchStream {
}
}
ScanBatch::Series(series) => {
self.buffer.clear();
self.buffer.reserve(series.batches.len());
match series {
SeriesBatch::PrimaryKey(primary_key_batch) => {
self.buffer.clear();
self.buffer.reserve(primary_key_batch.batches.len());
// Safety: Only primary key format returns this batch.
let mapper = self.projection_mapper.as_primary_key().unwrap();
for batch in series.batches {
let record_batch = mapper.convert(&batch, &self.cache_strategy)?;
self.buffer.push(record_batch.into_df_record_batch());
}
for batch in primary_key_batch.batches {
let record_batch = mapper.convert(&batch, &self.cache_strategy)?;
self.buffer.push(record_batch.into_df_record_batch());
}
let output_schema = mapper.output_schema();
let record_batch =
compute::concat_batches(output_schema.arrow_schema(), &self.buffer)
let output_schema = mapper.output_schema();
let record_batch =
compute::concat_batches(output_schema.arrow_schema(), &self.buffer)
.context(ArrowComputeSnafu)?;
RecordBatch::try_from_df_record_batch(output_schema, record_batch)
}
SeriesBatch::Flat(flat_batch) => {
// Safety: Only flat format returns this batch.
let mapper = self.projection_mapper.as_flat().unwrap();
let output_schema = mapper.output_schema();
let record_batch = compute::concat_batches(
output_schema.arrow_schema(),
&flat_batch.batches,
)
.context(ArrowComputeSnafu)?;
RecordBatch::try_from_df_record_batch(output_schema, record_batch)
mapper.convert(&record_batch)
}
}
}
ScanBatch::RecordBatch(df_record_batch) => {
// Safety: Only flat format returns this batch.