feat: memtable stats (#1591)

* feat: memtable stats

* chore: add tests for timestamp subtraction

* feat: add `Value::as_timestamp` method
This commit is contained in:
Lei, HUANG
2023-05-17 11:07:07 +08:00
committed by GitHub
parent ca75a7b744
commit e70d49b9cf
12 changed files with 177 additions and 39 deletions

View File

@@ -48,7 +48,7 @@ impl Timestamp {
/// The result time unit remains unchanged even if `duration` has a different unit with `self`.
/// For example, a timestamp with value 1 and time unit second, subtracted by 1 millisecond
/// and the result is still 1 second.
pub fn sub(&self, duration: Duration) -> error::Result<Self> {
pub fn sub_duration(&self, duration: Duration) -> error::Result<Self> {
let duration: i64 = match self.unit {
TimeUnit::Second => {
i64::try_from(duration.as_secs()).context(TimestampOverflowSnafu)?
@@ -79,6 +79,13 @@ impl Timestamp {
})
}
/// Computes `self - rhs` as a signed [`chrono::Duration`].
///
/// Both operands are converted with `to_chrono_datetime`; `None` is returned as soon
/// as one of the conversions yields `None` (`self` is converted first).
pub fn sub(&self, rhs: Self) -> Option<chrono::Duration> {
    self.to_chrono_datetime()
        .and_then(|minuend| rhs.to_chrono_datetime().map(|subtrahend| minuend - subtrahend))
}
/// Creates a `Timestamp` holding the given raw `value` together with its `unit`.
pub fn new(value: i64, unit: TimeUnit) -> Self {
Self { unit, value }
}
@@ -863,19 +870,19 @@ mod tests {
#[test]
fn test_timestamp_sub() {
let res = Timestamp::new(1, TimeUnit::Second)
.sub(Duration::from_secs(1))
.sub_duration(Duration::from_secs(1))
.unwrap();
assert_eq!(0, res.value);
assert_eq!(TimeUnit::Second, res.unit);
let res = Timestamp::new(0, TimeUnit::Second)
.sub(Duration::from_secs(1))
.sub_duration(Duration::from_secs(1))
.unwrap();
assert_eq!(-1, res.value);
assert_eq!(TimeUnit::Second, res.unit);
let res = Timestamp::new(1, TimeUnit::Second)
.sub(Duration::from_millis(1))
.sub_duration(Duration::from_millis(1))
.unwrap();
assert_eq!(1, res.value);
assert_eq!(TimeUnit::Second, res.unit);
@@ -914,4 +921,17 @@ mod tests {
Timestamp::new(1, TimeUnit::Second).to_local_string()
);
}
#[test]
fn test_subtract_timestamp() {
    // Subtraction is signed: swapping the operands flips the sign of the duration.
    let earlier = Timestamp::new_millisecond(58);
    let later = Timestamp::new_millisecond(100);

    let forward = Timestamp::new_millisecond(100).sub(earlier);
    assert_eq!(Some(chrono::Duration::milliseconds(42)), forward);

    let backward = Timestamp::new_millisecond(58).sub(later);
    assert_eq!(Some(chrono::Duration::milliseconds(-42)), backward);
}
}

View File

@@ -250,6 +250,15 @@ impl Value {
Ok(scalar_value)
}
/// Converts this value into a [Timestamp], if it can be viewed as one.
///
/// A `Value::Timestamp` is returned as is, a `Value::Int64` is interpreted as a
/// millisecond timestamp, and every other variant yields `None`.
pub fn as_timestamp(&self) -> Option<Timestamp> {
    let ts = match self {
        Value::Timestamp(ts) => *ts,
        Value::Int64(millis) => Timestamp::new_millisecond(*millis),
        _ => return None,
    };
    Some(ts)
}
}
fn to_null_value(output_type: &ConcreteDataType) -> ScalarValue {

View File

@@ -78,10 +78,8 @@ fn kvs_with_index(
key_builders.0.push(Some(TimestampMillisecond::from(key.0)));
key_builders.1.push(Some(key.1));
}
let row_keys = vec![
Arc::new(key_builders.0.finish()) as _,
Arc::new(key_builders.1.finish()) as _,
];
let row_keys = vec![Arc::new(key_builders.1.finish()) as _];
let mut value_builders = (
UInt64VectorBuilder::with_capacity(values.len()),
StringVectorBuilder::with_capacity(values.len()),
@@ -100,6 +98,7 @@ fn kvs_with_index(
start_index_in_batch,
keys: row_keys,
values: row_values,
timestamp: Some(Arc::new(key_builders.0.finish()) as _),
}
}

View File

@@ -82,7 +82,7 @@ impl<S> SimplePicker<S> {
let Some(ttl) = ttl else { return Ok(vec![]); };
let expire_time = Timestamp::current_millis()
.sub(ttl)
.sub_duration(ttl)
.context(TtlCalculationSnafu)?;
let mut expired_ssts = vec![];

View File

@@ -140,8 +140,7 @@ mod tests {
for key in ts {
key_builders.push(Some(*key));
}
let row_keys = vec![Arc::new(key_builders.finish()) as _];
let ts_col = Arc::new(key_builders.finish()) as _;
let mut value_builders = UInt64VectorBuilder::with_capacity(values.len());
for value in values {
@@ -153,8 +152,9 @@ mod tests {
sequence,
op_type,
start_index_in_batch,
keys: row_keys,
keys: vec![],
values: row_values,
timestamp: Some(ts_col),
};
assert_eq!(ts.len(), kvs.len());

View File

@@ -18,7 +18,7 @@ mod inserter;
pub mod tests;
mod version;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::atomic::{AtomicI64, AtomicU32, AtomicUsize, Ordering};
use std::sync::Arc;
use datatypes::vectors::VectorRef;
@@ -34,6 +34,35 @@ use crate::schema::{ProjectedSchemaRef, RegionSchemaRef};
/// Unique id for memtables under same region.
pub type MemtableId = u32;
/// Statistics tracked for a single memtable. Every field is an atomic.
#[derive(Debug)]
pub struct MemtableStats {
    /// Estimated number of bytes this memtable has allocated from the heap. The value
    /// may be larger than an estimate derived from the row count because of the
    /// implementor's pre-alloc behavior.
    estimated_bytes: AtomicUsize,
    /// Largest timestamp value contained in this memtable (`i64::MIN` by default).
    max_timestamp: AtomicI64,
    /// Smallest timestamp value contained in this memtable (`i64::MAX` by default).
    min_timestamp: AtomicI64,
}

impl MemtableStats {
    /// Returns the current value of the estimated allocated bytes (relaxed load).
    pub fn bytes_allocated(&self) -> usize {
        self.estimated_bytes.load(Ordering::Relaxed)
    }
}

impl Default for MemtableStats {
    /// Builds stats with zero allocated bytes and the timestamp bounds set to their
    /// sentinels: `max_timestamp = i64::MIN` and `min_timestamp = i64::MAX`.
    fn default() -> Self {
        Self {
            estimated_bytes: AtomicUsize::new(0),
            max_timestamp: AtomicI64::new(i64::MIN),
            min_timestamp: AtomicI64::new(i64::MAX),
        }
    }
}
/// In memory storage.
pub trait Memtable: Send + Sync + std::fmt::Debug {
/// Returns id of this memtable.
@@ -54,10 +83,10 @@ pub trait Memtable: Send + Sync + std::fmt::Debug {
/// Returns the estimated bytes allocated by this memtable from heap. Result
/// of this method may be larger than the estimated based on [`num_rows`] because
/// of the implementor's pre-alloc behavior.
fn bytes_allocated(&self) -> usize;
/// Return the number of rows contained in this memtable.
fn num_rows(&self) -> usize;
/// Returns stats of this memtable.
fn stats(&self) -> &MemtableStats;
}
pub type MemtableRef = Arc<dyn Memtable>;
@@ -125,7 +154,6 @@ pub trait MemtableBuilder: Send + Sync + std::fmt::Debug {
pub type MemtableBuilderRef = Arc<dyn MemtableBuilder>;
// TODO(yingwen): Maybe use individual vector for timestamp and version.
/// Key-value pairs in columnar format.
pub struct KeyValues {
pub sequence: SequenceNumber,
@@ -135,6 +163,7 @@ pub struct KeyValues {
pub start_index_in_batch: usize,
pub keys: Vec<VectorRef>,
pub values: Vec<VectorRef>,
pub timestamp: Option<VectorRef>,
}
impl KeyValues {
@@ -144,10 +173,11 @@ impl KeyValues {
self.start_index_in_batch = index_in_batch;
self.keys.clear();
self.values.clear();
self.timestamp = None;
}
pub fn len(&self) -> usize {
self.keys.first().map(|v| v.len()).unwrap_or_default()
self.timestamp.as_ref().map(|v| v.len()).unwrap_or_default()
}
pub fn is_empty(&self) -> bool {
@@ -157,6 +187,11 @@ impl KeyValues {
/// Estimates the memory used by this batch by summing `memory_size()` over the key
/// vectors, the value vectors and, when present, the timestamp vector.
pub fn estimated_memory_size(&self) -> usize {
    self.keys
        .iter()
        .chain(self.values.iter())
        .chain(self.timestamp.iter())
        .map(|column| column.memory_size())
        .sum()
}
}

View File

@@ -16,7 +16,7 @@ use std::cmp::Ordering;
use std::collections::{btree_map, BTreeMap};
use std::fmt;
use std::ops::Bound;
use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
use std::sync::atomic::Ordering as AtomicOrdering;
use std::sync::{Arc, RwLock};
use datatypes::data_type::DataType;
@@ -27,7 +27,8 @@ use store_api::storage::{OpType, SequenceNumber};
use crate::error::Result;
use crate::memtable::{
BatchIterator, BoxedBatchIterator, IterContext, KeyValues, Memtable, MemtableId, RowOrdering,
BatchIterator, BoxedBatchIterator, IterContext, KeyValues, Memtable, MemtableId, MemtableStats,
RowOrdering,
};
use crate::read::Batch;
use crate::schema::compat::ReadAdapter;
@@ -42,7 +43,7 @@ pub struct BTreeMemtable {
id: MemtableId,
schema: RegionSchemaRef,
map: Arc<RwLockMap>,
estimated_bytes: AtomicUsize,
stats: MemtableStats,
}
impl BTreeMemtable {
@@ -51,7 +52,37 @@ impl BTreeMemtable {
id,
schema,
map: Arc::new(RwLock::new(BTreeMap::new())),
estimated_bytes: AtomicUsize::new(0),
stats: Default::default(),
}
}
/// Updates memtable stats with the timestamp range of a written batch.
///
/// `min` / `max` are the smallest and largest timestamp values of the batch; passing
/// `None` leaves the corresponding bound untouched. Each bound is merged with a single
/// atomic `fetch_min` / `fetch_max`, so the result is the same as a
/// load-compare-store sequence but no longer depends on the caller holding the
/// `BTreeMemtable::map` write lock.
///
/// # Panics
///
/// Panics if a provided value cannot be converted by `Value::as_timestamp`.
fn update_stats(&self, min: Option<Value>, max: Option<Value>) {
    if let Some(min) = min {
        let min_val = min
            .as_timestamp()
            .expect("Min timestamp must be a valid timestamp value")
            .value();
        // Keeps the smaller of the stored and the new value.
        self.stats
            .min_timestamp
            .fetch_min(min_val, AtomicOrdering::Relaxed);
    }

    if let Some(max) = max {
        let max_val = max
            .as_timestamp()
            .expect("Max timestamp must be a valid timestamp value")
            .value();
        // Keeps the larger of the stored and the new value.
        self.stats
            .max_timestamp
            .fetch_max(max_val, AtomicOrdering::Relaxed);
    }
}
}
@@ -65,7 +96,7 @@ impl fmt::Debug for BTreeMemtable {
// Only show StoreSchema
.field("schema", &self.schema)
.field("rows", &len)
.field("estimated_bytes", &self.estimated_bytes)
.field("stats", &self.stats)
.finish()
}
}
@@ -80,15 +111,30 @@ impl Memtable for BTreeMemtable {
}
fn write(&self, kvs: &KeyValues) -> Result<()> {
self.estimated_bytes
debug_assert!(kvs.timestamp.is_some());
self.stats
.estimated_bytes
.fetch_add(kvs.estimated_memory_size(), AtomicOrdering::Relaxed);
let mut map = self.map.write().unwrap();
let iter_row = IterRow::new(kvs);
let mut map = self.map.write().unwrap();
let mut min_ts = None;
let mut max_ts = None;
for (inner_key, row_value) in iter_row {
let ts = inner_key.timestamp();
let min_ts = min_ts.get_or_insert_with(|| ts.clone());
let max_ts = max_ts.get_or_insert_with(|| ts.clone());
if ts < min_ts {
*min_ts = ts.clone();
}
if ts > max_ts {
*max_ts = ts.clone();
}
map.insert(inner_key, row_value);
}
self.update_stats(min_ts, max_ts);
Ok(())
}
@@ -100,13 +146,13 @@ impl Memtable for BTreeMemtable {
Ok(Box::new(iter))
}
fn bytes_allocated(&self) -> usize {
self.estimated_bytes.load(AtomicOrdering::Relaxed)
}
/// Returns the number of entries currently stored in the underlying map.
///
/// Takes the map's read lock for the duration of the call; the `unwrap` panics if
/// the lock is poisoned.
fn num_rows(&self) -> usize {
self.map.read().unwrap().len()
}
/// Returns a reference to the stats owned by this memtable.
fn stats(&self) -> &MemtableStats {
&self.stats
}
}
struct BTreeIterator {
@@ -309,12 +355,15 @@ impl<'a> IterRow<'a> {
}
fn fetch_row(&mut self) -> (InnerKey, RowValue) {
let row_key = self
let mut row_key: Vec<_> = self
.kvs
.keys
.iter()
.map(|vector| vector.get(self.index))
.collect();
// unwrap safety: KeyValues always contains a timestamp as guaranteed in [Inserter::write_one_mutation]
row_key.push(self.kvs.timestamp.as_ref().unwrap().get(self.index));
let inner_key = InnerKey {
row_key,
sequence: self.kvs.sequence,
@@ -382,6 +431,12 @@ impl PartialOrd for InnerKey {
}
impl InnerKey {
/// Returns the timestamp value of this key, i.e. the last element of `row_key`.
///
/// # Panics
///
/// Panics if `row_key` is empty; a row key is expected to always contain at least
/// the timestamp column.
#[inline]
fn timestamp(&self) -> &Value {
    self.row_key
        .last()
        .expect("row key shall at least contain a timestamp column")
}
#[inline]
fn is_row_key_equal(&self, other: &InnerKey) -> bool {
self.row_key == other.row_key

View File

@@ -56,6 +56,7 @@ impl Inserter {
start_index_in_batch: self.index_in_batch,
keys: Vec::with_capacity(total_column_num),
values: Vec::with_capacity(total_column_num),
timestamp: None,
};
for mutation in &payload.mutations {
@@ -76,10 +77,11 @@ impl Inserter {
kvs.reset(mutation.op_type, self.index_in_batch);
for key_idx in schema.row_key_indices() {
let ts_idx = schema.timestamp_index();
kvs.timestamp = Some(mutation.record_batch.column(ts_idx).clone());
for key_idx in 0..ts_idx {
kvs.keys.push(mutation.record_batch.column(key_idx).clone());
}
for value_idx in schema.value_indices() {
kvs.values
.push(mutation.record_batch.column(value_idx).clone());
@@ -120,6 +122,7 @@ struct SliceIndex {
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use common_time::timestamp::Timestamp;
@@ -171,8 +174,12 @@ mod tests {
mem: &MemtableRef,
sequence: SequenceNumber,
data: &[(i64, Option<i64>)],
max_ts: i64,
min_ts: i64,
) {
let iter = mem.iter(&IterContext::default()).unwrap();
assert_eq!(min_ts, mem.stats().min_timestamp.load(Ordering::Relaxed));
assert_eq!(max_ts, mem.stats().max_timestamp.load(Ordering::Relaxed));
let mut index = 0;
for batch in iter {
@@ -233,6 +240,8 @@ mod tests {
(102, None),
(201, Some(201)),
],
201,
1,
);
}
}

View File

@@ -52,7 +52,7 @@ fn kvs_for_test_with_index(
for key in keys {
key_builders.push(Some(*key));
}
let row_keys = vec![Arc::new(key_builders.finish()) as _];
let ts_col = Arc::new(key_builders.finish()) as _;
let mut value_builders = (
UInt64VectorBuilder::with_capacity(values.len()),
@@ -71,8 +71,9 @@ fn kvs_for_test_with_index(
sequence,
op_type,
start_index_in_batch,
keys: row_keys,
keys: vec![],
values: row_values,
timestamp: Some(ts_col),
};
assert_eq!(keys.len(), kvs.len());
@@ -198,7 +199,7 @@ fn write_iter_memtable_case(ctx: &TestContext) {
assert!(iter.next().is_none());
// Poll the empty iterator again.
assert!(iter.next().is_none());
assert_eq!(0, ctx.memtable.bytes_allocated());
assert_eq!(0, ctx.memtable.stats().bytes_allocated());
// Init test data.
write_kvs(
@@ -224,7 +225,7 @@ fn write_iter_memtable_case(ctx: &TestContext) {
);
// 9 key value pairs (6 + 3).
assert_eq!(576, ctx.memtable.bytes_allocated());
assert_eq!(576, ctx.memtable.stats().bytes_allocated());
let batch_sizes = [1, 4, 8, consts::READ_BATCH_SIZE];
for batch_size in batch_sizes {

View File

@@ -64,15 +64,15 @@ impl MemtableVersion {
}
pub fn mutable_bytes_allocated(&self) -> usize {
self.mutable.bytes_allocated()
self.mutable.stats().bytes_allocated()
}
pub fn total_bytes_allocated(&self) -> usize {
self.immutables
.iter()
.map(|m| m.bytes_allocated())
.map(|m| m.stats().bytes_allocated())
.sum::<usize>()
+ self.mutable.bytes_allocated()
+ self.mutable.stats().bytes_allocated()
}
/// Creates a new `MemtableVersion` that removes immutable memtables

View File

@@ -130,6 +130,11 @@ impl RegionSchema {
self.store_schema.row_key_indices()
}
/// Returns the timestamp column index as reported by the underlying store schema.
#[inline]
pub(crate) fn timestamp_index(&self) -> usize {
self.store_schema.timestamp_index()
}
#[inline]
pub(crate) fn value_indices(&self) -> impl Iterator<Item = usize> {
self.store_schema.value_indices()

View File

@@ -155,6 +155,11 @@ impl StoreSchema {
self.user_column_end + 1
}
/// Returns the index of the timestamp column, computed as `row_key_end - 1`
/// (the last index of the `0..row_key_end` row key range).
///
/// NOTE(review): this subtraction would underflow if `row_key_end` were 0 —
/// presumably a store schema always has a timestamp key column; confirm.
#[inline]
pub(crate) fn timestamp_index(&self) -> usize {
self.row_key_end - 1
}
#[inline]
pub(crate) fn row_key_indices(&self) -> impl Iterator<Item = usize> {
0..self.row_key_end