feat: Add freeze and fork method to the memtable (#3374)

* feat: add fork method to the memtable

* feat: allow mark immutable returns result

* feat: use fork to create the mutable memtable

* feat: remove memtable builder from freeze

* chore: warnings

* fix: inspect error

* feat: iter returns result

* chore: maintains memtable id in region

* chore: update comment

* fix: remove region status if failed to freeze a memtable

* chore: update comment

* chore: iter should not require sync

* chore: implement freeze and fork for the new memtable
This commit is contained in:
Yingwen
2024-02-24 20:11:16 +08:00
committed by GitHub
parent 1df64f294b
commit abbfd23d4b
11 changed files with 105 additions and 58 deletions

View File

@@ -31,7 +31,6 @@ use crate::config::MitoConfig;
use crate::error::{
Error, FlushRegionSnafu, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, Result,
};
use crate::memtable::MemtableBuilderRef;
use crate::metrics::{FLUSH_BYTES_TOTAL, FLUSH_ELAPSED, FLUSH_ERRORS_TOTAL, FLUSH_REQUESTS_TOTAL};
use crate::read::Source;
use crate::region::options::IndexOptions;
@@ -198,7 +197,6 @@ pub(crate) struct RegionFlushTask {
pub(crate) request_sender: mpsc::Sender<WorkerRequest>,
pub(crate) access_layer: AccessLayerRef,
pub(crate) memtable_builder: MemtableBuilderRef,
pub(crate) file_purger: FilePurgerRef,
pub(crate) listener: WorkerListener,
pub(crate) engine_config: Arc<MitoConfig>,
@@ -317,7 +315,7 @@ impl RegionFlushTask {
}
let file_id = FileId::random();
let iter = mem.iter(None, None);
let iter = mem.iter(None, None)?;
let source = Source::Iter(iter);
let create_inverted_index = self.engine_config.inverted_index.create_on_flush.auto();
let mem_threshold_index_create = self
@@ -466,7 +464,13 @@ impl FlushScheduler {
}
// Now we can flush the region directly.
version_control.freeze_mutable(&task.memtable_builder);
if let Err(e) = version_control.freeze_mutable() {
error!(e; "Failed to freeze the mutable memtable for region {}", region_id);
// Remove from region status if we can't freeze the mutable memtable.
self.region_status.remove(&region_id);
return Err(e);
}
// Submit a flush job.
let job = task.into_flush_job(version_control);
if let Err(e) = self.scheduler.schedule(job) {
@@ -765,7 +769,6 @@ mod tests {
senders: Vec::new(),
request_sender: tx,
access_layer: env.access_layer.clone(),
memtable_builder: builder.memtable_builder(),
file_purger: builder.file_purger(),
listener: WorkerListener::default(),
engine_config: Arc::new(MitoConfig::default()),

View File

@@ -60,7 +60,7 @@ impl MemtableStats {
}
}
pub type BoxedBatchIterator = Box<dyn Iterator<Item = Result<Batch>> + Send + Sync>;
pub type BoxedBatchIterator = Box<dyn Iterator<Item = Result<Batch>> + Send>;
/// In memory write buffer.
pub trait Memtable: Send + Sync + fmt::Debug {
@@ -77,16 +77,21 @@ pub trait Memtable: Send + Sync + fmt::Debug {
&self,
projection: Option<&[ColumnId]>,
predicate: Option<Predicate>,
) -> BoxedBatchIterator;
) -> Result<BoxedBatchIterator>;
/// Returns true if the memtable is empty.
fn is_empty(&self) -> bool;
/// Mark the memtable as immutable.
fn mark_immutable(&self);
/// Turns a mutable memtable into an immutable memtable.
fn freeze(&self) -> Result<()>;
/// Returns the [MemtableStats] info of Memtable.
fn stats(&self) -> MemtableStats;
/// Forks this (immutable) memtable and returns a new mutable memtable with specific memtable `id`.
///
/// A region must freeze the memtable before invoking this method.
fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef;
}
pub type MemtableRef = Arc<dyn Memtable>;
@@ -94,7 +99,7 @@ pub type MemtableRef = Arc<dyn Memtable>;
/// Builder to build a new [Memtable].
pub trait MemtableBuilder: Send + Sync + fmt::Debug {
/// Builds a new memtable instance.
fn build(&self, metadata: &RegionMetadataRef) -> MemtableRef;
fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef;
}
pub type MemtableBuilderRef = Arc<dyn MemtableBuilder>;
@@ -158,6 +163,11 @@ impl AllocTracker {
pub(crate) fn bytes_allocated(&self) -> usize {
self.bytes_allocated.load(Ordering::Relaxed)
}
/// Returns the write buffer manager held by this tracker.
///
/// The returned value is a clone of the tracker's optional manager handle, so it is
/// `None` when the tracker was created without a write buffer manager.
pub(crate) fn write_buffer_manager(&self) -> Option<WriteBufferManagerRef> {
self.write_buffer_manager.clone()
}
}
impl Drop for AllocTracker {

View File

@@ -24,7 +24,7 @@ mod shard_builder;
mod tree;
use std::fmt;
use std::sync::atomic::{AtomicI64, AtomicU32, Ordering};
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;
use store_api::metadata::RegionMetadataRef;
@@ -108,8 +108,7 @@ impl Memtable for MergeTreeMemtable {
&self,
_projection: Option<&[ColumnId]>,
_predicate: Option<Predicate>,
) -> BoxedBatchIterator {
// FIXME(yingwen): Change return value to `Result<BoxedBatchIterator>`.
) -> Result<BoxedBatchIterator> {
todo!()
}
@@ -117,8 +116,10 @@ impl Memtable for MergeTreeMemtable {
self.tree.is_empty()
}
fn mark_immutable(&self) {
/// Freezes this memtable.
///
/// First tells the allocation tracker that allocation is done, then freezes the
/// underlying tree and returns the tree's result, so any error comes from the tree.
fn freeze(&self) -> Result<()> {
self.alloc_tracker.done_allocating();
self.tree.freeze()
}
fn stats(&self) -> MemtableStats {
@@ -148,6 +149,14 @@ impl Memtable for MergeTreeMemtable {
time_range: Some((min_timestamp, max_timestamp)),
}
}
/// Creates a new memtable with the given `id` from this one.
///
/// The new memtable wraps the tree obtained by forking this memtable's tree with a
/// clone of `metadata`, and it reuses the write buffer manager of this memtable's
/// allocation tracker.
fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
// Fork the tree with the (possibly new) region metadata.
let tree = self.tree.fork(metadata.clone());
let memtable =
MergeTreeMemtable::with_tree(id, tree, self.alloc_tracker.write_buffer_manager());
Arc::new(memtable)
}
}
impl MergeTreeMemtable {
@@ -235,7 +244,6 @@ impl MergeTreeMemtable {
/// Builder to build a [MergeTreeMemtable].
#[derive(Debug, Default)]
pub struct MergeTreeMemtableBuilder {
id: AtomicU32,
write_buffer_manager: Option<WriteBufferManagerRef>,
config: MergeTreeConfig,
}
@@ -244,7 +252,6 @@ impl MergeTreeMemtableBuilder {
/// Creates a new builder with specific `write_buffer_manager`.
pub fn new(write_buffer_manager: Option<WriteBufferManagerRef>) -> Self {
Self {
id: AtomicU32::new(0),
write_buffer_manager,
config: MergeTreeConfig::default(),
}
@@ -252,8 +259,7 @@ impl MergeTreeMemtableBuilder {
}
impl MemtableBuilder for MergeTreeMemtableBuilder {
fn build(&self, metadata: &RegionMetadataRef) -> MemtableRef {
let id = self.id.fetch_add(1, Ordering::Relaxed);
fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
Arc::new(MergeTreeMemtable::new(
id,
metadata.clone(),

View File

@@ -15,7 +15,7 @@
use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, Bound, HashSet};
use std::fmt::{Debug, Formatter};
use std::sync::atomic::{AtomicI64, AtomicU32, Ordering};
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
@@ -52,7 +52,6 @@ const INITIAL_BUILDER_CAPACITY: usize = 0;
/// Builder to build [TimeSeriesMemtable].
#[derive(Debug, Default)]
pub struct TimeSeriesMemtableBuilder {
id: AtomicU32,
write_buffer_manager: Option<WriteBufferManagerRef>,
}
@@ -60,15 +59,13 @@ impl TimeSeriesMemtableBuilder {
/// Creates a new builder with specific `write_buffer_manager`.
pub fn new(write_buffer_manager: Option<WriteBufferManagerRef>) -> Self {
Self {
id: AtomicU32::new(0),
write_buffer_manager,
}
}
}
impl MemtableBuilder for TimeSeriesMemtableBuilder {
fn build(&self, metadata: &RegionMetadataRef) -> MemtableRef {
let id = self.id.fetch_add(1, Ordering::Relaxed);
fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
Arc::new(TimeSeriesMemtable::new(
metadata.clone(),
id,
@@ -211,7 +208,7 @@ impl Memtable for TimeSeriesMemtable {
&self,
projection: Option<&[ColumnId]>,
filters: Option<Predicate>,
) -> BoxedBatchIterator {
) -> Result<BoxedBatchIterator> {
let projection = if let Some(projection) = projection {
projection.iter().copied().collect()
} else {
@@ -221,15 +218,18 @@ impl Memtable for TimeSeriesMemtable {
.collect()
};
Box::new(self.series_set.iter_series(projection, filters))
let iter = self.series_set.iter_series(projection, filters);
Ok(Box::new(iter))
}
fn is_empty(&self) -> bool {
self.series_set.series.read().unwrap().is_empty()
}
fn mark_immutable(&self) {
/// Freezes this memtable.
///
/// Only tells the allocation tracker that allocation is done; this implementation
/// always returns `Ok(())`.
fn freeze(&self) -> Result<()> {
self.alloc_tracker.done_allocating();
Ok(())
}
fn stats(&self) -> MemtableStats {
@@ -257,6 +257,14 @@ impl Memtable for TimeSeriesMemtable {
time_range: Some((min_timestamp, max_timestamp)),
}
}
/// Creates a new [TimeSeriesMemtable] with the given `id`.
///
/// The new memtable is built via `TimeSeriesMemtable::new` from a clone of `metadata`
/// and the write buffer manager of this memtable's allocation tracker. Nothing else
/// from `self` is passed to the constructor.
fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
Arc::new(TimeSeriesMemtable::new(
metadata.clone(),
id,
self.alloc_tracker.write_buffer_manager(),
))
}
}
type SeriesRwLockMap = RwLock<BTreeMap<Vec<u8>, Arc<RwLock<Series>>>>;
@@ -1119,7 +1127,7 @@ mod tests {
.map(|kv| kv.timestamp().as_timestamp().unwrap().unwrap().value())
.collect::<HashSet<_>>();
let iter = memtable.iter(None, None);
let iter = memtable.iter(None, None).unwrap();
let read = iter
.flat_map(|batch| {
batch
@@ -1155,7 +1163,7 @@ mod tests {
let memtable = TimeSeriesMemtable::new(schema, 42, None);
memtable.write(&kvs).unwrap();
let iter = memtable.iter(Some(&[3]), None);
let iter = memtable.iter(Some(&[3]), None).unwrap();
let mut v0_all = vec![];

View File

@@ -17,7 +17,9 @@
use std::sync::Arc;
use smallvec::SmallVec;
use store_api::metadata::RegionMetadataRef;
use crate::error::Result;
use crate::memtable::{MemtableId, MemtableRef};
/// A version of current memtables in a region.
@@ -61,17 +63,21 @@ impl MemtableVersion {
/// memtable.
///
/// Returns `None` if the mutable memtable is empty.
#[must_use]
pub(crate) fn freeze_mutable(&self, mutable: MemtableRef) -> Option<MemtableVersion> {
debug_assert!(mutable.is_empty());
pub(crate) fn freeze_mutable(
&self,
metadata: &RegionMetadataRef,
) -> Result<Option<MemtableVersion>> {
if self.mutable.is_empty() {
// No need to freeze the mutable memtable.
return None;
return Ok(None);
}
// Marks the mutable memtable as immutable so it can free the memory usage from our
// soft limit.
self.mutable.mark_immutable();
self.mutable.freeze()?;
// Fork the memtable.
let mutable = self.mutable.fork(self.next_memtable_id(), metadata);
// Pushes the mutable memtable to immutable list.
let immutables = self
.immutables
@@ -79,10 +85,10 @@ impl MemtableVersion {
.cloned()
.chain([self.mutable.clone()])
.collect();
Some(MemtableVersion {
Ok(Some(MemtableVersion {
mutable,
immutables,
})
}))
}
/// Removes memtables by ids from immutable memtables.
@@ -115,4 +121,9 @@ impl MemtableVersion {
pub(crate) fn is_empty(&self) -> bool {
self.mutable.is_empty() && self.immutables.is_empty()
}
/// Returns the next memtable id.
///
/// The id is the id of the current mutable memtable plus one.
// NOTE(review): plain `+` is used; what happens on overflow depends on the integer type
// behind `MemtableId`, which is not visible here — confirm that wrap-around is not a concern.
pub(crate) fn next_memtable_id(&self) -> MemtableId {
self.mutable.id() + 1
}
}

View File

@@ -213,7 +213,7 @@ impl SeqScan {
async fn build_sources(&self) -> Result<Vec<Source>> {
let mut sources = Vec::with_capacity(self.memtables.len() + self.files.len());
for mem in &self.memtables {
let iter = mem.iter(Some(self.mapper.column_ids()), self.predicate.clone());
let iter = mem.iter(Some(self.mapper.column_ids()), self.predicate.clone())?;
sources.push(Source::Iter(iter));
}
for file in &self.files {

View File

@@ -168,7 +168,8 @@ impl RegionOpener {
let manifest_manager =
RegionManifestManager::new(metadata.clone(), region_manifest_options).await?;
let mutable = self.memtable_builder.build(&metadata);
// Initial memtable id is 0.
let mutable = self.memtable_builder.build(0, &metadata);
let version = VersionBuilder::new(metadata, mutable)
.options(options)
@@ -258,7 +259,8 @@ impl RegionOpener {
access_layer.clone(),
self.cache_manager.clone(),
));
let mutable = self.memtable_builder.build(&metadata);
// Initial memtable id is 0.
let mutable = self.memtable_builder.build(0, &metadata);
let version = VersionBuilder::new(metadata, mutable)
.add_files(file_purger.clone(), manifest.files.values().cloned())
.flushed_entry_id(manifest.flushed_entry_id)

View File

@@ -29,6 +29,7 @@ use std::time::Duration;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::SequenceNumber;
use crate::error::Result;
use crate::manifest::action::RegionEdit;
use crate::memtable::version::{MemtableVersion, MemtableVersionRef};
use crate::memtable::{MemtableBuilderRef, MemtableId, MemtableRef};
@@ -76,14 +77,16 @@ impl VersionControl {
}
/// Freezes the mutable memtable if it is not empty.
pub(crate) fn freeze_mutable(&self, builder: &MemtableBuilderRef) {
pub(crate) fn freeze_mutable(&self) -> Result<()> {
let version = self.current().version;
if version.memtables.mutable.is_empty() {
return;
return Ok(());
}
let new_mutable = builder.build(&version.metadata);
// Safety: Immutable memtable is None.
let new_memtables = version.memtables.freeze_mutable(new_mutable).unwrap();
let new_memtables = version
.memtables
.freeze_mutable(&version.metadata)?
.unwrap();
// Create a new version with memtable switched.
let new_version = Arc::new(
VersionBuilder::from_version(version)
@@ -93,6 +96,8 @@ impl VersionControl {
let mut version_data = self.data.write().unwrap();
version_data.version = new_version;
Ok(())
}
/// Apply edit to current version.
@@ -117,7 +122,8 @@ impl VersionControl {
/// Mark all opened files as deleted and set the delete marker in [VersionControlData]
pub(crate) fn mark_dropped(&self, memtable_builder: &MemtableBuilderRef) {
let version = self.current().version;
let new_mutable = memtable_builder.build(&version.metadata);
let new_mutable =
memtable_builder.build(version.memtables.next_memtable_id(), &version.metadata);
let mut data = self.data.write().unwrap();
data.is_dropped = true;
@@ -133,8 +139,8 @@ impl VersionControl {
/// It replaces existing mutable memtable with a memtable that uses the
/// new schema. Memtables of the version must be empty.
pub(crate) fn alter_schema(&self, metadata: RegionMetadataRef, builder: &MemtableBuilderRef) {
let new_mutable = builder.build(&metadata);
let version = self.current().version;
let new_mutable = builder.build(version.memtables.next_memtable_id(), &metadata);
debug_assert!(version.memtables.mutable.is_empty());
debug_assert!(version.memtables.immutables().is_empty());
let new_version = Arc::new(
@@ -157,7 +163,8 @@ impl VersionControl {
) {
let version = self.current().version;
let new_mutable = memtable_builder.build(&version.metadata);
let new_mutable =
memtable_builder.build(version.memtables.next_memtable_id(), &version.metadata);
let new_version = Arc::new(
VersionBuilder::new(version.metadata.clone(), new_mutable)
.flushed_entry_id(truncated_entry_id)

View File

@@ -62,33 +62,34 @@ impl Memtable for EmptyMemtable {
&self,
_projection: Option<&[ColumnId]>,
_filters: Option<Predicate>,
) -> BoxedBatchIterator {
Box::new(std::iter::empty())
) -> Result<BoxedBatchIterator> {
Ok(Box::new(std::iter::empty()))
}
fn is_empty(&self) -> bool {
true
}
fn mark_immutable(&self) {}
/// Freezing an empty memtable does nothing and always succeeds.
fn freeze(&self) -> Result<()> {
Ok(())
}
fn stats(&self) -> MemtableStats {
MemtableStats::default()
}
/// Returns a new [EmptyMemtable] with the given `id`; the metadata is ignored.
fn fork(&self, id: MemtableId, _metadata: &RegionMetadataRef) -> MemtableRef {
Arc::new(EmptyMemtable::new(id))
}
}
/// Empty memtable builder.
#[derive(Debug, Default)]
pub(crate) struct EmptyMemtableBuilder {
/// Next memtable id.
next_id: AtomicU32,
}
pub(crate) struct EmptyMemtableBuilder {}
impl MemtableBuilder for EmptyMemtableBuilder {
fn build(&self, _metadata: &RegionMetadataRef) -> MemtableRef {
Arc::new(EmptyMemtable::new(
self.next_id.fetch_add(1, Ordering::Relaxed),
))
fn build(&self, id: MemtableId, _metadata: &RegionMetadataRef) -> MemtableRef {
Arc::new(EmptyMemtable::new(id))
}
}

View File

@@ -105,7 +105,7 @@ impl VersionControlBuilder {
pub(crate) fn build_version(&self) -> Version {
let metadata = Arc::new(self.metadata.clone());
let mutable = self.memtable_builder.build(&metadata);
let mutable = self.memtable_builder.build(0, &metadata);
VersionBuilder::new(metadata, mutable)
.add_files(self.file_purger.clone(), self.files.values().cloned())
.build()

View File

@@ -143,7 +143,6 @@ impl<S> RegionWorkerLoop<S> {
senders: Vec::new(),
request_sender: self.sender.clone(),
access_layer: region.access_layer.clone(),
memtable_builder: self.memtable_builder.clone(),
file_purger: region.file_purger.clone(),
listener: self.listener.clone(),
engine_config,