feat: align influxdb line timestamp with table time index (#7057)

* feat: align influxdb line timestamp with table time index

Signed-off-by: luofucong <luofc@foxmail.com>

* fix ci

Signed-off-by: luofucong <luofc@foxmail.com>

---------

Signed-off-by: luofucong <luofc@foxmail.com>
This commit is contained in:
LFC
2025-10-10 15:37:52 +08:00
committed by GitHub
parent aa84642afc
commit 3738440753
2 changed files with 208 additions and 11 deletions

View File

@@ -12,16 +12,23 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, RowInsertRequests, SemanticType};
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use catalog::CatalogManagerRef;
use client::Output;
use common_error::ext::BoxedError;
use servers::error::{AuthSnafu, Error, OtherSnafu};
use common_time::Timestamp;
use common_time::timestamp::TimeUnit;
use servers::error::{
AuthSnafu, CatalogSnafu, Error, OtherSnafu, TimestampOverflowSnafu, UnexpectedResultSnafu,
};
use servers::influxdb::InfluxdbRequest;
use servers::interceptor::{LineProtocolInterceptor, LineProtocolInterceptorRef};
use servers::query_handler::InfluxdbLineProtocolHandler;
use session::context::QueryContextRef;
use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};
use crate::instance::Instance;
@@ -42,6 +49,12 @@ impl InfluxdbLineProtocolHandler for Instance {
interceptor_ref.pre_execute(&request.lines, ctx.clone())?;
let requests = request.try_into()?;
let aligner = InfluxdbLineTimestampAligner {
catalog_manager: self.catalog_manager(),
};
let requests = aligner.align_timestamps(requests, &ctx).await?;
let requests = interceptor_ref
.post_lines_conversion(requests, ctx.clone())
.await?;
@@ -64,3 +77,109 @@ impl InfluxdbLineProtocolHandler for Instance {
.context(servers::error::ExecuteGrpcQuerySnafu)
}
}
/// Align the timestamp precisions in Influxdb lines (after they are converted to the GRPC row
/// inserts) to the time index columns' time units of the created tables (if there are any).
struct InfluxdbLineTimestampAligner<'a> {
catalog_manager: &'a CatalogManagerRef,
}
impl InfluxdbLineTimestampAligner<'_> {
async fn align_timestamps(
&self,
requests: RowInsertRequests,
query_context: &QueryContextRef,
) -> servers::error::Result<RowInsertRequests> {
let mut inserts = requests.inserts;
for insert in inserts.iter_mut() {
let Some(rows) = &mut insert.rows else {
continue;
};
let Some(target_time_unit) = self
.catalog_manager
.table(
query_context.current_catalog(),
&query_context.current_schema(),
&insert.table_name,
Some(query_context),
)
.await
.context(CatalogSnafu)?
.map(|x| x.schema())
.and_then(|schema| {
schema.timestamp_column().map(|col| {
col.data_type
.as_timestamp()
.expect("Time index column is not of timestamp type?!")
.unit()
})
})
else {
continue;
};
let target_timestamp_type = match target_time_unit {
TimeUnit::Second => ColumnDataType::TimestampSecond,
TimeUnit::Millisecond => ColumnDataType::TimestampMillisecond,
TimeUnit::Microsecond => ColumnDataType::TimestampMicrosecond,
TimeUnit::Nanosecond => ColumnDataType::TimestampNanosecond,
};
let Some(to_be_aligned) = rows.schema.iter().enumerate().find_map(|(i, x)| {
if x.semantic_type() == SemanticType::Timestamp
&& x.datatype() != target_timestamp_type
{
Some(i)
} else {
None
}
}) else {
continue;
};
// Indexing safety: `to_be_aligned` is guaranteed to be a valid index because it's got
// from "enumerate" the schema vector above.
rows.schema[to_be_aligned].datatype = target_timestamp_type as i32;
for row in rows.rows.iter_mut() {
let Some(time_value) = row
.values
.get_mut(to_be_aligned)
.and_then(|x| x.value_data.as_mut())
else {
continue;
};
*time_value = align_time_unit(time_value, target_time_unit)?;
}
}
Ok(RowInsertRequests { inserts })
}
}
fn align_time_unit(value: &ValueData, target: TimeUnit) -> servers::error::Result<ValueData> {
let timestamp = match value {
ValueData::TimestampSecondValue(x) => Timestamp::new_second(*x),
ValueData::TimestampMillisecondValue(x) => Timestamp::new_millisecond(*x),
ValueData::TimestampMicrosecondValue(x) => Timestamp::new_microsecond(*x),
ValueData::TimestampNanosecondValue(x) => Timestamp::new_nanosecond(*x),
_ => {
return UnexpectedResultSnafu {
reason: format!("Timestamp value '{:?}' is not of timestamp type!", value),
}
.fail();
}
};
let timestamp = timestamp
.convert_to(target)
.with_context(|| TimestampOverflowSnafu {
error: format!("{:?} convert to {}", timestamp, target),
})?;
Ok(match target {
TimeUnit::Second => ValueData::TimestampSecondValue(timestamp.value()),
TimeUnit::Millisecond => ValueData::TimestampMillisecondValue(timestamp.value()),
TimeUnit::Microsecond => ValueData::TimestampMicrosecondValue(timestamp.value()),
TimeUnit::Nanosecond => ValueData::TimestampNanosecondValue(timestamp.value()),
})
}

View File

@@ -17,7 +17,9 @@ mod test {
use std::sync::Arc;
use client::OutputData;
use common_grpc::precision::Precision;
use common_recordbatch::RecordBatches;
use common_test_util::recordbatch::check_output_stream;
use rstest::rstest;
use rstest_reuse::apply;
use servers::influxdb::InfluxdbRequest;
@@ -110,19 +112,95 @@ monitor1,host=host2 cpu=32 1663840496400340001";
)
.await;
let output = output.remove(0).unwrap();
let OutputData::Stream(stream) = output.data else {
unreachable!()
};
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(
recordbatches.pretty_print().unwrap(),
"\
let expected = "\
+-------------------------------+-------+------+--------+
| greptime_timestamp | host | cpu | memory |
+-------------------------------+-------+------+--------+
| 2022-09-22T09:54:56.100023100 | host1 | 66.6 | 1024.0 |
| 2022-09-22T09:54:56.400340001 | host2 | 32.0 | 1027.0 |
+-------------------------------+-------+------+--------+"
);
+-------------------------------+-------+------+--------+";
check_output_stream(output.data, expected).await;
}
#[apply(both_instances_cases)]
async fn test_put_influxdb_lines_with_auto_aligning_timestamps(
instance: Arc<dyn MockInstance>,
) {
let instance = instance.frontend();
// First create a table with millisecond time index.
let sql = "create table monitor (
ts timestamp time index,
host string primary key,
cpu double,
memory double,
)";
instance.do_query(sql, QueryContext::arc()).await;
// Insert some influxdb lines with millisecond precision.
let lines = r"
monitor,host=127.0.0.1 cpu=0.1,memory=1.0 1719460800001
monitor,host=127.0.0.2 cpu=0.2,memory=2.0 1719460800002";
let request = InfluxdbRequest {
precision: Some(Precision::Millisecond),
lines: lines.to_string(),
};
instance.exec(request, QueryContext::arc()).await.unwrap();
// Insert some influxdb lines without precision.
// According to the specification (both v1 and v2), if precision is not set, it is default
// to "nanosecond". The lines here will be converted to insert requests as usual and then
// be aligned to millisecond time unit.
let lines = r"
monitor,host=127.0.0.1 cpu=0.3,memory=3.0 1719460800003000000
monitor,host=127.0.0.2 cpu=0.4,memory=4.0 1719460800004000000";
let request = InfluxdbRequest {
precision: None,
lines: lines.to_string(),
};
instance.exec(request, QueryContext::arc()).await.unwrap();
// Insert other influxdb lines with nanosecond precision.
let lines = r"
monitor,host=127.0.0.1 cpu=0.5,memory=5.0 1719460800005000000
monitor,host=127.0.0.2 cpu=0.6,memory=6.0 1719460800006000000";
let request = InfluxdbRequest {
precision: Some(Precision::Nanosecond),
lines: lines.to_string(),
};
instance.exec(request, QueryContext::arc()).await.unwrap();
// Insert other influxdb lines with second precision.
let lines = r"
monitor,host=127.0.0.1 cpu=0.7,memory=7.0 1719460801
monitor,host=127.0.0.2 cpu=0.8,memory=8.0 1719460802";
let request = InfluxdbRequest {
precision: Some(Precision::Second),
lines: lines.to_string(),
};
instance.exec(request, QueryContext::arc()).await.unwrap();
// Check the data.
let mut output = instance
.do_query(
"SELECT ts, host, cpu, memory FROM monitor ORDER BY ts",
QueryContext::arc(),
)
.await;
let output = output.remove(0).unwrap();
let expected = "\
+-------------------------+-----------+-----+--------+
| ts | host | cpu | memory |
+-------------------------+-----------+-----+--------+
| 2024-06-27T04:00:00.001 | 127.0.0.1 | 0.1 | 1.0 |
| 2024-06-27T04:00:00.002 | 127.0.0.2 | 0.2 | 2.0 |
| 2024-06-27T04:00:00.003 | 127.0.0.1 | 0.3 | 3.0 |
| 2024-06-27T04:00:00.004 | 127.0.0.2 | 0.4 | 4.0 |
| 2024-06-27T04:00:00.005 | 127.0.0.1 | 0.5 | 5.0 |
| 2024-06-27T04:00:00.006 | 127.0.0.2 | 0.6 | 6.0 |
| 2024-06-27T04:00:01 | 127.0.0.1 | 0.7 | 7.0 |
| 2024-06-27T04:00:02 | 127.0.0.2 | 0.8 | 8.0 |
+-------------------------+-----------+-----+--------+";
check_output_stream(output.data, expected).await;
}
}