feat(mito2): add alloc_tracker for memtable (#2266)

* feat: add alloc_tracker for memtable

* chore: integrate WriteBufferManager
This commit is contained in:
Lei, HUANG
2023-09-03 10:41:47 +08:00
committed by Ruihang Xia
parent a7fa40e16d
commit 36231a5d50
4 changed files with 235 additions and 13 deletions

View File

@@ -33,6 +33,7 @@ mod flush;
pub mod manifest;
#[allow(dead_code)]
pub mod memtable;
mod metrics;
#[allow(dead_code)]
pub mod read;
#[allow(dead_code)]

View File

@@ -20,14 +20,18 @@ pub mod key_values;
pub(crate) mod version;
use std::fmt;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering};
use std::sync::Arc;
use common_time::Timestamp;
use metrics::{decrement_gauge, increment_gauge};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ScanRequest;
use crate::error::Result;
use crate::flush::WriteBufferManagerRef;
pub use crate::memtable::key_values::KeyValues;
use crate::metrics::WRITE_BUFFER_BYTES;
use crate::read::Batch;
/// Id for memtables.
@@ -35,6 +39,20 @@ use crate::read::Batch;
/// Should be unique under the same region.
pub type MemtableId = u32;
#[derive(Debug, Default)]
pub struct MemtableStats {
/// The estimated bytes allocated by this memtable from heap.
estimated_bytes: usize,
/// The time range that this memtable contains.
time_range: Option<(Timestamp, Timestamp)>,
}
impl MemtableStats {
pub fn bytes_allocated(&self) -> usize {
self.estimated_bytes
}
}
pub type BoxedBatchIterator = Box<dyn Iterator<Item = Result<Batch>> + Send + Sync>;
/// In memory write buffer.
@@ -53,6 +71,9 @@ pub trait Memtable: Send + Sync + fmt::Debug {
/// Mark the memtable as immutable.
fn mark_immutable(&self);
/// Returns the [MemtableStats] info of Memtable.
fn stats(&self) -> MemtableStats;
}
pub type MemtableRef = Arc<dyn Memtable>;
@@ -98,6 +119,87 @@ impl Memtable for EmptyMemtable {
}
fn mark_immutable(&self) {}
fn stats(&self) -> MemtableStats {
MemtableStats::default()
}
}
/// Memtable memory allocation tracker.
#[derive(Default)]
pub struct AllocTracker {
write_buffer_manager: Option<WriteBufferManagerRef>,
/// Bytes allocated by the tracker.
bytes_allocated: AtomicUsize,
/// Whether allocating is done.
is_done_allocating: AtomicBool,
}
impl fmt::Debug for AllocTracker {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("AllocTracker")
.field("bytes_allocated", &self.bytes_allocated)
.field("is_done_allocating", &self.is_done_allocating)
.finish()
}
}
impl AllocTracker {
/// Returns a new [AllocTracker].
pub fn new(write_buffer_manager: Option<WriteBufferManagerRef>) -> AllocTracker {
AllocTracker {
write_buffer_manager,
bytes_allocated: AtomicUsize::new(0),
is_done_allocating: AtomicBool::new(false),
}
}
/// Tracks `bytes` memory is allocated.
pub(crate) fn on_allocation(&self, bytes: usize) {
let _ = self.bytes_allocated.fetch_add(bytes, Ordering::Relaxed);
increment_gauge!(WRITE_BUFFER_BYTES, bytes as f64);
if let Some(write_buffer_manager) = &self.write_buffer_manager {
write_buffer_manager.reserve_mem(bytes);
}
}
/// Marks we have finished allocating memory so we can free it from
/// the write buffer's limit.
///
/// The region MUST ensure that it calls this method inside the region writer's write lock.
pub(crate) fn done_allocating(&self) {
if let Some(write_buffer_manager) = &self.write_buffer_manager {
if self
.is_done_allocating
.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
write_buffer_manager
.schedule_free_mem(self.bytes_allocated.load(Ordering::Relaxed));
}
}
}
/// Returns bytes allocated.
pub(crate) fn bytes_allocated(&self) -> usize {
self.bytes_allocated.load(Ordering::Relaxed)
}
}
impl Drop for AllocTracker {
fn drop(&mut self) {
if !self.is_done_allocating.load(Ordering::Relaxed) {
self.done_allocating();
}
let bytes_allocated = self.bytes_allocated.load(Ordering::Relaxed);
decrement_gauge!(WRITE_BUFFER_BYTES, bytes_allocated as f64);
// Memory tracked by this tracker is freed.
if let Some(write_buffer_manager) = &self.write_buffer_manager {
write_buffer_manager.free_mem(bytes_allocated);
}
}
}
/// Default memtable builder.

View File

@@ -15,7 +15,7 @@
use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, Bound, HashSet};
use std::fmt::{Debug, Formatter};
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::atomic::{AtomicI64, AtomicU32, Ordering};
use std::sync::{Arc, RwLock};
use api::v1::OpType;
@@ -33,7 +33,8 @@ use store_api::storage::{ColumnId, ScanRequest};
use crate::error::{ComputeArrowSnafu, ConvertVectorSnafu, PrimaryKeyLengthMismatchSnafu, Result};
use crate::memtable::{
BoxedBatchIterator, KeyValues, Memtable, MemtableBuilder, MemtableId, MemtableRef,
AllocTracker, BoxedBatchIterator, KeyValues, Memtable, MemtableBuilder, MemtableId,
MemtableRef, MemtableStats,
};
use crate::read::{Batch, BatchBuilder, BatchColumn};
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
@@ -59,6 +60,9 @@ pub struct TimeSeriesMemtable {
region_metadata: RegionMetadataRef,
row_codec: McmpRowCodec,
series_set: SeriesSet,
alloc_tracker: AllocTracker,
max_timestamp: AtomicI64,
min_timestamp: AtomicI64,
}
impl TimeSeriesMemtable {
@@ -75,6 +79,54 @@ impl TimeSeriesMemtable {
region_metadata,
series_set,
row_codec,
alloc_tracker: AllocTracker::default(),
max_timestamp: AtomicI64::new(i64::MIN),
min_timestamp: AtomicI64::new(i64::MAX),
}
}
/// Updates memtable stats.
fn update_stats(&self, request_size: usize, min: i64, max: i64) {
self.alloc_tracker.on_allocation(request_size);
loop {
let current_min = self.min_timestamp.load(Ordering::Relaxed);
if min >= current_min {
break;
}
let Err(updated) = self.min_timestamp.compare_exchange(
current_min,
min,
Ordering::Relaxed,
Ordering::Relaxed,
) else {
break;
};
if updated == min {
break;
}
}
loop {
let current_max = self.max_timestamp.load(Ordering::Relaxed);
if max <= current_max {
break;
}
let Err(updated) = self.max_timestamp.compare_exchange(
current_max,
max,
Ordering::Relaxed,
Ordering::Relaxed,
) else {
break;
};
if updated == max {
break;
}
}
}
}
@@ -91,6 +143,10 @@ impl Memtable for TimeSeriesMemtable {
}
fn write(&self, kvs: &KeyValues) -> Result<()> {
let mut allocated = 0;
let mut min_ts = i64::MAX;
let mut max_ts = i64::MIN;
for kv in kvs.iter() {
ensure!(
kv.num_primary_keys() == self.row_codec.num_fields(),
@@ -100,11 +156,25 @@ impl Memtable for TimeSeriesMemtable {
}
);
let primary_key_encoded = self.row_codec.encode(kv.primary_keys())?;
let fields = kv.fields().collect();
let series = self.series_set.get_or_add_series(primary_key_encoded);
let fields = kv.fields().collect::<Vec<_>>();
allocated += fields.len() * std::mem::size_of::<ValueRef>();
let (series, series_allocated) = self.series_set.get_or_add_series(primary_key_encoded);
allocated += series_allocated;
// safety: timestamp of kv must be both present and a valid timestamp value.
let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
min_ts = min_ts.min(ts);
max_ts = max_ts.max(ts);
let mut guard = series.write().unwrap();
guard.push(kv.timestamp(), kv.sequence(), kv.op_type(), fields);
}
// TODO(hl): this maybe inaccurate since for-iteration may return early.
// We may lift the primary key length check out of Memtable::write
// so that we can ensure writing to memtable will succeed.
self.update_stats(allocated, min_ts, max_ts);
Ok(())
}
@@ -129,7 +199,33 @@ impl Memtable for TimeSeriesMemtable {
}
fn mark_immutable(&self) {
// TODO(yingwen): AllocTracker.done_allocating()
self.alloc_tracker.done_allocating();
}
fn stats(&self) -> MemtableStats {
let estimated_bytes = self.alloc_tracker.bytes_allocated();
if estimated_bytes == 0 {
// no rows ever written
return MemtableStats {
estimated_bytes,
time_range: None,
};
}
let ts_type = self
.region_metadata
.time_index_column()
.column_schema
.data_type
.clone()
.as_timestamp()
.expect("Timestamp column must have timestamp type");
let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
MemtableStats {
estimated_bytes,
time_range: Some((max_timestamp, min_timestamp)),
}
}
}
@@ -150,20 +246,22 @@ impl SeriesSet {
}
impl SeriesSet {
/// Returns the series for given primary key, or create a new series if not already exist.
fn get_or_add_series(&self, primary_key: Vec<u8>) -> Arc<RwLock<Series>> {
/// Returns the series for given primary key, or create a new series if not already exist,
/// along with the allocated memory footprint for primary keys.
fn get_or_add_series(&self, primary_key: Vec<u8>) -> (Arc<RwLock<Series>>, usize) {
if let Some(series) = self.series.read().unwrap().get(&primary_key) {
return series.clone();
return (series.clone(), 0);
};
let s = Arc::new(RwLock::new(Series::new(&self.region_metadata)));
let mut indices = self.series.write().unwrap();
match indices.entry(primary_key) {
Entry::Vacant(v) => {
let key_len = v.key().len();
v.insert(s.clone());
s
(s, key_len)
}
// safety: series must exist at given index.
Entry::Occupied(v) => v.get().clone(),
Entry::Occupied(v) => (v.get().clone(), 0),
}
}
@@ -176,6 +274,11 @@ impl SeriesSet {
last_key: None,
}
}
/// Returns if series set is empty.
fn is_empty(&self) -> bool {
self.series.read().unwrap().is_empty()
}
}
struct Iter {
@@ -709,7 +812,7 @@ mod tests {
for j in i * 100..(i + 1) * 100 {
let pk = j % pk_num;
let primary_key = format!("pk-{}", pk).as_bytes().to_vec();
let series = set.get_or_add_series(primary_key);
let (series, _) = set.get_or_add_series(primary_key);
let mut guard = series.write().unwrap();
guard.push(
ts_value_ref(j as i64),
@@ -732,7 +835,7 @@ mod tests {
for i in 0..pk_num {
let pk = format!("pk-{}", i).as_bytes().to_vec();
let series = set.get_or_add_series(pk);
let (series, _) = set.get_or_add_series(pk);
let mut guard = series.write().unwrap();
let values = guard.compact(&schema).unwrap();
timestamps.extend(values.sequence.iter_data().map(|v| v.unwrap() as i64));

16
src/mito2/src/metrics.rs Normal file
View File

@@ -0,0 +1,16 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/// Global write buffer size in bytes.
pub const WRITE_BUFFER_BYTES: &str = "storage.write_buffer_bytes";