diff --git a/src/common/time/src/timestamp.rs b/src/common/time/src/timestamp.rs index 8990b57eaf..898da08790 100644 --- a/src/common/time/src/timestamp.rs +++ b/src/common/time/src/timestamp.rs @@ -48,7 +48,7 @@ impl Timestamp { /// The result time unit remains unchanged even if `duration` has a different unit with `self`. /// For example, a timestamp with value 1 and time unit second, subtracted by 1 millisecond /// and the result is still 1 second. - pub fn sub(&self, duration: Duration) -> error::Result { + pub fn sub_duration(&self, duration: Duration) -> error::Result { let duration: i64 = match self.unit { TimeUnit::Second => { i64::try_from(duration.as_secs()).context(TimestampOverflowSnafu)? @@ -79,6 +79,13 @@ impl Timestamp { }) } + /// Subtracts another timestamp (`rhs`) from this timestamp, yielding a duration. + pub fn sub(&self, rhs: Self) -> Option { + let lhs = self.to_chrono_datetime()?; + let rhs = rhs.to_chrono_datetime()?; + Some(lhs - rhs) + } + pub fn new(value: i64, unit: TimeUnit) -> Self { Self { unit, value } } @@ -863,19 +870,19 @@ mod tests { #[test] fn test_timestamp_sub() { let res = Timestamp::new(1, TimeUnit::Second) - .sub(Duration::from_secs(1)) + .sub_duration(Duration::from_secs(1)) .unwrap(); assert_eq!(0, res.value); assert_eq!(TimeUnit::Second, res.unit); let res = Timestamp::new(0, TimeUnit::Second) - .sub(Duration::from_secs(1)) + .sub_duration(Duration::from_secs(1)) .unwrap(); assert_eq!(-1, res.value); assert_eq!(TimeUnit::Second, res.unit); let res = Timestamp::new(1, TimeUnit::Second) - .sub(Duration::from_millis(1)) + .sub_duration(Duration::from_millis(1)) .unwrap(); assert_eq!(1, res.value); assert_eq!(TimeUnit::Second, res.unit); @@ -914,4 +921,17 @@ mod tests { Timestamp::new(1, TimeUnit::Second).to_local_string() ); } + + #[test] + fn test_subtract_timestamp() { + assert_eq!( + Some(chrono::Duration::milliseconds(42)), + Timestamp::new_millisecond(100).sub(Timestamp::new_millisecond(58)) + ); + + assert_eq!( 
Some(chrono::Duration::milliseconds(-42)), + Timestamp::new_millisecond(58).sub(Timestamp::new_millisecond(100)) + ); + } } diff --git a/src/datatypes/src/value.rs b/src/datatypes/src/value.rs index b6d34c8c57..51044c96f0 100644 --- a/src/datatypes/src/value.rs +++ b/src/datatypes/src/value.rs @@ -250,6 +250,15 @@ impl Value { Ok(scalar_value) } + + /// Casts Value to [Timestamp]. Returns None if it's not a valid timestamp value. + pub fn as_timestamp(&self) -> Option { + match self { + Value::Int64(v) => Some(Timestamp::new_millisecond(*v)), + Value::Timestamp(t) => Some(*t), + _ => None, + } + } } fn to_null_value(output_type: &ConcreteDataType) -> ScalarValue { diff --git a/src/storage/benches/memtable/mod.rs b/src/storage/benches/memtable/mod.rs index a53f26cd4a..a9a956649d 100644 --- a/src/storage/benches/memtable/mod.rs +++ b/src/storage/benches/memtable/mod.rs @@ -78,10 +78,8 @@ fn kvs_with_index( key_builders.0.push(Some(TimestampMillisecond::from(key.0))); key_builders.1.push(Some(key.1)); } - let row_keys = vec![ - Arc::new(key_builders.0.finish()) as _, - Arc::new(key_builders.1.finish()) as _, - ]; + let row_keys = vec![Arc::new(key_builders.1.finish()) as _]; + let mut value_builders = ( UInt64VectorBuilder::with_capacity(values.len()), StringVectorBuilder::with_capacity(values.len()), @@ -100,6 +98,7 @@ fn kvs_with_index( start_index_in_batch, keys: row_keys, values: row_values, + timestamp: Some(Arc::new(key_builders.0.finish()) as _), } } diff --git a/src/storage/src/compaction/picker.rs b/src/storage/src/compaction/picker.rs index 96f25eb3a8..3ed8841aef 100644 --- a/src/storage/src/compaction/picker.rs +++ b/src/storage/src/compaction/picker.rs @@ -82,7 +82,7 @@ impl SimplePicker { let Some(ttl) = ttl else { return Ok(vec![]); }; let expire_time = Timestamp::current_millis() - .sub(ttl) + .sub_duration(ttl) .context(TtlCalculationSnafu)?; let mut expired_ssts = vec![]; diff --git a/src/storage/src/compaction/writer.rs 
b/src/storage/src/compaction/writer.rs index 6e86a1e375..650ab28ccb 100644 --- a/src/storage/src/compaction/writer.rs +++ b/src/storage/src/compaction/writer.rs @@ -140,8 +140,7 @@ mod tests { for key in ts { key_builders.push(Some(*key)); } - let row_keys = vec![Arc::new(key_builders.finish()) as _]; - + let ts_col = Arc::new(key_builders.finish()) as _; let mut value_builders = UInt64VectorBuilder::with_capacity(values.len()); for value in values { @@ -153,8 +152,9 @@ mod tests { sequence, op_type, start_index_in_batch, - keys: row_keys, + keys: vec![], values: row_values, + timestamp: Some(ts_col), }; assert_eq!(ts.len(), kvs.len()); diff --git a/src/storage/src/memtable.rs b/src/storage/src/memtable.rs index b6debb1f21..4ff3d638ef 100644 --- a/src/storage/src/memtable.rs +++ b/src/storage/src/memtable.rs @@ -18,7 +18,7 @@ mod inserter; pub mod tests; mod version; -use std::sync::atomic::{AtomicU32, Ordering}; +use std::sync::atomic::{AtomicI64, AtomicU32, AtomicUsize, Ordering}; use std::sync::Arc; use datatypes::vectors::VectorRef; @@ -34,6 +34,35 @@ use crate::schema::{ProjectedSchemaRef, RegionSchemaRef}; /// Unique id for memtables under same region. pub type MemtableId = u32; +#[derive(Debug)] +pub struct MemtableStats { + /// The estimated bytes allocated by this memtable from heap. This value + /// may be larger than an estimate based on [`Memtable::num_rows`] because + /// of the implementor's pre-alloc behavior. + estimated_bytes: AtomicUsize, + /// The max timestamp that this memtable contains. + max_timestamp: AtomicI64, + /// The min timestamp that this memtable contains. 
+ min_timestamp: AtomicI64, +} + +impl MemtableStats { + pub fn bytes_allocated(&self) -> usize { + self.estimated_bytes + .load(std::sync::atomic::Ordering::Relaxed) + } +} + +impl Default for MemtableStats { + fn default() -> Self { + Self { + estimated_bytes: AtomicUsize::default(), + max_timestamp: AtomicI64::new(i64::MIN), + min_timestamp: AtomicI64::new(i64::MAX), + } + } +} + /// In memory storage. pub trait Memtable: Send + Sync + std::fmt::Debug { /// Returns id of this memtable. @@ -54,10 +83,10 @@ pub trait Memtable: Send + Sync + std::fmt::Debug { /// Returns the estimated bytes allocated by this memtable from heap. Result /// of this method may be larger than the estimated based on [`num_rows`] because /// of the implementor's pre-alloc behavior. - fn bytes_allocated(&self) -> usize; - - /// Return the number of rows contained in this memtable. fn num_rows(&self) -> usize; + + /// Returns stats of this memtable. + fn stats(&self) -> &MemtableStats; } pub type MemtableRef = Arc; @@ -125,7 +154,6 @@ pub trait MemtableBuilder: Send + Sync + std::fmt::Debug { pub type MemtableBuilderRef = Arc; -// TODO(yingwen): Maybe use individual vector for timestamp and version. /// Key-value pairs in columnar format. 
pub struct KeyValues { pub sequence: SequenceNumber, @@ -135,6 +163,7 @@ pub struct KeyValues { pub start_index_in_batch: usize, pub keys: Vec, pub values: Vec, + pub timestamp: Option, } impl KeyValues { @@ -144,10 +173,11 @@ impl KeyValues { self.start_index_in_batch = index_in_batch; self.keys.clear(); self.values.clear(); + self.timestamp = None; } pub fn len(&self) -> usize { - self.keys.first().map(|v| v.len()).unwrap_or_default() + self.timestamp.as_ref().map(|v| v.len()).unwrap_or_default() } pub fn is_empty(&self) -> bool { @@ -157,6 +187,11 @@ impl KeyValues { pub fn estimated_memory_size(&self) -> usize { self.keys.iter().fold(0, |acc, v| acc + v.memory_size()) + self.values.iter().fold(0, |acc, v| acc + v.memory_size()) + + self + .timestamp + .as_ref() + .map(|t| t.memory_size()) + .unwrap_or_default() } } diff --git a/src/storage/src/memtable/btree.rs b/src/storage/src/memtable/btree.rs index 2b73896c7f..98f4d1c3d1 100644 --- a/src/storage/src/memtable/btree.rs +++ b/src/storage/src/memtable/btree.rs @@ -16,7 +16,7 @@ use std::cmp::Ordering; use std::collections::{btree_map, BTreeMap}; use std::fmt; use std::ops::Bound; -use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering}; +use std::sync::atomic::Ordering as AtomicOrdering; use std::sync::{Arc, RwLock}; use datatypes::data_type::DataType; @@ -27,7 +27,8 @@ use store_api::storage::{OpType, SequenceNumber}; use crate::error::Result; use crate::memtable::{ - BatchIterator, BoxedBatchIterator, IterContext, KeyValues, Memtable, MemtableId, RowOrdering, + BatchIterator, BoxedBatchIterator, IterContext, KeyValues, Memtable, MemtableId, MemtableStats, + RowOrdering, }; use crate::read::Batch; use crate::schema::compat::ReadAdapter; @@ -42,7 +43,7 @@ pub struct BTreeMemtable { id: MemtableId, schema: RegionSchemaRef, map: Arc, - estimated_bytes: AtomicUsize, + stats: MemtableStats, } impl BTreeMemtable { @@ -51,7 +52,37 @@ impl BTreeMemtable { id, schema, map: 
Arc::new(RwLock::new(BTreeMap::new())), - estimated_bytes: AtomicUsize::new(0), + stats: Default::default(), + } + } + + /// Updates memtable stats. + /// This function is guarded by `BTreeMemtable::map` so that store-after-load is safe. + fn update_stats(&self, min: Option, max: Option) { + if let Some(min) = min { + let min_val = min + .as_timestamp() + .expect("Min timestamp must be a valid timestamp value") + .value(); + let cur_min = self.stats.min_timestamp.load(AtomicOrdering::Relaxed); + if min_val < cur_min { + self.stats + .min_timestamp + .store(min_val, AtomicOrdering::Relaxed); + } + } + + if let Some(max) = max { + let cur_max = self.stats.max_timestamp.load(AtomicOrdering::Relaxed); + let max_val = max + .as_timestamp() + .expect("Max timestamp must be a valid timestamp value") + .value(); + if max_val > cur_max { + self.stats + .max_timestamp + .store(max_val, AtomicOrdering::Relaxed); + } } } } @@ -65,7 +96,7 @@ impl fmt::Debug for BTreeMemtable { // Only show StoreSchema .field("schema", &self.schema) .field("rows", &len) - .field("estimated_bytes", &self.estimated_bytes) + .field("stats", &self.stats) .finish() } } @@ -80,15 +111,30 @@ impl Memtable for BTreeMemtable { } fn write(&self, kvs: &KeyValues) -> Result<()> { - self.estimated_bytes + debug_assert!(kvs.timestamp.is_some()); + self.stats + .estimated_bytes .fetch_add(kvs.estimated_memory_size(), AtomicOrdering::Relaxed); - let mut map = self.map.write().unwrap(); let iter_row = IterRow::new(kvs); + let mut map = self.map.write().unwrap(); + + let mut min_ts = None; + let mut max_ts = None; for (inner_key, row_value) in iter_row { + let ts = inner_key.timestamp(); + let min_ts = min_ts.get_or_insert_with(|| ts.clone()); + let max_ts = max_ts.get_or_insert_with(|| ts.clone()); + if ts < min_ts { + *min_ts = ts.clone(); + } + if ts > max_ts { + *max_ts = ts.clone(); + } map.insert(inner_key, row_value); } + self.update_stats(min_ts, max_ts); Ok(()) } @@ -100,13 +146,13 @@ impl Memtable for 
BTreeMemtable { Ok(Box::new(iter)) } - fn bytes_allocated(&self) -> usize { - self.estimated_bytes.load(AtomicOrdering::Relaxed) - } - fn num_rows(&self) -> usize { self.map.read().unwrap().len() } + + fn stats(&self) -> &MemtableStats { + &self.stats + } } struct BTreeIterator { @@ -309,12 +355,15 @@ impl<'a> IterRow<'a> { } fn fetch_row(&mut self) -> (InnerKey, RowValue) { - let row_key = self + let mut row_key: Vec<_> = self .kvs .keys .iter() .map(|vector| vector.get(self.index)) .collect(); + + // unwrap safety: KeyValues always contains a timestamp as guaranteed in [Inserter::write_one_mutation] + row_key.push(self.kvs.timestamp.as_ref().unwrap().get(self.index)); let inner_key = InnerKey { row_key, sequence: self.kvs.sequence, @@ -382,6 +431,12 @@ impl PartialOrd for InnerKey { } impl InnerKey { + #[inline] + fn timestamp(&self) -> &Value { + // safety: row key shall at least contain a timestamp column + self.row_key.last().unwrap() + } + #[inline] fn is_row_key_equal(&self, other: &InnerKey) -> bool { self.row_key == other.row_key diff --git a/src/storage/src/memtable/inserter.rs b/src/storage/src/memtable/inserter.rs index 0fed7fba18..17c663cd2a 100644 --- a/src/storage/src/memtable/inserter.rs +++ b/src/storage/src/memtable/inserter.rs @@ -56,6 +56,7 @@ impl Inserter { start_index_in_batch: self.index_in_batch, keys: Vec::with_capacity(total_column_num), values: Vec::with_capacity(total_column_num), + timestamp: None, }; for mutation in &payload.mutations { @@ -76,10 +77,11 @@ impl Inserter { kvs.reset(mutation.op_type, self.index_in_batch); - for key_idx in schema.row_key_indices() { + let ts_idx = schema.timestamp_index(); + kvs.timestamp = Some(mutation.record_batch.column(ts_idx).clone()); + for key_idx in 0..ts_idx { kvs.keys.push(mutation.record_batch.column(key_idx).clone()); } - for value_idx in schema.value_indices() { kvs.values .push(mutation.record_batch.column(value_idx).clone()); @@ -120,6 +122,7 @@ struct SliceIndex { #[cfg(test)] mod tests 
{ use std::collections::HashMap; + use std::sync::atomic::Ordering; use std::sync::Arc; use common_time::timestamp::Timestamp; @@ -171,8 +174,12 @@ mod tests { mem: &MemtableRef, sequence: SequenceNumber, data: &[(i64, Option)], + max_ts: i64, + min_ts: i64, ) { let iter = mem.iter(&IterContext::default()).unwrap(); + assert_eq!(min_ts, mem.stats().min_timestamp.load(Ordering::Relaxed)); + assert_eq!(max_ts, mem.stats().max_timestamp.load(Ordering::Relaxed)); let mut index = 0; for batch in iter { @@ -233,6 +240,8 @@ mod tests { (102, None), (201, Some(201)), ], + 201, + 1, ); } } diff --git a/src/storage/src/memtable/tests.rs b/src/storage/src/memtable/tests.rs index 9abcb7623a..f20e1fe8e6 100644 --- a/src/storage/src/memtable/tests.rs +++ b/src/storage/src/memtable/tests.rs @@ -52,7 +52,7 @@ fn kvs_for_test_with_index( for key in keys { key_builders.push(Some(*key)); } - let row_keys = vec![Arc::new(key_builders.finish()) as _]; + let ts_col = Arc::new(key_builders.finish()) as _; let mut value_builders = ( UInt64VectorBuilder::with_capacity(values.len()), @@ -71,8 +71,9 @@ fn kvs_for_test_with_index( sequence, op_type, start_index_in_batch, - keys: row_keys, + keys: vec![], values: row_values, + timestamp: Some(ts_col), }; assert_eq!(keys.len(), kvs.len()); @@ -198,7 +199,7 @@ fn write_iter_memtable_case(ctx: &TestContext) { assert!(iter.next().is_none()); // Poll the empty iterator again. assert!(iter.next().is_none()); - assert_eq!(0, ctx.memtable.bytes_allocated()); + assert_eq!(0, ctx.memtable.stats().bytes_allocated()); // Init test data. write_kvs( @@ -224,7 +225,7 @@ fn write_iter_memtable_case(ctx: &TestContext) { ); // 9 key value pairs (6 + 3). 
- assert_eq!(576, ctx.memtable.bytes_allocated()); + assert_eq!(576, ctx.memtable.stats().bytes_allocated()); let batch_sizes = [1, 4, 8, consts::READ_BATCH_SIZE]; for batch_size in batch_sizes { diff --git a/src/storage/src/memtable/version.rs b/src/storage/src/memtable/version.rs index 895566f5e1..ef66f758e6 100644 --- a/src/storage/src/memtable/version.rs +++ b/src/storage/src/memtable/version.rs @@ -64,15 +64,15 @@ impl MemtableVersion { } pub fn mutable_bytes_allocated(&self) -> usize { - self.mutable.bytes_allocated() + self.mutable.stats().bytes_allocated() } pub fn total_bytes_allocated(&self) -> usize { self.immutables .iter() - .map(|m| m.bytes_allocated()) + .map(|m| m.stats().bytes_allocated()) .sum::() - + self.mutable.bytes_allocated() + + self.mutable.stats().bytes_allocated() } /// Creates a new `MemtableVersion` that removes immutable memtables diff --git a/src/storage/src/schema/region.rs b/src/storage/src/schema/region.rs index a9f9481e8d..876b0b035d 100644 --- a/src/storage/src/schema/region.rs +++ b/src/storage/src/schema/region.rs @@ -130,6 +130,11 @@ impl RegionSchema { self.store_schema.row_key_indices() } + #[inline] + pub(crate) fn timestamp_index(&self) -> usize { + self.store_schema.timestamp_index() + } + #[inline] pub(crate) fn value_indices(&self) -> impl Iterator { self.store_schema.value_indices() diff --git a/src/storage/src/schema/store.rs b/src/storage/src/schema/store.rs index b93fdc8ec3..8b2575b5cd 100644 --- a/src/storage/src/schema/store.rs +++ b/src/storage/src/schema/store.rs @@ -155,6 +155,11 @@ impl StoreSchema { self.user_column_end + 1 } + #[inline] + pub(crate) fn timestamp_index(&self) -> usize { + self.row_key_end - 1 + } + #[inline] pub(crate) fn row_key_indices(&self) -> impl Iterator { 0..self.row_key_end