fix: Fix compile error in server subcrate (#727)

* fix: Fix compile error in server subcrate

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove unused type alias

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* explicitly panic

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* Update src/storage/src/sst/parquet.rs

Co-authored-by: Yingwen <realevenyag@gmail.com>

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Yingwen <realevenyag@gmail.com>
This commit is contained in:
Ruihang Xia
2022-12-08 20:27:53 +08:00
committed by GitHub
parent fff530cb50
commit d0892bf0b7
11 changed files with 64 additions and 51 deletions

View File

@@ -175,7 +175,7 @@ fn collect_column_values(column_datatype: ColumnDataType, values: &Values) -> Ve
))
}
ColumnDataType::Timestamp => {
collect_values!(values.ts_millis_values, |v| ValueRef::Timestamp(
collect_values!(values.ts_millisecond_values, |v| ValueRef::Timestamp(
Timestamp::from_millis(*v)
))
}
@@ -418,7 +418,7 @@ fn convert_values(data_type: &ConcreteDataType, values: Values) -> Vec<Value> {
.map(|v| Value::Date(v.into()))
.collect(),
ConcreteDataType::Timestamp(_) => values
.ts_millis_values
.ts_millisecond_values
.into_iter()
.map(|v| Value::Timestamp(Timestamp::from_millis(v)))
.collect(),
@@ -745,7 +745,7 @@ mod tests {
};
let ts_vals = column::Values {
ts_millis_values: vec![100, 101],
ts_millisecond_values: vec![100, 101],
..Default::default()
};
let ts_column = Column {

View File

@@ -943,7 +943,7 @@ mod tests {
let expected_ts_col = Column {
column_name: "ts".to_string(),
values: Some(column::Values {
ts_millis_values: vec![1000, 2000, 3000, 4000],
ts_millisecond_values: vec![1000, 2000, 3000, 4000],
..Default::default()
}),
semantic_type: SemanticType::Timestamp as i32,

View File

@@ -273,7 +273,7 @@ impl JsonResponse {
}
}
async fn serve_api(Extension(api): Extension<Arc<OpenApi>>) -> impl IntoApiResponse {
async fn serve_api(Extension(api): Extension<OpenApi>) -> impl IntoApiResponse {
Json(api)
}

View File

@@ -48,12 +48,12 @@ pub async fn influxdb_write(
fn parse_time_precision(value: &str) -> Result<Precision> {
match value {
"n" => Ok(Precision::NANOSECOND),
"u" => Ok(Precision::MICROSECOND),
"ms" => Ok(Precision::MILLISECOND),
"s" => Ok(Precision::SECOND),
"m" => Ok(Precision::MINUTE),
"h" => Ok(Precision::HOUR),
"n" => Ok(Precision::Nanosecond),
"u" => Ok(Precision::Microsecond),
"ms" => Ok(Precision::Millisecond),
"s" => Ok(Precision::Second),
"m" => Ok(Precision::Minute),
"h" => Ok(Precision::Hour),
unknown => TimePrecisionSnafu {
name: unknown.to_string(),
}
@@ -69,12 +69,12 @@ mod tests {
#[test]
fn test_parse_time_precision() {
assert_eq!(Precision::NANOSECOND, parse_time_precision("n").unwrap());
assert_eq!(Precision::MICROSECOND, parse_time_precision("u").unwrap());
assert_eq!(Precision::MILLISECOND, parse_time_precision("ms").unwrap());
assert_eq!(Precision::SECOND, parse_time_precision("s").unwrap());
assert_eq!(Precision::MINUTE, parse_time_precision("m").unwrap());
assert_eq!(Precision::HOUR, parse_time_precision("h").unwrap());
assert_eq!(Precision::Nanosecond, parse_time_precision("n").unwrap());
assert_eq!(Precision::Microsecond, parse_time_precision("u").unwrap());
assert_eq!(Precision::Millisecond, parse_time_precision("ms").unwrap());
assert_eq!(Precision::Second, parse_time_precision("s").unwrap());
assert_eq!(Precision::Minute, parse_time_precision("m").unwrap());
assert_eq!(Precision::Hour, parse_time_precision("h").unwrap());
assert!(parse_time_precision("unknown").is_err());
}
}

View File

@@ -24,7 +24,7 @@ use crate::error::{Error, InfluxdbLineProtocolSnafu, InfluxdbLinesWriteSnafu};
use crate::line_writer::LineWriter;
pub const INFLUXDB_TIMESTAMP_COLUMN_NAME: &str = "ts";
pub const DEFAULT_TIME_PRECISION: Precision = Precision::NANOSECOND;
pub const DEFAULT_TIME_PRECISION: Precision = Precision::Nanosecond;
pub struct InfluxdbRequest {
pub precision: Option<Precision>,
@@ -363,7 +363,7 @@ monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003";
SemanticType::Timestamp,
Vec::new(),
Values {
ts_millis_values: vec![1663840496100, 1663840496400],
ts_millisecond_values: vec![1663840496100, 1663840496400],
..Default::default()
},
);
@@ -402,7 +402,7 @@ monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003";
SemanticType::Timestamp,
Vec::new(),
Values {
ts_millis_values: vec![1663840496100, 1663840496400],
ts_millisecond_values: vec![1663840496100, 1663840496400],
..Default::default()
},
);

View File

@@ -18,12 +18,16 @@ use common_catalog::consts::DEFAULT_CATALOG_NAME;
use common_grpc::writer::{to_ms_ts, Precision};
use common_time::timestamp::TimeUnit::Millisecond;
use common_time::Timestamp;
use datatypes::data_type::DataType;
use datatypes::prelude::ConcreteDataType;
use datatypes::types::TimestampType;
use datatypes::value::Value;
use datatypes::vectors::{VectorBuilder, VectorRef};
use datatypes::types::{TimestampMillisecondType, TimestampType};
use datatypes::value::{Value, ValueRef};
use datatypes::vectors::{MutableVector, VectorRef};
use snafu::ResultExt;
use table::requests::InsertRequest;
use crate::error::VectorConversionSnafu;
type ColumnLen = usize;
type ColumnName = String;
@@ -32,7 +36,7 @@ pub struct LineWriter {
table_name: String,
expected_rows: usize,
current_rows: usize,
columns_builders: HashMap<ColumnName, (VectorBuilder, ColumnLen)>,
columns_builders: HashMap<ColumnName, (Box<dyn MutableVector>, ColumnLen)>,
}
impl LineWriter {
@@ -48,7 +52,8 @@ impl LineWriter {
pub fn write_ts(&mut self, column_name: &str, value: (i64, Precision)) {
let (val, precision) = value;
let datatype = ConcreteDataType::Timestamp(TimestampType { unit: Millisecond });
let datatype =
ConcreteDataType::Timestamp(TimestampType::Millisecond(TimestampMillisecondType));
let ts_val = Value::Timestamp(Timestamp::new(to_ms_ts(precision, val), Millisecond));
self.write(column_name, datatype, ts_val);
}
@@ -104,8 +109,12 @@ impl LineWriter {
fn write(&mut self, column_name: &str, datatype: ConcreteDataType, value: Value) {
let or_insert = || {
let rows = self.current_rows;
let mut builder = VectorBuilder::with_capacity(datatype, self.expected_rows);
(0..rows).into_iter().for_each(|_| builder.push_null());
let mut builder = datatype.create_mutable_vector(self.expected_rows);
(0..rows)
.into_iter()
.try_for_each(|_| builder.push_value_ref(ValueRef::Null))
.context(VectorConversionSnafu)
.unwrap();
(builder, rows)
};
let (builder, column_len) = self
@@ -113,7 +122,7 @@ impl LineWriter {
.entry(column_name.to_string())
.or_insert_with(or_insert);
builder.push(&value);
builder.push_value_ref(value.as_value_ref()).unwrap();
*column_len += 1;
}
@@ -122,18 +131,22 @@ impl LineWriter {
self.columns_builders
.values_mut()
.into_iter()
.for_each(|(builder, len)| {
.try_for_each(|(builder, len)| {
if self.current_rows > *len {
builder.push(&Value::Null)
builder.push_value_ref(ValueRef::Null)
} else {
Ok(())
}
});
})
.context(VectorConversionSnafu)
.unwrap();
}
pub fn finish(self) -> InsertRequest {
let columns_values: HashMap<ColumnName, VectorRef> = self
.columns_builders
.into_iter()
.map(|(column_name, (mut builder, _))| (column_name, builder.finish()))
.map(|(column_name, (mut builder, _))| (column_name, builder.to_vector()))
.collect();
InsertRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
@@ -158,18 +171,18 @@ mod tests {
#[test]
fn test_writer() {
let mut writer = LineWriter::with_lines(DEFAULT_SCHEMA_NAME, "demo".to_string(), 4);
writer.write_ts("ts", (1665893727685, Precision::MILLISECOND));
writer.write_ts("ts", (1665893727685, Precision::Millisecond));
writer.write_tag("host", "host-1");
writer.write_i64("memory", 10_i64);
writer.commit();
writer.write_ts("ts", (1665893727686, Precision::MILLISECOND));
writer.write_ts("ts", (1665893727686, Precision::Millisecond));
writer.write_tag("host", "host-2");
writer.write_tag("region", "region-2");
writer.write_i64("memory", 9_i64);
writer.commit();
writer.write_ts("ts", (1665893727689, Precision::MILLISECOND));
writer.write_ts("ts", (1665893727689, Precision::Millisecond));
writer.write_tag("host", "host-3");
writer.write_tag("region", "region-3");
writer.write_i64("cpu", 19_i64);

View File

@@ -132,7 +132,7 @@ impl DataPoint {
let mut line_writer = LineWriter::with_lines(DEFAULT_SCHEMA_NAME, self.metric.clone(), 1);
line_writer.write_ts(
OPENTSDB_TIMESTAMP_COLUMN_NAME,
(self.ts_millis(), Precision::MILLISECOND),
(self.ts_millis(), Precision::Millisecond),
);
line_writer.write_f64(OPENTSDB_VALUE_COLUMN_NAME, self.value);
@@ -152,11 +152,11 @@ impl DataPoint {
let ts_column = Column {
column_name: OPENTSDB_TIMESTAMP_COLUMN_NAME.to_string(),
values: Some(column::Values {
ts_millis_values: vec![self.ts_millis],
ts_millisecond_values: vec![self.ts_millis],
..Default::default()
}),
semantic_type: SemanticType::Timestamp as i32,
datatype: ColumnDataType::Timestamp as i32,
datatype: ColumnDataType::TimestampMillisecond as i32,
..Default::default()
};
columns.push(ts_column);
@@ -336,7 +336,7 @@ mod test {
assert_eq!(columns[0].column_name, OPENTSDB_TIMESTAMP_COLUMN_NAME);
assert_eq!(
columns[0].values.as_ref().unwrap().ts_millis_values,
columns[0].values.as_ref().unwrap().ts_millisecond_values,
vec![1000]
);

View File

@@ -22,7 +22,7 @@ use api::prometheus::remote::{Label, Query, Sample, TimeSeries, WriteRequest};
use api::v1::codec::SelectResult;
use api::v1::column::SemanticType;
use api::v1::{column, Column, ColumnDataType, InsertExpr};
use common_grpc::writer::Precision::MILLISECOND;
use common_grpc::writer::Precision::Millisecond;
use openmetrics_parser::{MetricsExposition, PrometheusType, PrometheusValue};
use snafu::{OptionExt, ResultExt};
use snap::raw::{Decoder, Encoder};
@@ -279,7 +279,7 @@ pub fn select_result_to_timeseries(
timestamp: ts_column
.values
.as_ref()
.map(|vs| vs.ts_millis_values[ts_row])
.map(|vs| vs.ts_millisecond_values[ts_row])
.unwrap_or(0i64),
};
@@ -325,7 +325,7 @@ fn timeseries_to_insert_request(db: &str, mut timeseries: TimeSeries) -> Result<
let ts_millis = sample.timestamp;
let val = sample.value;
line_writer.write_ts(TIMESTAMP_COLUMN_NAME, (ts_millis, MILLISECOND));
line_writer.write_ts(TIMESTAMP_COLUMN_NAME, (ts_millis, Millisecond));
line_writer.write_f64(VALUE_COLUMN_NAME, val);
labels
@@ -368,11 +368,11 @@ fn timeseries_to_insert_expr(database: &str, mut timeseries: TimeSeries) -> Resu
let ts_column = Column {
column_name: TIMESTAMP_COLUMN_NAME.to_string(),
values: Some(column::Values {
ts_millis_values: samples.iter().map(|x| x.timestamp).collect(),
ts_millisecond_values: samples.iter().map(|x| x.timestamp).collect(),
..Default::default()
}),
semantic_type: SemanticType::Timestamp as i32,
datatype: ColumnDataType::Timestamp as i32,
datatype: ColumnDataType::TimestampMillisecond as i32,
..Default::default()
};
columns.push(ts_column);
@@ -686,7 +686,7 @@ mod tests {
assert_eq!(columns[0].column_name, TIMESTAMP_COLUMN_NAME);
assert_eq!(
columns[0].values.as_ref().unwrap().ts_millis_values,
columns[0].values.as_ref().unwrap().ts_millisecond_values,
vec![1000, 2000]
);
@@ -712,7 +712,7 @@ mod tests {
assert_eq!(columns[0].column_name, TIMESTAMP_COLUMN_NAME);
assert_eq!(
columns[0].values.as_ref().unwrap().ts_millis_values,
columns[0].values.as_ref().unwrap().ts_millisecond_values,
vec![1000, 2000]
);
@@ -743,7 +743,7 @@ mod tests {
assert_eq!(columns[0].column_name, TIMESTAMP_COLUMN_NAME);
assert_eq!(
columns[0].values.as_ref().unwrap().ts_millis_values,
columns[0].values.as_ref().unwrap().ts_millisecond_values,
vec![1000, 2000, 3000]
);
@@ -773,7 +773,7 @@ mod tests {
Column {
column_name: TIMESTAMP_COLUMN_NAME.to_string(),
values: Some(column::Values {
ts_millis_values: vec![1000, 2000],
ts_millisecond_values: vec![1000, 2000],
..Default::default()
}),
..Default::default()

View File

@@ -184,7 +184,7 @@ fn transverse_recursive<T, F: Fn(&DataType) -> T + Clone>(
Struct => {
if let DataType::Struct(fields) = data_type.to_logical_type() {
for field in fields {
transverse_recursive(&field.data_type, map.clone(), encodings)
transverse_recursive(field.data_type(), map.clone(), encodings)
}
} else {
unreachable!()

View File

@@ -110,7 +110,7 @@ fn expect_data() -> (Column, Column, Column, Column) {
let expected_ts_col = Column {
column_name: "ts".to_string(),
values: Some(column::Values {
ts_millis_values: vec![100, 101, 102, 103],
ts_millisecond_values: vec![100, 101, 102, 103],
..Default::default()
}),
semantic_type: SemanticType::Timestamp as i32,

View File

@@ -99,7 +99,7 @@ pub fn values_to_string(data_type: ColumnDataType, values: Values) -> Vec<String
.map(|v| v.to_string())
.collect(),
ColumnDataType::Timestamp => values
.ts_millis_values
.ts_millisecond_values
.into_iter()
.map(|v| v.to_string())
.collect(),