feat: streaming do_get (#2171)

* feat: rewrite do_get for streaming get flight data

* feat: rewrite do_get call stack but leave the async stream adapter not modified yet

* feat: rewrite the async stream adapter to accept greptime record batch stream

* fix: resolve some PR comments

* feat: rewrite tests to adapt to the streaming do_get

* feat: add unit tests for streaming do_get

* feat: rewrite timer metric of merge scan

* remove unhelpful unit tests for streaming do_get

* add a new metric timer for merge scan and fix some test errors

* rewrite mysql writer to write query results in a streaming manner

* fix: fix fmt errors

* fix: rewrite sqlness runner to take into account the streaming do_get

* fix: fix toml format errors

* fix: resolve some PR comments

* fix: resolve some PR comments

* fix: refactor do_get to increase readability

* fix: refactor mysql try_write_one to increase readability
This commit is contained in:
niebayes
2023-08-22 10:54:05 +08:00
committed by GitHub
parent 5b7b2cf77d
commit e5f4ca2dab
11 changed files with 224 additions and 201 deletions

1
Cargo.lock generated
View File

@@ -9241,6 +9241,7 @@ dependencies = [
"common-error",
"common-grpc",
"common-query",
"common-recordbatch",
"common-time",
"serde",
"sqlness",

View File

@@ -21,16 +21,19 @@ use api::v1::{
DropTableExpr, FlushTableExpr, GreptimeRequest, InsertRequests, PromRangeQuery, QueryRequest,
RequestHeader, TruncateTableExpr,
};
use arrow_flight::{FlightData, Ticket};
use arrow_flight::Ticket;
use async_stream::stream;
use common_error::ext::{BoxedError, ErrorExt};
use common_grpc::flight::{flight_messages_to_recordbatches, FlightDecoder, FlightMessage};
use common_grpc::flight::{FlightDecoder, FlightMessage};
use common_query::Output;
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::RecordBatchStreamAdaptor;
use common_telemetry::{logging, timer};
use futures_util::{TryFutureExt, TryStreamExt};
use futures_util::StreamExt;
use prost::Message;
use snafu::{ensure, ResultExt};
use crate::error::{ConvertFlightDataSnafu, IllegalFlightMessagesSnafu, ServerSnafu};
use crate::error::{ConvertFlightDataSnafu, Error, IllegalFlightMessagesSnafu, ServerSnafu};
use crate::{error, from_grpc_response, metrics, Client, Result, StreamInserter};
#[derive(Clone, Debug, Default)]
@@ -283,55 +286,81 @@ impl Database {
let mut client = self.client.make_flight_client()?;
let flight_data: Vec<FlightData> = client
.mut_inner()
.do_get(request)
.and_then(|response| response.into_inner().try_collect())
.await
.map_err(|e| {
let tonic_code = e.code();
let e: error::Error = e.into();
let code = e.status_code();
let msg = e.to_string();
ServerSnafu { code, msg }
.fail::<()>()
.map_err(BoxedError::new)
.context(error::FlightGetSnafu {
tonic_code,
addr: client.addr(),
})
.map_err(|error| {
logging::error!(
"Failed to do Flight get, addr: {}, code: {}, source: {}",
client.addr(),
tonic_code,
error
);
error
})
.unwrap_err()
})?;
let decoder = &mut FlightDecoder::default();
let flight_messages = flight_data
.into_iter()
.map(|x| decoder.try_decode(x).context(ConvertFlightDataSnafu))
.collect::<Result<Vec<_>>>()?;
let output = if let Some(FlightMessage::AffectedRows(rows)) = flight_messages.get(0) {
ensure!(
flight_messages.len() == 1,
IllegalFlightMessagesSnafu {
reason: "Expect 'AffectedRows' Flight messages to be one and only!"
}
let response = client.mut_inner().do_get(request).await.map_err(|e| {
let tonic_code = e.code();
let e: error::Error = e.into();
let code = e.status_code();
let msg = e.to_string();
let error = Error::FlightGet {
tonic_code,
addr: client.addr().to_string(),
source: BoxedError::new(ServerSnafu { code, msg }.build()),
};
logging::error!(
"Failed to do Flight get, addr: {}, code: {}, source: {}",
client.addr(),
tonic_code,
error
);
Output::AffectedRows(*rows)
} else {
let recordbatches = flight_messages_to_recordbatches(flight_messages)
.context(ConvertFlightDataSnafu)?;
Output::RecordBatches(recordbatches)
error
})?;
let flight_data_stream = response.into_inner();
let mut decoder = FlightDecoder::default();
let mut flight_message_stream = flight_data_stream.map(move |flight_data| {
flight_data
.map_err(Error::from)
.and_then(|data| decoder.try_decode(data).context(ConvertFlightDataSnafu))
});
let Some(first_flight_message) = flight_message_stream.next().await else {
return IllegalFlightMessagesSnafu {
reason: "Expect the response not to be empty",
}
.fail();
};
Ok(output)
let first_flight_message = first_flight_message?;
match first_flight_message {
FlightMessage::AffectedRows(rows) => {
ensure!(
flight_message_stream.next().await.is_none(),
IllegalFlightMessagesSnafu {
reason: "Expect 'AffectedRows' Flight messages to be the one and the only!"
}
);
Ok(Output::AffectedRows(rows))
}
FlightMessage::Recordbatch(_) => IllegalFlightMessagesSnafu {
reason: "The first flight message cannot be a RecordBatch message",
}
.fail(),
FlightMessage::Schema(schema) => {
let stream = Box::pin(stream!({
while let Some(flight_message) = flight_message_stream.next().await {
let flight_message = flight_message
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let FlightMessage::Recordbatch(record_batch) = flight_message else {
yield IllegalFlightMessagesSnafu {reason: "A Schema message must be succeeded exclusively by a set of RecordBatch messages"}
.fail()
.map_err(BoxedError::new)
.context(ExternalSnafu);
break;
};
yield Ok(record_batch);
}
}));
let record_batch_stream = RecordBatchStreamAdaptor {
schema,
stream,
output_ordering: None,
};
Ok(Output::Stream(Box::pin(record_batch_stream)))
}
}
}
}

View File

@@ -34,13 +34,8 @@ use crate::{
SendableRecordBatchStream, Stream,
};
type FutureStream = Pin<
Box<
dyn std::future::Future<
Output = std::result::Result<DfSendableRecordBatchStream, DataFusionError>,
> + Send,
>,
>;
type FutureStream =
Pin<Box<dyn std::future::Future<Output = Result<SendableRecordBatchStream>> + Send>>;
/// ParquetRecordBatchStream -> DataFusion RecordBatchStream
pub struct ParquetRecordBatchStreamAdapter<T> {
@@ -223,7 +218,7 @@ impl Stream for RecordBatchStreamAdapter {
enum AsyncRecordBatchStreamAdapterState {
Uninit(FutureStream),
Ready(DfSendableRecordBatchStream),
Ready(SendableRecordBatchStream),
Failed,
}
@@ -261,17 +256,12 @@ impl Stream for AsyncRecordBatchStreamAdapter {
}
Err(e) => {
self.state = AsyncRecordBatchStreamAdapterState::Failed;
return Poll::Ready(Some(
Err(e).context(error::InitRecordbatchStreamSnafu),
));
return Poll::Ready(Some(Err(e)));
}
};
}
AsyncRecordBatchStreamAdapterState::Ready(stream) => {
return Poll::Ready(ready!(Pin::new(stream).poll_next(cx)).map(|x| {
let df_record_batch = x.context(error::PollStreamSnafu)?;
RecordBatch::try_from_df_record_batch(self.schema(), df_record_batch)
}))
return Poll::Ready(ready!(Pin::new(stream).poll_next(cx)))
}
AsyncRecordBatchStreamAdapterState::Failed => return Poll::Ready(None),
}
@@ -330,12 +320,7 @@ mod test {
) -> FutureStream {
Box::pin(async move {
maybe_recordbatches
.map(|items| {
Box::pin(DfRecordBatchStreamAdapter::new(Box::pin(
MaybeErrorRecordBatchStream { items },
))) as _
})
.map_err(|e| DataFusionError::External(Box::new(e)))
.map(|items| Box::pin(MaybeErrorRecordBatchStream { items }) as _)
})
}
@@ -372,7 +357,7 @@ mod test {
let result = RecordBatches::try_collect(Box::pin(adapter)).await;
assert_eq!(
result.unwrap_err().to_string(),
"Failed to poll stream, source: External error: External error, source: Unknown"
"External error, source: Unknown",
);
let failed_to_init_stream =
@@ -382,7 +367,7 @@ mod test {
let result = RecordBatches::try_collect(Box::pin(adapter)).await;
assert_eq!(
result.unwrap_err().to_string(),
"Failed to init Recordbatch stream, source: External error: External error, source: Internal"
"External error, source: Internal",
);
}
}

View File

@@ -13,7 +13,6 @@
// limitations under the License.
use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
use async_trait::async_trait;
@@ -23,30 +22,24 @@ use common_meta::table_name::TableName;
use common_query::error::Result as QueryResult;
use common_query::logical_plan::Expr;
use common_query::physical_plan::{PhysicalPlan, PhysicalPlanRef};
use common_query::Output;
use common_recordbatch::adapter::AsyncRecordBatchStreamAdapter;
use common_recordbatch::error::{
InitRecordbatchStreamSnafu, PollStreamSnafu, Result as RecordBatchResult,
};
use common_recordbatch::{
RecordBatch, RecordBatchStreamAdaptor, RecordBatches, SendableRecordBatchStream,
ExternalSnafu as RecordBatchExternalSnafu, Result as RecordBatchResult,
};
use common_recordbatch::{RecordBatchStreamAdaptor, SendableRecordBatchStream};
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::{
Partitioning, SendableRecordBatchStream as DfSendableRecordBatchStream,
};
use datafusion_common::DataFusionError;
use datafusion::physical_plan::Partitioning;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use futures_util::{Stream, StreamExt};
use futures_util::StreamExt;
use snafu::prelude::*;
use store_api::storage::ScanRequest;
use table::error::TableOperationSnafu;
use table::metadata::{FilterPushDownType, TableInfoRef, TableType};
use table::requests::{DeleteRequest, InsertRequest};
use table::Table;
use tokio::sync::RwLock;
use crate::catalog::FrontendCatalogManager;
use crate::error::Result;
use crate::instance::distributed::deleter::DistDeleter;
use crate::instance::distributed::inserter::DistInserter;
use crate::table::scan::{DatanodeInstance, TableScanPlan};
@@ -132,35 +125,26 @@ impl Table for DistTable {
projection: request.projection.clone(),
filters: request.filters.clone(),
limit: request.limit,
batches: Arc::new(RwLock::new(None)),
}));
}
let schema = project_schema(self.schema(), request.projection.as_ref());
let schema_to_move = schema.clone();
let stream: Pin<Box<dyn Stream<Item = RecordBatchResult<RecordBatch>> + Send>> = Box::pin(
async_stream::try_stream! {
for partition_exec in partition_execs {
partition_exec
.maybe_init()
.await
.map_err(|e| DataFusionError::External(Box::new(e)))
.context(InitRecordbatchStreamSnafu)?;
let mut stream = partition_exec.as_stream().await.context(InitRecordbatchStreamSnafu)?;
while let Some(batch) = stream.next().await{
yield RecordBatch::try_from_df_record_batch(schema_to_move.clone(),batch.context(PollStreamSnafu)?)?
}
let stream = Box::pin(async_stream::stream!({
for partition_exec in partition_execs {
let mut stream = partition_exec.scan_to_stream().await?;
while let Some(record_batch) = stream.next().await {
yield record_batch;
}
},
);
let record_batch_stream = RecordBatchStreamAdaptor {
}
}));
let schema = project_schema(self.schema(), request.projection.as_ref());
let stream = RecordBatchStreamAdaptor {
schema,
stream,
output_ordering: None,
};
Ok(Box::pin(record_batch_stream))
Ok(Box::pin(stream))
}
fn supports_filters_pushdown(
@@ -245,12 +229,7 @@ impl PhysicalPlan for DistTableScan {
_context: Arc<TaskContext>,
) -> QueryResult<SendableRecordBatchStream> {
let exec = self.partition_execs[partition].clone();
let stream = Box::pin(async move {
exec.maybe_init()
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;
exec.as_stream().await
});
let stream = Box::pin(async move { exec.scan_to_stream().await });
let stream = AsyncRecordBatchStreamAdapter::new(self.schema(), stream);
Ok(Box::pin(stream))
}
@@ -263,38 +242,29 @@ struct PartitionExec {
projection: Option<Vec<usize>>,
filters: Vec<Expr>,
limit: Option<usize>,
batches: Arc<RwLock<Option<RecordBatches>>>,
}
impl PartitionExec {
async fn maybe_init(&self) -> Result<()> {
if self.batches.read().await.is_some() {
return Ok(());
}
let mut batches = self.batches.write().await;
if batches.is_some() {
return Ok(());
}
async fn scan_to_stream(&self) -> RecordBatchResult<SendableRecordBatchStream> {
let plan: TableScanPlan = TableScanPlan {
table_name: self.table_name.clone(),
projection: self.projection.clone(),
filters: self.filters.clone(),
limit: self.limit,
};
let result = self.datanode_instance.grpc_table_scan(plan).await?;
let _ = batches.insert(result);
Ok(())
}
/// Notice: the record batch will be consumed.
async fn as_stream(&self) -> std::result::Result<DfSendableRecordBatchStream, DataFusionError> {
let mut batches = self.batches.write().await;
Ok(batches
.take()
.expect("should have been initialized in \"maybe_init\"")
.into_df_stream())
let output = self
.datanode_instance
.grpc_table_scan(plan)
.await
.map_err(BoxedError::new)
.context(RecordBatchExternalSnafu)?;
let Output::Stream(stream) = output else {
unreachable!()
};
Ok(stream)
}
}

View File

@@ -19,7 +19,6 @@ use client::Database;
use common_meta::table_name::TableName;
use common_query::prelude::Expr;
use common_query::Output;
use common_recordbatch::RecordBatches;
use datafusion::datasource::DefaultTableSource;
use datafusion_expr::{LogicalPlan, LogicalPlanBuilder};
use snafu::ResultExt;
@@ -46,22 +45,17 @@ impl DatanodeInstance {
Self { table, db }
}
pub(crate) async fn grpc_table_scan(&self, plan: TableScanPlan) -> Result<RecordBatches> {
pub(crate) async fn grpc_table_scan(&self, plan: TableScanPlan) -> Result<Output> {
let logical_plan = self.build_logical_plan(&plan)?;
let substrait_plan = DFLogicalSubstraitConvertor
.encode(&logical_plan)
.context(error::EncodeSubstraitLogicalPlanSnafu)?;
let result = self
.db
self.db
.logical_plan(substrait_plan.to_vec(), None)
.await
.context(error::RequestDatanodeSnafu)?;
let Output::RecordBatches(record_batches) = result else {
unreachable!()
};
Ok(record_batches)
.context(error::RequestDatanodeSnafu)
}
fn build_logical_plan(&self, table_scan: &TableScanPlan) -> Result<LogicalPlan> {

View File

@@ -16,7 +16,7 @@ use std::any::Any;
use std::sync::Arc;
use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
use async_stream::try_stream;
use async_stream::stream;
use client::client_manager::DatanodeClients;
use client::Database;
use common_base::bytes::Bytes;
@@ -149,11 +149,14 @@ impl MergeScanExec {
let trace_id = context.task_id().and_then(|id| id.parse().ok());
let metric = MergeScanMetric::new(&self.metric);
let stream = try_stream! {
let stream = Box::pin(stream!({
let _finish_timer = metric.finish_time().timer();
let mut ready_timer = metric.ready_time().timer();
let mut first_consume_timer = Some(metric.first_consume_time().timer());
for peer in peers {
let client = clients.get_client(&peer).await;
let database = Database::new(&table.catalog_name, &table.schema_name, client);
let _timer = metric.grpc_time().timer();
let output: Output = database
.logical_plan(substrait_plan.clone(), trace_id)
.await
@@ -161,37 +164,34 @@ impl MergeScanExec {
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
match output {
Output::AffectedRows(_) => {
Err(BoxedError::new(
UnexpectedOutputKindSnafu {
expected: "RecordBatches or Stream",
got: "AffectedRows",
}
.build(),
))
.context(ExternalSnafu)?;
let Output::Stream(mut stream) = output else {
yield UnexpectedOutputKindSnafu {
expected: "Stream",
got: "RecordBatches or AffectedRows",
}
Output::RecordBatches(record_batches) => {
for batch in record_batches.into_iter() {
metric.record_output_batch_rows(batch.num_rows());
yield Self::remove_metadata_from_record_batch(batch);
}
}
Output::Stream(mut stream) => {
while let Some(batch) = stream.next().await {
let batch = batch?;
metric.record_output_batch_rows(batch.num_rows());
yield Self::remove_metadata_from_record_batch(batch);
}
.fail()
.map_err(BoxedError::new)
.context(ExternalSnafu);
return;
};
ready_timer.stop();
while let Some(batch) = stream.next().await {
let batch = batch?;
metric.record_output_batch_rows(batch.num_rows());
yield Ok(Self::remove_metadata_from_record_batch(batch));
if let Some(first_consume_timer) = first_consume_timer.as_mut().take() {
first_consume_timer.stop();
}
}
}
};
}));
Ok(Box::pin(RecordBatchStreamAdaptor {
schema: self.schema.clone(),
stream: Box::pin(stream),
stream,
output_ordering: None,
}))
}
@@ -285,8 +285,12 @@ impl DisplayAs for MergeScanExec {
#[derive(Debug, Clone)]
struct MergeScanMetric {
/// Nanosecond spent on fetching data from remote
grpc_time: Time,
/// Nanosecond elapsed till the scan operator is ready to emit data
ready_time: Time,
/// Nanosecond elapsed till the first record batch emitted from the scan operator gets consumed
first_consume_time: Time,
/// Nanosecond elapsed till the scan operator finished execution
finish_time: Time,
/// Count of rows fetched from remote
output_rows: Count,
}
@@ -294,13 +298,23 @@ struct MergeScanMetric {
impl MergeScanMetric {
pub fn new(metric: &ExecutionPlanMetricsSet) -> Self {
Self {
grpc_time: MetricBuilder::new(metric).subset_time("gRPC", 1),
ready_time: MetricBuilder::new(metric).subset_time("ready_time", 1),
first_consume_time: MetricBuilder::new(metric).subset_time("first_consume_time", 1),
finish_time: MetricBuilder::new(metric).subset_time("finish_time", 1),
output_rows: MetricBuilder::new(metric).output_rows(1),
}
}
pub fn grpc_time(&self) -> &Time {
&self.grpc_time
pub fn ready_time(&self) -> &Time {
&self.ready_time
}
pub fn first_consume_time(&self) -> &Time {
&self.first_consume_time
}
pub fn finish_time(&self) -> &Time {
&self.finish_time
}
pub fn record_output_batch_rows(&self, num_rows: usize) {

View File

@@ -14,10 +14,12 @@
use std::ops::Deref;
use common_error::ext::BoxedError;
use common_query::Output;
use common_recordbatch::{util, RecordBatch};
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datatypes::prelude::{ConcreteDataType, Value};
use datatypes::schema::SchemaRef;
use futures::StreamExt;
use metrics::increment_counter;
use opensrv_mysql::{
Column, ColumnFlags, ColumnType, ErrorKind, OkResponse, QueryResultWriter, RowWriter,
@@ -26,7 +28,7 @@ use session::context::QueryContextRef;
use snafu::prelude::*;
use tokio::io::AsyncWrite;
use crate::error::{self, Error, Result};
use crate::error::{self, Error, OtherSnafu, Result};
use crate::metrics::*;
/// Try to write multiple output to the writer if possible.
@@ -50,8 +52,8 @@ pub async fn write_output<W: AsyncWrite + Send + Sync + Unpin>(
}
struct QueryResult {
recordbatches: Vec<RecordBatch>,
schema: SchemaRef,
stream: SendableRecordBatchStream,
}
pub struct MysqlResultWriter<'a, W: AsyncWrite + Unpin> {
@@ -80,20 +82,16 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
match output {
Ok(output) => match output {
Output::Stream(stream) => {
let schema = stream.schema().clone();
let recordbatches = util::collect(stream)
.await
.context(error::CollectRecordbatchSnafu)?;
let query_result = QueryResult {
recordbatches,
schema,
schema: stream.schema(),
stream,
};
Self::write_query_result(query_result, self.writer, self.query_context).await?;
}
Output::RecordBatches(recordbatches) => {
let query_result = QueryResult {
schema: recordbatches.schema(),
recordbatches: recordbatches.take(),
stream: recordbatches.as_stream(),
};
Self::write_query_result(query_result, self.writer, self.query_context).await?;
}
@@ -130,7 +128,7 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
}
async fn write_query_result(
query_result: QueryResult,
mut query_result: QueryResult,
writer: QueryResultWriter<'a, W>,
query_context: QueryContextRef,
) -> Result<()> {
@@ -139,9 +137,20 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
// The RowWriter's lifetime is bound to `column_def` thus we can't use finish_one()
// to return a new QueryResultWriter.
let mut row_writer = writer.start(&column_def).await?;
for recordbatch in &query_result.recordbatches {
Self::write_recordbatch(&mut row_writer, recordbatch, query_context.clone())
.await?;
while let Some(record_batch) = query_result.stream.next().await {
match record_batch {
Ok(record_batch) => {
Self::write_recordbatch(
&mut row_writer,
&record_batch,
query_context.clone(),
)
.await?
}
Err(e) => {
return Err(e).map_err(BoxedError::new).context(OtherSnafu);
}
}
}
row_writer.finish().await?;
Ok(())

View File

@@ -17,6 +17,7 @@ use std::pin::Pin;
use std::sync::Arc;
use async_trait::async_trait;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::ext::BoxedError;
use common_recordbatch::error::Result as RecordBatchResult;
use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream};
@@ -54,8 +55,8 @@ impl MemTable {
table_name,
recordbatch,
1,
"greptime".to_string(),
"public".to_string(),
DEFAULT_CATALOG_NAME.to_string(),
DEFAULT_SCHEMA_NAME.to_string(),
regions,
)
}

View File

@@ -23,6 +23,7 @@ use auth::user_provider_from_option;
use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::consts::{MIN_USER_TABLE_ID, MITO_ENGINE};
use common_query::Output;
use common_recordbatch::RecordBatches;
use servers::prometheus::{PromData, PromSeries, PrometheusJsonResponse, PrometheusResponse};
use servers::server::Server;
use tests_integration::test_util::{
@@ -310,14 +311,19 @@ async fn insert_and_assert(db: &Database) {
assert!(matches!(result, Output::AffectedRows(2)));
// select
let result = db
let output = db
.sql("SELECT host, cpu, memory, ts FROM demo")
.await
.unwrap();
match result {
Output::RecordBatches(recordbatches) => {
let pretty = recordbatches.pretty_print().unwrap();
let expected = "\
let record_batches = match output {
Output::RecordBatches(record_batches) => record_batches,
Output::Stream(stream) => RecordBatches::try_collect(stream).await.unwrap(),
Output::AffectedRows(_) => unreachable!(),
};
let pretty = record_batches.pretty_print().unwrap();
let expected = "\
+-------+------+--------+-------------------------+
| host | cpu | memory | ts |
+-------+------+--------+-------------------------+
@@ -329,10 +335,7 @@ async fn insert_and_assert(db: &Database) {
| host6 | 88.8 | 333.3 | 2022-12-28T04:17:08 |
+-------+------+--------+-------------------------+\
";
assert_eq!(pretty, expected);
}
_ => unreachable!(),
}
assert_eq!(pretty, expected);
}
fn testing_create_expr() -> CreateTableExpr {

View File

@@ -11,6 +11,7 @@ common-base = { workspace = true }
common-error = { workspace = true }
common-grpc = { workspace = true }
common-query = { workspace = true }
common-recordbatch = { workspace = true }
common-time = { workspace = true }
serde.workspace = true
sqlness = { version = "0.5" }

View File

@@ -23,12 +23,14 @@ use std::sync::{Arc, Mutex};
use std::time::Duration;
use async_trait::async_trait;
use client::error::ServerSnafu;
use client::{
Client, Database as DB, Error as ClientError, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME,
};
use common_error::ext::ErrorExt;
use common_error::snafu::ErrorCompat;
use common_query::Output;
use common_recordbatch::RecordBatches;
use serde::Serialize;
use sqlness::{Database, EnvController, QueryContext};
use tinytemplate::TinyTemplate;
@@ -358,7 +360,21 @@ impl Database for GreptimeDB {
result: Ok(Output::AffectedRows(0)),
}) as _
} else {
let result = client.sql(&query).await;
let mut result = client.sql(&query).await;
if let Ok(Output::Stream(stream)) = result {
match RecordBatches::try_collect(stream).await {
Ok(recordbatches) => result = Ok(Output::RecordBatches(recordbatches)),
Err(e) => {
let status_code = e.status_code();
let source_error = e.iter_chain().last().unwrap();
result = ServerSnafu {
code: status_code,
msg: source_error.to_string(),
}
.fail();
}
}
}
Box::new(ResultDisplayer { result }) as _
}
}