diff --git a/config/config.md b/config/config.md index 35d78a0788..b1630d97ad 100644 --- a/config/config.md +++ b/config/config.md @@ -65,6 +65,7 @@ | `opentsdb.enable` | Bool | `true` | Whether to enable OpenTSDB put in HTTP API. | | `influxdb` | -- | -- | InfluxDB protocol options. | | `influxdb.enable` | Bool | `true` | Whether to enable InfluxDB protocol in HTTP API. | +| `influxdb.default_merge_mode` | String | `last_non_null` | Default merge mode for tables automatically created by InfluxDB protocol.<br/>Available values: "last_non_null", "last_row". | | `jaeger` | -- | -- | Jaeger protocol options. | | `jaeger.enable` | Bool | `true` | Whether to enable Jaeger protocol in HTTP API. | | `prom_store` | -- | -- | Prometheus remote storage options | @@ -286,6 +287,7 @@ | `opentsdb.enable` | Bool | `true` | Whether to enable OpenTSDB put in HTTP API. | | `influxdb` | -- | -- | InfluxDB protocol options. | | `influxdb.enable` | Bool | `true` | Whether to enable InfluxDB protocol in HTTP API. | +| `influxdb.default_merge_mode` | String | `last_non_null` | Default merge mode for tables automatically created by InfluxDB protocol.<br/>Available values: "last_non_null", "last_row". | | `jaeger` | -- | -- | Jaeger protocol options. | | `jaeger.enable` | Bool | `true` | Whether to enable Jaeger protocol in HTTP API. | | `prom_store` | -- | -- | Prometheus remote storage options | diff --git a/config/frontend.example.toml b/config/frontend.example.toml index 60115e93bb..39f38fbef9 100644 --- a/config/frontend.example.toml +++ b/config/frontend.example.toml @@ -199,6 +199,9 @@ enable = true [influxdb] ## Whether to enable InfluxDB protocol in HTTP API. enable = true +## Default merge mode for tables automatically created by InfluxDB protocol. +## Available values: "last_non_null", "last_row". +default_merge_mode = "last_non_null" ## Jaeger protocol options. [jaeger] diff --git a/config/standalone.example.toml b/config/standalone.example.toml index 5e790749fe..24249270b2 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -166,6 +166,9 @@ enable = true [influxdb] ## Whether to enable InfluxDB protocol in HTTP API. enable = true +## Default merge mode for tables automatically created by InfluxDB protocol. +## Available values: "last_non_null", "last_row". +default_merge_mode = "last_non_null" ## Jaeger protocol options. [jaeger] diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index ac0c52fccd..359d59ccf8 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -99,6 +99,7 @@ use crate::error::{ ParseSqlSnafu, PermissionSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu, StatementTimeoutSnafu, TableOperationSnafu, }; +use crate::service_config::InfluxdbMergeMode; use crate::stream_wrapper::CancellableStreamWrapper; lazy_static! 
{ @@ -122,6 +123,7 @@ pub struct Instance { event_recorder: Option, process_manager: ProcessManagerRef, slow_query_options: SlowQueryOptions, + influxdb_default_merge_mode: InfluxdbMergeMode, suspend: Arc, // cache for otlp metrics diff --git a/src/frontend/src/instance/builder.rs b/src/frontend/src/instance/builder.rs index 6ab1427067..ff857ed768 100644 --- a/src/frontend/src/instance/builder.rs +++ b/src/frontend/src/instance/builder.rs @@ -281,6 +281,7 @@ impl FrontendBuilder { process_manager, otlp_metrics_table_legacy_cache: DashMap::new(), slow_query_options: self.options.slow_query.clone(), + influxdb_default_merge_mode: self.options.influxdb.default_merge_mode, suspend: Arc::new(AtomicBool::new(false)), }) } diff --git a/src/frontend/src/instance/influxdb.rs b/src/frontend/src/instance/influxdb.rs index fe5fdeac77..ab63bfb6b2 100644 --- a/src/frontend/src/instance/influxdb.rs +++ b/src/frontend/src/instance/influxdb.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::sync::Arc; + use api::v1::value::ValueData; use api::v1::{ColumnDataType, RowInsertRequests, SemanticType}; use async_trait::async_trait; @@ -27,8 +29,25 @@ use servers::interceptor::{LineProtocolInterceptor, LineProtocolInterceptorRef}; use servers::query_handler::InfluxdbLineProtocolHandler; use session::context::QueryContextRef; use snafu::{OptionExt, ResultExt}; +use store_api::mito_engine_options::MERGE_MODE_KEY; use crate::instance::Instance; +use crate::service_config::influxdb::InfluxdbMergeMode; + +fn ctx_with_default_merge_mode( + ctx: QueryContextRef, + default_merge_mode: InfluxdbMergeMode, +) -> QueryContextRef { + if ctx.extension(MERGE_MODE_KEY).is_none() + && default_merge_mode != InfluxdbMergeMode::LastNonNull + { + let mut ctx = (*ctx).clone(); + ctx.set_extension(MERGE_MODE_KEY, default_merge_mode.as_str()); + Arc::new(ctx) + } else { + ctx + } +} #[async_trait] impl InfluxdbLineProtocolHandler for Instance { @@ -57,6 +76,8 @@ impl InfluxdbLineProtocolHandler for Instance { .post_lines_conversion(requests, ctx.clone()) .await?; + let ctx = ctx_with_default_merge_mode(ctx, self.influxdb_default_merge_mode); + self.handle_influx_row_inserts(requests, ctx) .await .map_err(BoxedError::new) @@ -168,3 +189,43 @@ fn align_time_unit(value: &ValueData, target: TimeUnit) -> servers::error::Resul TimeUnit::Nanosecond => ValueData::TimestampNanosecondValue(timestamp.value()), }) } + +#[cfg(test)] +mod tests { + use session::context::QueryContext; + use store_api::mito_engine_options::MERGE_MODE_KEY; + + use super::*; + use crate::service_config::influxdb::InfluxdbMergeMode; + + #[test] + fn test_influxdb_default_merge_mode_reuses_default_context() { + let ctx = QueryContext::arc(); + let actual = ctx_with_default_merge_mode(ctx.clone(), InfluxdbMergeMode::LastNonNull); + + assert!(Arc::ptr_eq(&ctx, &actual)); + assert!(actual.extension(MERGE_MODE_KEY).is_none()); + } + + #[test] + fn test_influxdb_non_default_merge_mode_sets_extension() { + let ctx 
= QueryContext::arc(); + let actual = ctx_with_default_merge_mode(ctx.clone(), InfluxdbMergeMode::LastRow); + + assert!(!Arc::ptr_eq(&ctx, &actual)); + assert_eq!(Some("last_row"), actual.extension(MERGE_MODE_KEY)); + } + + #[test] + fn test_influxdb_explicit_merge_mode_keeps_context() { + let mut ctx = QueryContext::arc(); + Arc::get_mut(&mut ctx) + .unwrap() + .set_extension(MERGE_MODE_KEY, "last_row"); + + let actual = ctx_with_default_merge_mode(ctx.clone(), InfluxdbMergeMode::LastNonNull); + + assert!(Arc::ptr_eq(&ctx, &actual)); + assert_eq!(Some("last_row"), actual.extension(MERGE_MODE_KEY)); + } +} diff --git a/src/frontend/src/service_config.rs b/src/frontend/src/service_config.rs index cd4c0dabe9..03ec12e2c9 100644 --- a/src/frontend/src/service_config.rs +++ b/src/frontend/src/service_config.rs @@ -20,7 +20,7 @@ pub mod otlp; pub mod postgres; pub mod prom_store; -pub use influxdb::InfluxdbOptions; +pub use influxdb::{InfluxdbMergeMode, InfluxdbOptions}; pub use jaeger::JaegerOptions; pub use mysql::MysqlOptions; pub use opentsdb::OpentsdbOptions; diff --git a/src/frontend/src/service_config/influxdb.rs b/src/frontend/src/service_config/influxdb.rs index 177ee9bc35..912e342471 100644 --- a/src/frontend/src/service_config/influxdb.rs +++ b/src/frontend/src/service_config/influxdb.rs @@ -15,13 +15,35 @@ use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +#[serde(default)] pub struct InfluxdbOptions { pub enable: bool, + pub default_merge_mode: InfluxdbMergeMode, +} + +#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum InfluxdbMergeMode { + #[default] + LastNonNull, + LastRow, +} + +impl InfluxdbMergeMode { + pub fn as_str(&self) -> &'static str { + match self { + InfluxdbMergeMode::LastNonNull => "last_non_null", + InfluxdbMergeMode::LastRow => "last_row", + } + } } impl Default for InfluxdbOptions { fn default() -> Self { - Self { 
enable: true } + Self { + enable: true, + default_merge_mode: InfluxdbMergeMode::default(), + } } } @@ -33,5 +55,12 @@ mod tests { fn test_influxdb_options() { let default = InfluxdbOptions::default(); assert!(default.enable); + assert_eq!("last_non_null", default.default_merge_mode.as_str()); + } + + #[test] + fn test_influxdb_options_default_merge_mode() { + let options: InfluxdbOptions = toml::from_str("default_merge_mode = 'last_row'").unwrap(); + assert_eq!("last_row", options.default_merge_mode.as_str()); } } diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs index 81df09e4bc..ff8ed2b78b 100644 --- a/src/operator/src/insert.rs +++ b/src/operator/src/insert.rs @@ -1096,6 +1096,8 @@ pub fn fill_table_options_for_create( { table_options.insert(APPEND_MODE_KEY.to_string(), "true".to_string()); table_options.insert(MERGE_MODE_KEY.to_string(), "last_row".to_string()); + } else if let Some(merge_mode) = ctx.extension(MERGE_MODE_KEY) { + table_options.insert(MERGE_MODE_KEY.to_string(), merge_mode.to_string()); } else { table_options.insert(MERGE_MODE_KEY.to_string(), "last_non_null".to_string()); } @@ -1376,6 +1378,22 @@ mod tests { ); } + #[test] + fn test_last_non_null_create_options_use_configured_merge_mode() { + let mut ctx = QueryContext::with(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME); + ctx.set_extension(MERGE_MODE_KEY, "last_row"); + let ctx = Arc::new(ctx); + let mut table_options = Default::default(); + + fill_table_options_for_create(&mut table_options, &AutoCreateTableType::LastNonNull, &ctx); + + assert_eq!( + Some("last_row"), + table_options.get(MERGE_MODE_KEY).map(String::as_str) + ); + assert!(!table_options.contains_key(APPEND_MODE_KEY)); + } + #[test] fn test_last_non_null_create_options_use_last_row_with_append_mode_true() { let mut ctx = QueryContext::with(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME); diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 4cdb4db56d..f17b78a7e5 100644 --- 
a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -1554,6 +1554,7 @@ enable = true [influxdb] enable = true +default_merge_mode = "last_non_null" [jaeger] enable = true