feat: support table-level auto_flush_interval (#8357)

Make auto_flush_interval configurable per table (region) in addition to
the global Mito engine config, mirroring how ttl supports a global default
with a per-table override.

- Add auto_flush_interval to RegionOptions, parsed from the table option
  with the same humantime format as the global config; reject a
  non-positive value.
- The periodic flush logic resolves the effective interval per region,
  falling back to the global config when unset, using a saturating
  conversion to avoid overflow on extreme values.
- Accept the option key in is_mito_engine_option_key so it can be set at
  CREATE TABLE via WITH ('auto_flush_interval' = '5m').

Ref #8340

Signed-off-by: raphaelroshan <raphaelroshan@gmail.com>
This commit is contained in:
raphaelroshan
2026-06-30 15:40:54 +08:00
committed by GitHub
parent aa563628fb
commit 4c28afd16f
4 changed files with 71 additions and 2 deletions

View File

@@ -243,6 +243,7 @@ mod tests {
ssts: Arc::new(ssts),
options: RegionOptions {
ttl: ttl.map(|t| t.into()),
auto_flush_interval: None,
compaction: Default::default(),
compaction_override: false,
storage: None,

View File

@@ -82,6 +82,10 @@ pub enum MergeMode {
pub struct RegionOptions {
/// Region SST files TTL.
pub ttl: Option<TimeToLive>,
/// Per-region auto flush interval override. Falls back to the global
/// `auto_flush_interval` engine config when unset.
#[serde(with = "humantime_serde")]
pub auto_flush_interval: Option<Duration>,
/// Compaction options.
pub compaction: CompactionOptions,
pub compaction_override: bool,
@@ -117,6 +121,14 @@ impl RegionOptions {
}
);
}
if let Some(auto_flush_interval) = self.auto_flush_interval {
ensure!(
auto_flush_interval > Duration::ZERO,
InvalidRegionOptionsSnafu {
reason: "auto_flush_interval must be greater than 0",
}
);
}
Ok(())
}
@@ -130,6 +142,11 @@ impl RegionOptions {
self.merge_mode.unwrap_or_default()
}
/// Returns the `auto_flush_interval` if it is set, otherwise returns `default`.
pub fn auto_flush_interval_or(&self, default: Duration) -> Duration {
self.auto_flush_interval.unwrap_or(default)
}
/// Returns the `primary_key_encoding` if it is set, otherwise returns the default [`PrimaryKeyEncoding`].
pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
self.primary_key_encoding.unwrap_or_default()
@@ -228,6 +245,7 @@ impl RegionOptions {
let opts = RegionOptions {
ttl: options.ttl,
auto_flush_interval: options.auto_flush_interval,
compaction,
compaction_override,
storage: options.storage,
@@ -338,6 +356,8 @@ impl Default for TwcsOptions {
struct RegionOptionsWithoutEnum {
/// Region SST files TTL.
ttl: Option<TimeToLive>,
#[serde(with = "humantime_serde")]
auto_flush_interval: Option<Duration>,
storage: Option<String>,
#[serde_as(as = "DisplayFromStr")]
append_mode: bool,
@@ -352,6 +372,7 @@ impl Default for RegionOptionsWithoutEnum {
let options = RegionOptions::default();
RegionOptionsWithoutEnum {
ttl: options.ttl,
auto_flush_interval: options.auto_flush_interval,
storage: options.storage,
append_mode: options.append_mode,
merge_mode: options.merge_mode,
@@ -527,6 +548,27 @@ mod tests {
assert_eq!(expect, options);
}
#[test]
fn test_with_auto_flush_interval() {
let map = make_map(&[("auto_flush_interval", "5m")]);
let options = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap();
let expect = RegionOptions {
auto_flush_interval: Some(Duration::from_secs(5 * 60)),
..Default::default()
};
assert_eq!(expect, options);
}
#[test]
fn test_with_zero_auto_flush_interval() {
let map = make_map(&[("auto_flush_interval", "0s")]);
let err = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap_err();
assert!(
err.to_string().contains("auto_flush_interval"),
"unexpected error: {err}"
);
}
#[test]
fn test_with_storage() {
let map = make_map(&[("storage", "S3")]);
@@ -829,6 +871,7 @@ mod tests {
let options = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap();
let expect = RegionOptions {
ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
auto_flush_interval: None,
compaction: CompactionOptions::Twcs(TwcsOptions {
trigger_file_num: 8,
time_window: Some(Duration::from_secs(3600 * 2)),
@@ -863,6 +906,7 @@ mod tests {
fn test_region_options_serde() {
let options = RegionOptions {
ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
auto_flush_interval: None,
compaction: CompactionOptions::Twcs(TwcsOptions {
trigger_file_num: 8,
time_window: Some(Duration::from_secs(3600 * 2)),
@@ -919,6 +963,7 @@ mod tests {
let got: RegionOptions = serde_json::from_str(region_options_json_str).unwrap();
let options = RegionOptions {
ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
auto_flush_interval: None,
compaction: CompactionOptions::Twcs(TwcsOptions {
trigger_file_num: 8,
time_window: Some(Duration::from_secs(3600 * 2)),

View File

@@ -76,7 +76,6 @@ impl<S: LogStore> RegionWorkerLoop<S> {
fn flush_regions_on_engine_full(&mut self) -> Result<()> {
let regions = self.regions.list_regions();
let now = self.time_provider.current_time_millis();
let min_last_flush_time = now - self.config.auto_flush_interval.as_millis() as i64;
let mut pending_regions = vec![];
for region in &regions {
@@ -89,6 +88,16 @@ impl<S: LogStore> RegionWorkerLoop<S> {
let region_memtable_size =
version.memtables.mutable_usage() + version.memtables.immutables_usage();
let auto_flush_interval = version
.options
.auto_flush_interval_or(self.config.auto_flush_interval);
let min_last_flush_time = now.saturating_sub(
auto_flush_interval
.as_millis()
.try_into()
.unwrap_or(i64::MAX),
);
if region.last_flush_millis() < min_last_flush_time {
// If flush time of this region is earlier than `min_last_flush_time`, we can flush this region.
let task =
@@ -207,7 +216,6 @@ impl<S: LogStore> RegionWorkerLoop<S> {
pub(crate) fn flush_periodically(&mut self) -> Result<()> {
let regions = self.regions.list_regions();
let now = self.time_provider.current_time_millis();
let min_last_flush_time = now - self.config.auto_flush_interval.as_millis() as i64;
for region in &regions {
if self.flush_scheduler.is_flush_requested(region.region_id) || !region.is_writable() {
@@ -216,6 +224,17 @@ impl<S: LogStore> RegionWorkerLoop<S> {
}
self.update_topic_latest_entry_id(region);
let auto_flush_interval = region
.version()
.options
.auto_flush_interval_or(self.config.auto_flush_interval);
let min_last_flush_time = now.saturating_sub(
auto_flush_interval
.as_millis()
.try_into()
.unwrap_or(i64::MAX),
);
if region.last_flush_millis() < min_last_flush_time {
// If flush time of this region is earlier than `min_last_flush_time`, we can flush this region.
let task = self.new_flush_task(

View File

@@ -23,6 +23,8 @@ pub const APPEND_MODE_KEY: &str = "append_mode";
pub const MERGE_MODE_KEY: &str = "merge_mode";
/// Option key for TTL(time-to-live)
pub const TTL_KEY: &str = "ttl";
/// Option key for the per-table auto flush interval.
pub const AUTO_FLUSH_INTERVAL_KEY: &str = "auto_flush_interval";
/// Option key for snapshot read.
pub const SNAPSHOT_READ: &str = "snapshot_read";
/// Option key for compaction type.
@@ -70,6 +72,7 @@ pub const SST_FORMAT_KEY: &str = "sst_format";
pub fn is_mito_engine_option_key(key: &str) -> bool {
[
"ttl",
AUTO_FLUSH_INTERVAL_KEY,
COMPACTION_TYPE,
COMPACTION_OVERRIDE,
TWCS_TRIGGER_FILE_NUM,
@@ -104,6 +107,7 @@ mod tests {
#[test]
fn test_is_mito_engine_option_key() {
assert!(is_mito_engine_option_key("ttl"));
assert!(is_mito_engine_option_key("auto_flush_interval"));
assert!(is_mito_engine_option_key("compaction.type"));
assert!(is_mito_engine_option_key("compaction.override"));
assert!(is_mito_engine_option_key(