feat: Implement per range stats for bulk memtable (#7486)

* feat: implement per range stats for MemtableRange

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: extract methods to MemtableRanges

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: simple bulk memtable set other fields in stats

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: use time_index_type()

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: use time index type

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
Yingwen
2025-12-26 15:24:11 +08:00
committed by GitHub
parent 518a4e013b
commit 89b9469250
8 changed files with 195 additions and 79 deletions

View File

@@ -26,7 +26,7 @@ use either::Either;
use partition::expr::PartitionExpr;
use smallvec::{SmallVec, smallvec};
use snafu::ResultExt;
use store_api::storage::RegionId;
use store_api::storage::{RegionId, SequenceNumber};
use strum::IntoStaticStr;
use tokio::sync::{Semaphore, mpsc, watch};
@@ -464,24 +464,26 @@ impl RegionFlushTask {
// Sets `for_flush` flag to true.
let mem_ranges = mem.ranges(None, RangesOptions::for_flush())?;
let num_mem_ranges = mem_ranges.ranges.len();
let num_mem_rows = mem_ranges.stats.num_rows();
// Aggregate stats from all ranges
let num_mem_rows = mem_ranges.num_rows();
let memtable_series_count = mem_ranges.series_count();
let memtable_id = mem.id();
// Increases series count for each mem range. We assume each mem range holds different series, so
// the counter may be larger than the actual series count.
series_count += mem_ranges.stats.series_count();
series_count += memtable_series_count;
if mem_ranges.is_record_batch() {
let flush_start = Instant::now();
let FlushFlatMemResult {
num_encoded,
max_sequence,
num_sources,
results,
} = self
.flush_flat_mem_ranges(version, &write_opts, mem_ranges)
.await?;
for (source_idx, result) in results.into_iter().enumerate() {
let (ssts_written, metrics) = result?;
let (max_sequence, ssts_written, metrics) = result?;
if ssts_written.is_empty() {
// No data written.
continue;
@@ -521,7 +523,7 @@ impl RegionFlushTask {
compact_cost,
);
} else {
let max_sequence = mem_ranges.stats.max_sequence();
let max_sequence = mem_ranges.max_sequence();
let source = memtable_source(mem_ranges, &version.options).await?;
// Flush to level 0.
@@ -583,8 +585,7 @@ impl RegionFlushTask {
)?;
let mut tasks = Vec::with_capacity(flat_sources.encoded.len() + flat_sources.sources.len());
let num_encoded = flat_sources.encoded.len();
let max_sequence = flat_sources.max_sequence;
for source in flat_sources.sources {
for (source, max_sequence) in flat_sources.sources {
let source = Either::Right(source);
let write_request = self.new_write_request(version, max_sequence, source);
let access_layer = self.access_layer.clone();
@@ -596,11 +597,11 @@ impl RegionFlushTask {
let ssts = access_layer
.write_sst(write_request, &write_opts, &mut metrics)
.await?;
Ok((ssts, metrics))
Ok((max_sequence, ssts, metrics))
});
tasks.push(task);
}
for encoded in flat_sources.encoded {
for (encoded, max_sequence) in flat_sources.encoded {
let access_layer = self.access_layer.clone();
let cache_manager = self.cache_manager.clone();
let region_id = version.metadata.region_id;
@@ -610,7 +611,7 @@ impl RegionFlushTask {
let metrics = access_layer
.put_sst(&encoded.data, region_id, &encoded.sst_info, &cache_manager)
.await?;
Ok((smallvec![encoded.sst_info], metrics))
Ok((max_sequence, smallvec![encoded.sst_info], metrics))
});
tasks.push(task);
}
@@ -620,7 +621,6 @@ impl RegionFlushTask {
.context(JoinSnafu)?;
Ok(FlushFlatMemResult {
num_encoded,
max_sequence,
num_sources,
results,
})
@@ -696,9 +696,8 @@ impl RegionFlushTask {
struct FlushFlatMemResult {
num_encoded: usize,
max_sequence: u64,
num_sources: usize,
results: Vec<Result<(SstInfoArray, Metrics)>>,
results: Vec<Result<(SequenceNumber, SstInfoArray, Metrics)>>,
}
struct DoFlushMemtablesResult {
@@ -744,9 +743,8 @@ async fn memtable_source(mem_ranges: MemtableRanges, options: &RegionOptions) ->
}
struct FlatSources {
max_sequence: u64,
sources: SmallVec<[FlatSource; 4]>,
encoded: SmallVec<[EncodedRange; 4]>,
sources: SmallVec<[(FlatSource, SequenceNumber); 4]>,
encoded: SmallVec<[(EncodedRange, SequenceNumber); 4]>,
}
/// Returns the [FlatSource]s and encoded ranges for the given memtable ranges, each paired with its max sequence.
@@ -756,18 +754,17 @@ fn memtable_flat_sources(
options: &RegionOptions,
field_column_start: usize,
) -> Result<FlatSources> {
let MemtableRanges { ranges, stats } = mem_ranges;
let max_sequence = stats.max_sequence();
let MemtableRanges { ranges } = mem_ranges;
let mut flat_sources = FlatSources {
max_sequence,
sources: SmallVec::new(),
encoded: SmallVec::new(),
};
if ranges.len() == 1 {
let only_range = ranges.into_values().next().unwrap();
let max_sequence = only_range.stats().max_sequence();
if let Some(encoded) = only_range.encoded() {
flat_sources.encoded.push(encoded);
flat_sources.encoded.push((encoded, max_sequence));
} else {
let iter = only_range.build_record_batch_iter(None)?;
// Dedup according to append mode and merge mode.
@@ -778,25 +775,39 @@ fn memtable_flat_sources(
field_column_start,
iter,
);
flat_sources.sources.push(FlatSource::Iter(iter));
flat_sources
.sources
.push((FlatSource::Iter(iter), max_sequence));
};
} else {
let min_flush_rows = stats.num_rows / 8;
// Calculate total rows from all ranges for min_flush_rows calculation
let total_rows: usize = ranges.values().map(|r| r.stats().num_rows()).sum();
let min_flush_rows = total_rows / 8;
let min_flush_rows = min_flush_rows.max(DEFAULT_ROW_GROUP_SIZE);
let mut last_iter_rows = 0;
let num_ranges = ranges.len();
let mut input_iters = Vec::with_capacity(num_ranges);
let mut current_ranges = Vec::new();
for (_range_id, range) in ranges {
if let Some(encoded) = range.encoded() {
flat_sources.encoded.push(encoded);
let max_sequence = range.stats().max_sequence();
flat_sources.encoded.push((encoded, max_sequence));
continue;
}
let iter = range.build_record_batch_iter(None)?;
input_iters.push(iter);
last_iter_rows += range.num_rows();
current_ranges.push(range);
if last_iter_rows > min_flush_rows {
// Calculate max_sequence from all merged ranges
let max_sequence = current_ranges
.iter()
.map(|r| r.stats().max_sequence())
.max()
.unwrap_or(0);
let maybe_dedup = merge_and_dedup(
&schema,
options.append_mode,
@@ -805,13 +816,22 @@ fn memtable_flat_sources(
std::mem::replace(&mut input_iters, Vec::with_capacity(num_ranges)),
)?;
flat_sources.sources.push(FlatSource::Iter(maybe_dedup));
flat_sources
.sources
.push((FlatSource::Iter(maybe_dedup), max_sequence));
last_iter_rows = 0;
current_ranges.clear();
}
}
// Handle remaining iters.
if !input_iters.is_empty() {
let max_sequence = current_ranges
.iter()
.map(|r| r.stats().max_sequence())
.max()
.unwrap_or(0);
let maybe_dedup = merge_and_dedup(
&schema,
options.append_mode,
@@ -820,7 +840,9 @@ fn memtable_flat_sources(
input_iters,
)?;
flat_sources.sources.push(FlatSource::Iter(maybe_dedup));
flat_sources
.sources
.push((FlatSource::Iter(maybe_dedup), max_sequence));
}
}
@@ -1491,7 +1513,7 @@ mod tests {
// Consume the iterator and count rows
let mut total_rows = 0usize;
for source in flat_sources.sources {
for (source, _sequence) in flat_sources.sources {
match source {
crate::read::FlatSource::Iter(iter) => {
for rb in iter {
@@ -1521,7 +1543,7 @@ mod tests {
assert_eq!(1, flat_sources.sources.len());
let mut total_rows = 0usize;
for source in flat_sources.sources {
for (source, _sequence) in flat_sources.sources {
match source {
crate::read::FlatSource::Iter(iter) => {
for rb in iter {

View File

@@ -204,8 +204,27 @@ pub type BoxedRecordBatchIterator = Box<dyn Iterator<Item = Result<RecordBatch>>
pub struct MemtableRanges {
/// Range IDs and ranges.
pub ranges: BTreeMap<usize, MemtableRange>,
/// Statistics of the memtable at the query time.
pub stats: MemtableStats,
}
impl MemtableRanges {
    /// Total row count, obtained by adding up `num_rows()` from the stats of every range.
    pub fn num_rows(&self) -> usize {
        self.ranges
            .values()
            .fold(0, |rows, range| rows + range.stats().num_rows())
    }
    /// Total series count, obtained by adding up `series_count()` from the stats of every range.
    ///
    /// Each range contributes its own count, so the result is a plain sum over ranges.
    pub fn series_count(&self) -> usize {
        self.ranges
            .values()
            .fold(0, |series, range| series + range.stats().series_count())
    }
    /// Largest `max_sequence()` reported by any range's stats, or `0` when there are no ranges.
    pub fn max_sequence(&self) -> SequenceNumber {
        let sequences = self
            .ranges
            .values()
            .map(|range| range.stats().max_sequence());
        sequences.max().unwrap_or_default()
    }
}
impl IterBuilder for MemtableRanges {
@@ -569,15 +588,19 @@ impl MemtableRangeContext {
pub struct MemtableRange {
/// Shared context.
context: MemtableRangeContextRef,
/// Number of rows in current memtable range.
// todo(hl): use [MemtableRangeStats] instead.
num_rows: usize,
/// Statistics for this memtable range.
stats: MemtableStats,
}
impl MemtableRange {
/// Creates a new range from context.
pub fn new(context: MemtableRangeContextRef, num_rows: usize) -> Self {
Self { context, num_rows }
/// Creates a new range from context and stats.
pub fn new(context: MemtableRangeContextRef, stats: MemtableStats) -> Self {
Self { context, stats }
}
/// Returns the statistics for this range.
pub fn stats(&self) -> &MemtableStats {
&self.stats
}
/// Returns the id of the memtable to read.
@@ -624,7 +647,7 @@ impl MemtableRange {
}
pub fn num_rows(&self) -> usize {
self.num_rows
self.stats.num_rows
}
/// Returns the encoded range if available.

View File

@@ -382,7 +382,7 @@ impl Memtable for BulkMemtable {
if !bulk_parts.unordered_part.is_empty()
&& let Some(unordered_bulk_part) = bulk_parts.unordered_part.to_bulk_part()?
{
let num_rows = unordered_bulk_part.num_rows();
let part_stats = unordered_bulk_part.to_memtable_stats(&self.metadata);
let range = MemtableRange::new(
Arc::new(MemtableRangeContext::new(
self.id,
@@ -393,7 +393,7 @@ impl Memtable for BulkMemtable {
}),
predicate.clone(),
)),
num_rows,
part_stats,
);
ranges.insert(range_id, range);
range_id += 1;
@@ -406,6 +406,7 @@ impl Memtable for BulkMemtable {
continue;
}
let part_stats = part_wrapper.part.to_memtable_stats(&self.metadata);
let range = MemtableRange::new(
Arc::new(MemtableRangeContext::new(
self.id,
@@ -416,7 +417,7 @@ impl Memtable for BulkMemtable {
}),
predicate.clone(),
)),
part_wrapper.part.num_rows(),
part_stats,
);
ranges.insert(range_id, range);
range_id += 1;
@@ -429,6 +430,7 @@ impl Memtable for BulkMemtable {
continue;
}
let part_stats = encoded_part_wrapper.part.to_memtable_stats();
let range = MemtableRange::new(
Arc::new(MemtableRangeContext::new(
self.id,
@@ -440,18 +442,14 @@ impl Memtable for BulkMemtable {
}),
predicate.clone(),
)),
encoded_part_wrapper.part.metadata().num_rows,
part_stats,
);
ranges.insert(range_id, range);
range_id += 1;
}
}
let mut stats = self.stats();
stats.num_ranges = ranges.len();
// TODO(yingwen): Supports per range stats.
Ok(MemtableRanges { ranges, stats })
Ok(MemtableRanges { ranges })
}
fn is_empty(&self) -> bool {
@@ -811,6 +809,14 @@ impl PartToMerge {
}
}
/// Gets the maximum sequence number of this part.
fn max_sequence(&self) -> u64 {
match self {
PartToMerge::Bulk { part, .. } => part.sequence,
PartToMerge::Encoded { part, .. } => part.metadata().max_sequence,
}
}
/// Creates a record batch iterator for this part.
fn create_iterator(
self,
@@ -984,7 +990,7 @@ impl MemtableCompactor {
return Ok(None);
}
// Calculates timestamp bounds for merged data
// Calculates timestamp bounds and max sequence for merged data
let min_timestamp = parts_to_merge
.iter()
.map(|p| p.min_timestamp())
@@ -995,6 +1001,11 @@ impl MemtableCompactor {
.map(|p| p.max_timestamp())
.max()
.unwrap_or(i64::MIN);
let max_sequence = parts_to_merge
.iter()
.map(|p| p.max_sequence())
.max()
.unwrap_or(0);
let context = Arc::new(BulkIterContext::new(
metadata.clone(),
@@ -1051,6 +1062,7 @@ impl MemtableCompactor {
arrow_schema.clone(),
min_timestamp,
max_timestamp,
max_sequence,
&mut metrics,
)?;
@@ -1278,7 +1290,8 @@ mod tests {
.unwrap();
assert_eq!(3, ranges.ranges.len());
assert_eq!(5, ranges.stats.num_rows);
let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
assert_eq!(5, total_rows);
for (_range_id, range) in ranges.ranges.iter() {
assert!(range.num_rows() > 0);
@@ -1446,8 +1459,9 @@ mod tests {
.unwrap();
assert_eq!(3, ranges.ranges.len());
assert_eq!(5, ranges.stats.num_rows);
assert_eq!(3, ranges.stats.num_ranges);
let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
assert_eq!(5, total_rows);
assert_eq!(3, ranges.ranges.len());
for (range_id, range) in ranges.ranges.iter() {
assert!(*range_id < 3);
@@ -1524,7 +1538,8 @@ mod tests {
// Should have ranges for both bulk parts and encoded parts
assert_eq!(3, ranges.ranges.len());
assert_eq!(10, ranges.stats.num_rows);
let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
assert_eq!(10, total_rows);
for (_range_id, range) in ranges.ranges.iter() {
assert!(range.num_rows() > 0);
@@ -1606,7 +1621,8 @@ mod tests {
// Should have at least 1 range (the compacted part)
assert!(!ranges.ranges.is_empty());
assert_eq!(10, ranges.stats.num_rows);
let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
assert_eq!(10, total_rows);
// Read all data and verify
let mut total_rows_read = 0;
@@ -1693,7 +1709,8 @@ mod tests {
)
.unwrap();
assert_eq!(13, ranges.stats.num_rows);
let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
assert_eq!(13, total_rows);
let mut total_rows_read = 0;
for (_range_id, range) in ranges.ranges.iter() {
@@ -1750,7 +1767,8 @@ mod tests {
// Should have 1 range for the unordered_part
assert_eq!(1, ranges.ranges.len());
assert_eq!(3, ranges.stats.num_rows);
let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
assert_eq!(3, total_rows);
// Verify data is sorted correctly in the range
let range = ranges.ranges.get(&0).unwrap();

View File

@@ -66,7 +66,7 @@ use crate::error::{
use crate::memtable::bulk::context::BulkIterContextRef;
use crate::memtable::bulk::part_reader::EncodedBulkPartIter;
use crate::memtable::time_series::{ValueBuilder, Values};
use crate::memtable::{BoxedRecordBatchIterator, MemScanMetrics};
use crate::memtable::{BoxedRecordBatchIterator, MemScanMetrics, MemtableStats};
use crate::sst::index::IndexOutput;
use crate::sst::parquet::file_range::{PreFilterMode, row_group_contains_delete};
use crate::sst::parquet::flat_format::primary_key_column_index;
@@ -170,6 +170,22 @@ impl BulkPart {
}
}
/// Builds the [`MemtableStats`] describing this `BulkPart`.
///
/// The part's min/max timestamps are converted with the time index type of `region_metadata`.
/// `num_ranges` is always 1 and `max_sequence` is the part's `sequence`.
pub fn to_memtable_stats(&self, region_metadata: &RegionMetadataRef) -> MemtableStats {
    let ts_type = region_metadata.time_index_type();
    // (min, max) bounds expressed in the region's timestamp type.
    let bounds = (
        ts_type.create_timestamp(self.min_timestamp),
        ts_type.create_timestamp(self.max_timestamp),
    );
    MemtableStats {
        estimated_bytes: self.estimated_size(),
        time_range: Some(bounds),
        num_rows: self.num_rows(),
        num_ranges: 1,
        max_sequence: self.sequence,
        series_count: self.estimated_series_count(),
    }
}
/// Fills missing columns in the BulkPart batch with default values.
///
/// This function checks if the batch schema matches the region metadata schema,
@@ -965,6 +981,23 @@ impl EncodedBulkPart {
&self.data
}
/// Creates [`MemtableStats`] describing this `EncodedBulkPart`.
///
/// Every value is read from the part's metadata except `estimated_bytes`, which comes from
/// `size_bytes()`. The min/max timestamps are converted with the time index type of the
/// region metadata stored in the part, and `num_ranges` is always 1.
pub fn to_memtable_stats(&self) -> MemtableStats {
    let meta = &self.metadata;
    let ts_type = meta.region_metadata.time_index_type();
    let min_ts = ts_type.create_timestamp(meta.min_timestamp);
    let max_ts = ts_type.create_timestamp(meta.max_timestamp);
    // `num_series` is stored as `u64`. Saturate instead of silently truncating with `as` on
    // targets where `usize` is narrower than 64 bits.
    let series_count = usize::try_from(meta.num_series).unwrap_or(usize::MAX);
    MemtableStats {
        estimated_bytes: self.size_bytes(),
        time_range: Some((min_ts, max_ts)),
        num_rows: meta.num_rows,
        num_ranges: 1,
        max_sequence: meta.max_sequence,
        series_count,
    }
}
/// Converts this `EncodedBulkPart` to `SstInfo`.
///
/// # Arguments
@@ -1061,6 +1094,8 @@ pub struct BulkPartMeta {
pub region_metadata: RegionMetadataRef,
/// Number of series.
pub num_series: u64,
/// Maximum sequence number in part.
pub max_sequence: u64,
}
/// Metrics for encoding a part.
@@ -1122,6 +1157,7 @@ impl BulkPartEncoder {
arrow_schema: SchemaRef,
min_timestamp: i64,
max_timestamp: i64,
max_sequence: u64,
metrics: &mut BulkPartEncodeMetrics,
) -> Result<Option<EncodedBulkPart>> {
let mut buf = Vec::with_capacity(4096);
@@ -1173,6 +1209,7 @@ impl BulkPartEncoder {
parquet_metadata,
region_metadata: self.metadata.clone(),
num_series,
max_sequence,
},
}))
}
@@ -1206,6 +1243,7 @@ impl BulkPartEncoder {
parquet_metadata,
region_metadata: self.metadata.clone(),
num_series: part.estimated_series_count() as u64,
max_sequence: part.sequence,
},
}))
}

View File

@@ -203,10 +203,10 @@ impl Memtable for PartitionTreeMemtable {
});
let context = Arc::new(MemtableRangeContext::new(self.id, builder, predicate));
let stats = self.stats();
let range_stats = self.stats();
let range = MemtableRange::new(context, range_stats);
Ok(MemtableRanges {
ranges: [(0, MemtableRange::new(context, stats.num_rows))].into(),
stats,
ranges: [(0, range)].into(),
})
}

View File

@@ -243,6 +243,23 @@ impl Memtable for SimpleBulkMemtable {
let sequence = options.sequence;
let start_time = Instant::now();
let projection = Arc::new(self.build_projection(projection));
// Use the memtable's overall time range and max sequence for all ranges
let max_sequence = self.max_sequence.load(Ordering::Relaxed);
let time_range = {
let num_rows = self.num_rows.load(Ordering::Relaxed);
if num_rows > 0 {
let ts_type = self.region_metadata.time_index_type();
let max_timestamp =
ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
let min_timestamp =
ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
Some((min_timestamp, max_timestamp))
} else {
None
}
};
let values = self.series.read().unwrap().read_to_values();
let contexts = values
.into_par_iter()
@@ -267,13 +284,24 @@ impl Memtable for SimpleBulkMemtable {
.map(|result| {
result.map(|batch| {
let num_rows = batch.num_rows();
let estimated_bytes = batch.memory_size();
let range_stats = MemtableStats {
estimated_bytes,
time_range,
num_rows,
num_ranges: 1,
max_sequence,
series_count: 1,
};
let builder = BatchRangeBuilder {
batch,
merge_mode: self.merge_mode,
scan_cost: start_time.elapsed(),
};
(
num_rows,
range_stats,
Arc::new(MemtableRangeContext::new(
self.id,
Box::new(builder),
@@ -287,13 +315,10 @@ impl Memtable for SimpleBulkMemtable {
let ranges = contexts
.into_iter()
.enumerate()
.map(|(idx, (num_rows, context))| (idx, MemtableRange::new(context, num_rows)))
.map(|(idx, (range_stats, context))| (idx, MemtableRange::new(context, range_stats)))
.collect();
Ok(MemtableRanges {
ranges,
stats: self.stats(),
})
Ok(MemtableRanges { ranges })
}
fn is_empty(&self) -> bool {
@@ -319,14 +344,7 @@ impl Memtable for SimpleBulkMemtable {
series_count: 0,
};
}
let ts_type = self
.region_metadata
.time_index_column()
.column_schema
.data_type
.clone()
.as_timestamp()
.expect("Timestamp column must have timestamp type");
let ts_type = self.region_metadata.time_index_type();
let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
MemtableStats {

View File

@@ -325,10 +325,10 @@ impl Memtable for TimeSeriesMemtable {
});
let context = Arc::new(MemtableRangeContext::new(self.id, builder, predicate));
let stats = self.stats();
let range_stats = self.stats();
let range = MemtableRange::new(context, range_stats);
Ok(MemtableRanges {
ranges: [(0, MemtableRange::new(context, stats.num_rows))].into(),
stats,
ranges: [(0, range)].into(),
})
}

View File

@@ -458,10 +458,7 @@ impl ScanRegion {
.with_pre_filter_mode(filter_mode),
)?;
mem_range_builders.extend(ranges_in_memtable.ranges.into_values().map(|v| {
// todo: we should add stats to MemtableRange
let mut stats = ranges_in_memtable.stats.clone();
stats.num_ranges = 1;
stats.num_rows = v.num_rows();
let stats = v.stats().clone();
MemRangeBuilder::new(v, stats)
}));
}