diff --git a/src/mito2/src/memtable/bulk.rs b/src/mito2/src/memtable/bulk.rs index 2060a81cdc..ec50020837 100644 --- a/src/mito2/src/memtable/bulk.rs +++ b/src/mito2/src/memtable/bulk.rs @@ -14,6 +14,7 @@ //! Memtable implementation for bulk load +use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering}; use std::sync::{Arc, RwLock}; use store_api::metadata::RegionMetadataRef; @@ -21,10 +22,12 @@ use store_api::storage::{ColumnId, SequenceNumber}; use table::predicate::Predicate; use crate::error::Result; +use crate::flush::WriteBufferManagerRef; use crate::memtable::bulk::part::BulkPart; use crate::memtable::key_values::KeyValue; use crate::memtable::{ - BoxedBatchIterator, KeyValues, Memtable, MemtableId, MemtableRanges, MemtableRef, MemtableStats, + AllocTracker, BoxedBatchIterator, KeyValues, Memtable, MemtableId, MemtableRanges, MemtableRef, + MemtableStats, }; #[allow(unused)] @@ -38,6 +41,31 @@ mod row_group_reader; pub struct BulkMemtable { id: MemtableId, parts: RwLock>, + region_metadata: RegionMetadataRef, + alloc_tracker: AllocTracker, + max_timestamp: AtomicI64, + min_timestamp: AtomicI64, + max_sequence: AtomicU64, + num_rows: AtomicUsize, +} + +impl BulkMemtable { + pub fn new( + region_metadata: RegionMetadataRef, + id: MemtableId, + write_buffer_manager: Option, + ) -> Self { + Self { + id, + parts: RwLock::new(vec![]), + region_metadata, + alloc_tracker: AllocTracker::new(write_buffer_manager), + max_timestamp: AtomicI64::new(i64::MIN), + min_timestamp: AtomicI64::new(i64::MAX), + max_sequence: Default::default, + num_rows: Default::default(), + } + } } impl Memtable for BulkMemtable { @@ -86,13 +114,43 @@ impl Memtable for BulkMemtable { } fn stats(&self) -> MemtableStats { - todo!() + let estimated_bytes = self.alloc_tracker.bytes_allocated(); + + if estimated_bytes == 0 { + // no rows ever written + return MemtableStats { + estimated_bytes, + time_range: None, + num_rows: 0, + num_ranges: 0, + max_sequence: 0, + }; + } + + let ts_type = self + .region_metadata + .time_index_column() + .column_schema + .data_type + .clone() + .as_timestamp() + .expect("Timestamp column must have timestamp type"); + let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed)); + let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed)); + MemtableStats { + estimated_bytes, + time_range: Some((min_timestamp, max_timestamp)), + num_rows: self.num_rows.load(Ordering::Relaxed), + num_ranges: 1, //todo(hl): we should consider bulk parts as different ranges. + max_sequence: self.max_sequence.load(Ordering::Relaxed), + } } - fn fork(&self, id: MemtableId, _metadata: &RegionMetadataRef) -> MemtableRef { - Arc::new(Self { + fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef { + Arc::new(Self::new( + metadata.clone(), id, - parts: RwLock::new(vec![]), - }) + self.alloc_tracker.write_buffer_manager(), + )) } }