mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-25 23:49:58 +00:00
feat: update partition duration of memtable using compaction window (#5197)
* feat: update partition duration of memtable using compaction window * chore: only use provided duration if it is not None * test: more tests * test: test compaction apply window * style: fix clippy
This commit is contained in:
@@ -374,3 +374,90 @@ async fn test_readonly_during_compaction() {
|
||||
let vec = collect_stream_ts(stream).await;
|
||||
assert_eq!((0..20).map(|v| v * 1000).collect::<Vec<_>>(), vec);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_compaction_update_time_window() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let mut env = TestEnv::new();
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
|
||||
env.get_schema_metadata_manager()
|
||||
.register_region_table_info(
|
||||
region_id.table_id(),
|
||||
"test_table",
|
||||
"test_catalog",
|
||||
"test_schema",
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
|
||||
let request = CreateRequestBuilder::new()
|
||||
.insert_option("compaction.type", "twcs")
|
||||
.insert_option("compaction.twcs.max_active_window_runs", "2")
|
||||
.insert_option("compaction.twcs.max_active_window_files", "2")
|
||||
.insert_option("compaction.twcs.max_inactive_window_runs", "2")
|
||||
.insert_option("compaction.twcs.max_inactive_window_files", "2")
|
||||
.build();
|
||||
|
||||
let column_schemas = request
|
||||
.column_metadatas
|
||||
.iter()
|
||||
.map(column_metadata_to_column_schema)
|
||||
.collect::<Vec<_>>();
|
||||
engine
|
||||
.handle_request(region_id, RegionRequest::Create(request))
|
||||
.await
|
||||
.unwrap();
|
||||
// Flush 3 SSTs for compaction.
|
||||
put_and_flush(&engine, region_id, &column_schemas, 0..1200).await; // window 3600
|
||||
put_and_flush(&engine, region_id, &column_schemas, 1200..2400).await; // window 3600
|
||||
put_and_flush(&engine, region_id, &column_schemas, 2400..3600).await; // window 3600
|
||||
|
||||
let result = engine
|
||||
.handle_request(
|
||||
region_id,
|
||||
RegionRequest::Compact(RegionCompactRequest::default()),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(result.affected_rows, 0);
|
||||
|
||||
let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
|
||||
assert_eq!(0, scanner.num_memtables());
|
||||
// We keep at most two files.
|
||||
assert_eq!(
|
||||
2,
|
||||
scanner.num_files(),
|
||||
"unexpected files: {:?}",
|
||||
scanner.file_ids()
|
||||
);
|
||||
|
||||
// Flush a new SST and the time window is applied.
|
||||
put_and_flush(&engine, region_id, &column_schemas, 0..1200).await; // window 3600
|
||||
|
||||
// Puts window 7200.
|
||||
let rows = Rows {
|
||||
schema: column_schemas.clone(),
|
||||
rows: build_rows_for_key("a", 3600, 4000, 0),
|
||||
};
|
||||
put_rows(&engine, region_id, rows).await;
|
||||
let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
|
||||
assert_eq!(1, scanner.num_memtables());
|
||||
let stream = scanner.scan().await.unwrap();
|
||||
let vec = collect_stream_ts(stream).await;
|
||||
assert_eq!((0..4000).map(|v| v * 1000).collect::<Vec<_>>(), vec);
|
||||
|
||||
// Puts window 3600.
|
||||
let rows = Rows {
|
||||
schema: column_schemas.clone(),
|
||||
rows: build_rows_for_key("a", 2400, 3600, 0),
|
||||
};
|
||||
put_rows(&engine, region_id, rows).await;
|
||||
let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
|
||||
assert_eq!(2, scanner.num_memtables());
|
||||
let stream = scanner.scan().await.unwrap();
|
||||
let vec = collect_stream_ts(stream).await;
|
||||
assert_eq!((0..4000).map(|v| v * 1000).collect::<Vec<_>>(), vec);
|
||||
}
|
||||
|
||||
@@ -70,7 +70,7 @@ impl Default for MemtableConfig {
|
||||
pub struct MemtableStats {
|
||||
/// The estimated bytes allocated by this memtable from heap.
|
||||
estimated_bytes: usize,
|
||||
/// The time range that this memtable contains. It is None if
|
||||
/// The inclusive time range that this memtable contains. It is None if
|
||||
/// and only if the memtable is empty.
|
||||
time_range: Option<(Timestamp, Timestamp)>,
|
||||
/// Total rows in memtable
|
||||
|
||||
@@ -168,8 +168,11 @@ impl TimePartitions {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Forks latest partition.
|
||||
pub fn fork(&self, metadata: &RegionMetadataRef) -> Self {
|
||||
/// Forks latest partition and updates the partition duration if `part_duration` is Some.
|
||||
pub fn fork(&self, metadata: &RegionMetadataRef, part_duration: Option<Duration>) -> Self {
|
||||
// Fall back to the existing partition duration.
|
||||
let part_duration = part_duration.or(self.part_duration);
|
||||
|
||||
let mut inner = self.inner.lock().unwrap();
|
||||
let latest_part = inner
|
||||
.parts
|
||||
@@ -178,24 +181,39 @@ impl TimePartitions {
|
||||
.cloned();
|
||||
|
||||
let Some(old_part) = latest_part else {
|
||||
// If there is no partition, then we create a new partition with the new duration.
|
||||
return Self::new(
|
||||
metadata.clone(),
|
||||
self.builder.clone(),
|
||||
inner.next_memtable_id,
|
||||
self.part_duration,
|
||||
part_duration,
|
||||
);
|
||||
};
|
||||
|
||||
let old_stats = old_part.memtable.stats();
|
||||
// Use the max timestamp to compute the new time range for the memtable.
|
||||
// If `part_duration` is None, the new range will be None.
|
||||
let new_time_range =
|
||||
old_stats
|
||||
.time_range()
|
||||
.zip(part_duration)
|
||||
.and_then(|(range, bucket)| {
|
||||
partition_start_timestamp(range.1, bucket)
|
||||
.and_then(|start| PartTimeRange::from_start_duration(start, bucket))
|
||||
});
|
||||
// Forks the latest partition, but compute the time range based on the new duration.
|
||||
let memtable = old_part.memtable.fork(inner.alloc_memtable_id(), metadata);
|
||||
let new_part = TimePartition {
|
||||
memtable,
|
||||
time_range: old_part.time_range,
|
||||
time_range: new_time_range,
|
||||
};
|
||||
|
||||
Self {
|
||||
inner: Mutex::new(PartitionsInner::with_partition(
|
||||
new_part,
|
||||
inner.next_memtable_id,
|
||||
)),
|
||||
part_duration: self.part_duration,
|
||||
part_duration,
|
||||
metadata: metadata.clone(),
|
||||
builder: self.builder.clone(),
|
||||
}
|
||||
@@ -238,6 +256,19 @@ impl TimePartitions {
|
||||
inner.next_memtable_id
|
||||
}
|
||||
|
||||
/// Creates a new empty partition list from this list and a `part_duration`.
|
||||
/// It falls back to the old partition duration if `part_duration` is `None`.
|
||||
pub(crate) fn new_with_part_duration(&self, part_duration: Option<Duration>) -> Self {
|
||||
debug_assert!(self.is_empty());
|
||||
|
||||
Self::new(
|
||||
self.metadata.clone(),
|
||||
self.builder.clone(),
|
||||
self.next_memtable_id(),
|
||||
part_duration.or(self.part_duration),
|
||||
)
|
||||
}
|
||||
|
||||
/// Returns all partitions.
|
||||
fn list_partitions(&self) -> PartitionVec {
|
||||
let inner = self.inner.lock().unwrap();
|
||||
@@ -447,9 +478,9 @@ mod tests {
|
||||
|
||||
assert_eq!(1, partitions.num_partitions());
|
||||
assert!(!partitions.is_empty());
|
||||
assert!(!partitions.is_empty());
|
||||
let mut memtables = Vec::new();
|
||||
partitions.list_memtables(&mut memtables);
|
||||
assert_eq!(0, memtables[0].id());
|
||||
|
||||
let iter = memtables[0].iter(None, None).unwrap();
|
||||
let timestamps = collect_iter_timestamps(iter);
|
||||
@@ -503,16 +534,14 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_write_multi_parts() {
|
||||
let metadata = memtable_util::metadata_for_test();
|
||||
fn new_multi_partitions(metadata: &RegionMetadataRef) -> TimePartitions {
|
||||
let builder = Arc::new(PartitionTreeMemtableBuilder::default());
|
||||
let partitions =
|
||||
TimePartitions::new(metadata.clone(), builder, 0, Some(Duration::from_secs(5)));
|
||||
assert_eq!(0, partitions.num_partitions());
|
||||
|
||||
let kvs = memtable_util::build_key_values(
|
||||
&metadata,
|
||||
metadata,
|
||||
"hello".to_string(),
|
||||
0,
|
||||
&[2000, 0],
|
||||
@@ -524,7 +553,7 @@ mod tests {
|
||||
assert!(!partitions.is_empty());
|
||||
|
||||
let kvs = memtable_util::build_key_values(
|
||||
&metadata,
|
||||
metadata,
|
||||
"hello".to_string(),
|
||||
0,
|
||||
&[3000, 7000, 4000, 5000],
|
||||
@@ -534,9 +563,18 @@ mod tests {
|
||||
partitions.write(&kvs).unwrap();
|
||||
assert_eq!(2, partitions.num_partitions());
|
||||
|
||||
partitions
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_write_multi_parts() {
|
||||
let metadata = memtable_util::metadata_for_test();
|
||||
let partitions = new_multi_partitions(&metadata);
|
||||
|
||||
let parts = partitions.list_partitions();
|
||||
let iter = parts[0].memtable.iter(None, None).unwrap();
|
||||
let timestamps = collect_iter_timestamps(iter);
|
||||
assert_eq!(0, parts[0].memtable.id());
|
||||
assert_eq!(
|
||||
Timestamp::new_millisecond(0),
|
||||
parts[0].time_range.unwrap().min_timestamp
|
||||
@@ -547,6 +585,7 @@ mod tests {
|
||||
);
|
||||
assert_eq!(&[0, 2000, 3000, 4000], ×tamps[..]);
|
||||
let iter = parts[1].memtable.iter(None, None).unwrap();
|
||||
assert_eq!(1, parts[1].memtable.id());
|
||||
let timestamps = collect_iter_timestamps(iter);
|
||||
assert_eq!(&[5000, 7000], ×tamps[..]);
|
||||
assert_eq!(
|
||||
@@ -558,4 +597,85 @@ mod tests {
|
||||
parts[1].time_range.unwrap().max_timestamp
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_new_with_part_duration() {
|
||||
let metadata = memtable_util::metadata_for_test();
|
||||
let builder = Arc::new(PartitionTreeMemtableBuilder::default());
|
||||
let partitions = TimePartitions::new(metadata.clone(), builder.clone(), 0, None);
|
||||
|
||||
let new_parts = partitions.new_with_part_duration(Some(Duration::from_secs(5)));
|
||||
assert_eq!(Duration::from_secs(5), new_parts.part_duration().unwrap());
|
||||
assert_eq!(1, new_parts.next_memtable_id());
|
||||
|
||||
// Won't update the duration if it's None.
|
||||
let new_parts = new_parts.new_with_part_duration(None);
|
||||
assert_eq!(Duration::from_secs(5), new_parts.part_duration().unwrap());
|
||||
// Don't need to create new memtables.
|
||||
assert_eq!(1, new_parts.next_memtable_id());
|
||||
|
||||
let new_parts = new_parts.new_with_part_duration(Some(Duration::from_secs(10)));
|
||||
assert_eq!(Duration::from_secs(10), new_parts.part_duration().unwrap());
|
||||
// Don't need to create new memtables.
|
||||
assert_eq!(1, new_parts.next_memtable_id());
|
||||
|
||||
let builder = Arc::new(PartitionTreeMemtableBuilder::default());
|
||||
let partitions = TimePartitions::new(metadata.clone(), builder.clone(), 0, None);
|
||||
// Need to build a new memtable as duration is still None.
|
||||
let new_parts = partitions.new_with_part_duration(None);
|
||||
assert!(new_parts.part_duration().is_none());
|
||||
assert_eq!(2, new_parts.next_memtable_id());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_fork_empty() {
|
||||
let metadata = memtable_util::metadata_for_test();
|
||||
let builder = Arc::new(PartitionTreeMemtableBuilder::default());
|
||||
let partitions = TimePartitions::new(metadata.clone(), builder, 0, None);
|
||||
partitions.freeze().unwrap();
|
||||
let new_parts = partitions.fork(&metadata, None);
|
||||
assert!(new_parts.part_duration().is_none());
|
||||
assert_eq!(1, new_parts.list_partitions()[0].memtable.id());
|
||||
assert_eq!(2, new_parts.next_memtable_id());
|
||||
|
||||
new_parts.freeze().unwrap();
|
||||
let new_parts = new_parts.fork(&metadata, Some(Duration::from_secs(5)));
|
||||
assert_eq!(Duration::from_secs(5), new_parts.part_duration().unwrap());
|
||||
assert_eq!(2, new_parts.list_partitions()[0].memtable.id());
|
||||
assert_eq!(3, new_parts.next_memtable_id());
|
||||
|
||||
new_parts.freeze().unwrap();
|
||||
let new_parts = new_parts.fork(&metadata, None);
|
||||
// Won't update the duration.
|
||||
assert_eq!(Duration::from_secs(5), new_parts.part_duration().unwrap());
|
||||
assert_eq!(3, new_parts.list_partitions()[0].memtable.id());
|
||||
assert_eq!(4, new_parts.next_memtable_id());
|
||||
|
||||
new_parts.freeze().unwrap();
|
||||
let new_parts = new_parts.fork(&metadata, Some(Duration::from_secs(10)));
|
||||
assert_eq!(Duration::from_secs(10), new_parts.part_duration().unwrap());
|
||||
assert_eq!(4, new_parts.list_partitions()[0].memtable.id());
|
||||
assert_eq!(5, new_parts.next_memtable_id());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_fork_non_empty_none() {
|
||||
let metadata = memtable_util::metadata_for_test();
|
||||
let partitions = new_multi_partitions(&metadata);
|
||||
partitions.freeze().unwrap();
|
||||
|
||||
// Won't update the duration.
|
||||
let new_parts = partitions.fork(&metadata, None);
|
||||
assert!(new_parts.is_empty());
|
||||
assert_eq!(Duration::from_secs(5), new_parts.part_duration().unwrap());
|
||||
assert_eq!(2, new_parts.list_partitions()[0].memtable.id());
|
||||
assert_eq!(3, new_parts.next_memtable_id());
|
||||
|
||||
// Although we don't fork a memtable multiple times, we still add a test for it.
|
||||
let new_parts = partitions.fork(&metadata, Some(Duration::from_secs(10)));
|
||||
assert!(new_parts.is_empty());
|
||||
assert_eq!(Duration::from_secs(10), new_parts.part_duration().unwrap());
|
||||
assert_eq!(3, new_parts.list_partitions()[0].memtable.id());
|
||||
assert_eq!(4, new_parts.next_memtable_id());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
//! Memtable version.
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use smallvec::SmallVec;
|
||||
use store_api::metadata::RegionMetadataRef;
|
||||
@@ -65,27 +66,53 @@ impl MemtableVersion {
|
||||
/// Returns a new [MemtableVersion] which switches the old mutable memtable to immutable
|
||||
/// memtable.
|
||||
///
|
||||
/// It will switch to use the `time_window` provided.
|
||||
///
|
||||
/// Returns `None` if the mutable memtable is empty.
|
||||
pub(crate) fn freeze_mutable(
|
||||
&self,
|
||||
metadata: &RegionMetadataRef,
|
||||
time_window: Option<Duration>,
|
||||
) -> Result<Option<MemtableVersion>> {
|
||||
if self.mutable.is_empty() {
|
||||
// No need to freeze the mutable memtable.
|
||||
return Ok(None);
|
||||
// No need to freeze the mutable memtable, but we need to check the time window.
|
||||
if self.mutable.part_duration() == time_window {
|
||||
// If the time window is the same, we don't need to update it.
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
// Update the time window.
|
||||
let mutable = self.mutable.new_with_part_duration(time_window);
|
||||
common_telemetry::debug!(
|
||||
"Freeze empty memtable, update partition duration from {:?} to {:?}",
|
||||
self.mutable.part_duration(),
|
||||
time_window
|
||||
);
|
||||
return Ok(Some(MemtableVersion {
|
||||
mutable: Arc::new(mutable),
|
||||
immutables: self.immutables.clone(),
|
||||
}));
|
||||
}
|
||||
|
||||
// Marks the mutable memtable as immutable so it can free the memory usage from our
|
||||
// soft limit.
|
||||
self.mutable.freeze()?;
|
||||
// Fork the memtable.
|
||||
let mutable = Arc::new(self.mutable.fork(metadata));
|
||||
if self.mutable.part_duration() != time_window {
|
||||
common_telemetry::debug!(
|
||||
"Fork memtable, update partition duration from {:?}, to {:?}",
|
||||
self.mutable.part_duration(),
|
||||
time_window
|
||||
);
|
||||
}
|
||||
let mutable = Arc::new(self.mutable.fork(metadata, time_window));
|
||||
|
||||
// Pushes the mutable memtable to immutable list.
|
||||
let mut immutables =
|
||||
SmallVec::with_capacity(self.immutables.len() + self.mutable.num_partitions());
|
||||
self.mutable.list_memtables_to_small_vec(&mut immutables);
|
||||
immutables.extend(self.immutables.iter().cloned());
|
||||
// Pushes the mutable memtable to immutable list.
|
||||
self.mutable.list_memtables_to_small_vec(&mut immutables);
|
||||
|
||||
Ok(Some(MemtableVersion {
|
||||
mutable,
|
||||
immutables,
|
||||
|
||||
@@ -80,8 +80,12 @@ impl VersionControl {
|
||||
/// Freezes the mutable memtable if it is not empty.
|
||||
pub(crate) fn freeze_mutable(&self) -> Result<()> {
|
||||
let version = self.current().version;
|
||||
let time_window = version.compaction_time_window;
|
||||
|
||||
let Some(new_memtables) = version.memtables.freeze_mutable(&version.metadata)? else {
|
||||
let Some(new_memtables) = version
|
||||
.memtables
|
||||
.freeze_mutable(&version.metadata, time_window)?
|
||||
else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user