refactor: introduce new Output with OutputMeta (#3466)

* refactor: introduce new output struct

* chore: add helper function

* chore: update comment

* chore: update commit

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>

* chore: rename according to cr

---------

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
shuiyisong
2024-03-11 10:24:09 +08:00
committed by GitHub
parent 8c37c3fc0f
commit 0bb949787c
54 changed files with 807 additions and 592 deletions

View File

@@ -28,7 +28,7 @@ use client::{
Client, Database as DB, Error as ClientError, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME,
};
use common_error::ext::ErrorExt;
use common_query::Output;
use common_query::{Output, OutputData};
use common_recordbatch::RecordBatches;
use serde::Serialize;
use sqlness::{Database, EnvController, QueryContext};
@@ -443,7 +443,7 @@ impl Database for GreptimeDB {
.trim_end_matches(';');
client.set_schema(database);
Box::new(ResultDisplayer {
result: Ok(Output::AffectedRows(0)),
result: Ok(Output::new_with_affected_rows(0)),
}) as _
} else if query.trim().to_lowercase().starts_with("set time_zone") {
// set time_zone='xxx'
@@ -460,13 +460,19 @@ impl Database for GreptimeDB {
client.set_timezone(timezone);
Box::new(ResultDisplayer {
result: Ok(Output::AffectedRows(0)),
result: Ok(Output::new_with_affected_rows(0)),
}) as _
} else {
let mut result = client.sql(&query).await;
if let Ok(Output::Stream(stream, _)) = result {
if let Ok(Output {
data: OutputData::Stream(stream),
..
}) = result
{
match RecordBatches::try_collect(stream).await {
Ok(recordbatches) => result = Ok(Output::RecordBatches(recordbatches)),
Ok(recordbatches) => {
result = Ok(Output::new_with_record_batches(recordbatches));
}
Err(e) => {
let status_code = e.status_code();
let msg = e.output_msg();
@@ -567,11 +573,11 @@ struct ResultDisplayer {
impl Display for ResultDisplayer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match &self.result {
Ok(result) => match result {
Output::AffectedRows(rows) => {
Ok(result) => match &result.data {
OutputData::AffectedRows(rows) => {
write!(f, "Affected Rows: {rows}")
}
Output::RecordBatches(recordbatches) => {
OutputData::RecordBatches(recordbatches) => {
let pretty = recordbatches.pretty_print().map_err(|e| e.to_string());
match pretty {
Ok(s) => write!(f, "{s}"),
@@ -580,7 +586,7 @@ impl Display for ResultDisplayer {
}
}
}
Output::Stream(_, _) => unreachable!(),
OutputData::Stream(_) => unreachable!(),
},
Err(e) => {
let status_code = e.status_code();