From 42ad842434c847a12e0222b7a241dc33a2d70203 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Wed, 25 Feb 2026 12:11:23 +0800 Subject: [PATCH] feat: support changing table's append_mode to true (#7669) * feat: support alter append_mode to true Signed-off-by: evenyag * test: add sqlness test Signed-off-by: evenyag * chore: remove comment Signed-off-by: evenyag * chore: fix compiler errors Signed-off-by: evenyag * fix: clear merge mode in mito when setting append mode Signed-off-by: evenyag * fix: sanitize open request and options with both append/merge mode Signed-off-by: evenyag * feat: clear merge mode when append mode is true Signed-off-by: evenyag --------- Signed-off-by: evenyag --- src/mito2/src/engine/alter_test.rs | 281 ++++++++++++++++++ src/mito2/src/engine/append_mode_test.rs | 147 ++++++++- .../src/engine/apply_staging_manifest_test.rs | 1 + src/mito2/src/engine/staging_test.rs | 1 + src/mito2/src/manifest/action.rs | 15 + src/mito2/src/manifest/manager.rs | 6 +- src/mito2/src/region.rs | 4 + src/mito2/src/region/opener.rs | 30 ++ src/mito2/src/remap_manifest.rs | 2 + src/mito2/src/worker/handle_alter.rs | 31 +- src/mito2/src/worker/handle_open.rs | 5 +- src/store-api/src/region_request.rs | 11 +- src/table/src/metadata.rs | 91 +++++- .../common/alter/alter_append_mode.result | 192 ++++++++++++ .../common/alter/alter_append_mode.sql | 84 ++++++ 15 files changed, 890 insertions(+), 11 deletions(-) create mode 100644 tests/cases/standalone/common/alter/alter_append_mode.result create mode 100644 tests/cases/standalone/common/alter/alter_append_mode.sql diff --git a/src/mito2/src/engine/alter_test.rs b/src/mito2/src/engine/alter_test.rs index 2aa26ba204..e710e08688 100644 --- a/src/mito2/src/engine/alter_test.rs +++ b/src/mito2/src/engine/alter_test.rs @@ -39,6 +39,7 @@ use crate::engine::MitoEngine; use crate::engine::listener::{AlterFlushListener, NotifyRegionChangeResultListener}; use crate::error; use crate::sst::FormatType; +use 
crate::test_util::batch_util::sort_batches_and_print; use crate::test_util::{ CreateRequestBuilder, TestEnv, build_rows, build_rows_for_key, flush_region, put_rows, rows_schema, @@ -1229,3 +1230,283 @@ async fn test_alter_region_sst_format_without_flush() { let batches = RecordBatches::try_collect(stream).await.unwrap(); assert_eq!(expected_all_data, batches.pretty_print().unwrap()); } + +#[tokio::test] +async fn test_alter_region_append_mode_with_flush() { + common_telemetry::init_default_ut_logging(); + + let mut env = TestEnv::new().await; + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + // Create a region with append_mode=false (default) + let request = CreateRequestBuilder::new().build(); + + env.get_schema_metadata_manager() + .register_region_table_info( + region_id.table_id(), + "test_table", + "test_catalog", + "test_schema", + None, + env.get_kv_backend(), + ) + .await; + + let column_schemas = rows_schema(&request); + let table_dir = request.table_dir.clone(); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + let check_append_mode = |engine: &MitoEngine, expected: bool| { + let append_mode = engine + .get_region(region_id) + .unwrap() + .version() + .options + .append_mode; + assert_eq!(append_mode, expected); + }; + check_append_mode(&engine, false); + + // Inserts some data before alter (memtable not empty, alter will trigger flush) + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows(0, 3), + }; + put_rows(&engine, region_id, rows).await; + + // Alters append_mode from false to true (this triggers internal flush) + let alter_request = RegionAlterRequest { + kind: AlterKind::SetRegionOptions { + options: vec![SetRegionOption::AppendMode(true)], + }, + }; + engine + .handle_request(region_id, RegionRequest::Alter(alter_request)) + .await + .unwrap(); + + check_append_mode(&engine, true); + + // Inserts duplicate data after alter 
(same as rows 0, 1, 2) + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows(0, 3), + }; + put_rows(&engine, region_id, rows).await; + + // Flushes again + flush_region(&engine, region_id, None).await; + + // After append_mode=true, duplicates should be preserved + let expected_all_data = "\ ++-------+---------+---------------------+ +| tag_0 | field_0 | ts | ++-------+---------+---------------------+ +| 0 | 0.0 | 1970-01-01T00:00:00 | +| 0 | 0.0 | 1970-01-01T00:00:00 | +| 1 | 1.0 | 1970-01-01T00:00:01 | +| 1 | 1.0 | 1970-01-01T00:00:01 | +| 2 | 2.0 | 1970-01-01T00:00:02 | +| 2 | 2.0 | 1970-01-01T00:00:02 | ++-------+---------+---------------------+"; + let request = ScanRequest::default(); + let stream = engine.scan_to_stream(region_id, request).await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + assert_eq!( + expected_all_data, + sort_batches_and_print(&batches, &["tag_0", "ts"]) + ); + + // Reopens region to verify append_mode persists + let engine = env.reopen_engine(engine, MitoConfig::default()).await; + engine + .handle_request( + region_id, + RegionRequest::Open(RegionOpenRequest { + engine: String::new(), + table_dir, + path_type: PathType::Bare, + options: HashMap::default(), + skip_wal_replay: false, + checkpoint: None, + }), + ) + .await + .unwrap(); + + check_append_mode(&engine, true); + + let request = ScanRequest::default(); + let stream = engine.scan_to_stream(region_id, request).await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + assert_eq!( + expected_all_data, + sort_batches_and_print(&batches, &["tag_0", "ts"]) + ); +} + +#[tokio::test] +async fn test_alter_region_append_mode_without_flush() { + common_telemetry::init_default_ut_logging(); + + let mut env = TestEnv::new().await; + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + // Create a region with append_mode=false (default) + let request = 
CreateRequestBuilder::new().build(); + + env.get_schema_metadata_manager() + .register_region_table_info( + region_id.table_id(), + "test_table", + "test_catalog", + "test_schema", + None, + env.get_kv_backend(), + ) + .await; + + let column_schemas = rows_schema(&request); + let table_dir = request.table_dir.clone(); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + let check_append_mode = |engine: &MitoEngine, expected: bool| { + let append_mode = engine + .get_region(region_id) + .unwrap() + .version() + .options + .append_mode; + assert_eq!(append_mode, expected); + }; + check_append_mode(&engine, false); + + // Alters append_mode from false to true immediately (no data, no flush needed) + let alter_request = RegionAlterRequest { + kind: AlterKind::SetRegionOptions { + options: vec![SetRegionOption::AppendMode(true)], + }, + }; + engine + .handle_request(region_id, RegionRequest::Alter(alter_request)) + .await + .unwrap(); + + check_append_mode(&engine, true); + + // Inserts duplicate data + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows(0, 3), + }; + put_rows(&engine, region_id, rows).await; + + // Insert same data again + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows(0, 3), + }; + put_rows(&engine, region_id, rows).await; + + // Flushes + flush_region(&engine, region_id, None).await; + + // Duplicates should be preserved + let expected_all_data = "\ ++-------+---------+---------------------+ +| tag_0 | field_0 | ts | ++-------+---------+---------------------+ +| 0 | 0.0 | 1970-01-01T00:00:00 | +| 0 | 0.0 | 1970-01-01T00:00:00 | +| 1 | 1.0 | 1970-01-01T00:00:01 | +| 1 | 1.0 | 1970-01-01T00:00:01 | +| 2 | 2.0 | 1970-01-01T00:00:02 | +| 2 | 2.0 | 1970-01-01T00:00:02 | ++-------+---------+---------------------+"; + let request = ScanRequest::default(); + let stream = engine.scan_to_stream(region_id, request).await.unwrap(); + let batches = 
RecordBatches::try_collect(stream).await.unwrap(); + assert_eq!( + expected_all_data, + sort_batches_and_print(&batches, &["tag_0", "ts"]) + ); + + // Reopens region to verify append_mode persists + let engine = env.reopen_engine(engine, MitoConfig::default()).await; + engine + .handle_request( + region_id, + RegionRequest::Open(RegionOpenRequest { + engine: String::new(), + table_dir, + path_type: PathType::Bare, + options: HashMap::default(), + skip_wal_replay: false, + checkpoint: None, + }), + ) + .await + .unwrap(); + + check_append_mode(&engine, true); + + let request = ScanRequest::default(); + let stream = engine.scan_to_stream(region_id, request).await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + assert_eq!( + expected_all_data, + sort_batches_and_print(&batches, &["tag_0", "ts"]) + ); +} + +#[tokio::test] +async fn test_alter_region_append_mode_invalid() { + common_telemetry::init_default_ut_logging(); + + let mut env = TestEnv::new().await; + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + // Create a region with append_mode=true + let request = CreateRequestBuilder::new() + .insert_option("append_mode", "true") + .build(); + + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + let check_append_mode = |engine: &MitoEngine, expected: bool| { + let append_mode = engine + .get_region(region_id) + .unwrap() + .version() + .options + .append_mode; + assert_eq!(append_mode, expected); + }; + check_append_mode(&engine, true); + + // Try to alter append_mode from true to false (should fail) + let alter_request = RegionAlterRequest { + kind: AlterKind::SetRegionOptions { + options: vec![SetRegionOption::AppendMode(false)], + }, + }; + engine + .handle_request(region_id, RegionRequest::Alter(alter_request)) + .await + .unwrap_err(); + + // append_mode should still be true + check_append_mode(&engine, true); +} diff --git 
a/src/mito2/src/engine/append_mode_test.rs b/src/mito2/src/engine/append_mode_test.rs index 85d4f24fe3..61488b6592 100644 --- a/src/mito2/src/engine/append_mode_test.rs +++ b/src/mito2/src/engine/append_mode_test.rs @@ -14,10 +14,15 @@ //! Tests for append mode. +use std::collections::HashMap; + use api::v1::Rows; use common_recordbatch::RecordBatches; use store_api::region_engine::RegionEngine; -use store_api::region_request::{RegionCompactRequest, RegionRequest}; +use store_api::region_request::{ + AlterKind, PathType, RegionAlterRequest, RegionCompactRequest, RegionOpenRequest, + RegionRequest, SetRegionOption, +}; use store_api::storage::{RegionId, ScanRequest}; use crate::config::MitoConfig; @@ -221,6 +226,146 @@ async fn test_append_mode_compaction_with_format(flat_format: bool) { assert_eq!(expected, sort_batches_and_print(&batches, &["tag_0", "ts"])); } +#[tokio::test] +async fn test_alter_append_mode_clears_merge_mode() { + test_alter_append_mode_clears_merge_mode_with_format(false).await; + test_alter_append_mode_clears_merge_mode_with_format(true).await; +} + +async fn test_alter_append_mode_clears_merge_mode_with_format(flat_format: bool) { + common_telemetry::init_default_ut_logging(); + + let mut env = TestEnv::new().await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; + + let region_id = RegionId::new(1, 1); + + env.get_schema_metadata_manager() + .register_region_table_info( + region_id.table_id(), + "test_table", + "test_catalog", + "test_schema", + None, + env.get_kv_backend(), + ) + .await; + + // Create a region with merge_mode=last_non_null. + let request = CreateRequestBuilder::new() + .insert_option("merge_mode", "last_non_null") + .build(); + let column_schemas = rows_schema(&request); + let table_dir = request.table_dir.clone(); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + // Verify initial options. 
+ let options = &engine.get_region(region_id).unwrap().version().options; + assert!(!options.append_mode); + assert!(options.merge_mode.is_some()); + + // Insert some data. + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows(0, 3), + }; + put_rows(&engine, region_id, rows).await; + + // Alter append_mode to true (triggers flush since memtable is not empty). + let alter_request = RegionAlterRequest { + kind: AlterKind::SetRegionOptions { + options: vec![SetRegionOption::AppendMode(true)], + }, + }; + engine + .handle_request(region_id, RegionRequest::Alter(alter_request)) + .await + .unwrap(); + + // Verify append_mode is true and merge_mode is cleared. + let options = &engine.get_region(region_id).unwrap().version().options; + assert!(options.append_mode); + assert!(options.merge_mode.is_none()); + + // Insert duplicate data (should be preserved in append mode). + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows(0, 3), + }; + put_rows(&engine, region_id, rows).await; + + // Flush to persist. + flush_region(&engine, region_id, None).await; + + let expected = "\ ++-------+---------+---------------------+ +| tag_0 | field_0 | ts | ++-------+---------+---------------------+ +| 0 | 0.0 | 1970-01-01T00:00:00 | +| 0 | 0.0 | 1970-01-01T00:00:00 | +| 1 | 1.0 | 1970-01-01T00:00:01 | +| 1 | 1.0 | 1970-01-01T00:00:01 | +| 2 | 2.0 | 1970-01-01T00:00:02 | +| 2 | 2.0 | 1970-01-01T00:00:02 | ++-------+---------+---------------------+"; + + let stream = engine + .scan_to_stream(region_id, ScanRequest::default()) + .await + .unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + assert_eq!(expected, sort_batches_and_print(&batches, &["tag_0", "ts"])); + + // Reopen engine and region to verify persistence. 
+ let engine = env + .reopen_engine( + engine, + MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }, + ) + .await; + let mut options = HashMap::default(); + options.insert("append_mode".to_string(), "true".to_string()); + options.insert("merge_mode".to_string(), "last_non_null".to_string()); + engine + .handle_request( + region_id, + RegionRequest::Open(RegionOpenRequest { + engine: String::new(), + table_dir, + path_type: PathType::Bare, + options, + skip_wal_replay: false, + checkpoint: None, + }), + ) + .await + .unwrap(); + + // Verify options persist after reopen. + let options = &engine.get_region(region_id).unwrap().version().options; + assert!(options.append_mode); + assert!(options.merge_mode.is_none()); + + // Verify data persists (duplicates preserved). + let stream = engine + .scan_to_stream(region_id, ScanRequest::default()) + .await + .unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + assert_eq!(expected, sort_batches_and_print(&batches, &["tag_0", "ts"])); +} + #[tokio::test] async fn test_put_single_range() { test_put_single_range_with_format(false).await; diff --git a/src/mito2/src/engine/apply_staging_manifest_test.rs b/src/mito2/src/engine/apply_staging_manifest_test.rs index bb2af0bec4..cf569f6fb4 100644 --- a/src/mito2/src/engine/apply_staging_manifest_test.rs +++ b/src/mito2/src/engine/apply_staging_manifest_test.rs @@ -543,6 +543,7 @@ async fn test_apply_staging_manifest_change_edit_different_columns_fails_with_fo RegionMetaAction::Change(RegionChange { metadata: Arc::new(changed_metadata), sst_format: FormatType::PrimaryKey, + append_mode: None, }), RegionMetaAction::Edit(RegionEdit { files_to_add: Vec::new(), diff --git a/src/mito2/src/engine/staging_test.rs b/src/mito2/src/engine/staging_test.rs index 424e3306cc..a390eea681 100644 --- a/src/mito2/src/engine/staging_test.rs +++ b/src/mito2/src/engine/staging_test.rs @@ -925,6 +925,7 @@ async fn 
test_staging_exit_conflict_partition_expr_change_and_change_with_format RegionMetaAction::Change(RegionChange { metadata: Arc::new(changed_metadata), sst_format: FormatType::PrimaryKey, + append_mode: None, }), RegionMetaAction::Edit(RegionEdit { files_to_add: Vec::new(), diff --git a/src/mito2/src/manifest/action.rs b/src/mito2/src/manifest/action.rs index 177bb86873..12781bd1a6 100644 --- a/src/mito2/src/manifest/action.rs +++ b/src/mito2/src/manifest/action.rs @@ -78,6 +78,9 @@ pub struct RegionChange { /// Format of the SST. #[serde(default)] pub sst_format: FormatType, + /// Whether the region is in append mode. + #[serde(default)] + pub append_mode: Option<bool>, } #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] @@ -160,6 +163,9 @@ pub struct RegionManifest { /// Format of the SST file. #[serde(default)] pub sst_format: FormatType, + /// Whether the region is in append mode. + #[serde(default)] + pub append_mode: Option<bool>, } #[cfg(test)] @@ -188,6 +194,7 @@ pub struct RegionManifestBuilder { compaction_time_window: Option, committed_sequence: Option, sst_format: FormatType, + append_mode: Option<bool>, } impl RegionManifestBuilder { @@ -205,6 +212,7 @@ impl RegionManifestBuilder { compaction_time_window: s.compaction_time_window, committed_sequence: s.committed_sequence, sst_format: s.sst_format, + append_mode: s.append_mode, } } else { Default::default() @@ -215,6 +223,8 @@ impl RegionManifestBuilder { self.metadata = Some(change.metadata); self.manifest_version = manifest_version; self.sst_format = change.sst_format; + // Only update append_mode if the change specifies a value. + self.append_mode = change.append_mode.or(self.append_mode); } /// Applies a partition-expression-only metadata change.
@@ -344,6 +354,7 @@ impl RegionManifestBuilder { truncated_entry_id: self.truncated_entry_id, compaction_time_window: self.compaction_time_window, sst_format: self.sst_format, + append_mode: self.append_mode, }) } } @@ -896,6 +907,7 @@ mod tests { }], }, sst_format: FormatType::PrimaryKey, + append_mode: None, }; let json = serde_json::to_string(&manifest).unwrap(); @@ -998,6 +1010,7 @@ mod tests { truncated_entry_id: None, compaction_time_window: None, sst_format: FormatType::PrimaryKey, + append_mode: None, } ); @@ -1012,6 +1025,7 @@ mod tests { truncated_entry_id: None, compaction_time_window: None, sst_format: FormatType::PrimaryKey, + append_mode: None, }; let json = serde_json::to_string(&new_manifest).unwrap(); let old_from_new: RegionManifestV1 = serde_json::from_str(&json).unwrap(); @@ -1111,6 +1125,7 @@ mod tests { let region_change = RegionChange { metadata: region_change.metadata.clone(), sst_format: FormatType::Flat, + append_mode: None, }; let serialized = serde_json::to_string(&region_change).unwrap(); diff --git a/src/mito2/src/manifest/manager.rs b/src/mito2/src/manifest/manager.rs index f16fd85e0b..e489c6756d 100644 --- a/src/mito2/src/manifest/manager.rs +++ b/src/mito2/src/manifest/manager.rs @@ -194,6 +194,7 @@ impl RegionManifestManager { RegionChange { metadata: metadata.clone(), sst_format, + append_mode: None, }, ); let manifest = manifest_builder.try_build()?; @@ -207,6 +208,7 @@ impl RegionManifestManager { let mut actions = vec![RegionMetaAction::Change(RegionChange { metadata, sst_format, + append_mode: None, })]; if flushed_entry_id > 0 { actions.push(RegionMetaAction::Edit(RegionEdit { @@ -893,6 +895,7 @@ mod test { RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange { metadata: new_metadata.clone(), sst_format: FormatType::PrimaryKey, + append_mode: None, })); let current_version = manager.update(action_list, false).await.unwrap(); @@ -956,6 +959,7 @@ mod test {
RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange { metadata: new_metadata.clone(), sst_format: FormatType::PrimaryKey, + append_mode: None, })); let current_version = manager.update(action_list, false).await.unwrap(); @@ -1005,6 +1009,6 @@ mod test { // get manifest size again let manifest_size = manager.manifest_usage(); - assert_eq!(manifest_size, 1378); + assert_eq!(manifest_size, 1397); } } diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index a75484256b..156d972497 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -492,6 +492,7 @@ impl MitoRegion { let action = RegionMetaAction::Change(RegionChange { metadata: current_meta.clone(), sst_format: current_version.options.sst_format.unwrap_or_default(), + append_mode: None, }); let result = manager .update(RegionMetaActionList::with_action(action), false) @@ -1518,6 +1519,7 @@ mod tests { RegionMetaAction::Change(RegionChange { metadata: Arc::new(changed_metadata), sst_format: FormatType::PrimaryKey, + append_mode: None, }), RegionMetaAction::Edit(empty_edit()), ]), @@ -1556,6 +1558,7 @@ mod tests { RegionMetaAction::Change(RegionChange { metadata: Arc::new(changed_metadata), sst_format: FormatType::PrimaryKey, + append_mode: None, }), RegionMetaAction::Edit(empty_edit()), ]), @@ -1588,6 +1591,7 @@ mod tests { RegionMetaAction::Change(RegionChange { metadata: Arc::new(changed_metadata), sst_format: FormatType::PrimaryKey, + append_mode: None, }), RegionMetaAction::Edit(empty_edit()), ]), diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index ce2d94e829..d30c1f5e10 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -619,6 +619,36 @@ pub(crate) fn sanitize_region_options(manifest: &RegionManifest, options: &mut R ); options.sst_format = Some(manifest.sst_format); } + if let Some(manifest_append_mode) = manifest.append_mode + && options.append_mode != manifest_append_mode + { + 
common_telemetry::warn!( + "Overriding append_mode from {} to {} for region {}", + options.append_mode, + manifest_append_mode, + manifest.metadata.region_id, + ); + options.append_mode = manifest_append_mode; + } + if options.append_mode && options.merge_mode.take().is_some() { + common_telemetry::warn!( + "Ignoring merge_mode because append_mode is enabled for region {}", + manifest.metadata.region_id, + ); + } +} + +/// Sanitizes open request options before parsing. +pub(crate) fn sanitize_open_request_options(options: &mut HashMap<String, String>) { + let append_mode_enabled = options + .get("append_mode") + .is_some_and(|v| matches!(v.trim().to_ascii_lowercase().as_str(), "true" | "1")); + + if append_mode_enabled && options.remove("merge_mode").is_some() { + common_telemetry::warn!( + "Ignoring merge_mode in open request options because append_mode is enabled" + ); + } } /// Returns an object store corresponding to `name`. If `name` is `None`, this method returns the default object store. diff --git a/src/mito2/src/remap_manifest.rs b/src/mito2/src/remap_manifest.rs index e6a524e8d1..49676be8be 100644 --- a/src/mito2/src/remap_manifest.rs +++ b/src/mito2/src/remap_manifest.rs @@ -115,6 +115,7 @@ impl RemapManifest { truncated_entry_id: None, compaction_time_window: None, sst_format, + append_mode: None, }; new_manifests.insert(*region_id, manifest); @@ -464,6 +465,7 @@ mod tests { compaction_time_window: None, committed_sequence: None, sst_format: FormatType::PrimaryKey, + append_mode: None, } } diff --git a/src/mito2/src/worker/handle_alter.rs b/src/mito2/src/worker/handle_alter.rs index 40354ff9d1..459aa8dd32 100644 --- a/src/mito2/src/worker/handle_alter.rs +++ b/src/mito2/src/worker/handle_alter.rs @@ -161,13 +161,11 @@ impl RegionWorkerLoop { } }; // Persist the metadata to region's manifest.
+ let options = new_options.as_ref().unwrap_or(&version.options); let change = RegionChange { metadata: new_meta, - sst_format: new_options - .as_ref() - .unwrap_or(&version.options) - .sst_format - .unwrap_or_default(), + sst_format: options.sst_format.unwrap_or_default(), + append_mode: Some(options.append_mode), }; self.handle_manifest_region_change(region, change, need_index, new_options, sender); } @@ -229,6 +227,22 @@ impl RegionWorkerLoop { ); } } + SetRegionOption::AppendMode(new_append_mode) => { + // If the append mode is unchanged, we consider the option is altered. + if new_append_mode != current_options.append_mode { + // Validates: only allow changing from false to true. + ensure!( + !current_options.append_mode && new_append_mode, + store_api::metadata::InvalidRegionRequestSnafu { + region_id: region.region_id, + err: "Only allow changing append_mode from false to true", + } + ); + // Clear merge_mode since it's incompatible with append_mode. + current_options.merge_mode = None; + all_options_altered = false; + } + } } } region.version_control.alter_options(current_options); @@ -264,6 +278,13 @@ fn new_region_options_on_empty_memtable( current_options.sst_format = Some(new_format); } + SetRegionOption::AppendMode(new_append_mode) => { + // Safety: handle_alter_region_options_fast() has validated this. 
+ assert!(*new_append_mode && !current_options.append_mode); + + current_options.append_mode = true; + current_options.merge_mode = None; + } } } Some(current_options) diff --git a/src/mito2/src/worker/handle_open.rs b/src/mito2/src/worker/handle_open.rs index e50e166d47..73bdca775c 100644 --- a/src/mito2/src/worker/handle_open.rs +++ b/src/mito2/src/worker/handle_open.rs @@ -28,7 +28,7 @@ use table::requests::STORAGE_KEY; use crate::error::{ ObjectStoreNotFoundSnafu, OpenDalSnafu, OpenRegionSnafu, RegionNotFoundSnafu, Result, }; -use crate::region::opener::RegionOpener; +use crate::region::opener::{RegionOpener, sanitize_open_request_options}; use crate::request::OptionOutputTx; use crate::sst::location::region_dir_from_table_dir; use crate::wal::entry_distributor::WalEntryReceiver; @@ -73,7 +73,7 @@ impl RegionWorkerLoop { pub(crate) async fn handle_open_request( &mut self, region_id: RegionId, - request: RegionOpenRequest, + mut request: RegionOpenRequest, wal_entry_receiver: Option<WalEntryReceiver>, sender: OptionOutputTx, ) { @@ -92,6 +92,7 @@ impl RegionWorkerLoop { return; } info!("Try to open region {}, worker: {}", region_id, self.id); + sanitize_open_request_options(&mut request.options); // Open region from specific region dir.
let opener = match RegionOpener::new( diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index 544afdee3f..ace8fd1edd 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -52,7 +52,8 @@ use crate::metadata::{ use crate::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY; use crate::metrics; use crate::mito_engine_options::{ - SST_FORMAT_KEY, TTL_KEY, TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW, TWCS_TRIGGER_FILE_NUM, + APPEND_MODE_KEY, SST_FORMAT_KEY, TTL_KEY, TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW, + TWCS_TRIGGER_FILE_NUM, }; use crate::path_utils::table_dir; use crate::storage::{ColumnId, RegionId, ScanRequest}; @@ -1316,6 +1317,8 @@ pub enum SetRegionOption { Twsc(String, String), // Modifying the SST format. Format(String), + // Modifying the append mode. + AppendMode(bool), } impl TryFrom<&PbOption> for SetRegionOption { @@ -1334,6 +1337,12 @@ impl TryFrom<&PbOption> for SetRegionOption { Ok(Self::Twsc(key.clone(), value.clone())) } SST_FORMAT_KEY => Ok(Self::Format(value.clone())), + APPEND_MODE_KEY => { + let append_mode = value + .parse::<bool>() + .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?; + Ok(Self::AppendMode(append_mode)) + } _ => InvalidSetRegionOptionRequestSnafu { key, value }.fail(), } } diff --git a/src/table/src/metadata.rs b/src/table/src/metadata.rs index bb90b82e57..ca8bc30aa1 100644 --- a/src/table/src/metadata.rs +++ b/src/table/src/metadata.rs @@ -28,7 +28,9 @@ use derive_builder::Builder; use serde::{Deserialize, Deserializer, Serialize}; use snafu::{OptionExt, ResultExt, ensure}; use store_api::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY; -use store_api::mito_engine_options::{COMPACTION_TYPE, COMPACTION_TYPE_TWCS, SST_FORMAT_KEY}; +use store_api::mito_engine_options::{ + APPEND_MODE_KEY, COMPACTION_TYPE, COMPACTION_TYPE_TWCS, MERGE_MODE_KEY, SST_FORMAT_KEY, +}; use store_api::region_request::{SetRegionOption, UnsetRegionOption};
use store_api::storage::{ColumnDescriptor, ColumnDescriptorBuilder, ColumnId}; @@ -364,6 +366,14 @@ impl TableMeta { .extra_options .insert(SST_FORMAT_KEY.to_string(), value.clone()); } + SetRegionOption::AppendMode(value) => { + new_options + .extra_options + .insert(APPEND_MODE_KEY.to_string(), value.to_string()); + if *value { + new_options.extra_options.remove(MERGE_MODE_KEY); + } + } } } let mut builder = self.new_meta_builder(); @@ -1503,6 +1513,85 @@ mod tests { assert_eq!(&[1, 2, 4], &new_meta.value_indices[..]); } + #[test] + fn test_set_append_mode_true_clears_merge_mode_option() { + let schema = Arc::new(new_test_schema()); + let mut table_options = TableOptions::default(); + table_options + .extra_options + .insert(MERGE_MODE_KEY.to_string(), "last_non_null".to_string()); + let meta = TableMetaBuilder::empty() + .schema(schema) + .primary_key_indices(vec![0]) + .engine("engine") + .next_column_id(3) + .options(table_options) + .build() + .unwrap(); + + let alter_kind = AlterKind::SetTableOptions { + options: vec![SetRegionOption::AppendMode(true)], + }; + let new_meta = meta + .builder_with_alter_kind("my_table", &alter_kind) + .unwrap() + .build() + .unwrap(); + + assert_eq!( + Some("true"), + new_meta + .options + .extra_options + .get(APPEND_MODE_KEY) + .map(String::as_str) + ); + assert!(!new_meta.options.extra_options.contains_key(MERGE_MODE_KEY)); + } + + #[test] + fn test_set_append_mode_false_keeps_merge_mode_option() { + let schema = Arc::new(new_test_schema()); + let mut table_options = TableOptions::default(); + table_options + .extra_options + .insert(MERGE_MODE_KEY.to_string(), "last_non_null".to_string()); + let meta = TableMetaBuilder::empty() + .schema(schema) + .primary_key_indices(vec![0]) + .engine("engine") + .next_column_id(3) + .options(table_options) + .build() + .unwrap(); + + let alter_kind = AlterKind::SetTableOptions { + options: vec![SetRegionOption::AppendMode(false)], + }; + let new_meta = meta + 
.builder_with_alter_kind("my_table", &alter_kind) + .unwrap() + .build() + .unwrap(); + + assert_eq!( + Some("false"), + new_meta + .options + .extra_options + .get(APPEND_MODE_KEY) + .map(String::as_str) + ); + assert_eq!( + Some("last_non_null"), + new_meta + .options + .extra_options + .get(MERGE_MODE_KEY) + .map(String::as_str) + ); + } + #[test] fn test_add_columns_multiple_times() { let schema = Arc::new(new_test_schema()); diff --git a/tests/cases/standalone/common/alter/alter_append_mode.result b/tests/cases/standalone/common/alter/alter_append_mode.result new file mode 100644 index 0000000000..12136ca348 --- /dev/null +++ b/tests/cases/standalone/common/alter/alter_append_mode.result @@ -0,0 +1,192 @@ +-- Test altering append_mode from false to true +-- Create a table with append_mode=false (default) +CREATE TABLE test_alter_append_mode( + host STRING, + ts TIMESTAMP TIME INDEX, + cpu DOUBLE, + PRIMARY KEY(host) +) ENGINE=mito; + +Affected Rows: 0 + +-- Insert some data +INSERT INTO test_alter_append_mode VALUES ('host1', 0, 1.0), ('host2', 1, 2.0); + +Affected Rows: 2 + +-- Insert duplicate data (should be deduplicated since append_mode=false) +INSERT INTO test_alter_append_mode VALUES ('host1', 0, 10.0), ('host2', 1, 20.0); + +Affected Rows: 2 + +-- Query should show deduplicated data (latest values) +SELECT * FROM test_alter_append_mode ORDER BY host, ts; + ++-------+-------------------------+------+ +| host | ts | cpu | ++-------+-------------------------+------+ +| host1 | 1970-01-01T00:00:00 | 10.0 | +| host2 | 1970-01-01T00:00:00.001 | 20.0 | ++-------+-------------------------+------+ + +-- Alter append_mode from false to true +ALTER TABLE test_alter_append_mode SET 'append_mode' = 'true'; + +Affected Rows: 0 + +-- Verify append_mode is set via SHOW CREATE TABLE +SHOW CREATE TABLE test_alter_append_mode; + ++------------------------+-------------------------------------------------------+ +| Table | Create Table | 
++------------------------+-------------------------------------------------------+ +| test_alter_append_mode | CREATE TABLE IF NOT EXISTS "test_alter_append_mode" ( | +| | "host" STRING NULL, | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | "cpu" DOUBLE NULL, | +| | TIME INDEX ("ts"), | +| | PRIMARY KEY ("host") | +| | ) | +| | | +| | ENGINE=mito | +| | WITH( | +| | append_mode = 'true' | +| | ) | ++------------------------+-------------------------------------------------------+ + +-- Insert duplicate data again (should be preserved since append_mode=true now) +INSERT INTO test_alter_append_mode VALUES ('host1', 0, 100.0), ('host2', 1, 200.0); + +Affected Rows: 2 + +-- Query should show the new duplicates preserved +SELECT * FROM test_alter_append_mode ORDER BY host, ts, cpu; + ++-------+-------------------------+-------+ +| host | ts | cpu | ++-------+-------------------------+-------+ +| host1 | 1970-01-01T00:00:00 | 10.0 | +| host1 | 1970-01-01T00:00:00 | 100.0 | +| host2 | 1970-01-01T00:00:00.001 | 20.0 | +| host2 | 1970-01-01T00:00:00.001 | 200.0 | ++-------+-------------------------+-------+ + +-- Try to alter append_mode from true to false (should fail) +-- SQLNESS REPLACE \d+\(\d+,\s+\d+\) REDACTED +ALTER TABLE test_alter_append_mode SET 'append_mode' = 'false'; + +Error: 1004(InvalidArguments), Invalid region request, region_id: REDACTED, err: Only allow changing append_mode from false to true + +-- Clean up +DROP TABLE test_alter_append_mode; + +Affected Rows: 0 + +-- Test creating with append_mode=true and trying to alter to false +CREATE TABLE test_append_mode_true( + host STRING, + ts TIMESTAMP TIME INDEX, + cpu DOUBLE, + PRIMARY KEY(host) +) ENGINE=mito WITH('append_mode'='true'); + +Affected Rows: 0 + +-- Try to alter append_mode from true to false (should fail) +-- SQLNESS REPLACE \d+\(\d+,\s+\d+\) REDACTED +ALTER TABLE test_append_mode_true SET 'append_mode' = 'false'; + +Error: 1004(InvalidArguments), Invalid region request, region_id: REDACTED, err: 
Only allow changing append_mode from false to true + +-- Altering to the same value should succeed +ALTER TABLE test_append_mode_true SET 'append_mode' = 'true'; + +Affected Rows: 0 + +-- Clean up +DROP TABLE test_append_mode_true; + +Affected Rows: 0 + +-- Test altering append_mode on a table with merge_mode set +CREATE TABLE test_alter_append_with_merge( + host STRING, + ts TIMESTAMP TIME INDEX, + cpu DOUBLE, + PRIMARY KEY(host) +) ENGINE=mito WITH('merge_mode'='last_non_null'); + +Affected Rows: 0 + +-- Verify merge_mode is set +SHOW CREATE TABLE test_alter_append_with_merge; + ++------------------------------+-------------------------------------------------------------+ +| Table | Create Table | ++------------------------------+-------------------------------------------------------------+ +| test_alter_append_with_merge | CREATE TABLE IF NOT EXISTS "test_alter_append_with_merge" ( | +| | "host" STRING NULL, | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | "cpu" DOUBLE NULL, | +| | TIME INDEX ("ts"), | +| | PRIMARY KEY ("host") | +| | ) | +| | | +| | ENGINE=mito | +| | WITH( | +| | merge_mode = 'last_non_null' | +| | ) | ++------------------------------+-------------------------------------------------------------+ + +-- Insert some data +INSERT INTO test_alter_append_with_merge VALUES ('host1', 0, 1.0), ('host2', 1, 2.0); + +Affected Rows: 2 + +-- Alter append_mode to true (should succeed and clear merge_mode) +ALTER TABLE test_alter_append_with_merge SET 'append_mode' = 'true'; + +Affected Rows: 0 + +-- Verify merge_mode is cleared and append_mode is set +SHOW CREATE TABLE test_alter_append_with_merge; + ++------------------------------+-------------------------------------------------------------+ +| Table | Create Table | ++------------------------------+-------------------------------------------------------------+ +| test_alter_append_with_merge | CREATE TABLE IF NOT EXISTS "test_alter_append_with_merge" ( | +| | "host" STRING NULL, | +| | "ts" TIMESTAMP(3) NOT 
NULL, | +| | "cpu" DOUBLE NULL, | +| | TIME INDEX ("ts"), | +| | PRIMARY KEY ("host") | +| | ) | +| | | +| | ENGINE=mito | +| | WITH( | +| | append_mode = 'true' | +| | ) | ++------------------------------+-------------------------------------------------------------+ + +-- Insert duplicate data (should be preserved since append_mode=true now) +INSERT INTO test_alter_append_with_merge VALUES ('host1', 0, 100.0), ('host2', 1, 200.0); + +Affected Rows: 2 + +-- Query should show the new duplicates preserved +SELECT * FROM test_alter_append_with_merge ORDER BY host, ts, cpu; + ++-------+-------------------------+-------+ +| host | ts | cpu | ++-------+-------------------------+-------+ +| host1 | 1970-01-01T00:00:00 | 1.0 | +| host1 | 1970-01-01T00:00:00 | 100.0 | +| host2 | 1970-01-01T00:00:00.001 | 2.0 | +| host2 | 1970-01-01T00:00:00.001 | 200.0 | ++-------+-------------------------+-------+ + +-- Clean up +DROP TABLE test_alter_append_with_merge; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/alter/alter_append_mode.sql b/tests/cases/standalone/common/alter/alter_append_mode.sql new file mode 100644 index 0000000000..b57ead737f --- /dev/null +++ b/tests/cases/standalone/common/alter/alter_append_mode.sql @@ -0,0 +1,84 @@ +-- Test altering append_mode from false to true + +-- Create a table with append_mode=false (default) +CREATE TABLE test_alter_append_mode( + host STRING, + ts TIMESTAMP TIME INDEX, + cpu DOUBLE, + PRIMARY KEY(host) +) ENGINE=mito; + +-- Insert some data +INSERT INTO test_alter_append_mode VALUES ('host1', 0, 1.0), ('host2', 1, 2.0); + +-- Insert duplicate data (should be deduplicated since append_mode=false) +INSERT INTO test_alter_append_mode VALUES ('host1', 0, 10.0), ('host2', 1, 20.0); + +-- Query should show deduplicated data (latest values) +SELECT * FROM test_alter_append_mode ORDER BY host, ts; + +-- Alter append_mode from false to true +ALTER TABLE test_alter_append_mode SET 'append_mode' = 'true'; + +-- Verify 
append_mode is set via SHOW CREATE TABLE +SHOW CREATE TABLE test_alter_append_mode; + +-- Insert duplicate data again (should be preserved since append_mode=true now) +INSERT INTO test_alter_append_mode VALUES ('host1', 0, 100.0), ('host2', 1, 200.0); + +-- Query should show the new duplicates preserved +SELECT * FROM test_alter_append_mode ORDER BY host, ts, cpu; + +-- Try to alter append_mode from true to false (should fail) +-- SQLNESS REPLACE \d+\(\d+,\s+\d+\) REDACTED +ALTER TABLE test_alter_append_mode SET 'append_mode' = 'false'; + +-- Clean up +DROP TABLE test_alter_append_mode; + +-- Test creating with append_mode=true and trying to alter to false +CREATE TABLE test_append_mode_true( + host STRING, + ts TIMESTAMP TIME INDEX, + cpu DOUBLE, + PRIMARY KEY(host) +) ENGINE=mito WITH('append_mode'='true'); + +-- Try to alter append_mode from true to false (should fail) +-- SQLNESS REPLACE \d+\(\d+,\s+\d+\) REDACTED +ALTER TABLE test_append_mode_true SET 'append_mode' = 'false'; + +-- Altering to the same value should succeed +ALTER TABLE test_append_mode_true SET 'append_mode' = 'true'; + +-- Clean up +DROP TABLE test_append_mode_true; + +-- Test altering append_mode on a table with merge_mode set +CREATE TABLE test_alter_append_with_merge( + host STRING, + ts TIMESTAMP TIME INDEX, + cpu DOUBLE, + PRIMARY KEY(host) +) ENGINE=mito WITH('merge_mode'='last_non_null'); + +-- Verify merge_mode is set +SHOW CREATE TABLE test_alter_append_with_merge; + +-- Insert some data +INSERT INTO test_alter_append_with_merge VALUES ('host1', 0, 1.0), ('host2', 1, 2.0); + +-- Alter append_mode to true (should succeed and clear merge_mode) +ALTER TABLE test_alter_append_with_merge SET 'append_mode' = 'true'; + +-- Verify merge_mode is cleared and append_mode is set +SHOW CREATE TABLE test_alter_append_with_merge; + +-- Insert duplicate data (should be preserved since append_mode=true now) +INSERT INTO test_alter_append_with_merge VALUES ('host1', 0, 100.0), ('host2', 1, 200.0); + 
+-- Query should show the new duplicates preserved +SELECT * FROM test_alter_append_with_merge ORDER BY host, ts, cpu; + +-- Clean up +DROP TABLE test_alter_append_with_merge;