From 4c28afd16fb1f57bd07d680197cb0d7db8bc6e8c Mon Sep 17 00:00:00 2001 From: raphaelroshan <49832307+raphaelroshan@users.noreply.github.com> Date: Tue, 30 Jun 2026 15:40:54 +0800 Subject: [PATCH] feat: support table-level auto_flush_interval (#8357) Make auto_flush_interval configurable per table (region) in addition to the global Mito engine config, mirroring how ttl supports a global default with a per-table override. - Add auto_flush_interval to RegionOptions, parsed from the table option with the same humantime format as the global config; reject a non-positive value. - The periodic flush logic resolves the effective interval per region, falling back to the global config when unset, using a saturating conversion to avoid overflow on extreme values. - Accept the option key in is_mito_engine_option_key so it can be set at CREATE TABLE via WITH ('auto_flush_interval' = '5m'). Ref #8340 Signed-off-by: raphaelroshan --- src/mito2/src/compaction/window.rs | 1 + src/mito2/src/region/options.rs | 45 ++++++++++++++++++++++++ src/mito2/src/worker/handle_flush.rs | 23 ++++++++++-- src/store-api/src/mito_engine_options.rs | 4 +++ 4 files changed, 71 insertions(+), 2 deletions(-) diff --git a/src/mito2/src/compaction/window.rs b/src/mito2/src/compaction/window.rs index 34d7363312..49e43f41fd 100644 --- a/src/mito2/src/compaction/window.rs +++ b/src/mito2/src/compaction/window.rs @@ -243,6 +243,7 @@ mod tests { ssts: Arc::new(ssts), options: RegionOptions { ttl: ttl.map(|t| t.into()), + auto_flush_interval: None, compaction: Default::default(), compaction_override: false, storage: None, diff --git a/src/mito2/src/region/options.rs b/src/mito2/src/region/options.rs index cd37e682d3..92dfd460fe 100644 --- a/src/mito2/src/region/options.rs +++ b/src/mito2/src/region/options.rs @@ -82,6 +82,10 @@ pub enum MergeMode { pub struct RegionOptions { /// Region SST files TTL. pub ttl: Option, + /// Per-region auto flush interval override. 
Falls back to the global + /// `auto_flush_interval` engine config when unset. + #[serde(with = "humantime_serde")] + pub auto_flush_interval: Option<Duration>, /// Compaction options. pub compaction: CompactionOptions, pub compaction_override: bool, @@ -117,6 +121,14 @@ impl RegionOptions { } ); } + if let Some(auto_flush_interval) = self.auto_flush_interval { + ensure!( + auto_flush_interval > Duration::ZERO, + InvalidRegionOptionsSnafu { + reason: "auto_flush_interval must be greater than 0", + } + ); + } Ok(()) } @@ -130,6 +142,11 @@ impl RegionOptions { self.merge_mode.unwrap_or_default() } + /// Returns the `auto_flush_interval` if it is set, otherwise returns `default`. + pub fn auto_flush_interval_or(&self, default: Duration) -> Duration { + self.auto_flush_interval.unwrap_or(default) + } + /// Returns the `primary_key_encoding` if it is set, otherwise returns the default [`PrimaryKeyEncoding`]. pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding { self.primary_key_encoding.unwrap_or_default() @@ -228,6 +245,7 @@ impl RegionOptions { let opts = RegionOptions { ttl: options.ttl, + auto_flush_interval: options.auto_flush_interval, compaction, compaction_override, storage: options.storage, @@ -338,6 +356,8 @@ impl Default for TwcsOptions { struct RegionOptionsWithoutEnum { /// Region SST files TTL. 
ttl: Option, + #[serde(with = "humantime_serde")] + auto_flush_interval: Option<Duration>, storage: Option, #[serde_as(as = "DisplayFromStr")] append_mode: bool, @@ -352,6 +372,7 @@ impl Default for RegionOptionsWithoutEnum { let options = RegionOptions::default(); RegionOptionsWithoutEnum { ttl: options.ttl, + auto_flush_interval: options.auto_flush_interval, storage: options.storage, append_mode: options.append_mode, merge_mode: options.merge_mode, @@ -527,6 +548,27 @@ mod tests { assert_eq!(expect, options); } + #[test] + fn test_with_auto_flush_interval() { + let map = make_map(&[("auto_flush_interval", "5m")]); + let options = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap(); + let expect = RegionOptions { + auto_flush_interval: Some(Duration::from_secs(5 * 60)), + ..Default::default() + }; + assert_eq!(expect, options); + } + + #[test] + fn test_with_zero_auto_flush_interval() { + let map = make_map(&[("auto_flush_interval", "0s")]); + let err = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap_err(); + assert!( + err.to_string().contains("auto_flush_interval"), + "unexpected error: {err}" + ); + } + #[test] fn test_with_storage() { let map = make_map(&[("storage", "S3")]); @@ -829,6 +871,7 @@ mod tests { let options = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap(); let expect = RegionOptions { ttl: Some(Duration::from_secs(3600 * 24 * 7).into()), + auto_flush_interval: None, compaction: CompactionOptions::Twcs(TwcsOptions { trigger_file_num: 8, time_window: Some(Duration::from_secs(3600 * 2)), @@ -863,6 +906,7 @@ mod tests { fn test_region_options_serde() { let options = RegionOptions { ttl: Some(Duration::from_secs(3600 * 24 * 7).into()), + auto_flush_interval: None, compaction: CompactionOptions::Twcs(TwcsOptions { trigger_file_num: 8, time_window: Some(Duration::from_secs(3600 * 2)), @@ -919,6 +963,7 @@ mod tests { let got: RegionOptions = serde_json::from_str(region_options_json_str).unwrap(); let options = 
RegionOptions { ttl: Some(Duration::from_secs(3600 * 24 * 7).into()), + auto_flush_interval: None, compaction: CompactionOptions::Twcs(TwcsOptions { trigger_file_num: 8, time_window: Some(Duration::from_secs(3600 * 2)), diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs index 00aa53b367..ef7d9da9e2 100644 --- a/src/mito2/src/worker/handle_flush.rs +++ b/src/mito2/src/worker/handle_flush.rs @@ -76,7 +76,6 @@ impl RegionWorkerLoop { fn flush_regions_on_engine_full(&mut self) -> Result<()> { let regions = self.regions.list_regions(); let now = self.time_provider.current_time_millis(); - let min_last_flush_time = now - self.config.auto_flush_interval.as_millis() as i64; let mut pending_regions = vec![]; for region in &regions { @@ -89,6 +88,16 @@ impl RegionWorkerLoop { let region_memtable_size = version.memtables.mutable_usage() + version.memtables.immutables_usage(); + let auto_flush_interval = version + .options + .auto_flush_interval_or(self.config.auto_flush_interval); + let min_last_flush_time = now.saturating_sub( + auto_flush_interval + .as_millis() + .try_into() + .unwrap_or(i64::MAX), + ); + if region.last_flush_millis() < min_last_flush_time { // If flush time of this region is earlier than `min_last_flush_time`, we can flush this region. 
let task = @@ -207,7 +216,6 @@ impl RegionWorkerLoop { pub(crate) fn flush_periodically(&mut self) -> Result<()> { let regions = self.regions.list_regions(); let now = self.time_provider.current_time_millis(); - let min_last_flush_time = now - self.config.auto_flush_interval.as_millis() as i64; for region in &regions { if self.flush_scheduler.is_flush_requested(region.region_id) || !region.is_writable() { @@ -216,6 +224,17 @@ impl RegionWorkerLoop { } self.update_topic_latest_entry_id(region); + let auto_flush_interval = region + .version() + .options + .auto_flush_interval_or(self.config.auto_flush_interval); + let min_last_flush_time = now.saturating_sub( + auto_flush_interval + .as_millis() + .try_into() + .unwrap_or(i64::MAX), + ); + if region.last_flush_millis() < min_last_flush_time { // If flush time of this region is earlier than `min_last_flush_time`, we can flush this region. let task = self.new_flush_task( diff --git a/src/store-api/src/mito_engine_options.rs b/src/store-api/src/mito_engine_options.rs index 8d51a6a4a6..8762ed6977 100644 --- a/src/store-api/src/mito_engine_options.rs +++ b/src/store-api/src/mito_engine_options.rs @@ -23,6 +23,8 @@ pub const APPEND_MODE_KEY: &str = "append_mode"; pub const MERGE_MODE_KEY: &str = "merge_mode"; /// Option key for TTL(time-to-live) pub const TTL_KEY: &str = "ttl"; +/// Option key for the per-table auto flush interval. +pub const AUTO_FLUSH_INTERVAL_KEY: &str = "auto_flush_interval"; /// Option key for snapshot read. pub const SNAPSHOT_READ: &str = "snapshot_read"; /// Option key for compaction type. 
@@ -70,6 +72,7 @@ pub const SST_FORMAT_KEY: &str = "sst_format"; pub fn is_mito_engine_option_key(key: &str) -> bool { [ "ttl", + AUTO_FLUSH_INTERVAL_KEY, COMPACTION_TYPE, COMPACTION_OVERRIDE, TWCS_TRIGGER_FILE_NUM, @@ -104,6 +107,7 @@ mod tests { #[test] fn test_is_mito_engine_option_key() { assert!(is_mito_engine_option_key("ttl")); + assert!(is_mito_engine_option_key("auto_flush_interval")); assert!(is_mito_engine_option_key("compaction.type")); assert!(is_mito_engine_option_key("compaction.override")); assert!(is_mito_engine_option_key(