feat: add InfluxDB default merge mode config (#8134)

* feat/influxdb-default-merge-mode: add InfluxDB merge mode config

- `influxdb` config: add `default_merge_mode` parsing and defaults in `src/frontend/src/service_config/influxdb.rs` and `src/frontend/src/service_config.rs`
- auto-create behavior: apply configured `merge_mode` for InfluxDB ingestion in `src/frontend/src/instance.rs`, `src/frontend/src/instance/builder.rs`, `src/frontend/src/instance/influxdb.rs`, and `src/operator/src/insert.rs`
- config docs: document `influxdb.default_merge_mode` in `config/frontend.example.toml`, `config/standalone.example.toml`, and `config/config.md`

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/influxdb-default-merge-mode: derive merge mode default

- `influxdb` config: derive `Default` for `InfluxdbMergeMode` in `src/frontend/src/service_config/influxdb.rs`

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/influxdb-default-merge-mode: update config API snapshot

- `config API`: include `default_merge_mode` in `tests-integration/tests/http.rs`

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/influxdb-default-merge-mode: avoid default context clone

- `InfluxDB merge mode`: avoid cloning `QueryContext` for default `last_non_null` in `src/frontend/src/instance/influxdb.rs`
- `InfluxDB merge mode`: cover default, configured, and explicit `MERGE_MODE_KEY` paths in `src/frontend/src/instance/influxdb.rs`

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

---------

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
This commit is contained in:
Lei, HUANG
2026-05-20 00:54:36 +08:00
committed by GitHub
parent 418318da51
commit f8df016623
10 changed files with 122 additions and 2 deletions

View File

@@ -65,6 +65,7 @@
| `opentsdb.enable` | Bool | `true` | Whether to enable OpenTSDB put in HTTP API. |
| `influxdb` | -- | -- | InfluxDB protocol options. |
| `influxdb.enable` | Bool | `true` | Whether to enable InfluxDB protocol in HTTP API. |
| `influxdb.default_merge_mode` | String | `last_non_null` | Default merge mode for tables automatically created by InfluxDB protocol.<br/>Available values: "last_non_null", "last_row". |
| `jaeger` | -- | -- | Jaeger protocol options. |
| `jaeger.enable` | Bool | `true` | Whether to enable Jaeger protocol in HTTP API. |
| `prom_store` | -- | -- | Prometheus remote storage options |
@@ -286,6 +287,7 @@
| `opentsdb.enable` | Bool | `true` | Whether to enable OpenTSDB put in HTTP API. |
| `influxdb` | -- | -- | InfluxDB protocol options. |
| `influxdb.enable` | Bool | `true` | Whether to enable InfluxDB protocol in HTTP API. |
| `influxdb.default_merge_mode` | String | `last_non_null` | Default merge mode for tables automatically created by InfluxDB protocol.<br/>Available values: "last_non_null", "last_row". |
| `jaeger` | -- | -- | Jaeger protocol options. |
| `jaeger.enable` | Bool | `true` | Whether to enable Jaeger protocol in HTTP API. |
| `prom_store` | -- | -- | Prometheus remote storage options |

View File

@@ -199,6 +199,9 @@ enable = true
[influxdb]
## Whether to enable InfluxDB protocol in HTTP API.
enable = true
## Default merge mode for tables automatically created by InfluxDB protocol.
## Available values: "last_non_null", "last_row".
default_merge_mode = "last_non_null"
## Jaeger protocol options.
[jaeger]

View File

@@ -166,6 +166,9 @@ enable = true
[influxdb]
## Whether to enable InfluxDB protocol in HTTP API.
enable = true
## Default merge mode for tables automatically created by InfluxDB protocol.
## Available values: "last_non_null", "last_row".
default_merge_mode = "last_non_null"
## Jaeger protocol options.
[jaeger]

View File

@@ -99,6 +99,7 @@ use crate::error::{
ParseSqlSnafu, PermissionSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu,
StatementTimeoutSnafu, TableOperationSnafu,
};
use crate::service_config::InfluxdbMergeMode;
use crate::stream_wrapper::CancellableStreamWrapper;
lazy_static! {
@@ -122,6 +123,7 @@ pub struct Instance {
event_recorder: Option<EventRecorderRef>,
process_manager: ProcessManagerRef,
slow_query_options: SlowQueryOptions,
influxdb_default_merge_mode: InfluxdbMergeMode,
suspend: Arc<AtomicBool>,
// cache for otlp metrics

View File

@@ -281,6 +281,7 @@ impl FrontendBuilder {
process_manager,
otlp_metrics_table_legacy_cache: DashMap::new(),
slow_query_options: self.options.slow_query.clone(),
influxdb_default_merge_mode: self.options.influxdb.default_merge_mode,
suspend: Arc::new(AtomicBool::new(false)),
})
}

View File

@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, RowInsertRequests, SemanticType};
use async_trait::async_trait;
@@ -27,8 +29,25 @@ use servers::interceptor::{LineProtocolInterceptor, LineProtocolInterceptorRef};
use servers::query_handler::InfluxdbLineProtocolHandler;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use store_api::mito_engine_options::MERGE_MODE_KEY;
use crate::instance::Instance;
use crate::service_config::influxdb::InfluxdbMergeMode;
fn ctx_with_default_merge_mode(
ctx: QueryContextRef,
default_merge_mode: InfluxdbMergeMode,
) -> QueryContextRef {
if ctx.extension(MERGE_MODE_KEY).is_none()
&& default_merge_mode != InfluxdbMergeMode::LastNonNull
{
let mut ctx = (*ctx).clone();
ctx.set_extension(MERGE_MODE_KEY, default_merge_mode.as_str());
Arc::new(ctx)
} else {
ctx
}
}
#[async_trait]
impl InfluxdbLineProtocolHandler for Instance {
@@ -57,6 +76,8 @@ impl InfluxdbLineProtocolHandler for Instance {
.post_lines_conversion(requests, ctx.clone())
.await?;
let ctx = ctx_with_default_merge_mode(ctx, self.influxdb_default_merge_mode);
self.handle_influx_row_inserts(requests, ctx)
.await
.map_err(BoxedError::new)
@@ -168,3 +189,43 @@ fn align_time_unit(value: &ValueData, target: TimeUnit) -> servers::error::Resul
TimeUnit::Nanosecond => ValueData::TimestampNanosecondValue(timestamp.value()),
})
}
#[cfg(test)]
mod tests {
use session::context::QueryContext;
use store_api::mito_engine_options::MERGE_MODE_KEY;
use super::*;
use crate::service_config::influxdb::InfluxdbMergeMode;
#[test]
fn test_influxdb_default_merge_mode_reuses_default_context() {
let ctx = QueryContext::arc();
let actual = ctx_with_default_merge_mode(ctx.clone(), InfluxdbMergeMode::LastNonNull);
assert!(Arc::ptr_eq(&ctx, &actual));
assert!(actual.extension(MERGE_MODE_KEY).is_none());
}
#[test]
fn test_influxdb_non_default_merge_mode_sets_extension() {
let ctx = QueryContext::arc();
let actual = ctx_with_default_merge_mode(ctx.clone(), InfluxdbMergeMode::LastRow);
assert!(!Arc::ptr_eq(&ctx, &actual));
assert_eq!(Some("last_row"), actual.extension(MERGE_MODE_KEY));
}
#[test]
fn test_influxdb_explicit_merge_mode_keeps_context() {
let mut ctx = QueryContext::arc();
Arc::get_mut(&mut ctx)
.unwrap()
.set_extension(MERGE_MODE_KEY, "last_row");
let actual = ctx_with_default_merge_mode(ctx.clone(), InfluxdbMergeMode::LastNonNull);
assert!(Arc::ptr_eq(&ctx, &actual));
assert_eq!(Some("last_row"), actual.extension(MERGE_MODE_KEY));
}
}

View File

@@ -20,7 +20,7 @@ pub mod otlp;
pub mod postgres;
pub mod prom_store;
pub use influxdb::InfluxdbOptions;
pub use influxdb::{InfluxdbMergeMode, InfluxdbOptions};
pub use jaeger::JaegerOptions;
pub use mysql::MysqlOptions;
pub use opentsdb::OpentsdbOptions;

View File

@@ -15,13 +15,35 @@
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct InfluxdbOptions {
pub enable: bool,
pub default_merge_mode: InfluxdbMergeMode,
}
#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum InfluxdbMergeMode {
#[default]
LastNonNull,
LastRow,
}
impl InfluxdbMergeMode {
pub fn as_str(&self) -> &'static str {
match self {
InfluxdbMergeMode::LastNonNull => "last_non_null",
InfluxdbMergeMode::LastRow => "last_row",
}
}
}
impl Default for InfluxdbOptions {
fn default() -> Self {
Self { enable: true }
Self {
enable: true,
default_merge_mode: InfluxdbMergeMode::default(),
}
}
}
@@ -33,5 +55,12 @@ mod tests {
fn test_influxdb_options() {
let default = InfluxdbOptions::default();
assert!(default.enable);
assert_eq!("last_non_null", default.default_merge_mode.as_str());
}
#[test]
fn test_influxdb_options_default_merge_mode() {
let options: InfluxdbOptions = toml::from_str("default_merge_mode = 'last_row'").unwrap();
assert_eq!("last_row", options.default_merge_mode.as_str());
}
}

View File

@@ -1096,6 +1096,8 @@ pub fn fill_table_options_for_create(
{
table_options.insert(APPEND_MODE_KEY.to_string(), "true".to_string());
table_options.insert(MERGE_MODE_KEY.to_string(), "last_row".to_string());
} else if let Some(merge_mode) = ctx.extension(MERGE_MODE_KEY) {
table_options.insert(MERGE_MODE_KEY.to_string(), merge_mode.to_string());
} else {
table_options.insert(MERGE_MODE_KEY.to_string(), "last_non_null".to_string());
}
@@ -1376,6 +1378,22 @@ mod tests {
);
}
#[test]
fn test_last_non_null_create_options_use_configured_merge_mode() {
let mut ctx = QueryContext::with(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME);
ctx.set_extension(MERGE_MODE_KEY, "last_row");
let ctx = Arc::new(ctx);
let mut table_options = Default::default();
fill_table_options_for_create(&mut table_options, &AutoCreateTableType::LastNonNull, &ctx);
assert_eq!(
Some("last_row"),
table_options.get(MERGE_MODE_KEY).map(String::as_str)
);
assert!(!table_options.contains_key(APPEND_MODE_KEY));
}
#[test]
fn test_last_non_null_create_options_use_last_row_with_append_mode_true() {
let mut ctx = QueryContext::with(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME);

View File

@@ -1554,6 +1554,7 @@ enable = true
[influxdb]
enable = true
default_merge_mode = "last_non_null"
[jaeger]
enable = true