Compare commits

4 Commits

luofucong · 517e9362ad · 2023-06-05 10:38:51 +08:00
    build: release v0.3.0

Yingwen · 466f258266 · 2023-06-03 17:17:52 +08:00
    feat(servers): collect samples by metric (#1706)

Ruihang Xia · 94228285a7 · 2023-06-03 12:41:13 +08:00
    feat: convert values to vector directly (#1704)
    Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

JeremyHi · 3d7185749d · 2023-06-03 03:58:00 +00:00
    feat: insert with stream (#1703)
    * feat: insert with stream
    * chore: by CR
10 changed files with 228 additions and 149 deletions

Cargo.lock (generated)

@@ -1537,6 +1537,7 @@ dependencies = [
  "substrait 0.2.0",
  "substrait 0.7.5",
  "tokio",
+ "tokio-stream",
  "tonic 0.9.2",
  "tracing",
  "tracing-subscriber",

@@ -30,12 +30,13 @@ parking_lot = "0.12"
 prost.workspace = true
 rand.workspace = true
 snafu.workspace = true
+tokio-stream = { version = "0.1", features = ["net"] }
+tokio.workspace = true
 tonic.workspace = true

 [dev-dependencies]
 datanode = { path = "../datanode" }
 substrait = { path = "../common/substrait" }
-tokio.workspace = true
 tracing = "0.1"
 tracing-subscriber = { version = "0.3", features = ["env-filter"] }
 prost.workspace = true

@@ -29,6 +29,9 @@ use common_telemetry::{logging, timer};
 use futures_util::{TryFutureExt, TryStreamExt};
 use prost::Message;
 use snafu::{ensure, ResultExt};
+use tokio::sync::mpsc::Sender;
+use tokio::sync::{mpsc, OnceCell};
+use tokio_stream::wrappers::ReceiverStream;

 use crate::error::{
     ConvertFlightDataSnafu, IllegalDatabaseResponseSnafu, IllegalFlightMessagesSnafu,
@@ -47,6 +50,7 @@ pub struct Database {
     dbname: String,
     client: Client,
+    streaming_client: OnceCell<Sender<GreptimeRequest>>,
     ctx: FlightContext,
 }
@@ -58,6 +62,7 @@ impl Database {
             schema: schema.into(),
             dbname: "".to_string(),
             client,
+            streaming_client: OnceCell::new(),
             ctx: FlightContext::default(),
         }
     }
@@ -75,6 +80,7 @@ impl Database {
             schema: "".to_string(),
             dbname: dbname.into(),
             client,
+            streaming_client: OnceCell::new(),
             ctx: FlightContext::default(),
         }
     }
@@ -114,6 +120,22 @@ impl Database {
         self.handle(Request::Inserts(requests)).await
     }

+    pub async fn insert_to_stream(&self, requests: InsertRequests) -> Result<()> {
+        let streaming_client = self
+            .streaming_client
+            .get_or_try_init(|| self.client_stream())
+            .await?;
+        let request = self.to_rpc_request(Request::Inserts(requests));
+        streaming_client.send(request).await.map_err(|e| {
+            error::ClientStreamingSnafu {
+                err_msg: e.to_string(),
+            }
+            .build()
+        })
+    }
+
     pub async fn delete(&self, request: DeleteRequest) -> Result<u32> {
         let _timer = timer!(metrics::METRIC_GRPC_DELETE);
         self.handle(Request::Delete(request)).await
@@ -121,15 +143,7 @@ impl Database {
     async fn handle(&self, request: Request) -> Result<u32> {
         let mut client = self.client.make_database_client()?.inner;
-        let request = GreptimeRequest {
-            header: Some(RequestHeader {
-                catalog: self.catalog.clone(),
-                schema: self.schema.clone(),
-                authorization: self.ctx.auth_header.clone(),
-                dbname: self.dbname.clone(),
-            }),
-            request: Some(request),
-        };
+        let request = self.to_rpc_request(request);

         let response = client
             .handle(request)
             .await?
@@ -142,6 +156,27 @@ impl Database {
         Ok(value)
     }

+    #[inline]
+    fn to_rpc_request(&self, request: Request) -> GreptimeRequest {
+        GreptimeRequest {
+            header: Some(RequestHeader {
+                catalog: self.catalog.clone(),
+                schema: self.schema.clone(),
+                authorization: self.ctx.auth_header.clone(),
+                dbname: self.dbname.clone(),
+            }),
+            request: Some(request),
+        }
+    }
+
+    async fn client_stream(&self) -> Result<Sender<GreptimeRequest>> {
+        let mut client = self.client.make_database_client()?.inner;
+        let (sender, receiver) = mpsc::channel::<GreptimeRequest>(65536);
+        let receiver = ReceiverStream::new(receiver);
+        client.handle_requests(receiver).await?;
+        Ok(sender)
+    }
+
     pub async fn sql(&self, sql: &str) -> Result<Output> {
         let _timer = timer!(metrics::METRIC_GRPC_SQL);
         self.do_get(Request::Query(QueryRequest {
@@ -212,15 +247,7 @@ impl Database {
     async fn do_get(&self, request: Request) -> Result<Output> {
         // FIXME(paomian): should be added some labels for metrics
         let _timer = timer!(metrics::METRIC_GRPC_DO_GET);
-        let request = GreptimeRequest {
-            header: Some(RequestHeader {
-                catalog: self.catalog.clone(),
-                schema: self.schema.clone(),
-                authorization: self.ctx.auth_header.clone(),
-                dbname: self.dbname.clone(),
-            }),
-            request: Some(request),
-        };
+        let request = self.to_rpc_request(request);
         let request = Ticket {
             ticket: request.encode_to_vec().into(),
         };
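For orientation, a minimal usage sketch of the new streaming insert path. Only insert_to_stream itself comes from this diff; the Client::with_urls constructor, the endpoint, the database name, and the batch payloads are assumptions for illustration.

// Sketch only: exercises the streaming path added above. The first call to
// insert_to_stream lazily opens one long-lived gRPC request stream (guarded
// by OnceCell) backed by a 65536-slot channel; later calls reuse the Sender.
use api::v1::InsertRequests;
use client::{Client, Database};

async fn stream_batches(batches: Vec<InsertRequests>) -> client::error::Result<()> {
    // Endpoint and database name are placeholders, not taken from this diff.
    let client = Client::with_urls(vec!["127.0.0.1:4001"]);
    let db = Database::new_with_dbname("public", client);
    for requests in batches {
        db.insert_to_stream(requests).await?;
    }
    Ok(())
}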

@@ -67,6 +67,9 @@ pub enum Error {
     #[snafu(display("Illegal Database response: {err_msg}"))]
     IllegalDatabaseResponse { err_msg: String },
+
+    #[snafu(display("Failed to send request with streaming: {}", err_msg))]
+    ClientStreaming { err_msg: String, location: Location },
 }

 pub type Result<T> = std::result::Result<T, Error>;
@@ -77,7 +80,8 @@ impl ErrorExt for Error {
             Error::IllegalFlightMessages { .. }
             | Error::ColumnDataType { .. }
             | Error::MissingField { .. }
-            | Error::IllegalDatabaseResponse { .. } => StatusCode::Internal,
+            | Error::IllegalDatabaseResponse { .. }
+            | Error::ClientStreaming { .. } => StatusCode::Internal,

             Error::Server { code, .. } => *code,
             Error::FlightGet { source, .. } => source.status_code(),

@@ -16,7 +16,6 @@ use std::collections::HashMap;

 use api::helper::ColumnDataTypeWrapper;
 use api::v1::{Column, DeleteRequest as GrpcDeleteRequest};
-use datatypes::data_type::DataType;
 use datatypes::prelude::ConcreteDataType;
 use snafu::{ensure, ResultExt};
 use table::requests::DeleteRequest;

@@ -41,14 +40,11 @@ pub fn to_table_delete_request(request: GrpcDeleteRequest) -> Result<DeleteReque
         let datatype: ConcreteDataType = ColumnDataTypeWrapper::try_new(datatype)
             .context(ColumnDataTypeSnafu)?
             .into();
-
-        let vector_builder = &mut datatype.create_mutable_vector(row_count);
-
-        add_values_to_builder(vector_builder, values, row_count, null_mask)?;
+        let vector = add_values_to_builder(datatype, values, row_count, null_mask)?;

         ensure!(
             key_column_values
-                .insert(column_name.clone(), vector_builder.to_vector())
+                .insert(column_name.clone(), vector)
                 .is_none(),
             IllegalDeleteRequestSnafu {
                 reason: format!("Duplicated column '{column_name}' in delete request.")

@@ -13,6 +13,7 @@
 // limitations under the License.

 use std::collections::{HashMap, HashSet};
+use std::sync::Arc;

 use api::helper::ColumnDataTypeWrapper;
 use api::v1::column::{SemanticType, Values};
@@ -25,10 +26,16 @@ use common_time::timestamp::Timestamp;
 use common_time::{Date, DateTime};
 use datatypes::data_type::{ConcreteDataType, DataType};
 use datatypes::prelude::{ValueRef, VectorRef};
+use datatypes::scalars::ScalarVector;
 use datatypes::schema::SchemaRef;
-use datatypes::types::TimestampType;
+use datatypes::types::{Int16Type, Int8Type, TimestampType, UInt16Type, UInt8Type};
 use datatypes::value::Value;
-use datatypes::vectors::MutableVector;
+use datatypes::vectors::{
+    BinaryVector, BooleanVector, DateTimeVector, DateVector, Float32Vector, Float64Vector,
+    Int32Vector, Int64Vector, PrimitiveVector, StringVector, TimestampMicrosecondVector,
+    TimestampMillisecondVector, TimestampNanosecondVector, TimestampSecondVector, UInt32Vector,
+    UInt64Vector,
+};
 use snafu::{ensure, OptionExt, ResultExt};
 use table::metadata::TableId;
 use table::requests::InsertRequest;
@@ -287,15 +294,10 @@ pub fn to_table_insert_request(
         let datatype: ConcreteDataType = ColumnDataTypeWrapper::try_new(datatype)
             .context(ColumnDataTypeSnafu)?
             .into();
-
-        let vector_builder = &mut datatype.create_mutable_vector(row_count);
-
-        add_values_to_builder(vector_builder, values, row_count, null_mask)?;
+        let vector = add_values_to_builder(datatype, values, row_count, null_mask)?;

         ensure!(
-            columns_values
-                .insert(column_name.clone(), vector_builder.to_vector())
-                .is_none(),
+            columns_values.insert(column_name.clone(), vector).is_none(),
             ColumnAlreadyExistsSnafu {
                 column: column_name
             }
@@ -312,28 +314,16 @@ pub fn to_table_insert_request(
 }

 pub(crate) fn add_values_to_builder(
-    builder: &mut Box<dyn MutableVector>,
+    data_type: ConcreteDataType,
     values: Values,
     row_count: usize,
     null_mask: Vec<u8>,
-) -> Result<()> {
-    let data_type = builder.data_type();
-    let values = convert_values(&data_type, values);
-
+) -> Result<VectorRef> {
     if null_mask.is_empty() {
-        ensure!(
-            values.len() == row_count,
-            UnexpectedValuesLengthSnafu {
-                reason: "If null_mask is empty, the length of values must be equal to row_count."
-            }
-        );
-
-        values.iter().try_for_each(|value| {
-            builder
-                .try_push_value_ref(value.as_value_ref())
-                .context(CreateVectorSnafu)
-        })?;
+        Ok(values_to_vector(&data_type, values))
     } else {
+        let builder = &mut data_type.create_mutable_vector(row_count);
+        let values = convert_values(&data_type, values);
         let null_mask = BitVec::from_vec(null_mask);
         ensure!(
             null_mask.count_ones() + values.len() == row_count,
@@ -354,8 +344,53 @@ pub(crate) fn add_values_to_builder(
             }
         }
-    }

-    Ok(())
+        Ok(builder.to_vector())
+    }
 }
+
+fn values_to_vector(data_type: &ConcreteDataType, values: Values) -> VectorRef {
+    match data_type {
+        ConcreteDataType::Boolean(_) => Arc::new(BooleanVector::from(values.bool_values)),
+        ConcreteDataType::Int8(_) => Arc::new(PrimitiveVector::<Int8Type>::from_iter_values(
+            values.i8_values.into_iter().map(|x| x as i8),
+        )),
+        ConcreteDataType::Int16(_) => Arc::new(PrimitiveVector::<Int16Type>::from_iter_values(
+            values.i16_values.into_iter().map(|x| x as i16),
+        )),
+        ConcreteDataType::Int32(_) => Arc::new(Int32Vector::from_vec(values.i32_values)),
+        ConcreteDataType::Int64(_) => Arc::new(Int64Vector::from_vec(values.i64_values)),
+        ConcreteDataType::UInt8(_) => Arc::new(PrimitiveVector::<UInt8Type>::from_iter_values(
+            values.u8_values.into_iter().map(|x| x as u8),
+        )),
+        ConcreteDataType::UInt16(_) => Arc::new(PrimitiveVector::<UInt16Type>::from_iter_values(
+            values.u16_values.into_iter().map(|x| x as u16),
+        )),
+        ConcreteDataType::UInt32(_) => Arc::new(UInt32Vector::from_vec(values.u32_values)),
+        ConcreteDataType::UInt64(_) => Arc::new(UInt64Vector::from_vec(values.u64_values)),
+        ConcreteDataType::Float32(_) => Arc::new(Float32Vector::from_vec(values.f32_values)),
+        ConcreteDataType::Float64(_) => Arc::new(Float64Vector::from_vec(values.f64_values)),
+        ConcreteDataType::Binary(_) => Arc::new(BinaryVector::from(values.binary_values)),
+        ConcreteDataType::String(_) => Arc::new(StringVector::from_vec(values.string_values)),
+        ConcreteDataType::Date(_) => Arc::new(DateVector::from_vec(values.date_values)),
+        ConcreteDataType::DateTime(_) => Arc::new(DateTimeVector::from_vec(values.datetime_values)),
+        ConcreteDataType::Timestamp(unit) => match unit {
+            TimestampType::Second(_) => {
+                Arc::new(TimestampSecondVector::from_vec(values.ts_second_values))
+            }
+            TimestampType::Millisecond(_) => Arc::new(TimestampMillisecondVector::from_vec(
+                values.ts_millisecond_values,
+            )),
+            TimestampType::Microsecond(_) => Arc::new(TimestampMicrosecondVector::from_vec(
+                values.ts_microsecond_values,
+            )),
+            TimestampType::Nanosecond(_) => Arc::new(TimestampNanosecondVector::from_vec(
+                values.ts_nanosecond_values,
+            )),
+        },
+        ConcreteDataType::Null(_) | ConcreteDataType::List(_) | ConcreteDataType::Dictionary(_) => {
+            unreachable!()
+        }
+    }
 }

 fn convert_values(data_type: &ConcreteDataType, values: Values) -> Vec<Value> {
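The point of #1704, restated as a sketch: when null_mask is empty, the decoded wire values can be moved wholesale into a typed vector instead of being pushed one ValueRef at a time through a MutableVector builder. The helper below mirrors the Int64 arm of values_to_vector; the standalone function name is illustrative.

use std::sync::Arc;

use api::v1::column::Values;
use datatypes::prelude::VectorRef;
use datatypes::vectors::Int64Vector;

// Mirrors `ConcreteDataType::Int64(_) => Arc::new(Int64Vector::from_vec(...))`:
// the Vec<i64> decoded from protobuf is handed to the vector as-is, with no
// per-value dispatch or builder bookkeeping on the hot path.
fn i64_values_to_vector(values: Values) -> VectorRef {
    Arc::new(Int64Vector::from_vec(values.i64_values))
}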

@@ -113,6 +113,14 @@ impl Vector for BinaryVector {
     }
 }

+impl From<Vec<Vec<u8>>> for BinaryVector {
+    fn from(data: Vec<Vec<u8>>) -> Self {
+        Self {
+            array: BinaryArray::from_iter_values(data),
+        }
+    }
+}
+
 impl ScalarVector for BinaryVector {
     type OwnedItem = Vec<u8>;
     type RefItem<'a> = &'a [u8];
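A quick sketch of what the new impl enables in the insert path; the byte payloads are invented.

use datatypes::vectors::BinaryVector;

// The impl delegates to BinaryArray::from_iter_values, so the owned buffers
// are treated as non-null values, matching the no-null fast path above.
fn binary_vector_from_wire(binary_values: Vec<Vec<u8>>) -> BinaryVector {
    BinaryVector::from(binary_values)
}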

@@ -130,6 +130,12 @@ impl<T: LogicalPrimitiveType> PrimitiveVector<T> {
         }
     }

+    pub fn from_iter_values<I: IntoIterator<Item = T::Native>>(iter: I) -> Self {
+        Self {
+            array: PrimitiveArray::from_iter_values(iter),
+        }
+    }
+
     pub fn from_values<I: IntoIterator<Item = T::Native>>(iter: I) -> Self {
         Self {
             array: PrimitiveArray::from_iter_values(iter),
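Usage sketch: the i32-to-i8 narrowing mirrors how values_to_vector calls this method, since protobuf has no 8-bit column encoding; the function name and input data are illustrative.

use datatypes::types::Int8Type;
use datatypes::vectors::PrimitiveVector;

// Small integers arrive from the wire as i32 and are narrowed while being
// streamed into the vector, avoiding an intermediate Vec<i8>.
fn i8_vector_from_wire(raw: Vec<i32>) -> PrimitiveVector<Int8Type> {
    PrimitiveVector::<Int8Type>::from_iter_values(raw.into_iter().map(|x| x as i8))
}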

@@ -122,6 +122,12 @@ pub enum Error {
         source: common_grpc::error::Error,
     },

+    #[snafu(display("Failed to write prometheus series, source: {}", source))]
+    PromSeriesWrite {
+        #[snafu(backtrace)]
+        source: common_grpc::error::Error,
+    },
+
     #[snafu(display("Failed to convert time precision, name: {}", name))]
     TimePrecision { name: String, location: Location },

@@ -300,7 +306,9 @@ impl ErrorExt for Error {
             | InvalidPrepareStatement { .. }
             | TimePrecision { .. } => StatusCode::InvalidArguments,

-            InfluxdbLinesWrite { source, .. } => source.status_code(),
+            InfluxdbLinesWrite { source, .. } | PromSeriesWrite { source, .. } => {
+                source.status_code()
+            }

             Hyper { .. } => StatusCode::Unknown,
             TlsRequired { .. } => StatusCode::Unknown,

@@ -405,6 +413,7 @@ impl IntoResponse for Error {
         let (status, error_message) = match self {
             Error::InfluxdbLineProtocol { .. }
             | Error::InfluxdbLinesWrite { .. }
+            | Error::PromSeriesWrite { .. }
             | Error::InvalidOpentsdbLine { .. }
             | Error::InvalidOpentsdbJsonRequest { .. }
             | Error::DecodePromRemoteRequest { .. }

@@ -15,13 +15,13 @@
 //! prometheus protocol supportings
 //! handles prometheus remote_write, remote_read logic
 use std::cmp::Ordering;
-use std::collections::BTreeMap;
+use std::collections::{BTreeMap, HashMap};
 use std::hash::{Hash, Hasher};

 use api::prometheus::remote::label_matcher::Type as MatcherType;
 use api::prometheus::remote::{Label, Query, Sample, TimeSeries, WriteRequest};
-use api::v1::column::SemanticType;
-use api::v1::{column, Column, ColumnDataType, InsertRequest as GrpcInsertRequest, InsertRequests};
+use api::v1::{InsertRequest as GrpcInsertRequest, InsertRequests};
+use common_grpc::writer::{LinesWriter, Precision};
 use common_recordbatch::{RecordBatch, RecordBatches};
 use common_time::timestamp::TimeUnit;
 use datatypes::prelude::{ConcreteDataType, Value};
@@ -284,83 +284,68 @@ fn recordbatch_to_timeseries(table: &str, recordbatch: RecordBatch) -> Result<Ve
 }

 pub fn to_grpc_insert_requests(request: WriteRequest) -> Result<(InsertRequests, usize)> {
-    let (inserts, samples_counts) = itertools::process_results(
-        request.timeseries.into_iter().map(to_grpc_insert_request),
-        |x| x.unzip::<_, _, Vec<_>, Vec<_>>(),
-    )?;
-    Ok((
-        InsertRequests { inserts },
-        samples_counts.into_iter().sum::<usize>(),
-    ))
-}
-
-fn to_grpc_insert_request(timeseries: TimeSeries) -> Result<(GrpcInsertRequest, usize)> {
-    let samples_count = timeseries.samples.len();
-
-    // TODO(dennis): save exemplars into a column
-    let labels = timeseries.labels;
-    let samples = timeseries.samples;
-
-    let row_count = samples.len();
-    let mut columns = Vec::with_capacity(2 + labels.len());
-
-    let ts_column = Column {
-        column_name: TIMESTAMP_COLUMN_NAME.to_string(),
-        values: Some(column::Values {
-            ts_millisecond_values: samples.iter().map(|x| x.timestamp).collect(),
-            ..Default::default()
-        }),
-        semantic_type: SemanticType::Timestamp as i32,
-        datatype: ColumnDataType::TimestampMillisecond as i32,
-        ..Default::default()
-    };
-    columns.push(ts_column);
-
-    let field_column = Column {
-        column_name: FIELD_COLUMN_NAME.to_string(),
-        values: Some(column::Values {
-            f64_values: samples.iter().map(|x| x.value).collect(),
-            ..Default::default()
-        }),
-        semantic_type: SemanticType::Field as i32,
-        datatype: ColumnDataType::Float64 as i32,
-        ..Default::default()
-    };
-    columns.push(field_column);
-
-    let mut table_name = None;
-    for label in labels {
-        let tagk = label.name;
-        let tagv = label.value;
-
-        // The metric name is a special label
-        if tagk == METRIC_NAME_LABEL {
-            table_name = Some(tagv);
-            continue;
+    let mut writers: HashMap<String, LinesWriter> = HashMap::new();
+    for timeseries in &request.timeseries {
+        let table_name = timeseries
+            .labels
+            .iter()
+            .find(|label| {
+                // The metric name is a special label
+                label.name == METRIC_NAME_LABEL
+            })
+            .context(error::InvalidPromRemoteRequestSnafu {
+                msg: "missing '__name__' label in timeseries",
+            })?
+            .value
+            .clone();
+
+        let writer = writers
+            .entry(table_name)
+            .or_insert_with(|| LinesWriter::with_lines(16));
+
+        // For each sample
+        for sample in &timeseries.samples {
+            // Insert labels first.
+            for label in &timeseries.labels {
+                // The metric name is a special label
+                if label.name == METRIC_NAME_LABEL {
+                    continue;
+                }
+                writer
+                    .write_tag(&label.name, &label.value)
+                    .context(error::PromSeriesWriteSnafu)?;
+            }
+            // Insert sample timestamp.
+            writer
+                .write_ts(
+                    TIMESTAMP_COLUMN_NAME,
+                    (sample.timestamp, Precision::Millisecond),
+                )
+                .context(error::PromSeriesWriteSnafu)?;
+            // Insert sample value.
+            writer
+                .write_f64(FIELD_COLUMN_NAME, sample.value)
+                .context(error::PromSeriesWriteSnafu)?;

+            writer.commit();
         }
-
-        columns.push(Column {
-            column_name: tagk.to_string(),
-            values: Some(column::Values {
-                string_values: std::iter::repeat(tagv).take(row_count).collect(),
-                ..Default::default()
-            }),
-            semantic_type: SemanticType::Tag as i32,
-            datatype: ColumnDataType::String as i32,
-            ..Default::default()
-        });
     }

-    let request = GrpcInsertRequest {
-        table_name: table_name.context(error::InvalidPromRemoteRequestSnafu {
-            msg: "missing '__name__' label in timeseries",
-        })?,
-        region_number: 0,
-        columns,
-        row_count: row_count as u32,
-    };
-    Ok((request, samples_count))
+    let mut sample_counts = 0;
+    let inserts = writers
+        .into_iter()
+        .map(|(table_name, writer)| {
+            let (columns, row_count) = writer.finish();
+            sample_counts += row_count as usize;
+            GrpcInsertRequest {
+                table_name,
+                region_number: 0,
+                columns,
+                row_count,
+            }
+        })
+        .collect();
+    Ok((InsertRequests { inserts }, sample_counts))
 }

@@ -516,13 +501,16 @@ mod tests {
             ..Default::default()
         };

-        let exprs = to_grpc_insert_requests(write_request).unwrap().0.inserts;
+        let mut exprs = to_grpc_insert_requests(write_request).unwrap().0.inserts;
+        exprs.sort_unstable_by(|l, r| l.table_name.cmp(&r.table_name));
         assert_eq!(3, exprs.len());
         assert_eq!("metric1", exprs[0].table_name);
         assert_eq!("metric2", exprs[1].table_name);
         assert_eq!("metric3", exprs[2].table_name);

-        let expr = exprs.get(0).unwrap();
+        let expr = exprs.get_mut(0).unwrap();
+        expr.columns
+            .sort_unstable_by(|l, r| l.column_name.cmp(&r.column_name));

         let columns = &expr.columns;
         let row_count = expr.row_count;
@@ -548,7 +536,9 @@ mod tests {
             vec!["spark", "spark"]
         );

-        let expr = exprs.get(1).unwrap();
+        let expr = exprs.get_mut(1).unwrap();
+        expr.columns
+            .sort_unstable_by(|l, r| l.column_name.cmp(&r.column_name));

         let columns = &expr.columns;
         let row_count = expr.row_count;
@@ -568,18 +558,20 @@ mod tests {
             vec![3.0, 4.0]
         );

-        assert_eq!(columns[2].column_name, "instance");
+        assert_eq!(columns[2].column_name, "idc");
         assert_eq!(
             columns[2].values.as_ref().unwrap().string_values,
-            vec!["test_host1", "test_host1"]
-        );
-        assert_eq!(columns[3].column_name, "idc");
-        assert_eq!(
-            columns[3].values.as_ref().unwrap().string_values,
             vec!["z001", "z001"]
         );
+        assert_eq!(columns[3].column_name, "instance");
+        assert_eq!(
+            columns[3].values.as_ref().unwrap().string_values,
+            vec!["test_host1", "test_host1"]
+        );

-        let expr = exprs.get(2).unwrap();
+        let expr = exprs.get_mut(2).unwrap();
+        expr.columns
+            .sort_unstable_by(|l, r| l.column_name.cmp(&r.column_name));

         let columns = &expr.columns;
         let row_count = expr.row_count;
@@ -587,27 +579,27 @@ mod tests {
         assert_eq!(3, row_count);
         assert_eq!(columns.len(), 4);

-        assert_eq!(columns[0].column_name, TIMESTAMP_COLUMN_NAME);
+        assert_eq!(columns[0].column_name, "app");
         assert_eq!(
-            columns[0].values.as_ref().unwrap().ts_millisecond_values,
+            columns[0].values.as_ref().unwrap().string_values,
+            vec!["biz", "biz", "biz"]
+        );
+
+        assert_eq!(columns[1].column_name, TIMESTAMP_COLUMN_NAME);
+        assert_eq!(
+            columns[1].values.as_ref().unwrap().ts_millisecond_values,
             vec![1000, 2000, 3000]
         );

-        assert_eq!(columns[1].column_name, FIELD_COLUMN_NAME);
+        assert_eq!(columns[2].column_name, FIELD_COLUMN_NAME);
         assert_eq!(
-            columns[1].values.as_ref().unwrap().f64_values,
+            columns[2].values.as_ref().unwrap().f64_values,
             vec![5.0, 6.0, 7.0]
         );

-        assert_eq!(columns[2].column_name, "idc");
-        assert_eq!(
-            columns[2].values.as_ref().unwrap().string_values,
-            vec!["z002", "z002", "z002"]
-        );
-        assert_eq!(columns[3].column_name, "app");
+        assert_eq!(columns[3].column_name, "idc");
         assert_eq!(
             columns[3].values.as_ref().unwrap().string_values,
-            vec!["biz", "biz", "biz"]
+            vec!["z002", "z002", "z002"]
         );
     }
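To make the #1706 behavior change concrete, here is a sketch of a remote-write payload whose two series share one metric name; the label and sample values are invented. Before this change each TimeSeries became its own insert request, so insert order matched input order; now all rows for a metric are collected under one request via the per-metric LinesWriter map, which is also why the updated tests sort requests and columns before asserting.

use api::prometheus::remote::{Label, Sample, TimeSeries, WriteRequest};

fn example_write_request() -> WriteRequest {
    // Two series for the same metric, one sample each.
    let series = |host: &str, ts: i64, value: f64| TimeSeries {
        labels: vec![
            Label {
                name: "__name__".to_string(),
                value: "metric1".to_string(),
            },
            Label {
                name: "host".to_string(),
                value: host.to_string(),
            },
        ],
        samples: vec![Sample { value, timestamp: ts }],
        ..Default::default()
    };
    WriteRequest {
        // to_grpc_insert_requests now yields a single GrpcInsertRequest for
        // "metric1" with row_count == 2, instead of two one-row requests.
        timeseries: vec![series("host1", 1000, 1.0), series("host2", 2000, 2.0)],
        ..Default::default()
    }
}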