diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index 7ed0fdbb86..5da5c006ed 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -26,7 +26,7 @@ use either::Either; use partition::expr::PartitionExpr; use smallvec::{SmallVec, smallvec}; use snafu::ResultExt; -use store_api::storage::RegionId; +use store_api::storage::{RegionId, SequenceNumber}; use strum::IntoStaticStr; use tokio::sync::{Semaphore, mpsc, watch}; @@ -464,24 +464,26 @@ impl RegionFlushTask { // Sets `for_flush` flag to true. let mem_ranges = mem.ranges(None, RangesOptions::for_flush())?; let num_mem_ranges = mem_ranges.ranges.len(); - let num_mem_rows = mem_ranges.stats.num_rows(); + + // Aggregate stats from all ranges + let num_mem_rows = mem_ranges.num_rows(); + let memtable_series_count = mem_ranges.series_count(); let memtable_id = mem.id(); // Increases series count for each mem range. We consider each mem range has different series so // the counter may have more series than the actual series count. - series_count += mem_ranges.stats.series_count(); + series_count += memtable_series_count; if mem_ranges.is_record_batch() { let flush_start = Instant::now(); let FlushFlatMemResult { num_encoded, - max_sequence, num_sources, results, } = self .flush_flat_mem_ranges(version, &write_opts, mem_ranges) .await?; for (source_idx, result) in results.into_iter().enumerate() { - let (ssts_written, metrics) = result?; + let (max_sequence, ssts_written, metrics) = result?; if ssts_written.is_empty() { // No data written. continue; @@ -521,7 +523,7 @@ impl RegionFlushTask { compact_cost, ); } else { - let max_sequence = mem_ranges.stats.max_sequence(); + let max_sequence = mem_ranges.max_sequence(); let source = memtable_source(mem_ranges, &version.options).await?; // Flush to level 0. @@ -583,8 +585,7 @@ impl RegionFlushTask { )?; let mut tasks = Vec::with_capacity(flat_sources.encoded.len() + flat_sources.sources.len()); let num_encoded = flat_sources.encoded.len(); - let max_sequence = flat_sources.max_sequence; - for source in flat_sources.sources { + for (source, max_sequence) in flat_sources.sources { let source = Either::Right(source); let write_request = self.new_write_request(version, max_sequence, source); let access_layer = self.access_layer.clone(); @@ -596,11 +597,11 @@ impl RegionFlushTask { let ssts = access_layer .write_sst(write_request, &write_opts, &mut metrics) .await?; - Ok((ssts, metrics)) + Ok((max_sequence, ssts, metrics)) }); tasks.push(task); } - for encoded in flat_sources.encoded { + for (encoded, max_sequence) in flat_sources.encoded { let access_layer = self.access_layer.clone(); let cache_manager = self.cache_manager.clone(); let region_id = version.metadata.region_id; @@ -610,7 +611,7 @@ impl RegionFlushTask { let metrics = access_layer .put_sst(&encoded.data, region_id, &encoded.sst_info, &cache_manager) .await?; - Ok((smallvec![encoded.sst_info], metrics)) + Ok((max_sequence, smallvec![encoded.sst_info], metrics)) }); tasks.push(task); } @@ -620,7 +621,6 @@ impl RegionFlushTask { .context(JoinSnafu)?; Ok(FlushFlatMemResult { num_encoded, - max_sequence, num_sources, results, }) @@ -696,9 +696,8 @@ impl RegionFlushTask { struct FlushFlatMemResult { num_encoded: usize, - max_sequence: u64, num_sources: usize, - results: Vec>, + results: Vec>, } struct DoFlushMemtablesResult { @@ -744,9 +743,8 @@ async fn memtable_source(mem_ranges: MemtableRanges, options: &RegionOptions) -> } struct FlatSources { - max_sequence: u64, - sources: SmallVec<[FlatSource; 4]>, - encoded: SmallVec<[EncodedRange; 4]>, + sources: SmallVec<[(FlatSource, SequenceNumber); 4]>, + encoded: SmallVec<[(EncodedRange, SequenceNumber); 4]>, } /// Returns the max sequence and [FlatSource] for the given memtable. @@ -756,18 +754,17 @@ fn memtable_flat_sources( options: &RegionOptions, field_column_start: usize, ) -> Result { - let MemtableRanges { ranges, stats } = mem_ranges; - let max_sequence = stats.max_sequence(); + let MemtableRanges { ranges } = mem_ranges; let mut flat_sources = FlatSources { - max_sequence, sources: SmallVec::new(), encoded: SmallVec::new(), }; if ranges.len() == 1 { let only_range = ranges.into_values().next().unwrap(); + let max_sequence = only_range.stats().max_sequence(); if let Some(encoded) = only_range.encoded() { - flat_sources.encoded.push(encoded); + flat_sources.encoded.push((encoded, max_sequence)); } else { let iter = only_range.build_record_batch_iter(None)?; // Dedup according to append mode and merge mode. @@ -778,25 +775,39 @@ fn memtable_flat_sources( field_column_start, iter, ); - flat_sources.sources.push(FlatSource::Iter(iter)); + flat_sources + .sources + .push((FlatSource::Iter(iter), max_sequence)); }; } else { - let min_flush_rows = stats.num_rows / 8; + // Calculate total rows from all ranges for min_flush_rows calculation + let total_rows: usize = ranges.values().map(|r| r.stats().num_rows()).sum(); + let min_flush_rows = total_rows / 8; let min_flush_rows = min_flush_rows.max(DEFAULT_ROW_GROUP_SIZE); let mut last_iter_rows = 0; let num_ranges = ranges.len(); let mut input_iters = Vec::with_capacity(num_ranges); + let mut current_ranges = Vec::new(); for (_range_id, range) in ranges { if let Some(encoded) = range.encoded() { - flat_sources.encoded.push(encoded); + let max_sequence = range.stats().max_sequence(); + flat_sources.encoded.push((encoded, max_sequence)); continue; } let iter = range.build_record_batch_iter(None)?; input_iters.push(iter); last_iter_rows += range.num_rows(); + current_ranges.push(range); if last_iter_rows > min_flush_rows { + // Calculate max_sequence from all merged ranges + let max_sequence = current_ranges + .iter() + .map(|r| r.stats().max_sequence()) + .max() + .unwrap_or(0); + let maybe_dedup = merge_and_dedup( &schema, options.append_mode, @@ -805,13 +816,22 @@ fn memtable_flat_sources( std::mem::replace(&mut input_iters, Vec::with_capacity(num_ranges)), )?; - flat_sources.sources.push(FlatSource::Iter(maybe_dedup)); + flat_sources + .sources + .push((FlatSource::Iter(maybe_dedup), max_sequence)); last_iter_rows = 0; + current_ranges.clear(); } } // Handle remaining iters. if !input_iters.is_empty() { + let max_sequence = current_ranges + .iter() + .map(|r| r.stats().max_sequence()) + .max() + .unwrap_or(0); + let maybe_dedup = merge_and_dedup( &schema, options.append_mode, @@ -820,7 +840,9 @@ fn memtable_flat_sources( input_iters, )?; - flat_sources.sources.push(FlatSource::Iter(maybe_dedup)); + flat_sources + .sources + .push((FlatSource::Iter(maybe_dedup), max_sequence)); } } @@ -1491,7 +1513,7 @@ mod tests { // Consume the iterator and count rows let mut total_rows = 0usize; - for source in flat_sources.sources { + for (source, _sequence) in flat_sources.sources { match source { crate::read::FlatSource::Iter(iter) => { for rb in iter { @@ -1521,7 +1543,7 @@ mod tests { assert_eq!(1, flat_sources.sources.len()); let mut total_rows = 0usize; - for source in flat_sources.sources { + for (source, _sequence) in flat_sources.sources { match source { crate::read::FlatSource::Iter(iter) => { for rb in iter { diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs index c9ff2c0a98..74d73a7a89 100644 --- a/src/mito2/src/memtable.rs +++ b/src/mito2/src/memtable.rs @@ -204,8 +204,27 @@ pub type BoxedRecordBatchIterator = Box> pub struct MemtableRanges { /// Range IDs and ranges. pub ranges: BTreeMap, - /// Statistics of the memtable at the query time. - pub stats: MemtableStats, +} + +impl MemtableRanges { + /// Returns the total number of rows across all ranges. + pub fn num_rows(&self) -> usize { + self.ranges.values().map(|r| r.stats().num_rows()).sum() + } + + /// Returns the total series count across all ranges. + pub fn series_count(&self) -> usize { + self.ranges.values().map(|r| r.stats().series_count()).sum() + } + + /// Returns the maximum sequence number across all ranges. + pub fn max_sequence(&self) -> SequenceNumber { + self.ranges + .values() + .map(|r| r.stats().max_sequence()) + .max() + .unwrap_or(0) + } } impl IterBuilder for MemtableRanges { @@ -569,15 +588,19 @@ impl MemtableRangeContext { pub struct MemtableRange { /// Shared context. context: MemtableRangeContextRef, - /// Number of rows in current memtable range. - // todo(hl): use [MemtableRangeStats] instead. - num_rows: usize, + /// Statistics for this memtable range. + stats: MemtableStats, } impl MemtableRange { - /// Creates a new range from context. - pub fn new(context: MemtableRangeContextRef, num_rows: usize) -> Self { - Self { context, num_rows } + /// Creates a new range from context and stats. + pub fn new(context: MemtableRangeContextRef, stats: MemtableStats) -> Self { + Self { context, stats } + } + + /// Returns the statistics for this range. + pub fn stats(&self) -> &MemtableStats { + &self.stats } /// Returns the id of the memtable to read. @@ -624,7 +647,7 @@ impl MemtableRange { } pub fn num_rows(&self) -> usize { - self.num_rows + self.stats.num_rows } /// Returns the encoded range if available. diff --git a/src/mito2/src/memtable/bulk.rs b/src/mito2/src/memtable/bulk.rs index beae618520..dddeca103c 100644 --- a/src/mito2/src/memtable/bulk.rs +++ b/src/mito2/src/memtable/bulk.rs @@ -382,7 +382,7 @@ impl Memtable for BulkMemtable { if !bulk_parts.unordered_part.is_empty() && let Some(unordered_bulk_part) = bulk_parts.unordered_part.to_bulk_part()? { - let num_rows = unordered_bulk_part.num_rows(); + let part_stats = unordered_bulk_part.to_memtable_stats(&self.metadata); let range = MemtableRange::new( Arc::new(MemtableRangeContext::new( self.id, @@ -393,7 +393,7 @@ impl Memtable for BulkMemtable { }), predicate.clone(), )), - num_rows, + part_stats, ); ranges.insert(range_id, range); range_id += 1; @@ -406,6 +406,7 @@ impl Memtable for BulkMemtable { continue; } + let part_stats = part_wrapper.part.to_memtable_stats(&self.metadata); let range = MemtableRange::new( Arc::new(MemtableRangeContext::new( self.id, @@ -416,7 +417,7 @@ impl Memtable for BulkMemtable { }), predicate.clone(), )), - part_wrapper.part.num_rows(), + part_stats, ); ranges.insert(range_id, range); range_id += 1; @@ -429,6 +430,7 @@ impl Memtable for BulkMemtable { continue; } + let part_stats = encoded_part_wrapper.part.to_memtable_stats(); let range = MemtableRange::new( Arc::new(MemtableRangeContext::new( self.id, @@ -440,18 +442,14 @@ impl Memtable for BulkMemtable { }), predicate.clone(), )), - encoded_part_wrapper.part.metadata().num_rows, + part_stats, ); ranges.insert(range_id, range); range_id += 1; } } - let mut stats = self.stats(); - stats.num_ranges = ranges.len(); - - // TODO(yingwen): Supports per range stats. - Ok(MemtableRanges { ranges, stats }) + Ok(MemtableRanges { ranges }) } fn is_empty(&self) -> bool { @@ -811,6 +809,14 @@ impl PartToMerge { } } + /// Gets the maximum sequence number of this part. + fn max_sequence(&self) -> u64 { + match self { + PartToMerge::Bulk { part, .. } => part.sequence, + PartToMerge::Encoded { part, .. } => part.metadata().max_sequence, + } + } + /// Creates a record batch iterator for this part. fn create_iterator( self, @@ -984,7 +990,7 @@ impl MemtableCompactor { return Ok(None); } - // Calculates timestamp bounds for merged data + // Calculates timestamp bounds and max sequence for merged data let min_timestamp = parts_to_merge .iter() .map(|p| p.min_timestamp()) @@ -995,6 +1001,11 @@ impl MemtableCompactor { .map(|p| p.max_timestamp()) .max() .unwrap_or(i64::MIN); + let max_sequence = parts_to_merge + .iter() + .map(|p| p.max_sequence()) + .max() + .unwrap_or(0); let context = Arc::new(BulkIterContext::new( metadata.clone(), @@ -1051,6 +1062,7 @@ impl MemtableCompactor { arrow_schema.clone(), min_timestamp, max_timestamp, + max_sequence, &mut metrics, )?; @@ -1278,7 +1290,8 @@ mod tests { .unwrap(); assert_eq!(3, ranges.ranges.len()); - assert_eq!(5, ranges.stats.num_rows); + let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum(); + assert_eq!(5, total_rows); for (_range_id, range) in ranges.ranges.iter() { assert!(range.num_rows() > 0); @@ -1446,8 +1459,9 @@ mod tests { .unwrap(); assert_eq!(3, ranges.ranges.len()); - assert_eq!(5, ranges.stats.num_rows); - assert_eq!(3, ranges.stats.num_ranges); + let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum(); + assert_eq!(5, total_rows); + assert_eq!(3, ranges.ranges.len()); for (range_id, range) in ranges.ranges.iter() { assert!(*range_id < 3); @@ -1524,7 +1538,8 @@ mod tests { // Should have ranges for both bulk parts and encoded parts assert_eq!(3, ranges.ranges.len()); - assert_eq!(10, ranges.stats.num_rows); + let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum(); + assert_eq!(10, total_rows); for (_range_id, range) in ranges.ranges.iter() { assert!(range.num_rows() > 0); @@ -1606,7 +1621,8 @@ mod tests { // Should have at least 1 range (the compacted part) assert!(!ranges.ranges.is_empty()); - assert_eq!(10, ranges.stats.num_rows); + let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum(); + assert_eq!(10, total_rows); // Read all data and verify let mut total_rows_read = 0; @@ -1693,7 +1709,8 @@ mod tests { ) .unwrap(); - assert_eq!(13, ranges.stats.num_rows); + let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum(); + assert_eq!(13, total_rows); let mut total_rows_read = 0; for (_range_id, range) in ranges.ranges.iter() { @@ -1750,7 +1767,8 @@ mod tests { // Should have 1 range for the unordered_part assert_eq!(1, ranges.ranges.len()); - assert_eq!(3, ranges.stats.num_rows); + let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum(); + assert_eq!(3, total_rows); // Verify data is sorted correctly in the range let range = ranges.ranges.get(&0).unwrap(); diff --git a/src/mito2/src/memtable/bulk/part.rs b/src/mito2/src/memtable/bulk/part.rs index e79d1d83b8..dc8414aaad 100644 --- a/src/mito2/src/memtable/bulk/part.rs +++ b/src/mito2/src/memtable/bulk/part.rs @@ -66,7 +66,7 @@ use crate::error::{ use crate::memtable::bulk::context::BulkIterContextRef; use crate::memtable::bulk::part_reader::EncodedBulkPartIter; use crate::memtable::time_series::{ValueBuilder, Values}; -use crate::memtable::{BoxedRecordBatchIterator, MemScanMetrics}; +use crate::memtable::{BoxedRecordBatchIterator, MemScanMetrics, MemtableStats}; use crate::sst::index::IndexOutput; use crate::sst::parquet::file_range::{PreFilterMode, row_group_contains_delete}; use crate::sst::parquet::flat_format::primary_key_column_index; @@ -170,6 +170,22 @@ impl BulkPart { } } + /// Creates MemtableStats from this BulkPart. + pub fn to_memtable_stats(&self, region_metadata: &RegionMetadataRef) -> MemtableStats { + let ts_type = region_metadata.time_index_type(); + let min_ts = ts_type.create_timestamp(self.min_timestamp); + let max_ts = ts_type.create_timestamp(self.max_timestamp); + + MemtableStats { + estimated_bytes: self.estimated_size(), + time_range: Some((min_ts, max_ts)), + num_rows: self.num_rows(), + num_ranges: 1, + max_sequence: self.sequence, + series_count: self.estimated_series_count(), + } + } + /// Fills missing columns in the BulkPart batch with default values. /// /// This function checks if the batch schema matches the region metadata schema, @@ -965,6 +981,23 @@ impl EncodedBulkPart { &self.data } + /// Creates MemtableStats from this EncodedBulkPart. + pub fn to_memtable_stats(&self) -> MemtableStats { + let meta = &self.metadata; + let ts_type = meta.region_metadata.time_index_type(); + let min_ts = ts_type.create_timestamp(meta.min_timestamp); + let max_ts = ts_type.create_timestamp(meta.max_timestamp); + + MemtableStats { + estimated_bytes: self.size_bytes(), + time_range: Some((min_ts, max_ts)), + num_rows: meta.num_rows, + num_ranges: 1, + max_sequence: meta.max_sequence, + series_count: meta.num_series as usize, + } + } + /// Converts this `EncodedBulkPart` to `SstInfo`. /// /// # Arguments @@ -1061,6 +1094,8 @@ pub struct BulkPartMeta { pub region_metadata: RegionMetadataRef, /// Number of series. pub num_series: u64, + /// Maximum sequence number in part. + pub max_sequence: u64, } /// Metrics for encoding a part. @@ -1122,6 +1157,7 @@ impl BulkPartEncoder { arrow_schema: SchemaRef, min_timestamp: i64, max_timestamp: i64, + max_sequence: u64, metrics: &mut BulkPartEncodeMetrics, ) -> Result> { let mut buf = Vec::with_capacity(4096); @@ -1173,6 +1209,7 @@ impl BulkPartEncoder { parquet_metadata, region_metadata: self.metadata.clone(), num_series, + max_sequence, }, })) } @@ -1206,6 +1243,7 @@ impl BulkPartEncoder { parquet_metadata, region_metadata: self.metadata.clone(), num_series: part.estimated_series_count() as u64, + max_sequence: part.sequence, }, })) } diff --git a/src/mito2/src/memtable/partition_tree.rs b/src/mito2/src/memtable/partition_tree.rs index 8ddd687053..d16dc6a6cc 100644 --- a/src/mito2/src/memtable/partition_tree.rs +++ b/src/mito2/src/memtable/partition_tree.rs @@ -203,10 +203,10 @@ impl Memtable for PartitionTreeMemtable { }); let context = Arc::new(MemtableRangeContext::new(self.id, builder, predicate)); - let stats = self.stats(); + let range_stats = self.stats(); + let range = MemtableRange::new(context, range_stats); Ok(MemtableRanges { - ranges: [(0, MemtableRange::new(context, stats.num_rows))].into(), - stats, + ranges: [(0, range)].into(), }) } diff --git a/src/mito2/src/memtable/simple_bulk_memtable.rs b/src/mito2/src/memtable/simple_bulk_memtable.rs index 4c3f31c2b8..b9890767b4 100644 --- a/src/mito2/src/memtable/simple_bulk_memtable.rs +++ b/src/mito2/src/memtable/simple_bulk_memtable.rs @@ -243,6 +243,23 @@ impl Memtable for SimpleBulkMemtable { let sequence = options.sequence; let start_time = Instant::now(); let projection = Arc::new(self.build_projection(projection)); + + // Use the memtable's overall time range and max sequence for all ranges + let max_sequence = self.max_sequence.load(Ordering::Relaxed); + let time_range = { + let num_rows = self.num_rows.load(Ordering::Relaxed); + if num_rows > 0 { + let ts_type = self.region_metadata.time_index_type(); + let max_timestamp = + ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed)); + let min_timestamp = + ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed)); + Some((min_timestamp, max_timestamp)) + } else { + None + } + }; + let values = self.series.read().unwrap().read_to_values(); let contexts = values .into_par_iter() @@ -267,13 +284,24 @@ impl Memtable for SimpleBulkMemtable { .map(|result| { result.map(|batch| { let num_rows = batch.num_rows(); + let estimated_bytes = batch.memory_size(); + + let range_stats = MemtableStats { + estimated_bytes, + time_range, + num_rows, + num_ranges: 1, + max_sequence, + series_count: 1, + }; + let builder = BatchRangeBuilder { batch, merge_mode: self.merge_mode, scan_cost: start_time.elapsed(), }; ( - num_rows, + range_stats, Arc::new(MemtableRangeContext::new( self.id, Box::new(builder), @@ -287,13 +315,10 @@ impl Memtable for SimpleBulkMemtable { let ranges = contexts .into_iter() .enumerate() - .map(|(idx, (num_rows, context))| (idx, MemtableRange::new(context, num_rows))) + .map(|(idx, (range_stats, context))| (idx, MemtableRange::new(context, range_stats))) .collect(); - Ok(MemtableRanges { - ranges, - stats: self.stats(), - }) + Ok(MemtableRanges { ranges }) } fn is_empty(&self) -> bool { @@ -319,14 +344,7 @@ impl Memtable for SimpleBulkMemtable { series_count: 0, }; } - let ts_type = self - .region_metadata - .time_index_column() - .column_schema - .data_type - .clone() - .as_timestamp() - .expect("Timestamp column must have timestamp type"); + let ts_type = self.region_metadata.time_index_type(); let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed)); let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed)); MemtableStats { diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs index 7401dd96b9..9ce795429c 100644 --- a/src/mito2/src/memtable/time_series.rs +++ b/src/mito2/src/memtable/time_series.rs @@ -325,10 +325,10 @@ impl Memtable for TimeSeriesMemtable { }); let context = Arc::new(MemtableRangeContext::new(self.id, builder, predicate)); - let stats = self.stats(); + let range_stats = self.stats(); + let range = MemtableRange::new(context, range_stats); Ok(MemtableRanges { - ranges: [(0, MemtableRange::new(context, stats.num_rows))].into(), - stats, + ranges: [(0, range)].into(), }) } diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 8191dbcb7a..cbc6720515 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -458,10 +458,7 @@ impl ScanRegion { .with_pre_filter_mode(filter_mode), )?; mem_range_builders.extend(ranges_in_memtable.ranges.into_values().map(|v| { - // todo: we should add stats to MemtableRange - let mut stats = ranges_in_memtable.stats.clone(); - stats.num_ranges = 1; - stats.num_rows = v.num_rows(); + let stats = v.stats().clone(); MemRangeBuilder::new(v, stats) })); }