feat: remove InsertBatch in gRPC message (#570)

This commit is contained in:
fys
2022-11-24 14:04:48 +08:00
committed by GitHub
parent 4038dd4067
commit c5b0d2431f
23 changed files with 377 additions and 659 deletions

1
Cargo.lock generated
View File

@@ -1250,6 +1250,7 @@ dependencies = [
"common-base",
"common-catalog",
"common-error",
"common-grpc",
"common-query",
"common-telemetry",
"common-time",

View File

@@ -28,9 +28,8 @@ use arrow::datatypes::{DataType, Float64Type, Int64Type};
use arrow::record_batch::RecordBatch;
use clap::Parser;
use client::admin::Admin;
use client::api::v1::codec::InsertBatch;
use client::api::v1::column::Values;
use client::api::v1::{insert_expr, Column, ColumnDataType, ColumnDef, CreateExpr, InsertExpr};
use client::api::v1::{Column, ColumnDataType, ColumnDef, CreateExpr, InsertExpr};
use client::{Client, Database, Select};
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
@@ -100,16 +99,13 @@ async fn write_data(
for record_batch in record_batch_reader {
let record_batch = record_batch.unwrap();
let row_count = record_batch.num_rows();
let insert_batch = convert_record_batch(record_batch).into();
let (columns, row_count) = convert_record_batch(record_batch);
let insert_expr = InsertExpr {
schema_name: "public".to_string(),
table_name: TABLE_NAME.to_string(),
expr: Some(insert_expr::Expr::Values(insert_expr::Values {
values: vec![insert_batch],
})),
options: HashMap::default(),
region_number: 0,
columns,
row_count,
};
let now = Instant::now();
db.insert(insert_expr).await.unwrap();
@@ -125,7 +121,7 @@ async fn write_data(
total_rpc_elapsed_ms
}
fn convert_record_batch(record_batch: RecordBatch) -> InsertBatch {
fn convert_record_batch(record_batch: RecordBatch) -> (Vec<Column>, u32) {
let schema = record_batch.schema();
let fields = schema.fields();
let row_count = record_batch.num_rows();
@@ -143,10 +139,7 @@ fn convert_record_batch(record_batch: RecordBatch) -> InsertBatch {
columns.push(column);
}
InsertBatch {
columns,
row_count: row_count as _,
}
(columns, row_count as _)
}
fn build_values(column: &ArrayRef) -> Values {

View File

@@ -20,7 +20,6 @@ fn main() {
.file_descriptor_set_path(default_out_dir.join("greptime_fd.bin"))
.compile(
&[
"greptime/v1/insert.proto",
"greptime/v1/select.proto",
"greptime/v1/physical_plan.proto",
"greptime/v1/greptime.proto",

View File

@@ -2,6 +2,7 @@ syntax = "proto3";
package greptime.v1;
import "greptime/v1/column.proto";
import "greptime/v1/common.proto";
message DatabaseRequest {
@@ -41,26 +42,16 @@ message InsertExpr {
string schema_name = 1;
string table_name = 2;
message Values {
repeated bytes values = 1;
}
// Data is represented here.
repeated Column columns = 3;
oneof expr {
Values values = 3;
// The number of rows in every column, counting both null and non-null values.
//
// Note: all columns in an InsertExpr must have the same row_count.
uint32 row_count = 4;
// TODO(LFC): Remove field "sql" in InsertExpr.
// When a Frontend instance receives an insertion SQL (`insert into ...`), it's expected to parse the SQL and
// assemble the values to insert and feed them to the Datanode. In other words, inserting data through a Datanode
// instance's gRPC interface shouldn't use SQL directly.
// Then why does the "sql" field exist here? It's because the Frontend needs the table schema to create the values to
// insert, and that schema currently cannot be found anywhere. (Maybe the table schema is supposed to be fetched from Meta?)
// The "sql" field is meant to be removed in the future.
string sql = 4;
}
/// The region number of current insert request.
// The region number of current insert request.
uint32 region_number = 5;
map<string, bytes> options = 6;
}
// TODO(jiachun)

View File

@@ -1,14 +0,0 @@
syntax = "proto3";
package greptime.v1.codec;
import "greptime/v1/column.proto";
// A batch of column-oriented data together with its row count.
message InsertBatch {
// The columns carried by this batch.
repeated Column columns = 1;
// Row count of the batch.
// NOTE(review): presumably every column must hold this many rows (nulls included) — confirm.
uint32 row_count = 2;
}
// Wrapper message around a single region number.
message RegionNumber {
// The region number value.
uint32 id = 1;
}

View File

@@ -15,7 +15,7 @@
pub use prost::DecodeError;
use prost::Message;
use crate::v1::codec::{InsertBatch, PhysicalPlanNode, RegionNumber, SelectResult};
use crate::v1::codec::{PhysicalPlanNode, SelectResult};
use crate::v1::meta::TableRouteValue;
macro_rules! impl_convert_with_bytes {
@@ -36,10 +36,8 @@ macro_rules! impl_convert_with_bytes {
};
}
impl_convert_with_bytes!(InsertBatch);
impl_convert_with_bytes!(SelectResult);
impl_convert_with_bytes!(PhysicalPlanNode);
impl_convert_with_bytes!(RegionNumber);
impl_convert_with_bytes!(TableRouteValue);
#[cfg(test)]
@@ -51,52 +49,6 @@ mod tests {
const SEMANTIC_TAG: i32 = 0;
// Round-trips the mocked `InsertBatch` through its byte encoding and checks
// that the row count, column name, semantic type, null mask and values all
// come back unchanged.
#[test]
fn test_convert_insert_batch() {
let insert_batch = mock_insert_batch();
// Encode to bytes, then decode the byte slice back with `try_into`.
let bytes: Vec<u8> = insert_batch.into();
let insert: InsertBatch = bytes.deref().try_into().unwrap();
assert_eq!(8, insert.row_count);
assert_eq!(1, insert.columns.len());
let column = &insert.columns[0];
assert_eq!("foo", column.column_name);
assert_eq!(SEMANTIC_TAG, column.semantic_type);
assert_eq!(vec![1], column.null_mask);
assert_eq!(
vec![2, 3, 4, 5, 6, 7, 8],
column.values.as_ref().unwrap().i32_values
);
}
// Overwrites the first two encoded bytes and expects the test to panic.
// NOTE(review): the panic presumably comes from the `unwrap` on the failed
// decode, which would make the assertions after it unreachable — confirm.
#[should_panic]
#[test]
fn test_convert_insert_batch_wrong() {
let insert_batch = mock_insert_batch();
let mut bytes: Vec<u8> = insert_batch.into();
// modify some bytes
bytes[0] = 0b1;
bytes[1] = 0b1;
let insert: InsertBatch = bytes.deref().try_into().unwrap();
assert_eq!(8, insert.row_count);
assert_eq!(1, insert.columns.len());
let column = &insert.columns[0];
assert_eq!("foo", column.column_name);
assert_eq!(SEMANTIC_TAG, column.semantic_type);
assert_eq!(vec![1], column.null_mask);
assert_eq!(
vec![2, 3, 4, 5, 6, 7, 8],
column.values.as_ref().unwrap().i32_values
);
}
#[test]
fn test_convert_select_result() {
let select_result = mock_select_result();
@@ -143,35 +95,6 @@ mod tests {
);
}
// Round-trips a `RegionNumber` through its byte encoding and checks that the
// `id` field is preserved.
#[test]
fn test_convert_region_id() {
let region_id = RegionNumber { id: 12 };
// Encode to bytes, then decode the byte slice back with `try_into`.
let bytes: Vec<u8> = region_id.into();
let region_id: RegionNumber = bytes.deref().try_into().unwrap();
assert_eq!(12, region_id.id);
}
// Builds the `InsertBatch` fixture used by the conversion tests: a single
// column named "foo" with semantic type `SEMANTIC_TAG`, seven `i32` values,
// a null mask of `[1]`, and a row count of 8.
fn mock_insert_batch() -> InsertBatch {
let values = column::Values {
i32_values: vec![2, 3, 4, 5, 6, 7, 8],
..Default::default()
};
// NOTE(review): 7 values against 8 rows presumably means the mask marks
// exactly one row as null — confirm the bit order against the decoder.
let null_mask = vec![1];
let column = Column {
column_name: "foo".to_string(),
semantic_type: SEMANTIC_TAG,
values: Some(values),
null_mask,
..Default::default()
};
InsertBatch {
columns: vec![column],
row_count: 8,
}
}
fn mock_select_result() -> SelectResult {
let values = column::Values {
i32_values: vec![2, 3, 4, 5, 6, 7, 8],

View File

@@ -12,11 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use api::v1::codec::InsertBatch;
use api::v1::*;
use client::{Client, Database};
fn main() {
tracing::subscriber::set_global_default(tracing_subscriber::FmtSubscriber::builder().finish())
.unwrap();
@@ -29,19 +27,19 @@ async fn run() {
let client = Client::with_urls(vec!["127.0.0.1:3001"]);
let db = Database::new("greptime", client);
let (columns, row_count) = insert_data();
let expr = InsertExpr {
schema_name: "public".to_string(),
table_name: "demo".to_string(),
expr: Some(insert_expr::Expr::Values(insert_expr::Values {
values: insert_batches(),
})),
options: HashMap::default(),
region_number: 0,
columns,
row_count,
};
db.insert(expr).await.unwrap();
}
fn insert_batches() -> Vec<Vec<u8>> {
fn insert_data() -> (Vec<Column>, u32) {
const SEMANTIC_TAG: i32 = 0;
const SEMANTIC_FIELD: i32 = 1;
const SEMANTIC_TS: i32 = 2;
@@ -101,9 +99,8 @@ fn insert_batches() -> Vec<Vec<u8>> {
..Default::default()
};
let insert_batch = InsertBatch {
columns: vec![host_column, cpu_column, mem_column, ts_column],
(
vec![host_column, cpu_column, mem_column, ts_column],
row_count,
};
vec![insert_batch.into()]
)
}

View File

@@ -9,6 +9,7 @@ api = { path = "../../api" }
async-trait = "0.1"
common-base = { path = "../base" }
common-error = { path = "../error" }
common-grpc = { path = "../grpc" }
common-telemetry = { path = "../telemetry" }
common-time = { path = "../time" }
common-catalog = { path = "../catalog" }

View File

@@ -14,11 +14,9 @@
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::ops::Deref;
use std::sync::Arc;
use api::helper::ColumnDataTypeWrapper;
use api::v1::codec::InsertBatch;
use api::v1::column::{SemanticType, Values};
use api::v1::{AddColumn, AddColumns, Column, ColumnDataType, ColumnDef, CreateExpr};
use common_base::BitVec;
@@ -35,9 +33,8 @@ use table::requests::{AddColumnRequest, AlterKind, AlterTableRequest, InsertRequ
use table::Table;
use crate::error::{
ColumnDataTypeSnafu, ColumnNotFoundSnafu, CreateVectorSnafu, DecodeInsertSnafu,
DuplicatedTimestampColumnSnafu, IllegalInsertDataSnafu, InvalidColumnProtoSnafu,
MissingTimestampColumnSnafu, Result,
ColumnDataTypeSnafu, ColumnNotFoundSnafu, CreateVectorSnafu, DuplicatedTimestampColumnSnafu,
IllegalInsertDataSnafu, InvalidColumnProtoSnafu, MissingTimestampColumnSnafu, Result,
};
const TAG_SEMANTIC_TYPE: i32 = SemanticType::Tag as i32;
const TIMESTAMP_SEMANTIC_TYPE: i32 = SemanticType::Timestamp as i32;
@@ -52,35 +49,25 @@ fn build_column_def(column_name: &str, datatype: i32, nullable: bool) -> ColumnD
}
}
pub fn find_new_columns(
schema: &SchemaRef,
insert_batches: &[InsertBatch],
) -> Result<Option<AddColumns>> {
pub fn find_new_columns(schema: &SchemaRef, columns: &[Column]) -> Result<Option<AddColumns>> {
let mut columns_to_add = Vec::default();
let mut new_columns: HashSet<String> = HashSet::default();
for InsertBatch { columns, row_count } in insert_batches {
if *row_count == 0 || columns.is_empty() {
continue;
}
for Column {
column_name,
semantic_type,
datatype,
..
} in columns
for Column {
column_name,
semantic_type,
datatype,
..
} in columns
{
if schema.column_schema_by_name(column_name).is_none() && !new_columns.contains(column_name)
{
if schema.column_schema_by_name(column_name).is_none()
&& !new_columns.contains(column_name)
{
let column_def = Some(build_column_def(column_name, *datatype, true));
columns_to_add.push(AddColumn {
column_def,
is_key: *semantic_type == TAG_SEMANTIC_TYPE,
});
new_columns.insert(column_name.to_string());
}
let column_def = Some(build_column_def(column_name, *datatype, true));
columns_to_add.push(AddColumn {
column_def,
is_key: *semantic_type == TAG_SEMANTIC_TYPE,
});
new_columns.insert(column_name.to_string());
}
}
@@ -201,92 +188,84 @@ pub fn build_create_expr_from_insertion(
schema_name: &str,
table_id: Option<TableId>,
table_name: &str,
insert_batches: &[InsertBatch],
columns: &[Column],
) -> Result<CreateExpr> {
let mut new_columns: HashSet<String> = HashSet::default();
let mut column_defs = Vec::default();
let mut primary_key_indices = Vec::default();
let mut timestamp_index = usize::MAX;
for InsertBatch { columns, row_count } in insert_batches {
if *row_count == 0 || columns.is_empty() {
continue;
}
for Column {
column_name,
semantic_type,
datatype,
..
} in columns
{
if !new_columns.contains(column_name) {
let mut is_nullable = true;
match *semantic_type {
TAG_SEMANTIC_TYPE => primary_key_indices.push(column_defs.len()),
TIMESTAMP_SEMANTIC_TYPE => {
ensure!(
timestamp_index == usize::MAX,
DuplicatedTimestampColumnSnafu {
exists: &columns[timestamp_index].column_name,
duplicated: column_name,
}
);
timestamp_index = column_defs.len();
// Timestamp column must not be null.
is_nullable = false;
}
_ => {}
for Column {
column_name,
semantic_type,
datatype,
..
} in columns
{
if !new_columns.contains(column_name) {
let mut is_nullable = true;
match *semantic_type {
TAG_SEMANTIC_TYPE => primary_key_indices.push(column_defs.len()),
TIMESTAMP_SEMANTIC_TYPE => {
ensure!(
timestamp_index == usize::MAX,
DuplicatedTimestampColumnSnafu {
exists: &columns[timestamp_index].column_name,
duplicated: column_name,
}
);
timestamp_index = column_defs.len();
// Timestamp column must not be null.
is_nullable = false;
}
let column_def = build_column_def(column_name, *datatype, is_nullable);
column_defs.push(column_def);
new_columns.insert(column_name.to_string());
_ => {}
}
let column_def = build_column_def(column_name, *datatype, is_nullable);
column_defs.push(column_def);
new_columns.insert(column_name.to_string());
}
ensure!(
timestamp_index != usize::MAX,
MissingTimestampColumnSnafu { msg: table_name }
);
let timestamp_field_name = columns[timestamp_index].column_name.clone();
let primary_keys = primary_key_indices
.iter()
.map(|idx| columns[*idx].column_name.clone())
.collect::<Vec<_>>();
let expr = CreateExpr {
catalog_name: Some(catalog_name.to_string()),
schema_name: Some(schema_name.to_string()),
table_name: table_name.to_string(),
desc: Some("Created on insertion".to_string()),
column_defs,
time_index: timestamp_field_name,
primary_keys,
create_if_not_exists: true,
table_options: Default::default(),
table_id,
region_ids: vec![0], // TODO:(hl): region id should be allocated by frontend
};
return Ok(expr);
}
IllegalInsertDataSnafu.fail()
ensure!(
timestamp_index != usize::MAX,
MissingTimestampColumnSnafu { msg: table_name }
);
let timestamp_field_name = columns[timestamp_index].column_name.clone();
let primary_keys = primary_key_indices
.iter()
.map(|idx| columns[*idx].column_name.clone())
.collect::<Vec<_>>();
let expr = CreateExpr {
catalog_name: Some(catalog_name.to_string()),
schema_name: Some(schema_name.to_string()),
table_name: table_name.to_string(),
desc: Some("Created on insertion".to_string()),
column_defs,
time_index: timestamp_field_name,
primary_keys,
create_if_not_exists: true,
table_options: Default::default(),
table_id,
region_ids: vec![0], // TODO:(hl): region id should be allocated by frontend
};
Ok(expr)
}
pub fn insertion_expr_to_request(
catalog_name: &str,
schema_name: &str,
table_name: &str,
insert_batches: Vec<InsertBatch>,
insert_batches: Vec<(Vec<Column>, u32)>,
table: Arc<dyn Table>,
) -> Result<InsertRequest> {
let schema = table.schema();
let mut columns_builders = HashMap::with_capacity(schema.column_schemas().len());
for InsertBatch { columns, row_count } in insert_batches {
for (columns, row_count) in insert_batches {
for Column {
column_name,
values,
@@ -332,14 +311,6 @@ pub fn insertion_expr_to_request(
})
}
/// Decodes every byte buffer in `bytes_vec` into an `InsertBatch`.
///
/// # Errors
///
/// Stops at the first buffer that fails to decode and returns that failure
/// with `DecodeInsertSnafu` context attached.
#[inline]
pub fn insert_batches(bytes_vec: &[Vec<u8>]) -> Result<Vec<InsertBatch>> {
bytes_vec
.iter()
// `deref` turns each `Vec<u8>` into a byte slice for `try_into`.
.map(|bytes| bytes.deref().try_into().context(DecodeInsertSnafu))
.collect()
}
fn add_values_to_builder(
builder: &mut VectorBuilder,
values: Values,
@@ -466,9 +437,8 @@ mod tests {
use std::sync::Arc;
use api::helper::ColumnDataTypeWrapper;
use api::v1::codec::InsertBatch;
use api::v1::column::{self, SemanticType, Values};
use api::v1::{insert_expr, Column, ColumnDataType};
use api::v1::{Column, ColumnDataType};
use common_base::BitVec;
use common_query::physical_plan::PhysicalPlanRef;
use common_query::prelude::Expr;
@@ -482,11 +452,12 @@ mod tests {
use table::Table;
use super::{
build_create_expr_from_insertion, convert_values, find_new_columns, insert_batches,
insertion_expr_to_request, is_null, TAG_SEMANTIC_TYPE, TIMESTAMP_SEMANTIC_TYPE,
build_create_expr_from_insertion, convert_values, insertion_expr_to_request, is_null,
TAG_SEMANTIC_TYPE, TIMESTAMP_SEMANTIC_TYPE,
};
use crate::error;
use crate::error::ColumnDataTypeSnafu;
use crate::insert::find_new_columns;
#[inline]
fn build_column_schema(
@@ -511,11 +482,10 @@ mod tests {
assert!(build_create_expr_from_insertion("", "", table_id, table_name, &[]).is_err());
let mock_batch_bytes = mock_insert_batches();
let insert_batches = insert_batches(&mock_batch_bytes).unwrap();
let insert_batch = mock_insert_batch();
let create_expr =
build_create_expr_from_insertion("", "", table_id, table_name, &insert_batches)
build_create_expr_from_insertion("", "", table_id, table_name, &insert_batch.0)
.unwrap();
assert_eq!(table_id, create_expr.table_id);
@@ -601,9 +571,9 @@ mod tests {
assert!(find_new_columns(&schema, &[]).unwrap().is_none());
let mock_insert_bytes = mock_insert_batches();
let insert_batches = insert_batches(&mock_insert_bytes).unwrap();
let add_columns = find_new_columns(&schema, &insert_batches).unwrap().unwrap();
let insert_batch = mock_insert_batch();
let add_columns = find_new_columns(&schema, &insert_batch.0).unwrap().unwrap();
assert_eq!(2, add_columns.add_columns.len());
let host_column = &add_columns.add_columns[0];
@@ -633,10 +603,7 @@ mod tests {
fn test_insertion_expr_to_request() {
let table: Arc<dyn Table> = Arc::new(DemoTable {});
let values = insert_expr::Values {
values: mock_insert_batches(),
};
let insert_batches = insert_batches(&values.values).unwrap();
let insert_batches = vec![mock_insert_batch()];
let insert_req =
insertion_expr_to_request("greptime", "public", "demo", insert_batches, table).unwrap();
@@ -734,7 +701,7 @@ mod tests {
}
}
fn mock_insert_batches() -> Vec<Vec<u8>> {
fn mock_insert_batch() -> (Vec<Column>, u32) {
let row_count = 2;
let host_vals = column::Values {
@@ -785,10 +752,9 @@ mod tests {
datatype: ColumnDataType::Timestamp as i32,
};
let insert_batch = InsertBatch {
columns: vec![host_column, cpu_column, mem_column, ts_column],
(
vec![host_column, cpu_column, mem_column, ts_column],
row_count,
};
vec![insert_batch.into()]
)
}
}

View File

@@ -20,5 +20,5 @@ mod insert;
pub use alter::{alter_expr_to_request, create_expr_to_request, create_table_schema};
pub use insert::{
build_alter_table_request, build_create_expr_from_insertion, column_to_vector,
find_new_columns, insert_batches, insertion_expr_to_request,
find_new_columns, insertion_expr_to_request,
};

View File

@@ -14,7 +14,6 @@
use std::collections::HashMap;
use api::v1::codec::InsertBatch;
use api::v1::column::{SemanticType, Values};
use api::v1::{Column, ColumnDataType};
use common_base::BitVec;
@@ -24,12 +23,14 @@ use crate::error::{Result, TypeMismatchSnafu};
type ColumnName = String;
type RowCount = u32;
// TODO(fys): this will be removed in the future.
#[derive(Default)]
pub struct LinesWriter {
column_name_index: HashMap<ColumnName, usize>,
null_masks: Vec<BitVec>,
batch: InsertBatch,
batch: (Vec<Column>, RowCount),
lines: usize,
}
@@ -171,20 +172,20 @@ impl LinesWriter {
pub fn commit(&mut self) {
let batch = &mut self.batch;
batch.row_count += 1;
batch.1 += 1;
for i in 0..batch.columns.len() {
for i in 0..batch.0.len() {
let null_mask = &mut self.null_masks[i];
if batch.row_count as usize > null_mask.len() {
if batch.1 as usize > null_mask.len() {
null_mask.push(true);
}
}
}
pub fn finish(mut self) -> InsertBatch {
pub fn finish(mut self) -> (Vec<Column>, RowCount) {
let null_masks = self.null_masks;
for (i, null_mask) in null_masks.into_iter().enumerate() {
let columns = &mut self.batch.columns;
let columns = &mut self.batch.0;
columns[i].null_mask = null_mask.into_vec();
}
self.batch
@@ -204,9 +205,9 @@ impl LinesWriter {
let batch = &mut self.batch;
let to_insert = self.lines;
let mut null_mask = BitVec::with_capacity(to_insert);
null_mask.extend(BitVec::repeat(true, batch.row_count as usize));
null_mask.extend(BitVec::repeat(true, batch.1 as usize));
self.null_masks.push(null_mask);
batch.columns.push(Column {
batch.0.push(Column {
column_name: column_name.to_string(),
semantic_type: semantic_type.into(),
values: Some(Values::with_capacity(datatype, to_insert)),
@@ -217,7 +218,7 @@ impl LinesWriter {
new_idx
}
};
(column_idx, &mut self.batch.columns[column_idx])
(column_idx, &mut self.batch.0[column_idx])
}
}
@@ -282,9 +283,9 @@ mod tests {
writer.commit();
let insert_batch = writer.finish();
assert_eq!(3, insert_batch.row_count);
assert_eq!(3, insert_batch.1);
let columns = insert_batch.columns;
let columns = insert_batch.0;
assert_eq!(9, columns.len());
let column = &columns[0];

View File

@@ -14,7 +14,7 @@
use api::result::{build_err_result, AdminResultBuilder, ObjectResultBuilder};
use api::v1::{
admin_expr, insert_expr, object_expr, select_expr, AdminExpr, AdminResult, CreateDatabaseExpr,
admin_expr, object_expr, select_expr, AdminExpr, AdminResult, Column, CreateDatabaseExpr,
ObjectExpr, ObjectResult, SelectExpr,
};
use async_trait::async_trait;
@@ -44,7 +44,7 @@ impl Instance {
catalog_name: &str,
schema_name: &str,
table_name: &str,
values: insert_expr::Values,
insert_batches: Vec<(Vec<Column>, u32)>,
) -> Result<Output> {
let schema_provider = self
.catalog_manager
@@ -55,11 +55,7 @@ impl Instance {
.context(CatalogSnafu)?
.context(SchemaNotFoundSnafu { name: schema_name })?;
let insert_batches =
common_grpc_expr::insert_batches(&values.values).context(InsertDataSnafu)?;
ensure!(!insert_batches.is_empty(), EmptyInsertBatchSnafu);
let table = schema_provider
.table(table_name)
.context(CatalogSnafu)?
@@ -87,10 +83,10 @@ impl Instance {
catalog_name: &str,
schema_name: &str,
table_name: &str,
values: insert_expr::Values,
insert_batches: Vec<(Vec<Column>, u32)>,
) -> ObjectResult {
match self
.execute_grpc_insert(catalog_name, schema_name, table_name, values)
.execute_grpc_insert(catalog_name, schema_name, table_name, insert_batches)
.await
{
Ok(Output::AffectedRows(rows)) => ObjectResultBuilder::new()
@@ -170,25 +166,13 @@ impl GrpcQueryHandler for Instance {
let catalog_name = DEFAULT_CATALOG_NAME;
let schema_name = &insert_expr.schema_name;
let table_name = &insert_expr.table_name;
let expr = insert_expr
.expr
.context(servers::error::InvalidQuerySnafu {
reason: "missing `expr` in `InsertExpr`",
})?;
// TODO(fys): _region_number is for later use.
let _region_number: u32 = insert_expr.region_number;
match expr {
insert_expr::Expr::Values(values) => {
self.handle_insert(catalog_name, schema_name, table_name, values)
.await
}
insert_expr::Expr::Sql(sql) => {
let output = self.execute_sql(&sql).await;
to_object_result(output).await
}
}
let insert_batches = vec![(insert_expr.columns, insert_expr.row_count)];
self.handle_insert(catalog_name, schema_name, table_name, insert_batches)
.await
}
Some(object_expr::Expr::Select(select_expr)) => self.handle_select(select_expr).await,
other => {

View File

@@ -13,17 +13,15 @@
// limitations under the License.
use std::assert_matches::assert_matches;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use api::v1::alter_expr::Kind;
use api::v1::codec::InsertBatch;
use api::v1::column::SemanticType;
use api::v1::{
admin_result, column, insert_expr, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType,
ColumnDef, CreateExpr, InsertExpr, MutateResult,
admin_result, column, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDef,
CreateExpr, InsertExpr, MutateResult,
};
use client::admin::Admin;
use client::{Client, Database, ObjectResult};
@@ -230,7 +228,10 @@ async fn insert_and_assert(db: &Database) {
// testing data:
let (expected_host_col, expected_cpu_col, expected_mem_col, expected_ts_col) = expect_data();
let values = vec![InsertBatch {
let expr = InsertExpr {
schema_name: "public".to_string(),
table_name: "demo".to_string(),
region_number: 0,
columns: vec![
expected_host_col.clone(),
expected_cpu_col.clone(),
@@ -238,14 +239,6 @@ async fn insert_and_assert(db: &Database) {
expected_ts_col.clone(),
],
row_count: 4,
}
.into()];
let expr = InsertExpr {
schema_name: "public".to_string(),
table_name: "demo".to_string(),
expr: Some(insert_expr::Expr::Values(insert_expr::Values { values })),
options: HashMap::default(),
region_number: 0,
};
let result = db.insert(expr).await;
result.unwrap();

View File

@@ -16,8 +16,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use api::helper::ColumnDataTypeWrapper;
use api::v1::codec::InsertBatch;
use api::v1::{ColumnDataType, CreateExpr};
use api::v1::{Column, ColumnDataType, CreateExpr};
use datatypes::schema::ColumnSchema;
use snafu::{ensure, ResultExt};
use sql::statements::create::{CreateTable, TIME_INDEX};
@@ -35,12 +34,12 @@ pub type CreateExprFactoryRef = Arc<dyn CreateExprFactory + Send + Sync>;
pub trait CreateExprFactory {
async fn create_expr_by_stmt(&self, stmt: &CreateTable) -> Result<CreateExpr>;
async fn create_expr_by_insert_batch(
async fn create_expr_by_columns(
&self,
catalog_name: &str,
schema_name: &str,
table_name: &str,
batch: &[InsertBatch],
columns: &[Column],
) -> crate::error::Result<CreateExpr>;
}
@@ -53,12 +52,12 @@ impl CreateExprFactory for DefaultCreateExprFactory {
create_to_expr(None, vec![0], stmt)
}
async fn create_expr_by_insert_batch(
async fn create_expr_by_columns(
&self,
catalog_name: &str,
schema_name: &str,
table_name: &str,
batch: &[InsertBatch],
columns: &[Column],
) -> Result<CreateExpr> {
let table_id = None;
let create_expr = common_grpc_expr::build_create_expr_from_insertion(
@@ -66,7 +65,7 @@ impl CreateExprFactory for DefaultCreateExprFactory {
schema_name,
table_id,
table_name,
batch,
columns,
)
.context(BuildCreateExprOnInsertionSnafu)?;

View File

@@ -17,16 +17,14 @@ mod influxdb;
mod opentsdb;
mod prometheus;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use api::result::ObjectResultBuilder;
use api::v1::alter_expr::Kind;
use api::v1::codec::InsertBatch;
use api::v1::object_expr::Expr;
use api::v1::{
admin_expr, insert_expr, select_expr, AddColumns, AdminExpr, AdminResult, AlterExpr,
admin_expr, select_expr, AddColumns, AdminExpr, AdminResult, AlterExpr, Column,
CreateDatabaseExpr, CreateExpr, InsertExpr, ObjectExpr, ObjectResult as GrpcObjectResult,
};
use async_trait::async_trait;
@@ -60,13 +58,13 @@ use crate::catalog::FrontendCatalogManager;
use crate::datanode::DatanodeClients;
use crate::error::{
self, AlterTableOnInsertionSnafu, AlterTableSnafu, CatalogNotFoundSnafu, CatalogSnafu,
CreateDatabaseSnafu, CreateTableSnafu, DeserializeInsertBatchSnafu,
FindNewColumnsOnInsertionSnafu, InsertSnafu, MissingMetasrvOptsSnafu, Result,
SchemaNotFoundSnafu, SelectSnafu,
CreateDatabaseSnafu, CreateTableSnafu, FindNewColumnsOnInsertionSnafu, InsertSnafu,
MissingMetasrvOptsSnafu, Result, SchemaNotFoundSnafu, SelectSnafu,
};
use crate::expr_factory::{CreateExprFactoryRef, DefaultCreateExprFactory};
use crate::frontend::FrontendOptions;
use crate::sql::insert_to_request;
use crate::table::insert::insert_request_to_insert_batch;
use crate::table::route::TableRoutes;
#[async_trait]
@@ -292,7 +290,7 @@ impl Instance {
}
/// Handle batch inserts
pub async fn handle_inserts(&self, insert_expr: &[InsertExpr]) -> Result<Output> {
pub async fn handle_inserts(&self, insert_expr: Vec<InsertExpr>) -> Result<Output> {
let mut success = 0;
for expr in insert_expr {
match self.handle_insert(expr).await? {
@@ -304,68 +302,20 @@ impl Instance {
}
/// Handle insert. For 'values' insertion, create/alter the destination table on demand.
pub async fn handle_insert(&self, insert_expr: &InsertExpr) -> Result<Output> {
pub async fn handle_insert(&self, mut insert_expr: InsertExpr) -> Result<Output> {
let table_name = &insert_expr.table_name;
let catalog_name = DEFAULT_CATALOG_NAME;
let schema_name = &insert_expr.schema_name;
if let Some(expr) = &insert_expr.expr {
match expr {
api::v1::insert_expr::Expr::Values(values) => {
// TODO(hl): gRPC should also support partitioning.
let region_number = 0;
self.handle_insert_values(
catalog_name,
schema_name,
table_name,
region_number,
values,
)
.await
}
api::v1::insert_expr::Expr::Sql(_) => {
// Frontend does not comprehend insert request that is raw SQL string
self.database(schema_name)
.insert(insert_expr.clone())
.await
.and_then(Output::try_from)
.context(InsertSnafu)
}
}
} else {
// expr is empty
Ok(Output::AffectedRows(0))
}
}
let columns = &insert_expr.columns;
self.create_or_alter_table_on_demand(catalog_name, schema_name, table_name, columns)
.await?;
insert_expr.region_number = 0;
/// Handle insert requests in frontend
/// If insert is SQL string flavor, just forward to datanode
/// If insert is parsed InsertExpr, frontend should comprehend the schema and create/alter table on demand.
pub async fn handle_insert_values(
&self,
catalog_name: &str,
schema_name: &str,
table_name: &str,
region_number: u32,
values: &insert_expr::Values,
) -> Result<Output> {
let insert_batches = common_grpc_expr::insert_batches(&values.values)
.context(DeserializeInsertBatchSnafu)?;
self.create_or_alter_table_on_demand(
catalog_name,
schema_name,
table_name,
&insert_batches,
)
.await?;
self.database(schema_name)
.insert(InsertExpr {
schema_name: schema_name.to_string(),
table_name: table_name.to_string(),
region_number,
options: Default::default(),
expr: Some(insert_expr::Expr::Values(values.clone())),
})
.insert(insert_expr)
.await
.and_then(Output::try_from)
.context(InsertSnafu)
@@ -379,7 +329,7 @@ impl Instance {
catalog_name: &str,
schema_name: &str,
table_name: &str,
insert_batches: &[InsertBatch],
columns: &[Column],
) -> Result<()> {
match self
.catalog_manager
@@ -401,13 +351,8 @@ impl Instance {
"Table {}.{}.{} does not exist, try create table",
catalog_name, schema_name, table_name,
);
self.create_table_by_insert_batches(
catalog_name,
schema_name,
table_name,
insert_batches,
)
.await?;
self.create_table_by_columns(catalog_name, schema_name, table_name, columns)
.await?;
info!(
"Successfully created table on insertion: {}.{}.{}",
catalog_name, schema_name, table_name
@@ -415,9 +360,9 @@ impl Instance {
}
Some(table) => {
let schema = table.schema();
if let Some(add_columns) =
common_grpc_expr::find_new_columns(&schema, insert_batches)
.context(FindNewColumnsOnInsertionSnafu)?
if let Some(add_columns) = common_grpc_expr::find_new_columns(&schema, columns)
.context(FindNewColumnsOnInsertionSnafu)?
{
info!(
"Find new columns {:?} on insertion, try to alter table: {}.{}.{}",
@@ -441,17 +386,17 @@ impl Instance {
}
/// Infer create table expr from inserting data
async fn create_table_by_insert_batches(
async fn create_table_by_columns(
&self,
catalog_name: &str,
schema_name: &str,
table_name: &str,
insert_batches: &[InsertBatch],
columns: &[Column],
) -> Result<Output> {
// Create table automatically, build schema from data.
let create_expr = self
.create_expr_factory
.create_expr_by_insert_batch(catalog_name, schema_name, table_name, insert_batches)
.create_expr_by_columns(catalog_name, schema_name, table_name, columns)
.await?;
info!(
@@ -512,9 +457,10 @@ impl Instance {
let insert_request = insert_to_request(&schema_provider, *insert)?;
let batch = crate::table::insert::insert_request_to_insert_batch(&insert_request)?;
let (columns, _row_count) =
crate::table::insert::insert_request_to_insert_batch(&insert_request)?;
self.create_or_alter_table_on_demand(&catalog, &schema, &table, &[batch])
self.create_or_alter_table_on_demand(&catalog, &schema, &table, &columns)
.await?;
let table = schema_provider
@@ -527,6 +473,19 @@ impl Instance {
.await
.context(error::TableSnafu)
}
/// Converts a parsed SQL `Insert` statement into a `(columns, row_count)` pair.
///
/// Looks up the catalog provider for `catalog` and the schema provider for
/// `schema`, builds an insert request from the statement with
/// `insert_to_request`, and hands that request to
/// `insert_request_to_insert_batch`.
///
/// # Errors
///
/// Propagates any error from the catalog lookup, the schema lookup, or either
/// conversion step.
fn stmt_to_insert_batch(
&self,
catalog: &str,
schema: &str,
insert: Box<Insert>,
) -> Result<(Vec<Column>, u32)> {
let catalog_provider = self.get_catalog(catalog)?;
let schema_provider = Self::get_schema(catalog_provider, schema)?;
let insert_request = insert_to_request(&schema_provider, *insert)?;
insert_request_to_insert_batch(&insert_request)
}
}
#[async_trait]
@@ -580,7 +539,7 @@ impl SqlQueryHandler for Instance {
.context(server_error::ExecuteQuerySnafu { query }),
Statement::Insert(insert) => match self.mode {
Mode::Standalone => {
let (_, schema_name, table_name) = insert
let (catalog_name, schema_name, table_name) = insert
.full_table_name()
.context(error::ParseSqlSnafu)
.map_err(BoxedError::new)
@@ -588,14 +547,19 @@ impl SqlQueryHandler for Instance {
msg: "Failed to get table name",
})?;
let (columns, row_count) = self
.stmt_to_insert_batch(&catalog_name, &schema_name, insert)
.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu { query })?;
let expr = InsertExpr {
schema_name,
table_name,
expr: Some(insert_expr::Expr::Sql(query.to_string())),
region_number: 0,
options: HashMap::default(),
columns,
row_count,
};
self.handle_insert(&expr)
self.handle_insert(expr)
.await
.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu { query })
@@ -696,7 +660,8 @@ impl GrpcQueryHandler for Instance {
if let Some(expr) = &query.expr {
match expr {
Expr::Insert(insert) => {
let result = self.handle_insert(insert).await;
// TODO(fys): refactor, avoid clone
let result = self.handle_insert(insert.clone()).await;
result
.map(|o| match o {
Output::AffectedRows(rows) => ObjectResultBuilder::new()
@@ -787,7 +752,7 @@ impl GrpcAdminHandler for Instance {
mod tests {
use std::assert_matches::assert_matches;
use api::v1::codec::{InsertBatch, SelectResult};
use api::v1::codec::SelectResult;
use api::v1::column::SemanticType;
use api::v1::{
admin_expr, admin_result, column, object_expr, object_result, select_expr, Column,
@@ -946,22 +911,19 @@ mod tests {
);
// insert
let values = vec![InsertBatch {
columns: vec![
expected_host_col.clone(),
expected_cpu_col.clone(),
expected_mem_col.clone(),
expected_ts_col.clone(),
],
row_count: 4,
}
.into()];
let columns = vec![
expected_host_col.clone(),
expected_cpu_col.clone(),
expected_mem_col.clone(),
expected_ts_col.clone(),
];
let row_count = 4;
let insert_expr = InsertExpr {
schema_name: "public".to_string(),
table_name: "demo".to_string(),
expr: Some(insert_expr::Expr::Values(insert_expr::Values { values })),
options: HashMap::default(),
region_number: 0,
columns,
row_count,
};
let object_expr = ObjectExpr {
header: Some(ExprHeader::default()),

View File

@@ -14,9 +14,7 @@
use std::collections::HashMap;
use api::v1::codec::InsertBatch;
use api::v1::insert_expr::Expr;
use api::v1::InsertExpr;
use api::v1::{Column, InsertExpr};
use async_trait::async_trait;
use common_catalog::consts::DEFAULT_CATALOG_NAME;
use common_error::prelude::BoxedError;
@@ -28,7 +26,7 @@ use snafu::{OptionExt, ResultExt};
use table::requests::InsertRequest;
use crate::error;
use crate::error::{DeserializeInsertBatchSnafu, InsertBatchToRequestSnafu, Result};
use crate::error::{InsertBatchToRequestSnafu, Result};
use crate::instance::Instance;
#[async_trait]
@@ -37,7 +35,7 @@ impl InfluxdbLineProtocolHandler for Instance {
match self.mode {
Mode::Standalone => {
let exprs: Vec<InsertExpr> = request.try_into()?;
self.handle_inserts(&exprs)
self.handle_inserts(exprs)
.await
.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu {
@@ -61,53 +59,41 @@ impl InfluxdbLineProtocolHandler for Instance {
impl Instance {
pub(crate) async fn dist_insert(&self, inserts: Vec<InsertExpr>) -> Result<usize> {
let mut joins = Vec::with_capacity(inserts.len());
let catalog_name = DEFAULT_CATALOG_NAME.to_string();
let catalog_name = DEFAULT_CATALOG_NAME;
for insert in inserts {
let self_clone = self.clone();
let insert_batches = match &insert.expr.unwrap() {
Expr::Values(values) => common_grpc_expr::insert_batches(&values.values)
.context(DeserializeInsertBatchSnafu)?,
Expr::Sql(_) => unreachable!(),
};
self.create_or_alter_table_on_demand(
DEFAULT_CATALOG_NAME,
&insert.schema_name,
&insert.table_name,
&insert_batches,
)
.await?;
let schema_name = insert.schema_name.to_string();
let table_name = insert.table_name.to_string();
let schema_name = insert.schema_name.clone();
let table_name = insert.table_name.clone();
let columns = &insert.columns;
let row_count = insert.row_count;
for insert_batch in &insert_batches {
let catalog_name = catalog_name.clone();
let schema_name = schema_name.clone();
let table_name = table_name.clone();
let request = Self::insert_batch_to_request(
DEFAULT_CATALOG_NAME,
&schema_name,
&table_name,
insert_batch,
)?;
// TODO(fys): need a separate runtime here
let self_clone = self_clone.clone();
let join = tokio::spawn(async move {
let catalog = self_clone.get_catalog(&catalog_name)?;
let schema = Self::get_schema(catalog, &schema_name)?;
let table = schema
.table(&table_name)
.context(error::CatalogSnafu)?
.context(error::TableNotFoundSnafu {
table_name: &table_name,
})?;
self.create_or_alter_table_on_demand(catalog_name, &schema_name, &table_name, columns)
.await?;
table.insert(request).await.context(error::TableSnafu)
});
joins.push(join);
}
let request = Self::columns_to_request(
catalog_name,
&schema_name,
&table_name,
columns,
row_count,
)?;
// TODO(fys): need a separate runtime here
let self_clone = self_clone.clone();
let join = tokio::spawn(async move {
let catalog = self_clone.get_catalog(catalog_name)?;
let schema = Self::get_schema(catalog, &schema_name)?;
let table = schema
.table(&table_name)
.context(error::CatalogSnafu)?
.context(error::TableNotFoundSnafu { table_name })?;
table.insert(request).await.context(error::TableSnafu)
});
joins.push(join);
}
let mut affected = 0;
@@ -119,16 +105,16 @@ impl Instance {
Ok(affected)
}
fn insert_batch_to_request(
fn columns_to_request(
catalog_name: &str,
schema_name: &str,
table_name: &str,
batches: &InsertBatch,
columns: &[Column],
row_count: u32,
) -> Result<InsertRequest> {
let mut vectors = HashMap::with_capacity(batches.columns.len());
for col in &batches.columns {
let vector =
column_to_vector(col, batches.row_count).context(InsertBatchToRequestSnafu)?;
let mut vectors = HashMap::with_capacity(columns.len());
for col in columns {
let vector = column_to_vector(col, row_count).context(InsertBatchToRequestSnafu)?;
vectors.insert(col.column_name.clone(), vector);
}
Ok(InsertRequest {

View File

@@ -53,7 +53,7 @@ impl OpentsdbProtocolHandler for Instance {
impl Instance {
async fn insert_opentsdb_metric(&self, data_point: &DataPoint) -> Result<()> {
let expr = data_point.as_grpc_insert();
self.handle_insert(&expr).await?;
self.handle_insert(expr).await?;
Ok(())
}
}

View File

@@ -115,7 +115,7 @@ impl PrometheusProtocolHandler for Instance {
Mode::Standalone => {
let exprs = prometheus::write_request_to_insert_exprs(database, request)?;
let futures = exprs
.iter()
.into_iter()
.map(|e| self.handle_insert(e))
.collect::<Vec<_>>();
let res = futures_util::future::join_all(futures)

View File

@@ -511,9 +511,8 @@ impl PartitionExec {
mod test {
use std::time::Duration;
use api::v1::codec::InsertBatch;
use api::v1::column::SemanticType;
use api::v1::{column, insert_expr, Column, ColumnDataType};
use api::v1::{column, Column, ColumnDataType};
use catalog::remote::MetaKvBackend;
use common_recordbatch::util;
use datafusion::arrow_print;
@@ -970,8 +969,8 @@ mod test {
start_ts: i64,
) {
let rows = data.len() as u32;
let values = vec![InsertBatch {
columns: vec![
let values = vec![(
vec![
Column {
column_name: "ts".to_string(),
values: Some(column::Values {
@@ -1001,10 +1000,8 @@ mod test {
..Default::default()
},
],
row_count: rows,
}
.into()];
let values = insert_expr::Values { values };
rows,
)];
dn_instance
.execute_grpc_insert(
&table_name.catalog_name,

View File

@@ -16,10 +16,8 @@ use std::collections::HashMap;
use std::sync::Arc;
use api::helper::ColumnDataTypeWrapper;
use api::v1::codec::InsertBatch;
use api::v1::column::SemanticType;
use api::v1::insert_expr::Expr;
use api::v1::{codec, insert_expr, Column, InsertExpr, MutateResult};
use api::v1::{Column, InsertExpr, MutateResult};
use client::{Database, ObjectResult};
use datatypes::prelude::ConcreteDataType;
use snafu::{ensure, OptionExt, ResultExt};
@@ -84,7 +82,7 @@ impl DistTable {
}
}
pub fn insert_request_to_insert_batch(insert: &InsertRequest) -> Result<InsertBatch> {
pub fn insert_request_to_insert_batch(insert: &InsertRequest) -> Result<(Vec<Column>, u32)> {
let mut row_count = None;
let columns = insert
@@ -127,24 +125,20 @@ pub fn insert_request_to_insert_batch(insert: &InsertRequest) -> Result<InsertBa
})
.collect::<Result<Vec<_>>>()?;
let insert_batch = codec::InsertBatch {
columns,
row_count: row_count.map(|rows| rows as u32).unwrap_or(0),
};
Ok(insert_batch)
let row_count = row_count.unwrap_or(0) as u32;
Ok((columns, row_count))
}
fn to_insert_expr(region_number: RegionNumber, insert: InsertRequest) -> Result<InsertExpr> {
let table_name = insert.table_name.clone();
let insert_batch = insert_request_to_insert_batch(&insert)?;
let (columns, row_count) = insert_request_to_insert_batch(&insert)?;
Ok(InsertExpr {
schema_name: insert.schema_name,
table_name,
expr: Some(Expr::Values(insert_expr::Values {
values: vec![insert_batch.into()],
})),
region_number,
options: Default::default(),
columns,
row_count,
})
}
@@ -152,8 +146,6 @@ fn to_insert_expr(region_number: RegionNumber, insert: InsertRequest) -> Result<
mod tests {
use std::collections::HashMap;
use api::v1::codec::InsertBatch;
use api::v1::insert_expr::Expr;
use api::v1::{ColumnDataType, InsertExpr};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use datatypes::prelude::ConcreteDataType;
@@ -199,16 +191,7 @@ mod tests {
let table_name = insert_expr.table_name;
assert_eq!("demo", table_name);
let expr = insert_expr.expr.as_ref().unwrap();
let vals = match expr {
Expr::Values(vals) => vals,
Expr::Sql(_) => unreachable!(),
};
let batch: &[u8] = vals.values[0].as_ref();
let vals: InsertBatch = batch.try_into().unwrap();
for column in vals.columns {
for column in insert_expr.columns {
let name = column.column_name;
if name == "id" {
assert_eq!(0, column.null_mask[0]);

View File

@@ -14,7 +14,6 @@
use std::collections::HashMap;
use api::v1::insert_expr::{self, Expr};
use api::v1::InsertExpr;
use common_grpc::writer::{LinesWriter, Precision};
use influxdb_line_protocol::{parse_lines, FieldValue};
@@ -165,14 +164,15 @@ impl TryFrom<&InfluxdbRequest> for Vec<InsertExpr> {
Ok(writers
.into_iter()
.map(|(table_name, writer)| InsertExpr {
schema_name: schema_name.clone(),
table_name,
expr: Some(Expr::Values(insert_expr::Values {
values: vec![writer.finish().into()],
})),
options: HashMap::default(),
region_number: 0,
.map(|(table_name, writer)| {
let (columns, row_count) = writer.finish();
InsertExpr {
schema_name: schema_name.clone(),
table_name,
region_number: 0,
columns,
row_count,
}
})
.collect())
}
@@ -180,12 +180,9 @@ impl TryFrom<&InfluxdbRequest> for Vec<InsertExpr> {
#[cfg(test)]
mod tests {
use std::ops::Deref;
use std::sync::Arc;
use api::v1::codec::InsertBatch;
use api::v1::column::{SemanticType, Values};
use api::v1::insert_expr::Expr;
use api::v1::{Column, ColumnDataType, InsertExpr};
use common_base::BitVec;
use common_time::timestamp::TimeUnit;
@@ -242,15 +239,9 @@ monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003";
for expr in insert_exprs {
assert_eq!("public", expr.schema_name);
let values = match expr.expr.unwrap() {
Expr::Values(vals) => vals,
Expr::Sql(_) => panic!(),
};
let raw_batch = values.values.get(0).unwrap();
let batch: InsertBatch = raw_batch.deref().try_into().unwrap();
match &expr.table_name[..] {
"monitor1" => assert_monitor_1(&batch),
"monitor2" => assert_monitor_2(&batch),
"monitor1" => assert_monitor_1(&expr.columns),
"monitor2" => assert_monitor_2(&expr.columns),
_ => panic!(),
}
}
@@ -327,8 +318,7 @@ monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003";
}
}
fn assert_monitor_1(insert_batch: &InsertBatch) {
let columns = &insert_batch.columns;
fn assert_monitor_1(columns: &[Column]) {
assert_eq!(4, columns.len());
verify_column(
&columns[0],
@@ -379,8 +369,7 @@ monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003";
);
}
fn assert_monitor_2(insert_batch: &InsertBatch) {
let columns = &insert_batch.columns;
fn assert_monitor_2(columns: &[Column]) {
assert_eq!(4, columns.len());
verify_column(
&columns[0],

View File

@@ -12,11 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use api::v1::codec::InsertBatch;
use api::v1::column::SemanticType;
use api::v1::{column, insert_expr, Column, ColumnDataType, InsertExpr};
use api::v1::{column, Column, ColumnDataType, InsertExpr};
use common_catalog::consts::DEFAULT_SCHEMA_NAME;
use common_grpc::writer::Precision;
use table::requests::InsertRequest;
@@ -189,18 +186,12 @@ impl DataPoint {
});
}
let batch = InsertBatch {
columns,
row_count: 1,
};
InsertExpr {
schema_name,
table_name: self.metric.clone(),
expr: Some(insert_expr::Expr::Values(insert_expr::Values {
values: vec![batch.into()],
})),
options: HashMap::default(),
region_number: 0,
columns,
row_count: 1,
}
}
@@ -337,36 +328,31 @@ mod test {
let grpc_insert = data_point.as_grpc_insert();
assert_eq!(grpc_insert.table_name, "my_metric_1");
match grpc_insert.expr {
Some(insert_expr::Expr::Values(insert_expr::Values { values })) => {
assert_eq!(values.len(), 1);
let insert_batch = InsertBatch::try_from(values[0].as_slice()).unwrap();
assert_eq!(insert_batch.row_count, 1);
let columns = insert_batch.columns;
assert_eq!(columns.len(), 4);
let columns = &grpc_insert.columns;
let row_count = grpc_insert.row_count;
assert_eq!(columns[0].column_name, OPENTSDB_TIMESTAMP_COLUMN_NAME);
assert_eq!(
columns[0].values.as_ref().unwrap().ts_millis_values,
vec![1000]
);
assert_eq!(row_count, 1);
assert_eq!(columns.len(), 4);
assert_eq!(columns[1].column_name, OPENTSDB_VALUE_COLUMN_NAME);
assert_eq!(columns[1].values.as_ref().unwrap().f64_values, vec![1.0]);
assert_eq!(columns[0].column_name, OPENTSDB_TIMESTAMP_COLUMN_NAME);
assert_eq!(
columns[0].values.as_ref().unwrap().ts_millis_values,
vec![1000]
);
assert_eq!(columns[2].column_name, "tagk1");
assert_eq!(
columns[2].values.as_ref().unwrap().string_values,
vec!["tagv1"]
);
assert_eq!(columns[1].column_name, OPENTSDB_VALUE_COLUMN_NAME);
assert_eq!(columns[1].values.as_ref().unwrap().f64_values, vec![1.0]);
assert_eq!(columns[3].column_name, "tagk2");
assert_eq!(
columns[3].values.as_ref().unwrap().string_values,
vec!["tagv2"]
);
}
_ => unreachable!(),
}
assert_eq!(columns[2].column_name, "tagk1");
assert_eq!(
columns[2].values.as_ref().unwrap().string_values,
vec!["tagv1"]
);
assert_eq!(columns[3].column_name, "tagk2");
assert_eq!(
columns[3].values.as_ref().unwrap().string_values,
vec!["tagv2"]
);
}
}

View File

@@ -14,14 +14,14 @@
//! prometheus protocol supportings
use std::cmp::Ordering;
use std::collections::{BTreeMap, HashMap};
use std::collections::BTreeMap;
use std::hash::{Hash, Hasher};
use api::prometheus::remote::label_matcher::Type as MatcherType;
use api::prometheus::remote::{Label, Query, Sample, TimeSeries, WriteRequest};
use api::v1::codec::{InsertBatch, SelectResult};
use api::v1::codec::SelectResult;
use api::v1::column::SemanticType;
use api::v1::{column, insert_expr, Column, ColumnDataType, InsertExpr};
use api::v1::{column, Column, ColumnDataType, InsertExpr};
use common_grpc::writer::Precision::MILLISECOND;
use openmetrics_parser::{MetricsExposition, PrometheusType, PrometheusValue};
use snafu::{OptionExt, ResultExt};
@@ -413,21 +413,14 @@ fn timeseries_to_insert_expr(database: &str, mut timeseries: TimeSeries) -> Resu
});
}
let batch = InsertBatch {
columns,
row_count: row_count as u32,
};
Ok(InsertExpr {
schema_name,
table_name: table_name.context(error::InvalidPromRemoteRequestSnafu {
msg: "missing '__name__' label in timeseries",
})?,
expr: Some(insert_expr::Expr::Values(insert_expr::Values {
values: vec![batch.into()],
})),
options: HashMap::default(),
region_number: 0,
columns,
row_count: row_count as u32,
})
}
@@ -683,105 +676,93 @@ mod tests {
assert_eq!("metric2", exprs[1].table_name);
assert_eq!("metric3", exprs[2].table_name);
let values = exprs[0].clone().expr.unwrap();
match values {
insert_expr::Expr::Values(insert_expr::Values { values }) => {
assert_eq!(1, values.len());
let batch = InsertBatch::try_from(values[0].as_slice()).unwrap();
assert_eq!(2, batch.row_count);
let columns = batch.columns;
assert_eq!(columns.len(), 3);
let expr = exprs.get(0).unwrap();
assert_eq!(columns[0].column_name, TIMESTAMP_COLUMN_NAME);
assert_eq!(
columns[0].values.as_ref().unwrap().ts_millis_values,
vec![1000, 2000]
);
let columns = &expr.columns;
let row_count = expr.row_count;
assert_eq!(columns[1].column_name, VALUE_COLUMN_NAME);
assert_eq!(
columns[1].values.as_ref().unwrap().f64_values,
vec![1.0, 2.0]
);
assert_eq!(2, row_count);
assert_eq!(columns.len(), 3);
assert_eq!(columns[2].column_name, "job");
assert_eq!(
columns[2].values.as_ref().unwrap().string_values,
vec!["spark", "spark"]
);
}
_ => unreachable!(),
}
assert_eq!(columns[0].column_name, TIMESTAMP_COLUMN_NAME);
assert_eq!(
columns[0].values.as_ref().unwrap().ts_millis_values,
vec![1000, 2000]
);
let values = exprs[1].clone().expr.unwrap();
match values {
insert_expr::Expr::Values(insert_expr::Values { values }) => {
assert_eq!(1, values.len());
let batch = InsertBatch::try_from(values[0].as_slice()).unwrap();
assert_eq!(2, batch.row_count);
let columns = batch.columns;
assert_eq!(columns.len(), 4);
assert_eq!(columns[1].column_name, VALUE_COLUMN_NAME);
assert_eq!(
columns[1].values.as_ref().unwrap().f64_values,
vec![1.0, 2.0]
);
assert_eq!(columns[0].column_name, TIMESTAMP_COLUMN_NAME);
assert_eq!(
columns[0].values.as_ref().unwrap().ts_millis_values,
vec![1000, 2000]
);
assert_eq!(columns[2].column_name, "job");
assert_eq!(
columns[2].values.as_ref().unwrap().string_values,
vec!["spark", "spark"]
);
assert_eq!(columns[1].column_name, VALUE_COLUMN_NAME);
assert_eq!(
columns[1].values.as_ref().unwrap().f64_values,
vec![3.0, 4.0]
);
let expr = exprs.get(1).unwrap();
assert_eq!(columns[2].column_name, "instance");
assert_eq!(
columns[2].values.as_ref().unwrap().string_values,
vec!["test_host1", "test_host1"]
);
assert_eq!(columns[3].column_name, "idc");
assert_eq!(
columns[3].values.as_ref().unwrap().string_values,
vec!["z001", "z001"]
);
}
_ => unreachable!(),
}
let columns = &expr.columns;
let row_count = expr.row_count;
let values = exprs[2].clone().expr.unwrap();
match values {
insert_expr::Expr::Values(insert_expr::Values { values }) => {
assert_eq!(1, values.len());
let batch = InsertBatch::try_from(values[0].as_slice()).unwrap();
assert_eq!(3, batch.row_count);
let columns = batch.columns;
assert_eq!(columns.len(), 4);
assert_eq!(2, row_count);
assert_eq!(columns.len(), 4);
assert_eq!(columns[0].column_name, TIMESTAMP_COLUMN_NAME);
assert_eq!(
columns[0].values.as_ref().unwrap().ts_millis_values,
vec![1000, 2000, 3000]
);
assert_eq!(columns[0].column_name, TIMESTAMP_COLUMN_NAME);
assert_eq!(
columns[0].values.as_ref().unwrap().ts_millis_values,
vec![1000, 2000]
);
assert_eq!(columns[1].column_name, VALUE_COLUMN_NAME);
assert_eq!(
columns[1].values.as_ref().unwrap().f64_values,
vec![5.0, 6.0, 7.0]
);
assert_eq!(columns[1].column_name, VALUE_COLUMN_NAME);
assert_eq!(
columns[1].values.as_ref().unwrap().f64_values,
vec![3.0, 4.0]
);
assert_eq!(columns[2].column_name, "idc");
assert_eq!(
columns[2].values.as_ref().unwrap().string_values,
vec!["z002", "z002", "z002"]
);
assert_eq!(columns[3].column_name, "app");
assert_eq!(
columns[3].values.as_ref().unwrap().string_values,
vec!["biz", "biz", "biz"]
);
}
_ => unreachable!(),
}
assert_eq!(columns[2].column_name, "instance");
assert_eq!(
columns[2].values.as_ref().unwrap().string_values,
vec!["test_host1", "test_host1"]
);
assert_eq!(columns[3].column_name, "idc");
assert_eq!(
columns[3].values.as_ref().unwrap().string_values,
vec!["z001", "z001"]
);
let expr = exprs.get(2).unwrap();
let columns = &expr.columns;
let row_count = expr.row_count;
assert_eq!(3, row_count);
assert_eq!(columns.len(), 4);
assert_eq!(columns[0].column_name, TIMESTAMP_COLUMN_NAME);
assert_eq!(
columns[0].values.as_ref().unwrap().ts_millis_values,
vec![1000, 2000, 3000]
);
assert_eq!(columns[1].column_name, VALUE_COLUMN_NAME);
assert_eq!(
columns[1].values.as_ref().unwrap().f64_values,
vec![5.0, 6.0, 7.0]
);
assert_eq!(columns[2].column_name, "idc");
assert_eq!(
columns[2].values.as_ref().unwrap().string_values,
vec!["z002", "z002", "z002"]
);
assert_eq!(columns[3].column_name, "app");
assert_eq!(
columns[3].values.as_ref().unwrap().string_values,
vec!["biz", "biz", "biz"]
);
}
#[test]