From 945b685556b0d0dafbca9e8f78657a8791448620 Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Sat, 29 Oct 2022 13:48:01 +0800 Subject: [PATCH] refactor: get schema from stream --- src/servers/src/http.rs | 99 +++++++++++++++++++---------------------- 1 file changed, 46 insertions(+), 53 deletions(-) diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index 14a17b5271..a7993afacd 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -18,6 +18,7 @@ use common_query::Output; use common_recordbatch::{util, RecordBatch}; use common_telemetry::logging::info; use datatypes::data_type::DataType; +use datatypes::schema::SchemaRef; use schemars::JsonSchema; use serde::Serialize; use serde_json::Value; @@ -55,7 +56,7 @@ pub struct Schema { #[derive(Debug, Serialize, JsonSchema)] pub struct HttpRecordsOutput { - schema: Option, + schema: Schema, rows: Vec>, } @@ -65,59 +66,42 @@ impl HttpRecordsOutput { } pub fn num_cols(&self) -> usize { - self.schema - .as_ref() - .map(|x| x.column_schemas.len()) - .unwrap_or(0) + self.schema.column_schemas.len() } } -impl TryFrom> for HttpRecordsOutput { - type Error = String; - - fn try_from( +impl HttpRecordsOutput { + fn new( + schema: SchemaRef, recordbatches: Vec, - ) -> std::result::Result { - if recordbatches.is_empty() { - Ok(HttpRecordsOutput { - schema: None, - rows: vec![], - }) - } else { - // safety ensured by previous empty check - let first = &recordbatches[0]; - let schema = Schema { - column_schemas: first - .schema - .column_schemas() - .iter() - .map(|cs| ColumnSchema { - name: cs.name.clone(), - data_type: cs.data_type.name().to_owned(), - }) - .collect(), - }; + ) -> std::result::Result { + let schema = Schema { + column_schemas: schema + .column_schemas() + .iter() + .map(|cs| ColumnSchema { + name: cs.name.clone(), + data_type: cs.data_type.name().to_owned(), + }) + .collect(), + }; - let mut rows = - Vec::with_capacity(recordbatches.iter().map(|r| r.num_rows()).sum::()); + let mut rows = + Vec::with_capacity(recordbatches.iter().map(|r| r.num_rows()).sum::()); - for recordbatch in recordbatches { - for row in recordbatch.rows() { - let row = row.map_err(|e| e.to_string())?; - let value_row = row - .into_iter() - .map(|f| Value::try_from(f).map_err(|err| err.to_string())) - .collect::, _>>()?; + for recordbatch in recordbatches { + for row in recordbatch.rows() { + let row = row.map_err(|e| e.to_string())?; + let value_row = row + .into_iter() + .map(|f| Value::try_from(f).map_err(|err| err.to_string())) + .collect::, _>>()?; - rows.push(value_row); - } + rows.push(value_row); } - - Ok(HttpRecordsOutput { - schema: Some(schema), - rows, - }) } + + Ok(HttpRecordsOutput { schema, rows }) } } @@ -160,15 +144,18 @@ impl JsonResponse { Ok(Output::AffectedRows(rows)) => { Self::with_output(Some(JsonOutput::AffectedRows(rows))) } - Ok(Output::Stream(stream)) => match util::collect(stream).await { - Ok(rows) => match HttpRecordsOutput::try_from(rows) { - Ok(rows) => Self::with_output(Some(JsonOutput::Records(rows))), - Err(err) => Self::with_error(Some(format!(": {}", err))), - }, - Err(e) => Self::with_error(Some(format!("Recordbatch error: {}", e))), - }, + Ok(Output::Stream(stream)) => { + let schema = stream.schema(); + match util::collect(stream).await { + Ok(rows) => match HttpRecordsOutput::new(schema, rows) { + Ok(rows) => Self::with_output(Some(JsonOutput::Records(rows))), + Err(err) => Self::with_error(Some(format!(": {}", err))), + }, + Err(e) => Self::with_error(Some(format!("Recordbatch error: {}", e))), + } + } Ok(Output::RecordBatches(recordbatches)) => { - match HttpRecordsOutput::try_from(recordbatches.take()) { + match HttpRecordsOutput::new(recordbatches.schema(), recordbatches.take()) { Ok(rows) => Self::with_output(Some(JsonOutput::Records(rows))), Err(err) => Self::with_error(Some(format!(": {}", err))), } @@ -342,3 +329,9 @@ async fn handle_error(err: BoxError) -> Json { output: None, }) } + +#[cfg(test)] +mod test { + #[test] + fn test_recordbatches_convertion() {} +}