feat: Partition memtables by time if compaction window is provided (#3501)

* feat: define time partitions

* feat: adapt time partitions to version

* feat: implement non write methods

* feat: add write one to memtable

* feat: implement write

* chore: fix warning

* fix: inner not set

* refactor: add collect_iter_timestamps

* test: test partitions

* chore: debug log

* chore: fix typos

* chore: log memtable id

* fix: empty check

* chore: log total parts

* chore: update comments
This commit is contained in:
Yingwen
2024-03-14 19:13:01 +08:00
committed by GitHub
parent 3a326775ee
commit 8ca9e01455
12 changed files with 784 additions and 127 deletions

View File

@@ -26,6 +26,7 @@ use table::predicate::Predicate;
use crate::error::Result;
use crate::flush::WriteBufferManagerRef;
use crate::memtable::key_values::KeyValue;
pub use crate::memtable::key_values::KeyValues;
use crate::memtable::merge_tree::MergeTreeConfig;
use crate::metrics::WRITE_BUFFER_BYTES;
@@ -33,6 +34,7 @@ use crate::read::Batch;
pub mod key_values;
pub mod merge_tree;
pub mod time_partition;
pub mod time_series;
pub(crate) mod version;
@@ -82,9 +84,12 @@ pub trait Memtable: Send + Sync + fmt::Debug {
/// Returns the id of this memtable.
fn id(&self) -> MemtableId;
/// Write key values into the memtable.
/// Writes key values into the memtable.
fn write(&self, kvs: &KeyValues) -> Result<()>;
/// Writes one key value pair into the memtable.
fn write_one(&self, key_value: KeyValue) -> Result<()>;
/// Scans the memtable.
/// `projection` selects columns to read, `None` means reading all columns.
/// `filters` are the predicates to be pushed down to memtable.

View File

@@ -71,7 +71,7 @@ impl KeyValues {
/// Primary key columns have the same order as region's primary key. Field
/// columns are ordered by their position in the region schema (The same order
/// as users defined while creating the region).
#[derive(Debug)]
#[derive(Debug, Clone, Copy)]
pub struct KeyValue<'a> {
row: &'a Row,
schema: &'a Vec<ColumnSchema>,

View File

@@ -36,6 +36,7 @@ use table::predicate::Predicate;
use crate::error::Result;
use crate::flush::WriteBufferManagerRef;
use crate::memtable::key_values::KeyValue;
use crate::memtable::merge_tree::metrics::WriteMetrics;
use crate::memtable::merge_tree::tree::MergeTree;
use crate::memtable::{
@@ -127,6 +128,17 @@ impl Memtable for MergeTreeMemtable {
res
}
fn write_one(&self, key_value: KeyValue) -> Result<()> {
let mut metrics = WriteMetrics::default();
let mut pk_buffer = Vec::new();
// Ensures the memtable always updates stats.
let res = self.tree.write_one(key_value, &mut pk_buffer, &mut metrics);
self.update_stats(&metrics);
res
}
fn iter(
&self,
projection: Option<&[ColumnId]>,
@@ -290,16 +302,14 @@ impl MemtableBuilder for MergeTreeMemtableBuilder {
#[cfg(test)]
mod tests {
use std::collections::BTreeSet;
use common_time::Timestamp;
use datafusion_common::{Column, ScalarValue};
use datafusion_expr::{BinaryExpr, Expr, Operator};
use datatypes::scalars::ScalarVector;
use datatypes::vectors::{Int64Vector, TimestampMillisecondVector};
use datatypes::vectors::Int64Vector;
use super::*;
use crate::test_util::memtable_util;
use crate::test_util::memtable_util::{self, collect_iter_timestamps};
#[test]
fn test_memtable_sorted_input() {
@@ -322,23 +332,10 @@ mod tests {
let expected_ts = kvs
.iter()
.map(|kv| kv.timestamp().as_timestamp().unwrap().unwrap().value())
.collect::<BTreeSet<_>>();
.collect::<Vec<_>>();
let iter = memtable.iter(None, None).unwrap();
let read = iter
.flat_map(|batch| {
batch
.unwrap()
.timestamps()
.as_any()
.downcast_ref::<TimestampMillisecondVector>()
.unwrap()
.iter_data()
.collect::<Vec<_>>()
.into_iter()
})
.map(|v| v.unwrap().0.value())
.collect::<BTreeSet<_>>();
let read = collect_iter_timestamps(iter);
assert_eq!(expected_ts, read);
let stats = memtable.stats();
@@ -386,20 +383,7 @@ mod tests {
memtable.write(&kvs).unwrap();
let iter = memtable.iter(None, None).unwrap();
let read = iter
.flat_map(|batch| {
batch
.unwrap()
.timestamps()
.as_any()
.downcast_ref::<TimestampMillisecondVector>()
.unwrap()
.iter_data()
.collect::<Vec<_>>()
.into_iter()
})
.map(|v| v.unwrap().0.value())
.collect::<Vec<_>>();
let read = collect_iter_timestamps(iter);
assert_eq!(vec![0, 1, 2, 3, 4, 5, 6, 7], read);
let iter = memtable.iter(None, None).unwrap();
@@ -514,20 +498,7 @@ mod tests {
let expect = data.into_iter().map(|x| x.2).collect::<Vec<_>>();
let iter = memtable.iter(None, None).unwrap();
let read = iter
.flat_map(|batch| {
batch
.unwrap()
.timestamps()
.as_any()
.downcast_ref::<TimestampMillisecondVector>()
.unwrap()
.iter_data()
.collect::<Vec<_>>()
.into_iter()
})
.map(|v| v.unwrap().0.value())
.collect::<Vec<_>>();
let read = collect_iter_timestamps(iter);
assert_eq!(expect, read);
}
@@ -564,20 +535,7 @@ mod tests {
let iter = memtable
.iter(None, Some(Predicate::new(vec![expr.into()])))
.unwrap();
let read = iter
.flat_map(|batch| {
batch
.unwrap()
.timestamps()
.as_any()
.downcast_ref::<TimestampMillisecondVector>()
.unwrap()
.iter_data()
.collect::<Vec<_>>()
.into_iter()
})
.map(|v| v.unwrap().0.value())
.collect::<Vec<_>>();
let read = collect_iter_timestamps(iter);
assert_eq!(timestamps, read);
}
}

View File

@@ -148,6 +148,54 @@ impl MergeTree {
Ok(())
}
/// Write one key value pair into the tree.
///
/// # Panics
/// Panics if the tree is immutable (frozen).
pub fn write_one(
&self,
kv: KeyValue,
pk_buffer: &mut Vec<u8>,
metrics: &mut WriteMetrics,
) -> Result<()> {
let has_pk = !self.metadata.primary_key.is_empty();
ensure!(
kv.num_primary_keys() == self.row_codec.num_fields(),
PrimaryKeyLengthMismatchSnafu {
expect: self.row_codec.num_fields(),
actual: kv.num_primary_keys(),
}
);
// Safety: timestamp of kv must be both present and a valid timestamp value.
let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
metrics.min_ts = metrics.min_ts.min(ts);
metrics.max_ts = metrics.max_ts.max(ts);
metrics.value_bytes += kv.fields().map(|v| v.data_size()).sum::<usize>();
if !has_pk {
// No primary key.
return self.write_no_key(kv);
}
// Encode primary key.
pk_buffer.clear();
if self.is_partitioned {
// Use sparse encoder for metric engine.
self.sparse_encoder
.encode_to_vec(kv.primary_keys(), pk_buffer)?;
} else {
self.row_codec.encode_to_vec(kv.primary_keys(), pk_buffer)?;
}
// Write rows with
self.write_with_key(pk_buffer, kv, metrics)?;
metrics.value_bytes += std::mem::size_of::<Timestamp>() + std::mem::size_of::<OpType>();
Ok(())
}
/// Scans the tree.
pub fn read(
&self,

View File

@@ -0,0 +1,551 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Partitions memtables by time.
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use common_telemetry::debug;
use common_time::timestamp::TimeUnit;
use common_time::timestamp_millis::BucketAligned;
use common_time::Timestamp;
use smallvec::{smallvec, SmallVec};
use snafu::OptionExt;
use store_api::metadata::RegionMetadataRef;
use crate::error::{InvalidRequestSnafu, Result};
use crate::memtable::key_values::KeyValue;
use crate::memtable::version::SmallMemtableVec;
use crate::memtable::{KeyValues, MemtableBuilderRef, MemtableId, MemtableRef};
/// A partition holds rows with timestamps between `[min, max)`.
#[derive(Debug, Clone)]
pub struct TimePartition {
/// Memtable of the partition.
memtable: MemtableRef,
/// Time range of the partition. `None` means there is no time range. The time
/// range is `None` if and only if the [TimePartitions::part_duration] is `None`.
time_range: Option<PartTimeRange>,
}
impl TimePartition {
/// Returns whether the `ts` belongs to the partition.
fn contains_timestamp(&self, ts: Timestamp) -> bool {
let Some(range) = self.time_range else {
return true;
};
range.contains_timestamp(ts)
}
/// Write rows to the part.
fn write(&self, kvs: &KeyValues) -> Result<()> {
self.memtable.write(kvs)
}
}
type PartitionVec = SmallVec<[TimePartition; 2]>;
/// Partitions.
#[derive(Debug)]
pub struct TimePartitions {
/// Mutable data of partitions.
inner: Mutex<PartitionsInner>,
/// Duration of a partition.
///
/// `None` means there is only one partition and the [TimePartition::time_range] is
/// also `None`.
part_duration: Option<Duration>,
/// Metadata of the region.
metadata: RegionMetadataRef,
/// Builder of memtables.
builder: MemtableBuilderRef,
}
pub type TimePartitionsRef = Arc<TimePartitions>;
impl TimePartitions {
/// Returns a new empty partition list with optional duration.
pub fn new(
metadata: RegionMetadataRef,
builder: MemtableBuilderRef,
next_memtable_id: MemtableId,
part_duration: Option<Duration>,
) -> Self {
let mut inner = PartitionsInner::new(next_memtable_id);
if part_duration.is_none() {
// If `part_duration` is None, then we create a partition with `None` time
// range so we will write all rows to that partition.
let memtable = builder.build(inner.alloc_memtable_id(), &metadata);
debug!(
"Creates a time partition for all timestamps, region: {}, memtable_id: {}",
metadata.region_id,
memtable.id(),
);
let part = TimePartition {
memtable,
time_range: None,
};
inner.parts.push(part);
}
Self {
inner: Mutex::new(inner),
part_duration,
metadata,
builder,
}
}
/// Write key values to memtables.
///
/// It creates new partitions if necessary.
pub fn write(&self, kvs: &KeyValues) -> Result<()> {
// Get all parts.
let parts = self.list_partitions();
// Checks whether all rows belongs to a single part. Checks in reverse order as we usually
// put to latest part.
for part in parts.iter().rev() {
let mut all_in_partition = true;
for kv in kvs.iter() {
// Safety: We checked the schema in the write request.
let ts = kv.timestamp().as_timestamp().unwrap().unwrap();
if !part.contains_timestamp(ts) {
all_in_partition = false;
break;
}
}
if !all_in_partition {
continue;
}
// We can write all rows to this part.
return part.write(kvs);
}
// Slow path: We have to split kvs by partitions.
self.write_multi_parts(kvs, &parts)
}
/// Append memtables in partitions to `memtables`.
pub fn list_memtables(&self, memtables: &mut Vec<MemtableRef>) {
let inner = self.inner.lock().unwrap();
memtables.extend(inner.parts.iter().map(|part| part.memtable.clone()));
}
/// Returns the number of partitions.
pub fn num_partitions(&self) -> usize {
let inner = self.inner.lock().unwrap();
inner.parts.len()
}
/// Returns true if all memtables are empty.
pub fn is_empty(&self) -> bool {
let inner = self.inner.lock().unwrap();
inner.parts.iter().all(|part| part.memtable.is_empty())
}
/// Freezes all memtables.
pub fn freeze(&self) -> Result<()> {
let inner = self.inner.lock().unwrap();
for part in &*inner.parts {
part.memtable.freeze()?;
}
Ok(())
}
/// Forks latest partition.
pub fn fork(&self, metadata: &RegionMetadataRef) -> Self {
let mut inner = self.inner.lock().unwrap();
let latest_part = inner
.parts
.iter()
.max_by_key(|part| part.time_range.map(|range| range.min_timestamp))
.cloned();
let Some(old_part) = latest_part else {
return Self::new(
metadata.clone(),
self.builder.clone(),
inner.next_memtable_id,
self.part_duration,
);
};
let memtable = old_part.memtable.fork(inner.alloc_memtable_id(), metadata);
let new_part = TimePartition {
memtable,
time_range: old_part.time_range,
};
Self {
inner: Mutex::new(PartitionsInner::with_partition(
new_part,
inner.next_memtable_id,
)),
part_duration: self.part_duration,
metadata: metadata.clone(),
builder: self.builder.clone(),
}
}
/// Returns partition duration.
pub(crate) fn part_duration(&self) -> Option<Duration> {
self.part_duration
}
/// Returns memory usage.
pub(crate) fn memory_usage(&self) -> usize {
let inner = self.inner.lock().unwrap();
inner
.parts
.iter()
.map(|part| part.memtable.stats().estimated_bytes)
.sum()
}
/// Append memtables in partitions to small vec.
pub(crate) fn list_memtables_to_small_vec(&self, memtables: &mut SmallMemtableVec) {
let inner = self.inner.lock().unwrap();
memtables.extend(inner.parts.iter().map(|part| part.memtable.clone()));
}
/// Returns the next memtable id.
pub(crate) fn next_memtable_id(&self) -> MemtableId {
let inner = self.inner.lock().unwrap();
inner.next_memtable_id
}
/// Returns all partitions.
fn list_partitions(&self) -> PartitionVec {
let inner = self.inner.lock().unwrap();
inner.parts.clone()
}
/// Write to multiple partitions.
fn write_multi_parts(&self, kvs: &KeyValues, parts: &PartitionVec) -> Result<()> {
// If part duration is `None` then there is always one partition and all rows
// will be put in that partition before invoking this method.
debug_assert!(self.part_duration.is_some());
let mut parts_to_write = HashMap::new();
let mut missing_parts = HashMap::new();
for kv in kvs.iter() {
let mut part_found = false;
// Safety: We used the timestamp before.
let ts = kv.timestamp().as_timestamp().unwrap().unwrap();
for part in parts {
if part.contains_timestamp(ts) {
// Safety: Since part duration is `Some` so all time range should be `Some`.
parts_to_write
.entry(part.time_range.unwrap().min_timestamp)
.or_insert_with(|| PartitionToWrite {
partition: part.clone(),
key_values: Vec::new(),
})
.key_values
.push(kv);
part_found = true;
break;
}
}
if !part_found {
// We need to write it to a new part.
// Safety: `new()` ensures duration is always Some if we do to this method.
let part_duration = self.part_duration.unwrap();
let part_start =
partition_start_timestamp(ts, part_duration).with_context(|| {
InvalidRequestSnafu {
region_id: self.metadata.region_id,
reason: format!(
"timestamp {ts:?} and bucket {part_duration:?} are out of range"
),
}
})?;
missing_parts
.entry(part_start)
.or_insert_with(Vec::new)
.push(kv);
}
}
// Writes rows to existing parts.
for part_to_write in parts_to_write.into_values() {
for kv in part_to_write.key_values {
part_to_write.partition.memtable.write_one(kv)?;
}
}
let part_duration = self.part_duration.unwrap();
// Creates new parts and writes to them. Acquires the lock to avoid others create
// the same partition.
let mut inner = self.inner.lock().unwrap();
for (part_start, key_values) in missing_parts {
let part_pos = match inner
.parts
.iter()
.position(|part| part.time_range.unwrap().min_timestamp == part_start)
{
Some(pos) => pos,
None => {
let range = PartTimeRange::from_start_duration(part_start, part_duration)
.with_context(|| InvalidRequestSnafu {
region_id: self.metadata.region_id,
reason: format!(
"Partition time range for {part_start:?} is out of bound, bucket size: {part_duration:?}",
),
})?;
let memtable = self
.builder
.build(inner.alloc_memtable_id(), &self.metadata);
debug!(
"Create time partition {:?} for region {}, duration: {:?}, memtable_id: {}, parts_total: {}",
range,
self.metadata.region_id,
part_duration,
memtable.id(),
inner.parts.len() + 1
);
let pos = inner.parts.len();
inner.parts.push(TimePartition {
memtable,
time_range: Some(range),
});
pos
}
};
let memtable = &inner.parts[part_pos].memtable;
for kv in key_values {
memtable.write_one(kv)?;
}
}
Ok(())
}
}
/// Computes the start timestamp of the partition for `ts`.
///
/// It always use bucket size in seconds which should fit all timestamp resolution.
fn partition_start_timestamp(ts: Timestamp, bucket: Duration) -> Option<Timestamp> {
// Safety: We convert it to seconds so it never returns `None`.
let ts_sec = ts.convert_to(TimeUnit::Second).unwrap();
let bucket_sec: i64 = bucket.as_secs().try_into().ok()?;
let start_sec = ts_sec.align_by_bucket(bucket_sec)?;
start_sec.convert_to(ts.unit())
}
#[derive(Debug)]
struct PartitionsInner {
/// All partitions.
parts: PartitionVec,
/// Next memtable id.
next_memtable_id: MemtableId,
}
impl PartitionsInner {
fn new(next_memtable_id: MemtableId) -> Self {
Self {
parts: Default::default(),
next_memtable_id,
}
}
fn with_partition(part: TimePartition, next_memtable_id: MemtableId) -> Self {
Self {
parts: smallvec![part],
next_memtable_id,
}
}
fn alloc_memtable_id(&mut self) -> MemtableId {
let id = self.next_memtable_id;
self.next_memtable_id += 1;
id
}
}
/// Time range of a partition.
#[derive(Debug, Clone, Copy)]
struct PartTimeRange {
/// Inclusive min timestamp of rows in the partition.
min_timestamp: Timestamp,
/// Exclusive max timestamp of rows in the partition.
max_timestamp: Timestamp,
}
impl PartTimeRange {
fn from_start_duration(start: Timestamp, duration: Duration) -> Option<Self> {
let start_sec = start.convert_to(TimeUnit::Second)?;
let end_sec = start_sec.add_duration(duration).ok()?;
let min_timestamp = start_sec.convert_to(start.unit())?;
let max_timestamp = end_sec.convert_to(start.unit())?;
Some(Self {
min_timestamp,
max_timestamp,
})
}
/// Returns whether the `ts` belongs to the partition.
fn contains_timestamp(&self, ts: Timestamp) -> bool {
self.min_timestamp <= ts && ts < self.max_timestamp
}
}
struct PartitionToWrite<'a> {
partition: TimePartition,
key_values: Vec<KeyValue<'a>>,
}
#[cfg(test)]
mod tests {
use super::*;
use crate::memtable::merge_tree::MergeTreeMemtableBuilder;
use crate::test_util::memtable_util::{self, collect_iter_timestamps};
#[test]
fn test_no_duration() {
let metadata = memtable_util::metadata_for_test();
let builder = Arc::new(MergeTreeMemtableBuilder::default());
let partitions = TimePartitions::new(metadata.clone(), builder, 0, None);
assert_eq!(1, partitions.num_partitions());
assert!(partitions.is_empty());
let kvs = memtable_util::build_key_values(
&metadata,
"hello".to_string(),
0,
&[1000, 3000, 7000, 5000, 6000],
0, // sequence 0, 1, 2, 3, 4
);
partitions.write(&kvs).unwrap();
assert_eq!(1, partitions.num_partitions());
assert!(!partitions.is_empty());
assert!(!partitions.is_empty());
let mut memtables = Vec::new();
partitions.list_memtables(&mut memtables);
let iter = memtables[0].iter(None, None).unwrap();
let timestamps = collect_iter_timestamps(iter);
assert_eq!(&[1000, 3000, 5000, 6000, 7000], &timestamps[..]);
}
#[test]
fn test_write_single_part() {
let metadata = memtable_util::metadata_for_test();
let builder = Arc::new(MergeTreeMemtableBuilder::default());
let partitions =
TimePartitions::new(metadata.clone(), builder, 0, Some(Duration::from_secs(10)));
assert_eq!(0, partitions.num_partitions());
let kvs = memtable_util::build_key_values(
&metadata,
"hello".to_string(),
0,
&[5000, 2000, 0],
0, // sequence 0, 1, 2
);
// It should creates a new partition.
partitions.write(&kvs).unwrap();
assert_eq!(1, partitions.num_partitions());
assert!(!partitions.is_empty());
let kvs = memtable_util::build_key_values(
&metadata,
"hello".to_string(),
0,
&[3000, 7000, 4000],
3, // sequence 3, 4, 5
);
// Still writes to the same partition.
partitions.write(&kvs).unwrap();
assert_eq!(1, partitions.num_partitions());
let mut memtables = Vec::new();
partitions.list_memtables(&mut memtables);
let iter = memtables[0].iter(None, None).unwrap();
let timestamps = collect_iter_timestamps(iter);
assert_eq!(&[0, 2000, 3000, 4000, 5000, 7000], &timestamps[..]);
let parts = partitions.list_partitions();
assert_eq!(
Timestamp::new_millisecond(0),
parts[0].time_range.unwrap().min_timestamp
);
assert_eq!(
Timestamp::new_millisecond(10000),
parts[0].time_range.unwrap().max_timestamp
);
}
#[test]
fn test_write_multi_parts() {
let metadata = memtable_util::metadata_for_test();
let builder = Arc::new(MergeTreeMemtableBuilder::default());
let partitions =
TimePartitions::new(metadata.clone(), builder, 0, Some(Duration::from_secs(5)));
assert_eq!(0, partitions.num_partitions());
let kvs = memtable_util::build_key_values(
&metadata,
"hello".to_string(),
0,
&[2000, 0],
0, // sequence 0, 1
);
// It should creates a new partition.
partitions.write(&kvs).unwrap();
assert_eq!(1, partitions.num_partitions());
assert!(!partitions.is_empty());
let kvs = memtable_util::build_key_values(
&metadata,
"hello".to_string(),
0,
&[3000, 7000, 4000, 5000],
2, // sequence 2, 3, 4, 5
);
// Writes 2 rows to the old partition and 1 row to a new partition.
partitions.write(&kvs).unwrap();
assert_eq!(2, partitions.num_partitions());
let parts = partitions.list_partitions();
let iter = parts[0].memtable.iter(None, None).unwrap();
let timestamps = collect_iter_timestamps(iter);
assert_eq!(
Timestamp::new_millisecond(0),
parts[0].time_range.unwrap().min_timestamp
);
assert_eq!(
Timestamp::new_millisecond(5000),
parts[0].time_range.unwrap().max_timestamp
);
assert_eq!(&[0, 2000, 3000, 4000], &timestamps[..]);
let iter = parts[1].memtable.iter(None, None).unwrap();
let timestamps = collect_iter_timestamps(iter);
assert_eq!(&[5000, 7000], &timestamps[..]);
assert_eq!(
Timestamp::new_millisecond(5000),
parts[1].time_range.unwrap().min_timestamp
);
assert_eq!(
Timestamp::new_millisecond(10000),
parts[1].time_range.unwrap().max_timestamp
);
}
}

View File

@@ -38,6 +38,7 @@ use table::predicate::Predicate;
use crate::error::{ComputeArrowSnafu, ConvertVectorSnafu, PrimaryKeyLengthMismatchSnafu, Result};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::key_values::KeyValue;
use crate::memtable::{
AllocTracker, BoxedBatchIterator, KeyValues, Memtable, MemtableBuilder, MemtableId,
MemtableRef, MemtableStats,
@@ -110,49 +111,75 @@ impl TimeSeriesMemtable {
}
/// Updates memtable stats.
fn update_stats(&self, request_size: usize, min: i64, max: i64) {
self.alloc_tracker.on_allocation(request_size);
fn update_stats(&self, stats: LocalStats) {
self.alloc_tracker.on_allocation(stats.allocated);
loop {
let current_min = self.min_timestamp.load(Ordering::Relaxed);
if min >= current_min {
if stats.min_ts >= current_min {
break;
}
let Err(updated) = self.min_timestamp.compare_exchange(
current_min,
min,
stats.min_ts,
Ordering::Relaxed,
Ordering::Relaxed,
) else {
break;
};
if updated == min {
if updated == stats.min_ts {
break;
}
}
loop {
let current_max = self.max_timestamp.load(Ordering::Relaxed);
if max <= current_max {
if stats.max_ts <= current_max {
break;
}
let Err(updated) = self.max_timestamp.compare_exchange(
current_max,
max,
stats.max_ts,
Ordering::Relaxed,
Ordering::Relaxed,
) else {
break;
};
if updated == max {
if updated == stats.max_ts {
break;
}
}
}
fn write_key_value(&self, kv: KeyValue, stats: &mut LocalStats) -> Result<()> {
ensure!(
kv.num_primary_keys() == self.row_codec.num_fields(),
PrimaryKeyLengthMismatchSnafu {
expect: self.row_codec.num_fields(),
actual: kv.num_primary_keys()
}
);
let primary_key_encoded = self.row_codec.encode(kv.primary_keys())?;
let fields = kv.fields().collect::<Vec<_>>();
stats.allocated += fields.iter().map(|v| v.data_size()).sum::<usize>();
let (series, series_allocated) = self.series_set.get_or_add_series(primary_key_encoded);
stats.allocated += series_allocated;
// safety: timestamp of kv must be both present and a valid timestamp value.
let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
stats.min_ts = stats.min_ts.min(ts);
stats.max_ts = stats.max_ts.max(ts);
let mut guard = series.write().unwrap();
guard.push(kv.timestamp(), kv.sequence(), kv.op_type(), fields);
Ok(())
}
}
impl Debug for TimeSeriesMemtable {
@@ -167,43 +194,30 @@ impl Memtable for TimeSeriesMemtable {
}
fn write(&self, kvs: &KeyValues) -> Result<()> {
let mut allocated = 0;
let mut min_ts = i64::MAX;
let mut max_ts = i64::MIN;
let mut local_stats = LocalStats::default();
for kv in kvs.iter() {
ensure!(
kv.num_primary_keys() == self.row_codec.num_fields(),
PrimaryKeyLengthMismatchSnafu {
expect: self.row_codec.num_fields(),
actual: kv.num_primary_keys()
}
);
let primary_key_encoded = self.row_codec.encode(kv.primary_keys())?;
let fields = kv.fields().collect::<Vec<_>>();
allocated += fields.iter().map(|v| v.data_size()).sum::<usize>();
let (series, series_allocated) = self.series_set.get_or_add_series(primary_key_encoded);
allocated += series_allocated;
// safety: timestamp of kv must be both present and a valid timestamp value.
let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
min_ts = min_ts.min(ts);
max_ts = max_ts.max(ts);
let mut guard = series.write().unwrap();
guard.push(kv.timestamp(), kv.sequence(), kv.op_type(), fields);
self.write_key_value(kv, &mut local_stats)?;
}
allocated += kvs.num_rows() * std::mem::size_of::<Timestamp>();
allocated += kvs.num_rows() * std::mem::size_of::<OpType>();
local_stats.allocated += kvs.num_rows() * std::mem::size_of::<Timestamp>();
local_stats.allocated += kvs.num_rows() * std::mem::size_of::<OpType>();
// TODO(hl): this maybe inaccurate since for-iteration may return early.
// We may lift the primary key length check out of Memtable::write
// so that we can ensure writing to memtable will succeed.
self.update_stats(allocated, min_ts, max_ts);
self.update_stats(local_stats);
Ok(())
}
fn write_one(&self, key_value: KeyValue) -> Result<()> {
let mut local_stats = LocalStats::default();
let res = self.write_key_value(key_value, &mut local_stats);
local_stats.allocated += std::mem::size_of::<Timestamp>() + std::mem::size_of::<OpType>();
self.update_stats(local_stats);
res
}
fn iter(
&self,
projection: Option<&[ColumnId]>,
@@ -267,6 +281,22 @@ impl Memtable for TimeSeriesMemtable {
}
}
struct LocalStats {
allocated: usize,
min_ts: i64,
max_ts: i64,
}
impl Default for LocalStats {
fn default() -> Self {
LocalStats {
allocated: 0,
min_ts: i64::MAX,
max_ts: i64::MIN,
}
}
}
type SeriesRwLockMap = RwLock<BTreeMap<Vec<u8>, Arc<RwLock<Series>>>>;
struct SeriesSet {

View File

@@ -20,26 +20,29 @@ use smallvec::SmallVec;
use store_api::metadata::RegionMetadataRef;
use crate::error::Result;
use crate::memtable::time_partition::TimePartitionsRef;
use crate::memtable::{MemtableId, MemtableRef};
pub(crate) type SmallMemtableVec = SmallVec<[MemtableRef; 2]>;
/// A version of current memtables in a region.
#[derive(Debug, Clone)]
pub(crate) struct MemtableVersion {
/// Mutable memtable.
pub(crate) mutable: MemtableRef,
pub(crate) mutable: TimePartitionsRef,
/// Immutable memtables.
///
/// We only allow one flush job per region but if a flush job failed, then we
/// might need to store more than one immutable memtable on the next time we
/// flush the region.
immutables: SmallVec<[MemtableRef; 2]>,
immutables: SmallMemtableVec,
}
pub(crate) type MemtableVersionRef = Arc<MemtableVersion>;
impl MemtableVersion {
/// Returns a new [MemtableVersion] with specific mutable memtable.
pub(crate) fn new(mutable: MemtableRef) -> MemtableVersion {
pub(crate) fn new(mutable: TimePartitionsRef) -> MemtableVersion {
MemtableVersion {
mutable,
immutables: SmallVec::new(),
@@ -53,8 +56,8 @@ impl MemtableVersion {
/// Lists mutable and immutable memtables.
pub(crate) fn list_memtables(&self) -> Vec<MemtableRef> {
let mut mems = Vec::with_capacity(self.immutables.len() + 1);
mems.push(self.mutable.clone());
let mut mems = Vec::with_capacity(self.immutables.len() + self.mutable.num_partitions());
self.mutable.list_memtables(&mut mems);
mems.extend_from_slice(&self.immutables);
mems
}
@@ -76,15 +79,13 @@ impl MemtableVersion {
// soft limit.
self.mutable.freeze()?;
// Fork the memtable.
let mutable = self.mutable.fork(self.next_memtable_id(), metadata);
let mutable = Arc::new(self.mutable.fork(metadata));
// Pushes the mutable memtable to immutable list.
let immutables = self
.immutables
.iter()
.cloned()
.chain([self.mutable.clone()])
.collect();
let mut immutables =
SmallVec::with_capacity(self.immutables.len() + self.mutable.num_partitions());
self.mutable.list_memtables_to_small_vec(&mut immutables);
immutables.extend(self.immutables.iter().cloned());
Ok(Some(MemtableVersion {
mutable,
immutables,
@@ -103,7 +104,7 @@ impl MemtableVersion {
/// Returns the memory usage of the mutable memtable.
pub(crate) fn mutable_usage(&self) -> usize {
self.mutable.stats().estimated_bytes
self.mutable.memory_usage()
}
/// Returns the memory usage of the immutable memtables.
@@ -121,9 +122,4 @@ impl MemtableVersion {
pub(crate) fn is_empty(&self) -> bool {
self.mutable.is_empty() && self.immutables.is_empty()
}
/// Returns the next memtable id.
pub(crate) fn next_memtable_id(&self) -> MemtableId {
self.mutable.id() + 1
}
}

View File

@@ -37,6 +37,7 @@ use crate::error::{
};
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::manifest::storage::manifest_compress_type;
use crate::memtable::time_partition::TimePartitions;
use crate::memtable::MemtableBuilderRef;
use crate::region::options::RegionOptions;
use crate::region::version::{VersionBuilder, VersionControl, VersionControlRef};
@@ -169,7 +170,13 @@ impl RegionOpener {
RegionManifestManager::new(metadata.clone(), region_manifest_options).await?;
// Initial memtable id is 0.
let mutable = self.memtable_builder.build(0, &metadata);
let part_duration = options.compaction.time_window();
let mutable = Arc::new(TimePartitions::new(
metadata.clone(),
self.memtable_builder,
0,
part_duration,
));
debug!("Create region {} with options: {:?}", region_id, options);
@@ -265,7 +272,13 @@ impl RegionOpener {
self.cache_manager.clone(),
));
// Initial memtable id is 0.
let mutable = self.memtable_builder.build(0, &metadata);
let part_duration = region_options.compaction.time_window();
let mutable = Arc::new(TimePartitions::new(
metadata.clone(),
self.memtable_builder.clone(),
0,
part_duration,
));
let version = VersionBuilder::new(metadata, mutable)
.add_files(file_purger.clone(), manifest.files.values().cloned())
.flushed_entry_id(manifest.flushed_entry_id)

View File

@@ -94,6 +94,14 @@ pub enum CompactionOptions {
Twcs(TwcsOptions),
}
impl CompactionOptions {
pub(crate) fn time_window(&self) -> Option<Duration> {
match self {
CompactionOptions::Twcs(opts) => opts.time_window,
}
}
}
impl Default for CompactionOptions {
fn default() -> Self {
Self::Twcs(TwcsOptions::default())

View File

@@ -31,8 +31,9 @@ use store_api::storage::SequenceNumber;
use crate::error::Result;
use crate::manifest::action::RegionEdit;
use crate::memtable::time_partition::{TimePartitions, TimePartitionsRef};
use crate::memtable::version::{MemtableVersion, MemtableVersionRef};
use crate::memtable::{MemtableBuilderRef, MemtableId, MemtableRef};
use crate::memtable::{MemtableBuilderRef, MemtableId};
use crate::region::options::RegionOptions;
use crate::sst::file::FileMeta;
use crate::sst::file_purger::FilePurgerRef;
@@ -122,8 +123,14 @@ impl VersionControl {
/// Mark all opened files as deleted and set the delete marker in [VersionControlData]
pub(crate) fn mark_dropped(&self, memtable_builder: &MemtableBuilderRef) {
let version = self.current().version;
let new_mutable =
memtable_builder.build(version.memtables.next_memtable_id(), &version.metadata);
let part_duration = version.memtables.mutable.part_duration();
let next_memtable_id = version.memtables.mutable.next_memtable_id();
let new_mutable = Arc::new(TimePartitions::new(
version.metadata.clone(),
memtable_builder.clone(),
next_memtable_id,
part_duration,
));
let mut data = self.data.write().unwrap();
data.is_dropped = true;
@@ -140,7 +147,14 @@ impl VersionControl {
/// new schema. Memtables of the version must be empty.
pub(crate) fn alter_schema(&self, metadata: RegionMetadataRef, builder: &MemtableBuilderRef) {
let version = self.current().version;
let new_mutable = builder.build(version.memtables.next_memtable_id(), &metadata);
let part_duration = version.memtables.mutable.part_duration();
let next_memtable_id = version.memtables.mutable.next_memtable_id();
let new_mutable = Arc::new(TimePartitions::new(
metadata.clone(),
builder.clone(),
next_memtable_id,
part_duration,
));
debug_assert!(version.memtables.mutable.is_empty());
debug_assert!(version.memtables.immutables().is_empty());
let new_version = Arc::new(
@@ -163,8 +177,14 @@ impl VersionControl {
) {
let version = self.current().version;
let new_mutable =
memtable_builder.build(version.memtables.next_memtable_id(), &version.metadata);
let part_duration = version.memtables.mutable.part_duration();
let next_memtable_id = version.memtables.mutable.next_memtable_id();
let new_mutable = Arc::new(TimePartitions::new(
version.metadata.clone(),
memtable_builder.clone(),
next_memtable_id,
part_duration,
));
let new_version = Arc::new(
VersionBuilder::new(version.metadata.clone(), new_mutable)
.flushed_entry_id(truncated_entry_id)
@@ -242,7 +262,7 @@ pub(crate) struct VersionBuilder {
impl VersionBuilder {
/// Returns a new builder.
pub(crate) fn new(metadata: RegionMetadataRef, mutable: MemtableRef) -> Self {
pub(crate) fn new(metadata: RegionMetadataRef, mutable: TimePartitionsRef) -> Self {
VersionBuilder {
metadata,
memtables: Arc::new(MemtableVersion::new(mutable)),

View File

@@ -21,7 +21,9 @@ use api::v1::value::ValueData;
use api::v1::{Row, Rows, SemanticType};
use datatypes::arrow::array::UInt64Array;
use datatypes::data_type::ConcreteDataType;
use datatypes::scalars::ScalarVector;
use datatypes::schema::ColumnSchema;
use datatypes::vectors::TimestampMillisecondVector;
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef};
use store_api::storage::{ColumnId, RegionId, SequenceNumber};
use table::predicate::Predicate;
@@ -58,6 +60,10 @@ impl Memtable for EmptyMemtable {
Ok(())
}
fn write_one(&self, _key_value: KeyValue) -> Result<()> {
Ok(())
}
fn iter(
&self,
_projection: Option<&[ColumnId]>,
@@ -303,3 +309,20 @@ pub(crate) fn encode_key_by_kv(key_value: &KeyValue) -> Vec<u8> {
]);
row_codec.encode(key_value.primary_keys()).unwrap()
}
/// Collects timestamps from the batch iter.
pub(crate) fn collect_iter_timestamps(iter: BoxedBatchIterator) -> Vec<i64> {
iter.flat_map(|batch| {
batch
.unwrap()
.timestamps()
.as_any()
.downcast_ref::<TimestampMillisecondVector>()
.unwrap()
.iter_data()
.collect::<Vec<_>>()
.into_iter()
})
.map(|v| v.unwrap().0.value())
.collect()
}

View File

@@ -25,7 +25,7 @@ use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder}
use store_api::storage::RegionId;
use crate::manifest::action::RegionEdit;
use crate::memtable::MemtableBuilder;
use crate::memtable::time_partition::TimePartitions;
use crate::region::version::{Version, VersionBuilder, VersionControl};
use crate::sst::file::{FileId, FileMeta};
use crate::sst::file_purger::FilePurgerRef;
@@ -101,7 +101,12 @@ impl VersionControlBuilder {
pub(crate) fn build_version(&self) -> Version {
let metadata = Arc::new(self.metadata.clone());
let mutable = self.memtable_builder.build(0, &metadata);
let mutable = Arc::new(TimePartitions::new(
metadata.clone(),
self.memtable_builder.clone(),
0,
None,
));
VersionBuilder::new(metadata, mutable)
.add_files(self.file_purger.clone(), self.files.values().cloned())
.build()