From 695398652cfaa2bdf26a0001c33be20ac58f15de Mon Sep 17 00:00:00 2001 From: Niwaka <61189782+NiwakaDev@users.noreply.github.com> Date: Mon, 31 Jul 2023 11:55:09 +0900 Subject: [PATCH] feat: accept influxdb request without timestamp even if table doesn't exist (#2041) * feat: accept influxdb request without timestamp even if table doesn't exist * refactor: InsertRequests::try_from * feat: check row number --- src/common/grpc/src/writer.rs | 14 ++++++++++ src/servers/src/influxdb.rs | 46 +++++++++++++++++++++++++------ tests-integration/src/influxdb.rs | 41 +++++++++++++++++++++++++++ 3 files changed, 93 insertions(+), 8 deletions(-) diff --git a/src/common/grpc/src/writer.rs b/src/common/grpc/src/writer.rs index 75061c5fc6..adb0d46ca4 100644 --- a/src/common/grpc/src/writer.rs +++ b/src/common/grpc/src/writer.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::collections::HashMap; +use std::fmt::Display; use api::helper::values_with_capacity; use api::v1::column::SemanticType; @@ -246,6 +247,19 @@ pub enum Precision { Hour, } +impl Display for Precision { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Precision::Nanosecond => write!(f, "Precision::Nanosecond"), + Precision::Microsecond => write!(f, "Precision::Microsecond"), + Precision::Millisecond => write!(f, "Precision::Millisecond"), + Precision::Second => write!(f, "Precision::Second"), + Precision::Minute => write!(f, "Precision::Minute"), + Precision::Hour => write!(f, "Precision::Hour"), + } + } +} + #[cfg(test)] mod tests { use api::v1::column::SemanticType; diff --git a/src/servers/src/influxdb.rs b/src/servers/src/influxdb.rs index 3086e36cbb..b848579b8f 100644 --- a/src/servers/src/influxdb.rs +++ b/src/servers/src/influxdb.rs @@ -16,10 +16,12 @@ use std::collections::HashMap; use api::v1::{InsertRequest as GrpcInsertRequest, InsertRequests}; use common_grpc::writer::{LinesWriter, Precision}; +use common_time::timestamp::TimeUnit; +use common_time::Timestamp; use influxdb_line_protocol::{parse_lines, FieldValue}; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; -use crate::error::{Error, InfluxdbLineProtocolSnafu, InfluxdbLinesWriteSnafu}; +use crate::error::{Error, InfluxdbLineProtocolSnafu, InfluxdbLinesWriteSnafu, TimePrecisionSnafu}; pub const INFLUXDB_TIMESTAMP_COLUMN_NAME: &str = "ts"; pub const DEFAULT_TIME_PRECISION: Precision = Precision::Nanosecond; @@ -90,16 +92,36 @@ impl TryFrom<&InfluxdbRequest> for InsertRequests { } if let Some(timestamp) = line.timestamp { - let precision = if let Some(val) = &value.precision { - *val - } else { - DEFAULT_TIME_PRECISION - }; + let precision = unwarp_or_default_precision(value.precision); writer .write_ts(INFLUXDB_TIMESTAMP_COLUMN_NAME, (timestamp, precision)) .context(InfluxdbLinesWriteSnafu)?; + } else { + let precision = unwarp_or_default_precision(value.precision); + let timestamp = Timestamp::current_millis(); + let unit = match precision { + Precision::Second => TimeUnit::Second, + Precision::Millisecond => TimeUnit::Millisecond, + Precision::Microsecond => TimeUnit::Microsecond, + Precision::Nanosecond => TimeUnit::Nanosecond, + _ => { + return Err(Error::NotSupported { + feat: format!("convert {precision} into TimeUnit"), + }) + } + }; + let timestamp = timestamp + .convert_to(unit) + .with_context(|| TimePrecisionSnafu { + name: precision.to_string(), + })?; + writer + .write_ts( + INFLUXDB_TIMESTAMP_COLUMN_NAME, + (timestamp.into(), precision), + ) + .context(InfluxdbLinesWriteSnafu)?; } - writer.commit(); } @@ -119,6 +141,14 @@ impl TryFrom<&InfluxdbRequest> for InsertRequests { } } +fn unwarp_or_default_precision(precision: Option) -> Precision { + if let Some(val) = precision { + val + } else { + DEFAULT_TIME_PRECISION + } +} + #[cfg(test)] mod tests { use api::v1::column::{SemanticType, Values}; diff --git a/tests-integration/src/influxdb.rs b/tests-integration/src/influxdb.rs index 72860fd6df..b96591edfe 100644 --- a/tests-integration/src/influxdb.rs +++ b/tests-integration/src/influxdb.rs @@ -42,6 +42,47 @@ mod test { test_put_influxdb_lines(&instance.frontend()).await; } + #[tokio::test(flavor = "multi_thread")] + async fn test_standalone_put_influxdb_lines_without_time_column() { + let standalone = + tests::create_standalone_instance("test_standalone_put_influxdb_lines").await; + test_put_influxdb_lines_without_time_column(&standalone.instance).await; + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_distributed_put_influxdb_lines_without_time_column() { + let instance = + tests::create_distributed_instance("test_distributed_put_influxdb_lines").await; + test_put_influxdb_lines_without_time_column(&instance.frontend()).await; + } + + async fn test_put_influxdb_lines_without_time_column(instance: &Arc) { + let lines = r" +monitor1,host=host1 cpu=66.6,memory=1024 +monitor1,host=host2 memory=1027"; + let request = InfluxdbRequest { + precision: None, + lines: lines.to_string(), + }; + assert!(instance.exec(&request, QueryContext::arc()).await.is_ok()); + + let mut output = instance + .do_query( + "SELECT ts, host, cpu, memory FROM monitor1 ORDER BY ts", + QueryContext::arc(), + ) + .await; + let output = output.remove(0).unwrap(); + let Output::Stream(stream) = output else { unreachable!() }; + + let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); + let recordbatches: Vec<_> = recordbatches.iter().collect(); + let total = recordbatches + .into_iter() + .fold(0, |total, recordbatch| total + recordbatch.num_rows()); + assert_eq!(total, 2); + } + async fn test_put_influxdb_lines(instance: &Arc) { let lines = r" monitor1,host=host1 cpu=66.6,memory=1024 1663840496100023100