From abbfd23d4b69977bd86fde4209b7b6a5c032c533 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Sat, 24 Feb 2024 20:11:16 +0800 Subject: [PATCH] feat: Add freeze and fork method to the memtable (#3374) * feat: add fork method to the memtable * feat: allow mark immutable returns result * feat: use fork to create the mutable memtable * feat: remove memtable builder from freeze * chore: warnings * fix: inspect error * feat: iter returns result * chore: maintains memtable id in region * chore: update comment * fix: remove region status if failed to freeze a memtable * chore: update comment * chore: iter should not require sync * chore: implement freeze and fork for the new memtable --- src/mito2/src/flush.rs | 13 ++++++----- src/mito2/src/memtable.rs | 20 ++++++++++++----- src/mito2/src/memtable/merge_tree.rs | 22 ++++++++++++------- src/mito2/src/memtable/time_series.rs | 28 +++++++++++++++--------- src/mito2/src/memtable/version.rs | 25 +++++++++++++++------ src/mito2/src/read/seq_scan.rs | 2 +- src/mito2/src/region/opener.rs | 6 +++-- src/mito2/src/region/version.rs | 21 ++++++++++++------ src/mito2/src/test_util/memtable_util.rs | 23 +++++++++---------- src/mito2/src/test_util/version_util.rs | 2 +- src/mito2/src/worker/handle_flush.rs | 1 - 11 files changed, 105 insertions(+), 58 deletions(-) diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index 3848ec0726..0eea330537 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -31,7 +31,6 @@ use crate::config::MitoConfig; use crate::error::{ Error, FlushRegionSnafu, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, Result, }; -use crate::memtable::MemtableBuilderRef; use crate::metrics::{FLUSH_BYTES_TOTAL, FLUSH_ELAPSED, FLUSH_ERRORS_TOTAL, FLUSH_REQUESTS_TOTAL}; use crate::read::Source; use crate::region::options::IndexOptions; @@ -198,7 +197,6 @@ pub(crate) struct RegionFlushTask { pub(crate) request_sender: mpsc::Sender, pub(crate) access_layer: AccessLayerRef, - pub(crate) 
memtable_builder: MemtableBuilderRef, pub(crate) file_purger: FilePurgerRef, pub(crate) listener: WorkerListener, pub(crate) engine_config: Arc, @@ -317,7 +315,7 @@ impl RegionFlushTask { } let file_id = FileId::random(); - let iter = mem.iter(None, None); + let iter = mem.iter(None, None)?; let source = Source::Iter(iter); let create_inverted_index = self.engine_config.inverted_index.create_on_flush.auto(); let mem_threshold_index_create = self @@ -466,7 +464,13 @@ impl FlushScheduler { } // Now we can flush the region directly. - version_control.freeze_mutable(&task.memtable_builder); + if let Err(e) = version_control.freeze_mutable() { + error!(e; "Failed to freeze the mutable memtable for region {}", region_id); + + // Remove from region status if we can't freeze the mutable memtable. + self.region_status.remove(&region_id); + return Err(e); + } // Submit a flush job. let job = task.into_flush_job(version_control); if let Err(e) = self.scheduler.schedule(job) { @@ -765,7 +769,6 @@ mod tests { senders: Vec::new(), request_sender: tx, access_layer: env.access_layer.clone(), - memtable_builder: builder.memtable_builder(), file_purger: builder.file_purger(), listener: WorkerListener::default(), engine_config: Arc::new(MitoConfig::default()), diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs index 99fcc75919..81750de4db 100644 --- a/src/mito2/src/memtable.rs +++ b/src/mito2/src/memtable.rs @@ -60,7 +60,7 @@ impl MemtableStats { } } -pub type BoxedBatchIterator = Box> + Send + Sync>; +pub type BoxedBatchIterator = Box> + Send>; /// In memory write buffer. pub trait Memtable: Send + Sync + fmt::Debug { @@ -77,16 +77,21 @@ pub trait Memtable: Send + Sync + fmt::Debug { &self, projection: Option<&[ColumnId]>, predicate: Option, - ) -> BoxedBatchIterator; + ) -> Result; /// Returns true if the memtable is empty. fn is_empty(&self) -> bool; - /// Mark the memtable as immutable. 
- fn mark_immutable(&self); + /// Turns a mutable memtable into an immutable memtable. + fn freeze(&self) -> Result<()>; /// Returns the [MemtableStats] info of Memtable. fn stats(&self) -> MemtableStats; + + /// Forks this (immutable) memtable and returns a new mutable memtable with specific memtable `id`. + /// + /// A region must freeze the memtable before invoking this method. + fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef; } pub type MemtableRef = Arc; @@ -94,7 +99,7 @@ pub type MemtableRef = Arc; /// Builder to build a new [Memtable]. pub trait MemtableBuilder: Send + Sync + fmt::Debug { /// Builds a new memtable instance. - fn build(&self, metadata: &RegionMetadataRef) -> MemtableRef; + fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef; } pub type MemtableBuilderRef = Arc; @@ -158,6 +163,11 @@ impl AllocTracker { pub(crate) fn bytes_allocated(&self) -> usize { self.bytes_allocated.load(Ordering::Relaxed) } + + /// Returns the write buffer manager. + pub(crate) fn write_buffer_manager(&self) -> Option { + self.write_buffer_manager.clone() + } } impl Drop for AllocTracker { diff --git a/src/mito2/src/memtable/merge_tree.rs b/src/mito2/src/memtable/merge_tree.rs index 4ab65a6e97..dd0511b1ec 100644 --- a/src/mito2/src/memtable/merge_tree.rs +++ b/src/mito2/src/memtable/merge_tree.rs @@ -24,7 +24,7 @@ mod shard_builder; mod tree; use std::fmt; -use std::sync::atomic::{AtomicI64, AtomicU32, Ordering}; +use std::sync::atomic::{AtomicI64, Ordering}; use std::sync::Arc; use store_api::metadata::RegionMetadataRef; @@ -108,8 +108,7 @@ impl Memtable for MergeTreeMemtable { &self, _projection: Option<&[ColumnId]>, _predicate: Option, - ) -> BoxedBatchIterator { - // FIXME(yingwen): Change return value to `Result`. 
+ ) -> Result { todo!() } @@ -117,8 +116,10 @@ impl Memtable for MergeTreeMemtable { self.tree.is_empty() } - fn mark_immutable(&self) { + fn freeze(&self) -> Result<()> { self.alloc_tracker.done_allocating(); + + self.tree.freeze() } fn stats(&self) -> MemtableStats { @@ -148,6 +149,14 @@ impl Memtable for MergeTreeMemtable { time_range: Some((min_timestamp, max_timestamp)), } } + + fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef { + let tree = self.tree.fork(metadata.clone()); + + let memtable = + MergeTreeMemtable::with_tree(id, tree, self.alloc_tracker.write_buffer_manager()); + Arc::new(memtable) + } } impl MergeTreeMemtable { @@ -235,7 +244,6 @@ impl MergeTreeMemtable { /// Builder to build a [MergeTreeMemtable]. #[derive(Debug, Default)] pub struct MergeTreeMemtableBuilder { - id: AtomicU32, write_buffer_manager: Option, config: MergeTreeConfig, } @@ -244,7 +252,6 @@ impl MergeTreeMemtableBuilder { /// Creates a new builder with specific `write_buffer_manager`. 
pub fn new(write_buffer_manager: Option) -> Self { Self { - id: AtomicU32::new(0), write_buffer_manager, config: MergeTreeConfig::default(), } @@ -252,8 +259,7 @@ impl MergeTreeMemtableBuilder { } impl MemtableBuilder for MergeTreeMemtableBuilder { - fn build(&self, metadata: &RegionMetadataRef) -> MemtableRef { - let id = self.id.fetch_add(1, Ordering::Relaxed); + fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef { Arc::new(MergeTreeMemtable::new( id, metadata.clone(), diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs index f2bbe2030d..38ad4f328a 100644 --- a/src/mito2/src/memtable/time_series.rs +++ b/src/mito2/src/memtable/time_series.rs @@ -15,7 +15,7 @@ use std::collections::btree_map::Entry; use std::collections::{BTreeMap, Bound, HashSet}; use std::fmt::{Debug, Formatter}; -use std::sync::atomic::{AtomicI64, AtomicU32, Ordering}; +use std::sync::atomic::{AtomicI64, Ordering}; use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant}; @@ -52,7 +52,6 @@ const INITIAL_BUILDER_CAPACITY: usize = 0; /// Builder to build [TimeSeriesMemtable]. #[derive(Debug, Default)] pub struct TimeSeriesMemtableBuilder { - id: AtomicU32, write_buffer_manager: Option, } @@ -60,15 +59,13 @@ impl TimeSeriesMemtableBuilder { /// Creates a new builder with specific `write_buffer_manager`. 
pub fn new(write_buffer_manager: Option) -> Self { Self { - id: AtomicU32::new(0), write_buffer_manager, } } } impl MemtableBuilder for TimeSeriesMemtableBuilder { - fn build(&self, metadata: &RegionMetadataRef) -> MemtableRef { - let id = self.id.fetch_add(1, Ordering::Relaxed); + fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef { Arc::new(TimeSeriesMemtable::new( metadata.clone(), id, @@ -211,7 +208,7 @@ impl Memtable for TimeSeriesMemtable { &self, projection: Option<&[ColumnId]>, filters: Option, - ) -> BoxedBatchIterator { + ) -> Result { let projection = if let Some(projection) = projection { projection.iter().copied().collect() } else { @@ -221,15 +218,18 @@ impl Memtable for TimeSeriesMemtable { .collect() }; - Box::new(self.series_set.iter_series(projection, filters)) + let iter = self.series_set.iter_series(projection, filters); + Ok(Box::new(iter)) } fn is_empty(&self) -> bool { self.series_set.series.read().unwrap().is_empty() } - fn mark_immutable(&self) { + fn freeze(&self) -> Result<()> { self.alloc_tracker.done_allocating(); + + Ok(()) } fn stats(&self) -> MemtableStats { @@ -257,6 +257,14 @@ impl Memtable for TimeSeriesMemtable { time_range: Some((min_timestamp, max_timestamp)), } } + + fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef { + Arc::new(TimeSeriesMemtable::new( + metadata.clone(), + id, + self.alloc_tracker.write_buffer_manager(), + )) + } } type SeriesRwLockMap = RwLock, Arc>>>; @@ -1119,7 +1127,7 @@ mod tests { .map(|kv| kv.timestamp().as_timestamp().unwrap().unwrap().value()) .collect::>(); - let iter = memtable.iter(None, None); + let iter = memtable.iter(None, None).unwrap(); let read = iter .flat_map(|batch| { batch @@ -1155,7 +1163,7 @@ mod tests { let memtable = TimeSeriesMemtable::new(schema, 42, None); memtable.write(&kvs).unwrap(); - let iter = memtable.iter(Some(&[3]), None); + let iter = memtable.iter(Some(&[3]), None).unwrap(); let mut v0_all = vec![]; diff --git 
a/src/mito2/src/memtable/version.rs b/src/mito2/src/memtable/version.rs index ce52a04379..c124370521 100644 --- a/src/mito2/src/memtable/version.rs +++ b/src/mito2/src/memtable/version.rs @@ -17,7 +17,9 @@ use std::sync::Arc; use smallvec::SmallVec; +use store_api::metadata::RegionMetadataRef; +use crate::error::Result; use crate::memtable::{MemtableId, MemtableRef}; /// A version of current memtables in a region. @@ -61,17 +63,21 @@ impl MemtableVersion { /// memtable. /// /// Returns `None` if the mutable memtable is empty. - #[must_use] - pub(crate) fn freeze_mutable(&self, mutable: MemtableRef) -> Option { - debug_assert!(mutable.is_empty()); + pub(crate) fn freeze_mutable( + &self, + metadata: &RegionMetadataRef, + ) -> Result> { if self.mutable.is_empty() { // No need to freeze the mutable memtable. - return None; + return Ok(None); } // Marks the mutable memtable as immutable so it can free the memory usage from our // soft limit. - self.mutable.mark_immutable(); + self.mutable.freeze()?; + // Fork the memtable. + let mutable = self.mutable.fork(self.next_memtable_id(), metadata); + // Pushes the mutable memtable to immutable list. let immutables = self .immutables @@ -79,10 +85,10 @@ impl MemtableVersion { .cloned() .chain([self.mutable.clone()]) .collect(); - Some(MemtableVersion { + Ok(Some(MemtableVersion { mutable, immutables, - }) + })) } /// Removes memtables by ids from immutable memtables. @@ -115,4 +121,9 @@ impl MemtableVersion { pub(crate) fn is_empty(&self) -> bool { self.mutable.is_empty() && self.immutables.is_empty() } + + /// Returns the next memtable id. 
+ pub(crate) fn next_memtable_id(&self) -> MemtableId { + self.mutable.id() + 1 + } } diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 2093921946..135714e91e 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -213,7 +213,7 @@ impl SeqScan { async fn build_sources(&self) -> Result> { let mut sources = Vec::with_capacity(self.memtables.len() + self.files.len()); for mem in &self.memtables { - let iter = mem.iter(Some(self.mapper.column_ids()), self.predicate.clone()); + let iter = mem.iter(Some(self.mapper.column_ids()), self.predicate.clone())?; sources.push(Source::Iter(iter)); } for file in &self.files { diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index e4d52244ef..d634deb871 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -168,7 +168,8 @@ impl RegionOpener { let manifest_manager = RegionManifestManager::new(metadata.clone(), region_manifest_options).await?; - let mutable = self.memtable_builder.build(&metadata); + // Initial memtable id is 0. + let mutable = self.memtable_builder.build(0, &metadata); let version = VersionBuilder::new(metadata, mutable) .options(options) @@ -258,7 +259,8 @@ impl RegionOpener { access_layer.clone(), self.cache_manager.clone(), )); - let mutable = self.memtable_builder.build(&metadata); + // Initial memtable id is 0. 
+ let mutable = self.memtable_builder.build(0, &metadata); let version = VersionBuilder::new(metadata, mutable) .add_files(file_purger.clone(), manifest.files.values().cloned()) .flushed_entry_id(manifest.flushed_entry_id) diff --git a/src/mito2/src/region/version.rs b/src/mito2/src/region/version.rs index 471a85c1e3..261371640b 100644 --- a/src/mito2/src/region/version.rs +++ b/src/mito2/src/region/version.rs @@ -29,6 +29,7 @@ use std::time::Duration; use store_api::metadata::RegionMetadataRef; use store_api::storage::SequenceNumber; +use crate::error::Result; use crate::manifest::action::RegionEdit; use crate::memtable::version::{MemtableVersion, MemtableVersionRef}; use crate::memtable::{MemtableBuilderRef, MemtableId, MemtableRef}; @@ -76,14 +77,16 @@ impl VersionControl { } /// Freezes the mutable memtable if it is not empty. - pub(crate) fn freeze_mutable(&self, builder: &MemtableBuilderRef) { + pub(crate) fn freeze_mutable(&self) -> Result<()> { let version = self.current().version; if version.memtables.mutable.is_empty() { - return; + return Ok(()); } - let new_mutable = builder.build(&version.metadata); // Safety: Immutable memtable is None. - let new_memtables = version.memtables.freeze_mutable(new_mutable).unwrap(); + let new_memtables = version + .memtables + .freeze_mutable(&version.metadata)? + .unwrap(); // Create a new version with memtable switched. let new_version = Arc::new( VersionBuilder::from_version(version) @@ -93,6 +96,8 @@ impl VersionControl { let mut version_data = self.data.write().unwrap(); version_data.version = new_version; + + Ok(()) } /// Apply edit to current version. 
@@ -117,7 +122,8 @@ impl VersionControl { /// Mark all opened files as deleted and set the delete marker in [VersionControlData] pub(crate) fn mark_dropped(&self, memtable_builder: &MemtableBuilderRef) { let version = self.current().version; - let new_mutable = memtable_builder.build(&version.metadata); + let new_mutable = + memtable_builder.build(version.memtables.next_memtable_id(), &version.metadata); let mut data = self.data.write().unwrap(); data.is_dropped = true; @@ -133,8 +139,8 @@ impl VersionControl { /// It replaces existing mutable memtable with a memtable that uses the /// new schema. Memtables of the version must be empty. pub(crate) fn alter_schema(&self, metadata: RegionMetadataRef, builder: &MemtableBuilderRef) { - let new_mutable = builder.build(&metadata); let version = self.current().version; + let new_mutable = builder.build(version.memtables.next_memtable_id(), &metadata); debug_assert!(version.memtables.mutable.is_empty()); debug_assert!(version.memtables.immutables().is_empty()); let new_version = Arc::new( @@ -157,7 +163,8 @@ impl VersionControl { ) { let version = self.current().version; - let new_mutable = memtable_builder.build(&version.metadata); + let new_mutable = + memtable_builder.build(version.memtables.next_memtable_id(), &version.metadata); let new_version = Arc::new( VersionBuilder::new(version.metadata.clone(), new_mutable) .flushed_entry_id(truncated_entry_id) diff --git a/src/mito2/src/test_util/memtable_util.rs b/src/mito2/src/test_util/memtable_util.rs index 584f213507..a640d3c20e 100644 --- a/src/mito2/src/test_util/memtable_util.rs +++ b/src/mito2/src/test_util/memtable_util.rs @@ -62,33 +62,34 @@ impl Memtable for EmptyMemtable { &self, _projection: Option<&[ColumnId]>, _filters: Option, - ) -> BoxedBatchIterator { - Box::new(std::iter::empty()) + ) -> Result { + Ok(Box::new(std::iter::empty())) } fn is_empty(&self) -> bool { true } - fn mark_immutable(&self) {} + fn freeze(&self) -> Result<()> { + Ok(()) + } fn 
stats(&self) -> MemtableStats { MemtableStats::default() } + + fn fork(&self, id: MemtableId, _metadata: &RegionMetadataRef) -> MemtableRef { + Arc::new(EmptyMemtable::new(id)) + } } /// Empty memtable builder. #[derive(Debug, Default)] -pub(crate) struct EmptyMemtableBuilder { - /// Next memtable id. - next_id: AtomicU32, -} +pub(crate) struct EmptyMemtableBuilder {} impl MemtableBuilder for EmptyMemtableBuilder { - fn build(&self, _metadata: &RegionMetadataRef) -> MemtableRef { - Arc::new(EmptyMemtable::new( - self.next_id.fetch_add(1, Ordering::Relaxed), - )) + fn build(&self, id: MemtableId, _metadata: &RegionMetadataRef) -> MemtableRef { + Arc::new(EmptyMemtable::new(id)) } } diff --git a/src/mito2/src/test_util/version_util.rs b/src/mito2/src/test_util/version_util.rs index bc8035bb9a..0653c1307d 100644 --- a/src/mito2/src/test_util/version_util.rs +++ b/src/mito2/src/test_util/version_util.rs @@ -105,7 +105,7 @@ impl VersionControlBuilder { pub(crate) fn build_version(&self) -> Version { let metadata = Arc::new(self.metadata.clone()); - let mutable = self.memtable_builder.build(&metadata); + let mutable = self.memtable_builder.build(0, &metadata); VersionBuilder::new(metadata, mutable) .add_files(self.file_purger.clone(), self.files.values().cloned()) .build() diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs index 56b1456367..e3ce1f2f63 100644 --- a/src/mito2/src/worker/handle_flush.rs +++ b/src/mito2/src/worker/handle_flush.rs @@ -143,7 +143,6 @@ impl RegionWorkerLoop { senders: Vec::new(), request_sender: self.sender.clone(), access_layer: region.access_layer.clone(), - memtable_builder: self.memtable_builder.clone(), file_purger: region.file_purger.clone(), listener: self.listener.clone(), engine_config,