fix!: use the right precision (#3794)

* fix: use write precision

* chore: rename error

* chore: add doc

* chore: by comment

* chore: right test result

* chore: typo

* chore: add doc
This commit is contained in:
Jeremyhi
2024-04-25 11:20:10 +08:00
committed by GitHub
parent e0b5f52c2a
commit 9524ec83bc
15 changed files with 300 additions and 751 deletions

View File

@@ -15,7 +15,7 @@
pub mod channel_manager;
pub mod error;
pub mod flight;
pub mod precision;
pub mod select;
pub mod writer;
pub use error::Error;

View File

@@ -0,0 +1,141 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::Display;
use common_time::timestamp::TimeUnit;
use crate::Error;
/// Precision represents the precision of a timestamp.
/// It is used to convert timestamps between different precisions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Precision {
Nanosecond,
Microsecond,
Millisecond,
Second,
Minute,
Hour,
}
impl Precision {
pub fn to_nanos(&self, amount: i64) -> Option<i64> {
match self {
Precision::Nanosecond => Some(amount),
Precision::Microsecond => amount.checked_mul(1_000),
Precision::Millisecond => amount.checked_mul(1_000_000),
Precision::Second => amount.checked_mul(1_000_000_000),
Precision::Minute => amount
.checked_mul(60)
.and_then(|a| a.checked_mul(1_000_000_000)),
Precision::Hour => amount
.checked_mul(3600)
.and_then(|a| a.checked_mul(1_000_000_000)),
}
}
pub fn to_millis(&self, amount: i64) -> Option<i64> {
match self {
Precision::Nanosecond => amount.checked_div(1_000_000),
Precision::Microsecond => amount.checked_div(1_000),
Precision::Millisecond => Some(amount),
Precision::Second => amount.checked_mul(1_000),
Precision::Minute => amount.checked_mul(60_000),
Precision::Hour => amount.checked_mul(3_600_000),
}
}
}
impl Display for Precision {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Precision::Nanosecond => write!(f, "Precision::Nanosecond"),
Precision::Microsecond => write!(f, "Precision::Microsecond"),
Precision::Millisecond => write!(f, "Precision::Millisecond"),
Precision::Second => write!(f, "Precision::Second"),
Precision::Minute => write!(f, "Precision::Minute"),
Precision::Hour => write!(f, "Precision::Hour"),
}
}
}
impl TryFrom<Precision> for TimeUnit {
type Error = Error;
fn try_from(precision: Precision) -> Result<Self, Self::Error> {
Ok(match precision {
Precision::Second => TimeUnit::Second,
Precision::Millisecond => TimeUnit::Millisecond,
Precision::Microsecond => TimeUnit::Microsecond,
Precision::Nanosecond => TimeUnit::Nanosecond,
_ => {
return Err(Error::NotSupported {
feat: format!("convert {precision} into TimeUnit"),
})
}
})
}
}
#[cfg(test)]
mod tests {
use crate::precision::Precision;
#[test]
fn test_to_nanos() {
assert_eq!(Precision::Nanosecond.to_nanos(1).unwrap(), 1);
assert_eq!(Precision::Microsecond.to_nanos(1).unwrap(), 1_000);
assert_eq!(Precision::Millisecond.to_nanos(1).unwrap(), 1_000_000);
assert_eq!(Precision::Second.to_nanos(1).unwrap(), 1_000_000_000);
assert_eq!(Precision::Minute.to_nanos(1).unwrap(), 60 * 1_000_000_000);
assert_eq!(
Precision::Hour.to_nanos(1).unwrap(),
60 * 60 * 1_000_000_000
);
}
#[test]
fn test_to_millis() {
assert_eq!(Precision::Nanosecond.to_millis(1_000_000).unwrap(), 1);
assert_eq!(Precision::Microsecond.to_millis(1_000).unwrap(), 1);
assert_eq!(Precision::Millisecond.to_millis(1).unwrap(), 1);
assert_eq!(Precision::Second.to_millis(1).unwrap(), 1_000);
assert_eq!(Precision::Minute.to_millis(1).unwrap(), 60 * 1_000);
assert_eq!(Precision::Hour.to_millis(1).unwrap(), 60 * 60 * 1_000);
}
#[test]
fn test_to_nanos_basic() {
assert_eq!(Precision::Second.to_nanos(1), Some(1_000_000_000));
assert_eq!(Precision::Minute.to_nanos(1), Some(60 * 1_000_000_000));
}
#[test]
fn test_to_millis_basic() {
assert_eq!(Precision::Second.to_millis(1), Some(1_000));
assert_eq!(Precision::Minute.to_millis(1), Some(60_000));
}
#[test]
fn test_to_nanos_overflow() {
assert_eq!(Precision::Hour.to_nanos(i64::MAX / 100), None);
}
#[test]
fn test_zero_input() {
assert_eq!(Precision::Second.to_nanos(0), Some(0));
assert_eq!(Precision::Minute.to_millis(0), Some(0));
}
}

View File

@@ -1,441 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::fmt::Display;
use api::helper::values_with_capacity;
use api::v1::{Column, ColumnDataType, ColumnDataTypeExtension, SemanticType};
use common_base::BitVec;
use common_time::timestamp::TimeUnit;
use snafu::ensure;
use crate::error::{Result, TypeMismatchSnafu};
use crate::Error;
type ColumnName = String;
type RowCount = u32;
// TODO(fys): will remove in the future.
#[derive(Default)]
pub struct LinesWriter {
column_name_index: HashMap<ColumnName, usize>,
null_masks: Vec<BitVec>,
batch: (Vec<Column>, RowCount),
lines: usize,
}
impl LinesWriter {
pub fn with_lines(lines: usize) -> Self {
Self {
lines,
..Default::default()
}
}
pub fn write_ts(&mut self, column_name: &str, value: (i64, Precision)) -> Result<()> {
let (idx, column) = self.mut_column(
column_name,
ColumnDataType::TimestampMillisecond,
SemanticType::Timestamp,
None,
);
ensure!(
column.datatype == ColumnDataType::TimestampMillisecond as i32,
TypeMismatchSnafu {
column_name,
expected: "timestamp",
actual: format!("{:?}", column.datatype)
}
);
// It is safe to use unwrap here, because values has been initialized in mut_column()
let values = column.values.as_mut().unwrap();
values
.timestamp_millisecond_values
.push(to_ms_ts(value.1, value.0));
self.null_masks[idx].push(false);
Ok(())
}
pub fn write_tag(&mut self, column_name: &str, value: &str) -> Result<()> {
let (idx, column) =
self.mut_column(column_name, ColumnDataType::String, SemanticType::Tag, None);
ensure!(
column.datatype == ColumnDataType::String as i32,
TypeMismatchSnafu {
column_name,
expected: "string",
actual: format!("{:?}", column.datatype)
}
);
// It is safe to use unwrap here, because values has been initialized in mut_column()
let values = column.values.as_mut().unwrap();
values.string_values.push(value.to_string());
self.null_masks[idx].push(false);
Ok(())
}
pub fn write_u64(&mut self, column_name: &str, value: u64) -> Result<()> {
let (idx, column) = self.mut_column(
column_name,
ColumnDataType::Uint64,
SemanticType::Field,
None,
);
ensure!(
column.datatype == ColumnDataType::Uint64 as i32,
TypeMismatchSnafu {
column_name,
expected: "u64",
actual: format!("{:?}", column.datatype)
}
);
// It is safe to use unwrap here, because values has been initialized in mut_column()
let values = column.values.as_mut().unwrap();
values.u64_values.push(value);
self.null_masks[idx].push(false);
Ok(())
}
pub fn write_i64(&mut self, column_name: &str, value: i64) -> Result<()> {
let (idx, column) = self.mut_column(
column_name,
ColumnDataType::Int64,
SemanticType::Field,
None,
);
ensure!(
column.datatype == ColumnDataType::Int64 as i32,
TypeMismatchSnafu {
column_name,
expected: "i64",
actual: format!("{:?}", column.datatype)
}
);
// It is safe to use unwrap here, because values has been initialized in mut_column()
let values = column.values.as_mut().unwrap();
values.i64_values.push(value);
self.null_masks[idx].push(false);
Ok(())
}
pub fn write_f64(&mut self, column_name: &str, value: f64) -> Result<()> {
let (idx, column) = self.mut_column(
column_name,
ColumnDataType::Float64,
SemanticType::Field,
None,
);
ensure!(
column.datatype == ColumnDataType::Float64 as i32,
TypeMismatchSnafu {
column_name,
expected: "f64",
actual: format!("{:?}", column.datatype)
}
);
// It is safe to use unwrap here, because values has been initialized in mut_column()
let values = column.values.as_mut().unwrap();
values.f64_values.push(value);
self.null_masks[idx].push(false);
Ok(())
}
pub fn write_string(&mut self, column_name: &str, value: &str) -> Result<()> {
let (idx, column) = self.mut_column(
column_name,
ColumnDataType::String,
SemanticType::Field,
None,
);
ensure!(
column.datatype == ColumnDataType::String as i32,
TypeMismatchSnafu {
column_name,
expected: "string",
actual: format!("{:?}", column.datatype)
}
);
// It is safe to use unwrap here, because values has been initialized in mut_column()
let values = column.values.as_mut().unwrap();
values.string_values.push(value.to_string());
self.null_masks[idx].push(false);
Ok(())
}
pub fn write_bool(&mut self, column_name: &str, value: bool) -> Result<()> {
let (idx, column) = self.mut_column(
column_name,
ColumnDataType::Boolean,
SemanticType::Field,
None,
);
ensure!(
column.datatype == ColumnDataType::Boolean as i32,
TypeMismatchSnafu {
column_name,
expected: "boolean",
actual: format!("{:?}", column.datatype)
}
);
// It is safe to use unwrap here, because values has been initialized in mut_column()
let values = column.values.as_mut().unwrap();
values.bool_values.push(value);
self.null_masks[idx].push(false);
Ok(())
}
pub fn commit(&mut self) {
let batch = &mut self.batch;
batch.1 += 1;
for i in 0..batch.0.len() {
let null_mask = &mut self.null_masks[i];
if batch.1 as usize > null_mask.len() {
null_mask.push(true);
}
}
}
pub fn finish(mut self) -> (Vec<Column>, RowCount) {
let null_masks = self.null_masks;
for (i, null_mask) in null_masks.into_iter().enumerate() {
let columns = &mut self.batch.0;
columns[i].null_mask = null_mask.into_vec();
}
self.batch
}
fn mut_column(
&mut self,
column_name: &str,
datatype: ColumnDataType,
semantic_type: SemanticType,
datatype_extension: Option<ColumnDataTypeExtension>,
) -> (usize, &mut Column) {
let column_names = &mut self.column_name_index;
let column_idx = match column_names.get(column_name) {
Some(i) => *i,
None => {
let new_idx = column_names.len();
let batch = &mut self.batch;
let to_insert = self.lines;
let mut null_mask = BitVec::with_capacity(to_insert);
null_mask.extend(BitVec::repeat(true, batch.1 as usize));
self.null_masks.push(null_mask);
batch.0.push(Column {
column_name: column_name.to_string(),
semantic_type: semantic_type.into(),
values: Some(values_with_capacity(datatype, to_insert)),
datatype: datatype as i32,
null_mask: Vec::default(),
datatype_extension,
});
let _ = column_names.insert(column_name.to_string(), new_idx);
new_idx
}
};
(column_idx, &mut self.batch.0[column_idx])
}
}
pub fn to_ms_ts(p: Precision, ts: i64) -> i64 {
match p {
Precision::Nanosecond => ts / 1_000_000,
Precision::Microsecond => ts / 1000,
Precision::Millisecond => ts,
Precision::Second => ts * 1000,
Precision::Minute => ts * 1000 * 60,
Precision::Hour => ts * 1000 * 60 * 60,
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Precision {
Nanosecond,
Microsecond,
Millisecond,
Second,
Minute,
Hour,
}
impl Display for Precision {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Precision::Nanosecond => write!(f, "Precision::Nanosecond"),
Precision::Microsecond => write!(f, "Precision::Microsecond"),
Precision::Millisecond => write!(f, "Precision::Millisecond"),
Precision::Second => write!(f, "Precision::Second"),
Precision::Minute => write!(f, "Precision::Minute"),
Precision::Hour => write!(f, "Precision::Hour"),
}
}
}
impl TryFrom<Precision> for TimeUnit {
type Error = Error;
fn try_from(precision: Precision) -> std::result::Result<Self, Self::Error> {
Ok(match precision {
Precision::Second => TimeUnit::Second,
Precision::Millisecond => TimeUnit::Millisecond,
Precision::Microsecond => TimeUnit::Microsecond,
Precision::Nanosecond => TimeUnit::Nanosecond,
_ => {
return Err(Error::NotSupported {
feat: format!("convert {precision} into TimeUnit"),
})
}
})
}
}
#[cfg(test)]
mod tests {
use api::v1::{ColumnDataType, SemanticType};
use common_base::BitVec;
use super::LinesWriter;
use crate::writer::{to_ms_ts, Precision};
#[test]
fn test_lines_writer() {
let mut writer = LinesWriter::with_lines(3);
writer.write_tag("host", "host1").unwrap();
writer.write_f64("cpu", 0.5).unwrap();
writer.write_f64("memory", 0.4).unwrap();
writer.write_string("name", "name1").unwrap();
writer
.write_ts("ts", (101011000, Precision::Millisecond))
.unwrap();
writer.commit();
writer.write_tag("host", "host2").unwrap();
writer
.write_ts("ts", (102011001, Precision::Millisecond))
.unwrap();
writer.write_bool("enable_reboot", true).unwrap();
writer.write_u64("year_of_service", 2).unwrap();
writer.write_i64("temperature", 4).unwrap();
writer.commit();
writer.write_tag("host", "host3").unwrap();
writer.write_f64("cpu", 0.4).unwrap();
writer.write_u64("cpu_core_num", 16).unwrap();
writer
.write_ts("ts", (103011002, Precision::Millisecond))
.unwrap();
writer.commit();
let insert_batch = writer.finish();
assert_eq!(3, insert_batch.1);
let columns = insert_batch.0;
assert_eq!(9, columns.len());
let column = &columns[0];
assert_eq!("host", columns[0].column_name);
assert_eq!(ColumnDataType::String as i32, column.datatype);
assert_eq!(SemanticType::Tag as i32, column.semantic_type);
assert_eq!(
vec!["host1", "host2", "host3"],
column.values.as_ref().unwrap().string_values
);
verify_null_mask(&column.null_mask, vec![false, false, false]);
let column = &columns[1];
assert_eq!("cpu", column.column_name);
assert_eq!(ColumnDataType::Float64 as i32, column.datatype);
assert_eq!(SemanticType::Field as i32, column.semantic_type);
assert_eq!(vec![0.5, 0.4], column.values.as_ref().unwrap().f64_values);
verify_null_mask(&column.null_mask, vec![false, true, false]);
let column = &columns[2];
assert_eq!("memory", column.column_name);
assert_eq!(ColumnDataType::Float64 as i32, column.datatype);
assert_eq!(SemanticType::Field as i32, column.semantic_type);
assert_eq!(vec![0.4], column.values.as_ref().unwrap().f64_values);
verify_null_mask(&column.null_mask, vec![false, true, true]);
let column = &columns[3];
assert_eq!("name", column.column_name);
assert_eq!(ColumnDataType::String as i32, column.datatype);
assert_eq!(SemanticType::Field as i32, column.semantic_type);
assert_eq!(vec!["name1"], column.values.as_ref().unwrap().string_values);
verify_null_mask(&column.null_mask, vec![false, true, true]);
let column = &columns[4];
assert_eq!("ts", column.column_name);
assert_eq!(ColumnDataType::TimestampMillisecond as i32, column.datatype);
assert_eq!(SemanticType::Timestamp as i32, column.semantic_type);
assert_eq!(
vec![101011000, 102011001, 103011002],
column.values.as_ref().unwrap().timestamp_millisecond_values
);
verify_null_mask(&column.null_mask, vec![false, false, false]);
let column = &columns[5];
assert_eq!("enable_reboot", column.column_name);
assert_eq!(ColumnDataType::Boolean as i32, column.datatype);
assert_eq!(SemanticType::Field as i32, column.semantic_type);
assert_eq!(vec![true], column.values.as_ref().unwrap().bool_values);
verify_null_mask(&column.null_mask, vec![true, false, true]);
let column = &columns[6];
assert_eq!("year_of_service", column.column_name);
assert_eq!(ColumnDataType::Uint64 as i32, column.datatype);
assert_eq!(SemanticType::Field as i32, column.semantic_type);
assert_eq!(vec![2], column.values.as_ref().unwrap().u64_values);
verify_null_mask(&column.null_mask, vec![true, false, true]);
let column = &columns[7];
assert_eq!("temperature", column.column_name);
assert_eq!(ColumnDataType::Int64 as i32, column.datatype);
assert_eq!(SemanticType::Field as i32, column.semantic_type);
assert_eq!(vec![4], column.values.as_ref().unwrap().i64_values);
verify_null_mask(&column.null_mask, vec![true, false, true]);
let column = &columns[8];
assert_eq!("cpu_core_num", column.column_name);
assert_eq!(ColumnDataType::Uint64 as i32, column.datatype);
assert_eq!(SemanticType::Field as i32, column.semantic_type);
assert_eq!(vec![16], column.values.as_ref().unwrap().u64_values);
verify_null_mask(&column.null_mask, vec![true, true, false]);
}
fn verify_null_mask(data: &[u8], expected: Vec<bool>) {
let bitvec = BitVec::from_slice(data);
for (idx, b) in expected.iter().enumerate() {
assert_eq!(b, bitvec.get(idx).unwrap())
}
}
#[test]
fn test_to_ms() {
assert_eq!(100, to_ms_ts(Precision::Nanosecond, 100110000));
assert_eq!(100110, to_ms_ts(Precision::Microsecond, 100110000));
assert_eq!(100110000, to_ms_ts(Precision::Millisecond, 100110000));
assert_eq!(
100110000 * 1000 * 60,
to_ms_ts(Precision::Minute, 100110000)
);
assert_eq!(
100110000 * 1000 * 60 * 60,
to_ms_ts(Precision::Hour, 100110000)
);
}
}

View File

@@ -161,8 +161,8 @@ pub enum Error {
error: influxdb_line_protocol::Error,
},
#[snafu(display("Failed to write InfluxDB line protocol"))]
InfluxdbLinesWrite {
#[snafu(display("Failed to write row"))]
RowWriter {
location: Location,
source: common_grpc::error::Error,
},
@@ -462,6 +462,9 @@ pub enum Error {
#[snafu(source)]
error: notify::Error,
},
#[snafu(display("Timestamp overflow: {}", error))]
TimestampOverflow { error: String, location: Location },
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -524,9 +527,10 @@ impl ErrorExt for Error {
| IncompatibleSchema { .. }
| MissingQueryContext { .. }
| MysqlValueConversion { .. }
| UnexpectedPhysicalTable { .. } => StatusCode::InvalidArguments,
| UnexpectedPhysicalTable { .. }
| TimestampOverflow { .. } => StatusCode::InvalidArguments,
InfluxdbLinesWrite { source, .. }
RowWriter { source, .. }
| PromSeriesWrite { source, .. }
| OtlpMetricsWrite { source, .. } => source.status_code(),
@@ -659,7 +663,7 @@ impl IntoResponse for Error {
let error_msg = self.output_msg();
let status = match self {
Error::InfluxdbLineProtocol { .. }
| Error::InfluxdbLinesWrite { .. }
| Error::RowWriter { .. }
| Error::PromSeriesWrite { .. }
| Error::InvalidOpentsdbLine { .. }
| Error::InvalidOpentsdbJsonRequest { .. }

View File

@@ -19,7 +19,7 @@ use axum::http::StatusCode;
use axum::response::IntoResponse;
use axum::Extension;
use common_catalog::consts::DEFAULT_SCHEMA_NAME;
use common_grpc::writer::Precision;
use common_grpc::precision::Precision;
use common_telemetry::tracing;
use session::context::QueryContextRef;
@@ -123,7 +123,7 @@ fn parse_time_precision(value: &str) -> Result<Precision> {
#[cfg(test)]
mod tests {
use common_grpc::writer::Precision;
use common_grpc::precision::Precision;
use crate::http::influxdb::parse_time_precision;

View File

@@ -14,7 +14,7 @@
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, RowInsertRequests};
use common_grpc::writer::Precision;
use common_grpc::precision::Precision;
use hyper::Request;
use influxdb_line_protocol::{parse_lines, FieldValue};
use snafu::ResultExt;
@@ -52,7 +52,7 @@ impl TryFrom<InfluxdbRequest> for RowInsertRequests {
.context(InfluxdbLineProtocolSnafu)?;
let mut multi_table_data = MultiTableData::new();
let precision = unwrap_or_default_precision(value.precision);
for line in &lines {
let table_name = line.series.measurement.as_str();
let tags = &line.series.tag_set;
@@ -87,8 +87,7 @@ impl TryFrom<InfluxdbRequest> for RowInsertRequests {
row_writer::write_fields(table_data, fields, &mut one_row)?;
// timestamp
let precision = unwrap_or_default_precision(value.precision);
row_writer::write_ts_precision(
row_writer::write_ts_to_nanos(
table_data,
INFLUXDB_TIMESTAMP_COLUMN_NAME,
ts,
@@ -195,7 +194,7 @@ monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003";
}
"ts" => {
assert_eq!(
ColumnDataType::TimestampMillisecond as i32,
ColumnDataType::TimestampNanosecond as i32,
column_schema.datatype
);
assert_eq!(SemanticType::Timestamp as i32, column_schema.semantic_type);
@@ -204,12 +203,12 @@ monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003";
let v = row.values[i].value_data.as_ref();
match j {
0 => assert_eq!(
1663840496100023100 / 1_000_000,
extract_ts_millis_value(v.as_ref().unwrap())
1663840496100023100,
extract_ts_nanos_value(v.as_ref().unwrap())
),
1 => assert_eq!(
1663840496400340001 / 1_000_000,
extract_ts_millis_value(v.as_ref().unwrap())
1663840496400340001,
extract_ts_nanos_value(v.as_ref().unwrap())
),
_ => panic!(),
}
@@ -270,7 +269,7 @@ monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003";
}
"ts" => {
assert_eq!(
ColumnDataType::TimestampMillisecond as i32,
ColumnDataType::TimestampNanosecond as i32,
column_schema.datatype
);
assert_eq!(SemanticType::Timestamp as i32, column_schema.semantic_type);
@@ -279,12 +278,12 @@ monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003";
let v = row.values[i].value_data.as_ref();
match j {
0 => assert_eq!(
1663840496100023102 / 1_000_000,
extract_ts_millis_value(v.as_ref().unwrap())
1663840496100023102,
extract_ts_nanos_value(v.as_ref().unwrap())
),
1 => assert_eq!(
1663840496400340003 / 1_000_000,
extract_ts_millis_value(v.as_ref().unwrap())
1663840496400340003,
extract_ts_nanos_value(v.as_ref().unwrap())
),
_ => panic!(),
}
@@ -298,21 +297,21 @@ monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003";
fn extract_string_value(value: &ValueData) -> &str {
match value {
ValueData::StringValue(v) => v,
_ => panic!(),
_ => unreachable!(),
}
}
fn extract_f64_value(value: &ValueData) -> f64 {
match value {
ValueData::F64Value(v) => *v,
_ => panic!(),
_ => unreachable!(),
}
}
fn extract_ts_millis_value(value: &ValueData) -> i64 {
fn extract_ts_nanos_value(value: &ValueData) -> i64 {
match value {
ValueData::TimestampMillisecondValue(v) => *v,
_ => panic!(),
ValueData::TimestampNanosecondValue(v) => *v,
_ => unreachable!(),
}
}
}

View File

@@ -29,7 +29,6 @@ pub mod heartbeat_options;
pub mod http;
pub mod influxdb;
pub mod interceptor;
pub mod line_writer;
mod metrics;
pub mod metrics_handler;
pub mod mysql;

View File

@@ -1,224 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use common_catalog::consts::DEFAULT_CATALOG_NAME;
use common_grpc::writer::{to_ms_ts, Precision};
use common_time::timestamp::TimeUnit::Millisecond;
use common_time::Timestamp;
use datatypes::data_type::DataType;
use datatypes::prelude::ConcreteDataType;
use datatypes::types::{TimestampMillisecondType, TimestampType};
use datatypes::value::Value;
use datatypes::vectors::{MutableVector, VectorRef};
use table::requests::InsertRequest;
type ColumnLen = usize;
type ColumnName = String;
pub struct LineWriter {
db: String,
table_name: String,
expected_rows: usize,
current_rows: usize,
columns_builders: HashMap<ColumnName, (Box<dyn MutableVector>, ColumnLen)>,
}
impl LineWriter {
pub fn with_lines(db: impl Into<String>, table_name: impl Into<String>, lines: usize) -> Self {
Self {
db: db.into(),
table_name: table_name.into(),
expected_rows: lines,
current_rows: 0,
columns_builders: Default::default(),
}
}
pub fn write_ts(&mut self, column_name: &str, value: (i64, Precision)) {
let (val, precision) = value;
let datatype =
ConcreteDataType::Timestamp(TimestampType::Millisecond(TimestampMillisecondType));
let ts_val = Value::Timestamp(Timestamp::new(to_ms_ts(precision, val), Millisecond));
self.write(column_name, datatype, ts_val);
}
pub fn write_tag(&mut self, column_name: &str, value: &str) {
self.write(
column_name,
ConcreteDataType::string_datatype(),
Value::String(value.into()),
);
}
pub fn write_u64(&mut self, column_name: &str, value: u64) {
self.write(
column_name,
ConcreteDataType::uint64_datatype(),
Value::UInt64(value),
);
}
pub fn write_i64(&mut self, column_name: &str, value: i64) {
self.write(
column_name,
ConcreteDataType::int64_datatype(),
Value::Int64(value),
);
}
pub fn write_f64(&mut self, column_name: &str, value: f64) {
self.write(
column_name,
ConcreteDataType::float64_datatype(),
Value::Float64(value.into()),
);
}
pub fn write_string(&mut self, column_name: &str, value: &str) {
self.write(
column_name,
ConcreteDataType::string_datatype(),
Value::String(value.into()),
);
}
pub fn write_bool(&mut self, column_name: &str, value: bool) {
self.write(
column_name,
ConcreteDataType::boolean_datatype(),
Value::Boolean(value),
);
}
fn write(&mut self, column_name: &str, datatype: ConcreteDataType, value: Value) {
let or_insert = || {
let rows = self.current_rows;
let mut builder = datatype.create_mutable_vector(self.expected_rows);
(0..rows).for_each(|_| builder.push_null());
(builder, rows)
};
let (builder, column_len) = self
.columns_builders
.entry(column_name.to_string())
.or_insert_with(or_insert);
builder.push_value_ref(value.as_value_ref());
*column_len += 1;
}
pub fn commit(&mut self) {
self.current_rows += 1;
self.columns_builders
.values_mut()
.for_each(|(builder, len)| {
if self.current_rows > *len {
builder.push_null()
}
});
}
pub fn finish(self) -> InsertRequest {
let columns_values: HashMap<ColumnName, VectorRef> = self
.columns_builders
.into_iter()
.map(|(column_name, (mut builder, _))| (column_name, builder.to_vector()))
.collect();
InsertRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: self.db,
table_name: self.table_name,
columns_values,
}
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_catalog::consts::DEFAULT_SCHEMA_NAME;
use common_time::Timestamp;
use datatypes::value::Value;
use datatypes::vectors::Vector;
use crate::line_writer::{LineWriter, Precision};
#[test]
fn test_writer() {
let mut writer = LineWriter::with_lines(DEFAULT_SCHEMA_NAME, "demo".to_string(), 4);
writer.write_ts("ts", (1665893727685, Precision::Millisecond));
writer.write_tag("host", "host-1");
writer.write_i64("memory", 10_i64);
writer.commit();
writer.write_ts("ts", (1665893727686, Precision::Millisecond));
writer.write_tag("host", "host-2");
writer.write_tag("region", "region-2");
writer.write_i64("memory", 9_i64);
writer.commit();
writer.write_ts("ts", (1665893727689, Precision::Millisecond));
writer.write_tag("host", "host-3");
writer.write_tag("region", "region-3");
writer.write_i64("cpu", 19_i64);
writer.commit();
let insert_request = writer.finish();
assert_eq!("demo", insert_request.table_name);
assert_eq!(DEFAULT_SCHEMA_NAME, insert_request.schema_name);
let columns = insert_request.columns_values;
assert_eq!(5, columns.len());
assert!(columns.contains_key("ts"));
assert!(columns.contains_key("host"));
assert!(columns.contains_key("memory"));
assert!(columns.contains_key("region"));
assert!(columns.contains_key("cpu"));
let ts = columns.get("ts").unwrap();
let host = columns.get("host").unwrap();
let memory = columns.get("memory").unwrap();
let region = columns.get("region").unwrap();
let cpu = columns.get("cpu").unwrap();
let expected: Vec<Value> = vec![
Value::Timestamp(Timestamp::new_millisecond(1665893727685_i64)),
Value::Timestamp(Timestamp::new_millisecond(1665893727686_i64)),
Value::Timestamp(Timestamp::new_millisecond(1665893727689_i64)),
];
assert_vector(&expected, ts);
let expected: Vec<Value> = vec!["host-1".into(), "host-2".into(), "host-3".into()];
assert_vector(&expected, host);
let expected: Vec<Value> = vec![10_i64.into(), 9_i64.into(), Value::Null];
assert_vector(&expected, memory);
let expected: Vec<Value> = vec![Value::Null, "region-2".into(), "region-3".into()];
assert_vector(&expected, region);
let expected: Vec<Value> = vec![Value::Null, Value::Null, 19_i64.into()];
assert_vector(&expected, cpu);
}
fn assert_vector(expected: &[Value], vector: &Arc<dyn Vector>) {
for (idx, expected) in expected.iter().enumerate() {
let val = vector.get(idx);
assert_eq!(*expected, val);
}
}
}

View File

@@ -23,6 +23,7 @@ use std::sync::Arc;
use api::v1::RowInsertRequests;
use async_trait::async_trait;
use common_error::ext::ErrorExt;
use common_grpc::precision::Precision;
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use common_runtime::Runtime;
use common_telemetry::logging::{debug, error, warn};
@@ -156,10 +157,11 @@ pub fn data_point_to_grpc_row_insert_requests(
// value
row_writer::write_f64(table_data, GREPTIME_VALUE, value, &mut one_row)?;
// timestamp
row_writer::write_ts_millis(
row_writer::write_ts_to_millis(
table_data,
GREPTIME_TIMESTAMP,
Some(timestamp),
Precision::Millisecond,
&mut one_row,
)?;

View File

@@ -13,7 +13,7 @@
// limitations under the License.
use api::v1::{RowInsertRequests, Value};
use common_grpc::writer::Precision;
use common_grpc::precision::Precision;
use common_query::prelude::{GREPTIME_COUNT, GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
use opentelemetry_proto::tonic::common::v1::{any_value, KeyValue};
@@ -117,7 +117,7 @@ fn write_attributes(
}
fn write_timestamp(table: &mut TableData, row: &mut Vec<Value>, time_nano: i64) -> Result<()> {
row_writer::write_ts_precision(
row_writer::write_ts_to_nanos(
table,
GREPTIME_TIMESTAMP,
Some(time_nano),

View File

@@ -14,7 +14,7 @@
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, RowInsertRequests};
use common_grpc::writer::Precision;
use common_grpc::precision::Precision;
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
@@ -132,7 +132,7 @@ pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()>
(span.end_in_nanosecond - span.start_in_nanosecond) as f64 / 1_000_000.0, // duration in millisecond
&mut row,
)?;
row_writer::write_ts_precision(
row_writer::write_ts_to_nanos(
writer,
GREPTIME_TIMESTAMP,
Some(span.start_in_nanosecond as i64),

View File

@@ -21,6 +21,7 @@ use std::hash::{Hash, Hasher};
use api::prom_store::remote::label_matcher::Type as MatcherType;
use api::prom_store::remote::{Label, Query, Sample, TimeSeries, WriteRequest};
use api::v1::RowInsertRequests;
use common_grpc::precision::Precision;
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use common_recordbatch::{RecordBatch, RecordBatches};
use common_telemetry::tracing;
@@ -351,10 +352,11 @@ pub fn to_grpc_row_insert_requests(request: &WriteRequest) -> Result<(RowInsertR
&mut one_row,
)?;
// timestamp
row_writer::write_ts_millis(
row_writer::write_ts_to_millis(
table_data,
GREPTIME_TIMESTAMP,
Some(series.samples[0].timestamp),
Precision::Millisecond,
&mut one_row,
)?;
@@ -369,10 +371,11 @@ pub fn to_grpc_row_insert_requests(request: &WriteRequest) -> Result<(RowInsertR
// value
row_writer::write_f64(table_data, GREPTIME_VALUE, *value, &mut one_row)?;
// timestamp
row_writer::write_ts_millis(
row_writer::write_ts_to_millis(
table_data,
GREPTIME_TIMESTAMP,
Some(*timestamp),
Precision::Millisecond,
&mut one_row,
)?;

View File

@@ -19,14 +19,19 @@ use api::v1::{
ColumnDataType, ColumnSchema, Row, RowInsertRequest, RowInsertRequests, Rows, SemanticType,
Value,
};
use common_grpc::writer;
use common_grpc::writer::Precision;
use common_grpc::precision::Precision;
use common_time::timestamp::TimeUnit;
use common_time::timestamp::TimeUnit::Nanosecond;
use common_time::Timestamp;
use snafu::{ensure, OptionExt, ResultExt};
use crate::error::{IncompatibleSchemaSnafu, InfluxdbLinesWriteSnafu, Result, TimePrecisionSnafu};
use crate::error::{
IncompatibleSchemaSnafu, Result, RowWriterSnafu, TimePrecisionSnafu, TimestampOverflowSnafu,
};
/// The intermediate data structure for building the write request.
/// It constructs the `schema` and `rows` as all input data row
/// parsing is completed.
pub struct TableData {
schema: Vec<ColumnSchema>,
rows: Vec<Row>,
@@ -138,15 +143,17 @@ impl MultiTableData {
}
}
/// Write data as tags into the table data.
pub fn write_tags(
table_data: &mut TableData,
kvs: impl Iterator<Item = (String, String)>,
tags: impl Iterator<Item = (String, String)>,
one_row: &mut Vec<Value>,
) -> Result<()> {
let ktv_iter = kvs.map(|(k, v)| (k, ColumnDataType::String, ValueData::StringValue(v)));
let ktv_iter = tags.map(|(k, v)| (k, ColumnDataType::String, ValueData::StringValue(v)));
write_by_semantic_type(table_data, SemanticType::Tag, ktv_iter, one_row)
}
/// Write data as fields into the table data.
pub fn write_fields(
table_data: &mut TableData,
fields: impl Iterator<Item = (String, ColumnDataType, ValueData)>,
@@ -155,6 +162,7 @@ pub fn write_fields(
write_by_semantic_type(table_data, SemanticType::Field, fields, one_row)
}
/// Write data as a tag into the table data.
pub fn write_tag(
table_data: &mut TableData,
name: impl ToString,
@@ -173,6 +181,7 @@ pub fn write_tag(
)
}
/// Write float64 data as a field into the table data.
pub fn write_f64(
table_data: &mut TableData,
name: impl ToString,
@@ -225,21 +234,54 @@ fn write_by_semantic_type(
Ok(())
}
pub fn write_ts_millis(
table_data: &mut TableData,
name: impl ToString,
ts: Option<i64>,
one_row: &mut Vec<Value>,
) -> Result<()> {
write_ts_precision(table_data, name, ts, Precision::Millisecond, one_row)
}
pub fn write_ts_precision(
/// Write timestamp data as milliseconds into the table data.
pub fn write_ts_to_millis(
table_data: &mut TableData,
name: impl ToString,
ts: Option<i64>,
precision: Precision,
one_row: &mut Vec<Value>,
) -> Result<()> {
write_ts_to(
table_data,
name,
ts,
precision,
TimestampType::Millis,
one_row,
)
}
/// Write timestamp data as nanoseconds into the table data.
pub fn write_ts_to_nanos(
table_data: &mut TableData,
name: impl ToString,
ts: Option<i64>,
precision: Precision,
one_row: &mut Vec<Value>,
) -> Result<()> {
write_ts_to(
table_data,
name,
ts,
precision,
TimestampType::Nanos,
one_row,
)
}
enum TimestampType {
Millis,
Nanos,
}
fn write_ts_to(
table_data: &mut TableData,
name: impl ToString,
ts: Option<i64>,
precision: Precision,
ts_type: TimestampType,
one_row: &mut Vec<Value>,
) -> Result<()> {
let TableData {
schema,
@@ -249,43 +291,67 @@ pub fn write_ts_precision(
let name = name.to_string();
let ts = match ts {
Some(timestamp) => writer::to_ms_ts(precision, timestamp),
Some(timestamp) => match ts_type {
TimestampType::Millis => precision.to_millis(timestamp),
TimestampType::Nanos => precision.to_nanos(timestamp),
}
.with_context(|| TimestampOverflowSnafu {
error: format!(
"timestamp {} overflow with precision {}",
timestamp, precision
),
})?,
None => {
let timestamp = Timestamp::current_millis();
let unit: TimeUnit = precision.try_into().context(InfluxdbLinesWriteSnafu)?;
let timestamp = Timestamp::current_time(Nanosecond);
let unit: TimeUnit = precision.try_into().context(RowWriterSnafu)?;
let timestamp = timestamp
.convert_to(unit)
.with_context(|| TimePrecisionSnafu {
name: precision.to_string(),
})?;
writer::to_ms_ts(precision, timestamp.into())
})?
.into();
match ts_type {
TimestampType::Millis => precision.to_millis(timestamp),
TimestampType::Nanos => precision.to_nanos(timestamp),
}
.with_context(|| TimestampOverflowSnafu {
error: format!(
"timestamp {} overflow with precision {}",
timestamp, precision
),
})?
}
};
let (datatype, ts) = match ts_type {
TimestampType::Millis => (
ColumnDataType::TimestampMillisecond,
ValueData::TimestampMillisecondValue(ts),
),
TimestampType::Nanos => (
ColumnDataType::TimestampNanosecond,
ValueData::TimestampNanosecondValue(ts),
),
};
let index = column_indexes.get(&name);
if let Some(index) = index {
check_schema(
ColumnDataType::TimestampMillisecond,
SemanticType::Timestamp,
&schema[*index],
)?;
one_row[*index].value_data = Some(ValueData::TimestampMillisecondValue(ts));
check_schema(datatype, SemanticType::Timestamp, &schema[*index])?;
one_row[*index].value_data = Some(ts);
} else {
let index = schema.len();
schema.push(ColumnSchema {
column_name: name.clone(),
datatype: ColumnDataType::TimestampMillisecond as i32,
datatype: datatype as i32,
semantic_type: SemanticType::Timestamp as i32,
..Default::default()
});
column_indexes.insert(name, index);
one_row.push(ValueData::TimestampMillisecondValue(ts).into())
one_row.push(ts.into())
}
Ok(())
}
#[inline]
fn check_schema(
datatype: ColumnDataType,
semantic_type: SemanticType,

View File

@@ -116,12 +116,12 @@ monitor1,host=host2 memory=1027 1663840496400340001";
assert_eq!(
recordbatches.pretty_print().unwrap(),
"\
+-------------------------+-------+------+--------+
| ts | host | cpu | memory |
+-------------------------+-------+------+--------+
| 2022-09-22T09:54:56.100 | host1 | 66.6 | 1024.0 |
| 2022-09-22T09:54:56.400 | host2 | | 1027.0 |
+-------------------------+-------+------+--------+"
+-------------------------------+-------+------+--------+
| ts | host | cpu | memory |
+-------------------------------+-------+------+--------+
| 2022-09-22T09:54:56.100023100 | host1 | 66.6 | 1024.0 |
| 2022-09-22T09:54:56.400340001 | host2 | | 1027.0 |
+-------------------------------+-------+------+--------+"
);
}
}

View File

@@ -81,12 +81,12 @@ mod test {
assert_eq!(
recordbatches.pretty_print().unwrap(),
"\
+------------+-------+--------------------+------------+---------------------+----------------+
| resource | scope | telemetry_sdk_name | host | greptime_timestamp | greptime_value |
+------------+-------+--------------------+------------+---------------------+----------------+
| greptimedb | otel | java | testserver | 1970-01-01T00:00:00 | 105.0 |
| greptimedb | otel | java | testsevrer | 1970-01-01T00:00:00 | 100.0 |
+------------+-------+--------------------+------------+---------------------+----------------+",
+------------+-------+--------------------+------------+-------------------------------+----------------+
| resource | scope | telemetry_sdk_name | host | greptime_timestamp | greptime_value |
+------------+-------+--------------------+------------+-------------------------------+----------------+
| greptimedb | otel | java | testsevrer | 1970-01-01T00:00:00.000000100 | 100.0 |
| greptimedb | otel | java | testserver | 1970-01-01T00:00:00.000000105 | 105.0 |
+------------+-------+--------------------+------------+-------------------------------+----------------+",
);
let mut output = instance
@@ -123,11 +123,11 @@ mod test {
assert_eq!(
recordbatches.pretty_print().unwrap(),
"\
+------------+-------+--------------------+------------+---------------------+----------------+
| resource | scope | telemetry_sdk_name | host | greptime_timestamp | greptime_value |
+------------+-------+--------------------+------------+---------------------+----------------+
| greptimedb | otel | java | testserver | 1970-01-01T00:00:00 | 51.0 |
+------------+-------+--------------------+------------+---------------------+----------------+",
+------------+-------+--------------------+------------+-------------------------------+----------------+
| resource | scope | telemetry_sdk_name | host | greptime_timestamp | greptime_value |
+------------+-------+--------------------+------------+-------------------------------+----------------+
| greptimedb | otel | java | testserver | 1970-01-01T00:00:00.000000100 | 51.0 |
+------------+-------+--------------------+------------+-------------------------------+----------------+",
);
let mut output = instance
@@ -141,11 +141,11 @@ mod test {
assert_eq!(
recordbatches.pretty_print().unwrap(),
"\
+------------+-------+--------------------+------------+---------------------+----------------+
| resource | scope | telemetry_sdk_name | host | greptime_timestamp | greptime_value |
+------------+-------+--------------------+------------+---------------------+----------------+
| greptimedb | otel | java | testserver | 1970-01-01T00:00:00 | 4.0 |
+------------+-------+--------------------+------------+---------------------+----------------+",
+------------+-------+--------------------+------------+-------------------------------+----------------+
| resource | scope | telemetry_sdk_name | host | greptime_timestamp | greptime_value |
+------------+-------+--------------------+------------+-------------------------------+----------------+
| greptimedb | otel | java | testserver | 1970-01-01T00:00:00.000000100 | 4.0 |
+------------+-------+--------------------+------------+-------------------------------+----------------+"
);
}