From d0892bf0b74131e9cef762d9759efb7fd95b85b8 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 8 Dec 2022 20:27:53 +0800 Subject: [PATCH] fix: Fix compile error in server subcrate (#727) * fix: Fix compile error in server subcrate Signed-off-by: Ruihang Xia * remove unused type alias Signed-off-by: Ruihang Xia * explicitly panic Signed-off-by: Ruihang Xia * Update src/storage/src/sst/parquet.rs Co-authored-by: Yingwen Signed-off-by: Ruihang Xia Co-authored-by: Yingwen --- src/common/grpc-expr/src/insert.rs | 6 ++--- src/frontend/src/instance.rs | 2 +- src/servers/src/http.rs | 2 +- src/servers/src/http/influxdb.rs | 24 ++++++++--------- src/servers/src/influxdb.rs | 6 ++--- src/servers/src/line_writer.rs | 43 +++++++++++++++++++----------- src/servers/src/opentsdb/codec.rs | 8 +++--- src/servers/src/prometheus.rs | 18 ++++++------- src/storage/src/sst/parquet.rs | 2 +- tests-integration/tests/grpc.rs | 2 +- tests/runner/src/util.rs | 2 +- 11 files changed, 64 insertions(+), 51 deletions(-) diff --git a/src/common/grpc-expr/src/insert.rs b/src/common/grpc-expr/src/insert.rs index 3f67b192e6..515ffcbe7e 100644 --- a/src/common/grpc-expr/src/insert.rs +++ b/src/common/grpc-expr/src/insert.rs @@ -175,7 +175,7 @@ fn collect_column_values(column_datatype: ColumnDataType, values: &Values) -> Ve )) } ColumnDataType::Timestamp => { - collect_values!(values.ts_millis_values, |v| ValueRef::Timestamp( + collect_values!(values.ts_millisecond_values, |v| ValueRef::Timestamp( Timestamp::from_millis(*v) )) } @@ -418,7 +418,7 @@ fn convert_values(data_type: &ConcreteDataType, values: Values) -> Vec { .map(|v| Value::Date(v.into())) .collect(), ConcreteDataType::Timestamp(_) => values - .ts_millis_values + .ts_millisecond_values .into_iter() .map(|v| Value::Timestamp(Timestamp::from_millis(v))) .collect(), @@ -745,7 +745,7 @@ mod tests { }; let ts_vals = column::Values { - ts_millis_values: vec![100, 101], + ts_millisecond_values: vec![100, 101], ..Default::default() }; let ts_column = Column { diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index b1c04389a7..7cc22c52f7 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -943,7 +943,7 @@ mod tests { let expected_ts_col = Column { column_name: "ts".to_string(), values: Some(column::Values { - ts_millis_values: vec![1000, 2000, 3000, 4000], + ts_millisecond_values: vec![1000, 2000, 3000, 4000], ..Default::default() }), semantic_type: SemanticType::Timestamp as i32, diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index c12403bff2..8ef18b6887 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -273,7 +273,7 @@ impl JsonResponse { } } -async fn serve_api(Extension(api): Extension>) -> impl IntoApiResponse { +async fn serve_api(Extension(api): Extension) -> impl IntoApiResponse { Json(api) } diff --git a/src/servers/src/http/influxdb.rs b/src/servers/src/http/influxdb.rs index 06929eb341..b68cb3616f 100644 --- a/src/servers/src/http/influxdb.rs +++ b/src/servers/src/http/influxdb.rs @@ -48,12 +48,12 @@ pub async fn influxdb_write( fn parse_time_precision(value: &str) -> Result { match value { - "n" => Ok(Precision::NANOSECOND), - "u" => Ok(Precision::MICROSECOND), - "ms" => Ok(Precision::MILLISECOND), - "s" => Ok(Precision::SECOND), - "m" => Ok(Precision::MINUTE), - "h" => Ok(Precision::HOUR), + "n" => Ok(Precision::Nanosecond), + "u" => Ok(Precision::Microsecond), + "ms" => Ok(Precision::Millisecond), + "s" => Ok(Precision::Second), + "m" => Ok(Precision::Minute), + "h" => Ok(Precision::Hour), unknown => TimePrecisionSnafu { name: unknown.to_string(), } @@ -69,12 +69,12 @@ mod tests { #[test] fn test_parse_time_precision() { - assert_eq!(Precision::NANOSECOND, parse_time_precision("n").unwrap()); - assert_eq!(Precision::MICROSECOND, parse_time_precision("u").unwrap()); - assert_eq!(Precision::MILLISECOND, parse_time_precision("ms").unwrap()); - assert_eq!(Precision::SECOND, parse_time_precision("s").unwrap()); - assert_eq!(Precision::MINUTE, parse_time_precision("m").unwrap()); - assert_eq!(Precision::HOUR, parse_time_precision("h").unwrap()); + assert_eq!(Precision::Nanosecond, parse_time_precision("n").unwrap()); + assert_eq!(Precision::Microsecond, parse_time_precision("u").unwrap()); + assert_eq!(Precision::Millisecond, parse_time_precision("ms").unwrap()); + assert_eq!(Precision::Second, parse_time_precision("s").unwrap()); + assert_eq!(Precision::Minute, parse_time_precision("m").unwrap()); + assert_eq!(Precision::Hour, parse_time_precision("h").unwrap()); assert!(parse_time_precision("unknown").is_err()); } } diff --git a/src/servers/src/influxdb.rs b/src/servers/src/influxdb.rs index 0766d65843..58533d6dbc 100644 --- a/src/servers/src/influxdb.rs +++ b/src/servers/src/influxdb.rs @@ -24,7 +24,7 @@ use crate::error::{Error, InfluxdbLineProtocolSnafu, InfluxdbLinesWriteSnafu}; use crate::line_writer::LineWriter; pub const INFLUXDB_TIMESTAMP_COLUMN_NAME: &str = "ts"; -pub const DEFAULT_TIME_PRECISION: Precision = Precision::NANOSECOND; +pub const DEFAULT_TIME_PRECISION: Precision = Precision::Nanosecond; pub struct InfluxdbRequest { pub precision: Option, @@ -363,7 +363,7 @@ monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003"; SemanticType::Timestamp, Vec::new(), Values { - ts_millis_values: vec![1663840496100, 1663840496400], + ts_millisecond_values: vec![1663840496100, 1663840496400], ..Default::default() }, ); @@ -402,7 +402,7 @@ monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003"; SemanticType::Timestamp, Vec::new(), Values { - ts_millis_values: vec![1663840496100, 1663840496400], + ts_millisecond_values: vec![1663840496100, 1663840496400], ..Default::default() }, ); diff --git a/src/servers/src/line_writer.rs b/src/servers/src/line_writer.rs index cbb2aff987..f4fc7bf8f3 100644 --- a/src/servers/src/line_writer.rs +++ b/src/servers/src/line_writer.rs @@ -18,12 +18,16 @@ use common_catalog::consts::DEFAULT_CATALOG_NAME; use common_grpc::writer::{to_ms_ts, Precision}; use common_time::timestamp::TimeUnit::Millisecond; use common_time::Timestamp; +use datatypes::data_type::DataType; use datatypes::prelude::ConcreteDataType; -use datatypes::types::TimestampType; -use datatypes::value::Value; -use datatypes::vectors::{VectorBuilder, VectorRef}; +use datatypes::types::{TimestampMillisecondType, TimestampType}; +use datatypes::value::{Value, ValueRef}; +use datatypes::vectors::{MutableVector, VectorRef}; +use snafu::ResultExt; use table::requests::InsertRequest; +use crate::error::VectorConversionSnafu; + type ColumnLen = usize; type ColumnName = String; @@ -32,7 +36,7 @@ pub struct LineWriter { table_name: String, expected_rows: usize, current_rows: usize, - columns_builders: HashMap, + columns_builders: HashMap, ColumnLen)>, } impl LineWriter { @@ -48,7 +52,8 @@ impl LineWriter { pub fn write_ts(&mut self, column_name: &str, value: (i64, Precision)) { let (val, precision) = value; - let datatype = ConcreteDataType::Timestamp(TimestampType { unit: Millisecond }); + let datatype = + ConcreteDataType::Timestamp(TimestampType::Millisecond(TimestampMillisecondType)); let ts_val = Value::Timestamp(Timestamp::new(to_ms_ts(precision, val), Millisecond)); self.write(column_name, datatype, ts_val); } @@ -104,8 +109,12 @@ impl LineWriter { fn write(&mut self, column_name: &str, datatype: ConcreteDataType, value: Value) { let or_insert = || { let rows = self.current_rows; - let mut builder = VectorBuilder::with_capacity(datatype, self.expected_rows); - (0..rows).into_iter().for_each(|_| builder.push_null()); + let mut builder = datatype.create_mutable_vector(self.expected_rows); + (0..rows) + .into_iter() + .try_for_each(|_| builder.push_value_ref(ValueRef::Null)) + .context(VectorConversionSnafu) + .unwrap(); (builder, rows) }; let (builder, column_len) = self @@ -113,7 +122,7 @@ impl LineWriter { .entry(column_name.to_string()) .or_insert_with(or_insert); - builder.push(&value); + builder.push_value_ref(value.as_value_ref()).unwrap(); *column_len += 1; } @@ -122,18 +131,22 @@ impl LineWriter { self.columns_builders .values_mut() .into_iter() - .for_each(|(builder, len)| { + .try_for_each(|(builder, len)| { if self.current_rows > *len { - builder.push(&Value::Null) + builder.push_value_ref(ValueRef::Null) + } else { + Ok(()) } - }); + }) + .context(VectorConversionSnafu) + .unwrap(); } pub fn finish(self) -> InsertRequest { let columns_values: HashMap = self .columns_builders .into_iter() - .map(|(column_name, (mut builder, _))| (column_name, builder.finish())) + .map(|(column_name, (mut builder, _))| (column_name, builder.to_vector())) .collect(); InsertRequest { catalog_name: DEFAULT_CATALOG_NAME.to_string(), @@ -158,18 +171,18 @@ mod tests { #[test] fn test_writer() { let mut writer = LineWriter::with_lines(DEFAULT_SCHEMA_NAME, "demo".to_string(), 4); - writer.write_ts("ts", (1665893727685, Precision::MILLISECOND)); + writer.write_ts("ts", (1665893727685, Precision::Millisecond)); writer.write_tag("host", "host-1"); writer.write_i64("memory", 10_i64); writer.commit(); - writer.write_ts("ts", (1665893727686, Precision::MILLISECOND)); + writer.write_ts("ts", (1665893727686, Precision::Millisecond)); writer.write_tag("host", "host-2"); writer.write_tag("region", "region-2"); writer.write_i64("memory", 9_i64); writer.commit(); - writer.write_ts("ts", (1665893727689, Precision::MILLISECOND)); + writer.write_ts("ts", (1665893727689, Precision::Millisecond)); writer.write_tag("host", "host-3"); writer.write_tag("region", "region-3"); writer.write_i64("cpu", 19_i64); diff --git a/src/servers/src/opentsdb/codec.rs b/src/servers/src/opentsdb/codec.rs index 260a206fe5..49fccc4848 100644 --- a/src/servers/src/opentsdb/codec.rs +++ b/src/servers/src/opentsdb/codec.rs @@ -132,7 +132,7 @@ impl DataPoint { let mut line_writer = LineWriter::with_lines(DEFAULT_SCHEMA_NAME, self.metric.clone(), 1); line_writer.write_ts( OPENTSDB_TIMESTAMP_COLUMN_NAME, - (self.ts_millis(), Precision::MILLISECOND), + (self.ts_millis(), Precision::Millisecond), ); line_writer.write_f64(OPENTSDB_VALUE_COLUMN_NAME, self.value); @@ -152,11 +152,11 @@ impl DataPoint { let ts_column = Column { column_name: OPENTSDB_TIMESTAMP_COLUMN_NAME.to_string(), values: Some(column::Values { - ts_millis_values: vec![self.ts_millis], + ts_millisecond_values: vec![self.ts_millis], ..Default::default() }), semantic_type: SemanticType::Timestamp as i32, - datatype: ColumnDataType::Timestamp as i32, + datatype: ColumnDataType::TimestampMillisecond as i32, ..Default::default() }; columns.push(ts_column); @@ -336,7 +336,7 @@ mod test { assert_eq!(columns[0].column_name, OPENTSDB_TIMESTAMP_COLUMN_NAME); assert_eq!( - columns[0].values.as_ref().unwrap().ts_millis_values, + columns[0].values.as_ref().unwrap().ts_millisecond_values, vec![1000] ); diff --git a/src/servers/src/prometheus.rs b/src/servers/src/prometheus.rs index 1c2b035ec0..80d9db0b74 100644 --- a/src/servers/src/prometheus.rs +++ b/src/servers/src/prometheus.rs @@ -22,7 +22,7 @@ use api::prometheus::remote::{Label, Query, Sample, TimeSeries, WriteRequest}; use api::v1::codec::SelectResult; use api::v1::column::SemanticType; use api::v1::{column, Column, ColumnDataType, InsertExpr}; -use common_grpc::writer::Precision::MILLISECOND; +use common_grpc::writer::Precision::Millisecond; use openmetrics_parser::{MetricsExposition, PrometheusType, PrometheusValue}; use snafu::{OptionExt, ResultExt}; use snap::raw::{Decoder, Encoder}; @@ -279,7 +279,7 @@ pub fn select_result_to_timeseries( timestamp: ts_column .values .as_ref() - .map(|vs| vs.ts_millis_values[ts_row]) + .map(|vs| vs.ts_millisecond_values[ts_row]) .unwrap_or(0i64), }; @@ -325,7 +325,7 @@ fn timeseries_to_insert_request(db: &str, mut timeseries: TimeSeries) -> Result< let ts_millis = sample.timestamp; let val = sample.value; - line_writer.write_ts(TIMESTAMP_COLUMN_NAME, (ts_millis, MILLISECOND)); + line_writer.write_ts(TIMESTAMP_COLUMN_NAME, (ts_millis, Millisecond)); line_writer.write_f64(VALUE_COLUMN_NAME, val); labels @@ -368,11 +368,11 @@ fn timeseries_to_insert_expr(database: &str, mut timeseries: TimeSeries) -> Resu let ts_column = Column { column_name: TIMESTAMP_COLUMN_NAME.to_string(), values: Some(column::Values { - ts_millis_values: samples.iter().map(|x| x.timestamp).collect(), + ts_millisecond_values: samples.iter().map(|x| x.timestamp).collect(), ..Default::default() }), semantic_type: SemanticType::Timestamp as i32, - datatype: ColumnDataType::Timestamp as i32, + datatype: ColumnDataType::TimestampMillisecond as i32, ..Default::default() }; columns.push(ts_column); @@ -686,7 +686,7 @@ mod tests { assert_eq!(columns[0].column_name, TIMESTAMP_COLUMN_NAME); assert_eq!( - columns[0].values.as_ref().unwrap().ts_millis_values, + columns[0].values.as_ref().unwrap().ts_millisecond_values, vec![1000, 2000] ); @@ -712,7 +712,7 @@ mod tests { assert_eq!(columns[0].column_name, TIMESTAMP_COLUMN_NAME); assert_eq!( - columns[0].values.as_ref().unwrap().ts_millis_values, + columns[0].values.as_ref().unwrap().ts_millisecond_values, vec![1000, 2000] ); @@ -743,7 +743,7 @@ mod tests { assert_eq!(columns[0].column_name, TIMESTAMP_COLUMN_NAME); assert_eq!( - columns[0].values.as_ref().unwrap().ts_millis_values, + columns[0].values.as_ref().unwrap().ts_millisecond_values, vec![1000, 2000, 3000] ); @@ -773,7 +773,7 @@ mod tests { Column { column_name: TIMESTAMP_COLUMN_NAME.to_string(), values: Some(column::Values { - ts_millis_values: vec![1000, 2000], + ts_millisecond_values: vec![1000, 2000], ..Default::default() }), ..Default::default() diff --git a/src/storage/src/sst/parquet.rs b/src/storage/src/sst/parquet.rs index 1244582b69..f9b4e233e6 100644 --- a/src/storage/src/sst/parquet.rs +++ b/src/storage/src/sst/parquet.rs @@ -184,7 +184,7 @@ fn transverse_recursive T + Clone>( Struct => { if let DataType::Struct(fields) = data_type.to_logical_type() { for field in fields { - transverse_recursive(&field.data_type, map.clone(), encodings) + transverse_recursive(field.data_type(), map.clone(), encodings) } } else { unreachable!() diff --git a/tests-integration/tests/grpc.rs b/tests-integration/tests/grpc.rs index cf6ba4b922..62b3ca8795 100644 --- a/tests-integration/tests/grpc.rs +++ b/tests-integration/tests/grpc.rs @@ -110,7 +110,7 @@ fn expect_data() -> (Column, Column, Column, Column) { let expected_ts_col = Column { column_name: "ts".to_string(), values: Some(column::Values { - ts_millis_values: vec![100, 101, 102, 103], + ts_millisecond_values: vec![100, 101, 102, 103], ..Default::default() }), semantic_type: SemanticType::Timestamp as i32, diff --git a/tests/runner/src/util.rs b/tests/runner/src/util.rs index a6accc9ed7..85acbfd671 100644 --- a/tests/runner/src/util.rs +++ b/tests/runner/src/util.rs @@ -99,7 +99,7 @@ pub fn values_to_string(data_type: ColumnDataType, values: Values) -> Vec values - .ts_millis_values + .ts_millisecond_values .into_iter() .map(|v| v.to_string()) .collect(),