feat: impl stats

This commit is contained in:
Lei, HUANG
2025-02-06 06:38:44 +00:00
parent adb5c3743c
commit ae59206caf

View File

@@ -14,6 +14,7 @@
//! Memtable implementation for bulk load
use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use store_api::metadata::RegionMetadataRef;
@@ -21,10 +22,12 @@ use store_api::storage::{ColumnId, SequenceNumber};
use table::predicate::Predicate;
use crate::error::Result;
use crate::flush::WriteBufferManagerRef;
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::key_values::KeyValue;
use crate::memtable::{
BoxedBatchIterator, KeyValues, Memtable, MemtableId, MemtableRanges, MemtableRef, MemtableStats,
AllocTracker, BoxedBatchIterator, KeyValues, Memtable, MemtableId, MemtableRanges, MemtableRef,
MemtableStats,
};
#[allow(unused)]
@@ -38,6 +41,31 @@ mod row_group_reader;
pub struct BulkMemtable {
id: MemtableId,
parts: RwLock<Vec<BulkPart>>,
region_metadata: RegionMetadataRef,
alloc_tracker: AllocTracker,
max_timestamp: AtomicI64,
min_timestamp: AtomicI64,
max_sequence: AtomicU64,
num_rows: AtomicUsize,
}
impl BulkMemtable {
pub fn new(
region_metadata: RegionMetadataRef,
id: MemtableId,
write_buffer_manager: Option<WriteBufferManagerRef>,
) -> Self {
Self {
id,
parts: RwLock::new(vec![]),
region_metadata,
alloc_tracker: AllocTracker::new(write_buffer_manager),
max_timestamp: AtomicI64::new(i64::MIN),
min_timestamp: AtomicI64::new(i64::MAX),
max_sequence: Default::default,
num_rows: Default::default(),
}
}
}
impl Memtable for BulkMemtable {
@@ -86,13 +114,43 @@ impl Memtable for BulkMemtable {
}
fn stats(&self) -> MemtableStats {
todo!()
let estimated_bytes = self.alloc_tracker.bytes_allocated();
if estimated_bytes == 0 {
// no rows ever written
return MemtableStats {
estimated_bytes,
time_range: None,
num_rows: 0,
num_ranges: 0,
max_sequence: 0,
};
}
let ts_type = self
.region_metadata
.time_index_column()
.column_schema
.data_type
.clone()
.as_timestamp()
.expect("Timestamp column must have timestamp type");
let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
MemtableStats {
estimated_bytes,
time_range: Some((min_timestamp, max_timestamp)),
num_rows: self.num_rows.load(Ordering::Relaxed),
num_ranges: 1, //todo(hl): we should consider bulk parts as different ranges.
max_sequence: self.max_sequence.load(Ordering::Relaxed),
}
}
fn fork(&self, id: MemtableId, _metadata: &RegionMetadataRef) -> MemtableRef {
Arc::new(Self {
fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
Arc::new(Self::new(
metadata.clone(),
id,
parts: RwLock::new(vec![]),
})
self.alloc_tracker.write_buffer_manager(),
))
}
}