diff --git a/Cargo.lock b/Cargo.lock
index bba4a44998..aaa49b2af1 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -9241,6 +9241,7 @@ dependencies = [
  "common-error",
  "common-grpc",
  "common-query",
+ "common-recordbatch",
  "common-time",
  "serde",
  "sqlness",
diff --git a/src/client/src/database.rs b/src/client/src/database.rs
index 5bd4fc79e1..90228ba559 100644
--- a/src/client/src/database.rs
+++ b/src/client/src/database.rs
@@ -21,16 +21,19 @@ use api::v1::{
     DropTableExpr, FlushTableExpr, GreptimeRequest, InsertRequests, PromRangeQuery, QueryRequest,
     RequestHeader, TruncateTableExpr,
 };
-use arrow_flight::{FlightData, Ticket};
+use arrow_flight::Ticket;
+use async_stream::stream;
 use common_error::ext::{BoxedError, ErrorExt};
-use common_grpc::flight::{flight_messages_to_recordbatches, FlightDecoder, FlightMessage};
+use common_grpc::flight::{FlightDecoder, FlightMessage};
 use common_query::Output;
+use common_recordbatch::error::ExternalSnafu;
+use common_recordbatch::RecordBatchStreamAdaptor;
 use common_telemetry::{logging, timer};
-use futures_util::{TryFutureExt, TryStreamExt};
+use futures_util::StreamExt;
 use prost::Message;
 use snafu::{ensure, ResultExt};
 
-use crate::error::{ConvertFlightDataSnafu, IllegalFlightMessagesSnafu, ServerSnafu};
+use crate::error::{ConvertFlightDataSnafu, Error, IllegalFlightMessagesSnafu, ServerSnafu};
 use crate::{error, from_grpc_response, metrics, Client, Result, StreamInserter};
 
 #[derive(Clone, Debug, Default)]
@@ -283,55 +286,81 @@ impl Database {
         let mut client = self.client.make_flight_client()?;
 
-        let flight_data: Vec<FlightData> = client
-            .mut_inner()
-            .do_get(request)
-            .and_then(|response| response.into_inner().try_collect())
-            .await
-            .map_err(|e| {
-                let tonic_code = e.code();
-                let e: error::Error = e.into();
-                let code = e.status_code();
-                let msg = e.to_string();
-                ServerSnafu { code, msg }
-                    .fail::<()>()
-                    .map_err(BoxedError::new)
-                    .context(error::FlightGetSnafu {
-                        tonic_code,
-                        addr: client.addr(),
-                    })
-                    .map_err(|error| {
-                        logging::error!(
-                            "Failed to do Flight get, addr: {}, code: {}, source: {}",
-                            client.addr(),
-                            tonic_code,
-                            error
-                        );
-                        error
-                    })
-                    .unwrap_err()
-            })?;
-
-        let decoder = &mut FlightDecoder::default();
-        let flight_messages = flight_data
-            .into_iter()
-            .map(|x| decoder.try_decode(x).context(ConvertFlightDataSnafu))
-            .collect::<Result<Vec<_>>>()?;
-
-        let output = if let Some(FlightMessage::AffectedRows(rows)) = flight_messages.get(0) {
-            ensure!(
-                flight_messages.len() == 1,
-                IllegalFlightMessagesSnafu {
-                    reason: "Expect 'AffectedRows' Flight messages to be one and only!"
-                }
+        let response = client.mut_inner().do_get(request).await.map_err(|e| {
+            let tonic_code = e.code();
+            let e: error::Error = e.into();
+            let code = e.status_code();
+            let msg = e.to_string();
+            let error = Error::FlightGet {
+                tonic_code,
+                addr: client.addr().to_string(),
+                source: BoxedError::new(ServerSnafu { code, msg }.build()),
+            };
+            logging::error!(
+                "Failed to do Flight get, addr: {}, code: {}, source: {}",
+                client.addr(),
+                tonic_code,
+                error
             );
-            Output::AffectedRows(*rows)
-        } else {
-            let recordbatches = flight_messages_to_recordbatches(flight_messages)
-                .context(ConvertFlightDataSnafu)?;
-            Output::RecordBatches(recordbatches)
+            error
+        })?;
+
+        let flight_data_stream = response.into_inner();
+        let mut decoder = FlightDecoder::default();
+
+        let mut flight_message_stream = flight_data_stream.map(move |flight_data| {
+            flight_data
+                .map_err(Error::from)
+                .and_then(|data| decoder.try_decode(data).context(ConvertFlightDataSnafu))
+        });
+
+        let Some(first_flight_message) = flight_message_stream.next().await else {
+            return IllegalFlightMessagesSnafu {
+                reason: "Expect the response not to be empty",
+            }
+            .fail();
         };
-        Ok(output)
+
+        let first_flight_message = first_flight_message?;
+
+        match first_flight_message {
+            FlightMessage::AffectedRows(rows) => {
+                ensure!(
+                    flight_message_stream.next().await.is_none(),
+                    IllegalFlightMessagesSnafu {
+                        reason: "Expect 'AffectedRows' Flight message to be the one and only!"
+                    }
+                );
+                Ok(Output::AffectedRows(rows))
+            }
+            FlightMessage::Recordbatch(_) => IllegalFlightMessagesSnafu {
+                reason: "The first flight message cannot be a RecordBatch message",
+            }
+            .fail(),
+            FlightMessage::Schema(schema) => {
+                let stream = Box::pin(stream!({
+                    while let Some(flight_message) = flight_message_stream.next().await {
+                        let flight_message = flight_message
+                            .map_err(BoxedError::new)
+                            .context(ExternalSnafu)?;
+                        let FlightMessage::Recordbatch(record_batch) = flight_message else {
+                            yield IllegalFlightMessagesSnafu {
+                                reason: "A Schema message must be succeeded exclusively by a set of RecordBatch messages",
+                            }
+                            .fail()
+                            .map_err(BoxedError::new)
+                            .context(ExternalSnafu);
+                            break;
+                        };
+                        yield Ok(record_batch);
+                    }
+                }));
+                let record_batch_stream = RecordBatchStreamAdaptor {
+                    schema,
+                    stream,
+                    output_ordering: None,
+                };
+                Ok(Output::Stream(Box::pin(record_batch_stream)))
+            }
+        }
     }
 }
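
The rewritten `do_get` above stops buffering the whole Flight response: it decodes the first message to classify the output, then hands the remainder off as a record-batch stream. A minimal sketch of that "classify the first message, stream the rest" pattern, using simplified stand-in types rather than the real arrow-flight/GreptimeDB definitions:

```rust
use futures::stream::BoxStream;
use futures::StreamExt;

enum FlightMessage {
    AffectedRows(usize),
    Schema(String),       // stand-in for a real schema
    RecordBatch(Vec<u8>), // stand-in for a real record batch
}

enum Output {
    AffectedRows(usize),
    Stream(BoxStream<'static, Result<Vec<u8>, String>>),
}

async fn handle_do_get(
    mut messages: impl futures::Stream<Item = FlightMessage> + Send + Unpin + 'static,
) -> Result<Output, String> {
    match messages.next().await {
        None => Err("expect the response not to be empty".into()),
        Some(FlightMessage::AffectedRows(rows)) => {
            // An AffectedRows message must be the only one in the response.
            if messages.next().await.is_some() {
                return Err("'AffectedRows' must be the one and only message".into());
            }
            Ok(Output::AffectedRows(rows))
        }
        Some(FlightMessage::RecordBatch(_)) => {
            Err("the first message cannot be a RecordBatch".into())
        }
        Some(FlightMessage::Schema(_schema)) => {
            // Everything after the schema must be a record batch.
            let batches = messages.map(|message| match message {
                FlightMessage::RecordBatch(bytes) => Ok(bytes),
                _ => Err("a Schema may only be followed by RecordBatch messages".into()),
            });
            Ok(Output::Stream(batches.boxed()))
        }
    }
}
```

The design point is that the client never holds more than one batch in memory; malformed sequences surface as stream errors rather than aborting the whole call up front.
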
diff --git a/src/common/recordbatch/src/adapter.rs b/src/common/recordbatch/src/adapter.rs
index 93fe9a3a5b..6c113562cd 100644
--- a/src/common/recordbatch/src/adapter.rs
+++ b/src/common/recordbatch/src/adapter.rs
@@ -34,13 +34,8 @@ use crate::{
     SendableRecordBatchStream, Stream,
 };
 
-type FutureStream = Pin<
-    Box<
-        dyn std::future::Future<
-                Output = std::result::Result<DfSendableRecordBatchStream, DataFusionError>,
-            > + Send,
-    >,
->;
+type FutureStream =
+    Pin<Box<dyn std::future::Future<Output = Result<SendableRecordBatchStream>> + Send>>;
 
 /// ParquetRecordBatchStream -> DataFusion RecordBatchStream
 pub struct ParquetRecordBatchStreamAdapter<T> {
@@ -223,7 +218,7 @@ impl Stream for RecordBatchStreamAdapter {
 
 enum AsyncRecordBatchStreamAdapterState {
     Uninit(FutureStream),
-    Ready(DfSendableRecordBatchStream),
+    Ready(SendableRecordBatchStream),
     Failed,
 }
 
@@ -261,17 +256,12 @@ impl Stream for AsyncRecordBatchStreamAdapter {
                     }
                     Err(e) => {
                         self.state = AsyncRecordBatchStreamAdapterState::Failed;
-                        return Poll::Ready(Some(
-                            Err(e).context(error::InitRecordbatchStreamSnafu),
-                        ));
+                        return Poll::Ready(Some(Err(e)));
                     }
                 };
             }
             AsyncRecordBatchStreamAdapterState::Ready(stream) => {
-                return Poll::Ready(ready!(Pin::new(stream).poll_next(cx)).map(|x| {
-                    let df_record_batch = x.context(error::PollStreamSnafu)?;
-                    RecordBatch::try_from_df_record_batch(self.schema(), df_record_batch)
-                }))
+                return Poll::Ready(ready!(Pin::new(stream).poll_next(cx)))
             }
             AsyncRecordBatchStreamAdapterState::Failed => return Poll::Ready(None),
         }
@@ -330,12 +320,7 @@ mod test {
         ) -> FutureStream {
             Box::pin(async move {
                 maybe_recordbatches
-                    .map(|items| {
-                        Box::pin(DfRecordBatchStreamAdapter::new(Box::pin(
-                            MaybeErrorRecordBatchStream { items },
-                        ))) as _
-                    })
-                    .map_err(|e| DataFusionError::External(Box::new(e)))
+                    .map(|items| Box::pin(MaybeErrorRecordBatchStream { items }) as _)
             })
         }
 
@@ -372,7 +357,7 @@ mod test {
         let result = RecordBatches::try_collect(Box::pin(adapter)).await;
         assert_eq!(
             result.unwrap_err().to_string(),
-            "Failed to poll stream, source: External error: External error, source: Unknown"
+            "External error, source: Unknown",
         );
 
         let failed_to_init_stream =
@@ -382,7 +367,7 @@ mod test {
         let result = RecordBatches::try_collect(Box::pin(adapter)).await;
         assert_eq!(
             result.unwrap_err().to_string(),
-            "Failed to init Recordbatch stream, source: External error: External error, source: Internal"
+            "External error, source: Internal",
        );
     }
 }
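
With the adapter now yielding the crate's own `Result<RecordBatch>` items directly, its poll loop reduces to a three-state machine: drive the init future, then forward the stream, then fuse on failure. A compilable sketch of that state machine under simplified item types (the real adapter does the same with `Result<RecordBatch>` items):

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{ready, Context, Poll};

use futures::Stream;

type Item = Result<u64, String>; // stand-in for Result<RecordBatch>
type BoxedStream = Pin<Box<dyn Stream<Item = Item> + Send>>;
type FutureStream = Pin<Box<dyn Future<Output = Result<BoxedStream, String>> + Send>>;

enum State {
    Uninit(FutureStream),
    Ready(BoxedStream),
    Failed,
}

struct Adapter {
    state: State,
}

impl Stream for Adapter {
    type Item = Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Item>> {
        let this = self.get_mut();
        loop {
            match &mut this.state {
                // First poll(s): drive the future that opens the stream.
                State::Uninit(future) => match ready!(future.as_mut().poll(cx)) {
                    Ok(stream) => this.state = State::Ready(stream),
                    // The init error is yielded once, as-is; no extra wrapping layer.
                    Err(e) => {
                        this.state = State::Failed;
                        return Poll::Ready(Some(Err(e)));
                    }
                },
                // Steady state: forward items untouched.
                State::Ready(stream) => return stream.as_mut().poll_next(cx),
                // After a failed init, the stream is fused.
                State::Failed => return Poll::Ready(None),
            }
        }
    }
}
```
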
diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs
index 281dcdab26..f6cad01041 100644
--- a/src/frontend/src/table.rs
+++ b/src/frontend/src/table.rs
@@ -13,7 +13,6 @@
 // limitations under the License.
 
 use std::any::Any;
-use std::pin::Pin;
 use std::sync::Arc;
 
 use async_trait::async_trait;
@@ -23,30 +22,24 @@ use common_meta::table_name::TableName;
 use common_query::error::Result as QueryResult;
 use common_query::logical_plan::Expr;
 use common_query::physical_plan::{PhysicalPlan, PhysicalPlanRef};
+use common_query::Output;
 use common_recordbatch::adapter::AsyncRecordBatchStreamAdapter;
 use common_recordbatch::error::{
-    InitRecordbatchStreamSnafu, PollStreamSnafu, Result as RecordBatchResult,
-};
-use common_recordbatch::{
-    RecordBatch, RecordBatchStreamAdaptor, RecordBatches, SendableRecordBatchStream,
+    ExternalSnafu as RecordBatchExternalSnafu, Result as RecordBatchResult,
 };
+use common_recordbatch::{RecordBatchStreamAdaptor, SendableRecordBatchStream};
 use datafusion::execution::context::TaskContext;
-use datafusion::physical_plan::{
-    Partitioning, SendableRecordBatchStream as DfSendableRecordBatchStream,
-};
-use datafusion_common::DataFusionError;
+use datafusion::physical_plan::Partitioning;
 use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
-use futures_util::{Stream, StreamExt};
+use futures_util::StreamExt;
 use snafu::prelude::*;
 use store_api::storage::ScanRequest;
 use table::error::TableOperationSnafu;
 use table::metadata::{FilterPushDownType, TableInfoRef, TableType};
 use table::requests::{DeleteRequest, InsertRequest};
 use table::Table;
-use tokio::sync::RwLock;
 
 use crate::catalog::FrontendCatalogManager;
-use crate::error::Result;
 use crate::instance::distributed::deleter::DistDeleter;
 use crate::instance::distributed::inserter::DistInserter;
 use crate::table::scan::{DatanodeInstance, TableScanPlan};
@@ -132,35 +125,26 @@ impl Table for DistTable {
                 projection: request.projection.clone(),
                 filters: request.filters.clone(),
                 limit: request.limit,
-                batches: Arc::new(RwLock::new(None)),
             }));
         }
 
-        let schema = project_schema(self.schema(), request.projection.as_ref());
-        let schema_to_move = schema.clone();
-        let stream: Pin<Box<dyn Stream<Item = RecordBatchResult<RecordBatch>> + Send>> = Box::pin(
-            async_stream::try_stream! {
-                for partition_exec in partition_execs {
-                    partition_exec
-                        .maybe_init()
-                        .await
-                        .map_err(|e| DataFusionError::External(Box::new(e)))
-                        .context(InitRecordbatchStreamSnafu)?;
-                    let mut stream = partition_exec.as_stream().await.context(InitRecordbatchStreamSnafu)?;
-
-                    while let Some(batch) = stream.next().await {
-                        yield RecordBatch::try_from_df_record_batch(schema_to_move.clone(), batch.context(PollStreamSnafu)?)?
-                    }
-                }
+        let stream = Box::pin(async_stream::stream!({
+            for partition_exec in partition_execs {
+                let mut stream = partition_exec.scan_to_stream().await?;
+                while let Some(record_batch) = stream.next().await {
+                    yield record_batch;
+                }
-            },
-        );
-        let record_batch_stream = RecordBatchStreamAdaptor {
+            }
+        }));
+
+        let schema = project_schema(self.schema(), request.projection.as_ref());
+        let stream = RecordBatchStreamAdaptor {
             schema,
             stream,
             output_ordering: None,
         };
 
-        Ok(Box::pin(record_batch_stream))
+        Ok(Box::pin(stream))
     }
 
     fn supports_filters_pushdown(
@@ -245,12 +229,7 @@ impl PhysicalPlan for DistTableScan {
         _context: Arc<TaskContext>,
     ) -> QueryResult<SendableRecordBatchStream> {
         let exec = self.partition_execs[partition].clone();
-        let stream = Box::pin(async move {
-            exec.maybe_init()
-                .await
-                .map_err(|e| DataFusionError::External(Box::new(e)))?;
-            exec.as_stream().await
-        });
+        let stream = Box::pin(async move { exec.scan_to_stream().await });
         let stream = AsyncRecordBatchStreamAdapter::new(self.schema(), stream);
         Ok(Box::pin(stream))
     }
@@ -263,38 +242,29 @@ struct PartitionExec {
     projection: Option<Vec<usize>>,
     filters: Vec<Expr>,
     limit: Option<usize>,
-    batches: Arc<RwLock<Option<RecordBatches>>>,
 }
 
 impl PartitionExec {
-    async fn maybe_init(&self) -> Result<()> {
-        if self.batches.read().await.is_some() {
-            return Ok(());
-        }
-
-        let mut batches = self.batches.write().await;
-        if batches.is_some() {
-            return Ok(());
-        }
-
+    async fn scan_to_stream(&self) -> RecordBatchResult<SendableRecordBatchStream> {
         let plan: TableScanPlan = TableScanPlan {
             table_name: self.table_name.clone(),
             projection: self.projection.clone(),
             filters: self.filters.clone(),
             limit: self.limit,
         };
-        let result = self.datanode_instance.grpc_table_scan(plan).await?;
-        let _ = batches.insert(result);
-        Ok(())
-    }
-
-    /// Notice: the record batch will be consumed.
-    async fn as_stream(&self) -> std::result::Result<DfSendableRecordBatchStream, DataFusionError> {
-        let mut batches = self.batches.write().await;
-        Ok(batches
-            .take()
-            .expect("should have been initialized in \"maybe_init\"")
-            .into_df_stream())
+        let output = self
+            .datanode_instance
+            .grpc_table_scan(plan)
+            .await
+            .map_err(BoxedError::new)
+            .context(RecordBatchExternalSnafu)?;
+
+        let Output::Stream(stream) = output else {
+            unreachable!()
+        };
+
+        Ok(stream)
     }
 }
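
With `maybe_init`/`as_stream` gone, each `PartitionExec` is scanned lazily and the per-partition streams are concatenated in order. The shape of that concatenation as a generic sketch (simplified item type; the real stream yields `Result<RecordBatch>`):

```rust
use async_stream::stream;
use futures::{Stream, StreamExt};

/// Lazily concatenate per-partition streams: each inner stream is only
/// driven once all earlier partitions are exhausted.
fn concat_partitions<S>(partitions: Vec<S>) -> impl Stream<Item = S::Item>
where
    S: Stream + Unpin,
{
    stream! {
        for mut partition in partitions {
            while let Some(item) = partition.next().await {
                yield item;
            }
        }
    }
}
```

Compared to the removed `Arc<RwLock<Option<RecordBatches>>>` cache, nothing is materialized up front and there is no lock to coordinate; a scan simply opens a fresh stream each time.
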
diff --git a/src/frontend/src/table/scan.rs b/src/frontend/src/table/scan.rs
index 979eb6ca64..d149ef2805 100644
--- a/src/frontend/src/table/scan.rs
+++ b/src/frontend/src/table/scan.rs
@@ -19,7 +19,6 @@ use client::Database;
 use common_meta::table_name::TableName;
 use common_query::prelude::Expr;
 use common_query::Output;
-use common_recordbatch::RecordBatches;
 use datafusion::datasource::DefaultTableSource;
 use datafusion_expr::{LogicalPlan, LogicalPlanBuilder};
 use snafu::ResultExt;
@@ -46,22 +45,17 @@ impl DatanodeInstance {
         Self { table, db }
     }
 
-    pub(crate) async fn grpc_table_scan(&self, plan: TableScanPlan) -> Result<RecordBatches> {
+    pub(crate) async fn grpc_table_scan(&self, plan: TableScanPlan) -> Result<Output> {
         let logical_plan = self.build_logical_plan(&plan)?;
         let substrait_plan = DFLogicalSubstraitConvertor
             .encode(&logical_plan)
             .context(error::EncodeSubstraitLogicalPlanSnafu)?;
 
-        let result = self
-            .db
+        self.db
             .logical_plan(substrait_plan.to_vec(), None)
             .await
-            .context(error::RequestDatanodeSnafu)?;
-        let Output::RecordBatches(record_batches) = result else {
-            unreachable!()
-        };
-        Ok(record_batches)
+            .context(error::RequestDatanodeSnafu)
     }
 
     fn build_logical_plan(&self, table_scan: &TableScanPlan) -> Result<LogicalPlan> {
diff --git a/src/query/src/dist_plan/merge_scan.rs b/src/query/src/dist_plan/merge_scan.rs
index cebfd877f1..af53f446b2 100644
--- a/src/query/src/dist_plan/merge_scan.rs
+++ b/src/query/src/dist_plan/merge_scan.rs
@@ -16,7 +16,7 @@ use std::any::Any;
 use std::sync::Arc;
 
 use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
-use async_stream::try_stream;
+use async_stream::stream;
 use client::client_manager::DatanodeClients;
 use client::Database;
 use common_base::bytes::Bytes;
@@ -149,11 +149,14 @@ impl MergeScanExec {
         let trace_id = context.task_id().and_then(|id| id.parse().ok());
         let metric = MergeScanMetric::new(&self.metric);
 
-        let stream = try_stream! {
+        let stream = Box::pin(stream!({
+            let _finish_timer = metric.finish_time().timer();
+            let mut ready_timer = metric.ready_time().timer();
+            let mut first_consume_timer = Some(metric.first_consume_time().timer());
+
             for peer in peers {
                 let client = clients.get_client(&peer).await;
                 let database = Database::new(&table.catalog_name, &table.schema_name, client);
-                let _timer = metric.grpc_time().timer();
                 let output: Output = database
                     .logical_plan(substrait_plan.clone(), trace_id)
                     .await
+                    .map_err(BoxedError::new)
                     .context(ExternalSnafu)?;
 
-                match output {
-                    Output::AffectedRows(_) => {
-                        Err(BoxedError::new(
-                            UnexpectedOutputKindSnafu {
-                                expected: "RecordBatches or Stream",
-                                got: "AffectedRows",
-                            }
-                            .build(),
-                        ))
-                        .context(ExternalSnafu)?;
+                let Output::Stream(mut stream) = output else {
+                    yield UnexpectedOutputKindSnafu {
+                        expected: "Stream",
+                        got: "RecordBatches or AffectedRows",
                     }
-                    Output::RecordBatches(record_batches) => {
-                        for batch in record_batches.into_iter() {
-                            metric.record_output_batch_rows(batch.num_rows());
-                            yield Self::remove_metadata_from_record_batch(batch);
-                        }
-                    }
-                    Output::Stream(mut stream) => {
-                        while let Some(batch) = stream.next().await {
-                            let batch = batch?;
-                            metric.record_output_batch_rows(batch.num_rows());
-                            yield Self::remove_metadata_from_record_batch(batch);
-                        }
+                    .fail()
+                    .map_err(BoxedError::new)
+                    .context(ExternalSnafu);
+                    return;
+                };
+
+                ready_timer.stop();
+
+                while let Some(batch) = stream.next().await {
+                    let batch = batch?;
+                    metric.record_output_batch_rows(batch.num_rows());
+                    yield Ok(Self::remove_metadata_from_record_batch(batch));
+
+                    if let Some(mut first_consume_timer) = first_consume_timer.take() {
+                        first_consume_timer.stop();
                     }
                 }
             }
-        };
+        }));
 
         Ok(Box::pin(RecordBatchStreamAdaptor {
             schema: self.schema.clone(),
-            stream: Box::pin(stream),
+            stream,
             output_ordering: None,
         }))
     }
@@ -285,8 +285,12 @@ impl DisplayAs for MergeScanExec {
 
 #[derive(Debug, Clone)]
 struct MergeScanMetric {
-    /// Nanosecond spent on fetching data from remote
-    grpc_time: Time,
+    /// Nanoseconds elapsed until the scan operator is ready to emit data
+    ready_time: Time,
+    /// Nanoseconds elapsed until the first record batch emitted from the scan operator gets consumed
+    first_consume_time: Time,
+    /// Nanoseconds elapsed until the scan operator finishes execution
+    finish_time: Time,
     /// Count of rows fetched from remote
     output_rows: Count,
 }
@@ -294,13 +298,23 @@ struct MergeScanMetric {
 impl MergeScanMetric {
     pub fn new(metric: &ExecutionPlanMetricsSet) -> Self {
         Self {
-            grpc_time: MetricBuilder::new(metric).subset_time("gRPC", 1),
+            ready_time: MetricBuilder::new(metric).subset_time("ready_time", 1),
+            first_consume_time: MetricBuilder::new(metric).subset_time("first_consume_time", 1),
+            finish_time: MetricBuilder::new(metric).subset_time("finish_time", 1),
             output_rows: MetricBuilder::new(metric).output_rows(1),
         }
     }
 
-    pub fn grpc_time(&self) -> &Time {
-        &self.grpc_time
+    pub fn ready_time(&self) -> &Time {
+        &self.ready_time
+    }
+
+    pub fn first_consume_time(&self) -> &Time {
+        &self.first_consume_time
+    }
+
+    pub fn finish_time(&self) -> &Time {
+        &self.finish_time
     }
 
     pub fn record_output_batch_rows(&self, num_rows: usize) {
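
The single `grpc_time` metric is replaced by three timers that bracket the operator's lifecycle: `ready_time` stops once the remote stream is open, `first_consume_time` stops when the first batch is handed downstream, and `finish_time` runs until the stream is dropped. A sketch of the timer choreography, assuming DataFusion's metrics API as used above (`subset_time`, `Time::timer`, `ScopedTimerGuard::stop`):

```rust
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time};

fn lifecycle_timers(metric_set: &ExecutionPlanMetricsSet, partition: usize) {
    let ready_time: Time = MetricBuilder::new(metric_set).subset_time("ready_time", partition);
    let first_consume_time: Time =
        MetricBuilder::new(metric_set).subset_time("first_consume_time", partition);
    let finish_time: Time = MetricBuilder::new(metric_set).subset_time("finish_time", partition);

    // Accumulates until dropped at the end of the scope, i.e. "finished".
    let _finish_timer = finish_time.timer();
    let mut ready_timer = ready_time.timer();
    let mut first_consume_timer = Some(first_consume_time.timer());

    // ... open the remote stream here ...
    ready_timer.stop(); // the operator can now emit data

    for _batch in 0..3 {
        // Stops exactly once, when the first batch reaches the consumer.
        if let Some(mut timer) = first_consume_timer.take() {
            timer.stop();
        }
    }
}
```

Wrapping the first-consume guard in an `Option` and `take()`-ing it makes the "stop once" intent explicit, instead of relying on repeated `stop()` calls being no-ops.
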
diff --git a/src/servers/src/mysql/writer.rs b/src/servers/src/mysql/writer.rs
index 3a4ac34872..973d3877ab 100644
--- a/src/servers/src/mysql/writer.rs
+++ b/src/servers/src/mysql/writer.rs
@@ -14,10 +14,12 @@
 
 use std::ops::Deref;
 
+use common_error::ext::BoxedError;
 use common_query::Output;
-use common_recordbatch::{util, RecordBatch};
+use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
 use datatypes::prelude::{ConcreteDataType, Value};
 use datatypes::schema::SchemaRef;
+use futures::StreamExt;
 use metrics::increment_counter;
 use opensrv_mysql::{
     Column, ColumnFlags, ColumnType, ErrorKind, OkResponse, QueryResultWriter, RowWriter,
@@ -26,7 +28,7 @@ use session::context::QueryContextRef;
 use snafu::prelude::*;
 use tokio::io::AsyncWrite;
 
-use crate::error::{self, Error, Result};
+use crate::error::{self, Error, OtherSnafu, Result};
 use crate::metrics::*;
 
 /// Try to write multiple output to the writer if possible.
@@ -50,8 +52,8 @@ pub async fn write_output(
 }
 
 struct QueryResult {
-    recordbatches: Vec<RecordBatch>,
     schema: SchemaRef,
+    stream: SendableRecordBatchStream,
 }
 
 pub struct MysqlResultWriter<'a, W: AsyncWrite + Unpin> {
@@ -80,20 +82,16 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
         match output {
             Ok(output) => match output {
                 Output::Stream(stream) => {
-                    let schema = stream.schema().clone();
-                    let recordbatches = util::collect(stream)
-                        .await
-                        .context(error::CollectRecordbatchSnafu)?;
                     let query_result = QueryResult {
-                        recordbatches,
-                        schema,
+                        schema: stream.schema(),
+                        stream,
                     };
                     Self::write_query_result(query_result, self.writer, self.query_context).await?;
                 }
                 Output::RecordBatches(recordbatches) => {
                     let query_result = QueryResult {
                         schema: recordbatches.schema(),
-                        recordbatches: recordbatches.take(),
+                        stream: recordbatches.as_stream(),
                     };
                     Self::write_query_result(query_result, self.writer, self.query_context).await?;
                 }
@@ -130,7 +128,7 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
     }
 
     async fn write_query_result(
-        query_result: QueryResult,
+        mut query_result: QueryResult,
         writer: QueryResultWriter<'a, W>,
         query_context: QueryContextRef,
     ) -> Result<()> {
@@ -139,9 +137,20 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
             // The RowWriter's lifetime is bound to `column_def` thus we can't use finish_one()
             // to return a new QueryResultWriter.
             let mut row_writer = writer.start(&column_def).await?;
-            for recordbatch in &query_result.recordbatches {
-                Self::write_recordbatch(&mut row_writer, recordbatch, query_context.clone())
-                    .await?;
+            while let Some(record_batch) = query_result.stream.next().await {
+                match record_batch {
+                    Ok(record_batch) => {
+                        Self::write_recordbatch(
+                            &mut row_writer,
+                            &record_batch,
+                            query_context.clone(),
+                        )
+                        .await?
+                    }
+                    Err(e) => {
+                        return Err(e).map_err(BoxedError::new).context(OtherSnafu);
+                    }
+                }
             }
             row_writer.finish().await?;
             Ok(())
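
The MySQL writer now flushes rows per batch as the stream yields them, so peak memory is one record batch instead of the whole result set. The loop's essential shape, with a stand-in writer callback in place of opensrv-mysql's `RowWriter`:

```rust
use futures::{Stream, StreamExt};

/// Drain a fallible batch stream, writing each batch as soon as it arrives.
/// `write_batch` stands in for `Self::write_recordbatch(&mut row_writer, ...)`.
async fn write_all<S, E>(mut batches: S, mut write_batch: impl FnMut(&[u8])) -> Result<(), E>
where
    S: Stream<Item = Result<Vec<u8>, E>> + Unpin,
{
    while let Some(item) = batches.next().await {
        match item {
            Ok(batch) => write_batch(&batch),
            // A stream error aborts the result set, mirroring the OtherSnafu path.
            Err(e) => return Err(e),
        }
    }
    Ok(())
}
```
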
diff --git a/src/table/src/test_util/memtable.rs b/src/table/src/test_util/memtable.rs
index 6df6e6cb23..e1aa84b5a8 100644
--- a/src/table/src/test_util/memtable.rs
+++ b/src/table/src/test_util/memtable.rs
@@ -17,6 +17,7 @@ use std::pin::Pin;
 use std::sync::Arc;
 
 use async_trait::async_trait;
+use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
 use common_error::ext::BoxedError;
 use common_recordbatch::error::Result as RecordBatchResult;
 use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream};
@@ -54,8 +55,8 @@ impl MemTable {
             table_name,
             recordbatch,
             1,
-            "greptime".to_string(),
-            "public".to_string(),
+            DEFAULT_CATALOG_NAME.to_string(),
+            DEFAULT_SCHEMA_NAME.to_string(),
             regions,
         )
     }
diff --git a/tests-integration/tests/grpc.rs b/tests-integration/tests/grpc.rs
index 184e03c392..5abe02f086 100644
--- a/tests-integration/tests/grpc.rs
+++ b/tests-integration/tests/grpc.rs
@@ -23,6 +23,7 @@ use auth::user_provider_from_option;
 use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
 use common_catalog::consts::{MIN_USER_TABLE_ID, MITO_ENGINE};
 use common_query::Output;
+use common_recordbatch::RecordBatches;
 use servers::prometheus::{PromData, PromSeries, PrometheusJsonResponse, PrometheusResponse};
 use servers::server::Server;
 use tests_integration::test_util::{
@@ -310,14 +311,19 @@ async fn insert_and_assert(db: &Database) {
     assert!(matches!(result, Output::AffectedRows(2)));
 
     // select
-    let result = db
+    let output = db
         .sql("SELECT host, cpu, memory, ts FROM demo")
         .await
         .unwrap();
-    match result {
-        Output::RecordBatches(recordbatches) => {
-            let pretty = recordbatches.pretty_print().unwrap();
-            let expected = "\
+
+    let record_batches = match output {
+        Output::RecordBatches(record_batches) => record_batches,
+        Output::Stream(stream) => RecordBatches::try_collect(stream).await.unwrap(),
+        Output::AffectedRows(_) => unreachable!(),
+    };
+
+    let pretty = record_batches.pretty_print().unwrap();
+    let expected = "\
 +-------+------+--------+-------------------------+
 | host  | cpu  | memory | ts                      |
 +-------+------+--------+-------------------------+
 | host1 | 66.6 | 1024   | 2022-12-28T04:17:07     |
 | host2 | 88.8 | 333.3  | 2022-12-28T04:17:07     |
 | host3 | 88.8 | 333.3  | 2022-12-28T04:17:07     |
 | host4 | 88.8 | 333.3  | 2022-12-28T04:17:08     |
 | host5 | 88.8 | 333.3  | 2022-12-28T04:17:08     |
 | host6 | 88.8 | 333.3  | 2022-12-28T04:17:08     |
 +-------+------+--------+-------------------------+\
 ";
-            assert_eq!(pretty, expected);
-        }
-        _ => unreachable!(),
-    }
+    assert_eq!(pretty, expected);
 }
 
 fn testing_create_expr() -> CreateTableExpr {
diff --git a/tests/runner/Cargo.toml b/tests/runner/Cargo.toml
index 7032fe8052..89ae0233a6 100644
--- a/tests/runner/Cargo.toml
+++ b/tests/runner/Cargo.toml
@@ -11,6 +11,7 @@ common-base = { workspace = true }
 common-error = { workspace = true }
 common-grpc = { workspace = true }
 common-query = { workspace = true }
+common-recordbatch = { workspace = true }
 common-time = { workspace = true }
 serde.workspace = true
 sqlness = { version = "0.5" }
diff --git a/tests/runner/src/env.rs b/tests/runner/src/env.rs
index af04eabe9e..94463078f2 100644
--- a/tests/runner/src/env.rs
+++ b/tests/runner/src/env.rs
@@ -23,12 +23,14 @@ use std::sync::{Arc, Mutex};
 use std::time::Duration;
 
 use async_trait::async_trait;
+use client::error::ServerSnafu;
 use client::{
     Client, Database as DB, Error as ClientError, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME,
 };
 use common_error::ext::ErrorExt;
 use common_error::snafu::ErrorCompat;
 use common_query::Output;
+use common_recordbatch::RecordBatches;
 use serde::Serialize;
 use sqlness::{Database, EnvController, QueryContext};
 use tinytemplate::TinyTemplate;
@@ -358,7 +360,21 @@ impl Database for GreptimeDB {
                 result: Ok(Output::AffectedRows(0)),
             }) as _
         } else {
-            let result = client.sql(&query).await;
+            let mut result = client.sql(&query).await;
+            if let Ok(Output::Stream(stream)) = result {
+                match RecordBatches::try_collect(stream).await {
+                    Ok(recordbatches) => result = Ok(Output::RecordBatches(recordbatches)),
+                    Err(e) => {
+                        let status_code = e.status_code();
+                        let source_error = e.iter_chain().last().unwrap();
+                        result = ServerSnafu {
+                            code: status_code,
+                            msg: source_error.to_string(),
+                        }
+                        .fail();
+                    }
+                }
+            }
             Box::new(ResultDisplayer { result }) as _
         }
    }
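
The sqlness runner still renders complete result sets, so a streamed `Output` is folded back into `Output::RecordBatches` before display. The happy-path fold, assuming the `RecordBatches::try_collect` used above (the real code additionally maps a collect failure into a `ServerSnafu` error):

```rust
use common_query::Output;
use common_recordbatch::{error::Result, RecordBatches};

/// Fold a streamed output into a buffered one so it can be pretty-printed.
async fn buffer_output(output: Output) -> Result<Output> {
    Ok(match output {
        Output::Stream(stream) => {
            Output::RecordBatches(RecordBatches::try_collect(stream).await?)
        }
        other => other,
    })
}
```
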