diff --git a/src/mito2/benches/memtable_bench.rs b/src/mito2/benches/memtable_bench.rs index 29a697d557..ebe994f861 100644 --- a/src/mito2/benches/memtable_bench.rs +++ b/src/mito2/benches/memtable_bench.rs @@ -193,7 +193,7 @@ fn filter_1_host(c: &mut Criterion) { .unwrap(); for (_range_id, range) in ranges.ranges.iter() { - let iter = range.build_record_batch_iter(None).unwrap(); + let iter = range.build_record_batch_iter(None, None).unwrap(); for batch in iter { let _batch = batch.unwrap(); } diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index dd575ac687..0c16544b6e 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -783,7 +783,7 @@ fn memtable_flat_sources( if let Some(encoded) = only_range.encoded() { flat_sources.encoded.push((encoded, max_sequence)); } else { - let iter = only_range.build_record_batch_iter(None)?; + let iter = only_range.build_record_batch_iter(None, None)?; // Dedup according to append mode and merge mode. // Even single range may have duplicate rows. let iter = maybe_dedup_one( @@ -822,7 +822,7 @@ fn memtable_flat_sources( continue; } - let iter = range.build_record_batch_iter(None)?; + let iter = range.build_record_batch_iter(None, None)?; input_iters.push(iter); let range_rows = range.num_rows(); last_iter_rows += range_rows; diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs index a352a10805..c39bbfa346 100644 --- a/src/mito2/src/memtable.rs +++ b/src/mito2/src/memtable.rs @@ -26,6 +26,7 @@ use common_time::Timestamp; use datatypes::arrow::record_batch::RecordBatch; use mito_codec::key_values::KeyValue; pub use mito_codec::key_values::KeyValues; +use mito_codec::row_converter::{PrimaryKeyCodec, build_primary_key_codec}; use serde::{Deserialize, Serialize}; use store_api::metadata::RegionMetadataRef; use store_api::storage::{ColumnId, SequenceNumber, SequenceRange}; @@ -38,6 +39,7 @@ use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtable use crate::memtable::time_series::TimeSeriesMemtableBuilder; use crate::metrics::WRITE_BUFFER_BYTES; use crate::read::Batch; +use crate::read::batch_adapter::BatchToRecordBatchAdapter; use crate::read::prune::PruneTimeIterator; use crate::read::scan_region::PredicateGroup; use crate::region::options::{MemtableOptions, MergeMode, RegionOptions}; @@ -560,6 +562,57 @@ pub trait IterBuilder: Send + Sync { pub type BoxedIterBuilder = Box; +/// Computes the column IDs to read based on the projection. +/// +/// If `projection` is `Some`, returns those column IDs. If `None`, returns all column IDs +/// from the metadata. +pub fn read_column_ids_from_projection( + metadata: &RegionMetadataRef, + projection: Option<&[ColumnId]>, +) -> Vec { + if let Some(projection) = projection { + projection.to_vec() + } else { + metadata + .column_metadatas + .iter() + .map(|c| c.column_id) + .collect() + } +} + +/// Context to adapt batch iterators to record batch iterators for flat scan. +pub struct BatchToRecordBatchContext { + metadata: RegionMetadataRef, + codec: Arc, + read_column_ids: Vec, +} + +impl BatchToRecordBatchContext { + /// Creates a new context for adapting batch iterators. + pub fn new(metadata: RegionMetadataRef, mut read_column_ids: Vec) -> Self { + if read_column_ids.is_empty() { + read_column_ids.push(metadata.time_index_column().column_id); + } + + let codec = build_primary_key_codec(&metadata); + Self { + metadata, + codec, + read_column_ids, + } + } + + fn adapt_iter(&self, iter: BoxedBatchIterator) -> BoxedRecordBatchIterator { + Box::new(BatchToRecordBatchAdapter::new( + iter, + self.metadata.clone(), + self.codec.clone(), + &self.read_column_ids, + )) + } +} + /// Context shared by ranges of the same memtable. pub struct MemtableRangeContext { /// Id of the memtable. @@ -568,6 +621,8 @@ pub struct MemtableRangeContext { builder: BoxedIterBuilder, /// All filters. predicate: PredicateGroup, + /// Optional context to adapt batch iterators for flat scans. + batch_to_record_batch: Option>, } pub type MemtableRangeContextRef = Arc; @@ -575,10 +630,21 @@ pub type MemtableRangeContextRef = Arc; impl MemtableRangeContext { /// Creates a new [MemtableRangeContext]. pub fn new(id: MemtableId, builder: BoxedIterBuilder, predicate: PredicateGroup) -> Self { + Self::new_with_batch_to_record_batch(id, builder, predicate, None) + } + + /// Creates a new [MemtableRangeContext] with optional adapter context. + pub fn new_with_batch_to_record_batch( + id: MemtableId, + builder: BoxedIterBuilder, + predicate: PredicateGroup, + batch_to_record_batch: Option>, + ) -> Self { Self { id, builder, predicate, + batch_to_record_batch, } } } @@ -630,15 +696,34 @@ impl MemtableRange { self.context.builder.build(None) } - /// Builds a record batch iterator to read all rows in range. + /// Builds a record batch iterator to read rows in range. /// - /// This method doesn't take the optional time range because a bulk part is immutable - /// so we don't need to filter rows out of the time range. + /// For mutable memtables (adapter path), applies time-range pruning to ensure rows + /// outside the time range are filtered, matching the behavior of `build_prune_iter`. pub fn build_record_batch_iter( &self, + time_range: Option, metrics: Option, ) -> Result { - self.context.builder.build_record_batch(metrics) + if self.context.builder.is_record_batch() { + return self.context.builder.build_record_batch(metrics); + } + + if let Some(context) = self.context.batch_to_record_batch.as_ref() { + let iter = self.context.builder.build(metrics)?; + let iter: BoxedBatchIterator = if let Some(time_range) = time_range { + let time_filters = self.context.predicate.time_filters(); + Box::new(PruneTimeIterator::new(iter, time_range, time_filters)) + } else { + iter + }; + return Ok(context.adapt_iter(iter)); + } + + UnsupportedOperationSnafu { + err_msg: "Record batch iterator is not supported by this memtable", + } + .fail() } /// Returns whether the iterator is a record batch iterator. @@ -658,6 +743,8 @@ impl MemtableRange { #[cfg(test)] mod tests { + use std::sync::Arc; + use common_base::readable_size::ReadableSize; use super::*; diff --git a/src/mito2/src/memtable/bulk.rs b/src/mito2/src/memtable/bulk.rs index 3817560932..cf2ced06fe 100644 --- a/src/mito2/src/memtable/bulk.rs +++ b/src/mito2/src/memtable/bulk.rs @@ -1485,7 +1485,7 @@ mod tests { assert!(range.num_rows() > 0); assert!(range.is_record_batch()); - let record_batch_iter = range.build_record_batch_iter(None).unwrap(); + let record_batch_iter = range.build_record_batch_iter(None, None).unwrap(); let mut total_rows = 0; for batch_result in record_batch_iter { @@ -1535,7 +1535,7 @@ mod tests { let range = ranges.ranges.get(&0).unwrap(); assert!(range.is_record_batch()); - let record_batch_iter = range.build_record_batch_iter(None).unwrap(); + let record_batch_iter = range.build_record_batch_iter(None, None).unwrap(); let mut total_rows = 0; for batch_result in record_batch_iter { @@ -1731,7 +1731,7 @@ mod tests { assert_eq!(1, ranges.ranges.len()); let range = ranges.ranges.get(&0).unwrap(); - let mut record_batch_iter = range.build_record_batch_iter(None).unwrap(); + let mut record_batch_iter = range.build_record_batch_iter(None, None).unwrap(); assert!(record_batch_iter.next().is_none()); } @@ -1786,7 +1786,7 @@ mod tests { assert!(range.num_rows() > 0); assert!(range.is_record_batch()); - let record_batch_iter = range.build_record_batch_iter(None).unwrap(); + let record_batch_iter = range.build_record_batch_iter(None, None).unwrap(); let mut total_rows = 0; for batch_result in record_batch_iter { let batch = batch_result.unwrap(); @@ -1870,7 +1870,7 @@ mod tests { let mut total_rows_read = 0; for (_range_id, range) in ranges.ranges.iter() { assert!(range.is_record_batch()); - let record_batch_iter = range.build_record_batch_iter(None).unwrap(); + let record_batch_iter = range.build_record_batch_iter(None, None).unwrap(); for batch_result in record_batch_iter { let batch = batch_result.unwrap(); @@ -1957,7 +1957,7 @@ mod tests { let mut total_rows_read = 0; for (_range_id, range) in ranges.ranges.iter() { - let record_batch_iter = range.build_record_batch_iter(None).unwrap(); + let record_batch_iter = range.build_record_batch_iter(None, None).unwrap(); for batch_result in record_batch_iter { let batch = batch_result.unwrap(); total_rows_read += batch.num_rows(); @@ -2016,7 +2016,7 @@ mod tests { // Verify data is sorted correctly in the range let range = ranges.ranges.get(&0).unwrap(); - let record_batch_iter = range.build_record_batch_iter(None).unwrap(); + let record_batch_iter = range.build_record_batch_iter(None, None).unwrap(); let mut total_rows = 0; for batch_result in record_batch_iter { @@ -2211,7 +2211,7 @@ mod tests { let mut total_rows_read = 0; for (_range_id, range) in ranges.ranges.iter() { assert!(range.is_record_batch()); - let record_batch_iter = range.build_record_batch_iter(None).unwrap(); + let record_batch_iter = range.build_record_batch_iter(None, None).unwrap(); for batch_result in record_batch_iter { let batch = batch_result.unwrap(); diff --git a/src/mito2/src/memtable/partition_tree.rs b/src/mito2/src/memtable/partition_tree.rs index d16dc6a6cc..febae46784 100644 --- a/src/mito2/src/memtable/partition_tree.rs +++ b/src/mito2/src/memtable/partition_tree.rs @@ -42,9 +42,9 @@ use crate::memtable::bulk::part::BulkPart; use crate::memtable::partition_tree::tree::PartitionTree; use crate::memtable::stats::WriteMetrics; use crate::memtable::{ - AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, MemScanMetrics, Memtable, - MemtableBuilder, MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, - MemtableStats, RangesOptions, + AllocTracker, BatchToRecordBatchContext, BoxedBatchIterator, IterBuilder, KeyValues, + MemScanMetrics, Memtable, MemtableBuilder, MemtableId, MemtableRange, MemtableRangeContext, + MemtableRanges, MemtableRef, MemtableStats, RangesOptions, read_column_ids_from_projection, }; use crate::region::options::MergeMode; @@ -194,6 +194,7 @@ impl Memtable for PartitionTreeMemtable { ) -> Result { let predicate = options.predicate; let sequence = options.sequence; + let read_column_ids = read_column_ids_from_projection(&self.tree.metadata, projection); let projection = projection.map(|ids| ids.to_vec()); let builder = Box::new(PartitionTreeIterBuilder { tree: self.tree.clone(), @@ -201,7 +202,16 @@ impl Memtable for PartitionTreeMemtable { predicate: predicate.predicate().cloned(), sequence, }); - let context = Arc::new(MemtableRangeContext::new(self.id, builder, predicate)); + let adapter_context = Arc::new(BatchToRecordBatchContext::new( + self.tree.metadata.clone(), + read_column_ids, + )); + let context = Arc::new(MemtableRangeContext::new_with_batch_to_record_batch( + self.id, + builder, + predicate, + Some(adapter_context), + )); let range_stats = self.stats(); let range = MemtableRange::new(context, range_stats); @@ -933,4 +943,114 @@ mod tests { .collect::>(); assert_eq!(kvs, expected); } + + #[test] + fn test_build_record_batch_iter_from_memtable() { + let metadata = Arc::new(memtable_util::metadata_with_primary_key(vec![1, 0], true)); + let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata)); + let memtable = PartitionTreeMemtable::new( + 1, + codec, + metadata.clone(), + None, + &PartitionTreeConfig::default(), + ); + + let kvs = + memtable_util::build_key_values(&metadata, "hello".to_string(), 42, &[1, 2, 3], 0); + memtable.write(&kvs).unwrap(); + + let read_column_ids: Vec = metadata + .column_metadatas + .iter() + .map(|c| c.column_id) + .collect(); + let ranges = memtable + .ranges(Some(&read_column_ids), RangesOptions::default()) + .unwrap(); + assert!(!ranges.ranges.is_empty()); + + let mut total_rows = 0; + for range in ranges.ranges.into_values() { + let mut iter = range.build_record_batch_iter(None, None).unwrap(); + while let Some(rb) = iter.next().transpose().unwrap() { + total_rows += rb.num_rows(); + let schema = rb.schema(); + let column_names: Vec<_> = + schema.fields().iter().map(|f| f.name().as_str()).collect(); + assert_eq!( + column_names, + vec![ + "__table_id", + "k0", + "v0", + "v1", + "ts", + "__primary_key", + "__sequence", + "__op_type", + ] + ); + } + } + assert_eq!(3, total_rows); + } + + #[test] + fn test_build_record_batch_iter_with_time_range() { + let metadata = Arc::new(memtable_util::metadata_with_primary_key(vec![1, 0], true)); + let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata)); + let memtable = PartitionTreeMemtable::new( + 1, + codec, + metadata.clone(), + None, + &PartitionTreeConfig::default(), + ); + + let kvs = memtable_util::build_key_values( + &metadata, + "hello".to_string(), + 42, + &[1, 2, 3, 4, 5], + 0, + ); + memtable.write(&kvs).unwrap(); + + let read_column_ids: Vec = metadata + .column_metadatas + .iter() + .map(|c| c.column_id) + .collect(); + let ranges = memtable + .ranges(Some(&read_column_ids), RangesOptions::default()) + .unwrap(); + assert!(!ranges.ranges.is_empty()); + + let time_range = (Timestamp::new_millisecond(2), Timestamp::new_millisecond(4)); + + let mut total_rows = 0; + let mut all_timestamps = Vec::new(); + for range in ranges.ranges.into_values() { + let mut iter = range + .build_record_batch_iter(Some(time_range), None) + .unwrap(); + while let Some(rb) = iter.next().transpose().unwrap() { + total_rows += rb.num_rows(); + // ts column is at index 4 (after __table_id, k0, v0, v1) + let ts_col = rb + .column_by_name("ts") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + for i in 0..ts_col.len() { + all_timestamps.push(ts_col.value(i)); + } + } + } + assert_eq!(3, total_rows); + all_timestamps.sort(); + assert_eq!(vec![2, 3, 4], all_timestamps); + } } diff --git a/src/mito2/src/memtable/simple_bulk_memtable.rs b/src/mito2/src/memtable/simple_bulk_memtable.rs index 299f9e5f00..4dcaa2bac0 100644 --- a/src/mito2/src/memtable/simple_bulk_memtable.rs +++ b/src/mito2/src/memtable/simple_bulk_memtable.rs @@ -34,8 +34,9 @@ use crate::memtable::bulk::part::BulkPart; use crate::memtable::stats::WriteMetrics; use crate::memtable::time_series::Series; use crate::memtable::{ - AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, MemScanMetrics, Memtable, MemtableId, - MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats, RangesOptions, + AllocTracker, BatchToRecordBatchContext, BoxedBatchIterator, IterBuilder, KeyValues, + MemScanMetrics, Memtable, MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, + MemtableRef, MemtableStats, RangesOptions, read_column_ids_from_projection, }; use crate::metrics::MEMTABLE_ACTIVE_SERIES_COUNT; use crate::read::Batch; @@ -236,6 +237,7 @@ impl Memtable for SimpleBulkMemtable { let predicate = options.predicate; let sequence = options.sequence; let start_time = Instant::now(); + let read_column_ids = read_column_ids_from_projection(&self.region_metadata, projection); let projection = Arc::new(self.build_projection(projection)); // Use the memtable's overall time range and max sequence for all ranges @@ -255,6 +257,11 @@ impl Memtable for SimpleBulkMemtable { }; let values = self.series.read().unwrap().read_to_values(); + let batch_to_record_batch = Arc::new(BatchToRecordBatchContext::new( + self.region_metadata.clone(), + read_column_ids.clone(), + )); + let contexts = values .into_par_iter() .filter_map(|v| { @@ -298,10 +305,11 @@ impl Memtable for SimpleBulkMemtable { }; ( range_stats, - Arc::new(MemtableRangeContext::new( + Arc::new(MemtableRangeContext::new_with_batch_to_record_batch( self.id, Box::new(builder), predicate.clone(), + Some(batch_to_record_batch.clone()), )), ) }) @@ -941,4 +949,44 @@ mod tests { } assert_eq!(rows, 2); } + + #[test] + fn test_build_record_batch_iter_from_memtable() { + let memtable = new_test_memtable(false, MergeMode::LastRow); + + let kvs = build_key_values( + &memtable.region_metadata, + 0, + &[(1, 1.0, "a".to_string()), (2, 2.0, "b".to_string())], + OpType::Put, + ); + memtable.write(&kvs).unwrap(); + + let read_column_ids: Vec = memtable + .region_metadata + .column_metadatas + .iter() + .map(|c| c.column_id) + .collect(); + let ranges = memtable + .ranges(Some(&read_column_ids), RangesOptions::default()) + .unwrap(); + assert!(!ranges.ranges.is_empty()); + + let mut total_rows = 0; + for range in ranges.ranges.into_values() { + let mut iter = range.build_record_batch_iter(None, None).unwrap(); + while let Some(rb) = iter.next().transpose().unwrap() { + total_rows += rb.num_rows(); + let schema = rb.schema(); + let column_names: Vec<_> = + schema.fields().iter().map(|f| f.name().as_str()).collect(); + assert_eq!( + column_names, + vec!["f1", "f2", "ts", "__primary_key", "__sequence", "__op_type"] + ); + } + } + assert_eq!(2, total_rows); + } } diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs index fdf051e1b7..271a9343eb 100644 --- a/src/mito2/src/memtable/time_series.rs +++ b/src/mito2/src/memtable/time_series.rs @@ -51,9 +51,9 @@ use crate::memtable::bulk::part::BulkPart; use crate::memtable::simple_bulk_memtable::SimpleBulkMemtable; use crate::memtable::stats::WriteMetrics; use crate::memtable::{ - AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, MemScanMetrics, Memtable, - MemtableBuilder, MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, - MemtableStats, RangesOptions, + AllocTracker, BatchToRecordBatchContext, BoxedBatchIterator, IterBuilder, KeyValues, + MemScanMetrics, Memtable, MemtableBuilder, MemtableId, MemtableRange, MemtableRangeContext, + MemtableRanges, MemtableRef, MemtableStats, RangesOptions, read_column_ids_from_projection, }; use crate::metrics::{ MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT, MEMTABLE_ACTIVE_SERIES_COUNT, READ_ROWS_TOTAL, @@ -307,6 +307,7 @@ impl Memtable for TimeSeriesMemtable { ) -> Result { let predicate = options.predicate; let sequence = options.sequence; + let read_column_ids = read_column_ids_from_projection(&self.region_metadata, projection); let projection = if let Some(projection) = projection { projection.iter().copied().collect() } else { @@ -323,7 +324,16 @@ impl Memtable for TimeSeriesMemtable { merge_mode: self.merge_mode, sequence, }); - let context = Arc::new(MemtableRangeContext::new(self.id, builder, predicate)); + let adapter_context = Arc::new(BatchToRecordBatchContext::new( + self.region_metadata.clone(), + read_column_ids, + )); + let context = Arc::new(MemtableRangeContext::new_with_batch_to_record_batch( + self.id, + builder, + predicate, + Some(adapter_context), + )); let range_stats = self.stats(); let range = MemtableRange::new(context, range_stats); @@ -1938,4 +1948,89 @@ mod tests { assert_eq!(total_series, series_count); assert_eq!(total_series * rows_per_series, row_count); } + + #[test] + fn test_build_record_batch_iter_from_memtable() { + let schema = schema_for_test(); + let memtable = TimeSeriesMemtable::new(schema.clone(), 1, None, true, MergeMode::LastRow); + + let kvs = build_key_values(&schema, "test".to_string(), 1, 10); + memtable.write(&kvs).unwrap(); + + let read_column_ids: Vec = schema + .column_metadatas + .iter() + .map(|c| c.column_id) + .collect(); + let ranges = memtable + .ranges(Some(&read_column_ids), RangesOptions::default()) + .unwrap(); + assert_eq!(1, ranges.ranges.len()); + + let range = ranges.ranges.into_values().next().unwrap(); + let mut iter = range.build_record_batch_iter(None, None).unwrap(); + let rb = iter.next().transpose().unwrap().unwrap(); + assert_eq!(10, rb.num_rows()); + // k0, k1 (pk columns), v0, v1 (field columns), ts, __primary_key, __sequence, __op_type + let schema = rb.schema(); + let column_names: Vec<_> = schema.fields().iter().map(|f| f.name().as_str()).collect(); + assert_eq!( + column_names, + vec![ + "k0", + "k1", + "v0", + "v1", + "ts", + "__primary_key", + "__sequence", + "__op_type", + ] + ); + assert!(iter.next().is_none()); + } + + #[test] + fn test_build_record_batch_iter_with_time_range() { + let schema = schema_for_test(); + let memtable = TimeSeriesMemtable::new(schema.clone(), 1, None, true, MergeMode::LastRow); + + let kvs = build_key_values(&schema, "test".to_string(), 1, 10); + memtable.write(&kvs).unwrap(); + + let read_column_ids: Vec = schema + .column_metadatas + .iter() + .map(|c| c.column_id) + .collect(); + let ranges = memtable + .ranges(Some(&read_column_ids), RangesOptions::default()) + .unwrap(); + assert_eq!(1, ranges.ranges.len()); + + let time_range = (Timestamp::new_millisecond(3), Timestamp::new_millisecond(7)); + + let range = ranges.ranges.into_values().next().unwrap(); + let mut iter = range + .build_record_batch_iter(Some(time_range), None) + .unwrap(); + + let mut total_rows = 0; + let mut all_timestamps = Vec::new(); + while let Some(rb) = iter.next().transpose().unwrap() { + total_rows += rb.num_rows(); + let ts_col = rb + .column_by_name("ts") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + for i in 0..ts_col.len() { + all_timestamps.push(ts_col.value(i)); + } + } + assert_eq!(5, total_rows); + all_timestamps.sort(); + assert_eq!(vec![3, 4, 5, 6, 7], all_timestamps); + } } diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs index 9ffff99ae3..5fbd63ce8b 100644 --- a/src/mito2/src/read.rs +++ b/src/mito2/src/read.rs @@ -14,6 +14,7 @@ //! Common structs and utilities for reading data. +pub mod batch_adapter; pub mod compat; pub mod dedup; pub mod flat_dedup; diff --git a/src/mito2/src/read/batch_adapter.rs b/src/mito2/src/read/batch_adapter.rs new file mode 100644 index 0000000000..461dbeba69 --- /dev/null +++ b/src/mito2/src/read/batch_adapter.rs @@ -0,0 +1,700 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Adapter to convert [`BoxedBatchIterator`] (primary key format) into an iterator +//! of flat-format Arrow [`RecordBatch`]es, allowing memtable iterators that only +//! produce [`Batch`] to feed into the flat read pipeline. + +use std::borrow::Cow; +use std::collections::HashSet; +use std::sync::Arc; + +use api::v1::SemanticType; +use datatypes::arrow::array::{ArrayRef, BinaryArray, DictionaryArray, UInt32Array}; +use datatypes::arrow::datatypes::{Field, SchemaRef}; +use datatypes::arrow::record_batch::RecordBatch; +use datatypes::prelude::{ConcreteDataType, DataType, Vector}; +use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec}; +use snafu::ResultExt; +use store_api::metadata::RegionMetadataRef; +use store_api::storage::ColumnId; + +use crate::error::{ + DataTypeMismatchSnafu, DecodeSnafu, EvalPartitionFilterSnafu, NewRecordBatchSnafu, Result, +}; +use crate::memtable::BoxedBatchIterator; +use crate::read::Batch; +use crate::sst::{internal_fields, tag_maybe_to_dictionary_field}; + +/// Adapts a [`BoxedBatchIterator`] into an `Iterator>` +/// producing flat-format record batches. +pub struct BatchToRecordBatchAdapter { + iter: BoxedBatchIterator, + codec: Arc, + output_schema: SchemaRef, + projected_pk: Vec, +} + +struct ProjectedPkColumn { + column_id: ColumnId, + pk_index: usize, + data_type: ConcreteDataType, +} + +impl BatchToRecordBatchAdapter { + /// Creates a new adapter. + /// + /// - `iter`: the source batch iterator producing primary-key-format batches. + /// - `metadata`: region metadata describing the schema. + /// - `codec`: codec for decoding the encoded primary key bytes. + /// - `read_column_ids`: projected column ids to read. + pub(crate) fn new( + iter: BoxedBatchIterator, + metadata: RegionMetadataRef, + codec: Arc, + read_column_ids: &[ColumnId], + ) -> Self { + let read_column_id_set: HashSet<_> = read_column_ids.iter().copied().collect(); + let projected_pk = metadata + .primary_key_columns() + .enumerate() + .filter(|(_, column_metadata)| read_column_id_set.contains(&column_metadata.column_id)) + .map(|(pk_index, column_metadata)| ProjectedPkColumn { + column_id: column_metadata.column_id, + pk_index, + data_type: column_metadata.column_schema.data_type.clone(), + }) + .collect(); + let output_schema = compute_output_arrow_schema(&metadata, &read_column_id_set); + + Self { + iter, + codec, + output_schema, + projected_pk, + } + } + + /// Converts a single [`Batch`] into a flat-format [`RecordBatch`]. + fn convert_batch(&self, batch: &Batch) -> Result { + let num_rows = batch.num_rows(); + + let pk_values = if let Some(vals) = batch.pk_values() { + Cow::Borrowed(vals) + } else { + Cow::Owned( + self.codec + .decode(batch.primary_key()) + .context(DecodeSnafu)?, + ) + }; + + let mut columns: Vec = Vec::with_capacity(self.output_schema.fields().len()); + for pk_column in &self.projected_pk { + if pk_column.data_type.is_string() { + let value = get_pk_value(&pk_values, pk_column.column_id, pk_column.pk_index); + columns.push(build_string_tag_dict_array( + value, + &pk_column.data_type, + num_rows, + )); + } else { + let value = get_pk_value(&pk_values, pk_column.column_id, pk_column.pk_index); + let array = build_repeated_value_array(value, &pk_column.data_type, num_rows)?; + columns.push(array); + } + } + for batch_col in batch.fields() { + columns.push(batch_col.data.to_arrow_array()); + } + + columns.push(batch.timestamps().to_arrow_array()); + + // __primary_key + let pk_bytes = batch.primary_key(); + let values = Arc::new(BinaryArray::from_iter_values([pk_bytes])); + let keys = UInt32Array::from(vec![0u32; num_rows]); + let pk_dict: ArrayRef = Arc::new(DictionaryArray::new(keys, values)); + columns.push(pk_dict); + + // __sequence. + columns.push(batch.sequences().to_arrow_array()); + + // __op_type. + columns.push(batch.op_types().to_arrow_array()); + + RecordBatch::try_new(self.output_schema.clone(), columns).context(NewRecordBatchSnafu) + } +} + +impl Iterator for BatchToRecordBatchAdapter { + type Item = Result; + + fn next(&mut self) -> Option { + loop { + match self.iter.next()? { + Ok(batch) => { + if batch.is_empty() { + continue; + } + return Some(self.convert_batch(&batch)); + } + Err(e) => return Some(Err(e)), + } + } + } +} + +/// Extracts a value for the given primary key column from decoded [`CompositeValues`]. +fn get_pk_value( + pk_values: &CompositeValues, + column_id: ColumnId, + pk_index: usize, +) -> &datatypes::value::Value { + match pk_values { + CompositeValues::Dense(dense) => { + if pk_index < dense.len() { + &dense[pk_index].1 + } else { + &datatypes::value::Value::Null + } + } + CompositeValues::Sparse(sparse) => sparse.get_or_null(column_id), + } +} + +/// Builds an Arrow array of `num_rows` copies of `value`. +fn build_repeated_value_array( + value: &datatypes::value::Value, + data_type: &ConcreteDataType, + num_rows: usize, +) -> Result { + let scalar = value + .try_to_scalar_value(data_type) + .context(DataTypeMismatchSnafu)?; + scalar + .to_array_of_size(num_rows) + .context(EvalPartitionFilterSnafu) +} + +/// Builds a dictionary-encoded string tag array with one dictionary value. +fn build_string_tag_dict_array( + value: &datatypes::value::Value, + data_type: &ConcreteDataType, + num_rows: usize, +) -> ArrayRef { + let mut builder = data_type.create_mutable_vector(1); + builder.push_value_ref(&value.as_value_ref()); + let values = builder.to_vector().to_arrow_array(); + + let keys = UInt32Array::from(vec![0u32; num_rows]); + Arc::new(DictionaryArray::new(keys, values)) +} + +fn compute_output_arrow_schema( + metadata: &RegionMetadataRef, + read_column_id_set: &HashSet, +) -> SchemaRef { + let mut fields = Vec::new(); + + for column_metadata in metadata.primary_key_columns() { + if !read_column_id_set.contains(&column_metadata.column_id) { + continue; + } + let field = Arc::new(Field::new( + &column_metadata.column_schema.name, + column_metadata.column_schema.data_type.as_arrow_type(), + column_metadata.column_schema.is_nullable(), + )); + let field = if column_metadata.semantic_type == SemanticType::Tag { + tag_maybe_to_dictionary_field(&column_metadata.column_schema.data_type, &field) + } else { + field + }; + fields.push(field); + } + + for column_metadata in metadata.field_columns() { + if !read_column_id_set.contains(&column_metadata.column_id) { + continue; + } + let field = Arc::new(Field::new( + &column_metadata.column_schema.name, + column_metadata.column_schema.data_type.as_arrow_type(), + column_metadata.column_schema.is_nullable(), + )); + fields.push(field); + } + + let time_index = metadata.time_index_column(); + let time_index_field = Arc::new(Field::new( + &time_index.column_schema.name, + time_index.column_schema.data_type.as_arrow_type(), + time_index.column_schema.is_nullable(), + )); + fields.push(time_index_field); + fields.extend(internal_fields().iter().cloned()); + + Arc::new(datatypes::arrow::datatypes::Schema::new(fields)) +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use api::v1::{OpType, SemanticType}; + use datatypes::arrow::array::{Array, TimestampMillisecondArray, UInt8Array, UInt64Array}; + use datatypes::arrow::datatypes::UInt32Type; + use datatypes::prelude::ConcreteDataType; + use datatypes::schema::ColumnSchema; + use mito_codec::row_converter::{PrimaryKeyCodec, build_primary_key_codec}; + use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef}; + use store_api::storage::RegionId; + + use super::*; + use crate::read::flat_projection::FlatProjectionMapper; + use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema}; + use crate::test_util::new_batch_builder; + use crate::test_util::sst_util::{new_primary_key, sst_region_metadata}; + + /// Helper to build the adapter from batches and metadata. + fn build_adapter( + batches: Vec, + metadata: &RegionMetadataRef, + codec: &Arc, + ) -> BatchToRecordBatchAdapter { + let read_column_ids = metadata + .column_metadatas + .iter() + .map(|column| column.column_id) + .collect::>(); + let iter: BoxedBatchIterator = Box::new(batches.into_iter().map(Ok)); + BatchToRecordBatchAdapter::new( + iter, + Arc::clone(metadata), + Arc::clone(codec), + &read_column_ids, + ) + } + + #[test] + fn test_single_batch_two_tags() { + // Schema: tag_0(string), tag_1(string), field_0(u64), ts + let metadata = Arc::new(sst_region_metadata()); + let codec = build_primary_key_codec(&metadata); + + let pk = new_primary_key(&["host-1", "region-a"]); + let batch = new_batch_builder( + &pk, + &[1, 2, 3], + &[100, 100, 100], + &[OpType::Put, OpType::Put, OpType::Put], + 2, + &[10, 20, 30], + ) + .build() + .unwrap(); + + let adapter = build_adapter(vec![batch], &metadata, &codec); + let results: Vec<_> = adapter.collect::>(); + assert_eq!(1, results.len()); + + let rb = results[0].as_ref().unwrap(); + let expected_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default()); + assert_eq!(rb.schema(), expected_schema); + assert_eq!(3, rb.num_rows()); + // 2 tags + 1 field + 1 time index + 3 internal = 7 columns + assert_eq!(7, rb.num_columns()); + } + + #[test] + fn test_multiple_batches() { + let metadata = Arc::new(sst_region_metadata()); + let codec = build_primary_key_codec(&metadata); + + let pk1 = new_primary_key(&["a", "b"]); + let batch1 = new_batch_builder( + &pk1, + &[1, 2], + &[100, 100], + &[OpType::Put, OpType::Put], + 2, + &[10, 20], + ) + .build() + .unwrap(); + + let pk2 = new_primary_key(&["c", "d"]); + let batch2 = new_batch_builder( + &pk2, + &[3, 4], + &[200, 200], + &[OpType::Put, OpType::Put], + 2, + &[30, 40], + ) + .build() + .unwrap(); + + let adapter = build_adapter(vec![batch1, batch2], &metadata, &codec); + let results: Vec<_> = adapter.map(|r| r.unwrap()).collect(); + assert_eq!(2, results.len()); + + assert_eq!(2, results[0].num_rows()); + assert_eq!(2, results[1].num_rows()); + } + + #[test] + fn test_empty_batch_skipped() { + let metadata = Arc::new(sst_region_metadata()); + let codec = build_primary_key_codec(&metadata); + + let empty = Batch::empty(); + let pk = new_primary_key(&["x", "y"]); + let batch = new_batch_builder(&pk, &[1], &[1], &[OpType::Put], 2, &[42]) + .build() + .unwrap(); + + let adapter = build_adapter(vec![empty, batch], &metadata, &codec); + let results: Vec<_> = adapter.map(|r| r.unwrap()).collect(); + assert_eq!(1, results.len()); + assert_eq!(1, results[0].num_rows()); + } + + #[test] + fn test_no_tags() { + // Schema with no primary key columns: field_0(u64), ts + let mut builder = RegionMetadataBuilder::new(RegionId::new(0, 0)); + builder + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "field_0".to_string(), + ConcreteDataType::uint64_datatype(), + true, + ), + semantic_type: SemanticType::Field, + column_id: 0, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "ts".to_string(), + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + semantic_type: SemanticType::Timestamp, + column_id: 1, + }); + builder.primary_key(vec![]); + let metadata = Arc::new(builder.build().unwrap()); + let codec = build_primary_key_codec(&metadata); + + // Empty primary key + let pk = vec![]; + let batch = new_batch_builder( + &pk, + &[1, 2], + &[100, 100], + &[OpType::Put, OpType::Put], + 0, + &[10, 20], + ) + .build() + .unwrap(); + + let adapter = build_adapter(vec![batch], &metadata, &codec); + let results: Vec<_> = adapter.map(|r| r.unwrap()).collect(); + assert_eq!(1, results.len()); + + let rb = &results[0]; + let expected_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default()); + assert_eq!(rb.schema(), expected_schema); + // 0 tags + 1 field + 1 time index + 3 internal = 5 columns + assert_eq!(5, rb.num_columns()); + assert_eq!(2, rb.num_rows()); + } + + #[test] + fn test_primary_key_dict_column() { + // Verify the __primary_key column is a proper dictionary array. + let metadata = Arc::new(sst_region_metadata()); + let codec = build_primary_key_codec(&metadata); + + let pk = new_primary_key(&["host", "az"]); + let batch = new_batch_builder( + &pk, + &[1, 2], + &[1, 1], + &[OpType::Put, OpType::Put], + 2, + &[5, 6], + ) + .build() + .unwrap(); + + let adapter = build_adapter(vec![batch.clone()], &metadata, &codec); + let rb = adapter.into_iter().next().unwrap().unwrap(); + + // __primary_key is at num_columns - 3 + let pk_col_idx = rb.num_columns() - 3; + let pk_array = rb + .column(pk_col_idx) + .as_any() + .downcast_ref::>() + .expect("should be DictionaryArray"); + + // Should have 2 rows, all pointing to key 0 + assert_eq!(2, pk_array.len()); + assert_eq!(0, pk_array.keys().value(0)); + assert_eq!(0, pk_array.keys().value(1)); + + // The single dictionary value should be the encoded pk bytes. + let values = pk_array + .values() + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(1, values.len()); + assert_eq!(batch.primary_key(), values.value(0)); + } + + #[test] + fn test_sequence_and_op_type_columns() { + let metadata = Arc::new(sst_region_metadata()); + let codec = build_primary_key_codec(&metadata); + + let pk = new_primary_key(&["a", "b"]); + let batch = new_batch_builder( + &pk, + &[10, 20, 30], + &[1, 2, 3], + &[OpType::Put, OpType::Delete, OpType::Put], + 2, + &[100, 200, 300], + ) + .build() + .unwrap(); + + let adapter = build_adapter(vec![batch], &metadata, &codec); + let rb = adapter.into_iter().next().unwrap().unwrap(); + + // __sequence is at num_columns - 2 + let seq_idx = rb.num_columns() - 2; + let seq_array = rb + .column(seq_idx) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(&[1u64, 2, 3], seq_array.values().as_ref()); + + // __op_type is at num_columns - 1 + let op_idx = rb.num_columns() - 1; + let op_array = rb + .column(op_idx) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!( + &[OpType::Put as u8, OpType::Delete as u8, OpType::Put as u8], + op_array.values().as_ref() + ); + } + + #[test] + fn test_integer_tag_column() { + // Schema with an integer (non-string) tag: tag_0(u32), field_0(u64), ts + let mut builder = RegionMetadataBuilder::new(RegionId::new(0, 0)); + builder + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "tag_0".to_string(), + ConcreteDataType::uint32_datatype(), + false, + ), + semantic_type: SemanticType::Tag, + column_id: 0, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "field_0".to_string(), + ConcreteDataType::uint64_datatype(), + true, + ), + semantic_type: SemanticType::Field, + column_id: 1, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "ts".to_string(), + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + semantic_type: SemanticType::Timestamp, + column_id: 2, + }); + builder.primary_key(vec![0]); + let metadata = Arc::new(builder.build().unwrap()); + let codec = build_primary_key_codec(&metadata); + + // Encode integer primary key + let pk = { + use datatypes::value::ValueRef; + use mito_codec::row_converter::PrimaryKeyCodecExt; + let codec_ext = mito_codec::row_converter::DensePrimaryKeyCodec::with_fields(vec![( + 0, + mito_codec::row_converter::SortField::new(ConcreteDataType::uint32_datatype()), + )]); + codec_ext + .encode([ValueRef::UInt32(42)].into_iter()) + .unwrap() + }; + let batch = new_batch_builder( + &pk, + &[1, 2], + &[1, 1], + &[OpType::Put, OpType::Put], + 1, + &[10, 20], + ) + .build() + .unwrap(); + + let adapter = build_adapter(vec![batch], &metadata, &codec); + let rb = adapter.into_iter().next().unwrap().unwrap(); + + let expected_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default()); + assert_eq!(rb.schema(), expected_schema); + + // tag_0 column (index 0) should be a regular (non-dictionary) UInt32 array + let tag_array = rb + .column(0) + .as_any() + .downcast_ref::() + .expect("integer tag should be a plain UInt32Array"); + assert_eq!(&[42u32, 42], tag_array.values().as_ref()); + } + + #[test] + fn test_with_precomputed_pk_values() { + // If pk_values are already set on the Batch, the adapter should use them + // instead of calling codec.decode(). + let metadata = Arc::new(sst_region_metadata()); + let codec = build_primary_key_codec(&metadata); + + let pk = new_primary_key(&["pre", "computed"]); + let mut batch = new_batch_builder(&pk, &[1], &[1], &[OpType::Put], 2, &[99]) + .build() + .unwrap(); + + // Decode and set pk_values ahead of time. + let decoded = codec.decode(&pk).unwrap(); + batch.set_pk_values(decoded); + + let adapter = build_adapter(vec![batch], &metadata, &codec); + let rb = adapter.into_iter().next().unwrap().unwrap(); + assert_eq!(1, rb.num_rows()); + + let expected_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default()); + assert_eq!(rb.schema(), expected_schema); + } + + #[test] + fn test_partial_projection_schema_matches_mapper() { + let mut builder = RegionMetadataBuilder::new(RegionId::new(0, 0)); + builder + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "tag_0".to_string(), + ConcreteDataType::string_datatype(), + true, + ), + semantic_type: SemanticType::Tag, + column_id: 0, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "tag_1".to_string(), + ConcreteDataType::string_datatype(), + true, + ), + semantic_type: SemanticType::Tag, + column_id: 1, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "field_0".to_string(), + ConcreteDataType::uint64_datatype(), + true, + ), + semantic_type: SemanticType::Field, + column_id: 2, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "field_1".to_string(), + ConcreteDataType::uint64_datatype(), + true, + ), + semantic_type: SemanticType::Field, + column_id: 3, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "ts".to_string(), + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + semantic_type: SemanticType::Timestamp, + column_id: 4, + }); + builder.primary_key(vec![0, 1]); + let metadata = Arc::new(builder.build().unwrap()); + let codec = build_primary_key_codec(&metadata); + + // Project tag_0 and field_1; skip tag_1 and field_0. + let read_column_ids = vec![0, 3]; + + let pk = new_primary_key(&["host-1", "region-a"]); + let batch = new_batch_builder( + &pk, + &[1, 2, 3], + &[100, 100, 100], + &[OpType::Put, OpType::Put, OpType::Put], + 3, + &[10, 20, 30], + ) + .build() + .unwrap(); + + let iter: BoxedBatchIterator = Box::new(vec![Ok(batch)].into_iter()); + let adapter = + BatchToRecordBatchAdapter::new(iter, metadata.clone(), codec, &read_column_ids); + let rb = adapter.into_iter().next().unwrap().unwrap(); + + let mapper = FlatProjectionMapper::new(&metadata, [0, 3].into_iter()).unwrap(); + assert_eq!(rb.schema(), mapper.input_arrow_schema(false)); + // tag_0 + field_1 + ts + 3 internal columns. + assert_eq!(6, rb.num_columns()); + assert_eq!(3, rb.num_rows()); + + let field_1 = rb.column(1).as_any().downcast_ref::().unwrap(); + assert_eq!(&[10u64, 20, 30], field_1.values().as_ref()); + + let ts = rb + .column(2) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(&[1i64, 2, 3], ts.values().as_ref()); + } +} diff --git a/src/mito2/src/read/flat_projection.rs b/src/mito2/src/read/flat_projection.rs index 7394a3c4ab..3e0f1169df 100644 --- a/src/mito2/src/read/flat_projection.rs +++ b/src/mito2/src/read/flat_projection.rs @@ -325,7 +325,7 @@ pub(crate) fn flat_projected_columns( /// /// # Panics /// Panics if it can't find the column by the column id in the batch_schema. -fn compute_input_arrow_schema( +pub(crate) fn compute_input_arrow_schema( metadata: &RegionMetadata, batch_schema: &[(ColumnId, ConcreteDataType)], ) -> datatypes::arrow::datatypes::SchemaRef { diff --git a/src/mito2/src/read/scan_util.rs b/src/mito2/src/read/scan_util.rs index d405696bc0..0ee6a4437d 100644 --- a/src/mito2/src/read/scan_util.rs +++ b/src/mito2/src/read/scan_util.rs @@ -1181,6 +1181,7 @@ pub(crate) fn scan_flat_mem_ranges( stream_ctx: Arc, part_metrics: PartitionMetrics, index: RowGroupIndex, + time_range: FileTimeRange, ) -> impl Stream> { try_stream! { let ranges = stream_ctx.input.build_mem_ranges(index); @@ -1188,7 +1189,7 @@ pub(crate) fn scan_flat_mem_ranges( for range in ranges { let build_reader_start = Instant::now(); let mem_scan_metrics = Some(MemScanMetrics::default()); - let mut iter = range.build_record_batch_iter(mem_scan_metrics.clone())?; + let mut iter = range.build_record_batch_iter(Some(time_range), mem_scan_metrics.clone())?; part_metrics.inc_build_reader_cost(build_reader_start.elapsed()); while let Some(record_batch) = iter.next().transpose()? { diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index e5b486b068..121d1b6c2d 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -897,7 +897,12 @@ pub(crate) async fn build_flat_sources( for (position, index) in range_meta.row_group_indices.iter().enumerate() { if stream_ctx.is_mem_range_index(*index) { - let stream = scan_flat_mem_ranges(stream_ctx.clone(), part_metrics.clone(), *index); + let stream = scan_flat_mem_ranges( + stream_ctx.clone(), + part_metrics.clone(), + *index, + range_meta.time_range, + ); ordered_sources[position] = Some(Box::pin(stream) as _); } else if stream_ctx.is_file_range_index(*index) { if let Some(semaphore_ref) = semaphore.as_ref() { diff --git a/src/mito2/src/read/unordered_scan.rs b/src/mito2/src/read/unordered_scan.rs index dcd5d97041..73ab4eb73c 100644 --- a/src/mito2/src/read/unordered_scan.rs +++ b/src/mito2/src/read/unordered_scan.rs @@ -179,6 +179,7 @@ impl UnorderedScan { stream_ctx.clone(), part_metrics.clone(), *index, + range_meta.time_range, ); for await record_batch in stream { yield record_batch?;