feat: Remove memtable's time bucket (#442)

* refactor: partially replace MemtableSet with Memtable

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove MemtableWithMeta and MemtableSet in non-test mod

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove dead code

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* make test compile 🤣

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix broken tests

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* make all tests pass

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippys

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove redundant clone

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update comment

Co-authored-by: Yingwen <realevenyag@gmail.com>

* resolve review comment

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Yingwen <realevenyag@gmail.com>
This commit is contained in:
Ruihang Xia
2022-11-11 18:02:34 +08:00
committed by GitHub
parent 74ea529d1a
commit e30879f638
22 changed files with 267 additions and 1166 deletions

View File

@@ -25,5 +25,5 @@ pub fn schema_for_test() -> RegionSchemaRef {
}
pub fn new_memtable() -> MemtableRef {
DefaultMemtableBuilder {}.build(1, schema_for_test())
DefaultMemtableBuilder::default().build(schema_for_test())
}

View File

@@ -7,7 +7,7 @@ use store_api::storage::{Chunk, ChunkReader, SchemaRef, SequenceNumber};
use table::predicate::Predicate;
use crate::error::{self, Error, Result};
use crate::memtable::{IterContext, MemtableRef, MemtableSet};
use crate::memtable::{IterContext, MemtableRef};
use crate::read::{BoxedBatchReader, DedupReader, MergeReaderBuilder};
use crate::schema::{ProjectedSchema, ProjectedSchemaRef, RegionSchemaRef};
use crate::sst::{AccessLayerRef, FileHandle, LevelMetas, ReadOptions, Visitor};
@@ -100,11 +100,8 @@ impl ChunkReaderBuilder {
self
}
pub fn pick_memtables(mut self, memtables: &MemtableSet) -> Self {
for (_range, mem) in memtables.iter() {
self.memtables.push(mem.clone());
}
pub fn pick_memtables(mut self, memtables: MemtableRef) -> Self {
self.memtables.push(memtables);
self
}

View File

@@ -207,7 +207,7 @@ impl<S: LogStore> EngineInner<S> {
object_store,
log_store,
regions: RwLock::new(Default::default()),
memtable_builder: Arc::new(DefaultMemtableBuilder {}),
memtable_builder: Arc::new(DefaultMemtableBuilder::default()),
flush_scheduler,
flush_strategy: Arc::new(SizeBasedStrategy::default()),
}

View File

@@ -2,7 +2,6 @@ use std::sync::Arc;
use async_trait::async_trait;
use common_telemetry::logging;
use common_time::RangeMillis;
use store_api::logstore::LogStore;
use store_api::storage::consts::WRITE_ROW_GROUP_SIZE;
use store_api::storage::SequenceNumber;
@@ -104,12 +103,6 @@ impl FlushStrategy for SizeBasedStrategy {
}
}
/// A memtable paired with the time range it is associated with.
#[derive(Debug)]
pub struct MemtableWithMeta {
/// Shared handle to the memtable.
pub memtable: MemtableRef,
/// Time range of the memtable; `MemtableSet::to_memtable_with_metas` fills it with the
/// range the memtable is keyed by inside the set.
pub bucket: RangeMillis,
}
#[async_trait]
pub trait FlushScheduler: Send + Sync + std::fmt::Debug {
async fn schedule_flush(&self, flush_job: Box<dyn Job>) -> Result<JobHandle>;
@@ -141,7 +134,7 @@ pub struct FlushJob<S: LogStore> {
/// used to remove immutable memtables in current version.
pub max_memtable_id: MemtableId,
/// Memtables to be flushed.
pub memtables: Vec<MemtableWithMeta>,
pub memtables: Vec<MemtableRef>,
/// Last sequence of data to be flushed.
pub flush_sequence: SequenceNumber,
/// Shared data of region to be flushed.
@@ -170,9 +163,14 @@ impl<S: LogStore> FlushJob<S> {
..Default::default()
};
for m in &self.memtables {
// skip empty memtable
if m.num_rows() == 0 {
continue;
}
let file_name = Self::generate_sst_file_name();
// TODO(hl): Check if random file name already exists in meta.
let iter = m.memtable.iter(&iter_ctx)?;
let iter = m.iter(&iter_ctx)?;
futures.push(async move {
self.sst_layer
.write_sst(&file_name, iter, &WriteOptions::default())

View File

@@ -4,6 +4,7 @@ mod inserter;
pub mod tests;
mod version;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use datatypes::vectors::VectorRef;
@@ -12,7 +13,7 @@ use store_api::storage::{consts, OpType, SequenceNumber};
use crate::error::Result;
use crate::memtable::btree::BTreeMemtable;
pub use crate::memtable::inserter::Inserter;
pub use crate::memtable::version::{MemtableSet, MemtableVersion};
pub use crate::memtable::version::MemtableVersion;
use crate::read::Batch;
use crate::schema::{ProjectedSchemaRef, RegionSchemaRef};
@@ -36,8 +37,13 @@ pub trait Memtable: Send + Sync + std::fmt::Debug {
/// Iterates the memtable.
fn iter(&self, ctx: &IterContext) -> Result<BoxedBatchIterator>;
/// Returns the estimated bytes allocated by this memtable from heap.
/// Returns the estimated bytes allocated by this memtable from heap. The result
/// of this method may be larger than an estimate based on [`num_rows`] because
/// of the implementor's pre-allocation behavior.
fn bytes_allocated(&self) -> usize;
/// Return the number of rows contained in this memtable.
fn num_rows(&self) -> usize;
}
pub type MemtableRef = Arc<dyn Memtable>;
@@ -100,7 +106,7 @@ pub trait BatchIterator: Iterator<Item = Result<Batch>> + Send + Sync {
pub type BoxedBatchIterator = Box<dyn BatchIterator>;
pub trait MemtableBuilder: Send + Sync + std::fmt::Debug {
fn build(&self, id: MemtableId, schema: RegionSchemaRef) -> MemtableRef;
fn build(&self, schema: RegionSchemaRef) -> MemtableRef;
}
pub type MemtableBuilderRef = Arc<dyn MemtableBuilder>;
@@ -140,11 +146,14 @@ impl KeyValues {
}
}
#[derive(Debug)]
pub struct DefaultMemtableBuilder;
#[derive(Debug, Default)]
pub struct DefaultMemtableBuilder {
memtable_id: AtomicU32,
}
impl MemtableBuilder for DefaultMemtableBuilder {
fn build(&self, id: MemtableId, schema: RegionSchemaRef) -> MemtableRef {
fn build(&self, schema: RegionSchemaRef) -> MemtableRef {
let id = self.memtable_id.fetch_add(1, Ordering::Relaxed);
Arc::new(BTreeMemtable::new(id, schema))
}
}

View File

@@ -78,6 +78,10 @@ impl Memtable for BTreeMemtable {
fn bytes_allocated(&self) -> usize {
self.estimated_bytes.load(AtomicOrdering::Relaxed)
}
/// Returns the number of entries currently held in the memtable's map.
///
/// # Panics
/// Panics if acquiring the read guard on `map` fails (the result of `read()` is unwrapped).
fn num_rows(&self) -> usize {
    let map_guard = self.map.read().unwrap();
    map_guard.len()
}
}
struct BTreeIterator {

View File

@@ -1,68 +1,40 @@
use std::collections::HashMap;
use std::time::Duration;
use common_time::timestamp_millis::BucketAligned;
use common_time::{RangeMillis, TimestampMillis};
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::ScalarVector;
use datatypes::schema::SchemaRef;
use datatypes::vectors::{Int64Vector, TimestampVector, VectorRef};
use datatypes::vectors::VectorRef;
use snafu::OptionExt;
use store_api::storage::{ColumnDescriptor, OpType, SequenceNumber};
use crate::error::{self, IllegalTimestampColumnTypeSnafu, Result};
use crate::memtable::{KeyValues, Memtable, MemtableSet};
use super::MemtableRef;
use crate::error::{self, Result};
use crate::memtable::KeyValues;
use crate::write_batch::{Mutation, PutData, WriteBatch};
type RangeIndexMap = HashMap<TimestampMillis, usize>;
/// Wraps logic of inserting key/values in [WriteBatch] to [Memtable].
pub struct Inserter {
/// Sequence of the batch to be inserted.
sequence: SequenceNumber,
/// Time ranges of all input data.
time_ranges: Vec<RangeMillis>,
/// Map time range's start time to its index in time ranges.
time_range_indexes: RangeIndexMap,
/// Bucket duration of memtables.
bucket_duration: Duration,
/// Used to calculate the start index in batch for `KeyValues`.
index_in_batch: usize,
}
impl Inserter {
pub fn new(
sequence: SequenceNumber,
time_ranges: Vec<RangeMillis>,
bucket_duration: Duration,
) -> Inserter {
let time_range_indexes = new_range_index_map(&time_ranges);
pub fn new(sequence: SequenceNumber) -> Inserter {
Inserter {
sequence,
time_ranges,
time_range_indexes,
bucket_duration,
index_in_batch: 0,
}
}
// TODO(yingwen): Can we take the WriteBatch?
/// Insert write batch into memtables if both `batch` and `memtables` are not empty.
/// Insert write batch into memtable.
///
/// Won't do schema validation, caller (mostly the [`RegionWriter`]) should ensure the
/// schemas of `memtables` are consistent with `batch`'s, and the time ranges of `memtables`
/// are consistent with `self`'s time ranges.
///
/// # Panics
/// Panics if there is time range in `self.time_ranges` but not in `memtables`.
pub fn insert_memtables(&mut self, batch: &WriteBatch, memtables: &MemtableSet) -> Result<()> {
if batch.is_empty() || memtables.is_empty() {
/// Schema validation is only performed when debug assertions are enabled. The caller (mostly the
/// [`RegionWriter`]) should ensure the schema of `memtable` is consistent with `batch`'s.
pub fn insert_memtable(&mut self, batch: &WriteBatch, memtable: &MemtableRef) -> Result<()> {
if batch.is_empty() {
return Ok(());
}
// Only validate schema in debug mod.
validate_input_and_memtable_schemas(batch, memtables);
// This function only takes effect in debug mode.
validate_input_and_memtable_schemas(batch, memtable);
// Enough to hold all key or value columns.
let total_column_num = batch.schema().num_columns();
@@ -78,7 +50,7 @@ impl Inserter {
for mutation in batch {
match mutation {
Mutation::Put(put_data) => {
self.put_memtables(batch.schema(), put_data, memtables, &mut kvs)?;
self.write_one_mutation(put_data, memtable, &mut kvs)?;
}
}
}
@@ -86,27 +58,10 @@ impl Inserter {
Ok(())
}
/// Writes `put_data` into the memtable(s) of `memtables`.
///
/// When the set holds exactly one memtable, the whole `put_data` goes to it directly;
/// otherwise the rows are split by time range through `put_multiple_memtables`.
fn put_memtables(
    &mut self,
    schema: &SchemaRef,
    put_data: &PutData,
    memtables: &MemtableSet,
    kvs: &mut KeyValues,
) -> Result<()> {
    if memtables.len() != 1 {
        // Split data by time range and put them into memtables.
        return self.put_multiple_memtables(schema, put_data, memtables, kvs);
    }

    // Fast path, only one memtable to put.
    let (_range, only_memtable) = memtables.iter().next().unwrap();
    self.put_one_memtable(put_data, &**only_memtable, kvs)
}
fn put_one_memtable(
fn write_one_mutation(
&mut self,
put_data: &PutData,
memtable: &dyn Memtable,
memtable: &MemtableRef,
kvs: &mut KeyValues,
) -> Result<()> {
let schema = memtable.schema();
@@ -128,86 +83,19 @@ impl Inserter {
Ok(())
}
/// Put data to multiple memtables.
///
/// Looks up the timestamp column of `put_data`, groups consecutive rows by time range with
/// [`compute_slice_indices`], and writes each slice to the memtable registered for that range.
///
/// # Errors
/// - `BatchMissingTimestamp` if `schema` has no timestamp column, or the timestamp vector
///   cannot be downcast to the vector type matching its data type.
/// - `BatchMissingColumn` if `put_data` does not contain the timestamp column.
/// - `IllegalTimestampColumnType` if the timestamp column is neither `Int64` nor `Timestamp`.
/// - Any error returned by `put_one_memtable`.
///
/// # Panics
/// Panics if `memtables` has no memtable for a time range produced by the slicing step.
fn put_multiple_memtables(
&mut self,
schema: &SchemaRef,
put_data: &PutData,
memtables: &MemtableSet,
kvs: &mut KeyValues,
) -> Result<()> {
let timestamp_schema = schema
.timestamp_column()
.context(error::BatchMissingTimestampSnafu)?;
let timestamps = put_data.column_by_name(&timestamp_schema.name).context(
error::BatchMissingColumnSnafu {
column: &timestamp_schema.name,
},
)?;
// Both supported timestamp representations are reduced to an iterator of `Option<i64>`
// before slicing.
let slice_indexes = match timestamps.data_type() {
ConcreteDataType::Int64(_) => {
let timestamps: &Int64Vector = timestamps
.as_any()
.downcast_ref()
.context(error::BatchMissingTimestampSnafu)?;
let iter = timestamps.iter_data();
compute_slice_indices(iter, self.bucket_duration, &self.time_range_indexes)
}
ConcreteDataType::Timestamp(_) => {
let timestamps: &TimestampVector = timestamps
.as_any()
.downcast_ref()
.context(error::BatchMissingTimestampSnafu)?;
let iter = timestamps.iter_data().map(|v| v.map(|v| v.value()));
compute_slice_indices(iter, self.bucket_duration, &self.time_range_indexes)
}
_ => {
return IllegalTimestampColumnTypeSnafu {
data_type: timestamps.data_type(),
}
.fail();
}
};
for slice_index in slice_indexes {
let sliced_data = put_data.slice(slice_index.start, slice_index.end);
let range = &self.time_ranges[slice_index.range_index];
// The caller should ensure that a memtable for the given time range exists.
let memtable = memtables
.get_by_range(range)
.expect("Memtable not found for range");
self.put_one_memtable(&sliced_data, &**memtable, kvs)?;
}
Ok(())
}
}
fn validate_input_and_memtable_schemas(batch: &WriteBatch, memtables: &MemtableSet) {
fn validate_input_and_memtable_schemas(batch: &WriteBatch, memtable: &MemtableRef) {
if cfg!(debug_assertions) {
let batch_schema = batch.schema();
for (_, memtable) in memtables.iter() {
let memtable_schema = memtable.schema();
let user_schema = memtable_schema.user_schema();
debug_assert_eq!(batch_schema.version(), user_schema.version());
// Only validate column schemas.
debug_assert_eq!(batch_schema.column_schemas(), user_schema.column_schemas());
}
let memtable_schema = memtable.schema();
let user_schema = memtable_schema.user_schema();
debug_assert_eq!(batch_schema.version(), user_schema.version());
// Only validate column schemas.
debug_assert_eq!(batch_schema.column_schemas(), user_schema.column_schemas());
}
}
fn new_range_index_map(time_ranges: &[RangeMillis]) -> RangeIndexMap {
time_ranges
.iter()
.enumerate()
.map(|(i, range)| (*range.start(), i))
.collect()
}
fn clone_put_data_column_to(
put_data: &PutData,
desc: &ColumnDescriptor,
@@ -231,441 +119,22 @@ struct SliceIndex {
range_index: usize,
}
/// Computes the indexes used to split timestamps into time ranges aligned by `duration`, stores
/// the indexes in [`SliceIndex`].
///
/// Each returned [`SliceIndex`] describes a run of consecutive rows, from `start` (inclusive)
/// to `end` (exclusive), whose timestamps all map to the same entry of `time_range_indexes`.
/// A row is skipped, and ends the current run, when its timestamp is `None`, cannot be aligned
/// by `duration`, or aligns to a start time that is not a key of `time_range_indexes`.
/// The same range may therefore appear in more than one `SliceIndex`.
///
/// # Panics
/// Panics if the duration is too large to be represented by i64.
fn compute_slice_indices<I: Iterator<Item = Option<i64>>>(
timestamps: I,
duration: Duration,
time_range_indexes: &RangeIndexMap,
) -> Vec<SliceIndex> {
let duration_ms = duration
.as_millis()
.try_into()
.unwrap_or_else(|e| panic!("Duration {:?} too large, {}", duration, e));
let mut slice_indexes = Vec::with_capacity(time_range_indexes.len());
// Current start and end of a valid `SliceIndex`.
let (mut start, mut end) = (0, 0);
// Time range index of the valid but unpushed `SliceIndex`.
let mut last_range_index = None;
// Iterate all timestamps, split timestamps by its time range.
for (i, ts) in timestamps.enumerate() {
// Find index for time range of the timestamp.
let current_range_index = ts
.and_then(|v| v.align_by_bucket(duration_ms))
.and_then(|aligned| time_range_indexes.get(&aligned).copied());
match current_range_index {
Some(current_range_index) => {
// `end` tracks the index of the latest valid row; it is used as the exclusive
// end when a new range starts at row `i`.
end = i;
match last_range_index {
Some(last_index) => {
if last_index != current_range_index {
// Found a new range, we need to push a SliceIndex for last range.
slice_indexes.push(SliceIndex {
start,
end,
range_index: last_index,
});
// Update last range index.
last_range_index = Some(current_range_index);
// Advance start.
start = i;
}
}
// No previous range index.
None => last_range_index = Some(current_range_index),
}
}
None => {
// Row without timestamp or out of time range will be skipped. This usually should not happen.
if let Some(last_index) = last_range_index {
// Need to store SliceIndex for last range.
slice_indexes.push(SliceIndex {
start,
end: i,
range_index: last_index,
});
// Clear last range index.
last_range_index = None;
}
// Advances start and end, skips current row.
start = i + 1;
end = start;
}
}
}
// Process last slice index.
if let Some(last_index) = last_range_index {
slice_indexes.push(SliceIndex {
start,
// We need to use `end + 1` to include the last element.
end: end + 1,
range_index: last_index,
});
}
slice_indexes
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_time::timestamp::Timestamp;
use datatypes::prelude::ScalarVectorBuilder;
use datatypes::vectors::{
Int64Vector, Int64VectorBuilder, TimestampVector, TimestampVectorBuilder,
};
use datatypes::vectors::{Int64Vector, TimestampVector};
use datatypes::{type_id::LogicalTypeId, value::Value};
use store_api::storage::{PutOperation, WriteRequest};
use super::*;
use crate::memtable::{DefaultMemtableBuilder, IterContext, MemtableBuilder, MemtableId};
use crate::memtable::{DefaultMemtableBuilder, IterContext, MemtableBuilder};
use crate::metadata::RegionMetadata;
use crate::schema::RegionSchemaRef;
use crate::test_util::descriptor_util::RegionDescBuilder;
use crate::test_util::write_batch_util;
/// Builds one `RangeMillis` of length `duration` for every entry of `starts`.
///
/// # Panics
/// Panics if a start is not a multiple of `duration`, or if `RangeMillis::new` does not
/// yield a range (its result is unwrapped).
fn new_time_ranges(starts: &[i64], duration: i64) -> Vec<RangeMillis> {
    starts
        .iter()
        .map(|&bucket_start| {
            // Every start must already be aligned to the bucket duration.
            assert_eq!(bucket_start, bucket_start / duration * duration);
            RangeMillis::new(bucket_start, bucket_start + duration).unwrap()
        })
        .collect()
}
fn check_compute_slice_indexes(
timestamps: &[Option<i64>],
range_starts: &[i64],
duration: i64,
expect: &[SliceIndex],
) {
assert!(duration > 0);
let mut builder = TimestampVectorBuilder::with_capacity(0);
for v in timestamps {
builder.push(v.map(common_time::timestamp::Timestamp::from_millis));
}
let ts_vec = builder.finish();
let time_ranges = new_time_ranges(range_starts, duration);
let time_range_indexes = new_range_index_map(&time_ranges);
let slice_indexes = compute_slice_indices(
ts_vec.iter_data().map(|v| v.map(|v| v.value())),
Duration::from_millis(duration as u64),
&time_range_indexes,
);
assert_eq!(expect, slice_indexes);
}
#[test]
fn test_check_compute_slice_indexes_i64() {
let timestamps = &[Some(99), Some(13), Some(18), Some(234)];
let range_starts = &[0, 200];
let duration = 100;
let mut builder = Int64VectorBuilder::with_capacity(timestamps.len());
for v in timestamps {
builder.push(*v);
}
let ts_vec = builder.finish();
let time_ranges = new_time_ranges(range_starts, duration);
let time_range_indexes = new_range_index_map(&time_ranges);
let slice_indexes = compute_slice_indices(
ts_vec.iter_data(),
Duration::from_millis(duration as u64),
&time_range_indexes,
);
assert_eq!(
vec![
SliceIndex {
start: 0,
end: 3,
range_index: 0,
},
SliceIndex {
start: 3,
end: 4,
range_index: 1,
},
],
slice_indexes
);
}
#[test]
fn test_check_compute_slice_indexes_timestamp() {
let timestamps = &[Some(99), Some(13), Some(18), Some(234)];
let range_starts = &[0, 200];
let duration = 100;
let mut builder = TimestampVectorBuilder::with_capacity(timestamps.len());
for v in timestamps {
builder.push(v.map(Timestamp::from_millis));
}
let ts_vec = builder.finish();
let time_ranges = new_time_ranges(range_starts, duration);
let time_range_indexes = new_range_index_map(&time_ranges);
let slice_indexes = compute_slice_indices(
ts_vec.iter_data().map(|v| v.map(|v| v.value())),
Duration::from_millis(duration as u64),
&time_range_indexes,
);
assert_eq!(
vec![
SliceIndex {
start: 0,
end: 3,
range_index: 0,
},
SliceIndex {
start: 3,
end: 4,
range_index: 1,
},
],
slice_indexes
);
}
#[test]
fn test_compute_slice_indexes_valid() {
// Test empty input.
check_compute_slice_indexes(&[], &[], 100, &[]);
// One valid input.
check_compute_slice_indexes(
&[Some(99)],
&[0],
100,
&[SliceIndex {
start: 0,
end: 1,
range_index: 0,
}],
);
// 2 ranges.
check_compute_slice_indexes(
&[Some(99), Some(234)],
&[0, 200],
100,
&[
SliceIndex {
start: 0,
end: 1,
range_index: 0,
},
SliceIndex {
start: 1,
end: 2,
range_index: 1,
},
],
);
// Multiple elements in first range.
check_compute_slice_indexes(
&[Some(99), Some(13), Some(18), Some(234)],
&[0, 200],
100,
&[
SliceIndex {
start: 0,
end: 3,
range_index: 0,
},
SliceIndex {
start: 3,
end: 4,
range_index: 1,
},
],
);
// Multiple elements in last range.
check_compute_slice_indexes(
&[Some(99), Some(234), Some(271)],
&[0, 200],
100,
&[
SliceIndex {
start: 0,
end: 1,
range_index: 0,
},
SliceIndex {
start: 1,
end: 3,
range_index: 1,
},
],
);
// Multiple ranges.
check_compute_slice_indexes(
&[Some(99), Some(13), Some(234), Some(456)],
&[0, 200, 400],
100,
&[
SliceIndex {
start: 0,
end: 2,
range_index: 0,
},
SliceIndex {
start: 2,
end: 3,
range_index: 1,
},
SliceIndex {
start: 3,
end: 4,
range_index: 2,
},
],
);
// Different slices with same range.
check_compute_slice_indexes(
&[Some(99), Some(234), Some(15)],
&[0, 200],
100,
&[
SliceIndex {
start: 0,
end: 1,
range_index: 0,
},
SliceIndex {
start: 1,
end: 2,
range_index: 1,
},
SliceIndex {
start: 2,
end: 3,
range_index: 0,
},
],
);
}
#[test]
fn test_compute_slice_indexes_null_timestamp() {
check_compute_slice_indexes(&[None], &[0], 100, &[]);
check_compute_slice_indexes(
&[None, None, Some(53)],
&[0],
100,
&[SliceIndex {
start: 2,
end: 3,
range_index: 0,
}],
);
check_compute_slice_indexes(
&[Some(53), None, None],
&[0],
100,
&[SliceIndex {
start: 0,
end: 1,
range_index: 0,
}],
);
check_compute_slice_indexes(
&[None, Some(53), None, Some(240), Some(13), None],
&[0, 200],
100,
&[
SliceIndex {
start: 1,
end: 2,
range_index: 0,
},
SliceIndex {
start: 3,
end: 4,
range_index: 1,
},
SliceIndex {
start: 4,
end: 5,
range_index: 0,
},
],
);
}
#[test]
fn test_compute_slice_indexes_no_range() {
check_compute_slice_indexes(
&[Some(99), Some(234), Some(15)],
&[0],
100,
&[
SliceIndex {
start: 0,
end: 1,
range_index: 0,
},
SliceIndex {
start: 2,
end: 3,
range_index: 0,
},
],
);
check_compute_slice_indexes(
&[Some(99), Some(15), Some(234)],
&[0],
100,
&[SliceIndex {
start: 0,
end: 2,
range_index: 0,
}],
);
check_compute_slice_indexes(
&[Some(-1), Some(99), Some(15)],
&[0],
100,
&[SliceIndex {
start: 1,
end: 3,
range_index: 0,
}],
);
}
fn new_test_write_batch() -> WriteBatch {
write_batch_util::new_write_batch(
&[
@@ -697,18 +166,8 @@ mod tests {
batch.put(put_data).unwrap();
}
fn new_memtable_set(time_ranges: &[RangeMillis], schema: &RegionSchemaRef) -> MemtableSet {
let mut set = MemtableSet::new();
for (id, range) in time_ranges.iter().enumerate() {
let mem = DefaultMemtableBuilder {}.build(id as MemtableId, schema.clone());
set.insert(*range, mem)
}
set
}
fn check_memtable_content(
mem: &dyn Memtable,
mem: &MemtableRef,
sequence: SequenceNumber,
data: &[(i64, Option<i64>)],
) {
@@ -735,15 +194,9 @@ mod tests {
#[test]
fn test_inserter_put_one_memtable() {
let sequence = 11111;
let bucket_duration = 100;
let time_ranges = new_time_ranges(&[0], bucket_duration);
let memtable_schema = new_region_schema();
let memtables = new_memtable_set(&time_ranges, &memtable_schema);
let mut inserter = Inserter::new(
sequence,
time_ranges,
Duration::from_millis(bucket_duration as u64),
);
let mutable_memtable = DefaultMemtableBuilder::default().build(memtable_schema);
let mut inserter = Inserter::new(sequence);
let mut batch = new_test_write_batch();
put_batch(&mut batch, &[(1, Some(1)), (2, None)]);
@@ -752,77 +205,28 @@ mod tests {
&mut batch,
&[
(3, None),
// Duplicate entries in same put data.
(2, None),
(2, None), // Duplicate entries in same put data.
(2, Some(2)),
(4, Some(4)),
],
);
inserter.insert_memtables(&batch, &memtables).unwrap();
let mem = memtables
.get_by_range(&RangeMillis::new(0, 100).unwrap())
.unwrap();
check_memtable_content(
&**mem,
sequence,
&[(1, Some(1)), (2, Some(2)), (3, None), (4, Some(4))],
);
}
#[test]
fn test_inserter_put_multiple() {
let sequence = 11111;
let bucket_duration = 100;
let time_ranges = new_time_ranges(&[0, 100, 200], bucket_duration);
let memtable_schema = new_region_schema();
let memtables = new_memtable_set(&time_ranges, &memtable_schema);
let mut inserter = Inserter::new(
sequence,
time_ranges,
Duration::from_millis(bucket_duration as u64),
);
let mut batch = new_test_write_batch();
put_batch(
&mut batch,
&[
(1, Some(1)),
(2, None),
(201, Some(201)),
(102, None),
(101, Some(101)),
],
);
put_batch(
&mut batch,
inserter.insert_memtable(&batch, &mutable_memtable).unwrap();
check_memtable_content(
&mutable_memtable,
sequence,
&[
(180, Some(1)),
(3, Some(3)),
(1, None),
(211, Some(211)),
(180, Some(180)),
(1, Some(1)),
(2, Some(2)),
(3, None),
(4, Some(4)),
(101, Some(101)),
(102, None),
(201, Some(201)),
],
);
inserter.insert_memtables(&batch, &memtables).unwrap();
let mem = memtables
.get_by_range(&RangeMillis::new(0, 100).unwrap())
.unwrap();
check_memtable_content(&**mem, sequence, &[(1, None), (2, None), (3, Some(3))]);
let mem = memtables
.get_by_range(&RangeMillis::new(100, 200).unwrap())
.unwrap();
check_memtable_content(
&**mem,
sequence,
&[(101, Some(101)), (102, None), (180, Some(180))],
);
let mem = memtables
.get_by_range(&RangeMillis::new(200, 300).unwrap())
.unwrap();
check_memtable_content(&**mem, sequence, &[(201, Some(201)), (211, Some(211))]);
}
}

View File

@@ -10,9 +10,6 @@ use crate::metadata::RegionMetadata;
use crate::schema::{ProjectedSchema, RegionSchemaRef};
use crate::test_util::descriptor_util::RegionDescBuilder;
// For simplicity, all memtables in test share same memtable id.
const MEMTABLE_ID: MemtableId = 1;
// Schema for testing memtable:
// - key: Int64(timestamp), UInt64(version),
// - value: UInt64, UInt64
@@ -157,7 +154,7 @@ impl Default for MemtableTester {
impl MemtableTester {
fn new() -> MemtableTester {
let schema = schema_for_test();
let builders = vec![Arc::new(DefaultMemtableBuilder {}) as _];
let builders = vec![Arc::new(DefaultMemtableBuilder::default()) as _];
MemtableTester { schema, builders }
}
@@ -165,7 +162,7 @@ impl MemtableTester {
fn new_memtables(&self) -> Vec<MemtableRef> {
self.builders
.iter()
.map(|b| b.build(MEMTABLE_ID, self.schema.clone()))
.map(|b| b.build(self.schema.clone()))
.collect()
}

View File

@@ -1,49 +1,50 @@
use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::sync::Arc;
use common_time::RangeMillis;
use crate::flush::MemtableWithMeta;
use crate::memtable::{MemtableId, MemtableRef};
/// A version of all memtables.
///
/// This structure is immutable now.
#[derive(Default, Debug, PartialEq, Eq)]
#[derive(Debug)]
pub struct MemtableVersion {
mutable: MemtableSet,
mutable: MemtableRef,
/// Immutable memtables.
immutables: Vec<MemtableSetRef>,
immutables: Vec<MemtableRef>,
}
impl MemtableVersion {
pub fn new() -> MemtableVersion {
MemtableVersion::default()
pub fn new(mutable: MemtableRef) -> MemtableVersion {
Self {
mutable,
immutables: vec![],
}
}
#[inline]
pub fn mutable_memtables(&self) -> &MemtableSet {
pub fn mutable_memtable(&self) -> &MemtableRef {
&self.mutable
}
#[inline]
pub fn immutable_memtables(&self) -> &[MemtableSetRef] {
pub fn immutable_memtables(&self) -> &[MemtableRef] {
&self.immutables
}
pub fn num_memtables(&self) -> usize {
self.mutable.len() + self.immutables.iter().map(|set| set.len()).sum::<usize>()
// the last `1` is for `mutable`
self.immutable_memtables().len() + 1
}
/// Clone current memtable version and freeze its mutable memtable, which moves
/// the current mutable memtable to the immutable memtable list.
pub fn freeze_mutable(&self) -> MemtableVersion {
pub fn freeze_mutable(&self, new_mutable: MemtableRef) -> MemtableVersion {
let mut immutables = self.immutables.clone();
immutables.push(Arc::new(self.mutable.clone()));
immutables.push(self.mutable.clone());
MemtableVersion {
mutable: MemtableSet::new(),
mutable: new_mutable,
immutables,
}
}
@@ -60,26 +61,13 @@ impl MemtableVersion {
+ self.mutable.bytes_allocated()
}
/// Creates a new `MemtableVersion` that contains memtables both in this and `other`.
///
/// The mutable sets are merged with `MemtableSet::add`; the immutable list is cloned
/// unchanged. `self` is not modified.
///
/// NOTE(review): `MemtableSet::add` merges the two sets via `extend`, so a memtable in
/// `other` whose range key equals one in `self` appears to be replaced by the one from
/// `self` rather than causing a panic — confirm before relying on a panic for duplicate
/// time ranges.
pub fn add_mutable(&self, other: MemtableSet) -> MemtableVersion {
let mutable = self.mutable.add(other);
Self {
mutable,
immutables: self.immutables.clone(),
}
}
/// Creates a new `MemtableVersion` that removes immutable memtables
/// less than or equal to max_memtable_id.
pub fn remove_immutables(&self, max_memtable_id: MemtableId) -> MemtableVersion {
let immutables = self
.immutables
.iter()
.filter(|immem| immem.max_memtable_id() > max_memtable_id)
.filter(|immem| immem.id() > max_memtable_id)
.cloned()
.collect();
@@ -89,17 +77,9 @@ impl MemtableVersion {
}
}
pub fn memtables_to_flush(&self) -> (Option<MemtableId>, Vec<MemtableWithMeta>) {
let max_memtable_id = self
.immutables
.iter()
.map(|immem| immem.max_memtable_id())
.max();
let memtables = self
.immutables
.iter()
.flat_map(|immem| immem.to_memtable_with_metas())
.collect();
pub fn memtables_to_flush(&self) -> (Option<MemtableId>, Vec<MemtableRef>) {
let max_memtable_id = self.immutables.iter().map(|immem| immem.id()).max();
let memtables = self.immutables.clone();
(max_memtable_id, memtables)
}
@@ -124,299 +104,44 @@ impl PartialOrd for RangeKey {
}
}
/// Collection of mutable memtables.
///
/// Memtables are partitioned by their time range. Caller should ensure
/// there are no overlapped ranges and all ranges are aligned by same
/// bucket duration.
#[derive(Default, Clone, Debug)]
pub struct MemtableSet {
memtables: BTreeMap<RangeKey, MemtableRef>,
max_memtable_id: MemtableId,
}
pub type MemtableSetRef = Arc<MemtableSet>;
/// Two sets are equal when they have the same max memtable id, the same number of
/// memtables, and pairwise-equal range keys, memtable ids and schemas in iteration order.
impl PartialEq for MemtableSet {
    fn eq(&self, other: &MemtableSet) -> bool {
        if self.max_memtable_id != other.max_memtable_id {
            return false;
        }
        if self.memtables.len() != other.memtables.len() {
            return false;
        }

        // Both maps have the same length here, so zipping visits every entry of each.
        for (lhs, rhs) in self.memtables.iter().zip(&other.memtables) {
            let same_entry =
                lhs.0 == rhs.0 && lhs.1.id() == rhs.1.id() && lhs.1.schema() == rhs.1.schema();
            if !same_entry {
                return false;
            }
        }
        true
    }
}
impl Eq for MemtableSet {}
impl MemtableSet {
/// Creates an empty set (the `Default` value).
pub fn new() -> MemtableSet {
MemtableSet::default()
}
/// Get memtable by time range.
///
/// The range must exactly equal to the range of the memtable, otherwise `None`
/// is returned.
pub fn get_by_range(&self, range: &RangeMillis) -> Option<&MemtableRef> {
let range_key = RangeKey(*range);
self.memtables.get(&range_key)
}
/// Insert a new memtable.
///
/// Also raises `max_memtable_id` to the id of `mem` if that id is larger.
///
/// # Panics
/// Panics if memtable with same range already exists. The check runs after the
/// insertion, so the previous entry has already been replaced when the panic fires.
pub fn insert(&mut self, range: RangeMillis, mem: MemtableRef) {
self.max_memtable_id = MemtableId::max(self.max_memtable_id, mem.id());
let old = self.memtables.insert(RangeKey(range), mem);
assert!(old.is_none());
}
/// Returns number of memtables in the set.
#[inline]
pub fn len(&self) -> usize {
self.memtables.len()
}
/// Returns true if there is no memtable in the set.
#[inline]
pub fn is_empty(&self) -> bool {
self.memtables.is_empty()
}
/// Returns the sum of `bytes_allocated()` over all memtables in the set.
pub fn bytes_allocated(&self) -> usize {
self.memtables.values().map(|m| m.bytes_allocated()).sum()
}
/// Returns the largest memtable id recorded by this set.
pub fn max_memtable_id(&self) -> MemtableId {
self.max_memtable_id
}
/// Creates a new `MemtableSet` that contains memtables both in `self` and
/// `other`, let `self` unchanged.
pub fn add(&self, mut other: MemtableSet) -> MemtableSet {
// We extend `other.memtables` with the entries of `self.memtables`, so for a
// range key present in both sets the memtable from `self` replaces the one from
// `other`. Memtables in `other` should usually be empty, so overwriting them is okay.
other
.memtables
.extend(self.memtables.iter().map(|(k, v)| (*k, v.clone())));
MemtableSet {
memtables: other.memtables,
max_memtable_id: MemtableId::max(self.max_memtable_id, other.max_memtable_id),
}
}
/// Returns every memtable in the set together with the time range it is keyed by,
/// in the iteration order of the underlying map.
pub fn to_memtable_with_metas(&self) -> Vec<MemtableWithMeta> {
self.memtables
.iter()
.map(|(range_key, memtable)| MemtableWithMeta {
memtable: memtable.clone(),
bucket: range_key.0,
})
.collect()
}
/// Iterates over `(time range, memtable)` pairs in the iteration order of the
/// underlying map.
pub fn iter(&self) -> impl Iterator<Item = (&RangeMillis, &MemtableRef)> {
self.memtables.iter().map(|(k, v)| (&k.0, v))
}
}
#[cfg(test)]
mod tests {
use store_api::storage::OpType;
use std::sync::Arc;
use super::*;
use crate::memtable::tests;
use crate::memtable::BTreeMemtable;
use crate::memtable::Memtable;
use crate::memtable::DefaultMemtableBuilder;
use crate::memtable::MemtableBuilder;
use crate::test_util::schema_util;
#[test]
fn test_memtableset_misc() {
let mut set = MemtableSet::new();
fn test_memtable_version() {
let memtable_builder = DefaultMemtableBuilder::default();
let region_schema = Arc::new(schema_util::new_region_schema(1, 1));
assert!(set.is_empty());
assert_eq!(0, set.max_memtable_id());
assert_eq!(0, set.bytes_allocated());
assert!(set
.get_by_range(&RangeMillis::new(0, 10).unwrap())
.is_none());
let memtable_1 = memtable_builder.build(region_schema.clone());
let v1 = MemtableVersion::new(memtable_1);
assert_eq!(1, v1.num_memtables());
set.insert(
RangeMillis::new(0, 10).unwrap(),
Arc::new(BTreeMemtable::new(0, tests::schema_for_test())),
);
set.insert(
RangeMillis::new(10, 20).unwrap(),
Arc::new(BTreeMemtable::new(1, tests::schema_for_test())),
);
let memtable = Arc::new(BTreeMemtable::new(2, tests::schema_for_test()));
// Write some test data
tests::write_kvs(
&*memtable,
10, // sequence
OpType::Put,
&[
(1000, 1),
(1000, 2),
(2002, 1),
(2003, 1),
(2003, 5),
(1001, 1),
], // keys
&[
(Some(1), None),
(Some(2), None),
(Some(7), None),
(Some(8), None),
(Some(9), None),
(Some(3), None),
], // values
);
// Freeze and add new mutable.
let memtable_2 = memtable_builder.build(region_schema.clone());
let v2 = v1.freeze_mutable(memtable_2);
let v2_immutables = v2.immutable_memtables();
assert_eq!(1, v2_immutables.len());
assert_eq!(0, v2_immutables[0].id());
assert_eq!(1, v2.mutable_memtable().id());
assert_eq!(2, v2.num_memtables());
set.insert(RangeMillis::new(20, 30).unwrap(), memtable.clone());
// Add another one and check immutable memtables that need flush
let memtable_3 = memtable_builder.build(region_schema);
let v3 = v2.freeze_mutable(memtable_3);
let (max_table_id, immutables) = v3.memtables_to_flush();
assert_eq!(1, max_table_id.unwrap());
assert_eq!(2, immutables.len());
for (i, (range, _)) in set.iter().enumerate() {
assert_eq!(
*range,
RangeMillis::new(i as i64 * 10, i as i64 * 10 + 10).unwrap()
);
}
assert!(!set.is_empty());
assert_eq!(2, set.max_memtable_id());
assert_eq!(memtable.bytes_allocated(), set.bytes_allocated());
assert!(set
.get_by_range(&RangeMillis::new(0, 10).unwrap())
.is_some());
assert!(set
.get_by_range(&RangeMillis::new(10, 20).unwrap())
.is_some());
assert!(set
.get_by_range(&RangeMillis::new(20, 30).unwrap())
.is_some());
assert!(set
.get_by_range(&RangeMillis::new(0, 100).unwrap())
.is_none());
}
fn create_test_memtableset(ids: &[MemtableId]) -> MemtableSet {
let mut set = MemtableSet::new();
for id in ids {
let i = *id as i64;
set.insert(
RangeMillis::new(i * 10, (i + 1) * 10).unwrap(),
Arc::new(BTreeMemtable::new(*id, tests::schema_for_test())),
);
}
set
}
#[test]
fn test_add_memtableset() {
let s1 = create_test_memtableset(&[0, 1, 2]);
let s2 = create_test_memtableset(&[3, 4, 5, 6]);
let mut s1_memtables = s1.to_memtable_with_metas();
let s2_memtables = s2.to_memtable_with_metas();
s1_memtables.extend(s2_memtables);
let empty = create_test_memtableset(&[]);
assert_eq!(s1, s1.add(empty));
let s3 = s1.add(s2);
assert_ne!(s1, s3);
assert_eq!(7, s3.memtables.len());
let s3_memtables = s3.to_memtable_with_metas();
assert_eq!(7, s3_memtables.len());
for i in 0..7 {
assert_eq!(s1_memtables[i].bucket, s3_memtables[i].bucket);
assert_eq!(s1_memtables[i].memtable.id(), s3_memtables[i].memtable.id());
}
assert_eq!(6, s3.max_memtable_id());
}
#[test]
fn test_memtableversion() {
let s1 = create_test_memtableset(&[0, 1, 2]);
let s2 = create_test_memtableset(&[3, 4, 5, 6]);
let s3 = s1.add(s2.clone());
let v1 = MemtableVersion::new();
assert!(v1.mutable_memtables().is_empty());
assert_eq!(0, v1.num_memtables());
// Add one mutable
let v2 = v1.add_mutable(s1.clone());
assert_ne!(v1, v2);
let mutables = v2.mutable_memtables();
assert_eq!(s1, *mutables);
assert_eq!(3, v2.num_memtables());
// Add another mutable
let v3 = v2.add_mutable(s2);
assert_ne!(v1, v3);
assert_ne!(v2, v3);
let mutables = v3.mutable_memtables();
assert_eq!(s3, *mutables);
assert!(v3.memtables_to_flush().1.is_empty());
assert_eq!(7, v3.num_memtables());
// Try to freeze s1, s2
let v4 = v3.freeze_mutable();
assert_ne!(v1, v4);
assert_ne!(v2, v4);
assert_ne!(v3, v4);
assert!(v4.mutable_memtables().is_empty());
assert_eq!(v4.immutables.len(), 1);
assert_eq!(v4.immutables[0], Arc::new(s3.clone()));
let (max_id, tables) = v4.memtables_to_flush();
assert_eq!(6, max_id.unwrap());
assert_eq!(7, tables.len());
assert_eq!(7, v4.num_memtables());
// Add another mutable
let s4 = create_test_memtableset(&[7, 8]);
let v5 = v4.add_mutable(s4.clone());
let mutables = v5.mutable_memtables();
assert_eq!(s4, *mutables);
assert_eq!(v4.immutables, v5.immutables);
// Try to freeze s4
let v6 = v5.freeze_mutable();
assert_eq!(v6.immutables.len(), 2);
assert_eq!(v6.immutables[0], Arc::new(s3));
assert_eq!(v6.immutables[1], Arc::new(s4.clone()));
let (max_id, tables) = v6.memtables_to_flush();
assert_eq!(8, max_id.unwrap());
assert_eq!(9, tables.len());
assert_eq!(9, v6.num_memtables());
// verify tables
for (i, table) in tables.iter().enumerate() {
assert_eq!(i as u32, table.memtable.id());
let i = i as i64;
assert_eq!(
table.bucket,
RangeMillis::new(i * 10, (i + 1) * 10).unwrap()
);
}
// Remove tables
let v7 = v6.remove_immutables(6);
assert_eq!(v7.immutables.len(), 1);
assert_eq!(v7.immutables[0], Arc::new(s4));
let v8 = v7.remove_immutables(8);
assert_eq!(v8.immutables.len(), 0);
assert_eq!(0, v8.num_memtables());
// Remove memtables
let v4 = v3.remove_immutables(1);
assert_eq!(1, v4.num_memtables());
assert_eq!(0, v4.immutable_memtables().len());
assert_eq!(2, v4.mutable_memtable().id());
}
}

View File

@@ -21,7 +21,7 @@ use crate::manifest::{
region::RegionManifest,
};
use crate::memtable::MemtableBuilderRef;
use crate::metadata::{RegionMetaImpl, RegionMetadata};
use crate::metadata::{RegionMetaImpl, RegionMetadata, RegionMetadataRef};
pub use crate::region::writer::{AlterContext, RegionWriter, RegionWriterRef, WriterContext};
use crate::schema::compat::CompatWrite;
use crate::snapshot::SnapshotImpl;
@@ -120,7 +120,10 @@ impl<S: LogStore> RegionImpl<S> {
)))
.await?;
let version = Version::with_manifest_version(metadata, manifest_version);
let mutable_memtable = store_config
.memtable_builder
.build(metadata.schema().clone());
let version = Version::with_manifest_version(metadata, manifest_version, mutable_memtable);
let region = RegionImpl::new(version, store_config);
Ok(region)
@@ -160,11 +163,15 @@ impl<S: LogStore> RegionImpl<S> {
_opts: &OpenOptions,
) -> Result<Option<RegionImpl<S>>> {
// Load version meta data from manifest.
let (version, mut recovered_metadata) =
match Self::recover_from_manifest(&store_config.manifest).await? {
(None, _) => return Ok(None),
(Some(v), m) => (v, m),
};
let (version, mut recovered_metadata) = match Self::recover_from_manifest(
&store_config.manifest,
&store_config.memtable_builder,
)
.await?
{
(None, _) => return Ok(None),
(Some(v), m) => (v, m),
};
logging::debug!(
"Region recovered version from manifest, version: {:?}",
@@ -179,12 +186,19 @@ impl<S: LogStore> RegionImpl<S> {
recovered_metadata.split_off(&(flushed_sequence + 1));
// apply the last flushed metadata
if let Some((sequence, (manifest_version, metadata))) = recovered_metadata.pop_last() {
let metadata = Arc::new(
let metadata: RegionMetadataRef = Arc::new(
metadata
.try_into()
.context(error::InvalidRawRegionSnafu { region: &name })?,
);
version_control.freeze_mutable_and_apply_metadata(metadata, manifest_version);
let mutable_memtable = store_config
.memtable_builder
.build(metadata.schema().clone());
version_control.freeze_mutable_and_apply_metadata(
metadata,
manifest_version,
mutable_memtable,
);
logging::debug!(
"Applied the last flushed metadata to region: {}, sequence: {}, manifest: {}",
@@ -236,6 +250,7 @@ impl<S: LogStore> RegionImpl<S> {
async fn recover_from_manifest(
manifest: &RegionManifest,
memtable_builder: &MemtableBuilderRef,
) -> Result<(Option<Version>, RecoveredMetadataMap)> {
let (start, end) = Self::manifest_scan_range();
let mut iter = manifest.scan(start, end).await?;
@@ -252,13 +267,17 @@ impl<S: LogStore> RegionImpl<S> {
match (action, version) {
(RegionMetaAction::Change(c), None) => {
let region = c.metadata.name.clone();
let region_metadata = c
let region_metadata: RegionMetadata = c
.metadata
.try_into()
.context(error::InvalidRawRegionSnafu { region })?;
// Use current schema to build a memtable. This might be replaced later
// in `freeze_mutable_and_apply_metadata()`.
let memtable = memtable_builder.build(region_metadata.schema().clone());
version = Some(Version::with_manifest_version(
Arc::new(region_metadata),
last_manifest_version,
memtable,
));
for (manifest_version, action) in actions.drain(..) {
version = Self::replay_edit(manifest_version, action, version);

View File

@@ -20,6 +20,7 @@ use tempdir::TempDir;
use super::*;
use crate::manifest::action::{RegionChange, RegionMetaActionList};
use crate::manifest::test_utils::*;
use crate::memtable::DefaultMemtableBuilder;
use crate::test_util::{
self, config_util, descriptor_util::RegionDescBuilder, schema_util, write_batch_util,
};
@@ -166,7 +167,7 @@ async fn test_new_region() {
.push_key_column(("k1", LogicalTypeId::Int32, false))
.push_value_column(("v0", LogicalTypeId::Float32, true))
.build();
let metadata = desc.try_into().unwrap();
let metadata: RegionMetadata = desc.try_into().unwrap();
let store_dir = TempDir::new("test_new_region")
.unwrap()
@@ -175,8 +176,14 @@ async fn test_new_region() {
.to_string();
let store_config = config_util::new_store_config(region_name, &store_dir).await;
let placeholder_memtable = store_config
.memtable_builder
.build(metadata.schema().clone());
let region = RegionImpl::new(Version::new(Arc::new(metadata)), store_config);
let region = RegionImpl::new(
Version::new(Arc::new(metadata), placeholder_memtable),
store_config,
);
let expect_schema = schema_util::new_schema_ref(
&[
@@ -195,6 +202,7 @@ async fn test_new_region() {
#[tokio::test]
async fn test_recover_region_manifets() {
let tmp_dir = TempDir::new("test_new_region").unwrap();
let memtable_builder = Arc::new(DefaultMemtableBuilder::default()) as _;
let object_store = ObjectStore::new(
fs::Builder::default()
@@ -207,11 +215,13 @@ async fn test_recover_region_manifets() {
let region_meta = Arc::new(build_region_meta());
// Recover from empty
assert!(RegionImpl::<NoopLogStore>::recover_from_manifest(&manifest)
.await
.unwrap()
.0
.is_none());
assert!(
RegionImpl::<NoopLogStore>::recover_from_manifest(&manifest, &memtable_builder)
.await
.unwrap()
.0
.is_none()
);
{
// save some actions into region_meta
@@ -246,7 +256,7 @@ async fn test_recover_region_manifets() {
// try to recover
let (version, recovered_metadata) =
RegionImpl::<NoopLogStore>::recover_from_manifest(&manifest)
RegionImpl::<NoopLogStore>::recover_from_manifest(&manifest, &memtable_builder)
.await
.unwrap();
@@ -261,7 +271,6 @@ async fn test_recover_region_manifets() {
for (i, file) in files.iter().enumerate() {
assert_eq!(format!("f{}", i + 1), file.file_name());
}
assert!(version.mutable_memtables().is_empty());
// check manifest state
assert_eq!(3, manifest.last_version());

View File

@@ -1,12 +1,11 @@
use std::sync::Arc;
use common_telemetry::logging;
use common_time::RangeMillis;
use futures::TryStreamExt;
use snafu::ResultExt;
use store_api::logstore::LogStore;
use store_api::manifest::{Manifest, ManifestVersion, MetaAction};
use store_api::storage::{AlterRequest, WriteContext, WriteRequest, WriteResponse};
use store_api::storage::{AlterRequest, WriteContext, WriteResponse};
use tokio::sync::Mutex;
use crate::background::JobHandle;
@@ -15,7 +14,8 @@ use crate::flush::{FlushJob, FlushSchedulerRef, FlushStrategyRef};
use crate::manifest::action::{
RawRegionMetadata, RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList,
};
use crate::memtable::{Inserter, MemtableBuilderRef, MemtableId, MemtableSet};
use crate::memtable::{Inserter, MemtableBuilderRef, MemtableId, MemtableRef};
use crate::metadata::RegionMetadataRef;
use crate::proto::wal::WalHeader;
use crate::region::{RecoveredMetadataMap, RegionManifest, SharedDataRef};
use crate::schema::compat::CompatWrite;
@@ -134,7 +134,7 @@ impl RegionWriter {
// avoid other writers write to the region and switch the memtable safely.
// Another potential benefit is that the write lock also protect against concurrent
// alter request to the region.
let _inner = self.inner.lock().await;
let inner = self.inner.lock().await;
let version_control = alter_ctx.version_control();
@@ -176,7 +176,12 @@ impl RegionWriter {
let manifest_version = alter_ctx.manifest.update(action_list).await?;
// Now we could switch memtables and apply the new metadata to the version.
version_control.freeze_mutable_and_apply_metadata(new_metadata, manifest_version);
let new_mutable = inner.memtable_builder.build(new_metadata.schema().clone());
version_control.freeze_mutable_and_apply_metadata(
new_metadata,
manifest_version,
new_mutable,
);
self.persist_manifest_version(alter_ctx.wal, version_control, manifest_version)
.await
@@ -250,7 +255,6 @@ impl<'a, S: LogStore> AlterContext<'a, S> {
#[derive(Debug)]
struct WriterInner {
memtable_builder: MemtableBuilderRef,
last_memtable_id: MemtableId,
flush_handle: Option<JobHandle>,
}
@@ -258,7 +262,6 @@ impl WriterInner {
fn new(memtable_builder: MemtableBuilderRef) -> WriterInner {
WriterInner {
memtable_builder,
last_memtable_id: 0,
flush_handle: None,
}
}
@@ -274,7 +277,7 @@ impl WriterInner {
mut request: WriteBatch,
writer_ctx: WriterContext<'_, S>,
) -> Result<WriteResponse> {
let time_ranges = self.preprocess_write(&request, &writer_ctx).await?;
self.preprocess_write(&writer_ctx).await?;
let version_control = writer_ctx.version_control();
let _lock = version_mutex.lock().await;
@@ -302,8 +305,8 @@ impl WriterInner {
.await?;
// Insert batch into memtable.
let mut inserter = Inserter::new(next_sequence, time_ranges, version.bucket_duration());
inserter.insert_memtables(&request, version.mutable_memtables())?;
let mut inserter = Inserter::new(next_sequence);
inserter.insert_memtable(&request, version.mutable_memtable())?;
// Update committed_sequence to make current batch visible. The `&mut self` of WriterInner
// guarantees the writer is exclusive.
@@ -340,13 +343,18 @@ impl WriterInner {
// This is the first request that use the new metadata.
// It's safe to unwrap here. It's checked above. Move out metadata to avoid cloning it.
let (_, (manifest_version, metadata)) = next_apply_metadata.take().unwrap();
let region_metadata: RegionMetadataRef = Arc::new(
metadata.try_into().context(error::InvalidRawRegionSnafu {
region: &writer_ctx.shared.name,
})?,
);
let new_mutable = self
.memtable_builder
.build(region_metadata.schema().clone());
version_control.freeze_mutable_and_apply_metadata(
Arc::new(metadata.try_into().context(
error::InvalidRawRegionSnafu {
region: &writer_ctx.shared.name,
},
)?),
region_metadata,
manifest_version,
new_mutable,
);
num_recovered_metadata += 1;
logging::debug!(
@@ -364,7 +372,6 @@ impl WriterInner {
if let Some(request) = request {
num_requests += 1;
let time_ranges = self.prepare_memtables(&request, version_control)?;
// Note that memtables of `Version` may be updated during replay.
let version = version_control.current();
@@ -390,9 +397,8 @@ impl WriterInner {
}
// TODO(yingwen): Trigger flush if the size of memtables reach the flush threshold to avoid
// out of memory during replay, but we need to do it carefully to avoid dead lock.
let mut inserter =
Inserter::new(last_sequence, time_ranges, version.bucket_duration());
inserter.insert_memtables(&request, version.mutable_memtables())?;
let mut inserter = Inserter::new(last_sequence);
inserter.insert_memtable(&request, version.mutable_memtable())?;
}
}
@@ -418,9 +424,8 @@ impl WriterInner {
/// flush if necessary. Returns `Ok(())` once the check, and any flush it triggers, has completed.
async fn preprocess_write<S: LogStore>(
&mut self,
request: &WriteBatch,
writer_ctx: &WriterContext<'_, S>,
) -> Result<Vec<RangeMillis>> {
) -> Result<()> {
let version_control = writer_ctx.version_control();
// Check whether memtable is full or flush should be triggered. We need to do this first since
// switching memtables will clear all mutable memtables.
@@ -429,52 +434,16 @@ impl WriterInner {
version_control,
writer_ctx.flush_strategy,
) {
self.trigger_flush(
writer_ctx.shared,
writer_ctx.flush_scheduler,
writer_ctx.sst_layer,
writer_ctx.writer,
writer_ctx.wal,
writer_ctx.manifest,
)
.await?;
self.trigger_flush(writer_ctx).await?;
}
self.prepare_memtables(request, version_control)
Ok(())
}
/// Create all needed mutable memtables, returns time ranges that overlapped with `request`.
fn prepare_memtables(
&mut self,
request: &WriteBatch,
version_control: &VersionControlRef,
) -> Result<Vec<RangeMillis>> {
let current_version = version_control.current();
let bucket_duration = current_version.bucket_duration();
let time_ranges = request
.time_ranges(bucket_duration)
.context(error::InvalidTimestampSnafu)?;
let mutable = current_version.mutable_memtables();
let mut memtables_to_add = MemtableSet::default();
// Pre-create all needed mutable memtables.
for range in &time_ranges {
if mutable.get_by_range(range).is_none()
&& memtables_to_add.get_by_range(range).is_none()
{
// Memtable for this range is missing, need to create a new memtable.
let memtable_schema = current_version.schema().clone();
let id = self.alloc_memtable_id();
let memtable = self.memtable_builder.build(id, memtable_schema);
memtables_to_add.insert(*range, memtable);
}
}
if !memtables_to_add.is_empty() {
version_control.add_mutable(memtables_to_add);
}
Ok(time_ranges)
/// Builds a fresh memtable from the schema of the version that
/// `version_control` currently exposes.
///
/// The returned memtable is only constructed here; installing it as the
/// mutable memtable is left to the caller.
fn alloc_memtable(&self, version_control: &VersionControlRef) -> MemtableRef {
    // Clone the schema in its own statement so nothing borrowed from the
    // current version is still alive when the builder runs.
    let schema = version_control.current().schema().clone();
    self.memtable_builder.build(schema)
}
fn should_flush(
@@ -490,30 +459,23 @@ impl WriterInner {
flush_strategy.should_flush(shared, mutable_bytes_allocated, total_bytes_allocated)
}
async fn trigger_flush<S: LogStore>(
&mut self,
shared: &SharedDataRef,
flush_scheduler: &FlushSchedulerRef,
sst_layer: &AccessLayerRef,
writer: &RegionWriterRef,
wal: &Wal<S>,
manifest: &RegionManifest,
) -> Result<()> {
let version_control = &shared.version_control;
async fn trigger_flush<S: LogStore>(&mut self, ctx: &WriterContext<'_, S>) -> Result<()> {
let version_control = &ctx.shared.version_control;
let new_mutable = self.alloc_memtable(version_control);
// Freeze all mutable memtables so we can flush them later.
version_control.freeze_mutable();
version_control.freeze_mutable(new_mutable);
if let Some(flush_handle) = self.flush_handle.take() {
// Previous flush job is incomplete, wait until it is finished (write stall).
// However the last flush job may fail, in which case, we just return error
// and abort current write request. The flush handle is left empty, so the next
// time we still have chance to trigger a new flush.
logging::info!("Write stall, region: {}", shared.name);
logging::info!("Write stall, region: {}", ctx.shared.name);
// TODO(yingwen): We should release the write lock during waiting flush done, which
// needs something like async condvar.
flush_handle.join().await.map_err(|e| {
logging::error!(e; "Previous flush job failed, region: {}", shared.name);
logging::error!(e; "Previous flush job failed, region: {}", ctx.shared.name);
e
})?;
}
@@ -522,7 +484,7 @@ impl WriterInner {
let (max_memtable_id, mem_to_flush) = current_version.memtables().memtables_to_flush();
if max_memtable_id.is_none() {
logging::info!("No memtables to flush in region: {}", shared.name);
logging::info!("No memtables to flush in region: {}", ctx.shared.name);
return Ok(());
}
@@ -531,22 +493,19 @@ impl WriterInner {
memtables: mem_to_flush,
// In write thread, safe to use current committed sequence.
flush_sequence: version_control.committed_sequence(),
shared: shared.clone(),
sst_layer: sst_layer.clone(),
writer: writer.clone(),
wal: wal.clone(),
manifest: manifest.clone(),
shared: ctx.shared.clone(),
sst_layer: ctx.sst_layer.clone(),
writer: ctx.writer.clone(),
wal: ctx.wal.clone(),
manifest: ctx.manifest.clone(),
};
let flush_handle = flush_scheduler.schedule_flush(Box::new(flush_req)).await?;
let flush_handle = ctx
.flush_scheduler
.schedule_flush(Box::new(flush_req))
.await?;
self.flush_handle = Some(flush_handle);
Ok(())
}
#[inline]
fn alloc_memtable_id(&mut self) -> MemtableId {
self.last_memtable_id += 1;
self.last_memtable_id
}
}

View File

@@ -13,10 +13,7 @@ mod tests {
use datatypes::vectors::{Int64Vector, UInt64Vector, UInt8Vector, VectorRef};
use super::*;
use crate::metadata::RegionMetadata;
use crate::read::Batch;
use crate::test_util::descriptor_util;
pub const REGION_NAME: &str = "test";
@@ -42,14 +39,4 @@ mod tests {
Batch::new(columns)
}
pub(crate) fn new_region_schema(version: u32, num_value_columns: usize) -> RegionSchema {
let metadata: RegionMetadata =
descriptor_util::desc_with_value_columns(REGION_NAME, num_value_columns)
.try_into()
.unwrap();
let columns = metadata.columns;
RegionSchema::new(columns, version).unwrap()
}
}

View File

@@ -319,6 +319,7 @@ mod tests {
use crate::schema::tests;
use crate::schema::{ProjectedSchema, RegionSchema};
use crate::test_util::descriptor_util;
use crate::test_util::schema_util;
fn check_fields(fields: &[Field], names: &[&str]) {
for (field, name) in fields.iter().zip(names) {
@@ -390,7 +391,7 @@ mod tests {
#[test]
fn test_compat_same_schema() {
// (k0, timestamp, v0, v1) with version 0.
let region_schema = Arc::new(tests::new_region_schema(0, 2));
let region_schema = Arc::new(schema_util::new_region_schema(0, 2));
let projected_schema = Arc::new(ProjectedSchema::no_projection(region_schema.clone()));
let source_schema = region_schema.store_schema().clone();
@@ -420,7 +421,7 @@ mod tests {
#[test]
fn test_compat_same_version_with_projection() {
// (k0, timestamp, v0, v1) with version 0.
let region_schema = Arc::new(tests::new_region_schema(0, 2));
let region_schema = Arc::new(schema_util::new_region_schema(0, 2));
// Just read v0, k0.
let projected_schema =
Arc::new(ProjectedSchema::new(region_schema.clone(), Some(vec![2, 0])).unwrap());
@@ -452,9 +453,9 @@ mod tests {
#[test]
fn test_compat_old_column() {
// (k0, timestamp, v0) with version 0.
let region_schema_old = Arc::new(tests::new_region_schema(0, 1));
let region_schema_old = Arc::new(schema_util::new_region_schema(0, 1));
// (k0, timestamp, v0, v1) with version 1.
let region_schema_new = Arc::new(tests::new_region_schema(1, 1));
let region_schema_new = Arc::new(schema_util::new_region_schema(1, 1));
// Just read v0, k0
let projected_schema =
@@ -486,9 +487,9 @@ mod tests {
#[test]
fn test_compat_new_column() {
// (k0, timestamp, v0, v1) with version 0.
let region_schema_old = Arc::new(tests::new_region_schema(0, 2));
let region_schema_old = Arc::new(schema_util::new_region_schema(0, 2));
// (k0, timestamp, v0, v1, v2) with version 1.
let region_schema_new = Arc::new(tests::new_region_schema(1, 3));
let region_schema_new = Arc::new(schema_util::new_region_schema(1, 3));
// Just read v2, v0, k0
let projected_schema =
@@ -525,7 +526,7 @@ mod tests {
#[test]
fn test_compat_different_column() {
// (k0, timestamp, v0, v1) with version 0.
let region_schema_old = Arc::new(tests::new_region_schema(0, 2));
let region_schema_old = Arc::new(schema_util::new_region_schema(0, 2));
let mut descriptor = descriptor_util::desc_with_value_columns(tests::REGION_NAME, 2);
// Assign a much larger column id to v0.

View File

@@ -335,7 +335,7 @@ mod tests {
fn test_projection() {
// Build a region schema with 2 value columns. So the final user schema is
// (k0, timestamp, v0, v1)
let region_schema = tests::new_region_schema(0, 2);
let region_schema = schema_util::new_region_schema(0, 2);
// Projection, but still keep column order.
// After projection: (timestamp, v0)
@@ -376,7 +376,7 @@ mod tests {
#[test]
fn test_projected_schema_with_projection() {
// (k0, timestamp, v0, v1, v2)
let region_schema = Arc::new(tests::new_region_schema(123, 3));
let region_schema = Arc::new(schema_util::new_region_schema(123, 3));
// After projection: (v1, timestamp)
let projected_schema =
@@ -414,7 +414,7 @@ mod tests {
// The schema to read should be same as region schema with (k0, timestamp, v0).
// We can't use `new_schema_with_version()` because the StoreSchema also store other
// metadata that `new_schema_with_version()` can't store.
let expect_schema = tests::new_region_schema(123, 1);
let expect_schema = schema_util::new_region_schema(123, 1);
assert_eq!(
expect_schema.store_schema(),
projected_schema.schema_to_read()
@@ -433,7 +433,7 @@ mod tests {
#[test]
fn test_projected_schema_no_projection() {
// (k0, timestamp, v0)
let region_schema = Arc::new(tests::new_region_schema(123, 1));
let region_schema = Arc::new(schema_util::new_region_schema(123, 1));
let projected_schema = ProjectedSchema::no_projection(region_schema.clone());
@@ -461,7 +461,7 @@ mod tests {
#[test]
fn test_projected_schema_empty_projection() {
// (k0, timestamp, v0)
let region_schema = Arc::new(tests::new_region_schema(123, 1));
let region_schema = Arc::new(schema_util::new_region_schema(123, 1));
let err = ProjectedSchema::new(region_schema, Some(Vec::new()))
.err()

View File

@@ -139,12 +139,11 @@ mod tests {
use datatypes::type_id::LogicalTypeId;
use super::*;
use crate::schema::tests;
use crate::test_util::schema_util;
#[test]
fn test_region_schema() {
let region_schema = Arc::new(tests::new_region_schema(123, 1));
let region_schema = Arc::new(schema_util::new_region_schema(123, 1));
let expect_schema = schema_util::new_schema_with_version(
&[

View File

@@ -219,6 +219,7 @@ mod tests {
use super::*;
use crate::read::Batch;
use crate::schema::tests;
use crate::test_util::schema_util;
fn check_chunk_batch(chunk: &ArrowChunk<Arc<dyn Array>>, batch: &Batch) {
assert_eq!(5, chunk.columns().len());
@@ -231,7 +232,7 @@ mod tests {
#[test]
fn test_store_schema() {
let region_schema = Arc::new(tests::new_region_schema(123, 1));
let region_schema = Arc::new(schema_util::new_region_schema(123, 1));
// Checks StoreSchema.
let store_schema = region_schema.store_schema();

View File

@@ -36,7 +36,7 @@ impl Snapshot for SnapshotImpl {
let visible_sequence = self.sequence_to_read(request.sequence);
let memtable_version = self.version.memtables();
let mutables = memtable_version.mutable_memtables();
let mutables = memtable_version.mutable_memtable();
let immutables = memtable_version.immutable_memtables();
let mut builder =
@@ -46,10 +46,10 @@ impl Snapshot for SnapshotImpl {
.filters(request.filters)
.batch_size(ctx.batch_size)
.visible_sequence(visible_sequence)
.pick_memtables(mutables);
.pick_memtables(mutables.clone());
for mem_set in immutables {
builder = builder.pick_memtables(mem_set);
for memtable in immutables {
builder = builder.pick_memtables(memtable.clone());
}
let reader = builder.pick_ssts(self.version.ssts())?.build().await?;

View File

@@ -302,7 +302,7 @@ mod tests {
#[tokio::test]
async fn test_parquet_writer() {
let schema = memtable_tests::schema_for_test();
let memtable = DefaultMemtableBuilder {}.build(1, schema);
let memtable = DefaultMemtableBuilder::default().build(schema);
memtable_tests::write_kvs(
&*memtable,

View File

@@ -40,7 +40,7 @@ pub async fn new_store_config(
log_store,
sst_layer,
manifest,
memtable_builder: Arc::new(DefaultMemtableBuilder {}),
memtable_builder: Arc::new(DefaultMemtableBuilder::default()),
flush_scheduler,
flush_strategy: Arc::new(SizeBasedStrategy::default()),
}

View File

@@ -3,6 +3,10 @@ use std::sync::Arc;
use datatypes::prelude::*;
use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder, SchemaRef};
use super::descriptor_util;
use crate::metadata::RegionMetadata;
use crate::schema::RegionSchema;
/// Column definition: (name, datatype, is_nullable)
pub type ColumnDef<'a> = (&'a str, LogicalTypeId, bool);
@@ -39,3 +43,13 @@ pub fn new_schema_with_version(
pub fn new_schema_ref(column_defs: &[ColumnDef], timestamp_index: Option<usize>) -> SchemaRef {
Arc::new(new_schema(column_defs, timestamp_index))
}
/// Builds a [`RegionSchema`] with the given schema `version` for use in tests.
///
/// The columns are taken from the region descriptor that
/// `descriptor_util::desc_with_value_columns` produces for `num_value_columns`
/// value columns. Only the columns of the converted metadata are passed on to
/// the schema.
///
/// # Panics
///
/// Panics if the descriptor cannot be converted into [`RegionMetadata`], or if
/// `RegionSchema::new` returns an error for the resulting columns.
pub fn new_region_schema(version: u32, num_value_columns: usize) -> RegionSchema {
    let descriptor = descriptor_util::desc_with_value_columns("REGION_NAME", num_value_columns);
    let region_metadata: RegionMetadata = descriptor.try_into().unwrap();
    RegionSchema::new(region_metadata.columns, version).unwrap()
}

View File

@@ -9,21 +9,17 @@
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use store_api::manifest::ManifestVersion;
use store_api::storage::{SchemaRef, SequenceNumber};
use crate::memtable::{MemtableId, MemtableSet, MemtableVersion};
use crate::memtable::{MemtableId, MemtableRef, MemtableVersion};
use crate::metadata::RegionMetadataRef;
use crate::schema::RegionSchemaRef;
use crate::sst::LevelMetas;
use crate::sst::{FileHandle, FileMeta};
use crate::sync::CowCell;
/// Default bucket duration: 2 Hours.
const DEFAULT_BUCKET_DURATION: Duration = Duration::from_secs(3600 * 2);
pub const INIT_COMMITTED_SEQUENCE: u64 = 0;
/// Controls version of in memory state for a region.
@@ -79,26 +75,12 @@ impl VersionControl {
self.committed_sequence.store(value, Ordering::Relaxed);
}
/// Add mutable memtables and commit.
///
/// # Panics
/// See [MemtableVersion::add_mutable](MemtableVersion::add_mutable).
pub fn add_mutable(&self, memtables_to_add: MemtableSet) {
let mut version_to_update = self.version.lock();
let memtable_version = version_to_update.memtables();
let merged = memtable_version.add_mutable(memtables_to_add);
version_to_update.memtables = Arc::new(merged);
version_to_update.commit();
}
/// Freeze all mutable memtables.
pub fn freeze_mutable(&self) {
pub fn freeze_mutable(&self, new_memtable: MemtableRef) {
let mut version_to_update = self.version.lock();
let memtable_version = version_to_update.memtables();
let freezed = memtable_version.freeze_mutable();
let freezed = memtable_version.freeze_mutable(new_memtable);
version_to_update.memtables = Arc::new(freezed);
version_to_update.commit();
@@ -116,16 +98,15 @@ impl VersionControl {
&self,
metadata: RegionMetadataRef,
manifest_version: ManifestVersion,
mutable_memtable: MemtableRef,
) {
let mut version_to_update = self.version.lock();
let memtable_version = version_to_update.memtables();
// When applying metadata, mutable memtable set might be empty and there is no
// need to freeze it.
if !memtable_version.mutable_memtables().is_empty() {
let freezed = memtable_version.freeze_mutable();
version_to_update.memtables = Arc::new(freezed);
}
let freezed = memtable_version.freeze_mutable(mutable_memtable);
version_to_update.memtables = Arc::new(freezed);
version_to_update.apply_metadata(metadata, manifest_version);
version_to_update.commit();
@@ -170,18 +151,19 @@ pub struct Version {
impl Version {
/// Create a new `Version` with given `metadata`.
#[cfg(test)]
pub fn new(metadata: RegionMetadataRef) -> Version {
Version::with_manifest_version(metadata, 0)
pub fn new(metadata: RegionMetadataRef, memtable: MemtableRef) -> Version {
Version::with_manifest_version(metadata, 0, memtable)
}
/// Create a new `Version` with given `metadata` and initial `manifest_version`.
pub fn with_manifest_version(
metadata: RegionMetadataRef,
manifest_version: ManifestVersion,
mutable_memtable: MemtableRef,
) -> Version {
Version {
metadata,
memtables: Arc::new(MemtableVersion::new()),
memtables: Arc::new(MemtableVersion::new(mutable_memtable)),
ssts: Arc::new(LevelMetas::new()),
flushed_sequence: 0,
manifest_version,
@@ -204,8 +186,8 @@ impl Version {
}
#[inline]
pub fn mutable_memtables(&self) -> &MemtableSet {
self.memtables.mutable_memtables()
pub fn mutable_memtable(&self) -> &MemtableRef {
self.memtables.mutable_memtable()
}
#[inline]
@@ -223,11 +205,6 @@ impl Version {
self.flushed_sequence
}
/// Returns duration used to partition the memtables and ssts by time.
pub fn bucket_duration(&self) -> Duration {
DEFAULT_BUCKET_DURATION
}
pub fn apply_edit(&mut self, edit: VersionEdit) {
let flushed_sequence = edit.flushed_sequence.unwrap_or(self.flushed_sequence);
if self.flushed_sequence < flushed_sequence {
@@ -282,16 +259,17 @@ impl Version {
#[cfg(test)]
mod tests {
use super::*;
use crate::metadata::RegionMetadata;
use crate::memtable::{DefaultMemtableBuilder, MemtableBuilder};
use crate::test_util::descriptor_util::RegionDescBuilder;
fn new_version_control() -> VersionControl {
let desc = RegionDescBuilder::new("version-test")
.enable_version_column(false)
.build();
let metadata: RegionMetadata = desc.try_into().unwrap();
let metadata: RegionMetadataRef = Arc::new(desc.try_into().unwrap());
let memtable = DefaultMemtableBuilder::default().build(metadata.schema().clone());
let version = Version::new(Arc::new(metadata));
let version = Version::new(metadata, memtable);
VersionControl::with_version(version)
}