diff --git a/src/mito2/src/memtable/partition_tree.rs b/src/mito2/src/memtable/partition_tree.rs index 8dd0af2881..f359d24cc9 100644 --- a/src/mito2/src/memtable/partition_tree.rs +++ b/src/mito2/src/memtable/partition_tree.rs @@ -302,7 +302,10 @@ impl PartitionTreeMemtable { fn update_stats(&self, metrics: &WriteMetrics) { // Only let the tracker tracks value bytes. self.alloc_tracker.on_allocation(metrics.value_bytes); - metrics.update_timestamp_range(&self.max_timestamp, &self.min_timestamp); + self.max_timestamp + .fetch_max(metrics.max_ts, Ordering::SeqCst); + self.min_timestamp + .fetch_min(metrics.min_ts, Ordering::SeqCst); } } diff --git a/src/mito2/src/memtable/stats.rs b/src/mito2/src/memtable/stats.rs index cec31d6385..d203da3194 100644 --- a/src/mito2/src/memtable/stats.rs +++ b/src/mito2/src/memtable/stats.rs @@ -14,8 +14,6 @@ //! Internal metrics of the memtable. -use std::sync::atomic::{AtomicI64, Ordering}; - /// Metrics of writing memtables. pub(crate) struct WriteMetrics { /// Size allocated by keys. @@ -28,51 +26,6 @@ pub(crate) struct WriteMetrics { pub(crate) max_ts: i64, } -impl WriteMetrics { - /// Update the min/max timestamp range according to current write metric. - pub(crate) fn update_timestamp_range(&self, prev_max_ts: &AtomicI64, prev_min_ts: &AtomicI64) { - loop { - let current_min = prev_min_ts.load(Ordering::Relaxed); - if self.min_ts >= current_min { - break; - } - - let Err(updated) = prev_min_ts.compare_exchange( - current_min, - self.min_ts, - Ordering::Relaxed, - Ordering::Relaxed, - ) else { - break; - }; - - if updated == self.min_ts { - break; - } - } - - loop { - let current_max = prev_max_ts.load(Ordering::Relaxed); - if self.max_ts <= current_max { - break; - } - - let Err(updated) = prev_max_ts.compare_exchange( - current_max, - self.max_ts, - Ordering::Relaxed, - Ordering::Relaxed, - ) else { - break; - }; - - if updated == self.max_ts { - break; - } - } - } -} - impl Default for WriteMetrics { fn default() -> Self { Self { diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs index 44bce1ec74..751e0cdc5d 100644 --- a/src/mito2/src/memtable/time_series.rs +++ b/src/mito2/src/memtable/time_series.rs @@ -147,7 +147,8 @@ impl TimeSeriesMemtable { fn update_stats(&self, stats: WriteMetrics) { self.alloc_tracker .on_allocation(stats.key_bytes + stats.value_bytes); - stats.update_timestamp_range(&self.max_timestamp, &self.min_timestamp); + self.max_timestamp.fetch_max(stats.max_ts, Ordering::SeqCst); + self.min_timestamp.fetch_min(stats.min_ts, Ordering::SeqCst); } fn write_key_value(&self, kv: KeyValue, stats: &mut WriteMetrics) -> Result<()> { diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 8b9efb9ae8..311b8ba4bf 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -322,13 +322,10 @@ impl ScanRegion { let memtables: Vec<_> = memtables .into_iter() .filter(|mem| { - if mem.is_empty() { + // check if memtable is empty by reading stats. + let Some((start, end)) = mem.stats().time_range() else { return false; - } - let stats = mem.stats(); - // Safety: the memtable is not empty. - let (start, end) = stats.time_range().unwrap(); - + }; // The time range of the memtable is inclusive. let memtable_range = TimestampRange::new_inclusive(Some(start), Some(end)); memtable_range.intersects(&time_range)