From e0384a7d468512e1e097cd8561c4963b389e00b3 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Sat, 18 Jan 2025 22:53:56 +0800 Subject: [PATCH] feat: overwrites inferred compaction window by region options (#5396) * feat: use time window in compaction options for compaction window * test: add tests for overwriting options * chore: typo * chore: fix a grammar issue in log --- src/mito2/src/engine/compaction_test.rs | 222 +++++++++++++++++++++++- src/mito2/src/region/version.rs | 25 ++- 2 files changed, 244 insertions(+), 3 deletions(-) diff --git a/src/mito2/src/engine/compaction_test.rs b/src/mito2/src/engine/compaction_test.rs index 49b18c0ca5..5e5cb0de75 100644 --- a/src/mito2/src/engine/compaction_test.rs +++ b/src/mito2/src/engine/compaction_test.rs @@ -12,16 +12,20 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::ops::Range; use std::sync::Arc; +use std::time::Duration; use api::v1::{ColumnSchema, Rows}; use common_recordbatch::{RecordBatches, SendableRecordBatchStream}; use datatypes::prelude::ScalarVector; use datatypes::vectors::TimestampMillisecondVector; use store_api::region_engine::{RegionEngine, RegionRole}; +use store_api::region_request::AlterKind::SetRegionOptions; use store_api::region_request::{ - RegionCompactRequest, RegionDeleteRequest, RegionFlushRequest, RegionRequest, + RegionAlterRequest, RegionCompactRequest, RegionDeleteRequest, RegionFlushRequest, + RegionOpenRequest, RegionRequest, SetRegionOption, }; use store_api::storage::{RegionId, ScanRequest}; use tokio::sync::Notify; @@ -466,3 +470,219 @@ async fn test_compaction_update_time_window() { let vec = collect_stream_ts(stream).await; assert_eq!((0..4000).map(|v| v * 1000).collect::>(), vec); } + +#[tokio::test] +async fn test_change_region_compaction_window() { + common_telemetry::init_default_ut_logging(); + let mut env = TestEnv::new(); + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + + env.get_schema_metadata_manager() + .register_region_table_info( + region_id.table_id(), + "test_table", + "test_catalog", + "test_schema", + None, + env.get_kv_backend(), + ) + .await; + + let request = CreateRequestBuilder::new() + .insert_option("compaction.type", "twcs") + .insert_option("compaction.twcs.max_active_window_runs", "1") + .insert_option("compaction.twcs.max_active_window_files", "1") + .insert_option("compaction.twcs.max_inactive_window_runs", "1") + .insert_option("compaction.twcs.max_inactive_window_files", "1") + .build(); + let region_dir = request.region_dir.clone(); + let column_schemas = request + .column_metadatas + .iter() + .map(column_metadata_to_column_schema) + .collect::>(); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + // Flush 2 SSTs for compaction. + put_and_flush(&engine, region_id, &column_schemas, 0..1200).await; // window 3600 + put_and_flush(&engine, region_id, &column_schemas, 1200..2400).await; // window 3600 + + engine + .handle_request( + region_id, + RegionRequest::Compact(RegionCompactRequest::default()), + ) + .await + .unwrap(); + + // Put window 7200 + put_and_flush(&engine, region_id, &column_schemas, 4000..5000).await; // window 3600 + + // Check compaction window. + let region = engine.get_region(region_id).unwrap(); + { + let version = region.version(); + assert_eq!( + Some(Duration::from_secs(3600)), + version.compaction_time_window, + ); + assert!(version.options.compaction.time_window().is_none()); + } + + // Change compaction window. + let request = RegionRequest::Alter(RegionAlterRequest { + schema_version: region.metadata().schema_version, + kind: SetRegionOptions { + options: vec![SetRegionOption::Twsc( + "compaction.twcs.time_window".to_string(), + "2h".to_string(), + )], + }, + }); + engine.handle_request(region_id, request).await.unwrap(); + + // Compaction again. It should compacts window 3600 and 7200 + // into 7200. + engine + .handle_request( + region_id, + RegionRequest::Compact(RegionCompactRequest::default()), + ) + .await + .unwrap(); + // Check compaction window. + { + let region = engine.get_region(region_id).unwrap(); + let version = region.version(); + assert_eq!( + Some(Duration::from_secs(7200)), + version.compaction_time_window, + ); + assert_eq!( + Some(Duration::from_secs(7200)), + version.options.compaction.time_window() + ); + } + + // Reopen region. + let engine = env.reopen_engine(engine, MitoConfig::default()).await; + engine + .handle_request( + region_id, + RegionRequest::Open(RegionOpenRequest { + engine: String::new(), + region_dir, + options: Default::default(), + skip_wal_replay: false, + }), + ) + .await + .unwrap(); + // Check compaction window. + { + let region = engine.get_region(region_id).unwrap(); + let version = region.version(); + assert_eq!( + Some(Duration::from_secs(7200)), + version.compaction_time_window, + ); + // We open the region without options, so the time window should be None. + assert!(version.options.compaction.time_window().is_none()); + } +} + +#[tokio::test] +async fn test_open_overwrite_compaction_window() { + common_telemetry::init_default_ut_logging(); + let mut env = TestEnv::new(); + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + + env.get_schema_metadata_manager() + .register_region_table_info( + region_id.table_id(), + "test_table", + "test_catalog", + "test_schema", + None, + env.get_kv_backend(), + ) + .await; + + let request = CreateRequestBuilder::new() + .insert_option("compaction.type", "twcs") + .insert_option("compaction.twcs.max_active_window_runs", "1") + .insert_option("compaction.twcs.max_active_window_files", "1") + .insert_option("compaction.twcs.max_inactive_window_runs", "1") + .insert_option("compaction.twcs.max_inactive_window_files", "1") + .build(); + let region_dir = request.region_dir.clone(); + let column_schemas = request + .column_metadatas + .iter() + .map(column_metadata_to_column_schema) + .collect::>(); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + // Flush 2 SSTs for compaction. + put_and_flush(&engine, region_id, &column_schemas, 0..1200).await; // window 3600 + put_and_flush(&engine, region_id, &column_schemas, 1200..2400).await; // window 3600 + + engine + .handle_request( + region_id, + RegionRequest::Compact(RegionCompactRequest::default()), + ) + .await + .unwrap(); + + // Check compaction window. + { + let region = engine.get_region(region_id).unwrap(); + let version = region.version(); + assert_eq!( + Some(Duration::from_secs(3600)), + version.compaction_time_window, + ); + assert!(version.options.compaction.time_window().is_none()); + } + + // Reopen region. + let options = HashMap::from([ + ("compaction.type".to_string(), "twcs".to_string()), + ("compaction.twcs.time_window".to_string(), "2h".to_string()), + ]); + let engine = env.reopen_engine(engine, MitoConfig::default()).await; + engine + .handle_request( + region_id, + RegionRequest::Open(RegionOpenRequest { + engine: String::new(), + region_dir, + options, + skip_wal_replay: false, + }), + ) + .await + .unwrap(); + // Check compaction window. + { + let region = engine.get_region(region_id).unwrap(); + let version = region.version(); + assert_eq!( + Some(Duration::from_secs(7200)), + version.compaction_time_window, + ); + assert_eq!( + Some(Duration::from_secs(7200)), + version.options.compaction.time_window() + ); + } +} diff --git a/src/mito2/src/region/version.rs b/src/mito2/src/region/version.rs index 188c314837..cc809f61a7 100644 --- a/src/mito2/src/region/version.rs +++ b/src/mito2/src/region/version.rs @@ -26,6 +26,7 @@ use std::sync::{Arc, RwLock}; use std::time::Duration; +use common_telemetry::info; use store_api::metadata::RegionMetadataRef; use store_api::storage::SequenceNumber; @@ -253,7 +254,10 @@ pub(crate) struct Version { /// /// Used to check if it is a flush task during the truncating table. pub(crate) truncated_entry_id: Option, - /// Inferred compaction time window. + /// Inferred compaction time window from flush. + /// + /// If compaction options contain a time window, it will overwrite this value + /// when creating a new version from the [VersionBuilder]. pub(crate) compaction_time_window: Option, /// Options of the region. pub(crate) options: RegionOptions, @@ -389,7 +393,24 @@ impl VersionBuilder { } /// Builds a new [Version] from the builder. + /// It overwrites the window size by compaction option. pub(crate) fn build(self) -> Version { + let compaction_time_window = self + .options + .compaction + .time_window() + .or(self.compaction_time_window); + if self.compaction_time_window.is_some() + && compaction_time_window != self.compaction_time_window + { + info!( + "VersionBuilder overwrites region compaction time window from {:?} to {:?}, region: {}", + self.compaction_time_window, + compaction_time_window, + self.metadata.region_id + ); + } + Version { metadata: self.metadata, memtables: self.memtables, @@ -397,7 +418,7 @@ impl VersionBuilder { flushed_entry_id: self.flushed_entry_id, flushed_sequence: self.flushed_sequence, truncated_entry_id: self.truncated_entry_id, - compaction_time_window: self.compaction_time_window, + compaction_time_window, options: self.options, } }