feat: flat read path supports primary_key format memtables (#7759)

* feat: add adapter for Batch to flat RecordBatch

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: support Batch to flat RecordBatch conversion in MemtableRange

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: address review issues for BatchToRecordBatchAdapter

- Extract duplicated read_column_ids computation into a shared
  `read_column_ids_from_projection` helper function
- Cache `FormatProjection` in `BatchToRecordBatchContext::new()` instead
  of recomputing it on every `adapt_iter()` call
- Remove unnecessary `Arc` wrapping of `read_column_ids` in
  `SimpleBulkMemtable::ranges()`
- Fix clippy `filter_map_bool_then` warning in `batch_adapter.rs`

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: simplify comments

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor(mito2): use read column ids in batch adapter

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: test build_record_batch_iter

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: fmt code

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: test build_record_batch_iter for all old memtables

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: address comment

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: prune time range before adapter

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: share BatchToRecordBatchContext in simple_bulk_memtable.rs

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: use ScalarValue::to_array_of_size to build repeated value array

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
Author: Yingwen
Date: 2026-03-10 20:46:39 +08:00
Committed by: GitHub
Parent: 58528d1334
Commit: 04cd2c8a05
13 changed files with 1087 additions and 29 deletions
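
The API change threaded through the hunks below gives MemtableRange::build_record_batch_iter an optional time range ahead of the scan-metrics argument. A minimal before/after sketch of a call site (`range` and `time_range` assumed in scope):

// Before: only optional scan metrics.
let iter = range.build_record_batch_iter(None)?;
// After: an optional (start, end) FileTimeRange comes first; None reads all rows.
let iter = range.build_record_batch_iter(Some(time_range), None)?;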

@@ -193,7 +193,7 @@ fn filter_1_host(c: &mut Criterion) {
.unwrap();
for (_range_id, range) in ranges.ranges.iter() {
-let iter = range.build_record_batch_iter(None).unwrap();
+let iter = range.build_record_batch_iter(None, None).unwrap();
for batch in iter {
let _batch = batch.unwrap();
}

@@ -783,7 +783,7 @@ fn memtable_flat_sources(
if let Some(encoded) = only_range.encoded() {
flat_sources.encoded.push((encoded, max_sequence));
} else {
-let iter = only_range.build_record_batch_iter(None)?;
+let iter = only_range.build_record_batch_iter(None, None)?;
// Dedup according to append mode and merge mode.
// Even single range may have duplicate rows.
let iter = maybe_dedup_one(
@@ -822,7 +822,7 @@ fn memtable_flat_sources(
continue;
}
-let iter = range.build_record_batch_iter(None)?;
+let iter = range.build_record_batch_iter(None, None)?;
input_iters.push(iter);
let range_rows = range.num_rows();
last_iter_rows += range_rows;

@@ -26,6 +26,7 @@ use common_time::Timestamp;
use datatypes::arrow::record_batch::RecordBatch;
use mito_codec::key_values::KeyValue;
pub use mito_codec::key_values::KeyValues;
use mito_codec::row_converter::{PrimaryKeyCodec, build_primary_key_codec};
use serde::{Deserialize, Serialize};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber, SequenceRange};
@@ -38,6 +39,7 @@ use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtable
use crate::memtable::time_series::TimeSeriesMemtableBuilder;
use crate::metrics::WRITE_BUFFER_BYTES;
use crate::read::Batch;
use crate::read::batch_adapter::BatchToRecordBatchAdapter;
use crate::read::prune::PruneTimeIterator;
use crate::read::scan_region::PredicateGroup;
use crate::region::options::{MemtableOptions, MergeMode, RegionOptions};
@@ -560,6 +562,57 @@ pub trait IterBuilder: Send + Sync {
pub type BoxedIterBuilder = Box<dyn IterBuilder>;
/// Computes the column IDs to read based on the projection.
///
/// If `projection` is `Some`, returns those column IDs. If `None`, returns all column IDs
/// from the metadata.
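///
/// For example, `Some(&[2, 0])` yields `vec![2, 0]`, while `None` yields every
/// column id in metadata order.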
pub fn read_column_ids_from_projection(
metadata: &RegionMetadataRef,
projection: Option<&[ColumnId]>,
) -> Vec<ColumnId> {
if let Some(projection) = projection {
projection.to_vec()
} else {
metadata
.column_metadatas
.iter()
.map(|c| c.column_id)
.collect()
}
}
/// Context to adapt batch iterators to record batch iterators for flat scan.
pub struct BatchToRecordBatchContext {
metadata: RegionMetadataRef,
codec: Arc<dyn PrimaryKeyCodec>,
read_column_ids: Vec<ColumnId>,
}
impl BatchToRecordBatchContext {
/// Creates a new context for adapting batch iterators.
pub fn new(metadata: RegionMetadataRef, mut read_column_ids: Vec<ColumnId>) -> Self {
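// Ensure at least one column is read by falling back to the time index.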
if read_column_ids.is_empty() {
read_column_ids.push(metadata.time_index_column().column_id);
}
let codec = build_primary_key_codec(&metadata);
Self {
metadata,
codec,
read_column_ids,
}
}
fn adapt_iter(&self, iter: BoxedBatchIterator) -> BoxedRecordBatchIterator {
Box::new(BatchToRecordBatchAdapter::new(
iter,
self.metadata.clone(),
self.codec.clone(),
&self.read_column_ids,
))
}
}
/// Context shared by ranges of the same memtable.
pub struct MemtableRangeContext {
/// Id of the memtable.
@@ -568,6 +621,8 @@ pub struct MemtableRangeContext {
builder: BoxedIterBuilder,
/// All filters.
predicate: PredicateGroup,
/// Optional context to adapt batch iterators for flat scans.
batch_to_record_batch: Option<Arc<BatchToRecordBatchContext>>,
}
pub type MemtableRangeContextRef = Arc<MemtableRangeContext>;
@@ -575,10 +630,21 @@ pub type MemtableRangeContextRef = Arc<MemtableRangeContext>;
impl MemtableRangeContext {
/// Creates a new [MemtableRangeContext].
pub fn new(id: MemtableId, builder: BoxedIterBuilder, predicate: PredicateGroup) -> Self {
Self::new_with_batch_to_record_batch(id, builder, predicate, None)
}
/// Creates a new [MemtableRangeContext] with optional adapter context.
pub fn new_with_batch_to_record_batch(
id: MemtableId,
builder: BoxedIterBuilder,
predicate: PredicateGroup,
batch_to_record_batch: Option<Arc<BatchToRecordBatchContext>>,
) -> Self {
Self {
id,
builder,
predicate,
batch_to_record_batch,
}
}
}
@@ -630,15 +696,34 @@ impl MemtableRange {
self.context.builder.build(None)
}
-/// Builds a record batch iterator to read all rows in range.
+/// Builds a record batch iterator to read rows in range.
///
-/// This method doesn't take the optional time range because a bulk part is immutable
-/// so we don't need to filter rows out of the time range.
+/// For mutable memtables (adapter path), applies time-range pruning to ensure rows
+/// outside the time range are filtered, matching the behavior of `build_prune_iter`.
pub fn build_record_batch_iter(
&self,
+time_range: Option<FileTimeRange>,
metrics: Option<MemScanMetrics>,
) -> Result<BoxedRecordBatchIterator> {
-self.context.builder.build_record_batch(metrics)
+if self.context.builder.is_record_batch() {
+return self.context.builder.build_record_batch(metrics);
+}
+if let Some(context) = self.context.batch_to_record_batch.as_ref() {
+let iter = self.context.builder.build(metrics)?;
+let iter: BoxedBatchIterator = if let Some(time_range) = time_range {
+let time_filters = self.context.predicate.time_filters();
+Box::new(PruneTimeIterator::new(iter, time_range, time_filters))
+} else {
+iter
+};
+return Ok(context.adapt_iter(iter));
+}
+UnsupportedOperationSnafu {
+err_msg: "Record batch iterator is not supported by this memtable",
+}
+.fail()
}
/// Returns whether the iterator is a record batch iterator.
@@ -658,6 +743,8 @@ impl MemtableRange {
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_base::readable_size::ReadableSize;
use super::*;

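Memtables opt into the flat path by building the adapter context once and passing it to the new constructor. A minimal wiring sketch (mirroring the partition tree and time series changes below; `metadata`, `projection`, `id`, `builder`, and `predicate` are assumed in scope):

let read_column_ids = read_column_ids_from_projection(&metadata, projection);
let adapter_context = Arc::new(BatchToRecordBatchContext::new(
    metadata.clone(),
    read_column_ids,
));
let context = Arc::new(MemtableRangeContext::new_with_batch_to_record_batch(
    id,
    builder,
    predicate,
    Some(adapter_context),
));

Ranges built from this context can then serve `build_record_batch_iter` even though the underlying iterator yields primary-key-format `Batch`es.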
@@ -1485,7 +1485,7 @@ mod tests {
assert!(range.num_rows() > 0);
assert!(range.is_record_batch());
-let record_batch_iter = range.build_record_batch_iter(None).unwrap();
+let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
let mut total_rows = 0;
for batch_result in record_batch_iter {
@@ -1535,7 +1535,7 @@ mod tests {
let range = ranges.ranges.get(&0).unwrap();
assert!(range.is_record_batch());
-let record_batch_iter = range.build_record_batch_iter(None).unwrap();
+let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
let mut total_rows = 0;
for batch_result in record_batch_iter {
@@ -1731,7 +1731,7 @@ mod tests {
assert_eq!(1, ranges.ranges.len());
let range = ranges.ranges.get(&0).unwrap();
-let mut record_batch_iter = range.build_record_batch_iter(None).unwrap();
+let mut record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
assert!(record_batch_iter.next().is_none());
}
@@ -1786,7 +1786,7 @@ mod tests {
assert!(range.num_rows() > 0);
assert!(range.is_record_batch());
-let record_batch_iter = range.build_record_batch_iter(None).unwrap();
+let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
let mut total_rows = 0;
for batch_result in record_batch_iter {
let batch = batch_result.unwrap();
@@ -1870,7 +1870,7 @@ mod tests {
let mut total_rows_read = 0;
for (_range_id, range) in ranges.ranges.iter() {
assert!(range.is_record_batch());
-let record_batch_iter = range.build_record_batch_iter(None).unwrap();
+let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
for batch_result in record_batch_iter {
let batch = batch_result.unwrap();
@@ -1957,7 +1957,7 @@ mod tests {
let mut total_rows_read = 0;
for (_range_id, range) in ranges.ranges.iter() {
-let record_batch_iter = range.build_record_batch_iter(None).unwrap();
+let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
for batch_result in record_batch_iter {
let batch = batch_result.unwrap();
total_rows_read += batch.num_rows();
@@ -2016,7 +2016,7 @@ mod tests {
// Verify data is sorted correctly in the range
let range = ranges.ranges.get(&0).unwrap();
-let record_batch_iter = range.build_record_batch_iter(None).unwrap();
+let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
let mut total_rows = 0;
for batch_result in record_batch_iter {
@@ -2211,7 +2211,7 @@ mod tests {
let mut total_rows_read = 0;
for (_range_id, range) in ranges.ranges.iter() {
assert!(range.is_record_batch());
-let record_batch_iter = range.build_record_batch_iter(None).unwrap();
+let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
for batch_result in record_batch_iter {
let batch = batch_result.unwrap();

@@ -42,9 +42,9 @@ use crate::memtable::bulk::part::BulkPart;
use crate::memtable::partition_tree::tree::PartitionTree;
use crate::memtable::stats::WriteMetrics;
use crate::memtable::{
-AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, MemScanMetrics, Memtable,
-MemtableBuilder, MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef,
-MemtableStats, RangesOptions,
+AllocTracker, BatchToRecordBatchContext, BoxedBatchIterator, IterBuilder, KeyValues,
+MemScanMetrics, Memtable, MemtableBuilder, MemtableId, MemtableRange, MemtableRangeContext,
+MemtableRanges, MemtableRef, MemtableStats, RangesOptions, read_column_ids_from_projection,
};
use crate::region::options::MergeMode;
@@ -194,6 +194,7 @@ impl Memtable for PartitionTreeMemtable {
) -> Result<MemtableRanges> {
let predicate = options.predicate;
let sequence = options.sequence;
+let read_column_ids = read_column_ids_from_projection(&self.tree.metadata, projection);
let projection = projection.map(|ids| ids.to_vec());
let builder = Box::new(PartitionTreeIterBuilder {
tree: self.tree.clone(),
@@ -201,7 +202,16 @@ impl Memtable for PartitionTreeMemtable {
predicate: predicate.predicate().cloned(),
sequence,
});
-let context = Arc::new(MemtableRangeContext::new(self.id, builder, predicate));
+let adapter_context = Arc::new(BatchToRecordBatchContext::new(
+self.tree.metadata.clone(),
+read_column_ids,
+));
+let context = Arc::new(MemtableRangeContext::new_with_batch_to_record_batch(
+self.id,
+builder,
+predicate,
+Some(adapter_context),
+));
let range_stats = self.stats();
let range = MemtableRange::new(context, range_stats);
@@ -933,4 +943,114 @@ mod tests {
.collect::<HashMap<_, _>>();
assert_eq!(kvs, expected);
}
#[test]
fn test_build_record_batch_iter_from_memtable() {
let metadata = Arc::new(memtable_util::metadata_with_primary_key(vec![1, 0], true));
let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
let memtable = PartitionTreeMemtable::new(
1,
codec,
metadata.clone(),
None,
&PartitionTreeConfig::default(),
);
let kvs =
memtable_util::build_key_values(&metadata, "hello".to_string(), 42, &[1, 2, 3], 0);
memtable.write(&kvs).unwrap();
let read_column_ids: Vec<ColumnId> = metadata
.column_metadatas
.iter()
.map(|c| c.column_id)
.collect();
let ranges = memtable
.ranges(Some(&read_column_ids), RangesOptions::default())
.unwrap();
assert!(!ranges.ranges.is_empty());
let mut total_rows = 0;
for range in ranges.ranges.into_values() {
let mut iter = range.build_record_batch_iter(None, None).unwrap();
while let Some(rb) = iter.next().transpose().unwrap() {
total_rows += rb.num_rows();
let schema = rb.schema();
let column_names: Vec<_> =
schema.fields().iter().map(|f| f.name().as_str()).collect();
assert_eq!(
column_names,
vec![
"__table_id",
"k0",
"v0",
"v1",
"ts",
"__primary_key",
"__sequence",
"__op_type",
]
);
}
}
assert_eq!(3, total_rows);
}
#[test]
fn test_build_record_batch_iter_with_time_range() {
let metadata = Arc::new(memtable_util::metadata_with_primary_key(vec![1, 0], true));
let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
let memtable = PartitionTreeMemtable::new(
1,
codec,
metadata.clone(),
None,
&PartitionTreeConfig::default(),
);
let kvs = memtable_util::build_key_values(
&metadata,
"hello".to_string(),
42,
&[1, 2, 3, 4, 5],
0,
);
memtable.write(&kvs).unwrap();
let read_column_ids: Vec<ColumnId> = metadata
.column_metadatas
.iter()
.map(|c| c.column_id)
.collect();
let ranges = memtable
.ranges(Some(&read_column_ids), RangesOptions::default())
.unwrap();
assert!(!ranges.ranges.is_empty());
let time_range = (Timestamp::new_millisecond(2), Timestamp::new_millisecond(4));
let mut total_rows = 0;
let mut all_timestamps = Vec::new();
for range in ranges.ranges.into_values() {
let mut iter = range
.build_record_batch_iter(Some(time_range), None)
.unwrap();
while let Some(rb) = iter.next().transpose().unwrap() {
total_rows += rb.num_rows();
// ts column is at index 4 (after __table_id, k0, v0, v1)
let ts_col = rb
.column_by_name("ts")
.unwrap()
.as_any()
.downcast_ref::<datatypes::arrow::array::TimestampMillisecondArray>()
.unwrap();
for i in 0..ts_col.len() {
all_timestamps.push(ts_col.value(i));
}
}
}
assert_eq!(3, total_rows);
all_timestamps.sort();
assert_eq!(vec![2, 3, 4], all_timestamps);
}
}

@@ -34,8 +34,9 @@ use crate::memtable::bulk::part::BulkPart;
use crate::memtable::stats::WriteMetrics;
use crate::memtable::time_series::Series;
use crate::memtable::{
-AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, MemScanMetrics, Memtable, MemtableId,
-MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats, RangesOptions,
+AllocTracker, BatchToRecordBatchContext, BoxedBatchIterator, IterBuilder, KeyValues,
+MemScanMetrics, Memtable, MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges,
+MemtableRef, MemtableStats, RangesOptions, read_column_ids_from_projection,
};
use crate::metrics::MEMTABLE_ACTIVE_SERIES_COUNT;
use crate::read::Batch;
@@ -236,6 +237,7 @@ impl Memtable for SimpleBulkMemtable {
let predicate = options.predicate;
let sequence = options.sequence;
let start_time = Instant::now();
+let read_column_ids = read_column_ids_from_projection(&self.region_metadata, projection);
let projection = Arc::new(self.build_projection(projection));
// Use the memtable's overall time range and max sequence for all ranges
@@ -255,6 +257,11 @@ impl Memtable for SimpleBulkMemtable {
};
let values = self.series.read().unwrap().read_to_values();
+let batch_to_record_batch = Arc::new(BatchToRecordBatchContext::new(
+self.region_metadata.clone(),
+read_column_ids.clone(),
+));
let contexts = values
.into_par_iter()
.filter_map(|v| {
@@ -298,10 +305,11 @@ impl Memtable for SimpleBulkMemtable {
};
(
range_stats,
-Arc::new(MemtableRangeContext::new(
+Arc::new(MemtableRangeContext::new_with_batch_to_record_batch(
self.id,
Box::new(builder),
predicate.clone(),
+Some(batch_to_record_batch.clone()),
)),
)
})
@@ -941,4 +949,44 @@ mod tests {
}
assert_eq!(rows, 2);
}
#[test]
fn test_build_record_batch_iter_from_memtable() {
let memtable = new_test_memtable(false, MergeMode::LastRow);
let kvs = build_key_values(
&memtable.region_metadata,
0,
&[(1, 1.0, "a".to_string()), (2, 2.0, "b".to_string())],
OpType::Put,
);
memtable.write(&kvs).unwrap();
let read_column_ids: Vec<ColumnId> = memtable
.region_metadata
.column_metadatas
.iter()
.map(|c| c.column_id)
.collect();
let ranges = memtable
.ranges(Some(&read_column_ids), RangesOptions::default())
.unwrap();
assert!(!ranges.ranges.is_empty());
let mut total_rows = 0;
for range in ranges.ranges.into_values() {
let mut iter = range.build_record_batch_iter(None, None).unwrap();
while let Some(rb) = iter.next().transpose().unwrap() {
total_rows += rb.num_rows();
let schema = rb.schema();
let column_names: Vec<_> =
schema.fields().iter().map(|f| f.name().as_str()).collect();
assert_eq!(
column_names,
vec!["f1", "f2", "ts", "__primary_key", "__sequence", "__op_type"]
);
}
}
assert_eq!(2, total_rows);
}
}

@@ -51,9 +51,9 @@ use crate::memtable::bulk::part::BulkPart;
use crate::memtable::simple_bulk_memtable::SimpleBulkMemtable;
use crate::memtable::stats::WriteMetrics;
use crate::memtable::{
-AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, MemScanMetrics, Memtable,
-MemtableBuilder, MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef,
-MemtableStats, RangesOptions,
+AllocTracker, BatchToRecordBatchContext, BoxedBatchIterator, IterBuilder, KeyValues,
+MemScanMetrics, Memtable, MemtableBuilder, MemtableId, MemtableRange, MemtableRangeContext,
+MemtableRanges, MemtableRef, MemtableStats, RangesOptions, read_column_ids_from_projection,
};
use crate::metrics::{
MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT, MEMTABLE_ACTIVE_SERIES_COUNT, READ_ROWS_TOTAL,
@@ -307,6 +307,7 @@ impl Memtable for TimeSeriesMemtable {
) -> Result<MemtableRanges> {
let predicate = options.predicate;
let sequence = options.sequence;
+let read_column_ids = read_column_ids_from_projection(&self.region_metadata, projection);
let projection = if let Some(projection) = projection {
projection.iter().copied().collect()
} else {
@@ -323,7 +324,16 @@ impl Memtable for TimeSeriesMemtable {
merge_mode: self.merge_mode,
sequence,
});
-let context = Arc::new(MemtableRangeContext::new(self.id, builder, predicate));
+let adapter_context = Arc::new(BatchToRecordBatchContext::new(
+self.region_metadata.clone(),
+read_column_ids,
+));
+let context = Arc::new(MemtableRangeContext::new_with_batch_to_record_batch(
+self.id,
+builder,
+predicate,
+Some(adapter_context),
+));
let range_stats = self.stats();
let range = MemtableRange::new(context, range_stats);
@@ -1938,4 +1948,89 @@ mod tests {
assert_eq!(total_series, series_count);
assert_eq!(total_series * rows_per_series, row_count);
}
#[test]
fn test_build_record_batch_iter_from_memtable() {
let schema = schema_for_test();
let memtable = TimeSeriesMemtable::new(schema.clone(), 1, None, true, MergeMode::LastRow);
let kvs = build_key_values(&schema, "test".to_string(), 1, 10);
memtable.write(&kvs).unwrap();
let read_column_ids: Vec<ColumnId> = schema
.column_metadatas
.iter()
.map(|c| c.column_id)
.collect();
let ranges = memtable
.ranges(Some(&read_column_ids), RangesOptions::default())
.unwrap();
assert_eq!(1, ranges.ranges.len());
let range = ranges.ranges.into_values().next().unwrap();
let mut iter = range.build_record_batch_iter(None, None).unwrap();
let rb = iter.next().transpose().unwrap().unwrap();
assert_eq!(10, rb.num_rows());
// k0, k1 (pk columns), v0, v1 (field columns), ts, __primary_key, __sequence, __op_type
let schema = rb.schema();
let column_names: Vec<_> = schema.fields().iter().map(|f| f.name().as_str()).collect();
assert_eq!(
column_names,
vec![
"k0",
"k1",
"v0",
"v1",
"ts",
"__primary_key",
"__sequence",
"__op_type",
]
);
assert!(iter.next().is_none());
}
#[test]
fn test_build_record_batch_iter_with_time_range() {
let schema = schema_for_test();
let memtable = TimeSeriesMemtable::new(schema.clone(), 1, None, true, MergeMode::LastRow);
let kvs = build_key_values(&schema, "test".to_string(), 1, 10);
memtable.write(&kvs).unwrap();
let read_column_ids: Vec<ColumnId> = schema
.column_metadatas
.iter()
.map(|c| c.column_id)
.collect();
let ranges = memtable
.ranges(Some(&read_column_ids), RangesOptions::default())
.unwrap();
assert_eq!(1, ranges.ranges.len());
let time_range = (Timestamp::new_millisecond(3), Timestamp::new_millisecond(7));
let range = ranges.ranges.into_values().next().unwrap();
let mut iter = range
.build_record_batch_iter(Some(time_range), None)
.unwrap();
let mut total_rows = 0;
let mut all_timestamps = Vec::new();
while let Some(rb) = iter.next().transpose().unwrap() {
total_rows += rb.num_rows();
let ts_col = rb
.column_by_name("ts")
.unwrap()
.as_any()
.downcast_ref::<datatypes::arrow::array::TimestampMillisecondArray>()
.unwrap();
for i in 0..ts_col.len() {
all_timestamps.push(ts_col.value(i));
}
}
assert_eq!(5, total_rows);
all_timestamps.sort();
assert_eq!(vec![3, 4, 5, 6, 7], all_timestamps);
}
}

@@ -14,6 +14,7 @@
//! Common structs and utilities for reading data.
pub mod batch_adapter;
pub mod compat;
pub mod dedup;
pub mod flat_dedup;

@@ -0,0 +1,700 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Adapter to convert [`BoxedBatchIterator`] (primary key format) into an iterator
//! of flat-format Arrow [`RecordBatch`]es, allowing memtable iterators that only
//! produce [`Batch`] to feed into the flat read pipeline.
use std::borrow::Cow;
use std::collections::HashSet;
use std::sync::Arc;
use api::v1::SemanticType;
use datatypes::arrow::array::{ArrayRef, BinaryArray, DictionaryArray, UInt32Array};
use datatypes::arrow::datatypes::{Field, SchemaRef};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::prelude::{ConcreteDataType, DataType, Vector};
use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec};
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use crate::error::{
DataTypeMismatchSnafu, DecodeSnafu, EvalPartitionFilterSnafu, NewRecordBatchSnafu, Result,
};
use crate::memtable::BoxedBatchIterator;
use crate::read::Batch;
use crate::sst::{internal_fields, tag_maybe_to_dictionary_field};
/// Adapts a [`BoxedBatchIterator`] into an `Iterator<Item = Result<RecordBatch>>`
/// producing flat-format record batches.
pub struct BatchToRecordBatchAdapter {
iter: BoxedBatchIterator,
codec: Arc<dyn PrimaryKeyCodec>,
output_schema: SchemaRef,
projected_pk: Vec<ProjectedPkColumn>,
}
struct ProjectedPkColumn {
column_id: ColumnId,
pk_index: usize,
data_type: ConcreteDataType,
}
impl BatchToRecordBatchAdapter {
/// Creates a new adapter.
///
/// - `iter`: the source batch iterator producing primary-key-format batches.
/// - `metadata`: region metadata describing the schema.
/// - `codec`: codec for decoding the encoded primary key bytes.
/// - `read_column_ids`: projected column ids to read.
pub(crate) fn new(
iter: BoxedBatchIterator,
metadata: RegionMetadataRef,
codec: Arc<dyn PrimaryKeyCodec>,
read_column_ids: &[ColumnId],
) -> Self {
let read_column_id_set: HashSet<_> = read_column_ids.iter().copied().collect();
let projected_pk = metadata
.primary_key_columns()
.enumerate()
.filter(|(_, column_metadata)| read_column_id_set.contains(&column_metadata.column_id))
.map(|(pk_index, column_metadata)| ProjectedPkColumn {
column_id: column_metadata.column_id,
pk_index,
data_type: column_metadata.column_schema.data_type.clone(),
})
.collect();
let output_schema = compute_output_arrow_schema(&metadata, &read_column_id_set);
Self {
iter,
codec,
output_schema,
projected_pk,
}
}
/// Converts a single [`Batch`] into a flat-format [`RecordBatch`].
fn convert_batch(&self, batch: &Batch) -> Result<RecordBatch> {
let num_rows = batch.num_rows();
let pk_values = if let Some(vals) = batch.pk_values() {
Cow::Borrowed(vals)
} else {
Cow::Owned(
self.codec
.decode(batch.primary_key())
.context(DecodeSnafu)?,
)
};
let mut columns: Vec<ArrayRef> = Vec::with_capacity(self.output_schema.fields().len());
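// The column order below must match compute_output_arrow_schema: projected
// primary key columns first, then fields, the time index, and the three
// internal columns.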
for pk_column in &self.projected_pk {
if pk_column.data_type.is_string() {
let value = get_pk_value(&pk_values, pk_column.column_id, pk_column.pk_index);
columns.push(build_string_tag_dict_array(
value,
&pk_column.data_type,
num_rows,
));
} else {
let value = get_pk_value(&pk_values, pk_column.column_id, pk_column.pk_index);
let array = build_repeated_value_array(value, &pk_column.data_type, num_rows)?;
columns.push(array);
}
}
for batch_col in batch.fields() {
columns.push(batch_col.data.to_arrow_array());
}
columns.push(batch.timestamps().to_arrow_array());
// __primary_key
let pk_bytes = batch.primary_key();
let values = Arc::new(BinaryArray::from_iter_values([pk_bytes]));
let keys = UInt32Array::from(vec![0u32; num_rows]);
let pk_dict: ArrayRef = Arc::new(DictionaryArray::new(keys, values));
columns.push(pk_dict);
// __sequence.
columns.push(batch.sequences().to_arrow_array());
// __op_type.
columns.push(batch.op_types().to_arrow_array());
RecordBatch::try_new(self.output_schema.clone(), columns).context(NewRecordBatchSnafu)
}
}
impl Iterator for BatchToRecordBatchAdapter {
type Item = Result<RecordBatch>;
fn next(&mut self) -> Option<Self::Item> {
loop {
match self.iter.next()? {
Ok(batch) => {
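// Skip empty batches instead of emitting zero-row record batches.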
if batch.is_empty() {
continue;
}
return Some(self.convert_batch(&batch));
}
Err(e) => return Some(Err(e)),
}
}
}
}
/// Extracts a value for the given primary key column from decoded [`CompositeValues`].
fn get_pk_value(
pk_values: &CompositeValues,
column_id: ColumnId,
pk_index: usize,
) -> &datatypes::value::Value {
match pk_values {
CompositeValues::Dense(dense) => {
if pk_index < dense.len() {
&dense[pk_index].1
} else {
&datatypes::value::Value::Null
}
}
CompositeValues::Sparse(sparse) => sparse.get_or_null(column_id),
}
}
/// Builds an Arrow array of `num_rows` copies of `value`.
fn build_repeated_value_array(
value: &datatypes::value::Value,
data_type: &ConcreteDataType,
num_rows: usize,
) -> Result<ArrayRef> {
let scalar = value
.try_to_scalar_value(data_type)
.context(DataTypeMismatchSnafu)?;
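// e.g. ScalarValue::UInt64(Some(7)).to_array_of_size(3) yields a 3-row array of 7s.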
scalar
.to_array_of_size(num_rows)
.context(EvalPartitionFilterSnafu)
}
/// Builds a dictionary-encoded string tag array with one dictionary value.
fn build_string_tag_dict_array(
value: &datatypes::value::Value,
data_type: &ConcreteDataType,
num_rows: usize,
) -> ArrayRef {
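// String tags are dictionary-encoded in the flat format, so all rows share a
// single dictionary value keyed by 0.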
let mut builder = data_type.create_mutable_vector(1);
builder.push_value_ref(&value.as_value_ref());
let values = builder.to_vector().to_arrow_array();
let keys = UInt32Array::from(vec![0u32; num_rows]);
Arc::new(DictionaryArray::new(keys, values))
}
fn compute_output_arrow_schema(
metadata: &RegionMetadataRef,
read_column_id_set: &HashSet<ColumnId>,
) -> SchemaRef {
let mut fields = Vec::new();
for column_metadata in metadata.primary_key_columns() {
if !read_column_id_set.contains(&column_metadata.column_id) {
continue;
}
let field = Arc::new(Field::new(
&column_metadata.column_schema.name,
column_metadata.column_schema.data_type.as_arrow_type(),
column_metadata.column_schema.is_nullable(),
));
let field = if column_metadata.semantic_type == SemanticType::Tag {
tag_maybe_to_dictionary_field(&column_metadata.column_schema.data_type, &field)
} else {
field
};
fields.push(field);
}
for column_metadata in metadata.field_columns() {
if !read_column_id_set.contains(&column_metadata.column_id) {
continue;
}
let field = Arc::new(Field::new(
&column_metadata.column_schema.name,
column_metadata.column_schema.data_type.as_arrow_type(),
column_metadata.column_schema.is_nullable(),
));
fields.push(field);
}
let time_index = metadata.time_index_column();
let time_index_field = Arc::new(Field::new(
&time_index.column_schema.name,
time_index.column_schema.data_type.as_arrow_type(),
time_index.column_schema.is_nullable(),
));
fields.push(time_index_field);
fields.extend(internal_fields().iter().cloned());
Arc::new(datatypes::arrow::datatypes::Schema::new(fields))
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use api::v1::{OpType, SemanticType};
use datatypes::arrow::array::{Array, TimestampMillisecondArray, UInt8Array, UInt64Array};
use datatypes::arrow::datatypes::UInt32Type;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use mito_codec::row_converter::{PrimaryKeyCodec, build_primary_key_codec};
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef};
use store_api::storage::RegionId;
use super::*;
use crate::read::flat_projection::FlatProjectionMapper;
use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
use crate::test_util::new_batch_builder;
use crate::test_util::sst_util::{new_primary_key, sst_region_metadata};
/// Helper to build the adapter from batches and metadata.
fn build_adapter(
batches: Vec<Batch>,
metadata: &RegionMetadataRef,
codec: &Arc<dyn PrimaryKeyCodec>,
) -> BatchToRecordBatchAdapter {
let read_column_ids = metadata
.column_metadatas
.iter()
.map(|column| column.column_id)
.collect::<Vec<_>>();
let iter: BoxedBatchIterator = Box::new(batches.into_iter().map(Ok));
BatchToRecordBatchAdapter::new(
iter,
Arc::clone(metadata),
Arc::clone(codec),
&read_column_ids,
)
}
#[test]
fn test_single_batch_two_tags() {
// Schema: tag_0(string), tag_1(string), field_0(u64), ts
let metadata = Arc::new(sst_region_metadata());
let codec = build_primary_key_codec(&metadata);
let pk = new_primary_key(&["host-1", "region-a"]);
let batch = new_batch_builder(
&pk,
&[1, 2, 3],
&[100, 100, 100],
&[OpType::Put, OpType::Put, OpType::Put],
2,
&[10, 20, 30],
)
.build()
.unwrap();
let adapter = build_adapter(vec![batch], &metadata, &codec);
let results: Vec<_> = adapter.collect::<Vec<_>>();
assert_eq!(1, results.len());
let rb = results[0].as_ref().unwrap();
let expected_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
assert_eq!(rb.schema(), expected_schema);
assert_eq!(3, rb.num_rows());
// 2 tags + 1 field + 1 time index + 3 internal = 7 columns
assert_eq!(7, rb.num_columns());
}
#[test]
fn test_multiple_batches() {
let metadata = Arc::new(sst_region_metadata());
let codec = build_primary_key_codec(&metadata);
let pk1 = new_primary_key(&["a", "b"]);
let batch1 = new_batch_builder(
&pk1,
&[1, 2],
&[100, 100],
&[OpType::Put, OpType::Put],
2,
&[10, 20],
)
.build()
.unwrap();
let pk2 = new_primary_key(&["c", "d"]);
let batch2 = new_batch_builder(
&pk2,
&[3, 4],
&[200, 200],
&[OpType::Put, OpType::Put],
2,
&[30, 40],
)
.build()
.unwrap();
let adapter = build_adapter(vec![batch1, batch2], &metadata, &codec);
let results: Vec<_> = adapter.map(|r| r.unwrap()).collect();
assert_eq!(2, results.len());
assert_eq!(2, results[0].num_rows());
assert_eq!(2, results[1].num_rows());
}
#[test]
fn test_empty_batch_skipped() {
let metadata = Arc::new(sst_region_metadata());
let codec = build_primary_key_codec(&metadata);
let empty = Batch::empty();
let pk = new_primary_key(&["x", "y"]);
let batch = new_batch_builder(&pk, &[1], &[1], &[OpType::Put], 2, &[42])
.build()
.unwrap();
let adapter = build_adapter(vec![empty, batch], &metadata, &codec);
let results: Vec<_> = adapter.map(|r| r.unwrap()).collect();
assert_eq!(1, results.len());
assert_eq!(1, results[0].num_rows());
}
#[test]
fn test_no_tags() {
// Schema with no primary key columns: field_0(u64), ts
let mut builder = RegionMetadataBuilder::new(RegionId::new(0, 0));
builder
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"field_0".to_string(),
ConcreteDataType::uint64_datatype(),
true,
),
semantic_type: SemanticType::Field,
column_id: 0,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"ts".to_string(),
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id: 1,
});
builder.primary_key(vec![]);
let metadata = Arc::new(builder.build().unwrap());
let codec = build_primary_key_codec(&metadata);
// Empty primary key
let pk = vec![];
let batch = new_batch_builder(
&pk,
&[1, 2],
&[100, 100],
&[OpType::Put, OpType::Put],
0,
&[10, 20],
)
.build()
.unwrap();
let adapter = build_adapter(vec![batch], &metadata, &codec);
let results: Vec<_> = adapter.map(|r| r.unwrap()).collect();
assert_eq!(1, results.len());
let rb = &results[0];
let expected_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
assert_eq!(rb.schema(), expected_schema);
// 0 tags + 1 field + 1 time index + 3 internal = 5 columns
assert_eq!(5, rb.num_columns());
assert_eq!(2, rb.num_rows());
}
#[test]
fn test_primary_key_dict_column() {
// Verify the __primary_key column is a proper dictionary array.
let metadata = Arc::new(sst_region_metadata());
let codec = build_primary_key_codec(&metadata);
let pk = new_primary_key(&["host", "az"]);
let batch = new_batch_builder(
&pk,
&[1, 2],
&[1, 1],
&[OpType::Put, OpType::Put],
2,
&[5, 6],
)
.build()
.unwrap();
let adapter = build_adapter(vec![batch.clone()], &metadata, &codec);
let rb = adapter.into_iter().next().unwrap().unwrap();
// __primary_key is at num_columns - 3
let pk_col_idx = rb.num_columns() - 3;
let pk_array = rb
.column(pk_col_idx)
.as_any()
.downcast_ref::<DictionaryArray<UInt32Type>>()
.expect("should be DictionaryArray<UInt32>");
// Should have 2 rows, all pointing to key 0
assert_eq!(2, pk_array.len());
assert_eq!(0, pk_array.keys().value(0));
assert_eq!(0, pk_array.keys().value(1));
// The single dictionary value should be the encoded pk bytes.
let values = pk_array
.values()
.as_any()
.downcast_ref::<BinaryArray>()
.unwrap();
assert_eq!(1, values.len());
assert_eq!(batch.primary_key(), values.value(0));
}
#[test]
fn test_sequence_and_op_type_columns() {
let metadata = Arc::new(sst_region_metadata());
let codec = build_primary_key_codec(&metadata);
let pk = new_primary_key(&["a", "b"]);
let batch = new_batch_builder(
&pk,
&[10, 20, 30],
&[1, 2, 3],
&[OpType::Put, OpType::Delete, OpType::Put],
2,
&[100, 200, 300],
)
.build()
.unwrap();
let adapter = build_adapter(vec![batch], &metadata, &codec);
let rb = adapter.into_iter().next().unwrap().unwrap();
// __sequence is at num_columns - 2
let seq_idx = rb.num_columns() - 2;
let seq_array = rb
.column(seq_idx)
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap();
assert_eq!(&[1u64, 2, 3], seq_array.values().as_ref());
// __op_type is at num_columns - 1
let op_idx = rb.num_columns() - 1;
let op_array = rb
.column(op_idx)
.as_any()
.downcast_ref::<UInt8Array>()
.unwrap();
assert_eq!(
&[OpType::Put as u8, OpType::Delete as u8, OpType::Put as u8],
op_array.values().as_ref()
);
}
#[test]
fn test_integer_tag_column() {
// Schema with an integer (non-string) tag: tag_0(u32), field_0(u64), ts
let mut builder = RegionMetadataBuilder::new(RegionId::new(0, 0));
builder
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"tag_0".to_string(),
ConcreteDataType::uint32_datatype(),
false,
),
semantic_type: SemanticType::Tag,
column_id: 0,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"field_0".to_string(),
ConcreteDataType::uint64_datatype(),
true,
),
semantic_type: SemanticType::Field,
column_id: 1,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"ts".to_string(),
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id: 2,
});
builder.primary_key(vec![0]);
let metadata = Arc::new(builder.build().unwrap());
let codec = build_primary_key_codec(&metadata);
// Encode integer primary key
let pk = {
use datatypes::value::ValueRef;
use mito_codec::row_converter::PrimaryKeyCodecExt;
let codec_ext = mito_codec::row_converter::DensePrimaryKeyCodec::with_fields(vec![(
0,
mito_codec::row_converter::SortField::new(ConcreteDataType::uint32_datatype()),
)]);
codec_ext
.encode([ValueRef::UInt32(42)].into_iter())
.unwrap()
};
let batch = new_batch_builder(
&pk,
&[1, 2],
&[1, 1],
&[OpType::Put, OpType::Put],
1,
&[10, 20],
)
.build()
.unwrap();
let adapter = build_adapter(vec![batch], &metadata, &codec);
let rb = adapter.into_iter().next().unwrap().unwrap();
let expected_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
assert_eq!(rb.schema(), expected_schema);
// tag_0 column (index 0) should be a regular (non-dictionary) UInt32 array
let tag_array = rb
.column(0)
.as_any()
.downcast_ref::<UInt32Array>()
.expect("integer tag should be a plain UInt32Array");
assert_eq!(&[42u32, 42], tag_array.values().as_ref());
}
#[test]
fn test_with_precomputed_pk_values() {
// If pk_values are already set on the Batch, the adapter should use them
// instead of calling codec.decode().
let metadata = Arc::new(sst_region_metadata());
let codec = build_primary_key_codec(&metadata);
let pk = new_primary_key(&["pre", "computed"]);
let mut batch = new_batch_builder(&pk, &[1], &[1], &[OpType::Put], 2, &[99])
.build()
.unwrap();
// Decode and set pk_values ahead of time.
let decoded = codec.decode(&pk).unwrap();
batch.set_pk_values(decoded);
let adapter = build_adapter(vec![batch], &metadata, &codec);
let rb = adapter.into_iter().next().unwrap().unwrap();
assert_eq!(1, rb.num_rows());
let expected_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
assert_eq!(rb.schema(), expected_schema);
}
#[test]
fn test_partial_projection_schema_matches_mapper() {
let mut builder = RegionMetadataBuilder::new(RegionId::new(0, 0));
builder
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"tag_0".to_string(),
ConcreteDataType::string_datatype(),
true,
),
semantic_type: SemanticType::Tag,
column_id: 0,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"tag_1".to_string(),
ConcreteDataType::string_datatype(),
true,
),
semantic_type: SemanticType::Tag,
column_id: 1,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"field_0".to_string(),
ConcreteDataType::uint64_datatype(),
true,
),
semantic_type: SemanticType::Field,
column_id: 2,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"field_1".to_string(),
ConcreteDataType::uint64_datatype(),
true,
),
semantic_type: SemanticType::Field,
column_id: 3,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"ts".to_string(),
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id: 4,
});
builder.primary_key(vec![0, 1]);
let metadata = Arc::new(builder.build().unwrap());
let codec = build_primary_key_codec(&metadata);
// Project tag_0 and field_1; skip tag_1 and field_0.
let read_column_ids = vec![0, 3];
let pk = new_primary_key(&["host-1", "region-a"]);
let batch = new_batch_builder(
&pk,
&[1, 2, 3],
&[100, 100, 100],
&[OpType::Put, OpType::Put, OpType::Put],
3,
&[10, 20, 30],
)
.build()
.unwrap();
let iter: BoxedBatchIterator = Box::new(vec![Ok(batch)].into_iter());
let adapter =
BatchToRecordBatchAdapter::new(iter, metadata.clone(), codec, &read_column_ids);
let rb = adapter.into_iter().next().unwrap().unwrap();
let mapper = FlatProjectionMapper::new(&metadata, [0, 3].into_iter()).unwrap();
assert_eq!(rb.schema(), mapper.input_arrow_schema(false));
// tag_0 + field_1 + ts + 3 internal columns.
assert_eq!(6, rb.num_columns());
assert_eq!(3, rb.num_rows());
let field_1 = rb.column(1).as_any().downcast_ref::<UInt64Array>().unwrap();
assert_eq!(&[10u64, 20, 30], field_1.values().as_ref());
let ts = rb
.column(2)
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap();
assert_eq!(&[1i64, 2, 3], ts.values().as_ref());
}
}

@@ -325,7 +325,7 @@ pub(crate) fn flat_projected_columns(
///
/// # Panics
/// Panics if it can't find the column by the column id in the batch_schema.
-fn compute_input_arrow_schema(
+pub(crate) fn compute_input_arrow_schema(
metadata: &RegionMetadata,
batch_schema: &[(ColumnId, ConcreteDataType)],
) -> datatypes::arrow::datatypes::SchemaRef {

@@ -1181,6 +1181,7 @@ pub(crate) fn scan_flat_mem_ranges(
stream_ctx: Arc<StreamContext>,
part_metrics: PartitionMetrics,
index: RowGroupIndex,
+time_range: FileTimeRange,
) -> impl Stream<Item = Result<RecordBatch>> {
try_stream! {
let ranges = stream_ctx.input.build_mem_ranges(index);
@@ -1188,7 +1189,7 @@ pub(crate) fn scan_flat_mem_ranges(
for range in ranges {
let build_reader_start = Instant::now();
let mem_scan_metrics = Some(MemScanMetrics::default());
-let mut iter = range.build_record_batch_iter(mem_scan_metrics.clone())?;
+let mut iter = range.build_record_batch_iter(Some(time_range), mem_scan_metrics.clone())?;
part_metrics.inc_build_reader_cost(build_reader_start.elapsed());
while let Some(record_batch) = iter.next().transpose()? {

@@ -897,7 +897,12 @@ pub(crate) async fn build_flat_sources(
for (position, index) in range_meta.row_group_indices.iter().enumerate() {
if stream_ctx.is_mem_range_index(*index) {
-let stream = scan_flat_mem_ranges(stream_ctx.clone(), part_metrics.clone(), *index);
+let stream = scan_flat_mem_ranges(
+stream_ctx.clone(),
+part_metrics.clone(),
+*index,
+range_meta.time_range,
+);
ordered_sources[position] = Some(Box::pin(stream) as _);
} else if stream_ctx.is_file_range_index(*index) {
if let Some(semaphore_ref) = semaphore.as_ref() {

@@ -179,6 +179,7 @@ impl UnorderedScan {
stream_ctx.clone(),
part_metrics.clone(),
*index,
+range_meta.time_range,
);
for await record_batch in stream {
yield record_batch?;