From 796aae3d9fca58abf1d4dbdc5fbb68f3f28f5468 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Thu, 7 May 2026 15:21:37 +0800 Subject: [PATCH] feat(operator): allow last_row merge mode with append mode (#8065) * feat(operator): allow last_row merge_mode when append_mode is enabled - Update RegionOptions::validate to allow last_row merge_mode with append_mode. - Update fill_table_options_for_create to automatically set merge_mode to last_row when append_mode is enabled for LastNonNull table type. - Add unit tests in mito2 and operator to verify options validation and table creation. - Add integration test for InfluxDB write with append mode hint. Signed-off-by: Lei, HUANG * fix(operator): simplify append mode options Group `LastNonNull` auto-create options in a single append-mode branch. Files: - `src/operator/src/insert.rs` Signed-off-by: Lei, HUANG * fix: sqlness Signed-off-by: Lei, HUANG --------- Signed-off-by: Lei, HUANG --- src/mito2/src/region/options.rs | 17 ++++- src/operator/src/insert.rs | 62 ++++++++++++++++++- tests-integration/tests/http.rs | 37 +++++++++++ .../common/insert/merge_mode.result | 2 +- 4 files changed, 114 insertions(+), 4 deletions(-) diff --git a/src/mito2/src/region/options.rs b/src/mito2/src/region/options.rs index fcf68a9216..7827800295 100644 --- a/src/mito2/src/region/options.rs +++ b/src/mito2/src/region/options.rs @@ -96,9 +96,10 @@ impl RegionOptions { pub fn validate(&self) -> Result<()> { if self.append_mode { ensure!( - self.merge_mode.is_none(), + self.merge_mode + .is_none_or(|mode| mode == MergeMode::LastRow), InvalidRegionOptionsSnafu { - reason: "merge_mode is not allowed when append_mode is enabled", + reason: "only last_row merge_mode is allowed when append_mode is enabled", } ); } @@ -622,6 +623,18 @@ mod tests { assert_eq!(StatusCode::InvalidArguments, err.status_code()); } + #[test] + fn test_append_mode_allows_last_row_merge_mode() { + let map = make_map(&[("append_mode", "true"), ("merge_mode", "last_row")]); + let options = RegionOptions::try_from(&map).unwrap(); + assert!(options.append_mode); + assert_eq!(MergeMode::LastRow, options.merge_mode()); + + let map = make_map(&[("append_mode", "true"), ("merge_mode", "last_non_null")]); + let err = RegionOptions::try_from(&map).unwrap_err(); + assert_eq!(StatusCode::InvalidArguments, err.status_code()); + } + #[test] fn test_with_all() { let wal_options = WalOptions::Kafka(KafkaWalOptions { diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs index 78ad9e4339..81df09e4bc 100644 --- a/src/operator/src/insert.rs +++ b/src/operator/src/insert.rs @@ -1090,7 +1090,15 @@ pub fn fill_table_options_for_create( table_options.insert(APPEND_MODE_KEY.to_string(), "true".to_string()); } AutoCreateTableType::LastNonNull => { - table_options.insert(MERGE_MODE_KEY.to_string(), "last_non_null".to_string()); + if ctx + .extension(APPEND_MODE_KEY) + .is_some_and(|value| value.eq_ignore_ascii_case("true")) + { + table_options.insert(APPEND_MODE_KEY.to_string(), "true".to_string()); + table_options.insert(MERGE_MODE_KEY.to_string(), "last_row".to_string()); + } else { + table_options.insert(MERGE_MODE_KEY.to_string(), "last_non_null".to_string()); + } } AutoCreateTableType::Trace => { table_options.insert(APPEND_MODE_KEY.to_string(), "true".to_string()); @@ -1334,4 +1342,56 @@ mod tests { assert_eq!(req_schema[0].column_name, ts_name); assert_eq!(req_schema[1].column_name, field_name); } + + #[test] + fn test_last_non_null_create_options_preserve_default_without_append_mode() { + let ctx = Arc::new(QueryContext::with( + DEFAULT_CATALOG_NAME, + DEFAULT_SCHEMA_NAME, + )); + let mut table_options = Default::default(); + + fill_table_options_for_create(&mut table_options, &AutoCreateTableType::LastNonNull, &ctx); + + assert_eq!( + Some("last_non_null"), + table_options.get(MERGE_MODE_KEY).map(String::as_str) + ); + assert!(!table_options.contains_key(APPEND_MODE_KEY)); + } + + #[test] + fn test_last_non_null_create_options_preserve_default_with_append_mode_false() { + let mut ctx = QueryContext::with(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME); + ctx.set_extension(APPEND_MODE_KEY, "false"); + let ctx = Arc::new(ctx); + let mut table_options = Default::default(); + + fill_table_options_for_create(&mut table_options, &AutoCreateTableType::LastNonNull, &ctx); + + assert!(!table_options.contains_key(APPEND_MODE_KEY)); + assert_eq!( + Some("last_non_null"), + table_options.get(MERGE_MODE_KEY).map(String::as_str) + ); + } + + #[test] + fn test_last_non_null_create_options_use_last_row_with_append_mode_true() { + let mut ctx = QueryContext::with(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME); + ctx.set_extension(APPEND_MODE_KEY, "true"); + let ctx = Arc::new(ctx); + let mut table_options = Default::default(); + + fill_table_options_for_create(&mut table_options, &AutoCreateTableType::LastNonNull, &ctx); + + assert_eq!( + Some("true"), + table_options.get(APPEND_MODE_KEY).map(String::as_str) + ); + assert_eq!( + Some("last_row"), + table_options.get(MERGE_MODE_KEY).map(String::as_str) + ); + } } diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 189ca8a40a..e6c7aefe26 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -153,6 +153,7 @@ macro_rules! http_tests { test_influxdb_write, test_influxdb_write_with_hints, + test_influxdb_write_with_append_mode_hint, test_http_memory_limit, ); )* @@ -3775,6 +3776,42 @@ pub async fn test_influxdb_write_with_hints(storage_type: StorageType) { guard.remove_all().await; } +pub async fn test_influxdb_write_with_append_mode_hint(storage_type: StorageType) { + common_telemetry::init_default_ut_logging(); + let (app, mut guard) = setup_test_http_app_with_frontend( + storage_type, + "test_influxdb_write_with_append_mode_hint", + ) + .await; + + let client = TestClient::new(app).await; + + let result = client + .post("/v1/influxdb/write?db=public") + .header("x-greptime-hint-append_mode", "true") + .body("append_mode_table,host=host1 cpu=1.2 1664370459457010101") + .send() + .await; + assert_eq!(result.status(), 204); + + let res = client + .get("/v1/sql?sql=show create table append_mode_table") + .send() + .await; + assert_eq!(res.status(), StatusCode::OK); + let resp = res.text().await; + assert!( + resp.contains("append_mode = 'true'"), + "expected append_mode = 'true' in SHOW CREATE TABLE output, got: {resp}" + ); + assert!( + resp.contains("merge_mode = 'last_row'"), + "expected merge_mode = 'last_row' in SHOW CREATE TABLE output, got: {resp}" + ); + + guard.remove_all().await; +} + /// Test one-to-many VRL pipeline expansion. /// This test verifies that a VRL processor can return an array, which results in /// multiple output rows from a single input row. diff --git a/tests/cases/standalone/common/insert/merge_mode.result b/tests/cases/standalone/common/insert/merge_mode.result index a98f6b6e38..a5e03ae8b9 100644 --- a/tests/cases/standalone/common/insert/merge_mode.result +++ b/tests/cases/standalone/common/insert/merge_mode.result @@ -181,5 +181,5 @@ create table if not exists invalid_merge_mode( engine=mito with('merge_mode'='last_non_null', 'append_mode'='true'); -Error: 1004(InvalidArguments), Invalid region options, merge_mode is not allowed when append_mode is enabled +Error: 1004(InvalidArguments), Invalid region options, only last_row merge_mode is allowed when append_mode is enabled