feat: handle InsertRequest (formerly InsertExpr) in new Arrow Flight (#800)

feat: handle InsertRequest (formerly InsertExpr) in new Arrow Flight interface
This commit is contained in:
LFC
2022-12-30 10:24:09 +08:00
committed by GitHub
parent d0ef3aa9eb
commit de6803d253
31 changed files with 510 additions and 822 deletions

3
Cargo.lock generated
View File

@@ -137,6 +137,7 @@ checksum = "8f1f8f5a6f3d50d89e3797d7593a50f96bb2aaa20ca0cc7be1fb673232c91d72"
name = "api"
version = "0.1.0"
dependencies = [
"arrow-flight",
"common-base",
"common-error",
"common-time",
@@ -1477,6 +1478,7 @@ dependencies = [
"dashmap",
"datafusion",
"datatypes",
"flatbuffers",
"futures",
"prost 0.11.3",
"rand 0.8.5",
@@ -2158,7 +2160,6 @@ dependencies = [
"datafusion",
"datafusion-common",
"datatypes",
"flatbuffers",
"futures",
"hyper",
"log-store",

View File

@@ -27,7 +27,7 @@ use arrow::record_batch::RecordBatch;
use clap::Parser;
use client::admin::Admin;
use client::api::v1::column::Values;
use client::api::v1::{Column, ColumnDataType, ColumnDef, CreateTableExpr, InsertExpr, TableId};
use client::api::v1::{Column, ColumnDataType, ColumnDef, CreateTableExpr, InsertRequest, TableId};
use client::{Client, Database};
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
@@ -100,7 +100,7 @@ async fn write_data(
for record_batch in record_batch_reader {
let record_batch = record_batch.unwrap();
let (columns, row_count) = convert_record_batch(record_batch);
let insert_expr = InsertExpr {
let request = InsertRequest {
schema_name: "public".to_string(),
table_name: TABLE_NAME.to_string(),
region_number: 0,
@@ -108,7 +108,7 @@ async fn write_data(
row_count,
};
let now = Instant::now();
db.insert(insert_expr).await.unwrap();
db.insert(request).await.unwrap();
let elapsed = now.elapsed();
total_rpc_elapsed_ms += elapsed.as_millis();
progress_bar.inc(row_count as _);

View File

@@ -5,6 +5,7 @@ edition.workspace = true
license.workspace = true
[dependencies]
arrow-flight.workspace = true
common-base = { path = "../common/base" }
common-error = { path = "../common/error" }
common-time = { path = "../common/time" }

View File

@@ -15,10 +15,9 @@ message DatabaseResponse {
}
message ObjectExpr {
ExprHeader header = 1;
oneof expr {
InsertExpr insert = 2;
QueryRequest query = 3;
oneof request {
InsertRequest insert = 1;
QueryRequest query = 2;
}
}
@@ -29,7 +28,7 @@ message QueryRequest {
}
}
message InsertExpr {
message InsertRequest {
string schema_name = 1;
string table_name = 2;
@@ -38,7 +37,7 @@ message InsertExpr {
// The row_count of all columns, which include null and non-null values.
//
// Note: the row_count of all columns in a InsertExpr must be same.
// Note: the row_count of all columns in an InsertRequest must be the same.
uint32 row_count = 4;
// The region number of current insert request.
@@ -47,14 +46,7 @@ message InsertExpr {
message ObjectResult {
ResultHeader header = 1;
oneof result {
MutateResult mutate = 2;
FlightDataRaw flight_data = 3;
}
}
message FlightDataRaw {
repeated bytes raw_data = 1;
repeated bytes flight_data = 2;
}
message FlightDataExt {

View File

@@ -12,29 +12,22 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_error::prelude::ErrorExt;
use arrow_flight::FlightData;
use prost::Message;
use crate::v1::{
admin_result, object_result, AdminResult, MutateResult, ObjectResult, ResultHeader,
};
use crate::v1::{admin_result, AdminResult, MutateResult, ObjectResult, ResultHeader};
pub const PROTOCOL_VERSION: u32 = 1;
pub type Success = u32;
pub type Failure = u32;
type FlightDataRaw = Vec<Vec<u8>>;
#[derive(Default)]
pub struct ObjectResultBuilder {
version: u32,
code: u32,
err_msg: Option<String>,
result: Option<Body>,
}
pub enum Body {
Mutate((Success, Failure)),
FlightDataRaw(FlightDataRaw),
flight_data: Option<Vec<FlightData>>,
}
impl ObjectResultBuilder {
@@ -61,13 +54,8 @@ impl ObjectResultBuilder {
self
}
pub fn mutate_result(mut self, success: u32, failure: u32) -> Self {
self.result = Some(Body::Mutate((success, failure)));
self
}
pub fn flight_data(mut self, flight_data: FlightDataRaw) -> Self {
self.result = Some(Body::FlightDataRaw(flight_data));
pub fn flight_data(mut self, flight_data: Vec<FlightData>) -> Self {
self.flight_data = Some(flight_data);
self
}
@@ -78,30 +66,21 @@ impl ObjectResultBuilder {
err_msg: self.err_msg.unwrap_or_default(),
});
let result = match self.result {
Some(Body::Mutate((success, failure))) => {
Some(object_result::Result::Mutate(MutateResult {
success,
failure,
}))
}
Some(Body::FlightDataRaw(raw_data)) => Some(object_result::Result::FlightData(
crate::v1::FlightDataRaw { raw_data },
)),
None => None,
let flight_data = if let Some(flight_data) = self.flight_data {
flight_data
.into_iter()
.map(|x| x.encode_to_vec())
.collect::<Vec<Vec<u8>>>()
} else {
vec![]
};
ObjectResult { header, result }
ObjectResult {
header,
flight_data,
}
}
}
pub fn build_err_result(err: &impl ErrorExt) -> ObjectResult {
ObjectResultBuilder::new()
.status_code(err.status_code() as u32)
.err_msg(err.to_string())
.build()
}
#[derive(Debug)]
pub struct AdminResultBuilder {
version: u32,
@@ -159,11 +138,7 @@ impl Default for AdminResultBuilder {
#[cfg(test)]
mod tests {
use common_error::status_code::StatusCode;
use super::*;
use crate::error::UnknownColumnDataTypeSnafu;
use crate::v1::{object_result, MutateResult};
#[test]
fn test_object_result_builder() {
@@ -171,32 +146,10 @@ mod tests {
.version(101)
.status_code(500)
.err_msg("Failed to read this file!".to_string())
.mutate_result(100, 20)
.build();
let header = obj_result.header.unwrap();
assert_eq!(101, header.version);
assert_eq!(500, header.code);
assert_eq!("Failed to read this file!", header.err_msg);
let result = obj_result.result.unwrap();
assert_eq!(
object_result::Result::Mutate(MutateResult {
success: 100,
failure: 20,
}),
result
);
}
#[test]
fn test_build_err_result() {
let err = UnknownColumnDataTypeSnafu { datatype: 1 }.build();
let err_result = build_err_result(&err);
let header = err_result.header.unwrap();
let result = err_result.result;
assert_eq!(PROTOCOL_VERSION, header.version);
assert_eq!(StatusCode::InvalidArguments as u32, header.code);
assert!(result.is_none());
}
}

View File

@@ -1,106 +0,0 @@
// Copyright 2022 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::*;
use client::{Client, Database};
/// Installs a formatting `tracing` subscriber as the global default, then runs the example.
///
/// Panics if the global subscriber cannot be set.
fn main() {
    let subscriber = tracing_subscriber::FmtSubscriber::builder().finish();
    tracing::subscriber::set_global_default(subscriber).unwrap();

    run();
}
/// Sends the demo rows produced by `insert_data` to table `demo` in schema `public`
/// through a client pointed at `127.0.0.1:3001`.
///
/// Panics if the insert call returns an error.
#[tokio::main]
async fn run() {
    let db = Database::new("greptime", Client::with_urls(vec!["127.0.0.1:3001"]));

    let (columns, row_count) = insert_data();
    db.insert(InsertExpr {
        schema_name: "public".to_string(),
        table_name: "demo".to_string(),
        region_number: 0,
        columns,
        row_count,
    })
    .await
    .unwrap();
}
/// Builds the hard-coded demo payload: the columns `host`, `cpu`, `memory` and `ts`,
/// together with the row count (4).
///
/// `cpu` and `memory` carry only three values each for four rows and have a non-zero
/// `null_mask`.
// NOTE(review): presumably bit `i` of `null_mask` marks row `i` as null (so `2` -> row 1,
// `4` -> row 2) — confirm against the `Column` message definition.
fn insert_data() -> (Vec<Column>, u32) {
    const SEMANTIC_TAG: i32 = 0;
    const SEMANTIC_FIELD: i32 = 1;
    const SEMANTIC_TS: i32 = 2;

    let row_count = 4;

    let host_column = Column {
        column_name: "host".to_string(),
        semantic_type: SEMANTIC_TAG,
        values: Some(column::Values {
            string_values: ["host1", "host2", "host3", "host4"]
                .iter()
                .map(|host| host.to_string())
                .collect(),
            ..Default::default()
        }),
        null_mask: vec![0],
        ..Default::default()
    };

    let cpu_column = Column {
        column_name: "cpu".to_string(),
        semantic_type: SEMANTIC_FIELD,
        values: Some(column::Values {
            f64_values: vec![0.31, 0.41, 0.2],
            ..Default::default()
        }),
        null_mask: vec![2],
        ..Default::default()
    };

    let mem_column = Column {
        column_name: "memory".to_string(),
        semantic_type: SEMANTIC_FIELD,
        values: Some(column::Values {
            f64_values: vec![0.1, 0.2, 0.3],
            ..Default::default()
        }),
        null_mask: vec![4],
        ..Default::default()
    };

    let ts_column = Column {
        column_name: "ts".to_string(),
        semantic_type: SEMANTIC_TS,
        values: Some(column::Values {
            i64_values: vec![100, 101, 102, 103],
            ..Default::default()
        }),
        null_mask: vec![0],
        ..Default::default()
    };

    let columns = vec![host_column, cpu_column, mem_column, ts_column];
    (columns, row_count)
}

View File

@@ -13,15 +13,18 @@
// limitations under the License.
use api::v1::{
object_expr, object_result, query_request, DatabaseRequest, ExprHeader, InsertExpr,
MutateResult as GrpcMutateResult, ObjectExpr, ObjectResult as GrpcObjectResult, QueryRequest,
object_expr, query_request, DatabaseRequest, InsertRequest, ObjectExpr,
ObjectResult as GrpcObjectResult, QueryRequest,
};
use common_error::status_code::StatusCode;
use common_grpc::flight::{raw_flight_data_to_message, FlightMessage};
use common_grpc::flight::{
flight_messages_to_recordbatches, raw_flight_data_to_message, FlightMessage,
};
use common_query::Output;
use common_recordbatch::RecordBatches;
use snafu::{ensure, OptionExt, ResultExt};
use crate::error::DatanodeSnafu;
use crate::error::{ConvertFlightDataSnafu, DatanodeSnafu, IllegalFlightMessagesSnafu};
use crate::{error, Client, Result};
pub const PROTOCOL_VERSION: u32 = 1;
@@ -44,57 +47,30 @@ impl Database {
&self.name
}
pub async fn insert(&self, insert: InsertExpr) -> Result<ObjectResult> {
let header = ExprHeader {
version: PROTOCOL_VERSION,
};
pub async fn insert(&self, request: InsertRequest) -> Result<RpcOutput> {
let expr = ObjectExpr {
header: Some(header),
expr: Some(object_expr::Expr::Insert(insert)),
request: Some(object_expr::Request::Insert(request)),
};
self.object(expr).await?.try_into()
}
pub async fn batch_insert(&self, insert_exprs: Vec<InsertExpr>) -> Result<Vec<ObjectResult>> {
let header = ExprHeader {
version: PROTOCOL_VERSION,
};
let obj_exprs = insert_exprs
.into_iter()
.map(|expr| ObjectExpr {
header: Some(header.clone()),
expr: Some(object_expr::Expr::Insert(expr)),
})
.collect();
self.objects(obj_exprs)
.await?
.into_iter()
.map(|result| result.try_into())
.collect()
}
pub async fn sql(&self, sql: &str) -> Result<ObjectResult> {
pub async fn sql(&self, sql: &str) -> Result<RpcOutput> {
let query = QueryRequest {
query: Some(query_request::Query::Sql(sql.to_string())),
};
self.do_select(query).await
self.do_query(query).await
}
pub async fn logical_plan(&self, logical_plan: Vec<u8>) -> Result<ObjectResult> {
let select_expr = QueryRequest {
pub async fn logical_plan(&self, logical_plan: Vec<u8>) -> Result<RpcOutput> {
let query = QueryRequest {
query: Some(query_request::Query::LogicalPlan(logical_plan)),
};
self.do_select(select_expr).await
self.do_query(query).await
}
async fn do_select(&self, select_expr: QueryRequest) -> Result<ObjectResult> {
let header = ExprHeader {
version: PROTOCOL_VERSION,
};
async fn do_query(&self, request: QueryRequest) -> Result<RpcOutput> {
let expr = ObjectExpr {
header: Some(header),
expr: Some(object_expr::Expr::Query(select_expr)),
request: Some(object_expr::Request::Query(request)),
};
let obj_result = self.object(expr).await?;
@@ -130,12 +106,12 @@ impl Database {
}
#[derive(Debug)]
pub enum ObjectResult {
FlightData(Vec<FlightMessage>),
Mutate(GrpcMutateResult),
/// Client-side result of a database RPC, decoded from the Flight messages carried in an
/// `api::v1::ObjectResult`.
pub enum RpcOutput {
/// Record batches rebuilt from the returned Flight messages (the case where the first
/// message is not an `AffectedRows` message).
RecordBatches(RecordBatches),
/// Row count taken from a single `FlightMessage::AffectedRows` message.
AffectedRows(usize),
}
impl TryFrom<api::v1::ObjectResult> for ObjectResult {
impl TryFrom<api::v1::ObjectResult> for RpcOutput {
type Error = error::Error;
fn try_from(object_result: api::v1::ObjectResult) -> std::result::Result<Self, Self::Error> {
@@ -148,39 +124,32 @@ impl TryFrom<api::v1::ObjectResult> for ObjectResult {
.fail();
}
let obj_result = object_result.result.context(error::MissingResultSnafu {
name: "result".to_string(),
expected: 1_usize,
actual: 0_usize,
})?;
Ok(match obj_result {
object_result::Result::Mutate(mutate) => ObjectResult::Mutate(mutate),
object_result::Result::FlightData(flight_data) => {
let flight_messages = raw_flight_data_to_message(flight_data.raw_data)
.context(error::ConvertFlightDataSnafu)?;
ObjectResult::FlightData(flight_messages)
}
})
let flight_messages = raw_flight_data_to_message(object_result.flight_data)
.context(ConvertFlightDataSnafu)?;
let output = if let Some(FlightMessage::AffectedRows(rows)) = flight_messages.get(0) {
ensure!(
flight_messages.len() == 1,
IllegalFlightMessagesSnafu {
reason: "Expect 'AffectedRows' Flight messages to be one and only!"
}
);
RpcOutput::AffectedRows(*rows)
} else {
let recordbatches = flight_messages_to_recordbatches(flight_messages)
.context(ConvertFlightDataSnafu)?;
RpcOutput::RecordBatches(recordbatches)
};
Ok(output)
}
}
impl TryFrom<ObjectResult> for Output {
type Error = error::Error;
fn try_from(value: ObjectResult) -> Result<Self> {
let output = match value {
ObjectResult::Mutate(mutate) => {
if mutate.failure != 0 {
return error::MutateFailureSnafu {
failure: mutate.failure,
}
.fail();
}
Output::AffectedRows(mutate.success as usize)
}
ObjectResult::FlightData(_) => unreachable!(),
};
Ok(output)
impl From<RpcOutput> for Output {
    /// Maps each `RpcOutput` variant onto the `Output` variant of the same name,
    /// moving the payload across unchanged.
    fn from(value: RpcOutput) -> Self {
        match value {
            RpcOutput::RecordBatches(recordbatches) => Self::RecordBatches(recordbatches),
            RpcOutput::AffectedRows(rows) => Self::AffectedRows(rows),
        }
    }
}

View File

@@ -19,10 +19,9 @@ use common_error::prelude::*;
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Connect failed to {}, source: {}", url, source))]
ConnectFailed {
url: String,
source: tonic::transport::Error,
#[snafu(display("Illegal Flight messages, reason: {}", reason))]
IllegalFlightMessages {
reason: String,
backtrace: Backtrace,
},
@@ -87,7 +86,7 @@ pub type Result<T> = std::result::Result<T, Error>;
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Error::ConnectFailed { .. }
Error::IllegalFlightMessages { .. }
| Error::MissingResult { .. }
| Error::MissingHeader { .. }
| Error::TonicStatus { .. }

View File

@@ -21,5 +21,5 @@ pub mod load_balance;
pub use api;
pub use self::client::Client;
pub use self::database::{Database, ObjectResult};
pub use self::database::{Database, RpcOutput};
pub use self::error::{Error, Result};

View File

@@ -12,14 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use api::helper::ColumnDataTypeWrapper;
use api::v1::column::{SemanticType, Values};
use api::v1::{AddColumn, AddColumns, Column, ColumnDataType, ColumnDef, CreateTableExpr};
use api::v1::{
AddColumn, AddColumns, Column, ColumnDataType, ColumnDef, CreateTableExpr,
InsertRequest as GrpcInsertRequest,
};
use common_base::BitVec;
use common_catalog::consts::DEFAULT_CATALOG_NAME;
use common_time::timestamp::Timestamp;
use common_time::{Date, DateTime};
use datatypes::data_type::{ConcreteDataType, DataType};
@@ -30,7 +32,6 @@ use datatypes::vectors::MutableVector;
use snafu::{ensure, OptionExt, ResultExt};
use table::metadata::TableId;
use table::requests::{AddColumnRequest, AlterKind, AlterTableRequest, InsertRequest};
use table::Table;
use crate::error::{
ColumnDataTypeSnafu, ColumnNotFoundSnafu, CreateVectorSnafu, DuplicatedTimestampColumnSnafu,
@@ -280,50 +281,43 @@ pub fn build_create_expr_from_insertion(
Ok(expr)
}
pub fn insertion_expr_to_request(
catalog_name: &str,
schema_name: &str,
table_name: &str,
insert_batches: Vec<(Vec<Column>, u32)>,
table: Arc<dyn Table>,
pub fn to_table_insert_request(
request: GrpcInsertRequest,
schema: SchemaRef,
) -> Result<InsertRequest> {
let schema = table.schema();
let mut columns_builders = HashMap::with_capacity(schema.column_schemas().len());
let catalog_name = DEFAULT_CATALOG_NAME;
let schema_name = &request.schema_name;
let table_name = &request.table_name;
let row_count = request.row_count as usize;
for (columns, row_count) in insert_batches {
for Column {
column_name,
values,
null_mask,
..
} in columns
{
let values = match values {
Some(vals) => vals,
None => continue,
};
let mut columns_values = HashMap::with_capacity(request.columns.len());
for Column {
column_name,
values,
null_mask,
..
} in request.columns
{
let Some(values) = values else { continue };
let column = column_name.clone();
let vector_builder = match columns_builders.entry(column) {
Entry::Occupied(entry) => entry.into_mut(),
Entry::Vacant(entry) => {
let column_schema = schema.column_schema_by_name(&column_name).context(
ColumnNotFoundSnafu {
column_name: &column_name,
table_name,
},
)?;
let data_type = &column_schema.data_type;
entry.insert(data_type.create_mutable_vector(row_count as usize))
}
};
add_values_to_builder(vector_builder, values, row_count as usize, null_mask)?;
}
let vector_builder = &mut schema
.column_schema_by_name(&column_name)
.context(ColumnNotFoundSnafu {
column_name: &column_name,
table_name,
})?
.data_type
.create_mutable_vector(row_count);
add_values_to_builder(vector_builder, values, row_count, null_mask)?;
ensure!(
columns_values
.insert(column_name, vector_builder.to_vector())
.is_none(),
IllegalInsertDataSnafu
);
}
let columns_values = columns_builders
.into_iter()
.map(|(column_name, mut vector_builder)| (column_name, vector_builder.to_vector()))
.collect();
Ok(InsertRequest {
catalog_name: catalog_name.to_string(),
@@ -479,10 +473,7 @@ mod tests {
use table::metadata::TableInfoRef;
use table::Table;
use super::{
build_create_expr_from_insertion, convert_values, insertion_expr_to_request, is_null,
TAG_SEMANTIC_TYPE, TIMESTAMP_SEMANTIC_TYPE,
};
use super::*;
use crate::error;
use crate::error::ColumnDataTypeSnafu;
use crate::insert::find_new_columns;
@@ -628,12 +619,18 @@ mod tests {
}
#[test]
fn test_insertion_expr_to_request() {
fn test_to_table_insert_request() {
let table: Arc<dyn Table> = Arc::new(DemoTable {});
let insert_batches = vec![mock_insert_batch()];
let insert_req =
insertion_expr_to_request("greptime", "public", "demo", insert_batches, table).unwrap();
let (columns, row_count) = mock_insert_batch();
let request = GrpcInsertRequest {
schema_name: "public".to_string(),
table_name: "demo".to_string(),
columns,
row_count,
region_number: 0,
};
let insert_req = to_table_insert_request(request, table.schema()).unwrap();
assert_eq!("greptime", insert_req.catalog_name);
assert_eq!("public", insert_req.schema_name);

View File

@@ -14,10 +14,9 @@
mod alter;
pub mod error;
mod insert;
pub mod insert;
pub use alter::{alter_expr_to_request, create_expr_to_request, create_table_schema};
pub use insert::{
build_alter_table_request, build_create_expr_from_insertion, column_to_vector,
find_new_columns, insertion_expr_to_request,
build_alter_table_request, build_create_expr_from_insertion, column_to_vector, find_new_columns,
};

View File

@@ -16,6 +16,7 @@ common-runtime = { path = "../runtime" }
dashmap = "5.4"
datafusion.workspace = true
datatypes = { path = "../../datatypes" }
flatbuffers = "22"
futures = "0.3"
prost = "0.11"
snafu = { version = "0.7", features = ["backtraces"] }

View File

@@ -18,13 +18,15 @@ use std::sync::Arc;
use api::result::ObjectResultBuilder;
use api::v1::{FlightDataExt, ObjectResult};
use arrow_flight::utils::flight_data_to_arrow_batch;
use arrow_flight::FlightData;
use arrow_flight::utils::{flight_data_from_arrow_batch, flight_data_to_arrow_batch};
use arrow_flight::{FlightData, IpcMessage, SchemaAsIpc};
use common_error::prelude::StatusCode;
use common_recordbatch::{RecordBatch, RecordBatches};
use datatypes::arrow;
use datatypes::arrow::datatypes::Schema as ArrowSchema;
use datatypes::arrow::ipc::{root_as_message, MessageHeader};
use datatypes::arrow::ipc::{root_as_message, writer, MessageHeader};
use datatypes::schema::{Schema, SchemaRef};
use flatbuffers::FlatBufferBuilder;
use futures::TryStreamExt;
use prost::Message;
use snafu::{OptionExt, ResultExt};
@@ -46,6 +48,43 @@ pub enum FlightMessage {
AffectedRows(usize),
}
/// Encodes [`FlightMessage`]s into Arrow Flight [`FlightData`] frames.
#[derive(Default)]
pub struct FlightEncoder {
// IPC write options passed to the Arrow helpers when encoding schemas and record batches.
write_options: writer::IpcWriteOptions,
}
impl FlightEncoder {
    /// Turns a single [`FlightMessage`] into the [`FlightData`] that represents it.
    ///
    /// * `Schema` is encoded through `SchemaAsIpc`.
    /// * `Recordbatch` is encoded through `flight_data_from_arrow_batch`; any dictionary
    ///   data it returns is not emitted.
    /// * `AffectedRows` becomes a frame whose IPC message comes from
    ///   `build_none_flight_msg` and whose payload is an encoded `FlightDataExt`.
    pub fn encode(&self, flight_message: FlightMessage) -> FlightData {
        let options = &self.write_options;
        match flight_message {
            FlightMessage::Schema(schema) => {
                let schema_ipc = SchemaAsIpc::new(schema.arrow_schema(), options);
                schema_ipc.into()
            }
            FlightMessage::Recordbatch(recordbatch) => {
                let (dictionaries, batch_data) =
                    flight_data_from_arrow_batch(recordbatch.df_record_batch(), options);

                // TODO(LFC): Emit the dictionaries as FlightData once Arrow's Dictionary
                // DataType is supported. We currently have no datatype corresponding to it,
                // so no dictionaries are expected here. The debug assertion acts as a testing
                // guard in case dictionaries show up later and this branch is not updated.
                debug_assert_eq!(dictionaries.len(), 0);

                batch_data
            }
            FlightMessage::AffectedRows(rows) => {
                let ext_bytes = FlightDataExt {
                    affected_rows: rows as _,
                }
                .encode_to_vec();
                let none_message = IpcMessage(build_none_flight_msg());
                FlightData::new(None, none_message, vec![], ext_bytes)
            }
        }
    }
}
#[derive(Default)]
pub struct FlightDecoder {
schema: Option<SchemaRef>,
@@ -115,16 +154,10 @@ pub async fn flight_data_to_object_result(
let stream = response.into_inner();
let result: TonicResult<Vec<FlightData>> = stream.try_collect().await;
match result {
Ok(flight_data) => {
let flight_data = flight_data
.into_iter()
.map(|x| x.encode_to_vec())
.collect::<Vec<Vec<u8>>>();
Ok(ObjectResultBuilder::new()
.status_code(StatusCode::Success as u32)
.flight_data(flight_data)
.build())
}
Ok(flight_data) => Ok(ObjectResultBuilder::new()
.status_code(StatusCode::Success as u32)
.flight_data(flight_data)
.build()),
Err(e) => Ok(ObjectResultBuilder::new()
.status_code(StatusCode::Internal as _)
.err_msg(e.to_string())
@@ -177,6 +210,20 @@ pub fn flight_messages_to_recordbatches(messages: Vec<FlightMessage>) -> Result<
}
}
/// Builds the flatbuffers bytes of an Arrow IPC message that has metadata version V5,
/// header type `NONE` and a body length of 0.
fn build_none_flight_msg() -> Vec<u8> {
let mut builder = FlatBufferBuilder::new();
let mut message = arrow::ipc::MessageBuilder::new(&mut builder);
message.add_version(arrow::ipc::MetadataVersion::V5);
// `NONE` header: the message itself describes neither a schema nor a record batch.
message.add_header_type(MessageHeader::NONE);
message.add_bodyLength(0);
let data = message.finish();
// Finalize the buffer with the message as root, then copy the finished bytes out.
builder.finish(data, None);
builder.finished_data().to_vec()
}
#[cfg(test)]
mod test {
use arrow_flight::utils::batches_to_flight_data;

View File

@@ -29,7 +29,6 @@ common-telemetry = { path = "../common/telemetry" }
common-time = { path = "../common/time" }
datafusion.workspace = true
datatypes = { path = "../datatypes" }
flatbuffers = "22"
futures = "0.3"
hyper = { version = "0.14", features = ["full"] }
log-store = { path = "../log-store" }

View File

@@ -267,9 +267,6 @@ pub enum Error {
source: common_grpc_expr::error::Error,
},
#[snafu(display("Insert batch is empty"))]
EmptyInsertBatch,
#[snafu(display(
"Table id provider not found, cannot execute SQL directly on datanode in distributed mode"
))]
@@ -384,7 +381,6 @@ impl ErrorExt for Error {
Error::OpenStorageEngine { source } => source.status_code(),
Error::RuntimeResource { .. } => StatusCode::RuntimeResourcesExhausted,
Error::MetaClientInit { source, .. } => source.status_code(),
Error::EmptyInsertBatch => StatusCode::InvalidArguments,
Error::TableIdProviderNotFound { .. } => StatusCode::Unsupported,
Error::BumpTableId { source, .. } => source.status_code(),
Error::MissingNodeId { .. } => StatusCode::InvalidArguments,

View File

@@ -16,25 +16,28 @@ mod stream;
use std::pin::Pin;
use api::v1::object_expr::Expr;
use api::v1::object_expr::Request as GrpcRequest;
use api::v1::query_request::Query;
use api::v1::{FlightDataExt, ObjectExpr};
use api::v1::{InsertRequest, ObjectExpr};
use arrow_flight::flight_service_server::FlightService;
use arrow_flight::{
Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo,
HandshakeRequest, HandshakeResponse, IpcMessage, PutResult, SchemaResult, Ticket,
HandshakeRequest, HandshakeResponse, PutResult, SchemaResult, Ticket,
};
use async_trait::async_trait;
use common_catalog::consts::DEFAULT_CATALOG_NAME;
use common_grpc::flight::{FlightEncoder, FlightMessage};
use common_query::Output;
use datatypes::arrow;
use flatbuffers::FlatBufferBuilder;
use futures::Stream;
use prost::Message;
use session::context::QueryContext;
use snafu::{OptionExt, ResultExt};
use tonic::{Request, Response, Streaming};
use crate::error::{self, Result};
use crate::error::{
CatalogSnafu, ExecuteSqlSnafu, InsertDataSnafu, InsertSnafu, InvalidFlightTicketSnafu,
MissingRequiredFieldSnafu, Result, TableNotFoundSnafu,
};
use crate::instance::flight::stream::FlightRecordBatchStream;
use crate::instance::Instance;
@@ -77,21 +80,21 @@ impl FlightService for Instance {
async fn do_get(&self, request: Request<Ticket>) -> TonicResult<Response<Self::DoGetStream>> {
let ticket = request.into_inner().ticket;
let expr = ObjectExpr::decode(ticket.as_slice())
.context(error::InvalidFlightTicketSnafu)?
.expr
.context(error::MissingRequiredFieldSnafu { name: "expr" })?;
match expr {
Expr::Query(query_request) => {
let request = ObjectExpr::decode(ticket.as_slice())
.context(InvalidFlightTicketSnafu)?
.request
.context(MissingRequiredFieldSnafu { name: "request" })?;
let output = match request {
GrpcRequest::Query(query_request) => {
let query = query_request
.query
.context(error::MissingRequiredFieldSnafu { name: "expr" })?;
let stream = self.handle_query(query).await?;
Ok(Response::new(stream))
.context(MissingRequiredFieldSnafu { name: "query" })?;
self.handle_query(query).await?
}
// TODO(LFC): Implement Insertion Flight interface.
Expr::Insert(_) => Err(tonic::Status::unimplemented("Not yet implemented")),
}
GrpcRequest::Insert(request) => self.handle_insert(request).await?,
};
let stream = to_flight_data_stream(output);
Ok(Response::new(stream))
}
type DoPutStream = TonicStream<PutResult>;
@@ -132,56 +135,61 @@ impl FlightService for Instance {
}
impl Instance {
async fn handle_query(&self, query: Query) -> Result<TonicStream<FlightData>> {
let output = match query {
async fn handle_query(&self, query: Query) -> Result<Output> {
Ok(match query {
Query::Sql(sql) => {
let stmt = self
.query_engine
.sql_to_statement(&sql)
.context(error::ExecuteSqlSnafu)?;
.context(ExecuteSqlSnafu)?;
self.execute_stmt(stmt, QueryContext::arc()).await?
}
Query::LogicalPlan(plan) => self.execute_logical(plan).await?,
};
Ok(match output {
Output::Stream(stream) => {
let stream = FlightRecordBatchStream::new(stream);
Box::pin(stream) as _
}
Output::RecordBatches(x) => {
let stream = FlightRecordBatchStream::new(x.as_stream());
Box::pin(stream) as _
}
Output::AffectedRows(rows) => {
let stream = async_stream::stream! {
let ext_data = FlightDataExt {
affected_rows: rows as _,
}.encode_to_vec();
yield Ok(FlightData::new(None, IpcMessage(build_none_flight_msg()), vec![], ext_data))
};
Box::pin(stream) as _
}
})
}
/// Inserts the data of a gRPC `InsertRequest` into its target table and reports the number
/// of affected rows.
///
/// The table is looked up in the default catalog under the request's schema name. The
/// request is then converted using the table's schema and handed to the table.
///
/// # Errors
///
/// Returns an error if the catalog lookup fails or yields no table, if the request cannot
/// be converted, or if the table insert itself fails.
pub async fn handle_insert(&self, request: InsertRequest) -> Result<Output> {
    // `request` is moved into the conversion below, so keep a separate copy of the table
    // name for the lookup and for the error contexts.
    let owned_table_name = request.table_name.clone();
    let table_name = &owned_table_name;

    // TODO(LFC): InsertRequest should carry catalog name, too.
    let maybe_table = self
        .catalog_manager
        .table(DEFAULT_CATALOG_NAME, &request.schema_name, table_name)
        .context(CatalogSnafu)?;
    let table = maybe_table.context(TableNotFoundSnafu { table_name })?;

    let insert_request =
        common_grpc_expr::insert::to_table_insert_request(request, table.schema())
            .context(InsertDataSnafu)?;

    let affected_rows = table
        .insert(insert_request)
        .await
        .context(InsertSnafu { table_name })?;
    Ok(Output::AffectedRows(affected_rows))
}
}
fn build_none_flight_msg() -> Vec<u8> {
let mut builder = FlatBufferBuilder::new();
let mut message = arrow::ipc::MessageBuilder::new(&mut builder);
message.add_version(arrow::ipc::MetadataVersion::V5);
message.add_header_type(arrow::ipc::MessageHeader::NONE);
message.add_bodyLength(0);
let data = message.finish();
builder.finish(data, None);
builder.finished_data().to_vec()
/// Wraps an execution `Output` in a stream of `FlightData`.
///
/// Record-batch outputs (streamed or already collected) are served through
/// `FlightRecordBatchStream`. An affected-rows count becomes a one-item stream holding the
/// encoded `FlightMessage::AffectedRows`.
fn to_flight_data_stream(output: Output) -> TonicStream<FlightData> {
    match output {
        Output::Stream(recordbatch_stream) => {
            Box::pin(FlightRecordBatchStream::new(recordbatch_stream)) as _
        }
        Output::RecordBatches(recordbatches) => {
            Box::pin(FlightRecordBatchStream::new(recordbatches.as_stream())) as _
        }
        Output::AffectedRows(rows) => {
            let flight_data = FlightEncoder::default().encode(FlightMessage::AffectedRows(rows));
            Box::pin(tokio_stream::once(Ok(flight_data))) as _
        }
    }
}
#[cfg(test)]
mod test {
use api::v1::{object_result, FlightDataRaw, QueryRequest};
use api::v1::QueryRequest;
use common_grpc::flight;
use common_grpc::flight::FlightMessage;
use datatypes::prelude::*;
@@ -201,8 +209,7 @@ mod test {
let ticket = Request::new(Ticket {
ticket: ObjectExpr {
header: None,
expr: Some(Expr::Query(QueryRequest {
request: Some(GrpcRequest::Query(QueryRequest {
query: Some(Query::Sql(
"INSERT INTO demo(host, cpu, memory, ts) VALUES \
('host1', 66.6, 1024, 1672201025000),\
@@ -218,10 +225,7 @@ mod test {
let result = flight::flight_data_to_object_result(response)
.await
.unwrap();
let result = result.result.unwrap();
assert!(matches!(result, object_result::Result::FlightData(_)));
let object_result::Result::FlightData(FlightDataRaw { raw_data }) = result else { unreachable!() };
let raw_data = result.flight_data;
let mut messages = flight::raw_flight_data_to_message(raw_data).unwrap();
assert_eq!(messages.len(), 1);
@@ -232,8 +236,7 @@ mod test {
let ticket = Request::new(Ticket {
ticket: ObjectExpr {
header: None,
expr: Some(Expr::Query(QueryRequest {
request: Some(GrpcRequest::Query(QueryRequest {
query: Some(Query::Sql(
"SELECT ts, host, cpu, memory FROM demo".to_string(),
)),
@@ -246,10 +249,7 @@ mod test {
let result = flight::flight_data_to_object_result(response)
.await
.unwrap();
let result = result.result.unwrap();
assert!(matches!(result, object_result::Result::FlightData(_)));
let object_result::Result::FlightData(FlightDataRaw { raw_data }) = result else { unreachable!() };
let raw_data = result.flight_data;
let messages = flight::raw_flight_data_to_message(raw_data).unwrap();
assert_eq!(messages.len(), 2);

View File

@@ -15,11 +15,10 @@
use std::pin::Pin;
use std::task::{Context, Poll};
use arrow_flight::utils::flight_data_from_arrow_batch;
use arrow_flight::{FlightData, SchemaAsIpc};
use arrow_flight::FlightData;
use common_grpc::flight::{FlightEncoder, FlightMessage};
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::warn;
use datatypes::arrow;
use futures::channel::mpsc;
use futures::channel::mpsc::Sender;
use futures::{SinkExt, Stream, StreamExt};
@@ -33,14 +32,15 @@ use crate::instance::flight::TonicResult;
#[pin_project(PinnedDrop)]
pub(super) struct FlightRecordBatchStream {
#[pin]
rx: mpsc::Receiver<Result<FlightData, tonic::Status>>,
rx: mpsc::Receiver<Result<FlightMessage, tonic::Status>>,
join_handle: JoinHandle<()>,
done: bool,
encoder: FlightEncoder,
}
impl FlightRecordBatchStream {
pub(super) fn new(recordbatches: SendableRecordBatchStream) -> Self {
let (tx, rx) = mpsc::channel::<TonicResult<FlightData>>(1);
let (tx, rx) = mpsc::channel::<TonicResult<FlightMessage>>(1);
let join_handle =
common_runtime::spawn_read(
async move { Self::flight_data_stream(recordbatches, tx).await },
@@ -49,36 +49,24 @@ impl FlightRecordBatchStream {
rx,
join_handle,
done: false,
encoder: FlightEncoder::default(),
}
}
async fn flight_data_stream(
mut recordbatches: SendableRecordBatchStream,
mut tx: Sender<TonicResult<FlightData>>,
mut tx: Sender<TonicResult<FlightMessage>>,
) {
let schema = recordbatches.schema();
let options = arrow::ipc::writer::IpcWriteOptions::default();
let schema_flight_data: FlightData =
SchemaAsIpc::new(schema.arrow_schema(), &options).into();
if let Err(e) = tx.send(Ok(schema_flight_data)).await {
if let Err(e) = tx.send(Ok(FlightMessage::Schema(schema))).await {
warn!("stop sending Flight data, err: {e}");
return;
}
while let Some(batch_or_err) = recordbatches.next().await {
match batch_or_err {
Ok(batch) => {
let (flight_dictionaries, flight_batch) =
flight_data_from_arrow_batch(batch.df_record_batch(), &options);
// TODO(LFC): Handle dictionary as FlightData here, when we supported Arrow's Dictionary DataType.
// Currently we don't have a datatype corresponding to Arrow's Dictionary DataType,
// so there won't be any "dictionaries" here. Assert to be sure about it, and
// perform a "testing guard" in case we forgot to handle the possible "dictionaries"
// here in the future.
debug_assert_eq!(flight_dictionaries.len(), 0);
if let Err(e) = tx.send(Ok(flight_batch)).await {
Ok(recordbatch) => {
if let Err(e) = tx.send(Ok(FlightMessage::Recordbatch(recordbatch))).await {
warn!("stop sending Flight data, err: {e}");
return;
}
@@ -115,11 +103,17 @@ impl Stream for FlightRecordBatchStream {
*this.done = true;
Poll::Ready(None)
}
e @ Poll::Ready(Some(Err(_))) => {
*this.done = true;
e
}
other => other,
Poll::Ready(Some(result)) => match result {
Ok(flight_message) => {
let flight_data = this.encoder.encode(flight_message);
Poll::Ready(Some(Ok(flight_data)))
}
Err(e) => {
*this.done = true;
Poll::Ready(Some(Err(e)))
}
},
Poll::Pending => Poll::Pending,
}
}
}

View File

@@ -12,20 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::result::{build_err_result, AdminResultBuilder, ObjectResultBuilder};
use api::v1::{
admin_expr, object_expr, AdminExpr, AdminResult, Column, CreateDatabaseExpr, ObjectExpr,
ObjectResult, QueryRequest,
};
use api::result::AdminResultBuilder;
use api::v1::{admin_expr, AdminExpr, AdminResult, CreateDatabaseExpr, ObjectExpr, ObjectResult};
use arrow_flight::flight_service_server::FlightService;
use arrow_flight::Ticket;
use async_trait::async_trait;
use common_catalog::consts::DEFAULT_CATALOG_NAME;
use common_error::ext::ErrorExt;
use common_error::prelude::BoxedError;
use common_error::status_code::StatusCode;
use common_grpc::flight::flight_data_to_object_result;
use common_grpc_expr::insertion_expr_to_request;
use common_grpc::flight;
use common_query::Output;
use prost::Message;
use query::plan::LogicalPlan;
@@ -36,88 +31,16 @@ use table::requests::CreateDatabaseRequest;
use tonic::Request;
use crate::error::{
self, CatalogNotFoundSnafu, CatalogSnafu, DecodeLogicalPlanSnafu, EmptyInsertBatchSnafu,
ExecuteSqlSnafu, InsertDataSnafu, InsertSnafu, Result, SchemaNotFoundSnafu, TableNotFoundSnafu,
DecodeLogicalPlanSnafu, ExecuteSqlSnafu, FlightGetSnafu, InvalidFlightDataSnafu, Result,
};
use crate::instance::Instance;
impl Instance {
pub async fn execute_grpc_insert(
&self,
catalog_name: &str,
schema_name: &str,
table_name: &str,
insert_batches: Vec<(Vec<Column>, u32)>,
) -> Result<Output> {
let schema_provider = self
.catalog_manager
.catalog(catalog_name)
.context(CatalogSnafu)?
.context(CatalogNotFoundSnafu { name: catalog_name })?
.schema(schema_name)
.context(CatalogSnafu)?
.context(SchemaNotFoundSnafu { name: schema_name })?;
ensure!(!insert_batches.is_empty(), EmptyInsertBatchSnafu);
let table = schema_provider
.table(table_name)
.context(CatalogSnafu)?
.context(TableNotFoundSnafu { table_name })?;
let insert = insertion_expr_to_request(
catalog_name,
schema_name,
table_name,
insert_batches,
table.clone(),
)
.context(InsertDataSnafu)?;
let affected_rows = table
.insert(insert)
async fn boarding(&self, ticket: Request<Ticket>) -> Result<ObjectResult> {
let response = self.do_get(ticket).await.context(FlightGetSnafu)?;
flight::flight_data_to_object_result(response)
.await
.context(InsertSnafu { table_name })?;
Ok(Output::AffectedRows(affected_rows))
}
async fn handle_insert(
&self,
catalog_name: &str,
schema_name: &str,
table_name: &str,
insert_batches: Vec<(Vec<Column>, u32)>,
) -> ObjectResult {
match self
.execute_grpc_insert(catalog_name, schema_name, table_name, insert_batches)
.await
{
Ok(Output::AffectedRows(rows)) => ObjectResultBuilder::new()
.status_code(StatusCode::Success as u32)
.mutate_result(rows as u32, 0)
.build(),
Err(err) => {
common_telemetry::error!(err; "Failed to handle insert, catalog name: {}, schema name: {}, table name: {}", catalog_name, schema_name, table_name);
// TODO(fys): failure count
build_err_result(&err)
}
_ => unreachable!(),
}
}
async fn handle_query_request(&self, query_request: QueryRequest) -> Result<ObjectResult> {
let ticket = Request::new(Ticket {
ticket: ObjectExpr {
header: None,
expr: Some(object_expr::Expr::Query(query_request)),
}
.encode_to_vec(),
});
// TODO(LFC): Temporarily use old GRPC interface here, will make it been replaced.
let response = self.do_get(ticket).await.context(error::FlightGetSnafu)?;
flight_data_to_object_result(response)
.await
.context(error::InvalidFlightDataSnafu)
.context(InvalidFlightDataSnafu)
}
async fn execute_create_database(
@@ -156,34 +79,16 @@ impl Instance {
#[async_trait]
impl GrpcQueryHandler for Instance {
async fn do_query(&self, query: ObjectExpr) -> servers::error::Result<ObjectResult> {
let object_resp = match query.expr {
Some(object_expr::Expr::Insert(insert_expr)) => {
let catalog_name = DEFAULT_CATALOG_NAME;
let schema_name = &insert_expr.schema_name;
let table_name = &insert_expr.table_name;
// TODO(fys): _region_number is for later use.
let _region_number: u32 = insert_expr.region_number;
let insert_batches = vec![(insert_expr.columns, insert_expr.row_count)];
self.handle_insert(catalog_name, schema_name, table_name, insert_batches)
.await
}
Some(object_expr::Expr::Query(query_request)) => self
.handle_query_request(query_request.clone())
.await
.map_err(BoxedError::new)
.context(servers::error::ExecuteQuerySnafu {
query: format!("{query_request:?}"),
})?,
other => {
return servers::error::NotSupportedSnafu {
feat: format!("{other:?}"),
}
.fail();
}
};
Ok(object_resp)
let ticket = Request::new(Ticket {
ticket: query.encode_to_vec(),
});
// TODO(LFC): Temporarily use the old gRPC interface here; it will be removed near the end of the Arrow Flight adoption.
self.boarding(ticket)
.await
.map_err(BoxedError::new)
.with_context(|_| servers::error::ExecuteQuerySnafu {
query: format!("{query:?}"),
})
}
}

View File

@@ -442,12 +442,6 @@ pub enum Error {
#[snafu(backtrace)]
source: servers::error::Error,
},
#[snafu(display("Failed to convert Flight Message, source: {}", source))]
ConvertFlightMessage {
#[snafu(backtrace)]
source: common_grpc::error::Error,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -537,7 +531,6 @@ impl ErrorExt for Error {
Error::TableAlreadyExist { .. } => StatusCode::TableAlreadyExists,
Error::EncodeSubstraitLogicalPlan { source } => source.status_code(),
Error::BuildVector { source, .. } => source.status_code(),
Error::ConvertFlightMessage { source } => source.status_code(),
}
}

View File

@@ -22,20 +22,21 @@ use std::time::Duration;
use api::result::{ObjectResultBuilder, PROTOCOL_VERSION};
use api::v1::alter_expr::Kind;
use api::v1::object_expr::Expr;
use api::v1::object_expr::Request;
use api::v1::{
admin_expr, AddColumns, AdminExpr, AdminResult, AlterExpr, Column, CreateDatabaseExpr,
CreateTableExpr, DropTableExpr, ExprHeader, InsertExpr, ObjectExpr,
CreateTableExpr, DropTableExpr, ExprHeader, InsertRequest, ObjectExpr,
ObjectResult as GrpcObjectResult,
};
use async_trait::async_trait;
use catalog::remote::MetaKvBackend;
use catalog::{CatalogManagerRef, CatalogProviderRef, SchemaProviderRef};
use client::admin::admin_result_to_output;
use client::ObjectResult;
use client::RpcOutput;
use common_catalog::consts::DEFAULT_CATALOG_NAME;
use common_error::prelude::BoxedError;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_grpc::flight::{FlightEncoder, FlightMessage};
use common_query::Output;
use common_recordbatch::RecordBatches;
use common_telemetry::{debug, info};
@@ -245,10 +246,10 @@ impl Instance {
}
/// Handle batch inserts
pub async fn handle_inserts(&self, insert_expr: Vec<InsertExpr>) -> Result<Output> {
pub async fn handle_inserts(&self, requests: Vec<InsertRequest>) -> Result<Output> {
let mut success = 0;
for expr in insert_expr {
match self.handle_insert(expr).await? {
for request in requests {
match self.handle_insert(request).await? {
Output::AffectedRows(rows) => success += rows,
_ => unreachable!("Insert should not yield output other than AffectedRows"),
}
@@ -256,30 +257,26 @@ impl Instance {
Ok(Output::AffectedRows(success))
}
/// Handle insert. For 'values' insertion, create/alter the destination table on demand.
async fn handle_insert(&self, mut insert_expr: InsertExpr) -> Result<Output> {
let table_name = &insert_expr.table_name;
// TODO(LFC): Revisit the gRPC insertion feature and check whether the "create/alter table on demand" functionality is broken.
// It should be covered by enough tests.
async fn handle_insert(&self, request: InsertRequest) -> Result<Output> {
let schema_name = &request.schema_name;
let table_name = &request.table_name;
let catalog_name = DEFAULT_CATALOG_NAME;
let schema_name = &insert_expr.schema_name;
let columns = &insert_expr.columns;
let columns = &request.columns;
self.create_or_alter_table_on_demand(catalog_name, schema_name, table_name, columns)
.await?;
insert_expr.region_number = 0;
let query = ObjectExpr {
header: Some(ExprHeader {
version: PROTOCOL_VERSION,
}),
expr: Some(Expr::Insert(insert_expr)),
request: Some(Request::Insert(request)),
};
let result = GrpcQueryHandler::do_query(&*self.grpc_query_handler, query)
.await
.context(error::InvokeGrpcServerSnafu)?;
let result: ObjectResult = result.try_into().context(InsertSnafu)?;
result.try_into().context(InsertSnafu)
let result: RpcOutput = result.try_into().context(InsertSnafu)?;
Ok(result.into())
}
// check if table already exists:
@@ -514,14 +511,14 @@ impl Instance {
.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu { query })?;
let expr = InsertExpr {
let request = InsertRequest {
schema_name,
table_name,
region_number: 0,
columns,
row_count,
};
self.handle_insert(expr).await
self.handle_insert(request).await
}
Mode::Distributed => {
let affected = self
@@ -672,24 +669,26 @@ impl ScriptHandler for Instance {
#[async_trait]
impl GrpcQueryHandler for Instance {
async fn do_query(&self, query: ObjectExpr) -> server_error::Result<GrpcObjectResult> {
let expr = query
let request = query
.clone()
.expr
.request
.context(server_error::InvalidQuerySnafu {
reason: "empty expr",
})?;
match expr {
Expr::Insert(insert_expr) => {
match request {
Request::Insert(request) => {
let output = self
.handle_insert(insert_expr.clone())
.handle_insert(request.clone())
.await
.map_err(BoxedError::new)
.with_context(|_| server_error::ExecuteQuerySnafu {
query: format!("{insert_expr:?}"),
query: format!("{request:?}"),
})?;
let object_result = match output {
Output::AffectedRows(rows) => ObjectResultBuilder::default()
.mutate_result(rows as _, 0)
.flight_data(vec![
FlightEncoder::default().encode(FlightMessage::AffectedRows(rows))
])
.build(),
_ => unreachable!(),
};
@@ -721,9 +720,8 @@ mod tests {
use api::v1::column::SemanticType;
use api::v1::{
admin_expr, admin_result, column, object_expr, object_result, query_request, Column,
ColumnDataType, ColumnDef as GrpcColumnDef, ExprHeader, FlightDataRaw, MutateResult,
QueryRequest,
admin_expr, admin_result, column, query_request, Column, ColumnDataType,
ColumnDef as GrpcColumnDef, ExprHeader, MutateResult, QueryRequest,
};
use common_grpc::flight::{raw_flight_data_to_message, FlightMessage};
use common_recordbatch::RecordBatch;
@@ -905,7 +903,7 @@ mod tests {
expected_ts_col.clone(),
];
let row_count = 4;
let insert_expr = InsertExpr {
let request = InsertRequest {
schema_name: "public".to_string(),
table_name: "demo".to_string(),
region_number: 0,
@@ -913,94 +911,84 @@ mod tests {
row_count,
};
let object_expr = ObjectExpr {
header: Some(ExprHeader::default()),
expr: Some(object_expr::Expr::Insert(insert_expr)),
request: Some(Request::Insert(request)),
};
let result = GrpcQueryHandler::do_query(&*instance, object_expr)
.await
.unwrap();
assert_matches!(
result.result,
Some(object_result::Result::Mutate(MutateResult {
success: 4,
failure: 0
}))
);
let raw_data = result.flight_data;
let mut flight_messages = raw_flight_data_to_message(raw_data).unwrap();
assert_eq!(flight_messages.len(), 1);
let message = flight_messages.remove(0);
assert!(matches!(message, FlightMessage::AffectedRows(4)));
// select
let object_expr = ObjectExpr {
header: Some(ExprHeader::default()),
expr: Some(Expr::Query(QueryRequest {
request: Some(Request::Query(QueryRequest {
query: Some(query_request::Query::Sql("select * from demo".to_string())),
})),
};
let result = GrpcQueryHandler::do_query(&*instance, object_expr)
.await
.unwrap();
match result.result {
Some(object_result::Result::FlightData(FlightDataRaw { raw_data })) => {
let mut flight_messages = raw_flight_data_to_message(raw_data).unwrap();
assert_eq!(flight_messages.len(), 2);
let raw_data = result.flight_data;
let mut flight_messages = raw_flight_data_to_message(raw_data).unwrap();
assert_eq!(flight_messages.len(), 2);
let expected_schema = Arc::new(Schema::new(vec![
ColumnSchema::new("host", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true),
ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true),
ColumnSchema::new("disk_util", ConcreteDataType::float64_datatype(), true)
.with_default_constraint(Some(ColumnDefaultConstraint::Value(Value::from(
9.9f64,
))))
.unwrap(),
ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
true,
)
.with_time_index(true),
]));
match flight_messages.remove(0) {
FlightMessage::Schema(schema) => {
assert_eq!(schema, expected_schema);
}
_ => unreachable!(),
}
let expected_schema = Arc::new(Schema::new(vec![
ColumnSchema::new("host", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true),
ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true),
ColumnSchema::new("disk_util", ConcreteDataType::float64_datatype(), true)
.with_default_constraint(Some(ColumnDefaultConstraint::Value(Value::from(9.9f64))))
.unwrap(),
ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
true,
)
.with_time_index(true),
]));
match flight_messages.remove(0) {
FlightMessage::Schema(schema) => {
assert_eq!(schema, expected_schema);
}
_ => unreachable!(),
}
match flight_messages.remove(0) {
FlightMessage::Recordbatch(recordbatch) => {
let expect_recordbatch = RecordBatch::new(
expected_schema,
vec![
Arc::new(StringVector::from(vec![
"fe.host.a",
"fe.host.b",
"fe.host.c",
"fe.host.d",
])) as _,
Arc::new(Float64Vector::from(vec![
Some(1.0f64),
None,
Some(3.0f64),
Some(4.0f64),
])) as _,
Arc::new(Float64Vector::from(vec![
Some(100f64),
Some(200f64),
None,
Some(400f64),
])) as _,
Arc::new(Float64Vector::from_vec(
iter::repeat(9.9f64).take(4).collect(),
)) as _,
Arc::new(TimestampMillisecondVector::from_vec(vec![
1000i64, 2000, 3000, 4000,
])) as _,
],
)
.unwrap();
assert_eq!(recordbatch, expect_recordbatch);
}
_ => unreachable!(),
}
match flight_messages.remove(0) {
FlightMessage::Recordbatch(recordbatch) => {
let expect_recordbatch = RecordBatch::new(
expected_schema,
vec![
Arc::new(StringVector::from(vec![
"fe.host.a",
"fe.host.b",
"fe.host.c",
"fe.host.d",
])) as _,
Arc::new(Float64Vector::from(vec![
Some(1.0f64),
None,
Some(3.0f64),
Some(4.0f64),
])) as _,
Arc::new(Float64Vector::from(vec![
Some(100f64),
Some(200f64),
None,
Some(400f64),
])) as _,
Arc::new(Float64Vector::from_vec(
iter::repeat(9.9f64).take(4).collect(),
)) as _,
Arc::new(TimestampMillisecondVector::from_vec(vec![
1000i64, 2000, 3000, 4000,
])) as _,
],
)
.unwrap();
assert_eq!(recordbatch, expect_recordbatch);
}
_ => unreachable!(),
}

View File

@@ -14,7 +14,7 @@
use std::collections::HashMap;
use api::v1::{Column, InsertExpr};
use api::v1::{Column, InsertRequest as GrpcInsertRequest};
use async_trait::async_trait;
use common_catalog::consts::DEFAULT_CATALOG_NAME;
use common_error::prelude::BoxedError;
@@ -34,8 +34,7 @@ impl InfluxdbLineProtocolHandler for Instance {
async fn exec(&self, request: &InfluxdbRequest) -> servers::error::Result<()> {
match self.mode {
Mode::Standalone => {
let exprs: Vec<InsertExpr> = request.try_into()?;
self.handle_inserts(exprs)
self.handle_inserts(request.try_into()?)
.await
.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu {
@@ -57,7 +56,7 @@ impl InfluxdbLineProtocolHandler for Instance {
}
impl Instance {
pub(crate) async fn dist_insert(&self, inserts: Vec<InsertExpr>) -> Result<usize> {
pub(crate) async fn dist_insert(&self, inserts: Vec<GrpcInsertRequest>) -> Result<usize> {
let mut joins = Vec::with_capacity(inserts.len());
let catalog_name = DEFAULT_CATALOG_NAME;

View File

@@ -14,12 +14,11 @@
use api::prometheus::remote::read_request::ResponseType;
use api::prometheus::remote::{Query, QueryResult, ReadRequest, ReadResponse, WriteRequest};
use api::v1::object_expr::Expr;
use api::v1::object_expr::Request;
use api::v1::{query_request, ObjectExpr, QueryRequest};
use async_trait::async_trait;
use client::ObjectResult;
use client::RpcOutput;
use common_error::prelude::BoxedError;
use common_grpc::flight;
use common_telemetry::logging;
use prost::Message;
use servers::error::{self, Result as ServerResult};
@@ -61,13 +60,8 @@ fn negotiate_response_type(accepted_response_types: &[i32]) -> ServerResult<Resp
Ok(ResponseType::from_i32(*response_type).unwrap())
}
fn object_result_to_query_result(
table_name: &str,
object_result: ObjectResult,
) -> ServerResult<QueryResult> {
let ObjectResult::FlightData(flight_messages) = object_result else { unreachable!() };
let recordbatches = flight::flight_messages_to_recordbatches(flight_messages)
.context(error::ConvertFlightMessageSnafu)?;
fn to_query_result(table_name: &str, object_result: RpcOutput) -> ServerResult<QueryResult> {
let RpcOutput::RecordBatches(recordbatches) = object_result else { unreachable!() };
Ok(QueryResult {
timeseries: prometheus::recordbatches_to_timeseries(table_name, recordbatches)?,
})
@@ -78,7 +72,7 @@ impl Instance {
&self,
db: &str,
queries: &[Query],
) -> ServerResult<Vec<(String, ObjectResult)>> {
) -> ServerResult<Vec<(String, RpcOutput)>> {
let mut results = Vec::with_capacity(queries.len());
for query in queries {
@@ -90,8 +84,7 @@ impl Instance {
);
let query = ObjectExpr {
header: None,
expr: Some(Expr::Query(QueryRequest {
request: Some(Request::Query(QueryRequest {
query: Some(query_request::Query::Sql(sql.to_string())),
})),
};
@@ -112,10 +105,10 @@ impl Instance {
#[async_trait]
impl PrometheusProtocolHandler for Instance {
async fn write(&self, database: &str, request: WriteRequest) -> ServerResult<()> {
let exprs = prometheus::write_request_to_insert_exprs(database, request.clone())?;
let requests = prometheus::to_grpc_insert_requests(database, request.clone())?;
match self.mode {
Mode::Standalone => {
self.handle_inserts(exprs)
self.handle_inserts(requests)
.await
.map_err(BoxedError::new)
.with_context(|_| error::ExecuteInsertSnafu {
@@ -123,7 +116,7 @@ impl PrometheusProtocolHandler for Instance {
})?;
}
Mode::Distributed => {
self.dist_insert(exprs)
self.dist_insert(requests)
.await
.map_err(BoxedError::new)
.with_context(|_| error::ExecuteInsertSnafu {
@@ -146,9 +139,7 @@ impl PrometheusProtocolHandler for Instance {
ResponseType::Samples => {
let query_results = results
.into_iter()
.map(|(table_name, object_result)| {
object_result_to_query_result(&table_name, object_result)
})
.map(|(table_name, object_result)| to_query_result(&table_name, object_result))
.collect::<ServerResult<Vec<_>>>()?;
let response = ReadResponse {

View File

@@ -21,7 +21,7 @@ use std::sync::Arc;
use api::v1::AlterExpr;
use async_trait::async_trait;
use client::admin::Admin;
use client::Database;
use client::{Database, RpcOutput};
use common_catalog::consts::DEFAULT_CATALOG_NAME;
use common_query::error::Result as QueryResult;
use common_query::logical_plan::Expr;
@@ -88,11 +88,10 @@ impl Table for DistTable {
let spliter = WriteSpliter::with_partition_rule(partition_rule);
let inserts = spliter.split(request).map_err(TableError::new)?;
let result = match self.dist_insert(inserts).await.map_err(TableError::new)? {
client::ObjectResult::Mutate(result) => result,
_ => unreachable!(),
};
Ok(result.success as usize)
let output = self.dist_insert(inserts).await.map_err(TableError::new)?;
let RpcOutput::AffectedRows(rows) = output else { unreachable!() };
Ok(rows)
}
async fn scan(
@@ -508,7 +507,7 @@ impl PartitionExec {
#[cfg(test)]
mod test {
use api::v1::column::SemanticType;
use api::v1::{column, Column, ColumnDataType};
use api::v1::{column, Column, ColumnDataType, InsertRequest};
use common_query::physical_plan::DfPhysicalPlanAdapter;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
@@ -993,49 +992,45 @@ mod test {
data: Vec<i32>,
start_ts: i64,
) {
let rows = data.len() as u32;
let values = vec![(
vec![
Column {
column_name: "ts".to_string(),
values: Some(column::Values {
i64_values: (start_ts..start_ts + rows as i64).collect::<Vec<i64>>(),
..Default::default()
}),
datatype: ColumnDataType::Int64 as i32,
semantic_type: SemanticType::Timestamp as i32,
let row_count = data.len() as u32;
let columns = vec![
Column {
column_name: "ts".to_string(),
values: Some(column::Values {
i64_values: (start_ts..start_ts + row_count as i64).collect::<Vec<i64>>(),
..Default::default()
},
Column {
column_name: "a".to_string(),
values: Some(column::Values {
i32_values: data,
..Default::default()
}),
datatype: ColumnDataType::Int32 as i32,
}),
datatype: ColumnDataType::Int64 as i32,
semantic_type: SemanticType::Timestamp as i32,
..Default::default()
},
Column {
column_name: "a".to_string(),
values: Some(column::Values {
i32_values: data,
..Default::default()
},
Column {
column_name: "row_id".to_string(),
values: Some(column::Values {
i32_values: (1..=rows as i32).collect::<Vec<i32>>(),
..Default::default()
}),
datatype: ColumnDataType::Int32 as i32,
}),
datatype: ColumnDataType::Int32 as i32,
..Default::default()
},
Column {
column_name: "row_id".to_string(),
values: Some(column::Values {
i32_values: (1..=row_count as i32).collect::<Vec<i32>>(),
..Default::default()
},
],
rows,
)];
dn_instance
.execute_grpc_insert(
&table_name.catalog_name,
&table_name.schema_name,
&table_name.table_name,
values,
)
.await
.unwrap();
}),
datatype: ColumnDataType::Int32 as i32,
..Default::default()
},
];
let request = InsertRequest {
schema_name: table_name.schema_name.clone(),
table_name: table_name.table_name.clone(),
columns,
row_count,
region_number: 0,
};
dn_instance.handle_insert(request).await.unwrap();
}
#[tokio::test(flavor = "multi_thread")]

View File

@@ -17,8 +17,8 @@ use std::sync::Arc;
use api::helper::ColumnDataTypeWrapper;
use api::v1::column::SemanticType;
use api::v1::{Column, InsertExpr, MutateResult};
use client::{Database, ObjectResult};
use api::v1::{Column, InsertRequest as GrpcInsertRequest};
use client::{Database, RpcOutput};
use datatypes::prelude::ConcreteDataType;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::RegionNumber;
@@ -33,7 +33,7 @@ impl DistTable {
pub async fn dist_insert(
&self,
inserts: HashMap<RegionNumber, InsertRequest>,
) -> Result<ObjectResult> {
) -> Result<RpcOutput> {
let route = self.table_routes.get_route(&self.table_name).await?;
let mut joins = Vec::with_capacity(inserts.len());
@@ -57,7 +57,7 @@ impl DistTable {
// TODO(fys): a separate runtime should be used here.
let join = tokio::spawn(async move {
instance
.grpc_insert(to_insert_expr(region_id, insert)?)
.grpc_insert(to_grpc_insert_request(region_id, insert)?)
.await
.context(error::RequestDatanodeSnafu)
});
@@ -66,19 +66,12 @@ impl DistTable {
}
let mut success = 0;
let mut failure = 0;
for join in joins {
let object_result = join.await.context(error::JoinTaskSnafu)??;
let result = match object_result {
ObjectResult::Mutate(result) => result,
ObjectResult::FlightData(_) => unreachable!(),
};
success += result.success;
failure += result.failure;
let RpcOutput::AffectedRows(rows) = object_result else { unreachable!() };
success += rows;
}
Ok(ObjectResult::Mutate(MutateResult { success, failure }))
Ok(RpcOutput::AffectedRows(success))
}
}
@@ -130,10 +123,13 @@ pub fn insert_request_to_insert_batch(insert: &InsertRequest) -> Result<(Vec<Col
Ok((columns, row_count))
}
fn to_insert_expr(region_number: RegionNumber, insert: InsertRequest) -> Result<InsertExpr> {
fn to_grpc_insert_request(
region_number: RegionNumber,
insert: InsertRequest,
) -> Result<GrpcInsertRequest> {
let table_name = insert.table_name.clone();
let (columns, row_count) = insert_request_to_insert_batch(&insert)?;
Ok(InsertExpr {
Ok(GrpcInsertRequest {
schema_name: insert.schema_name,
table_name,
region_number,
@@ -146,21 +142,21 @@ fn to_insert_expr(region_number: RegionNumber, insert: InsertRequest) -> Result<
mod tests {
use std::collections::HashMap;
use api::v1::{ColumnDataType, InsertExpr};
use api::v1::ColumnDataType;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use datatypes::prelude::ScalarVectorBuilder;
use datatypes::vectors::{Int16VectorBuilder, MutableVector, StringVectorBuilder};
use table::requests::InsertRequest;
use super::to_insert_expr;
use super::*;
#[test]
fn test_to_insert_expr() {
fn test_to_grpc_insert_request() {
let insert_request = mock_insert_request();
let insert_expr = to_insert_expr(12, insert_request).unwrap();
let request = to_grpc_insert_request(12, insert_request).unwrap();
verify_insert_expr(insert_expr);
verify_grpc_insert_request(request);
}
fn mock_insert_request() -> InsertRequest {
@@ -186,11 +182,11 @@ mod tests {
}
}
fn verify_insert_expr(insert_expr: InsertExpr) {
let table_name = insert_expr.table_name;
fn verify_grpc_insert_request(request: GrpcInsertRequest) {
let table_name = request.table_name;
assert_eq!("demo", table_name);
for column in insert_expr.columns {
for column in request.columns {
let name = column.column_name;
if name == "id" {
assert_eq!(0, column.null_mask[0]);
@@ -207,7 +203,7 @@ mod tests {
}
}
let region_number = insert_expr.region_number;
let region_number = request.region_number;
assert_eq!(12, region_number);
}
}

View File

@@ -15,9 +15,8 @@
use std::fmt::Formatter;
use std::sync::Arc;
use api::v1::InsertExpr;
use client::{Database, ObjectResult};
use common_grpc::flight::flight_messages_to_recordbatches;
use api::v1::InsertRequest;
use client::{Database, RpcOutput};
use common_query::prelude::Expr;
use common_recordbatch::RecordBatches;
use datafusion::datasource::DefaultTableSource;
@@ -28,7 +27,7 @@ use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::table::adapter::DfTableProviderAdapter;
use table::TableRef;
use crate::error::{self, ConvertFlightMessageSnafu, Result};
use crate::error::{self, Result};
#[derive(Clone)]
pub struct DatanodeInstance {
@@ -47,7 +46,7 @@ impl DatanodeInstance {
Self { table, db }
}
pub(crate) async fn grpc_insert(&self, request: InsertExpr) -> client::Result<ObjectResult> {
pub(crate) async fn grpc_insert(&self, request: InsertRequest) -> client::Result<RpcOutput> {
self.db.insert(request).await
}
@@ -63,13 +62,7 @@ impl DatanodeInstance {
.logical_plan(substrait_plan.to_vec())
.await
.context(error::RequestDatanodeSnafu)?;
let recordbatches = match result {
ObjectResult::FlightData(flight_message) => {
flight_messages_to_recordbatches(flight_message)
.context(ConvertFlightMessageSnafu)?
}
_ => unreachable!(),
};
let RpcOutput::RecordBatches(recordbatches) = result else { unreachable!() };
Ok(recordbatches)
}

View File

@@ -14,7 +14,7 @@
use std::collections::HashMap;
use api::v1::InsertExpr;
use api::v1::InsertRequest as GrpcInsertRequest;
use common_grpc::writer::{LinesWriter, Precision};
use influxdb_line_protocol::{parse_lines, FieldValue};
use snafu::ResultExt;
@@ -86,7 +86,7 @@ impl TryFrom<&InfluxdbRequest> for Vec<InsertRequest> {
}
// TODO(fys): will be removed in the future.
impl TryFrom<&InfluxdbRequest> for Vec<InsertExpr> {
impl TryFrom<&InfluxdbRequest> for Vec<GrpcInsertRequest> {
type Error = Error;
fn try_from(value: &InfluxdbRequest) -> Result<Self, Self::Error> {
@@ -163,7 +163,7 @@ impl TryFrom<&InfluxdbRequest> for Vec<InsertExpr> {
.into_iter()
.map(|(table_name, writer)| {
let (columns, row_count) = writer.finish();
InsertExpr {
GrpcInsertRequest {
schema_name: schema_name.clone(),
table_name,
region_number: 0,
@@ -180,7 +180,7 @@ mod tests {
use std::sync::Arc;
use api::v1::column::{SemanticType, Values};
use api::v1::{Column, ColumnDataType, InsertExpr};
use api::v1::{Column, ColumnDataType};
use common_base::BitVec;
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
@@ -188,6 +188,7 @@ mod tests {
use datatypes::vectors::Vector;
use table::requests::InsertRequest;
use super::*;
use crate::influxdb::InfluxdbRequest;
#[test]
@@ -230,15 +231,14 @@ monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003";
lines: lines.to_string(),
};
let insert_exprs: Vec<InsertExpr> = influxdb_req.try_into().unwrap();
let requests: Vec<GrpcInsertRequest> = influxdb_req.try_into().unwrap();
assert_eq!(2, requests.len());
assert_eq!(2, insert_exprs.len());
for expr in insert_exprs {
assert_eq!("public", expr.schema_name);
match &expr.table_name[..] {
"monitor1" => assert_monitor_1(&expr.columns),
"monitor2" => assert_monitor_2(&expr.columns),
for request in requests {
assert_eq!("public", request.schema_name);
match &request.table_name[..] {
"monitor1" => assert_monitor_1(&request.columns),
"monitor2" => assert_monitor_2(&request.columns),
_ => panic!(),
}
}

View File

@@ -13,7 +13,7 @@
// limitations under the License.
use api::v1::column::SemanticType;
use api::v1::{column, Column, ColumnDataType, InsertExpr};
use api::v1::{column, Column, ColumnDataType, InsertRequest as GrpcInsertRequest};
use common_catalog::consts::DEFAULT_SCHEMA_NAME;
use common_grpc::writer::Precision;
use table::requests::InsertRequest;
@@ -144,8 +144,9 @@ impl DataPoint {
line_writer.finish()
}
// TODO(fys): will remove in the future.
pub fn as_grpc_insert(&self) -> InsertExpr {
// TODO(LFC): opentsdb and influxdb insertions should go through the Table trait directly.
// Currently: line protocol -> grpc request -> grpc interface -> table trait
pub fn as_grpc_insert(&self) -> GrpcInsertRequest {
let schema_name = DEFAULT_SCHEMA_NAME.to_string();
let mut columns = Vec::with_capacity(2 + self.tags.len());
@@ -186,7 +187,7 @@ impl DataPoint {
});
}
InsertExpr {
GrpcInsertRequest {
schema_name,
table_name: self.metric.clone(),
region_number: 0,

View File

@@ -20,7 +20,7 @@ use std::hash::{Hash, Hasher};
use api::prometheus::remote::label_matcher::Type as MatcherType;
use api::prometheus::remote::{Label, Query, Sample, TimeSeries, WriteRequest};
use api::v1::column::SemanticType;
use api::v1::{column, Column, ColumnDataType, InsertExpr};
use api::v1::{column, Column, ColumnDataType, InsertRequest as GrpcInsertRequest};
use common_grpc::writer::Precision::Millisecond;
use common_recordbatch::{RecordBatch, RecordBatches};
use common_time::timestamp::TimeUnit;
@@ -339,21 +339,20 @@ fn timeseries_to_insert_request(db: &str, mut timeseries: TimeSeries) -> Result<
}
// TODO(fys): it will be removed in the future.
/// Cast a remote write request into gRPC's InsertExpr.
pub fn write_request_to_insert_exprs(
pub fn to_grpc_insert_requests(
database: &str,
mut request: WriteRequest,
) -> Result<Vec<InsertExpr>> {
) -> Result<Vec<GrpcInsertRequest>> {
let timeseries = std::mem::take(&mut request.timeseries);
timeseries
.into_iter()
.map(|timeseries| timeseries_to_insert_expr(database, timeseries))
.map(|timeseries| to_grpc_insert_request(database, timeseries))
.collect()
}
// TODO(fys): this will be removed in the future.
fn timeseries_to_insert_expr(database: &str, mut timeseries: TimeSeries) -> Result<InsertExpr> {
fn to_grpc_insert_request(database: &str, mut timeseries: TimeSeries) -> Result<GrpcInsertRequest> {
let schema_name = database.to_string();
// TODO(dennis): save exemplars into a column
@@ -411,7 +410,7 @@ fn timeseries_to_insert_expr(database: &str, mut timeseries: TimeSeries) -> Resu
});
}
Ok(InsertExpr {
Ok(GrpcInsertRequest {
schema_name,
table_name: table_name.context(error::InvalidPromRemoteRequestSnafu {
msg: "missing '__name__' label in timeseries",
@@ -666,7 +665,7 @@ mod tests {
..Default::default()
};
let exprs = write_request_to_insert_exprs("prometheus", write_request).unwrap();
let exprs = to_grpc_insert_requests("prometheus", write_request).unwrap();
assert_eq!(3, exprs.len());
assert_eq!("prometheus", exprs[0].schema_name);
assert_eq!("prometheus", exprs[1].schema_name);

View File

@@ -14,7 +14,7 @@
use std::sync::Arc;
use api::v1::InsertExpr;
use api::v1::InsertRequest;
use async_trait::async_trait;
use axum::{http, Router};
use axum_test_helper::TestClient;
@@ -34,9 +34,9 @@ struct DummyInstance {
#[async_trait]
impl InfluxdbLineProtocolHandler for DummyInstance {
async fn exec(&self, request: &InfluxdbRequest) -> Result<()> {
let exprs: Vec<InsertExpr> = request.try_into()?;
let requests: Vec<InsertRequest> = request.try_into()?;
for expr in exprs {
for expr in requests {
let _ = self.tx.send((expr.schema_name, expr.table_name)).await;
}

View File

@@ -15,12 +15,11 @@ use api::v1::alter_expr::Kind;
use api::v1::column::SemanticType;
use api::v1::{
admin_result, column, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDef,
CreateTableExpr, InsertExpr, MutateResult, TableId,
CreateTableExpr, InsertRequest, MutateResult, TableId,
};
use client::admin::Admin;
use client::{Client, Database, ObjectResult};
use client::{Client, Database, RpcOutput};
use common_catalog::consts::MIN_USER_TABLE_ID;
use common_grpc::flight::{flight_messages_to_recordbatches, FlightMessage};
use servers::server::Server;
use tests_integration::test_util::{setup_grpc_server, StorageType};
@@ -180,7 +179,7 @@ async fn insert_and_assert(db: &Database) {
// testing data:
let (expected_host_col, expected_cpu_col, expected_mem_col, expected_ts_col) = expect_data();
let expr = InsertExpr {
let request = InsertRequest {
schema_name: "public".to_string(),
table_name: "demo".to_string(),
region_number: 0,
@@ -192,7 +191,7 @@ async fn insert_and_assert(db: &Database) {
],
row_count: 4,
};
let result = db.insert(expr).await;
let result = db.insert(request).await;
result.unwrap();
let result = db
@@ -203,16 +202,12 @@ async fn insert_and_assert(db: &Database) {
)
.await
.unwrap();
assert!(matches!(result, ObjectResult::FlightData(_)));
let ObjectResult::FlightData(mut messages) = result else { unreachable!() };
assert_eq!(messages.len(), 1);
assert!(matches!(messages.remove(0), FlightMessage::AffectedRows(2)));
assert!(matches!(result, RpcOutput::AffectedRows(2)));
// select
let result = db.sql("SELECT * FROM demo").await.unwrap();
match result {
ObjectResult::FlightData(flight_messages) => {
let recordbatches = flight_messages_to_recordbatches(flight_messages).unwrap();
RpcOutput::RecordBatches(recordbatches) => {
let pretty = recordbatches.pretty_print().unwrap();
let expected = "\
+-------+------+--------+-------------------------+

View File

@@ -18,9 +18,7 @@ use std::process::Stdio;
use std::time::Duration;
use async_trait::async_trait;
use client::{Client, Database as DB, Error as ClientError, ObjectResult};
use common_grpc::flight;
use common_grpc::flight::FlightMessage;
use client::{Client, Database as DB, Error as ClientError, RpcOutput};
use sqlness::{Database, Environment};
use tokio::process::{Child, Command};
@@ -119,29 +117,22 @@ impl Database for GreptimeDB {
}
struct ResultDisplayer {
result: Result<ObjectResult, ClientError>,
result: Result<RpcOutput, ClientError>,
}
impl Display for ResultDisplayer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match &self.result {
Ok(result) => match result {
ObjectResult::Mutate(mutate_result) => {
write!(f, "{mutate_result:?}")
RpcOutput::AffectedRows(rows) => {
write!(f, "Affected Rows: {rows}")
}
ObjectResult::FlightData(messages) => {
if let Some(FlightMessage::AffectedRows(rows)) = messages.get(0) {
write!(f, "Affected Rows: {rows}")
} else {
let pretty = flight::flight_messages_to_recordbatches(messages.clone())
.map_err(|e| e.to_string())
.and_then(|x| x.pretty_print().map_err(|e| e.to_string()));
match pretty {
Ok(s) => write!(f, "{s}"),
Err(e) => write!(
f,
"Failed to convert Flight messages {messages:?} to Recordbatches, error: {e}"
),
RpcOutput::RecordBatches(recordbatches) => {
let pretty = recordbatches.pretty_print().map_err(|e| e.to_string());
match pretty {
Ok(s) => write!(f, "{s}"),
Err(e) => {
write!(f, "Failed to pretty format {recordbatches:?}, error: {e}")
}
}
}