diff --git a/src/frontend/src/instance/influxdb.rs b/src/frontend/src/instance/influxdb.rs index 01c526aba9..1b8e0c9f46 100644 --- a/src/frontend/src/instance/influxdb.rs +++ b/src/frontend/src/instance/influxdb.rs @@ -12,16 +12,23 @@ // See the License for the specific language governing permissions and // limitations under the License. +use api::v1::value::ValueData; +use api::v1::{ColumnDataType, RowInsertRequests, SemanticType}; use async_trait::async_trait; use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq}; +use catalog::CatalogManagerRef; use client::Output; use common_error::ext::BoxedError; -use servers::error::{AuthSnafu, Error, OtherSnafu}; +use common_time::Timestamp; +use common_time::timestamp::TimeUnit; +use servers::error::{ + AuthSnafu, CatalogSnafu, Error, OtherSnafu, TimestampOverflowSnafu, UnexpectedResultSnafu, +}; use servers::influxdb::InfluxdbRequest; use servers::interceptor::{LineProtocolInterceptor, LineProtocolInterceptorRef}; use servers::query_handler::InfluxdbLineProtocolHandler; use session::context::QueryContextRef; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; use crate::instance::Instance; @@ -42,6 +49,12 @@ impl InfluxdbLineProtocolHandler for Instance { interceptor_ref.pre_execute(&request.lines, ctx.clone())?; let requests = request.try_into()?; + + let aligner = InfluxdbLineTimestampAligner { + catalog_manager: self.catalog_manager(), + }; + let requests = aligner.align_timestamps(requests, &ctx).await?; + let requests = interceptor_ref .post_lines_conversion(requests, ctx.clone()) .await?; @@ -64,3 +77,109 @@ impl InfluxdbLineProtocolHandler for Instance { .context(servers::error::ExecuteGrpcQuerySnafu) } } + +/// Align the timestamp precisions in Influxdb lines (after they are converted to the GRPC row +/// inserts) to the time index columns' time units of the created tables (if there are any). +struct InfluxdbLineTimestampAligner<'a> { + catalog_manager: &'a CatalogManagerRef, +} + +impl InfluxdbLineTimestampAligner<'_> { + async fn align_timestamps( + &self, + requests: RowInsertRequests, + query_context: &QueryContextRef, + ) -> servers::error::Result { + let mut inserts = requests.inserts; + for insert in inserts.iter_mut() { + let Some(rows) = &mut insert.rows else { + continue; + }; + + let Some(target_time_unit) = self + .catalog_manager + .table( + query_context.current_catalog(), + &query_context.current_schema(), + &insert.table_name, + Some(query_context), + ) + .await + .context(CatalogSnafu)? + .map(|x| x.schema()) + .and_then(|schema| { + schema.timestamp_column().map(|col| { + col.data_type + .as_timestamp() + .expect("Time index column is not of timestamp type?!") + .unit() + }) + }) + else { + continue; + }; + + let target_timestamp_type = match target_time_unit { + TimeUnit::Second => ColumnDataType::TimestampSecond, + TimeUnit::Millisecond => ColumnDataType::TimestampMillisecond, + TimeUnit::Microsecond => ColumnDataType::TimestampMicrosecond, + TimeUnit::Nanosecond => ColumnDataType::TimestampNanosecond, + }; + let Some(to_be_aligned) = rows.schema.iter().enumerate().find_map(|(i, x)| { + if x.semantic_type() == SemanticType::Timestamp + && x.datatype() != target_timestamp_type + { + Some(i) + } else { + None + } + }) else { + continue; + }; + + // Indexing safety: `to_be_aligned` is guaranteed to be a valid index because it's got + // from "enumerate" the schema vector above. + rows.schema[to_be_aligned].datatype = target_timestamp_type as i32; + + for row in rows.rows.iter_mut() { + let Some(time_value) = row + .values + .get_mut(to_be_aligned) + .and_then(|x| x.value_data.as_mut()) + else { + continue; + }; + *time_value = align_time_unit(time_value, target_time_unit)?; + } + } + Ok(RowInsertRequests { inserts }) + } +} + +fn align_time_unit(value: &ValueData, target: TimeUnit) -> servers::error::Result { + let timestamp = match value { + ValueData::TimestampSecondValue(x) => Timestamp::new_second(*x), + ValueData::TimestampMillisecondValue(x) => Timestamp::new_millisecond(*x), + ValueData::TimestampMicrosecondValue(x) => Timestamp::new_microsecond(*x), + ValueData::TimestampNanosecondValue(x) => Timestamp::new_nanosecond(*x), + _ => { + return UnexpectedResultSnafu { + reason: format!("Timestamp value '{:?}' is not of timestamp type!", value), + } + .fail(); + } + }; + + let timestamp = timestamp + .convert_to(target) + .with_context(|| TimestampOverflowSnafu { + error: format!("{:?} convert to {}", timestamp, target), + })?; + + Ok(match target { + TimeUnit::Second => ValueData::TimestampSecondValue(timestamp.value()), + TimeUnit::Millisecond => ValueData::TimestampMillisecondValue(timestamp.value()), + TimeUnit::Microsecond => ValueData::TimestampMicrosecondValue(timestamp.value()), + TimeUnit::Nanosecond => ValueData::TimestampNanosecondValue(timestamp.value()), + }) +} diff --git a/tests-integration/src/influxdb.rs b/tests-integration/src/influxdb.rs index e0eb7192c6..c047cc4c01 100644 --- a/tests-integration/src/influxdb.rs +++ b/tests-integration/src/influxdb.rs @@ -17,7 +17,9 @@ mod test { use std::sync::Arc; use client::OutputData; + use common_grpc::precision::Precision; use common_recordbatch::RecordBatches; + use common_test_util::recordbatch::check_output_stream; use rstest::rstest; use rstest_reuse::apply; use servers::influxdb::InfluxdbRequest; @@ -110,19 +112,95 @@ monitor1,host=host2 cpu=32 1663840496400340001"; ) .await; let output = output.remove(0).unwrap(); - let OutputData::Stream(stream) = output.data else { - unreachable!() - }; - let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); - assert_eq!( - recordbatches.pretty_print().unwrap(), - "\ + let expected = "\ +-------------------------------+-------+------+--------+ | greptime_timestamp | host | cpu | memory | +-------------------------------+-------+------+--------+ | 2022-09-22T09:54:56.100023100 | host1 | 66.6 | 1024.0 | | 2022-09-22T09:54:56.400340001 | host2 | 32.0 | 1027.0 | -+-------------------------------+-------+------+--------+" - ); ++-------------------------------+-------+------+--------+"; + check_output_stream(output.data, expected).await; + } + + #[apply(both_instances_cases)] + async fn test_put_influxdb_lines_with_auto_aligning_timestamps( + instance: Arc, + ) { + let instance = instance.frontend(); + + // First create a table with millisecond time index. + let sql = "create table monitor ( + ts timestamp time index, + host string primary key, + cpu double, + memory double, + )"; + instance.do_query(sql, QueryContext::arc()).await; + + // Insert some influxdb lines with millisecond precision. + let lines = r" +monitor,host=127.0.0.1 cpu=0.1,memory=1.0 1719460800001 +monitor,host=127.0.0.2 cpu=0.2,memory=2.0 1719460800002"; + let request = InfluxdbRequest { + precision: Some(Precision::Millisecond), + lines: lines.to_string(), + }; + instance.exec(request, QueryContext::arc()).await.unwrap(); + + // Insert some influxdb lines without precision. + // According to the specification (both v1 and v2), if precision is not set, it is default + // to "nanosecond". The lines here will be converted to insert requests as usual and then + // be aligned to millisecond time unit. + let lines = r" +monitor,host=127.0.0.1 cpu=0.3,memory=3.0 1719460800003000000 +monitor,host=127.0.0.2 cpu=0.4,memory=4.0 1719460800004000000"; + let request = InfluxdbRequest { + precision: None, + lines: lines.to_string(), + }; + instance.exec(request, QueryContext::arc()).await.unwrap(); + + // Insert other influxdb lines with nanosecond precision. + let lines = r" +monitor,host=127.0.0.1 cpu=0.5,memory=5.0 1719460800005000000 +monitor,host=127.0.0.2 cpu=0.6,memory=6.0 1719460800006000000"; + let request = InfluxdbRequest { + precision: Some(Precision::Nanosecond), + lines: lines.to_string(), + }; + instance.exec(request, QueryContext::arc()).await.unwrap(); + + // Insert other influxdb lines with second precision. + let lines = r" +monitor,host=127.0.0.1 cpu=0.7,memory=7.0 1719460801 +monitor,host=127.0.0.2 cpu=0.8,memory=8.0 1719460802"; + let request = InfluxdbRequest { + precision: Some(Precision::Second), + lines: lines.to_string(), + }; + instance.exec(request, QueryContext::arc()).await.unwrap(); + + // Check the data. + let mut output = instance + .do_query( + "SELECT ts, host, cpu, memory FROM monitor ORDER BY ts", + QueryContext::arc(), + ) + .await; + let output = output.remove(0).unwrap(); + let expected = "\ ++-------------------------+-----------+-----+--------+ +| ts | host | cpu | memory | ++-------------------------+-----------+-----+--------+ +| 2024-06-27T04:00:00.001 | 127.0.0.1 | 0.1 | 1.0 | +| 2024-06-27T04:00:00.002 | 127.0.0.2 | 0.2 | 2.0 | +| 2024-06-27T04:00:00.003 | 127.0.0.1 | 0.3 | 3.0 | +| 2024-06-27T04:00:00.004 | 127.0.0.2 | 0.4 | 4.0 | +| 2024-06-27T04:00:00.005 | 127.0.0.1 | 0.5 | 5.0 | +| 2024-06-27T04:00:00.006 | 127.0.0.2 | 0.6 | 6.0 | +| 2024-06-27T04:00:01 | 127.0.0.1 | 0.7 | 7.0 | +| 2024-06-27T04:00:02 | 127.0.0.2 | 0.8 | 8.0 | ++-------------------------+-----------+-----+--------+"; + check_output_stream(output.data, expected).await; } }