From e30879f638ada4ccaa5d88ff83b58800fbe0bdcc Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Fri, 11 Nov 2022 18:02:34 +0800 Subject: [PATCH] feat: Remove memtable's time bucket (#442) * refactor: partially replace MemtableSet with Memtable Signed-off-by: Ruihang Xia * remove MemtableWithMeta and MemtableSet in non-test mod Signed-off-by: Ruihang Xia * remove dead code Signed-off-by: Ruihang Xia * make test compile :rofl: Signed-off-by: Ruihang Xia * fix broken tests Signed-off-by: Ruihang Xia * make all tests pass Signed-off-by: Ruihang Xia * fix clippys Signed-off-by: Ruihang Xia * remove redundant clone Signed-off-by: Ruihang Xia * update comment Co-authored-by: Yingwen * resolve review comment Signed-off-by: Ruihang Xia Signed-off-by: Ruihang Xia Co-authored-by: Yingwen --- src/storage/benches/memtable/util/mod.rs | 2 +- src/storage/src/chunk.rs | 9 +- src/storage/src/engine.rs | 2 +- src/storage/src/flush.rs | 16 +- src/storage/src/memtable.rs | 21 +- src/storage/src/memtable/btree.rs | 4 + src/storage/src/memtable/inserter.rs | 674 ++--------------------- src/storage/src/memtable/tests.rs | 7 +- src/storage/src/memtable/version.rs | 371 ++----------- src/storage/src/region.rs | 39 +- src/storage/src/region/tests.rs | 27 +- src/storage/src/region/writer.rs | 137 ++--- src/storage/src/schema.rs | 13 - src/storage/src/schema/compat.rs | 15 +- src/storage/src/schema/projected.rs | 10 +- src/storage/src/schema/region.rs | 3 +- src/storage/src/schema/store.rs | 3 +- src/storage/src/snapshot.rs | 8 +- src/storage/src/sst/parquet.rs | 2 +- src/storage/src/test_util/config_util.rs | 2 +- src/storage/src/test_util/schema_util.rs | 14 + src/storage/src/version.rs | 54 +- 22 files changed, 267 insertions(+), 1166 deletions(-) diff --git a/src/storage/benches/memtable/util/mod.rs b/src/storage/benches/memtable/util/mod.rs index abb6a488a3..bbe7f9a0d0 100644 --- a/src/storage/benches/memtable/util/mod.rs +++ b/src/storage/benches/memtable/util/mod.rs @@ -25,5 +25,5 @@ 
pub fn schema_for_test() -> RegionSchemaRef { } pub fn new_memtable() -> MemtableRef { - DefaultMemtableBuilder {}.build(1, schema_for_test()) + DefaultMemtableBuilder::default().build(schema_for_test()) } diff --git a/src/storage/src/chunk.rs b/src/storage/src/chunk.rs index e816d6f60c..96bdabc196 100644 --- a/src/storage/src/chunk.rs +++ b/src/storage/src/chunk.rs @@ -7,7 +7,7 @@ use store_api::storage::{Chunk, ChunkReader, SchemaRef, SequenceNumber}; use table::predicate::Predicate; use crate::error::{self, Error, Result}; -use crate::memtable::{IterContext, MemtableRef, MemtableSet}; +use crate::memtable::{IterContext, MemtableRef}; use crate::read::{BoxedBatchReader, DedupReader, MergeReaderBuilder}; use crate::schema::{ProjectedSchema, ProjectedSchemaRef, RegionSchemaRef}; use crate::sst::{AccessLayerRef, FileHandle, LevelMetas, ReadOptions, Visitor}; @@ -100,11 +100,8 @@ impl ChunkReaderBuilder { self } - pub fn pick_memtables(mut self, memtables: &MemtableSet) -> Self { - for (_range, mem) in memtables.iter() { - self.memtables.push(mem.clone()); - } - + pub fn pick_memtables(mut self, memtables: MemtableRef) -> Self { + self.memtables.push(memtables); self } diff --git a/src/storage/src/engine.rs b/src/storage/src/engine.rs index b9d05b9e24..016ade9d22 100644 --- a/src/storage/src/engine.rs +++ b/src/storage/src/engine.rs @@ -207,7 +207,7 @@ impl EngineInner { object_store, log_store, regions: RwLock::new(Default::default()), - memtable_builder: Arc::new(DefaultMemtableBuilder {}), + memtable_builder: Arc::new(DefaultMemtableBuilder::default()), flush_scheduler, flush_strategy: Arc::new(SizeBasedStrategy::default()), } diff --git a/src/storage/src/flush.rs b/src/storage/src/flush.rs index 263d5becfe..06f2da090c 100644 --- a/src/storage/src/flush.rs +++ b/src/storage/src/flush.rs @@ -2,7 +2,6 @@ use std::sync::Arc; use async_trait::async_trait; use common_telemetry::logging; -use common_time::RangeMillis; use store_api::logstore::LogStore; use 
store_api::storage::consts::WRITE_ROW_GROUP_SIZE; use store_api::storage::SequenceNumber; @@ -104,12 +103,6 @@ impl FlushStrategy for SizeBasedStrategy { } } -#[derive(Debug)] -pub struct MemtableWithMeta { - pub memtable: MemtableRef, - pub bucket: RangeMillis, -} - #[async_trait] pub trait FlushScheduler: Send + Sync + std::fmt::Debug { async fn schedule_flush(&self, flush_job: Box) -> Result; @@ -141,7 +134,7 @@ pub struct FlushJob { /// used to remove immutable memtables in current version. pub max_memtable_id: MemtableId, /// Memtables to be flushed. - pub memtables: Vec, + pub memtables: Vec, /// Last sequence of data to be flushed. pub flush_sequence: SequenceNumber, /// Shared data of region to be flushed. @@ -170,9 +163,14 @@ impl FlushJob { ..Default::default() }; for m in &self.memtables { + // skip empty memtable + if m.num_rows() == 0 { + continue; + } + let file_name = Self::generate_sst_file_name(); // TODO(hl): Check if random file name already exists in meta. - let iter = m.memtable.iter(&iter_ctx)?; + let iter = m.iter(&iter_ctx)?; futures.push(async move { self.sst_layer .write_sst(&file_name, iter, &WriteOptions::default()) diff --git a/src/storage/src/memtable.rs b/src/storage/src/memtable.rs index a29283498a..846a5a3ead 100644 --- a/src/storage/src/memtable.rs +++ b/src/storage/src/memtable.rs @@ -4,6 +4,7 @@ mod inserter; pub mod tests; mod version; +use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; use datatypes::vectors::VectorRef; @@ -12,7 +13,7 @@ use store_api::storage::{consts, OpType, SequenceNumber}; use crate::error::Result; use crate::memtable::btree::BTreeMemtable; pub use crate::memtable::inserter::Inserter; -pub use crate::memtable::version::{MemtableSet, MemtableVersion}; +pub use crate::memtable::version::MemtableVersion; use crate::read::Batch; use crate::schema::{ProjectedSchemaRef, RegionSchemaRef}; @@ -36,8 +37,13 @@ pub trait Memtable: Send + Sync + std::fmt::Debug { /// Iterates the memtable. 
fn iter(&self, ctx: &IterContext) -> Result; - /// Returns the estimated bytes allocated by this memtable from heap. + /// Returns the estimated bytes allocated by this memtable from heap. Result + /// of this method may be larger than the estimated based on [`num_rows`] because + /// of the implementor's pre-alloc behavior. fn bytes_allocated(&self) -> usize; + + /// Return the number of rows contained in this memtable. + fn num_rows(&self) -> usize; } pub type MemtableRef = Arc; @@ -100,7 +106,7 @@ pub trait BatchIterator: Iterator> + Send + Sync { pub type BoxedBatchIterator = Box; pub trait MemtableBuilder: Send + Sync + std::fmt::Debug { - fn build(&self, id: MemtableId, schema: RegionSchemaRef) -> MemtableRef; + fn build(&self, schema: RegionSchemaRef) -> MemtableRef; } pub type MemtableBuilderRef = Arc; @@ -140,11 +146,14 @@ impl KeyValues { } } -#[derive(Debug)] -pub struct DefaultMemtableBuilder; +#[derive(Debug, Default)] +pub struct DefaultMemtableBuilder { + memtable_id: AtomicU32, +} impl MemtableBuilder for DefaultMemtableBuilder { - fn build(&self, id: MemtableId, schema: RegionSchemaRef) -> MemtableRef { + fn build(&self, schema: RegionSchemaRef) -> MemtableRef { + let id = self.memtable_id.fetch_add(1, Ordering::Relaxed); Arc::new(BTreeMemtable::new(id, schema)) } } diff --git a/src/storage/src/memtable/btree.rs b/src/storage/src/memtable/btree.rs index efd5fe1cd9..f6b6e9b603 100644 --- a/src/storage/src/memtable/btree.rs +++ b/src/storage/src/memtable/btree.rs @@ -78,6 +78,10 @@ impl Memtable for BTreeMemtable { fn bytes_allocated(&self) -> usize { self.estimated_bytes.load(AtomicOrdering::Relaxed) } + + fn num_rows(&self) -> usize { + self.map.read().unwrap().len() + } } struct BTreeIterator { diff --git a/src/storage/src/memtable/inserter.rs b/src/storage/src/memtable/inserter.rs index 0baf4f2a52..d63a7c9a37 100644 --- a/src/storage/src/memtable/inserter.rs +++ b/src/storage/src/memtable/inserter.rs @@ -1,68 +1,40 @@ -use 
std::collections::HashMap; -use std::time::Duration; - -use common_time::timestamp_millis::BucketAligned; -use common_time::{RangeMillis, TimestampMillis}; -use datatypes::data_type::ConcreteDataType; -use datatypes::prelude::ScalarVector; -use datatypes::schema::SchemaRef; -use datatypes::vectors::{Int64Vector, TimestampVector, VectorRef}; +use datatypes::vectors::VectorRef; use snafu::OptionExt; use store_api::storage::{ColumnDescriptor, OpType, SequenceNumber}; -use crate::error::{self, IllegalTimestampColumnTypeSnafu, Result}; -use crate::memtable::{KeyValues, Memtable, MemtableSet}; +use super::MemtableRef; +use crate::error::{self, Result}; +use crate::memtable::KeyValues; use crate::write_batch::{Mutation, PutData, WriteBatch}; -type RangeIndexMap = HashMap; - /// Wraps logic of inserting key/values in [WriteBatch] to [Memtable]. pub struct Inserter { /// Sequence of the batch to be inserted. sequence: SequenceNumber, - /// Time ranges of all input data. - time_ranges: Vec, - /// Map time range's start time to its index in time ranges. - time_range_indexes: RangeIndexMap, - /// Bucket duration of memtables. - bucket_duration: Duration, /// Used to calculate the start index in batch for `KeyValues`. index_in_batch: usize, } impl Inserter { - pub fn new( - sequence: SequenceNumber, - time_ranges: Vec, - bucket_duration: Duration, - ) -> Inserter { - let time_range_indexes = new_range_index_map(&time_ranges); - + pub fn new(sequence: SequenceNumber) -> Inserter { Inserter { sequence, - time_ranges, - time_range_indexes, - bucket_duration, index_in_batch: 0, } } // TODO(yingwen): Can we take the WriteBatch? - /// Insert write batch into memtables if both `batch` and `memtables` are not empty. + /// Insert write batch into memtable. 
/// - /// Won't do schema validation, caller (mostly the [`RegionWriter`]) should ensure the - /// schemas of `memtables` are consistent with `batch`'s, and the time ranges of `memtables` - /// are consistent with `self`'s time ranges. - /// - /// # Panics - /// Panics if there is time range in `self.time_ranges` but not in `memtables`. - pub fn insert_memtables(&mut self, batch: &WriteBatch, memtables: &MemtableSet) -> Result<()> { - if batch.is_empty() || memtables.is_empty() { + /// Won't do schema validation if not configured. Caller (mostly the [`RegionWriter`]) should ensure the + /// schemas of `memtable` are consistent with `batch`'s. + pub fn insert_memtable(&mut self, batch: &WriteBatch, memtable: &MemtableRef) -> Result<()> { + if batch.is_empty() { return Ok(()); } - // Only validate schema in debug mod. - validate_input_and_memtable_schemas(batch, memtables); + // This function only makes effect in debug mode. + validate_input_and_memtable_schemas(batch, memtable); // Enough to hold all key or value columns. let total_column_num = batch.schema().num_columns(); @@ -78,7 +50,7 @@ impl Inserter { for mutation in batch { match mutation { Mutation::Put(put_data) => { - self.put_memtables(batch.schema(), put_data, memtables, &mut kvs)?; + self.write_one_mutation(put_data, memtable, &mut kvs)?; } } } @@ -86,27 +58,10 @@ impl Inserter { Ok(()) } - fn put_memtables( - &mut self, - schema: &SchemaRef, - put_data: &PutData, - memtables: &MemtableSet, - kvs: &mut KeyValues, - ) -> Result<()> { - if memtables.len() == 1 { - // Fast path, only one memtable to put. - let (_range, memtable) = memtables.iter().next().unwrap(); - return self.put_one_memtable(put_data, &**memtable, kvs); - } - - // Split data by time range and put them into memtables. 
- self.put_multiple_memtables(schema, put_data, memtables, kvs) - } - - fn put_one_memtable( + fn write_one_mutation( &mut self, put_data: &PutData, - memtable: &dyn Memtable, + memtable: &MemtableRef, kvs: &mut KeyValues, ) -> Result<()> { let schema = memtable.schema(); @@ -128,86 +83,19 @@ impl Inserter { Ok(()) } - - /// Put data to multiple memtables. - fn put_multiple_memtables( - &mut self, - schema: &SchemaRef, - put_data: &PutData, - memtables: &MemtableSet, - kvs: &mut KeyValues, - ) -> Result<()> { - let timestamp_schema = schema - .timestamp_column() - .context(error::BatchMissingTimestampSnafu)?; - - let timestamps = put_data.column_by_name(&timestamp_schema.name).context( - error::BatchMissingColumnSnafu { - column: &timestamp_schema.name, - }, - )?; - - let slice_indexes = match timestamps.data_type() { - ConcreteDataType::Int64(_) => { - let timestamps: &Int64Vector = timestamps - .as_any() - .downcast_ref() - .context(error::BatchMissingTimestampSnafu)?; - let iter = timestamps.iter_data(); - compute_slice_indices(iter, self.bucket_duration, &self.time_range_indexes) - } - ConcreteDataType::Timestamp(_) => { - let timestamps: &TimestampVector = timestamps - .as_any() - .downcast_ref() - .context(error::BatchMissingTimestampSnafu)?; - let iter = timestamps.iter_data().map(|v| v.map(|v| v.value())); - compute_slice_indices(iter, self.bucket_duration, &self.time_range_indexes) - } - _ => { - return IllegalTimestampColumnTypeSnafu { - data_type: timestamps.data_type(), - } - .fail(); - } - }; - - for slice_index in slice_indexes { - let sliced_data = put_data.slice(slice_index.start, slice_index.end); - let range = &self.time_ranges[slice_index.range_index]; - // The caller should ensure memtable for given time range is exists. 
- let memtable = memtables - .get_by_range(range) - .expect("Memtable not found for range"); - - self.put_one_memtable(&sliced_data, &**memtable, kvs)?; - } - - Ok(()) - } } -fn validate_input_and_memtable_schemas(batch: &WriteBatch, memtables: &MemtableSet) { +fn validate_input_and_memtable_schemas(batch: &WriteBatch, memtable: &MemtableRef) { if cfg!(debug_assertions) { let batch_schema = batch.schema(); - for (_, memtable) in memtables.iter() { - let memtable_schema = memtable.schema(); - let user_schema = memtable_schema.user_schema(); - debug_assert_eq!(batch_schema.version(), user_schema.version()); - // Only validate column schemas. - debug_assert_eq!(batch_schema.column_schemas(), user_schema.column_schemas()); - } + let memtable_schema = memtable.schema(); + let user_schema = memtable_schema.user_schema(); + debug_assert_eq!(batch_schema.version(), user_schema.version()); + // Only validate column schemas. + debug_assert_eq!(batch_schema.column_schemas(), user_schema.column_schemas()); } } -fn new_range_index_map(time_ranges: &[RangeMillis]) -> RangeIndexMap { - time_ranges - .iter() - .enumerate() - .map(|(i, range)| (*range.start(), i)) - .collect() -} - fn clone_put_data_column_to( put_data: &PutData, desc: &ColumnDescriptor, @@ -231,441 +119,22 @@ struct SliceIndex { range_index: usize, } -/// Computes the indexes used to split timestamps into time ranges aligned by `duration`, stores -/// the indexes in [`SliceIndex`]. -/// -/// # Panics -/// Panics if the duration is too large to be represented by i64, or `timestamps` are not all -/// included by `time_range_indexes`. -fn compute_slice_indices>>( - timestamps: I, - duration: Duration, - time_range_indexes: &RangeIndexMap, -) -> Vec { - let duration_ms = duration - .as_millis() - .try_into() - .unwrap_or_else(|e| panic!("Duration {:?} too large, {}", duration, e)); - - let mut slice_indexes = Vec::with_capacity(time_range_indexes.len()); - // Current start and end of a valid `SliceIndex`. 
- let (mut start, mut end) = (0, 0); - // Time range index of the valid but unpushed `SliceIndex`. - let mut last_range_index = None; - - // Iterate all timestamps, split timestamps by its time range. - for (i, ts) in timestamps.enumerate() { - // Find index for time range of the timestamp. - - let current_range_index = ts - .and_then(|v| v.align_by_bucket(duration_ms)) - .and_then(|aligned| time_range_indexes.get(&aligned).copied()); - - match current_range_index { - Some(current_range_index) => { - end = i; - - match last_range_index { - Some(last_index) => { - if last_index != current_range_index { - // Found a new range, we need to push a SliceIndex for last range. - slice_indexes.push(SliceIndex { - start, - end, - range_index: last_index, - }); - // Update last range index. - last_range_index = Some(current_range_index); - // Advance start. - start = i; - } - } - // No previous range index. - None => last_range_index = Some(current_range_index), - } - } - None => { - // Row without timestamp or out of time range will be skipped. This usually should not happen. - if let Some(last_index) = last_range_index { - // Need to store SliceIndex for last range. - slice_indexes.push(SliceIndex { - start, - end: i, - range_index: last_index, - }); - // Clear last range index. - last_range_index = None; - } - - // Advances start and end, skips current row. - start = i + 1; - end = start; - } - } - } - - // Process last slice index. - if let Some(last_index) = last_range_index { - slice_indexes.push(SliceIndex { - start, - // We need to use `end + 1` to include the last element. 
- end: end + 1, - range_index: last_index, - }); - } - - slice_indexes -} - #[cfg(test)] mod tests { use std::sync::Arc; use common_time::timestamp::Timestamp; - use datatypes::prelude::ScalarVectorBuilder; - use datatypes::vectors::{ - Int64Vector, Int64VectorBuilder, TimestampVector, TimestampVectorBuilder, - }; + use datatypes::vectors::{Int64Vector, TimestampVector}; use datatypes::{type_id::LogicalTypeId, value::Value}; use store_api::storage::{PutOperation, WriteRequest}; use super::*; - use crate::memtable::{DefaultMemtableBuilder, IterContext, MemtableBuilder, MemtableId}; + use crate::memtable::{DefaultMemtableBuilder, IterContext, MemtableBuilder}; use crate::metadata::RegionMetadata; use crate::schema::RegionSchemaRef; use crate::test_util::descriptor_util::RegionDescBuilder; use crate::test_util::write_batch_util; - fn new_time_ranges(starts: &[i64], duration: i64) -> Vec { - let mut ranges = Vec::with_capacity(starts.len()); - for start in starts { - assert_eq!(*start, start / duration * duration); - - ranges.push(RangeMillis::new(*start, start + duration).unwrap()); - } - - ranges - } - - fn check_compute_slice_indexes( - timestamps: &[Option], - range_starts: &[i64], - duration: i64, - expect: &[SliceIndex], - ) { - assert!(duration > 0); - - let mut builder = TimestampVectorBuilder::with_capacity(0); - for v in timestamps { - builder.push(v.map(common_time::timestamp::Timestamp::from_millis)); - } - - let ts_vec = builder.finish(); - - let time_ranges = new_time_ranges(range_starts, duration); - let time_range_indexes = new_range_index_map(&time_ranges); - - let slice_indexes = compute_slice_indices( - ts_vec.iter_data().map(|v| v.map(|v| v.value())), - Duration::from_millis(duration as u64), - &time_range_indexes, - ); - - assert_eq!(expect, slice_indexes); - } - - #[test] - fn test_check_compute_slice_indexes_i64() { - let timestamps = &[Some(99), Some(13), Some(18), Some(234)]; - let range_starts = &[0, 200]; - let duration = 100; - - let mut 
builder = Int64VectorBuilder::with_capacity(timestamps.len()); - for v in timestamps { - builder.push(*v); - } - - let ts_vec = builder.finish(); - - let time_ranges = new_time_ranges(range_starts, duration); - let time_range_indexes = new_range_index_map(&time_ranges); - - let slice_indexes = compute_slice_indices( - ts_vec.iter_data(), - Duration::from_millis(duration as u64), - &time_range_indexes, - ); - assert_eq!( - vec![ - SliceIndex { - start: 0, - end: 3, - range_index: 0, - }, - SliceIndex { - start: 3, - end: 4, - range_index: 1, - }, - ], - slice_indexes - ); - } - - #[test] - fn test_check_compute_slice_indexes_timestamp() { - let timestamps = &[Some(99), Some(13), Some(18), Some(234)]; - let range_starts = &[0, 200]; - let duration = 100; - - let mut builder = TimestampVectorBuilder::with_capacity(timestamps.len()); - for v in timestamps { - builder.push(v.map(Timestamp::from_millis)); - } - - let ts_vec = builder.finish(); - - let time_ranges = new_time_ranges(range_starts, duration); - let time_range_indexes = new_range_index_map(&time_ranges); - - let slice_indexes = compute_slice_indices( - ts_vec.iter_data().map(|v| v.map(|v| v.value())), - Duration::from_millis(duration as u64), - &time_range_indexes, - ); - assert_eq!( - vec![ - SliceIndex { - start: 0, - end: 3, - range_index: 0, - }, - SliceIndex { - start: 3, - end: 4, - range_index: 1, - }, - ], - slice_indexes - ); - } - - #[test] - fn test_compute_slice_indexes_valid() { - // Test empty input. - check_compute_slice_indexes(&[], &[], 100, &[]); - - // One valid input. - check_compute_slice_indexes( - &[Some(99)], - &[0], - 100, - &[SliceIndex { - start: 0, - end: 1, - range_index: 0, - }], - ); - - // 2 ranges. - check_compute_slice_indexes( - &[Some(99), Some(234)], - &[0, 200], - 100, - &[ - SliceIndex { - start: 0, - end: 1, - range_index: 0, - }, - SliceIndex { - start: 1, - end: 2, - range_index: 1, - }, - ], - ); - - // Multiple elements in first range. 
- check_compute_slice_indexes( - &[Some(99), Some(13), Some(18), Some(234)], - &[0, 200], - 100, - &[ - SliceIndex { - start: 0, - end: 3, - range_index: 0, - }, - SliceIndex { - start: 3, - end: 4, - range_index: 1, - }, - ], - ); - - // Multiple elements in last range. - check_compute_slice_indexes( - &[Some(99), Some(234), Some(271)], - &[0, 200], - 100, - &[ - SliceIndex { - start: 0, - end: 1, - range_index: 0, - }, - SliceIndex { - start: 1, - end: 3, - range_index: 1, - }, - ], - ); - - // Mulitple ranges. - check_compute_slice_indexes( - &[Some(99), Some(13), Some(234), Some(456)], - &[0, 200, 400], - 100, - &[ - SliceIndex { - start: 0, - end: 2, - range_index: 0, - }, - SliceIndex { - start: 2, - end: 3, - range_index: 1, - }, - SliceIndex { - start: 3, - end: 4, - range_index: 2, - }, - ], - ); - - // Different slices with same range. - check_compute_slice_indexes( - &[Some(99), Some(234), Some(15)], - &[0, 200], - 100, - &[ - SliceIndex { - start: 0, - end: 1, - range_index: 0, - }, - SliceIndex { - start: 1, - end: 2, - range_index: 1, - }, - SliceIndex { - start: 2, - end: 3, - range_index: 0, - }, - ], - ); - } - - #[test] - fn test_compute_slice_indexes_null_timestamp() { - check_compute_slice_indexes(&[None], &[0], 100, &[]); - - check_compute_slice_indexes( - &[None, None, Some(53)], - &[0], - 100, - &[SliceIndex { - start: 2, - end: 3, - range_index: 0, - }], - ); - - check_compute_slice_indexes( - &[Some(53), None, None], - &[0], - 100, - &[SliceIndex { - start: 0, - end: 1, - range_index: 0, - }], - ); - - check_compute_slice_indexes( - &[None, Some(53), None, Some(240), Some(13), None], - &[0, 200], - 100, - &[ - SliceIndex { - start: 1, - end: 2, - range_index: 0, - }, - SliceIndex { - start: 3, - end: 4, - range_index: 1, - }, - SliceIndex { - start: 4, - end: 5, - range_index: 0, - }, - ], - ); - } - - #[test] - fn test_compute_slice_indexes_no_range() { - check_compute_slice_indexes( - &[Some(99), Some(234), Some(15)], - &[0], - 100, - &[ 
- SliceIndex { - start: 0, - end: 1, - range_index: 0, - }, - SliceIndex { - start: 2, - end: 3, - range_index: 0, - }, - ], - ); - - check_compute_slice_indexes( - &[Some(99), Some(15), Some(234)], - &[0], - 100, - &[SliceIndex { - start: 0, - end: 2, - range_index: 0, - }], - ); - - check_compute_slice_indexes( - &[Some(-1), Some(99), Some(15)], - &[0], - 100, - &[SliceIndex { - start: 1, - end: 3, - range_index: 0, - }], - ); - } - fn new_test_write_batch() -> WriteBatch { write_batch_util::new_write_batch( &[ @@ -697,18 +166,8 @@ mod tests { batch.put(put_data).unwrap(); } - fn new_memtable_set(time_ranges: &[RangeMillis], schema: &RegionSchemaRef) -> MemtableSet { - let mut set = MemtableSet::new(); - for (id, range) in time_ranges.iter().enumerate() { - let mem = DefaultMemtableBuilder {}.build(id as MemtableId, schema.clone()); - set.insert(*range, mem) - } - - set - } - fn check_memtable_content( - mem: &dyn Memtable, + mem: &MemtableRef, sequence: SequenceNumber, data: &[(i64, Option)], ) { @@ -735,15 +194,9 @@ mod tests { #[test] fn test_inserter_put_one_memtable() { let sequence = 11111; - let bucket_duration = 100; - let time_ranges = new_time_ranges(&[0], bucket_duration); let memtable_schema = new_region_schema(); - let memtables = new_memtable_set(&time_ranges, &memtable_schema); - let mut inserter = Inserter::new( - sequence, - time_ranges, - Duration::from_millis(bucket_duration as u64), - ); + let mutable_memtable = DefaultMemtableBuilder::default().build(memtable_schema); + let mut inserter = Inserter::new(sequence); let mut batch = new_test_write_batch(); put_batch(&mut batch, &[(1, Some(1)), (2, None)]); @@ -752,77 +205,28 @@ mod tests { &mut batch, &[ (3, None), - // Duplicate entries in same put data. - (2, None), + (2, None), // Duplicate entries in same put data. 
(2, Some(2)), (4, Some(4)), - ], - ); - - inserter.insert_memtables(&batch, &memtables).unwrap(); - let mem = memtables - .get_by_range(&RangeMillis::new(0, 100).unwrap()) - .unwrap(); - check_memtable_content( - &**mem, - sequence, - &[(1, Some(1)), (2, Some(2)), (3, None), (4, Some(4))], - ); - } - - #[test] - fn test_inserter_put_multiple() { - let sequence = 11111; - let bucket_duration = 100; - let time_ranges = new_time_ranges(&[0, 100, 200], bucket_duration); - let memtable_schema = new_region_schema(); - let memtables = new_memtable_set(&time_ranges, &memtable_schema); - let mut inserter = Inserter::new( - sequence, - time_ranges, - Duration::from_millis(bucket_duration as u64), - ); - - let mut batch = new_test_write_batch(); - put_batch( - &mut batch, - &[ - (1, Some(1)), - (2, None), (201, Some(201)), (102, None), (101, Some(101)), ], ); - put_batch( - &mut batch, + + inserter.insert_memtable(&batch, &mutable_memtable).unwrap(); + check_memtable_content( + &mutable_memtable, + sequence, &[ - (180, Some(1)), - (3, Some(3)), - (1, None), - (211, Some(211)), - (180, Some(180)), + (1, Some(1)), + (2, Some(2)), + (3, None), + (4, Some(4)), + (101, Some(101)), + (102, None), + (201, Some(201)), ], ); - - inserter.insert_memtables(&batch, &memtables).unwrap(); - let mem = memtables - .get_by_range(&RangeMillis::new(0, 100).unwrap()) - .unwrap(); - check_memtable_content(&**mem, sequence, &[(1, None), (2, None), (3, Some(3))]); - - let mem = memtables - .get_by_range(&RangeMillis::new(100, 200).unwrap()) - .unwrap(); - check_memtable_content( - &**mem, - sequence, - &[(101, Some(101)), (102, None), (180, Some(180))], - ); - - let mem = memtables - .get_by_range(&RangeMillis::new(200, 300).unwrap()) - .unwrap(); - check_memtable_content(&**mem, sequence, &[(201, Some(201)), (211, Some(211))]); } } diff --git a/src/storage/src/memtable/tests.rs b/src/storage/src/memtable/tests.rs index 9f4a9c742e..4f5d773b3e 100644 --- a/src/storage/src/memtable/tests.rs +++ 
b/src/storage/src/memtable/tests.rs @@ -10,9 +10,6 @@ use crate::metadata::RegionMetadata; use crate::schema::{ProjectedSchema, RegionSchemaRef}; use crate::test_util::descriptor_util::RegionDescBuilder; -// For simplicity, all memtables in test share same memtable id. -const MEMTABLE_ID: MemtableId = 1; - // Schema for testing memtable: // - key: Int64(timestamp), UInt64(version), // - value: UInt64, UInt64 @@ -157,7 +154,7 @@ impl Default for MemtableTester { impl MemtableTester { fn new() -> MemtableTester { let schema = schema_for_test(); - let builders = vec![Arc::new(DefaultMemtableBuilder {}) as _]; + let builders = vec![Arc::new(DefaultMemtableBuilder::default()) as _]; MemtableTester { schema, builders } } @@ -165,7 +162,7 @@ impl MemtableTester { fn new_memtables(&self) -> Vec { self.builders .iter() - .map(|b| b.build(MEMTABLE_ID, self.schema.clone())) + .map(|b| b.build(self.schema.clone())) .collect() } diff --git a/src/storage/src/memtable/version.rs b/src/storage/src/memtable/version.rs index cd4411f869..9406f3869b 100644 --- a/src/storage/src/memtable/version.rs +++ b/src/storage/src/memtable/version.rs @@ -1,49 +1,50 @@ use std::cmp::Ordering; -use std::collections::BTreeMap; -use std::sync::Arc; use common_time::RangeMillis; -use crate::flush::MemtableWithMeta; use crate::memtable::{MemtableId, MemtableRef}; /// A version of all memtables. /// /// This structure is immutable now. -#[derive(Default, Debug, PartialEq, Eq)] +#[derive(Debug)] pub struct MemtableVersion { - mutable: MemtableSet, + mutable: MemtableRef, /// Immutable memtables. 
- immutables: Vec, + immutables: Vec, } impl MemtableVersion { - pub fn new() -> MemtableVersion { - MemtableVersion::default() + pub fn new(mutable: MemtableRef) -> MemtableVersion { + Self { + mutable, + immutables: vec![], + } } #[inline] - pub fn mutable_memtables(&self) -> &MemtableSet { + pub fn mutable_memtable(&self) -> &MemtableRef { &self.mutable } #[inline] - pub fn immutable_memtables(&self) -> &[MemtableSetRef] { + pub fn immutable_memtables(&self) -> &[MemtableRef] { &self.immutables } pub fn num_memtables(&self) -> usize { - self.mutable.len() + self.immutables.iter().map(|set| set.len()).sum::() + // the last `1` is for `mutable` + self.immutable_memtables().len() + 1 } /// Clone current memtable version and freeze its mutable memtables, which moves /// all mutable memtables to immutable memtable list. - pub fn freeze_mutable(&self) -> MemtableVersion { + pub fn freeze_mutable(&self, new_mutable: MemtableRef) -> MemtableVersion { let mut immutables = self.immutables.clone(); - immutables.push(Arc::new(self.mutable.clone())); + immutables.push(self.mutable.clone()); MemtableVersion { - mutable: MemtableSet::new(), + mutable: new_mutable, immutables, } } @@ -60,26 +61,13 @@ impl MemtableVersion { + self.mutable.bytes_allocated() } - /// Creates a new `MemtableVersion` that contains memtables both in this and `other`. - /// - /// # Panics - /// Panics if there are memtables with same time ranges. - pub fn add_mutable(&self, other: MemtableSet) -> MemtableVersion { - let mutable = self.mutable.add(other); - - Self { - mutable, - immutables: self.immutables.clone(), - } - } - /// Creates a new `MemtableVersion` that removes immutable memtables /// less than or equal to max_memtable_id. 
pub fn remove_immutables(&self, max_memtable_id: MemtableId) -> MemtableVersion { let immutables = self .immutables .iter() - .filter(|immem| immem.max_memtable_id() > max_memtable_id) + .filter(|immem| immem.id() > max_memtable_id) .cloned() .collect(); @@ -89,17 +77,9 @@ impl MemtableVersion { } } - pub fn memtables_to_flush(&self) -> (Option, Vec) { - let max_memtable_id = self - .immutables - .iter() - .map(|immem| immem.max_memtable_id()) - .max(); - let memtables = self - .immutables - .iter() - .flat_map(|immem| immem.to_memtable_with_metas()) - .collect(); + pub fn memtables_to_flush(&self) -> (Option, Vec) { + let max_memtable_id = self.immutables.iter().map(|immem| immem.id()).max(); + let memtables = self.immutables.clone(); (max_memtable_id, memtables) } @@ -124,299 +104,44 @@ impl PartialOrd for RangeKey { } } -/// Collection of mutable memtables. -/// -/// Memtables are partitioned by their time range. Caller should ensure -/// there are no overlapped ranges and all ranges are aligned by same -/// bucket duration. -#[derive(Default, Clone, Debug)] -pub struct MemtableSet { - memtables: BTreeMap, - max_memtable_id: MemtableId, -} - -pub type MemtableSetRef = Arc; - -impl PartialEq for MemtableSet { - fn eq(&self, other: &MemtableSet) -> bool { - self.max_memtable_id == other.max_memtable_id - && self.memtables.len() == other.memtables.len() - && self - .memtables - .iter() - .zip(&other.memtables) - .all(|(a, b)| a.0 == b.0 && a.1.id() == b.1.id() && a.1.schema() == b.1.schema()) - } -} - -impl Eq for MemtableSet {} - -impl MemtableSet { - pub fn new() -> MemtableSet { - MemtableSet::default() - } - - /// Get memtable by time range. - /// - /// The range must exactly equal to the range of the memtable, otherwise `None` - /// is returned. - pub fn get_by_range(&self, range: &RangeMillis) -> Option<&MemtableRef> { - let range_key = RangeKey(*range); - self.memtables.get(&range_key) - } - - /// Insert a new memtable. 
- /// - /// # Panics - /// Panics if memtable with same range already exists. - pub fn insert(&mut self, range: RangeMillis, mem: MemtableRef) { - self.max_memtable_id = MemtableId::max(self.max_memtable_id, mem.id()); - let old = self.memtables.insert(RangeKey(range), mem); - assert!(old.is_none()); - } - - /// Returns number of memtables in the set. - #[inline] - pub fn len(&self) -> usize { - self.memtables.len() - } - - /// Returns true if there is no memtable in the set. - #[inline] - pub fn is_empty(&self) -> bool { - self.memtables.is_empty() - } - - pub fn bytes_allocated(&self) -> usize { - self.memtables.values().map(|m| m.bytes_allocated()).sum() - } - - pub fn max_memtable_id(&self) -> MemtableId { - self.max_memtable_id - } - - /// Creates a new `MemtableSet` that contains memtables both in `self` and - /// `other`, let `self` unchanged. - pub fn add(&self, mut other: MemtableSet) -> MemtableSet { - // We use `other.memtables` to extend `self.memtables` since memtables - // in other should be empty in usual, so overwriting it is okay. 
- other - .memtables - .extend(self.memtables.iter().map(|(k, v)| (*k, v.clone()))); - - MemtableSet { - memtables: other.memtables, - max_memtable_id: MemtableId::max(self.max_memtable_id, other.max_memtable_id), - } - } - - pub fn to_memtable_with_metas(&self) -> Vec { - self.memtables - .iter() - .map(|(range_key, memtable)| MemtableWithMeta { - memtable: memtable.clone(), - bucket: range_key.0, - }) - .collect() - } - - pub fn iter(&self) -> impl Iterator { - self.memtables.iter().map(|(k, v)| (&k.0, v)) - } -} - #[cfg(test)] mod tests { - use store_api::storage::OpType; + use std::sync::Arc; use super::*; - use crate::memtable::tests; - use crate::memtable::BTreeMemtable; - use crate::memtable::Memtable; + use crate::memtable::DefaultMemtableBuilder; + use crate::memtable::MemtableBuilder; + use crate::test_util::schema_util; #[test] - fn test_memtableset_misc() { - let mut set = MemtableSet::new(); + fn test_memtable_version() { + let memtable_builder = DefaultMemtableBuilder::default(); + let region_schema = Arc::new(schema_util::new_region_schema(1, 1)); - assert!(set.is_empty()); - assert_eq!(0, set.max_memtable_id()); - assert_eq!(0, set.bytes_allocated()); - assert!(set - .get_by_range(&RangeMillis::new(0, 10).unwrap()) - .is_none()); + let memtable_1 = memtable_builder.build(region_schema.clone()); + let v1 = MemtableVersion::new(memtable_1); + assert_eq!(1, v1.num_memtables()); - set.insert( - RangeMillis::new(0, 10).unwrap(), - Arc::new(BTreeMemtable::new(0, tests::schema_for_test())), - ); - set.insert( - RangeMillis::new(10, 20).unwrap(), - Arc::new(BTreeMemtable::new(1, tests::schema_for_test())), - ); - let memtable = Arc::new(BTreeMemtable::new(2, tests::schema_for_test())); - // Write some test data - tests::write_kvs( - &*memtable, - 10, // sequence - OpType::Put, - &[ - (1000, 1), - (1000, 2), - (2002, 1), - (2003, 1), - (2003, 5), - (1001, 1), - ], // keys - &[ - (Some(1), None), - (Some(2), None), - (Some(7), None), - (Some(8), None), - 
(Some(9), None), - (Some(3), None), - ], // values - ); + // Freeze and add new mutable. + let memtable_2 = memtable_builder.build(region_schema.clone()); + let v2 = v1.freeze_mutable(memtable_2); + let v2_immutables = v2.immutable_memtables(); + assert_eq!(1, v2_immutables.len()); + assert_eq!(0, v2_immutables[0].id()); + assert_eq!(1, v2.mutable_memtable().id()); + assert_eq!(2, v2.num_memtables()); - set.insert(RangeMillis::new(20, 30).unwrap(), memtable.clone()); + // Add another one and check immutable memtables that need flush + let memtable_3 = memtable_builder.build(region_schema); + let v3 = v2.freeze_mutable(memtable_3); + let (max_table_id, immutables) = v3.memtables_to_flush(); + assert_eq!(1, max_table_id.unwrap()); + assert_eq!(2, immutables.len()); - for (i, (range, _)) in set.iter().enumerate() { - assert_eq!( - *range, - RangeMillis::new(i as i64 * 10, i as i64 * 10 + 10).unwrap() - ); - } - - assert!(!set.is_empty()); - assert_eq!(2, set.max_memtable_id()); - assert_eq!(memtable.bytes_allocated(), set.bytes_allocated()); - assert!(set - .get_by_range(&RangeMillis::new(0, 10).unwrap()) - .is_some()); - assert!(set - .get_by_range(&RangeMillis::new(10, 20).unwrap()) - .is_some()); - assert!(set - .get_by_range(&RangeMillis::new(20, 30).unwrap()) - .is_some()); - assert!(set - .get_by_range(&RangeMillis::new(0, 100).unwrap()) - .is_none()); - } - - fn create_test_memtableset(ids: &[MemtableId]) -> MemtableSet { - let mut set = MemtableSet::new(); - - for id in ids { - let i = *id as i64; - set.insert( - RangeMillis::new(i * 10, (i + 1) * 10).unwrap(), - Arc::new(BTreeMemtable::new(*id, tests::schema_for_test())), - ); - } - - set - } - - #[test] - fn test_add_memtableset() { - let s1 = create_test_memtableset(&[0, 1, 2]); - let s2 = create_test_memtableset(&[3, 4, 5, 6]); - - let mut s1_memtables = s1.to_memtable_with_metas(); - let s2_memtables = s2.to_memtable_with_metas(); - s1_memtables.extend(s2_memtables); - - let empty = 
create_test_memtableset(&[]); - assert_eq!(s1, s1.add(empty)); - - let s3 = s1.add(s2); - assert_ne!(s1, s3); - - assert_eq!(7, s3.memtables.len()); - let s3_memtables = s3.to_memtable_with_metas(); - assert_eq!(7, s3_memtables.len()); - - for i in 0..7 { - assert_eq!(s1_memtables[i].bucket, s3_memtables[i].bucket); - assert_eq!(s1_memtables[i].memtable.id(), s3_memtables[i].memtable.id()); - } - assert_eq!(6, s3.max_memtable_id()); - } - - #[test] - fn test_memtableversion() { - let s1 = create_test_memtableset(&[0, 1, 2]); - let s2 = create_test_memtableset(&[3, 4, 5, 6]); - let s3 = s1.add(s2.clone()); - - let v1 = MemtableVersion::new(); - assert!(v1.mutable_memtables().is_empty()); - assert_eq!(0, v1.num_memtables()); - - // Add one mutable - let v2 = v1.add_mutable(s1.clone()); - assert_ne!(v1, v2); - let mutables = v2.mutable_memtables(); - assert_eq!(s1, *mutables); - assert_eq!(3, v2.num_memtables()); - - // Add another mutable - let v3 = v2.add_mutable(s2); - assert_ne!(v1, v3); - assert_ne!(v2, v3); - let mutables = v3.mutable_memtables(); - assert_eq!(s3, *mutables); - assert!(v3.memtables_to_flush().1.is_empty()); - assert_eq!(7, v3.num_memtables()); - - // Try to freeze s1, s2 - let v4 = v3.freeze_mutable(); - assert_ne!(v1, v4); - assert_ne!(v2, v4); - assert_ne!(v3, v4); - assert!(v4.mutable_memtables().is_empty()); - assert_eq!(v4.immutables.len(), 1); - assert_eq!(v4.immutables[0], Arc::new(s3.clone())); - - let (max_id, tables) = v4.memtables_to_flush(); - assert_eq!(6, max_id.unwrap()); - assert_eq!(7, tables.len()); - assert_eq!(7, v4.num_memtables()); - - // Add another mutable - let s4 = create_test_memtableset(&[7, 8]); - let v5 = v4.add_mutable(s4.clone()); - let mutables = v5.mutable_memtables(); - assert_eq!(s4, *mutables); - assert_eq!(v4.immutables, v5.immutables); - - // Try to freeze s4 - let v6 = v5.freeze_mutable(); - assert_eq!(v6.immutables.len(), 2); - assert_eq!(v6.immutables[0], Arc::new(s3)); - assert_eq!(v6.immutables[1], 
Arc::new(s4.clone())); - - let (max_id, tables) = v6.memtables_to_flush(); - assert_eq!(8, max_id.unwrap()); - assert_eq!(9, tables.len()); - assert_eq!(9, v6.num_memtables()); - // verify tables - for (i, table) in tables.iter().enumerate() { - assert_eq!(i as u32, table.memtable.id()); - let i = i as i64; - assert_eq!( - table.bucket, - RangeMillis::new(i * 10, (i + 1) * 10).unwrap() - ); - } - - // Remove tables - let v7 = v6.remove_immutables(6); - assert_eq!(v7.immutables.len(), 1); - assert_eq!(v7.immutables[0], Arc::new(s4)); - - let v8 = v7.remove_immutables(8); - assert_eq!(v8.immutables.len(), 0); - assert_eq!(0, v8.num_memtables()); + // Remove memtables + let v4 = v3.remove_immutables(1); + assert_eq!(1, v4.num_memtables()); + assert_eq!(0, v4.immutable_memtables().len()); + assert_eq!(2, v4.mutable_memtable().id()); } } diff --git a/src/storage/src/region.rs b/src/storage/src/region.rs index d94d36f4b3..a11a3c4762 100644 --- a/src/storage/src/region.rs +++ b/src/storage/src/region.rs @@ -21,7 +21,7 @@ use crate::manifest::{ region::RegionManifest, }; use crate::memtable::MemtableBuilderRef; -use crate::metadata::{RegionMetaImpl, RegionMetadata}; +use crate::metadata::{RegionMetaImpl, RegionMetadata, RegionMetadataRef}; pub use crate::region::writer::{AlterContext, RegionWriter, RegionWriterRef, WriterContext}; use crate::schema::compat::CompatWrite; use crate::snapshot::SnapshotImpl; @@ -120,7 +120,10 @@ impl RegionImpl { ))) .await?; - let version = Version::with_manifest_version(metadata, manifest_version); + let mutable_memtable = store_config + .memtable_builder + .build(metadata.schema().clone()); + let version = Version::with_manifest_version(metadata, manifest_version, mutable_memtable); let region = RegionImpl::new(version, store_config); Ok(region) @@ -160,11 +163,15 @@ impl RegionImpl { _opts: &OpenOptions, ) -> Result>> { // Load version meta data from manifest. 
- let (version, mut recovered_metadata) = - match Self::recover_from_manifest(&store_config.manifest).await? { - (None, _) => return Ok(None), - (Some(v), m) => (v, m), - }; + let (version, mut recovered_metadata) = match Self::recover_from_manifest( + &store_config.manifest, + &store_config.memtable_builder, + ) + .await? + { + (None, _) => return Ok(None), + (Some(v), m) => (v, m), + }; logging::debug!( "Region recovered version from manifest, version: {:?}", @@ -179,12 +186,19 @@ impl RegionImpl { recovered_metadata.split_off(&(flushed_sequence + 1)); // apply the last flushed metadata if let Some((sequence, (manifest_version, metadata))) = recovered_metadata.pop_last() { - let metadata = Arc::new( + let metadata: RegionMetadataRef = Arc::new( metadata .try_into() .context(error::InvalidRawRegionSnafu { region: &name })?, ); - version_control.freeze_mutable_and_apply_metadata(metadata, manifest_version); + let mutable_memtable = store_config + .memtable_builder + .build(metadata.schema().clone()); + version_control.freeze_mutable_and_apply_metadata( + metadata, + manifest_version, + mutable_memtable, + ); logging::debug!( "Applied the last flushed metadata to region: {}, sequence: {}, manifest: {}", @@ -236,6 +250,7 @@ impl RegionImpl { async fn recover_from_manifest( manifest: &RegionManifest, + memtable_builder: &MemtableBuilderRef, ) -> Result<(Option, RecoveredMetadataMap)> { let (start, end) = Self::manifest_scan_range(); let mut iter = manifest.scan(start, end).await?; @@ -252,13 +267,17 @@ impl RegionImpl { match (action, version) { (RegionMetaAction::Change(c), None) => { let region = c.metadata.name.clone(); - let region_metadata = c + let region_metadata: RegionMetadata = c .metadata .try_into() .context(error::InvalidRawRegionSnafu { region })?; + // Use current schema to build a memtable. This might be replaced later + // in `freeze_mutable_and_apply_metadata()`. 
+ let memtable = memtable_builder.build(region_metadata.schema().clone()); version = Some(Version::with_manifest_version( Arc::new(region_metadata), last_manifest_version, + memtable, )); for (manifest_version, action) in actions.drain(..) { version = Self::replay_edit(manifest_version, action, version); diff --git a/src/storage/src/region/tests.rs b/src/storage/src/region/tests.rs index ae7956cbf5..d7d753faae 100644 --- a/src/storage/src/region/tests.rs +++ b/src/storage/src/region/tests.rs @@ -20,6 +20,7 @@ use tempdir::TempDir; use super::*; use crate::manifest::action::{RegionChange, RegionMetaActionList}; use crate::manifest::test_utils::*; +use crate::memtable::DefaultMemtableBuilder; use crate::test_util::{ self, config_util, descriptor_util::RegionDescBuilder, schema_util, write_batch_util, }; @@ -166,7 +167,7 @@ async fn test_new_region() { .push_key_column(("k1", LogicalTypeId::Int32, false)) .push_value_column(("v0", LogicalTypeId::Float32, true)) .build(); - let metadata = desc.try_into().unwrap(); + let metadata: RegionMetadata = desc.try_into().unwrap(); let store_dir = TempDir::new("test_new_region") .unwrap() @@ -175,8 +176,14 @@ async fn test_new_region() { .to_string(); let store_config = config_util::new_store_config(region_name, &store_dir).await; + let placeholder_memtable = store_config + .memtable_builder + .build(metadata.schema().clone()); - let region = RegionImpl::new(Version::new(Arc::new(metadata)), store_config); + let region = RegionImpl::new( + Version::new(Arc::new(metadata), placeholder_memtable), + store_config, + ); let expect_schema = schema_util::new_schema_ref( &[ @@ -195,6 +202,7 @@ async fn test_new_region() { #[tokio::test] async fn test_recover_region_manifets() { let tmp_dir = TempDir::new("test_new_region").unwrap(); + let memtable_builder = Arc::new(DefaultMemtableBuilder::default()) as _; let object_store = ObjectStore::new( fs::Builder::default() @@ -207,11 +215,13 @@ async fn test_recover_region_manifets() { let 
region_meta = Arc::new(build_region_meta()); // Recover from empty - assert!(RegionImpl::::recover_from_manifest(&manifest) - .await - .unwrap() - .0 - .is_none()); + assert!( + RegionImpl::::recover_from_manifest(&manifest, &memtable_builder) + .await + .unwrap() + .0 + .is_none() + ); { // save some actions into region_meta @@ -246,7 +256,7 @@ async fn test_recover_region_manifets() { // try to recover let (version, recovered_metadata) = - RegionImpl::::recover_from_manifest(&manifest) + RegionImpl::::recover_from_manifest(&manifest, &memtable_builder) .await .unwrap(); @@ -261,7 +271,6 @@ async fn test_recover_region_manifets() { for (i, file) in files.iter().enumerate() { assert_eq!(format!("f{}", i + 1), file.file_name()); } - assert!(version.mutable_memtables().is_empty()); // check manifest state assert_eq!(3, manifest.last_version()); diff --git a/src/storage/src/region/writer.rs b/src/storage/src/region/writer.rs index fd1edf0a37..6dd39a1c9f 100644 --- a/src/storage/src/region/writer.rs +++ b/src/storage/src/region/writer.rs @@ -1,12 +1,11 @@ use std::sync::Arc; use common_telemetry::logging; -use common_time::RangeMillis; use futures::TryStreamExt; use snafu::ResultExt; use store_api::logstore::LogStore; use store_api::manifest::{Manifest, ManifestVersion, MetaAction}; -use store_api::storage::{AlterRequest, WriteContext, WriteRequest, WriteResponse}; +use store_api::storage::{AlterRequest, WriteContext, WriteResponse}; use tokio::sync::Mutex; use crate::background::JobHandle; @@ -15,7 +14,8 @@ use crate::flush::{FlushJob, FlushSchedulerRef, FlushStrategyRef}; use crate::manifest::action::{ RawRegionMetadata, RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList, }; -use crate::memtable::{Inserter, MemtableBuilderRef, MemtableId, MemtableSet}; +use crate::memtable::{Inserter, MemtableBuilderRef, MemtableId, MemtableRef}; +use crate::metadata::RegionMetadataRef; use crate::proto::wal::WalHeader; use crate::region::{RecoveredMetadataMap, 
RegionManifest, SharedDataRef}; use crate::schema::compat::CompatWrite; @@ -134,7 +134,7 @@ impl RegionWriter { // avoid other writers write to the region and switch the memtable safely. // Another potential benefit is that the write lock also protect against concurrent // alter request to the region. - let _inner = self.inner.lock().await; + let inner = self.inner.lock().await; let version_control = alter_ctx.version_control(); @@ -176,7 +176,12 @@ impl RegionWriter { let manifest_version = alter_ctx.manifest.update(action_list).await?; // Now we could switch memtables and apply the new metadata to the version. - version_control.freeze_mutable_and_apply_metadata(new_metadata, manifest_version); + let new_mutable = inner.memtable_builder.build(new_metadata.schema().clone()); + version_control.freeze_mutable_and_apply_metadata( + new_metadata, + manifest_version, + new_mutable, + ); self.persist_manifest_version(alter_ctx.wal, version_control, manifest_version) .await @@ -250,7 +255,6 @@ impl<'a, S: LogStore> AlterContext<'a, S> { #[derive(Debug)] struct WriterInner { memtable_builder: MemtableBuilderRef, - last_memtable_id: MemtableId, flush_handle: Option, } @@ -258,7 +262,6 @@ impl WriterInner { fn new(memtable_builder: MemtableBuilderRef) -> WriterInner { WriterInner { memtable_builder, - last_memtable_id: 0, flush_handle: None, } } @@ -274,7 +277,7 @@ impl WriterInner { mut request: WriteBatch, writer_ctx: WriterContext<'_, S>, ) -> Result { - let time_ranges = self.preprocess_write(&request, &writer_ctx).await?; + self.preprocess_write(&writer_ctx).await?; let version_control = writer_ctx.version_control(); let _lock = version_mutex.lock().await; @@ -302,8 +305,8 @@ impl WriterInner { .await?; // Insert batch into memtable. 
- let mut inserter = Inserter::new(next_sequence, time_ranges, version.bucket_duration()); - inserter.insert_memtables(&request, version.mutable_memtables())?; + let mut inserter = Inserter::new(next_sequence); + inserter.insert_memtable(&request, version.mutable_memtable())?; // Update committed_sequence to make current batch visible. The `&mut self` of WriterInner // guarantees the writer is exclusive. @@ -340,13 +343,18 @@ impl WriterInner { // This is the first request that use the new metadata. // It's safe to unwrap here. It's checked above. Move out metadata to avoid cloning it. let (_, (manifest_version, metadata)) = next_apply_metadata.take().unwrap(); + let region_metadata: RegionMetadataRef = Arc::new( + metadata.try_into().context(error::InvalidRawRegionSnafu { + region: &writer_ctx.shared.name, + })?, + ); + let new_mutable = self + .memtable_builder + .build(region_metadata.schema().clone()); version_control.freeze_mutable_and_apply_metadata( - Arc::new(metadata.try_into().context( - error::InvalidRawRegionSnafu { - region: &writer_ctx.shared.name, - }, - )?), + region_metadata, manifest_version, + new_mutable, ); num_recovered_metadata += 1; logging::debug!( @@ -364,7 +372,6 @@ impl WriterInner { if let Some(request) = request { num_requests += 1; - let time_ranges = self.prepare_memtables(&request, version_control)?; // Note that memtables of `Version` may be updated during replay. let version = version_control.current(); @@ -390,9 +397,8 @@ impl WriterInner { } // TODO(yingwen): Trigger flush if the size of memtables reach the flush threshold to avoid // out of memory during replay, but we need to do it carefully to avoid dead lock. 
- let mut inserter = - Inserter::new(last_sequence, time_ranges, version.bucket_duration()); - inserter.insert_memtables(&request, version.mutable_memtables())?; + let mut inserter = Inserter::new(last_sequence); + inserter.insert_memtable(&request, version.mutable_memtable())?; } } @@ -418,9 +424,8 @@ impl WriterInner { /// flush if necessary. Returns time ranges of the input write batch. async fn preprocess_write( &mut self, - request: &WriteBatch, writer_ctx: &WriterContext<'_, S>, - ) -> Result> { + ) -> Result<()> { let version_control = writer_ctx.version_control(); // Check whether memtable is full or flush should be triggered. We need to do this first since // switching memtables will clear all mutable memtables. @@ -429,52 +434,16 @@ impl WriterInner { version_control, writer_ctx.flush_strategy, ) { - self.trigger_flush( - writer_ctx.shared, - writer_ctx.flush_scheduler, - writer_ctx.sst_layer, - writer_ctx.writer, - writer_ctx.wal, - writer_ctx.manifest, - ) - .await?; + self.trigger_flush(writer_ctx).await?; } - self.prepare_memtables(request, version_control) + Ok(()) } - /// Create all needed mutable memtables, returns time ranges that overlapped with `request`. - fn prepare_memtables( - &mut self, - request: &WriteBatch, - version_control: &VersionControlRef, - ) -> Result> { - let current_version = version_control.current(); - let bucket_duration = current_version.bucket_duration(); - let time_ranges = request - .time_ranges(bucket_duration) - .context(error::InvalidTimestampSnafu)?; - let mutable = current_version.mutable_memtables(); - let mut memtables_to_add = MemtableSet::default(); - - // Pre-create all needed mutable memtables. - for range in &time_ranges { - if mutable.get_by_range(range).is_none() - && memtables_to_add.get_by_range(range).is_none() - { - // Memtable for this range is missing, need to create a new memtable. 
- let memtable_schema = current_version.schema().clone(); - let id = self.alloc_memtable_id(); - let memtable = self.memtable_builder.build(id, memtable_schema); - memtables_to_add.insert(*range, memtable); - } - } - - if !memtables_to_add.is_empty() { - version_control.add_mutable(memtables_to_add); - } - - Ok(time_ranges) + /// Create a new mutable memtable. + fn alloc_memtable(&self, version_control: &VersionControlRef) -> MemtableRef { + let memtable_schema = version_control.current().schema().clone(); + self.memtable_builder.build(memtable_schema) } fn should_flush( @@ -490,30 +459,23 @@ impl WriterInner { flush_strategy.should_flush(shared, mutable_bytes_allocated, total_bytes_allocated) } - async fn trigger_flush( - &mut self, - shared: &SharedDataRef, - flush_scheduler: &FlushSchedulerRef, - sst_layer: &AccessLayerRef, - writer: &RegionWriterRef, - wal: &Wal, - manifest: &RegionManifest, - ) -> Result<()> { - let version_control = &shared.version_control; + async fn trigger_flush(&mut self, ctx: &WriterContext<'_, S>) -> Result<()> { + let version_control = &ctx.shared.version_control; + let new_mutable = self.alloc_memtable(version_control); // Freeze all mutable memtables so we can flush them later. - version_control.freeze_mutable(); + version_control.freeze_mutable(new_mutable); if let Some(flush_handle) = self.flush_handle.take() { // Previous flush job is incomplete, wait util it is finished (write stall). // However the last flush job may fail, in which case, we just return error // and abort current write request. The flush handle is left empty, so the next // time we still have chance to trigger a new flush. - logging::info!("Write stall, region: {}", shared.name); + logging::info!("Write stall, region: {}", ctx.shared.name); // TODO(yingwen): We should release the write lock during waiting flush done, which // needs something like async condvar. 
flush_handle.join().await.map_err(|e| { - logging::error!(e; "Previous flush job failed, region: {}", shared.name); + logging::error!(e; "Previous flush job failed, region: {}", ctx.shared.name); e })?; } @@ -522,7 +484,7 @@ impl WriterInner { let (max_memtable_id, mem_to_flush) = current_version.memtables().memtables_to_flush(); if max_memtable_id.is_none() { - logging::info!("No memtables to flush in region: {}", shared.name); + logging::info!("No memtables to flush in region: {}", ctx.shared.name); return Ok(()); } @@ -531,22 +493,19 @@ impl WriterInner { memtables: mem_to_flush, // In write thread, safe to use current commited sequence. flush_sequence: version_control.committed_sequence(), - shared: shared.clone(), - sst_layer: sst_layer.clone(), - writer: writer.clone(), - wal: wal.clone(), - manifest: manifest.clone(), + shared: ctx.shared.clone(), + sst_layer: ctx.sst_layer.clone(), + writer: ctx.writer.clone(), + wal: ctx.wal.clone(), + manifest: ctx.manifest.clone(), }; - let flush_handle = flush_scheduler.schedule_flush(Box::new(flush_req)).await?; + let flush_handle = ctx + .flush_scheduler + .schedule_flush(Box::new(flush_req)) + .await?; self.flush_handle = Some(flush_handle); Ok(()) } - - #[inline] - fn alloc_memtable_id(&mut self) -> MemtableId { - self.last_memtable_id += 1; - self.last_memtable_id - } } diff --git a/src/storage/src/schema.rs b/src/storage/src/schema.rs index c7c240b79e..12ab495d2c 100644 --- a/src/storage/src/schema.rs +++ b/src/storage/src/schema.rs @@ -13,10 +13,7 @@ mod tests { use datatypes::vectors::{Int64Vector, UInt64Vector, UInt8Vector, VectorRef}; - use super::*; - use crate::metadata::RegionMetadata; use crate::read::Batch; - use crate::test_util::descriptor_util; pub const REGION_NAME: &str = "test"; @@ -42,14 +39,4 @@ mod tests { Batch::new(columns) } - - pub(crate) fn new_region_schema(version: u32, num_value_columns: usize) -> RegionSchema { - let metadata: RegionMetadata = - 
descriptor_util::desc_with_value_columns(REGION_NAME, num_value_columns) - .try_into() - .unwrap(); - - let columns = metadata.columns; - RegionSchema::new(columns, version).unwrap() - } } diff --git a/src/storage/src/schema/compat.rs b/src/storage/src/schema/compat.rs index e77a3bb9e0..f87b83dd76 100644 --- a/src/storage/src/schema/compat.rs +++ b/src/storage/src/schema/compat.rs @@ -319,6 +319,7 @@ mod tests { use crate::schema::tests; use crate::schema::{ProjectedSchema, RegionSchema}; use crate::test_util::descriptor_util; + use crate::test_util::schema_util; fn check_fields(fields: &[Field], names: &[&str]) { for (field, name) in fields.iter().zip(names) { @@ -390,7 +391,7 @@ mod tests { #[test] fn test_compat_same_schema() { // (k0, timestamp, v0, v1) with version 0. - let region_schema = Arc::new(tests::new_region_schema(0, 2)); + let region_schema = Arc::new(schema_util::new_region_schema(0, 2)); let projected_schema = Arc::new(ProjectedSchema::no_projection(region_schema.clone())); let source_schema = region_schema.store_schema().clone(); @@ -420,7 +421,7 @@ mod tests { #[test] fn test_compat_same_version_with_projection() { // (k0, timestamp, v0, v1) with version 0. - let region_schema = Arc::new(tests::new_region_schema(0, 2)); + let region_schema = Arc::new(schema_util::new_region_schema(0, 2)); // Just read v0, k0. let projected_schema = Arc::new(ProjectedSchema::new(region_schema.clone(), Some(vec![2, 0])).unwrap()); @@ -452,9 +453,9 @@ mod tests { #[test] fn test_compat_old_column() { // (k0, timestamp, v0) with version 0. - let region_schema_old = Arc::new(tests::new_region_schema(0, 1)); + let region_schema_old = Arc::new(schema_util::new_region_schema(0, 1)); // (k0, timestamp, v0, v1) with version 1. 
- let region_schema_new = Arc::new(tests::new_region_schema(1, 1)); + let region_schema_new = Arc::new(schema_util::new_region_schema(1, 1)); // Just read v0, k0 let projected_schema = @@ -486,9 +487,9 @@ mod tests { #[test] fn test_compat_new_column() { // (k0, timestamp, v0, v1) with version 0. - let region_schema_old = Arc::new(tests::new_region_schema(0, 2)); + let region_schema_old = Arc::new(schema_util::new_region_schema(0, 2)); // (k0, timestamp, v0, v1, v2) with version 1. - let region_schema_new = Arc::new(tests::new_region_schema(1, 3)); + let region_schema_new = Arc::new(schema_util::new_region_schema(1, 3)); // Just read v2, v0, k0 let projected_schema = @@ -525,7 +526,7 @@ mod tests { #[test] fn test_compat_different_column() { // (k0, timestamp, v0, v1) with version 0. - let region_schema_old = Arc::new(tests::new_region_schema(0, 2)); + let region_schema_old = Arc::new(schema_util::new_region_schema(0, 2)); let mut descriptor = descriptor_util::desc_with_value_columns(tests::REGION_NAME, 2); // Assign a much larger column id to v0. diff --git a/src/storage/src/schema/projected.rs b/src/storage/src/schema/projected.rs index 4af9cde7d9..fe0ebce126 100644 --- a/src/storage/src/schema/projected.rs +++ b/src/storage/src/schema/projected.rs @@ -335,7 +335,7 @@ mod tests { fn test_projection() { // Build a region schema with 2 value columns. So the final user schema is // (k0, timestamp, v0, v1) - let region_schema = tests::new_region_schema(0, 2); + let region_schema = schema_util::new_region_schema(0, 2); // Projection, but still keep column order. 
// After projection: (timestamp, v0) @@ -376,7 +376,7 @@ mod tests { #[test] fn test_projected_schema_with_projection() { // (k0, timestamp, v0, v1, v2) - let region_schema = Arc::new(tests::new_region_schema(123, 3)); + let region_schema = Arc::new(schema_util::new_region_schema(123, 3)); // After projection: (v1, timestamp) let projected_schema = @@ -414,7 +414,7 @@ mod tests { // The schema to read should be same as region schema with (k0, timestamp, v0). // We can't use `new_schema_with_version()` because the StoreSchema also store other // metadata that `new_schema_with_version()` can't store. - let expect_schema = tests::new_region_schema(123, 1); + let expect_schema = schema_util::new_region_schema(123, 1); assert_eq!( expect_schema.store_schema(), projected_schema.schema_to_read() @@ -433,7 +433,7 @@ mod tests { #[test] fn test_projected_schema_no_projection() { // (k0, timestamp, v0) - let region_schema = Arc::new(tests::new_region_schema(123, 1)); + let region_schema = Arc::new(schema_util::new_region_schema(123, 1)); let projected_schema = ProjectedSchema::no_projection(region_schema.clone()); @@ -461,7 +461,7 @@ mod tests { #[test] fn test_projected_schema_empty_projection() { // (k0, timestamp, v0) - let region_schema = Arc::new(tests::new_region_schema(123, 1)); + let region_schema = Arc::new(schema_util::new_region_schema(123, 1)); let err = ProjectedSchema::new(region_schema, Some(Vec::new())) .err() diff --git a/src/storage/src/schema/region.rs b/src/storage/src/schema/region.rs index dcc9ec4656..6e67c5a55c 100644 --- a/src/storage/src/schema/region.rs +++ b/src/storage/src/schema/region.rs @@ -139,12 +139,11 @@ mod tests { use datatypes::type_id::LogicalTypeId; use super::*; - use crate::schema::tests; use crate::test_util::schema_util; #[test] fn test_region_schema() { - let region_schema = Arc::new(tests::new_region_schema(123, 1)); + let region_schema = Arc::new(schema_util::new_region_schema(123, 1)); let expect_schema = 
schema_util::new_schema_with_version( &[ diff --git a/src/storage/src/schema/store.rs b/src/storage/src/schema/store.rs index a91e1360b2..b706540fc0 100644 --- a/src/storage/src/schema/store.rs +++ b/src/storage/src/schema/store.rs @@ -219,6 +219,7 @@ mod tests { use super::*; use crate::read::Batch; use crate::schema::tests; + use crate::test_util::schema_util; fn check_chunk_batch(chunk: &ArrowChunk>, batch: &Batch) { assert_eq!(5, chunk.columns().len()); @@ -231,7 +232,7 @@ mod tests { #[test] fn test_store_schema() { - let region_schema = Arc::new(tests::new_region_schema(123, 1)); + let region_schema = Arc::new(schema_util::new_region_schema(123, 1)); // Checks StoreSchema. let store_schema = region_schema.store_schema(); diff --git a/src/storage/src/snapshot.rs b/src/storage/src/snapshot.rs index 110573100f..c7177f926d 100644 --- a/src/storage/src/snapshot.rs +++ b/src/storage/src/snapshot.rs @@ -36,7 +36,7 @@ impl Snapshot for SnapshotImpl { let visible_sequence = self.sequence_to_read(request.sequence); let memtable_version = self.version.memtables(); - let mutables = memtable_version.mutable_memtables(); + let mutables = memtable_version.mutable_memtable(); let immutables = memtable_version.immutable_memtables(); let mut builder = @@ -46,10 +46,10 @@ impl Snapshot for SnapshotImpl { .filters(request.filters) .batch_size(ctx.batch_size) .visible_sequence(visible_sequence) - .pick_memtables(mutables); + .pick_memtables(mutables.clone()); - for mem_set in immutables { - builder = builder.pick_memtables(mem_set); + for memtable in immutables { + builder = builder.pick_memtables(memtable.clone()); } let reader = builder.pick_ssts(self.version.ssts())?.build().await?; diff --git a/src/storage/src/sst/parquet.rs b/src/storage/src/sst/parquet.rs index 5135061c49..fc65530681 100644 --- a/src/storage/src/sst/parquet.rs +++ b/src/storage/src/sst/parquet.rs @@ -302,7 +302,7 @@ mod tests { #[tokio::test] async fn test_parquet_writer() { let schema = 
memtable_tests::schema_for_test(); - let memtable = DefaultMemtableBuilder {}.build(1, schema); + let memtable = DefaultMemtableBuilder::default().build(schema); memtable_tests::write_kvs( &*memtable, diff --git a/src/storage/src/test_util/config_util.rs b/src/storage/src/test_util/config_util.rs index a8fe7dfc24..b72ff3fe7d 100644 --- a/src/storage/src/test_util/config_util.rs +++ b/src/storage/src/test_util/config_util.rs @@ -40,7 +40,7 @@ pub async fn new_store_config( log_store, sst_layer, manifest, - memtable_builder: Arc::new(DefaultMemtableBuilder {}), + memtable_builder: Arc::new(DefaultMemtableBuilder::default()), flush_scheduler, flush_strategy: Arc::new(SizeBasedStrategy::default()), } diff --git a/src/storage/src/test_util/schema_util.rs b/src/storage/src/test_util/schema_util.rs index af5e0fd056..ea2a3d379d 100644 --- a/src/storage/src/test_util/schema_util.rs +++ b/src/storage/src/test_util/schema_util.rs @@ -3,6 +3,10 @@ use std::sync::Arc; use datatypes::prelude::*; use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder, SchemaRef}; +use super::descriptor_util; +use crate::metadata::RegionMetadata; +use crate::schema::RegionSchema; + /// Column definition: (name, datatype, is_nullable) pub type ColumnDef<'a> = (&'a str, LogicalTypeId, bool); @@ -39,3 +43,13 @@ pub fn new_schema_with_version( pub fn new_schema_ref(column_defs: &[ColumnDef], timestamp_index: Option) -> SchemaRef { Arc::new(new_schema(column_defs, timestamp_index)) } + +pub fn new_region_schema(version: u32, num_value_columns: usize) -> RegionSchema { + let metadata: RegionMetadata = + descriptor_util::desc_with_value_columns("REGION_NAME", num_value_columns) + .try_into() + .unwrap(); + + let columns = metadata.columns; + RegionSchema::new(columns, version).unwrap() +} diff --git a/src/storage/src/version.rs b/src/storage/src/version.rs index b9330d63f9..76aedac913 100644 --- a/src/storage/src/version.rs +++ b/src/storage/src/version.rs @@ -9,21 +9,17 @@ use 
std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; -use std::time::Duration; use store_api::manifest::ManifestVersion; use store_api::storage::{SchemaRef, SequenceNumber}; -use crate::memtable::{MemtableId, MemtableSet, MemtableVersion}; +use crate::memtable::{MemtableId, MemtableRef, MemtableVersion}; use crate::metadata::RegionMetadataRef; use crate::schema::RegionSchemaRef; use crate::sst::LevelMetas; use crate::sst::{FileHandle, FileMeta}; use crate::sync::CowCell; -/// Default bucket duration: 2 Hours. -const DEFAULT_BUCKET_DURATION: Duration = Duration::from_secs(3600 * 2); - pub const INIT_COMMITTED_SEQUENCE: u64 = 0; /// Controls version of in memory state for a region. @@ -79,26 +75,12 @@ impl VersionControl { self.committed_sequence.store(value, Ordering::Relaxed); } - /// Add mutable memtables and commit. - /// - /// # Panics - /// See [MemtableVersion::add_mutable](MemtableVersion::add_mutable). - pub fn add_mutable(&self, memtables_to_add: MemtableSet) { - let mut version_to_update = self.version.lock(); - - let memtable_version = version_to_update.memtables(); - let merged = memtable_version.add_mutable(memtables_to_add); - version_to_update.memtables = Arc::new(merged); - - version_to_update.commit(); - } - /// Freeze all mutable memtables. 
- pub fn freeze_mutable(&self) { + pub fn freeze_mutable(&self, new_memtable: MemtableRef) { let mut version_to_update = self.version.lock(); let memtable_version = version_to_update.memtables(); - let freezed = memtable_version.freeze_mutable(); + let freezed = memtable_version.freeze_mutable(new_memtable); version_to_update.memtables = Arc::new(freezed); version_to_update.commit(); @@ -116,16 +98,15 @@ impl VersionControl { &self, metadata: RegionMetadataRef, manifest_version: ManifestVersion, + mutable_memtable: MemtableRef, ) { let mut version_to_update = self.version.lock(); let memtable_version = version_to_update.memtables(); // When applying metadata, mutable memtable set might be empty and there is no // need to freeze it. - if !memtable_version.mutable_memtables().is_empty() { - let freezed = memtable_version.freeze_mutable(); - version_to_update.memtables = Arc::new(freezed); - } + let freezed = memtable_version.freeze_mutable(mutable_memtable); + version_to_update.memtables = Arc::new(freezed); version_to_update.apply_metadata(metadata, manifest_version); version_to_update.commit(); @@ -170,18 +151,19 @@ pub struct Version { impl Version { /// Create a new `Version` with given `metadata`. #[cfg(test)] - pub fn new(metadata: RegionMetadataRef) -> Version { - Version::with_manifest_version(metadata, 0) + pub fn new(metadata: RegionMetadataRef, memtable: MemtableRef) -> Version { + Version::with_manifest_version(metadata, 0, memtable) } /// Create a new `Version` with given `metadata` and initial `manifest_version`. 
pub fn with_manifest_version( metadata: RegionMetadataRef, manifest_version: ManifestVersion, + mutable_memtable: MemtableRef, ) -> Version { Version { metadata, - memtables: Arc::new(MemtableVersion::new()), + memtables: Arc::new(MemtableVersion::new(mutable_memtable)), ssts: Arc::new(LevelMetas::new()), flushed_sequence: 0, manifest_version, @@ -204,8 +186,8 @@ impl Version { } #[inline] - pub fn mutable_memtables(&self) -> &MemtableSet { - self.memtables.mutable_memtables() + pub fn mutable_memtable(&self) -> &MemtableRef { + self.memtables.mutable_memtable() } #[inline] @@ -223,11 +205,6 @@ impl Version { self.flushed_sequence } - /// Returns duration used to partition the memtables and ssts by time. - pub fn bucket_duration(&self) -> Duration { - DEFAULT_BUCKET_DURATION - } - pub fn apply_edit(&mut self, edit: VersionEdit) { let flushed_sequence = edit.flushed_sequence.unwrap_or(self.flushed_sequence); if self.flushed_sequence < flushed_sequence { @@ -282,16 +259,17 @@ impl Version { #[cfg(test)] mod tests { use super::*; - use crate::metadata::RegionMetadata; + use crate::memtable::{DefaultMemtableBuilder, MemtableBuilder}; use crate::test_util::descriptor_util::RegionDescBuilder; fn new_version_control() -> VersionControl { let desc = RegionDescBuilder::new("version-test") .enable_version_column(false) .build(); - let metadata: RegionMetadata = desc.try_into().unwrap(); + let metadata: RegionMetadataRef = Arc::new(desc.try_into().unwrap()); + let memtable = DefaultMemtableBuilder::default().build(metadata.schema().clone()); - let version = Version::new(Arc::new(metadata)); + let version = Version::new(metadata, memtable); VersionControl::with_version(version) }