diff --git a/src/mito/src/engine.rs b/src/mito/src/engine.rs index 2ce1ceeac9..dbdf80ee1d 100644 --- a/src/mito/src/engine.rs +++ b/src/mito/src/engine.rs @@ -452,7 +452,6 @@ impl MitoEngineInner { .write_buffer_size .map(|s| s.0 as usize), ttl: table_info.meta.options.ttl, - compaction_time_window: table_info.meta.options.compaction_time_window, }; debug!( @@ -532,7 +531,6 @@ impl MitoEngineInner { .write_buffer_size .map(|s| s.0 as usize), ttl: table_info.meta.options.ttl, - compaction_time_window: table_info.meta.options.compaction_time_window, }; // TODO(weny): Returns an error earlier if the target region does not exist in the meta. diff --git a/src/mito/src/engine/procedure/create.rs b/src/mito/src/engine/procedure/create.rs index f6b246eb40..563278430b 100644 --- a/src/mito/src/engine/procedure/create.rs +++ b/src/mito/src/engine/procedure/create.rs @@ -228,18 +228,15 @@ impl TableCreator { let table_options = &self.data.request.table_options; let write_buffer_size = table_options.write_buffer_size.map(|size| size.0 as usize); let ttl = table_options.ttl; - let compaction_time_window = table_options.compaction_time_window; let open_opts = OpenOptions { parent_dir: table_dir.to_string(), write_buffer_size, ttl, - compaction_time_window, }; let create_opts = CreateOptions { parent_dir: table_dir.to_string(), write_buffer_size, ttl, - compaction_time_window, }; let primary_key_indices = &self.data.request.primary_key_indices; @@ -285,7 +282,6 @@ impl TableCreator { .name(region_name.clone()) .row_key(row_key.clone()) .default_cf(default_cf.clone()) - .compaction_time_window(compaction_time_window) .build() .context(BuildRegionDescriptorSnafu { table_name: &self.data.request.table_name, diff --git a/src/query/src/sql/show.rs b/src/query/src/sql/show.rs index 51160e350b..8b28c77db1 100644 --- a/src/query/src/sql/show.rs +++ b/src/query/src/sql/show.rs @@ -71,10 +71,6 @@ fn create_sql_options(table_meta: &TableMeta) -> Vec { )); } - if let Some(w) = 
table_opts.compaction_time_window { - options.push(sql_option("compaction_time_window", number_value(w))); - } - for (k, v) in table_opts .extra_options .iter() diff --git a/src/storage/benches/memtable/util/regiondesc_util.rs b/src/storage/benches/memtable/util/regiondesc_util.rs index 3fb6510bfe..e7b77b92c2 100644 --- a/src/storage/benches/memtable/util/regiondesc_util.rs +++ b/src/storage/benches/memtable/util/regiondesc_util.rs @@ -62,7 +62,6 @@ impl RegionDescBuilder { row_key: self.key_builder.build().unwrap(), default_cf: self.default_cf_builder.build().unwrap(), extra_cfs: Vec::new(), - compaction_time_window: None, } } diff --git a/src/storage/src/compaction/picker.rs b/src/storage/src/compaction/picker.rs index 3ed8841aef..06d15ab38c 100644 --- a/src/storage/src/compaction/picker.rs +++ b/src/storage/src/compaction/picker.rs @@ -120,6 +120,7 @@ impl Picker for SimplePicker { } let ctx = &PickerContext::with(req.compaction_time_window); + for level_num in 0..levels.level_num() { let level = levels.level(level_num as u8); let (compaction_time_window, outputs) = self.strategy.pick(ctx, level); @@ -130,8 +131,8 @@ impl Picker for SimplePicker { } debug!( - "Found SST files to compact {:?} on level: {}", - outputs, level_num + "Found SST files to compact {:?} on level: {}, compaction window: {:?}", + outputs, level_num, compaction_time_window, ); return Ok(Some(CompactionTaskImpl { schema: req.schema(), diff --git a/src/storage/src/compaction/strategy.rs b/src/storage/src/compaction/strategy.rs index 48ecce1fc6..0c95051307 100644 --- a/src/storage/src/compaction/strategy.rs +++ b/src/storage/src/compaction/strategy.rs @@ -47,19 +47,24 @@ impl Strategy for SimpleTimeWindowStrategy { if files.is_empty() { return (None, vec![]); } - let time_bucket = ctx - .compaction_time_window() - .unwrap_or_else(|| infer_time_bucket(&files)); - let buckets = calculate_time_buckets(time_bucket, &files); - debug!("File bucket:{}, file groups: {:?}", time_bucket, buckets); + let 
time_window = ctx.compaction_time_window().unwrap_or_else(|| { + let inferred = infer_time_bucket(&files); + debug!( + "Compaction window is not present, inferring from files: {:?}", + inferred + ); + inferred + }); + let buckets = calculate_time_buckets(time_window, &files); + debug!("File bucket:{}, file groups: {:?}", time_window, buckets); ( - Some(time_bucket), + Some(time_window), buckets .into_iter() .map(|(bound, files)| CompactionOutput { output_level: 1, bucket_bound: bound, - bucket: time_bucket, + bucket: time_window, inputs: files, }) .collect(), diff --git a/src/storage/src/compaction/task.rs b/src/storage/src/compaction/task.rs index 233cf1a2b7..18670327f4 100644 --- a/src/storage/src/compaction/task.rs +++ b/src/storage/src/compaction/task.rs @@ -102,7 +102,6 @@ impl CompactionTaskImpl { } /// Writes updated SST info into manifest. - // TODO(etolbakov): we are not persisting inferred compaction_time_window (#1083)[https://github.com/GreptimeTeam/greptimedb/pull/1083] async fn write_manifest_and_apply( &self, output: HashSet, @@ -116,6 +115,7 @@ impl CompactionTaskImpl { flushed_sequence: None, files_to_add: Vec::from_iter(output.into_iter()), files_to_remove: Vec::from_iter(input.into_iter()), + compaction_time_window: self.compaction_time_window, }; debug!( "Compacted region: {}, region edit: {:?}", @@ -151,7 +151,10 @@ impl CompactionTask for CompactionTaskImpl { let input_ids = compacted.iter().map(|f| f.file_id).collect::>(); let output_ids = output.iter().map(|f| f.file_id).collect::>(); - info!("Compacting SST files, input: {input_ids:?}, output: {output_ids:?}"); + info!( + "Compacting SST files, input: {:?}, output: {:?}, window: {:?}", + input_ids, output_ids, self.compaction_time_window + ); self.write_manifest_and_apply(output, compacted) .await .map_err(|e| { diff --git a/src/storage/src/engine.rs b/src/storage/src/engine.rs index 87631612e3..3494332809 100644 --- a/src/storage/src/engine.rs +++ b/src/storage/src/engine.rs @@ -89,7 +89,7 
@@ impl StorageEngine for EngineImpl { async fn drop_region(&self, _ctx: &EngineContext, region: Self::Region) -> Result<()> { region.drop_region().await?; - self.inner.remove_reigon(region.name()); + self.inner.remove_region(region.name()); Ok(()) } @@ -395,7 +395,6 @@ impl EngineInner { name, &self.config, opts.ttl, - opts.compaction_time_window, ) .await?; @@ -441,7 +440,6 @@ impl EngineInner { ®ion_name, &self.config, opts.ttl, - opts.compaction_time_window, ) .await?; @@ -462,7 +460,7 @@ impl EngineInner { self.regions.get_region(name) } - fn remove_reigon(&self, name: &str) { + fn remove_region(&self, name: &str) { self.regions.remove(name) } @@ -473,7 +471,6 @@ impl EngineInner { region_name: &str, config: &EngineConfig, region_ttl: Option, - compaction_time_window: Option, ) -> Result> { let parent_dir = util::normalize_dir(parent_dir); @@ -504,7 +501,6 @@ impl EngineInner { engine_config: self.config.clone(), file_purger: self.file_purger.clone(), ttl, - compaction_time_window, write_buffer_size: write_buffer_size .unwrap_or(self.config.region_write_buffer_size.as_bytes() as usize), }) diff --git a/src/storage/src/flush.rs b/src/storage/src/flush.rs index 9dd9361914..8989f21d0f 100644 --- a/src/storage/src/flush.rs +++ b/src/storage/src/flush.rs @@ -312,6 +312,7 @@ impl FlushJob { flushed_sequence: Some(self.flush_sequence), files_to_add: file_metas.to_vec(), files_to_remove: Vec::default(), + compaction_time_window: None, }; self.writer diff --git a/src/storage/src/manifest/action.rs b/src/storage/src/manifest/action.rs index b361bc9329..c091b7d475 100644 --- a/src/storage/src/manifest/action.rs +++ b/src/storage/src/manifest/action.rs @@ -38,8 +38,6 @@ pub struct RawRegionMetadata { pub columns: RawColumnsMetadata, pub column_families: RawColumnFamiliesMetadata, pub version: VersionNumber, - /// Time window for compaction - pub compaction_time_window: Option, } /// Minimal data that could be used to persist and recover 
[ColumnsMetadata](crate::metadata::ColumnsMetadata). @@ -78,6 +76,7 @@ pub struct RegionEdit { pub flushed_sequence: Option, pub files_to_add: Vec, pub files_to_remove: Vec, + pub compaction_time_window: Option, } /// The region version checkpoint @@ -382,6 +381,7 @@ mod tests { flushed_sequence: Some(99), files_to_add: files.clone(), files_to_remove: vec![], + compaction_time_window: None, }, ); builder.apply_edit( @@ -391,6 +391,7 @@ mod tests { flushed_sequence: Some(100), files_to_add: vec![], files_to_remove: vec![files[0].clone()], + compaction_time_window: None, }, ); diff --git a/src/storage/src/manifest/test_utils.rs b/src/storage/src/manifest/test_utils.rs index f44c3e8b2f..6a13635f5f 100644 --- a/src/storage/src/manifest/test_utils.rs +++ b/src/storage/src/manifest/test_utils.rs @@ -71,5 +71,6 @@ pub fn build_region_edit( file_size: DEFAULT_TEST_FILE_SIZE, }) .collect(), + compaction_time_window: None, } } diff --git a/src/storage/src/metadata.rs b/src/storage/src/metadata.rs index 4b9a3489ad..e6c91bcd3a 100644 --- a/src/storage/src/metadata.rs +++ b/src/storage/src/metadata.rs @@ -194,8 +194,6 @@ pub struct RegionMetadata { pub columns: ColumnsMetadataRef, column_families: ColumnFamiliesMetadata, version: VersionNumber, - /// Time window for compaction - compaction_time_window: Option, } impl RegionMetadata { @@ -214,11 +212,6 @@ impl RegionMetadata { &self.schema } - #[inline] - pub fn compaction_time_window(&self) -> Option { - self.compaction_time_window - } - #[inline] pub fn user_schema(&self) -> &SchemaRef { self.schema.user_schema() @@ -320,8 +313,7 @@ impl RegionMetadata { let mut builder = RegionDescriptorBuilder::default() .id(self.id) .name(&self.name) - .row_key(row_key) - .compaction_time_window(self.compaction_time_window); + .row_key(row_key); for (cf_id, cf) in &self.column_families.id_to_cfs { let mut cf_builder = ColumnFamilyDescriptorBuilder::default() @@ -354,7 +346,6 @@ impl From<&RegionMetadata> for RawRegionMetadata { columns: 
RawColumnsMetadata::from(&*data.columns), column_families: RawColumnFamiliesMetadata::from(&data.column_families), version: data.version, - compaction_time_window: data.compaction_time_window, } } } @@ -373,7 +364,6 @@ impl TryFrom for RegionMetadata { columns, column_families: raw.column_families.into(), version: raw.version, - compaction_time_window: raw.compaction_time_window, }) } } @@ -631,7 +621,6 @@ impl TryFrom for RegionMetadataBuilder { .name(desc.name) .id(desc.id) .row_key(desc.row_key)? - .compaction_time_window(desc.compaction_time_window) .add_column_family(desc.default_cf)?; for cf in desc.extra_cfs { builder = builder.add_column_family(cf)?; @@ -778,7 +767,6 @@ struct RegionMetadataBuilder { columns_meta_builder: ColumnsMetadataBuilder, cfs_meta_builder: ColumnFamiliesMetadataBuilder, version: VersionNumber, - compaction_time_window: Option, } impl Default for RegionMetadataBuilder { @@ -795,7 +783,6 @@ impl RegionMetadataBuilder { columns_meta_builder: ColumnsMetadataBuilder::default(), cfs_meta_builder: ColumnFamiliesMetadataBuilder::default(), version: Schema::INITIAL_VERSION, - compaction_time_window: None, } } @@ -820,11 +807,6 @@ impl RegionMetadataBuilder { Ok(self) } - fn compaction_time_window(mut self, compaction_time_window: Option) -> Self { - self.compaction_time_window = compaction_time_window; - self - } - fn add_column_family(mut self, cf: ColumnFamilyDescriptor) -> Result { let column_index_start = self.columns_meta_builder.columns.len(); let column_index_end = column_index_start + cf.columns.len(); @@ -855,7 +837,6 @@ impl RegionMetadataBuilder { columns, column_families: self.cfs_meta_builder.build(), version: self.version, - compaction_time_window: self.compaction_time_window, }) } } @@ -1047,7 +1028,6 @@ mod tests { .unwrap(); RegionMetadataBuilder::new() .name(TEST_REGION) - .compaction_time_window(None) .row_key(row_key) .unwrap() .add_column_family(cf) diff --git a/src/storage/src/region.rs b/src/storage/src/region.rs index 
0c21183290..a26ed012ce 100644 --- a/src/storage/src/region.rs +++ b/src/storage/src/region.rs @@ -160,11 +160,10 @@ pub struct StoreConfig { pub engine_config: Arc, pub file_purger: FilePurgerRef, pub ttl: Option, - pub compaction_time_window: Option, pub write_buffer_size: usize, } -pub type RecoverdMetadata = (SequenceNumber, (ManifestVersion, RawRegionMetadata)); +pub type RecoveredMetadata = (SequenceNumber, (ManifestVersion, RawRegionMetadata)); pub type RecoveredMetadataMap = BTreeMap; #[derive(Debug)] @@ -244,7 +243,6 @@ impl RegionImpl { store_config.memtable_builder, store_config.engine_config.clone(), store_config.ttl, - store_config.compaction_time_window, store_config.write_buffer_size, )), wal, @@ -264,7 +262,7 @@ impl RegionImpl { pub async fn open( name: String, store_config: StoreConfig, - opts: &OpenOptions, + _opts: &OpenOptions, ) -> Result>> { // Load version meta data from manifest. let (version, mut recovered_metadata) = match Self::recover_from_manifest( @@ -328,14 +326,11 @@ impl RegionImpl { version_control, last_flush_millis: AtomicI64::new(0), }); - let compaction_time_window = store_config - .compaction_time_window - .or(opts.compaction_time_window); + let writer = Arc::new(RegionWriter::new( store_config.memtable_builder, store_config.engine_config.clone(), store_config.ttl, - compaction_time_window, store_config.write_buffer_size, )); let writer_ctx = WriterContext { @@ -521,6 +516,7 @@ impl RegionImpl { flushed_sequence: e.flushed_sequence, manifest_version, max_memtable_id: None, + compaction_time_window: e.compaction_time_window, }; version.map(|mut v| { v.apply_edit(edit); diff --git a/src/storage/src/region/tests.rs b/src/storage/src/region/tests.rs index 59dc935a3d..d5953e00fe 100644 --- a/src/storage/src/region/tests.rs +++ b/src/storage/src/region/tests.rs @@ -556,7 +556,6 @@ async fn create_store_config(region_name: &str, root: &str) -> StoreConfig, purge_handler: MockFilePurgeHandler, object_store: ObjectStore, + store_dir: 
String, + engine_config: EngineConfig, + flush_strategy: FlushStrategyRef, } impl CompactionTester { @@ -165,7 +168,7 @@ impl CompactionTester { store_dir, engine_config.clone(), purge_handler.clone(), - flush_strategy, + flush_strategy.clone(), s3_bucket, ) .await; @@ -174,6 +177,9 @@ impl CompactionTester { base: Some(FileTesterBase::with_region(region)), purge_handler, object_store, + store_dir: store_dir.to_string(), + engine_config, + flush_strategy, } } @@ -221,6 +227,42 @@ impl CompactionTester { self.object_store.remove_all("/").await.unwrap(); } + + async fn reopen(&mut self) -> Result { + // Close the old region. + if let Some(base) = self.base.take() { + base.close().await; + } + + // Reopen the region. + let object_store = new_object_store(&self.store_dir, None); + let (mut store_config, _) = config_util::new_store_config_with_object_store( + REGION_NAME, + &self.store_dir, + object_store.clone(), + ) + .await; + store_config.engine_config = Arc::new(self.engine_config.clone()); + store_config.flush_strategy = self.flush_strategy.clone(); + + let picker = SimplePicker::default(); + let handler = CompactionHandler::new(picker); + let config = SchedulerConfig::default(); + // Overwrite test compaction scheduler and file purger. + store_config.compaction_scheduler = Arc::new(LocalScheduler::new(config, handler)); + store_config.file_purger = Arc::new(LocalScheduler::new( + SchedulerConfig { + max_inflight_tasks: store_config.engine_config.max_purge_tasks, + }, + MockFilePurgeHandler::default(), + )); + + let Some(region) = RegionImpl::open(REGION_NAME.to_string(), store_config, &OpenOptions::default()).await? 
else { + return Ok(false); + }; + self.base = Some(FileTesterBase::with_region(region)); + Ok(true) + } } async fn compact_during_read(s3_bucket: Option) { @@ -290,3 +332,110 @@ async fn test_compact_during_read_on_s3() { } } } + +#[tokio::test] +async fn test_persist_region_compaction_time_window() { + common_telemetry::init_default_ut_logging(); + let dir = create_temp_dir("put-delete-scan"); + let store_dir = dir.path().to_str().unwrap(); + let mut tester = CompactionTester::new( + store_dir, + EngineConfig { + max_files_in_l0: 100, + ..Default::default() + }, + // Disable auto-flush. + Arc::new(FlushSwitch::default()), + None, + ) + .await; + + // initially the time window is not present since no compaction ever happened. + assert_eq!( + None, + tester + .base + .as_ref() + .unwrap() + .region + .inner + .shared + .version_control + .current() + .ssts() + .compaction_time_window() + ); + + // write some data with one hour span + for idx in 0..10 { + tester + .put(&[(idx * 1000, Some(idx)), ((idx + 360) * 1000, Some(idx))]) + .await; + tester.flush(Some(true)).await; + } + + tester.compact().await; + // the inferred and persisted compaction time window should be 3600 seconds. + assert_eq!( + 3600, + tester + .base + .as_ref() + .unwrap() + .region + .inner + .shared + .version_control + .current() + .ssts() + .compaction_time_window() + .unwrap() + ); + + // try to write data with a larger time window + for idx in 0..10 { + tester + .put(&[ + (idx * 1000, Some(idx)), + ((idx + 2 * 60 * 60) * 1000, Some(idx)), + ]) + .await; + tester.flush(Some(true)).await; + } + tester.compact().await; + + // but we won't change the persisted compaction window for now, so it remains unchanged. 
+ assert_eq!( + 3600, + tester + .base + .as_ref() + .unwrap() + .region + .inner + .shared + .version_control + .current() + .ssts() + .compaction_time_window() + .unwrap() + ); + + let reopened = tester.reopen().await.unwrap(); + assert!(reopened); + assert_eq!( + 3600, + tester + .base + .as_ref() + .unwrap() + .region + .inner + .shared + .version_control + .current() + .ssts() + .compaction_time_window() + .unwrap() + ); +} diff --git a/src/storage/src/region/writer.rs b/src/storage/src/region/writer.rs index fcaae37216..9a85762c49 100644 --- a/src/storage/src/region/writer.rs +++ b/src/storage/src/region/writer.rs @@ -42,7 +42,7 @@ use crate::metadata::RegionMetadataRef; use crate::metrics::{FLUSH_REASON, FLUSH_REQUESTS_TOTAL, PREPROCESS_ELAPSED}; use crate::proto::wal::WalHeader; use crate::region::{ - CompactContext, RecoverdMetadata, RecoveredMetadataMap, RegionManifest, SharedDataRef, + CompactContext, RecoveredMetadata, RecoveredMetadataMap, RegionManifest, SharedDataRef, }; use crate::schema::compat::CompatWrite; use crate::sst::AccessLayerRef; @@ -72,7 +72,6 @@ impl RegionWriter { memtable_builder: MemtableBuilderRef, config: Arc, ttl: Option, - compaction_time_window: Option, write_buffer_size: usize, ) -> RegionWriter { RegionWriter { @@ -80,7 +79,6 @@ impl RegionWriter { memtable_builder, config, ttl, - compaction_time_window, write_buffer_size, )), version_mutex: Mutex::new(()), @@ -141,7 +139,7 @@ impl RegionWriter { let files_to_add = edit.files_to_add.clone(); let files_to_remove = edit.files_to_remove.clone(); let flushed_sequence = edit.flushed_sequence; - + let compaction_time_window = edit.compaction_time_window; // Persist the meta action. 
let mut action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit)); action_list.set_prev_version(prev_version); @@ -158,6 +156,7 @@ impl RegionWriter { flushed_sequence, manifest_version, max_memtable_id, + compaction_time_window, }; // We could tolerate failure during persisting manifest version to the WAL, since it won't @@ -456,7 +455,6 @@ struct WriterInner { closed: bool, engine_config: Arc, ttl: Option, - compaction_time_window: Option, /// Size in bytes to freeze the mutable memtable. write_buffer_size: usize, } @@ -466,7 +464,6 @@ impl WriterInner { memtable_builder: MemtableBuilderRef, engine_config: Arc, ttl: Option, - compaction_time_window: Option, write_buffer_size: usize, ) -> WriterInner { WriterInner { @@ -475,7 +472,6 @@ impl WriterInner { engine_config, closed: false, ttl, - compaction_time_window, write_buffer_size, } } @@ -641,7 +637,7 @@ impl WriterInner { &self, writer_ctx: &WriterContext<'_, S>, sequence: SequenceNumber, - mut metadata: Option, + mut metadata: Option, version_control: &VersionControl, ) -> Result<()> { // It's safe to unwrap here, it's checked outside. 
@@ -776,7 +772,7 @@ impl WriterInner { manifest: ctx.manifest.clone(), engine_config: self.engine_config.clone(), ttl: self.ttl, - compaction_time_window: self.compaction_time_window, + compaction_time_window: current_version.ssts().compaction_time_window(), }; let flush_handle = ctx @@ -798,6 +794,12 @@ impl WriterInner { sst_write_buffer_size: ReadableSize, ) -> Result<()> { let region_id = writer_ctx.shared.id(); + let compaction_time_window = writer_ctx + .shared + .version_control + .current() + .ssts() + .compaction_time_window(); let mut compaction_request = CompactionRequestImpl { region_id, sst_layer: writer_ctx.sst_layer.clone(), @@ -806,7 +808,7 @@ impl WriterInner { manifest: writer_ctx.manifest.clone(), wal: writer_ctx.wal.clone(), ttl: self.ttl, - compaction_time_window: self.compaction_time_window, + compaction_time_window, sender: None, sst_write_buffer_size, }; diff --git a/src/storage/src/sst.rs b/src/storage/src/sst.rs index 25f738678c..b7342f3fcf 100644 --- a/src/storage/src/sst.rs +++ b/src/storage/src/sst.rs @@ -65,12 +65,15 @@ pub struct LevelMetas { levels: LevelMetaVec, sst_layer: AccessLayerRef, file_purger: FilePurgerRef, + /// Compaction time window in seconds + compaction_time_window: Option, } impl std::fmt::Debug for LevelMetas { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("LevelMetas") .field("levels", &self.levels) + .field("compaction_time_window", &self.compaction_time_window) .finish() } } @@ -82,6 +85,7 @@ impl LevelMetas { levels: new_level_meta_vec(), sst_layer, file_purger, + compaction_time_window: Default::default(), } } @@ -91,6 +95,10 @@ impl LevelMetas { self.levels.len() } + pub fn compaction_time_window(&self) -> Option { + self.compaction_time_window + } + #[inline] pub fn level(&self, level: Level) -> &LevelMeta { &self.levels[level as usize] @@ -104,6 +112,7 @@ impl LevelMetas { &self, files_to_add: impl Iterator, files_to_remove: impl Iterator, + compaction_time_window: 
Option, ) -> LevelMetas { let mut merged = self.clone(); for file in files_to_add { @@ -118,6 +127,11 @@ impl LevelMetas { removed_file.mark_deleted(); } } + // we only update region's compaction time window iff region's window is not set and VersionEdit's + // compaction time window is present. + if let Some(window) = compaction_time_window { + merged.compaction_time_window.get_or_insert(window); + } merged } @@ -726,6 +740,7 @@ mod tests { ] .into_iter(), vec![].into_iter(), + None, ); assert_eq!( @@ -740,6 +755,7 @@ mod tests { ] .into_iter(), vec![].into_iter(), + None, ); assert_eq!( HashSet::from([file_ids[0], file_ids[1]]), @@ -758,6 +774,7 @@ mod tests { create_file_meta(file_ids[2], 0), ] .into_iter(), + None, ); assert_eq!( HashSet::from([file_ids[1]]), @@ -776,6 +793,7 @@ mod tests { create_file_meta(file_ids[3], 1), ] .into_iter(), + None, ); assert_eq!( HashSet::from([file_ids[1]]), diff --git a/src/storage/src/test_util/config_util.rs b/src/storage/src/test_util/config_util.rs index a9db4f8f2b..4fc1597b01 100644 --- a/src/storage/src/test_util/config_util.rs +++ b/src/storage/src/test_util/config_util.rs @@ -95,6 +95,7 @@ pub async fn new_store_config_with_object_store( ..Default::default() }; let log_store = Arc::new(RaftEngineLogStore::try_new(log_config).await.unwrap()); + let compaction_scheduler = Arc::new(NoopCompactionScheduler::default()); // We use an empty region map so actually the background worker of the picker is disabled. 
let regions = Arc::new(RegionMap::new()); @@ -123,7 +124,6 @@ pub async fn new_store_config_with_object_store( engine_config: Arc::new(engine_config), file_purger, ttl: None, - compaction_time_window: None, write_buffer_size: DEFAULT_REGION_WRITE_BUFFER_SIZE.as_bytes() as usize, }, regions, diff --git a/src/storage/src/test_util/descriptor_util.rs b/src/storage/src/test_util/descriptor_util.rs index 34ea83ba56..23e92e6532 100644 --- a/src/storage/src/test_util/descriptor_util.rs +++ b/src/storage/src/test_util/descriptor_util.rs @@ -89,7 +89,6 @@ impl RegionDescBuilder { row_key: self.key_builder.build().unwrap(), default_cf: self.default_cf_builder.build().unwrap(), extra_cfs: Vec::new(), - compaction_time_window: None, } } diff --git a/src/storage/src/version.rs b/src/storage/src/version.rs index 1be8fffa34..d5d3d4bf3e 100644 --- a/src/storage/src/version.rs +++ b/src/storage/src/version.rs @@ -134,6 +134,7 @@ pub struct VersionEdit { pub flushed_sequence: Option, pub manifest_version: ManifestVersion, pub max_memtable_id: Option, + pub compaction_time_window: Option, } pub type VersionControlRef = Arc; @@ -235,7 +236,7 @@ impl Version { ) { self.flushed_sequence = flushed_sequence.unwrap_or(self.flushed_sequence); self.manifest_version = manifest_version; - let ssts = self.ssts.merge(files, std::iter::empty()); + let ssts = self.ssts.merge(files, std::iter::empty(), None); info!( "After applying checkpoint, region: {}, id: {}, flushed_sequence: {}, manifest_version: {}", self.metadata.name(), @@ -264,15 +265,17 @@ impl Version { } let handles_to_add = edit.files_to_add.into_iter(); - let merged_ssts = self - .ssts - .merge(handles_to_add, edit.files_to_remove.into_iter()); + let merged_ssts = self.ssts.merge( + handles_to_add, + edit.files_to_remove.into_iter(), + edit.compaction_time_window, + ); debug!( "After applying edit, region: {}, id: {}, SST files: {:?}", self.metadata.name(), self.metadata.id(), - merged_ssts + merged_ssts, ); self.ssts = 
Arc::new(merged_ssts); } diff --git a/src/store-api/src/storage/descriptors.rs b/src/store-api/src/storage/descriptors.rs index 56d4d78971..38598b271b 100644 --- a/src/store-api/src/storage/descriptors.rs +++ b/src/store-api/src/storage/descriptors.rs @@ -144,8 +144,6 @@ pub struct RegionDescriptor { /// Extra column families defined by user. #[builder(default, setter(each(name = "push_extra_column_family")))] pub extra_cfs: Vec, - /// Time window for compaction - pub compaction_time_window: Option, } impl RowKeyDescriptorBuilder { diff --git a/src/store-api/src/storage/engine.rs b/src/store-api/src/storage/engine.rs index c1d5337de3..99a99421a1 100644 --- a/src/store-api/src/storage/engine.rs +++ b/src/store-api/src/storage/engine.rs @@ -92,7 +92,6 @@ pub struct CreateOptions { pub write_buffer_size: Option, /// Region SST files TTL pub ttl: Option, - pub compaction_time_window: Option, } /// Options to open a region. @@ -104,7 +103,6 @@ pub struct OpenOptions { pub write_buffer_size: Option, /// Region SST files TTL pub ttl: Option, - pub compaction_time_window: Option, } /// Options to close a region. diff --git a/src/store-api/src/storage/requests.rs b/src/store-api/src/storage/requests.rs index 159f7b983a..dbb495a576 100644 --- a/src/store-api/src/storage/requests.rs +++ b/src/store-api/src/storage/requests.rs @@ -170,7 +170,6 @@ mod tests { .name("test") .row_key(row_key) .default_cf(default_cf) - .compaction_time_window(Some(1677652502)) .build() .unwrap() } diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs index 1519581464..c71cb7382b 100644 --- a/src/table/src/requests.rs +++ b/src/table/src/requests.rs @@ -77,14 +77,11 @@ pub struct TableOptions { pub ttl: Option, /// Extra options that may not applicable to all table engines. 
pub extra_options: HashMap, - /// Time window for compaction - pub compaction_time_window: Option, } pub const WRITE_BUFFER_SIZE_KEY: &str = "write_buffer_size"; pub const TTL_KEY: &str = "ttl"; pub const REGIONS_KEY: &str = "regions"; -pub const COMPACTION_TIME_WINDOW_KEY: &str = "compaction_time_window"; impl TryFrom<&HashMap> for TableOptions { type Error = error::Error; @@ -115,24 +112,8 @@ impl TryFrom<&HashMap> for TableOptions { .into(); options.ttl = Some(ttl_value); } - if let Some(compaction_time_window) = value.get(COMPACTION_TIME_WINDOW_KEY) { - options.compaction_time_window = match compaction_time_window.parse::() { - Ok(t) => Some(t), - Err(_) => { - return ParseTableOptionSnafu { - key: COMPACTION_TIME_WINDOW_KEY, - value: compaction_time_window, - } - .fail() - } - }; - } options.extra_options = HashMap::from_iter(value.iter().filter_map(|(k, v)| { - if k != WRITE_BUFFER_SIZE_KEY - && k != REGIONS_KEY - && k != TTL_KEY - && k != COMPACTION_TIME_WINDOW_KEY - { + if k != WRITE_BUFFER_SIZE_KEY && k != REGIONS_KEY && k != TTL_KEY { Some((k.clone(), v.clone())) } else { None @@ -155,12 +136,6 @@ impl From<&TableOptions> for HashMap { let ttl_str = humantime::format_duration(ttl).to_string(); res.insert(TTL_KEY.to_string(), ttl_str); } - if let Some(compaction_time_window) = opts.compaction_time_window { - res.insert( - COMPACTION_TIME_WINDOW_KEY.to_string(), - compaction_time_window.to_string(), - ); - } res.extend( opts.extra_options .iter() @@ -328,7 +303,6 @@ mod tests { write_buffer_size: None, ttl: Some(Duration::from_secs(1000)), extra_options: HashMap::new(), - compaction_time_window: Some(1677652502), }; let serialized = serde_json::to_string(&options).unwrap(); let deserialized: TableOptions = serde_json::from_str(&serialized).unwrap(); @@ -341,7 +315,6 @@ mod tests { write_buffer_size: Some(ReadableSize::mb(128)), ttl: Some(Duration::from_secs(1000)), extra_options: HashMap::new(), - compaction_time_window: Some(1677652502), }; let 
serialized_map = HashMap::from(&options); let serialized = TableOptions::try_from(&serialized_map).unwrap(); @@ -351,7 +324,6 @@ mod tests { write_buffer_size: None, ttl: None, extra_options: HashMap::new(), - compaction_time_window: None, }; let serialized_map = HashMap::from(&options); let serialized = TableOptions::try_from(&serialized_map).unwrap(); @@ -361,7 +333,6 @@ mod tests { write_buffer_size: Some(ReadableSize::mb(128)), ttl: Some(Duration::from_secs(1000)), extra_options: HashMap::from([("a".to_string(), "A".to_string())]), - compaction_time_window: Some(1677652502), }; let serialized_map = HashMap::from(&options); let serialized = TableOptions::try_from(&serialized_map).unwrap();