From a649f34832879fcafbfdfb14fa53975b937131a3 Mon Sep 17 00:00:00 2001 From: LFC Date: Mon, 19 Sep 2022 11:28:12 +0800 Subject: [PATCH] fix: select empty table (#268) * fix: select empty table Co-authored-by: luofucong --- src/client/src/database.rs | 10 +++- src/datanode/src/error.rs | 7 +++ src/datanode/src/server/grpc/select.rs | 81 +++++++++++++------------- src/datatypes/src/arrow_array.rs | 26 ++++++++- src/servers/src/mysql/writer.rs | 10 ++-- 5 files changed, 83 insertions(+), 51 deletions(-) diff --git a/src/client/src/database.rs b/src/client/src/database.rs index 89e141d374..edf166ef3b 100644 --- a/src/client/src/database.rs +++ b/src/client/src/database.rs @@ -195,9 +195,13 @@ impl TryFrom for Output { .collect::>(); let schema = Arc::new(Schema::new(column_schemas)); - let recordbatches = RecordBatch::new(schema, vectors) - .and_then(|batch| RecordBatches::try_new(batch.schema.clone(), vec![batch])) - .context(error::CreateRecordBatchesSnafu)?; + let recordbatches = if vectors.is_empty() { + RecordBatches::try_new(schema, vec![]) + } else { + RecordBatch::new(schema, vectors) + .and_then(|batch| RecordBatches::try_new(batch.schema.clone(), vec![batch])) + } + .context(error::CreateRecordBatchesSnafu)?; Output::RecordBatches(recordbatches) } ObjectResult::Mutate(mutate) => { diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index b6b9109e91..7be0fd58eb 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -200,6 +200,12 @@ pub enum Error { #[snafu(backtrace)] source: script::error::Error, }, + + #[snafu(display("Failed to collect RecordBatches, source: {}", source))] + CollectRecordBatches { + #[snafu(backtrace)] + source: common_recordbatch::error::Error, + }, } pub type Result = std::result::Result; @@ -244,6 +250,7 @@ impl ErrorExt for Error { Error::StartScriptManager { source } => source.status_code(), Error::OpenStorageEngine { source } => source.status_code(), Error::RuntimeResource { .. } => StatusCode::RuntimeResourcesExhausted, + Error::CollectRecordBatches { source } => source.status_code(), } } diff --git a/src/datanode/src/server/grpc/select.rs b/src/datanode/src/server/grpc/select.rs index 354355cfde..936b2eede4 100644 --- a/src/datanode/src/server/grpc/select.rs +++ b/src/datanode/src/server/grpc/select.rs @@ -4,62 +4,61 @@ use api::helper::ColumnDataTypeWrapper; use api::v1::{codec::SelectResult, column::Values, Column, ObjectResult}; use arrow::array::{Array, BooleanArray, PrimitiveArray}; use common_base::BitVec; -use common_error::prelude::ErrorExt; use common_error::status_code::StatusCode; use common_query::Output; -use common_recordbatch::{util, RecordBatch, SendableRecordBatchStream}; +use common_recordbatch::{util, RecordBatches, SendableRecordBatchStream}; use datatypes::arrow_array::{BinaryArray, StringArray}; use snafu::{OptionExt, ResultExt}; use crate::error::{self, ConversionSnafu, Result}; use crate::server::grpc::handler::{build_err_result, ObjectResultBuilder}; -pub async fn to_object_result(result: Result) -> ObjectResult { - match result { - Ok(Output::AffectedRows(rows)) => ObjectResultBuilder::new() +pub async fn to_object_result(output: Result) -> ObjectResult { + let result = match output { + Ok(Output::AffectedRows(rows)) => Ok(ObjectResultBuilder::new() .status_code(StatusCode::Success as u32) .mutate_result(rows as u32, 0) - .build(), - Ok(Output::Stream(stream)) => record_batchs(stream).await, - Ok(Output::RecordBatches(recordbatches)) => build_result(recordbatches.take()).await, - Err(err) => ObjectResultBuilder::new() - .status_code(err.status_code() as u32) - .err_msg(err.to_string()) - .build(), - } -} - -async fn record_batchs(stream: SendableRecordBatchStream) -> ObjectResult { - match util::collect(stream).await { - Ok(recordbatches) => build_result(recordbatches).await, - Err(err) => build_err_result(&err), - } -} - -async fn build_result(recordbatches: Vec) -> ObjectResult { - match try_convert(recordbatches) { - Ok(select_result) => ObjectResultBuilder::new() - .status_code(StatusCode::Success as u32) - .select_result(select_result) - .build(), - Err(err) => build_err_result(&err), - } -} - -// All schemas of record_batches must be the same. -fn try_convert(record_batches: Vec) -> Result { - let first = if let Some(r) = record_batches.get(0) { - r - } else { - return Ok(SelectResult::default()); + .build()), + Ok(Output::Stream(stream)) => collect(stream).await, + Ok(Output::RecordBatches(recordbatches)) => build_result(recordbatches), + Err(e) => return build_err_result(&e), }; + match result { + Ok(r) => r, + Err(e) => build_err_result(&e), + } +} +async fn collect(stream: SendableRecordBatchStream) -> Result { + let schema = stream.schema(); + + let recordbatches = util::collect(stream) + .await + .and_then(|batches| RecordBatches::try_new(schema, batches)) + .context(error::CollectRecordBatchesSnafu)?; + + let object_result = build_result(recordbatches)?; + Ok(object_result) +} + +fn build_result(recordbatches: RecordBatches) -> Result { + let select_result = try_convert(recordbatches)?; + let object_result = ObjectResultBuilder::new() + .status_code(StatusCode::Success as u32) + .select_result(select_result) + .build(); + Ok(object_result) +} + +fn try_convert(record_batches: RecordBatches) -> Result { + let schema = record_batches.schema(); + let record_batches = record_batches.take(); let row_count: usize = record_batches .iter() .map(|r| r.df_recordbatch.num_rows()) .sum(); - let schemas = first.schema.column_schemas(); + let schemas = schema.column_schemas(); let mut columns = Vec::with_capacity(schemas.len()); for (idx, schema) in schemas.iter().enumerate() { @@ -179,7 +178,7 @@ mod tests { array::{Array, BooleanArray, PrimitiveArray}, datatypes::{DataType, Field}, }; - use common_recordbatch::RecordBatch; + use common_recordbatch::{RecordBatch, RecordBatches}; use datafusion::field_util::SchemaExt; use datatypes::arrow::datatypes::Schema as ArrowSchema; use datatypes::{ @@ -193,8 +192,10 @@ mod tests { #[test] fn test_convert_record_batches_to_select_result() { let r1 = mock_record_batch(); + let schema = r1.schema.clone(); let r2 = mock_record_batch(); let record_batches = vec![r1, r2]; + let record_batches = RecordBatches::try_new(schema, record_batches).unwrap(); let s = try_convert(record_batches).unwrap(); diff --git a/src/datatypes/src/arrow_array.rs b/src/datatypes/src/arrow_array.rs index deaf9b4c09..f784c9574b 100644 --- a/src/datatypes/src/arrow_array.rs +++ b/src/datatypes/src/arrow_array.rs @@ -3,9 +3,11 @@ use arrow::array::{ MutableUtf8Array, PrimitiveArray, Utf8Array, }; use arrow::datatypes::DataType as ArrowDataType; +use common_time::timestamp::Timestamp; use snafu::OptionExt; use crate::error::{ConversionSnafu, Result}; +use crate::prelude::ConcreteDataType; use crate::value::Value; pub type BinaryArray = ArrowBinaryArray; @@ -59,6 +61,14 @@ pub fn arrow_array_get(array: &dyn Array, idx: usize) -> Result { ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 => { Value::String(cast_array!(array, StringArray).value(idx).into()) } + ArrowDataType::Timestamp(t, _) => { + let value = cast_array!(array, PrimitiveArray::).value(idx); + let unit = match ConcreteDataType::from_arrow_time_unit(t) { + ConcreteDataType::Timestamp(t) => t.unit, + _ => unreachable!(), + }; + Value::Timestamp(Timestamp::new(value, unit)) + } // TODO(sunng87): List _ => unimplemented!("Arrow array datatype: {:?}", array.data_type()), }; @@ -68,9 +78,13 @@ pub fn arrow_array_get(array: &dyn Array, idx: usize) -> Result { #[cfg(test)] mod test { + use arrow::array::Int64Array as ArrowI64Array; use arrow::array::*; + use common_time::timestamp::TimeUnit; use super::*; + use crate::prelude::Vector; + use crate::vectors::TimestampVector; #[test] fn test_arrow_array_access() { @@ -88,8 +102,8 @@ mod test { assert_eq!(Value::Int32(2), arrow_array_get(&array1, 1).unwrap()); let array1 = UInt32Array::from_vec(vec![1, 2, 3, 4]); assert_eq!(Value::UInt32(2), arrow_array_get(&array1, 1).unwrap()); - let array1 = Int64Array::from_vec(vec![1, 2, 3, 4]); - assert_eq!(Value::Int64(2), arrow_array_get(&array1, 1).unwrap()); + let array = ArrowI64Array::from_vec(vec![1, 2, 3, 4]); + assert_eq!(Value::Int64(2), arrow_array_get(&array, 1).unwrap()); let array1 = UInt64Array::from_vec(vec![1, 2, 3, 4]); assert_eq!(Value::UInt64(2), arrow_array_get(&array1, 1).unwrap()); let array1 = Float32Array::from_vec(vec![1f32, 2f32, 3f32, 4f32]); @@ -120,5 +134,13 @@ mod test { arrow_array_get(&array3, 0).unwrap() ); assert_eq!(Value::Null, arrow_array_get(&array3, 1).unwrap()); + + let vector = TimestampVector::new(ArrowI64Array::from_vec(vec![1, 2, 3, 4])); + let array = vector.to_boxed_arrow_array(); + let value = arrow_array_get(&*array, 1).unwrap(); + assert_eq!( + value, + Value::Timestamp(Timestamp::new(2, TimeUnit::Millisecond)) + ); } } diff --git a/src/servers/src/mysql/writer.rs b/src/servers/src/mysql/writer.rs index e409d2db19..f34afa765b 100644 --- a/src/servers/src/mysql/writer.rs +++ b/src/servers/src/mysql/writer.rs @@ -3,6 +3,8 @@ use std::ops::Deref; use common_query::Output; use common_recordbatch::{util, RecordBatch}; +use common_time::datetime::DateTime; +use common_time::timestamp::TimeUnit; use datatypes::prelude::{ConcreteDataType, Value}; use datatypes::schema::{ColumnSchema, SchemaRef}; use opensrv_mysql::{ @@ -71,11 +73,6 @@ impl<'a, W: io::Write> MysqlResultWriter<'a, W> { query_result: QueryResult, writer: QueryResultWriter<'a, W>, ) -> Result<()> { - if query_result.recordbatches.is_empty() { - writer.completed(OkResponse::default())?; - return Ok(()); - } - match create_mysql_column_def(&query_result.schema) { Ok(column_def) => { let mut row_writer = writer.start(&column_def)?; @@ -111,7 +108,7 @@ impl<'a, W: io::Write> MysqlResultWriter<'a, W> { Value::Date(v) => row_writer.write_col(v.val())?, Value::DateTime(v) => row_writer.write_col(v.val())?, Value::Timestamp(v) => row_writer - .write_col(v.convert_to(common_time::timestamp::TimeUnit::Second))?, // TODO(hl): Can we also write + .write_col(DateTime::new(v.convert_to(TimeUnit::Second)).to_string())?, Value::List(_) => { return Err(Error::Internal { err_msg: format!( @@ -152,6 +149,7 @@ fn create_mysql_column(column_schema: &ColumnSchema) -> Result { ConcreteDataType::Binary(_) | ConcreteDataType::String(_) => { Ok(ColumnType::MYSQL_TYPE_VARCHAR) } + ConcreteDataType::Timestamp(_) => Ok(ColumnType::MYSQL_TYPE_DATETIME), _ => error::InternalSnafu { err_msg: format!( "not implemented for column datatype {:?}",