refactor: get schema from stream

This commit is contained in:
Ning Sun
2022-10-29 13:48:01 +08:00
parent db2cb63267
commit 945b685556

View File

@@ -18,6 +18,7 @@ use common_query::Output;
use common_recordbatch::{util, RecordBatch};
use common_telemetry::logging::info;
use datatypes::data_type::DataType;
use datatypes::schema::SchemaRef;
use schemars::JsonSchema;
use serde::Serialize;
use serde_json::Value;
@@ -55,7 +56,7 @@ pub struct Schema {
#[derive(Debug, Serialize, JsonSchema)]
pub struct HttpRecordsOutput {
schema: Option<Schema>,
schema: Schema,
rows: Vec<Vec<Value>>,
}
@@ -65,59 +66,42 @@ impl HttpRecordsOutput {
}
pub fn num_cols(&self) -> usize {
self.schema
.as_ref()
.map(|x| x.column_schemas.len())
.unwrap_or(0)
self.schema.column_schemas.len()
}
}
impl TryFrom<Vec<RecordBatch>> for HttpRecordsOutput {
type Error = String;
fn try_from(
impl HttpRecordsOutput {
fn new(
schema: SchemaRef,
recordbatches: Vec<RecordBatch>,
) -> std::result::Result<HttpRecordsOutput, Self::Error> {
if recordbatches.is_empty() {
Ok(HttpRecordsOutput {
schema: None,
rows: vec![],
})
} else {
// safety ensured by previous empty check
let first = &recordbatches[0];
let schema = Schema {
column_schemas: first
.schema
.column_schemas()
.iter()
.map(|cs| ColumnSchema {
name: cs.name.clone(),
data_type: cs.data_type.name().to_owned(),
})
.collect(),
};
) -> std::result::Result<HttpRecordsOutput, String> {
let schema = Schema {
column_schemas: schema
.column_schemas()
.iter()
.map(|cs| ColumnSchema {
name: cs.name.clone(),
data_type: cs.data_type.name().to_owned(),
})
.collect(),
};
let mut rows =
Vec::with_capacity(recordbatches.iter().map(|r| r.num_rows()).sum::<usize>());
let mut rows =
Vec::with_capacity(recordbatches.iter().map(|r| r.num_rows()).sum::<usize>());
for recordbatch in recordbatches {
for row in recordbatch.rows() {
let row = row.map_err(|e| e.to_string())?;
let value_row = row
.into_iter()
.map(|f| Value::try_from(f).map_err(|err| err.to_string()))
.collect::<std::result::Result<Vec<Value>, _>>()?;
for recordbatch in recordbatches {
for row in recordbatch.rows() {
let row = row.map_err(|e| e.to_string())?;
let value_row = row
.into_iter()
.map(|f| Value::try_from(f).map_err(|err| err.to_string()))
.collect::<std::result::Result<Vec<Value>, _>>()?;
rows.push(value_row);
}
rows.push(value_row);
}
Ok(HttpRecordsOutput {
schema: Some(schema),
rows,
})
}
Ok(HttpRecordsOutput { schema, rows })
}
}
@@ -160,15 +144,18 @@ impl JsonResponse {
Ok(Output::AffectedRows(rows)) => {
Self::with_output(Some(JsonOutput::AffectedRows(rows)))
}
Ok(Output::Stream(stream)) => match util::collect(stream).await {
Ok(rows) => match HttpRecordsOutput::try_from(rows) {
Ok(rows) => Self::with_output(Some(JsonOutput::Records(rows))),
Err(err) => Self::with_error(Some(format!(": {}", err))),
},
Err(e) => Self::with_error(Some(format!("Recordbatch error: {}", e))),
},
Ok(Output::Stream(stream)) => {
let schema = stream.schema();
match util::collect(stream).await {
Ok(rows) => match HttpRecordsOutput::new(schema, rows) {
Ok(rows) => Self::with_output(Some(JsonOutput::Records(rows))),
Err(err) => Self::with_error(Some(format!(": {}", err))),
},
Err(e) => Self::with_error(Some(format!("Recordbatch error: {}", e))),
}
}
Ok(Output::RecordBatches(recordbatches)) => {
match HttpRecordsOutput::try_from(recordbatches.take()) {
match HttpRecordsOutput::new(recordbatches.schema(), recordbatches.take()) {
Ok(rows) => Self::with_output(Some(JsonOutput::Records(rows))),
Err(err) => Self::with_error(Some(format!(": {}", err))),
}
@@ -342,3 +329,9 @@ async fn handle_error(err: BoxError) -> Json<JsonResponse> {
output: None,
})
}
// Unit-test module; only compiled for test builds (`cfg(test)`).
#[cfg(test)]
mod test {
// Placeholder test: the body is empty, so it currently passes without
// asserting anything.
// NOTE(review): "convertion" looks like a typo for "conversion" — confirm
// before renaming.
#[test]
fn test_recordbatches_convertion() {}
}