From 36231a5d506c63704cd916f440ea5bf6da209189 Mon Sep 17 00:00:00 2001
From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com>
Date: Sun, 3 Sep 2023 10:41:47 +0800
Subject: [PATCH] feat(mito2): add alloc_tracker for memtable (#2266)

* feat: add alloc_tracker for memtable

* chore: integrate WriteBufferManager
---
 src/mito2/src/lib.rs                  |   1 +
 src/mito2/src/memtable.rs             | 104 ++++++++++++++++++++-
 src/mito2/src/memtable/time_series.rs | 127 +++++++++++++++++++++++---
 src/mito2/src/metrics.rs              |  16 ++++
 4 files changed, 235 insertions(+), 13 deletions(-)
 create mode 100644 src/mito2/src/metrics.rs

diff --git a/src/mito2/src/lib.rs b/src/mito2/src/lib.rs
index acf0fea140..c64ad46ac8 100644
--- a/src/mito2/src/lib.rs
+++ b/src/mito2/src/lib.rs
@@ -33,6 +33,7 @@ mod flush;
 pub mod manifest;
 #[allow(dead_code)]
 pub mod memtable;
+mod metrics;
 #[allow(dead_code)]
 pub mod read;
 #[allow(dead_code)]
diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs
index 21f5f6d056..8756829a9f 100644
--- a/src/mito2/src/memtable.rs
+++ b/src/mito2/src/memtable.rs
@@ -20,14 +20,18 @@ pub mod key_values;
 pub(crate) mod version;
 
 use std::fmt;
-use std::sync::atomic::{AtomicU32, Ordering};
+use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering};
 use std::sync::Arc;
 
+use common_time::Timestamp;
+use metrics::{decrement_gauge, increment_gauge};
 use store_api::metadata::RegionMetadataRef;
 use store_api::storage::ScanRequest;
 
 use crate::error::Result;
+use crate::flush::WriteBufferManagerRef;
 pub use crate::memtable::key_values::KeyValues;
+use crate::metrics::WRITE_BUFFER_BYTES;
 use crate::read::Batch;
 
 /// Id for memtables.
@@ -35,6 +39,20 @@ use crate::read::Batch;
 /// Should be unique under the same region.
 pub type MemtableId = u32;
 
+#[derive(Debug, Default)]
+pub struct MemtableStats {
+    /// The estimated bytes allocated by this memtable from heap.
+    estimated_bytes: usize,
+    /// The time range that this memtable contains.
+    time_range: Option<(Timestamp, Timestamp)>,
+}
+
+impl MemtableStats {
+    pub fn bytes_allocated(&self) -> usize {
+        self.estimated_bytes
+    }
+}
+
 pub type BoxedBatchIterator = Box<dyn Iterator<Item = Result<Batch>> + Send + Sync>;
 
 /// In memory write buffer.
@@ -53,6 +71,9 @@ pub trait Memtable: Send + Sync + fmt::Debug {
 
     /// Mark the memtable as immutable.
     fn mark_immutable(&self);
+
+    /// Returns the [MemtableStats] info of Memtable.
+    fn stats(&self) -> MemtableStats;
 }
 
 pub type MemtableRef = Arc<dyn Memtable>;
@@ -98,6 +119,87 @@ impl Memtable for EmptyMemtable {
     }
 
     fn mark_immutable(&self) {}
+
+    fn stats(&self) -> MemtableStats {
+        MemtableStats::default()
+    }
+}
+
+/// Memtable memory allocation tracker.
+#[derive(Default)]
+pub struct AllocTracker {
+    write_buffer_manager: Option<WriteBufferManagerRef>,
+    /// Bytes allocated by the tracker.
+    bytes_allocated: AtomicUsize,
+    /// Whether allocating is done.
+    is_done_allocating: AtomicBool,
+}
+
+impl fmt::Debug for AllocTracker {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        f.debug_struct("AllocTracker")
+            .field("bytes_allocated", &self.bytes_allocated)
+            .field("is_done_allocating", &self.is_done_allocating)
+            .finish()
+    }
+}
+
+impl AllocTracker {
+    /// Returns a new [AllocTracker].
+    pub fn new(write_buffer_manager: Option<WriteBufferManagerRef>) -> AllocTracker {
+        AllocTracker {
+            write_buffer_manager,
+            bytes_allocated: AtomicUsize::new(0),
+            is_done_allocating: AtomicBool::new(false),
+        }
+    }
+
+    /// Tracks that `bytes` of memory have been allocated.
+    pub(crate) fn on_allocation(&self, bytes: usize) {
+        let _ = self.bytes_allocated.fetch_add(bytes, Ordering::Relaxed);
+        increment_gauge!(WRITE_BUFFER_BYTES, bytes as f64);
+        if let Some(write_buffer_manager) = &self.write_buffer_manager {
+            write_buffer_manager.reserve_mem(bytes);
+        }
+    }
+
+    /// Marks we have finished allocating memory so we can free it from
+    /// the write buffer's limit.
+    ///
+    /// The region MUST ensure that it calls this method inside the region writer's write lock.
+    pub(crate) fn done_allocating(&self) {
+        if let Some(write_buffer_manager) = &self.write_buffer_manager {
+            if self
+                .is_done_allocating
+                .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
+                .is_ok()
+            {
+                write_buffer_manager
+                    .schedule_free_mem(self.bytes_allocated.load(Ordering::Relaxed));
+            }
+        }
+    }
+
+    /// Returns bytes allocated.
+    pub(crate) fn bytes_allocated(&self) -> usize {
+        self.bytes_allocated.load(Ordering::Relaxed)
+    }
+}
+
+impl Drop for AllocTracker {
+    fn drop(&mut self) {
+        if !self.is_done_allocating.load(Ordering::Relaxed) {
+            self.done_allocating();
+        }
+
+        let bytes_allocated = self.bytes_allocated.load(Ordering::Relaxed);
+        decrement_gauge!(WRITE_BUFFER_BYTES, bytes_allocated as f64);
+
+        // Memory tracked by this tracker is freed.
+        if let Some(write_buffer_manager) = &self.write_buffer_manager {
+            write_buffer_manager.free_mem(bytes_allocated);
+        }
+    }
 }
 
 /// Default memtable builder.
diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs
index 9666398fc0..b20ffb74fa 100644
--- a/src/mito2/src/memtable/time_series.rs
+++ b/src/mito2/src/memtable/time_series.rs
@@ -15,7 +15,7 @@
 use std::collections::btree_map::Entry;
 use std::collections::{BTreeMap, Bound, HashSet};
 use std::fmt::{Debug, Formatter};
-use std::sync::atomic::{AtomicU32, Ordering};
+use std::sync::atomic::{AtomicI64, AtomicU32, Ordering};
 use std::sync::{Arc, RwLock};
 
 use api::v1::OpType;
@@ -33,7 +33,8 @@ use store_api::storage::{ColumnId, ScanRequest};
 
 use crate::error::{ComputeArrowSnafu, ConvertVectorSnafu, PrimaryKeyLengthMismatchSnafu, Result};
 use crate::memtable::{
-    BoxedBatchIterator, KeyValues, Memtable, MemtableBuilder, MemtableId, MemtableRef,
+    AllocTracker, BoxedBatchIterator, KeyValues, Memtable, MemtableBuilder, MemtableId,
+    MemtableRef, MemtableStats,
 };
 use crate::read::{Batch, BatchBuilder, BatchColumn};
 use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
@@ -59,6 +60,9 @@ pub struct TimeSeriesMemtable {
     region_metadata: RegionMetadataRef,
     row_codec: McmpRowCodec,
     series_set: SeriesSet,
+    alloc_tracker: AllocTracker,
+    max_timestamp: AtomicI64,
+    min_timestamp: AtomicI64,
 }
 
 impl TimeSeriesMemtable {
@@ -75,6 +79,54 @@ impl TimeSeriesMemtable {
             region_metadata,
             series_set,
             row_codec,
+            alloc_tracker: AllocTracker::default(),
+            max_timestamp: AtomicI64::new(i64::MIN),
+            min_timestamp: AtomicI64::new(i64::MAX),
+        }
+    }
+
+    /// Updates memtable stats.
+    fn update_stats(&self, request_size: usize, min: i64, max: i64) {
+        self.alloc_tracker.on_allocation(request_size);
+
+        loop {
+            let current_min = self.min_timestamp.load(Ordering::Relaxed);
+            if min >= current_min {
+                break;
+            }
+
+            let Err(updated) = self.min_timestamp.compare_exchange(
+                current_min,
+                min,
+                Ordering::Relaxed,
+                Ordering::Relaxed,
+            ) else {
+                break;
+            };
+
+            if updated == min {
+                break;
+            }
+        }
+
+        loop {
+            let current_max = self.max_timestamp.load(Ordering::Relaxed);
+            if max <= current_max {
+                break;
+            }
+
+            let Err(updated) = self.max_timestamp.compare_exchange(
+                current_max,
+                max,
+                Ordering::Relaxed,
+                Ordering::Relaxed,
+            ) else {
+                break;
+            };
+
+            if updated == max {
+                break;
+            }
         }
     }
 }
@@ -91,6 +143,10 @@ impl Memtable for TimeSeriesMemtable {
     }
 
     fn write(&self, kvs: &KeyValues) -> Result<()> {
+        let mut allocated = 0;
+        let mut min_ts = i64::MAX;
+        let mut max_ts = i64::MIN;
+
         for kv in kvs.iter() {
             ensure!(
                 kv.num_primary_keys() == self.row_codec.num_fields(),
@@ -100,11 +156,25 @@ impl Memtable for TimeSeriesMemtable {
                 }
             );
             let primary_key_encoded = self.row_codec.encode(kv.primary_keys())?;
-            let fields = kv.fields().collect();
-            let series = self.series_set.get_or_add_series(primary_key_encoded);
+            let fields = kv.fields().collect::<Vec<_>>();
+
+            allocated += fields.len() * std::mem::size_of::<ValueRef>();
+            let (series, series_allocated) = self.series_set.get_or_add_series(primary_key_encoded);
+            allocated += series_allocated;
+
+            // safety: timestamp of kv must be both present and a valid timestamp value.
+            let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
+            min_ts = min_ts.min(ts);
+            max_ts = max_ts.max(ts);
+
             let mut guard = series.write().unwrap();
             guard.push(kv.timestamp(), kv.sequence(), kv.op_type(), fields);
         }
+
+        // TODO(hl): this may be inaccurate since for-iteration may return early.
+        // We may lift the primary key length check out of Memtable::write
+        // so that we can ensure writing to memtable will succeed.
+        self.update_stats(allocated, min_ts, max_ts);
         Ok(())
     }
 
@@ -129,7 +199,33 @@ impl Memtable for TimeSeriesMemtable {
     }
 
     fn mark_immutable(&self) {
-        // TODO(yingwen): AllocTracker.done_allocating()
+        self.alloc_tracker.done_allocating();
+    }
+
+    fn stats(&self) -> MemtableStats {
+        let estimated_bytes = self.alloc_tracker.bytes_allocated();
+
+        if estimated_bytes == 0 {
+            // no rows ever written
+            return MemtableStats {
+                estimated_bytes,
+                time_range: None,
+            };
+        }
+        let ts_type = self
+            .region_metadata
+            .time_index_column()
+            .column_schema
+            .data_type
+            .clone()
+            .as_timestamp()
+            .expect("Timestamp column must have timestamp type");
+        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
+        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
+        MemtableStats {
+            estimated_bytes,
+            time_range: Some((min_timestamp, max_timestamp)),
+        }
     }
 }
 
@@ -150,20 +246,22 @@ impl SeriesSet {
 }
 
 impl SeriesSet {
-    /// Returns the series for given primary key, or create a new series if not already exist.
-    fn get_or_add_series(&self, primary_key: Vec<u8>) -> Arc<RwLock<Series>> {
+    /// Returns the series for given primary key, or create a new series if not already exist,
+    /// along with the allocated memory footprint for primary keys.
+    fn get_or_add_series(&self, primary_key: Vec<u8>) -> (Arc<RwLock<Series>>, usize) {
         if let Some(series) = self.series.read().unwrap().get(&primary_key) {
-            return series.clone();
+            return (series.clone(), 0);
         };
         let s = Arc::new(RwLock::new(Series::new(&self.region_metadata)));
         let mut indices = self.series.write().unwrap();
         match indices.entry(primary_key) {
             Entry::Vacant(v) => {
+                let key_len = v.key().len();
                 v.insert(s.clone());
-                s
+                (s, key_len)
             }
             // safety: series must exist at given index.
-            Entry::Occupied(v) => v.get().clone(),
+            Entry::Occupied(v) => (v.get().clone(), 0),
         }
     }
 
@@ -176,6 +274,11 @@ impl SeriesSet {
             last_key: None,
         }
     }
+
+    /// Returns whether the series set is empty.
+    fn is_empty(&self) -> bool {
+        self.series.read().unwrap().is_empty()
+    }
 }
 
 struct Iter {
@@ -709,7 +812,7 @@ mod tests {
                 for j in i * 100..(i + 1) * 100 {
                     let pk = j % pk_num;
                     let primary_key = format!("pk-{}", pk).as_bytes().to_vec();
-                    let series = set.get_or_add_series(primary_key);
+                    let (series, _) = set.get_or_add_series(primary_key);
                     let mut guard = series.write().unwrap();
                     guard.push(
                         ts_value_ref(j as i64),
@@ -732,7 +835,7 @@ mod tests {
 
         for i in 0..pk_num {
             let pk = format!("pk-{}", i).as_bytes().to_vec();
-            let series = set.get_or_add_series(pk);
+            let (series, _) = set.get_or_add_series(pk);
             let mut guard = series.write().unwrap();
             let values = guard.compact(&schema).unwrap();
             timestamps.extend(values.sequence.iter_data().map(|v| v.unwrap() as i64));
diff --git a/src/mito2/src/metrics.rs b/src/mito2/src/metrics.rs
new file mode 100644
index 0000000000..26d7147527
--- /dev/null
+++ b/src/mito2/src/metrics.rs
@@ -0,0 +1,16 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+/// Global write buffer size in bytes.
+pub const WRITE_BUFFER_BYTES: &str = "storage.write_buffer_bytes";