fix: select empty table (#268)

* fix: select empty table

Co-authored-by: luofucong <luofucong@greptime.com>
This commit is contained in:
LFC
2022-09-19 11:28:12 +08:00
committed by GitHub
parent 1770079691
commit a649f34832
5 changed files with 83 additions and 51 deletions

View File

@@ -195,9 +195,13 @@ impl TryFrom<ObjectResult> for Output {
.collect::<Vec<ColumnSchema>>();
let schema = Arc::new(Schema::new(column_schemas));
let recordbatches = RecordBatch::new(schema, vectors)
.and_then(|batch| RecordBatches::try_new(batch.schema.clone(), vec![batch]))
.context(error::CreateRecordBatchesSnafu)?;
let recordbatches = if vectors.is_empty() {
RecordBatches::try_new(schema, vec![])
} else {
RecordBatch::new(schema, vectors)
.and_then(|batch| RecordBatches::try_new(batch.schema.clone(), vec![batch]))
}
.context(error::CreateRecordBatchesSnafu)?;
Output::RecordBatches(recordbatches)
}
ObjectResult::Mutate(mutate) => {

View File

@@ -200,6 +200,12 @@ pub enum Error {
#[snafu(backtrace)]
source: script::error::Error,
},
#[snafu(display("Failed to collect RecordBatches, source: {}", source))]
CollectRecordBatches {
#[snafu(backtrace)]
source: common_recordbatch::error::Error,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -244,6 +250,7 @@ impl ErrorExt for Error {
Error::StartScriptManager { source } => source.status_code(),
Error::OpenStorageEngine { source } => source.status_code(),
Error::RuntimeResource { .. } => StatusCode::RuntimeResourcesExhausted,
Error::CollectRecordBatches { source } => source.status_code(),
}
}

View File

@@ -4,62 +4,61 @@ use api::helper::ColumnDataTypeWrapper;
use api::v1::{codec::SelectResult, column::Values, Column, ObjectResult};
use arrow::array::{Array, BooleanArray, PrimitiveArray};
use common_base::BitVec;
use common_error::prelude::ErrorExt;
use common_error::status_code::StatusCode;
use common_query::Output;
use common_recordbatch::{util, RecordBatch, SendableRecordBatchStream};
use common_recordbatch::{util, RecordBatches, SendableRecordBatchStream};
use datatypes::arrow_array::{BinaryArray, StringArray};
use snafu::{OptionExt, ResultExt};
use crate::error::{self, ConversionSnafu, Result};
use crate::server::grpc::handler::{build_err_result, ObjectResultBuilder};
pub async fn to_object_result(result: Result<Output>) -> ObjectResult {
match result {
Ok(Output::AffectedRows(rows)) => ObjectResultBuilder::new()
pub async fn to_object_result(output: Result<Output>) -> ObjectResult {
let result = match output {
Ok(Output::AffectedRows(rows)) => Ok(ObjectResultBuilder::new()
.status_code(StatusCode::Success as u32)
.mutate_result(rows as u32, 0)
.build(),
Ok(Output::Stream(stream)) => record_batchs(stream).await,
Ok(Output::RecordBatches(recordbatches)) => build_result(recordbatches.take()).await,
Err(err) => ObjectResultBuilder::new()
.status_code(err.status_code() as u32)
.err_msg(err.to_string())
.build(),
}
}
async fn record_batchs(stream: SendableRecordBatchStream) -> ObjectResult {
match util::collect(stream).await {
Ok(recordbatches) => build_result(recordbatches).await,
Err(err) => build_err_result(&err),
}
}
async fn build_result(recordbatches: Vec<RecordBatch>) -> ObjectResult {
match try_convert(recordbatches) {
Ok(select_result) => ObjectResultBuilder::new()
.status_code(StatusCode::Success as u32)
.select_result(select_result)
.build(),
Err(err) => build_err_result(&err),
}
}
// All schemas of record_batches must be the same.
fn try_convert(record_batches: Vec<RecordBatch>) -> Result<SelectResult> {
let first = if let Some(r) = record_batches.get(0) {
r
} else {
return Ok(SelectResult::default());
.build()),
Ok(Output::Stream(stream)) => collect(stream).await,
Ok(Output::RecordBatches(recordbatches)) => build_result(recordbatches),
Err(e) => return build_err_result(&e),
};
match result {
Ok(r) => r,
Err(e) => build_err_result(&e),
}
}
async fn collect(stream: SendableRecordBatchStream) -> Result<ObjectResult> {
let schema = stream.schema();
let recordbatches = util::collect(stream)
.await
.and_then(|batches| RecordBatches::try_new(schema, batches))
.context(error::CollectRecordBatchesSnafu)?;
let object_result = build_result(recordbatches)?;
Ok(object_result)
}
fn build_result(recordbatches: RecordBatches) -> Result<ObjectResult> {
let select_result = try_convert(recordbatches)?;
let object_result = ObjectResultBuilder::new()
.status_code(StatusCode::Success as u32)
.select_result(select_result)
.build();
Ok(object_result)
}
fn try_convert(record_batches: RecordBatches) -> Result<SelectResult> {
let schema = record_batches.schema();
let record_batches = record_batches.take();
let row_count: usize = record_batches
.iter()
.map(|r| r.df_recordbatch.num_rows())
.sum();
let schemas = first.schema.column_schemas();
let schemas = schema.column_schemas();
let mut columns = Vec::with_capacity(schemas.len());
for (idx, schema) in schemas.iter().enumerate() {
@@ -179,7 +178,7 @@ mod tests {
array::{Array, BooleanArray, PrimitiveArray},
datatypes::{DataType, Field},
};
use common_recordbatch::RecordBatch;
use common_recordbatch::{RecordBatch, RecordBatches};
use datafusion::field_util::SchemaExt;
use datatypes::arrow::datatypes::Schema as ArrowSchema;
use datatypes::{
@@ -193,8 +192,10 @@ mod tests {
#[test]
fn test_convert_record_batches_to_select_result() {
let r1 = mock_record_batch();
let schema = r1.schema.clone();
let r2 = mock_record_batch();
let record_batches = vec![r1, r2];
let record_batches = RecordBatches::try_new(schema, record_batches).unwrap();
let s = try_convert(record_batches).unwrap();

View File

@@ -3,9 +3,11 @@ use arrow::array::{
MutableUtf8Array, PrimitiveArray, Utf8Array,
};
use arrow::datatypes::DataType as ArrowDataType;
use common_time::timestamp::Timestamp;
use snafu::OptionExt;
use crate::error::{ConversionSnafu, Result};
use crate::prelude::ConcreteDataType;
use crate::value::Value;
pub type BinaryArray = ArrowBinaryArray<i64>;
@@ -59,6 +61,14 @@ pub fn arrow_array_get(array: &dyn Array, idx: usize) -> Result<Value> {
ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 => {
Value::String(cast_array!(array, StringArray).value(idx).into())
}
ArrowDataType::Timestamp(t, _) => {
let value = cast_array!(array, PrimitiveArray::<i64>).value(idx);
let unit = match ConcreteDataType::from_arrow_time_unit(t) {
ConcreteDataType::Timestamp(t) => t.unit,
_ => unreachable!(),
};
Value::Timestamp(Timestamp::new(value, unit))
}
// TODO(sunng87): List
_ => unimplemented!("Arrow array datatype: {:?}", array.data_type()),
};
@@ -68,9 +78,13 @@ pub fn arrow_array_get(array: &dyn Array, idx: usize) -> Result<Value> {
#[cfg(test)]
mod test {
use arrow::array::Int64Array as ArrowI64Array;
use arrow::array::*;
use common_time::timestamp::TimeUnit;
use super::*;
use crate::prelude::Vector;
use crate::vectors::TimestampVector;
#[test]
fn test_arrow_array_access() {
@@ -88,8 +102,8 @@ mod test {
assert_eq!(Value::Int32(2), arrow_array_get(&array1, 1).unwrap());
let array1 = UInt32Array::from_vec(vec![1, 2, 3, 4]);
assert_eq!(Value::UInt32(2), arrow_array_get(&array1, 1).unwrap());
let array1 = Int64Array::from_vec(vec![1, 2, 3, 4]);
assert_eq!(Value::Int64(2), arrow_array_get(&array1, 1).unwrap());
let array = ArrowI64Array::from_vec(vec![1, 2, 3, 4]);
assert_eq!(Value::Int64(2), arrow_array_get(&array, 1).unwrap());
let array1 = UInt64Array::from_vec(vec![1, 2, 3, 4]);
assert_eq!(Value::UInt64(2), arrow_array_get(&array1, 1).unwrap());
let array1 = Float32Array::from_vec(vec![1f32, 2f32, 3f32, 4f32]);
@@ -120,5 +134,13 @@ mod test {
arrow_array_get(&array3, 0).unwrap()
);
assert_eq!(Value::Null, arrow_array_get(&array3, 1).unwrap());
let vector = TimestampVector::new(ArrowI64Array::from_vec(vec![1, 2, 3, 4]));
let array = vector.to_boxed_arrow_array();
let value = arrow_array_get(&*array, 1).unwrap();
assert_eq!(
value,
Value::Timestamp(Timestamp::new(2, TimeUnit::Millisecond))
);
}
}

View File

@@ -3,6 +3,8 @@ use std::ops::Deref;
use common_query::Output;
use common_recordbatch::{util, RecordBatch};
use common_time::datetime::DateTime;
use common_time::timestamp::TimeUnit;
use datatypes::prelude::{ConcreteDataType, Value};
use datatypes::schema::{ColumnSchema, SchemaRef};
use opensrv_mysql::{
@@ -71,11 +73,6 @@ impl<'a, W: io::Write> MysqlResultWriter<'a, W> {
query_result: QueryResult,
writer: QueryResultWriter<'a, W>,
) -> Result<()> {
if query_result.recordbatches.is_empty() {
writer.completed(OkResponse::default())?;
return Ok(());
}
match create_mysql_column_def(&query_result.schema) {
Ok(column_def) => {
let mut row_writer = writer.start(&column_def)?;
@@ -111,7 +108,7 @@ impl<'a, W: io::Write> MysqlResultWriter<'a, W> {
Value::Date(v) => row_writer.write_col(v.val())?,
Value::DateTime(v) => row_writer.write_col(v.val())?,
Value::Timestamp(v) => row_writer
.write_col(v.convert_to(common_time::timestamp::TimeUnit::Second))?, // TODO(hl): Can we also write
.write_col(DateTime::new(v.convert_to(TimeUnit::Second)).to_string())?,
Value::List(_) => {
return Err(Error::Internal {
err_msg: format!(
@@ -152,6 +149,7 @@ fn create_mysql_column(column_schema: &ColumnSchema) -> Result<Column> {
ConcreteDataType::Binary(_) | ConcreteDataType::String(_) => {
Ok(ColumnType::MYSQL_TYPE_VARCHAR)
}
ConcreteDataType::Timestamp(_) => Ok(ColumnType::MYSQL_TYPE_DATETIME),
_ => error::InternalSnafu {
err_msg: format!(
"not implemented for column datatype {:?}",