From e62b302fb21329016797471db6aec685751ad8a1 Mon Sep 17 00:00:00 2001 From: dennis zhuang Date: Thu, 10 Nov 2022 11:53:27 +0800 Subject: [PATCH] feat: some improvements on python coprocessor (#423) * feat: supports list array in arrow_array_get * feat: supports string and list type conversions in python coprocessor * test: add test cases for returning list in coprocessor --- src/datatypes/src/arrow_array.rs | 51 ++++++++++++++++++-- src/script/src/python/builtins/mod.rs | 24 +++++---- src/script/src/python/builtins/testcases.ron | 6 +-- src/script/src/python/coprocessor.rs | 9 ++-- src/script/src/python/error.rs | 14 ++++-- src/script/src/python/testcases.ron | 33 +++++++++++++ src/script/src/python/utils.rs | 34 ++++++++++++- 7 files changed, 145 insertions(+), 26 deletions(-) diff --git a/src/datatypes/src/arrow_array.rs b/src/datatypes/src/arrow_array.rs index 8107754ddc..8b8f234ee0 100644 --- a/src/datatypes/src/arrow_array.rs +++ b/src/datatypes/src/arrow_array.rs @@ -1,6 +1,6 @@ use arrow::array::{ - self, Array, BinaryArray as ArrowBinaryArray, MutableBinaryArray as ArrowMutableBinaryArray, - MutableUtf8Array, PrimitiveArray, Utf8Array, + self, Array, BinaryArray as ArrowBinaryArray, ListArray, + MutableBinaryArray as ArrowMutableBinaryArray, MutableUtf8Array, PrimitiveArray, Utf8Array, }; use arrow::datatypes::DataType as ArrowDataType; use common_time::timestamp::Timestamp; @@ -8,7 +8,7 @@ use snafu::OptionExt; use crate::error::{ConversionSnafu, Result}; use crate::prelude::ConcreteDataType; -use crate::value::Value; +use crate::value::{ListValue, Value}; pub type BinaryArray = ArrowBinaryArray; pub type MutableBinaryArray = ArrowMutableBinaryArray; @@ -69,7 +69,14 @@ pub fn arrow_array_get(array: &dyn Array, idx: usize) -> Result { }; Value::Timestamp(Timestamp::new(value, unit)) } - // TODO(sunng87): List + ArrowDataType::List(_) => { + let array = cast_array!(array, ListArray::).value(idx); + let inner_datatype = ConcreteDataType::try_from(array.data_type())?; + let values = (0..array.len()) + .map(|i| arrow_array_get(&*array, i)) + .collect::>>()?; + Value::List(ListValue::new(Some(Box::new(values)), inner_datatype)) + } _ => unimplemented!("Arrow array datatype: {:?}", array.data_type()), }; @@ -80,6 +87,7 @@ pub fn arrow_array_get(array: &dyn Array, idx: usize) -> Result { mod test { use arrow::array::Int64Array as ArrowI64Array; use arrow::array::*; + use arrow::array::{MutableListArray, MutablePrimitiveArray, TryExtend}; use arrow::buffer::Buffer; use arrow::datatypes::{DataType, TimeUnit as ArrowTimeUnit}; use common_time::timestamp::{TimeUnit, Timestamp}; @@ -164,5 +172,40 @@ mod test { Value::Timestamp(Timestamp::new(1, TimeUnit::Nanosecond)), arrow_array_get(&array4, 0).unwrap() ); + + // test list array + let data = vec![ + Some(vec![Some(1i32), Some(2), Some(3)]), + None, + Some(vec![Some(4), None, Some(6)]), + ]; + + let mut arrow_array = MutableListArray::>::new(); + arrow_array.try_extend(data).unwrap(); + let arrow_array: ListArray = arrow_array.into(); + + let v0 = arrow_array_get(&arrow_array, 0).unwrap(); + match v0 { + Value::List(list) => { + assert!(matches!(list.datatype(), ConcreteDataType::Int32(_))); + let items = list.items().as_ref().unwrap(); + assert_eq!( + **items, + vec![Value::Int32(1), Value::Int32(2), Value::Int32(3)] + ); + } + _ => unreachable!(), + } + + assert_eq!(Value::Null, arrow_array_get(&arrow_array, 1).unwrap()); + let v2 = arrow_array_get(&arrow_array, 2).unwrap(); + match v2 { + Value::List(list) => { + assert!(matches!(list.datatype(), ConcreteDataType::Int32(_))); + let items = list.items().as_ref().unwrap(); + assert_eq!(**items, vec![Value::Int32(4), Value::Null, Value::Int32(6)]); + } + _ => unreachable!(), + } } } diff --git a/src/script/src/python/builtins/mod.rs b/src/script/src/python/builtins/mod.rs index e077d0031a..fcce9c71e3 100644 --- a/src/script/src/python/builtins/mod.rs +++ b/src/script/src/python/builtins/mod.rs @@ -15,7 +15,7 @@ use datatypes::vectors::Helper as HelperVec; use rustpython_vm::builtins::PyList; use rustpython_vm::pymodule; use rustpython_vm::{ - builtins::{PyBaseExceptionRef, PyBool, PyFloat, PyInt}, + builtins::{PyBaseExceptionRef, PyBool, PyFloat, PyInt, PyStr}, AsObject, PyObjectRef, PyPayload, PyResult, VirtualMachine, }; @@ -50,14 +50,15 @@ fn collect_diff_types_string(values: &[ScalarValue], ty: &DataType) -> String { /// /// supported scalar are(leftside is python data type, right side is rust type): /// -/// | Python | Rust | -/// | ------ | ---- | -/// | integer| i64 | -/// | float | f64 | -/// | bool | bool | -/// | vector | array| +/// | Python | Rust | +/// | ------ | ------ | +/// | integer| i64 | +/// | float | f64 | +/// | str | String | +/// | bool | bool | +/// | vector | array | /// | list | `ScalarValue::List` | -fn try_into_columnar_value(obj: PyObjectRef, vm: &VirtualMachine) -> PyResult { +pub fn try_into_columnar_value(obj: PyObjectRef, vm: &VirtualMachine) -> PyResult { if is_instance::(&obj, vm) { let ret = obj .payload::() @@ -67,6 +68,9 @@ fn try_into_columnar_value(obj: PyObjectRef, vm: &VirtualMachine) -> PyResult(vm)?; Ok(DFColValue::Scalar(ScalarValue::Boolean(Some(ret)))) + } else if is_instance::(&obj, vm) { + let ret = obj.try_into_value::(vm)?; + Ok(DFColValue::Scalar(ScalarValue::Utf8(Some(ret)))) } else if is_instance::(&obj, vm) { let ret = obj.try_into_value::(vm)?; Ok(DFColValue::Scalar(ScalarValue::Int64(Some(ret)))) @@ -92,10 +96,10 @@ fn try_into_columnar_value(obj: PyObjectRef, vm: &VirtualMachine) -> PyResult>()?; if ret.is_empty() { - //TODO(dennis): empty list, we set type as f64. + // TODO(dennis): empty list, we set type as null. return Ok(DFColValue::Scalar(ScalarValue::List( None, - Box::new(DataType::Float64), + Box::new(DataType::Null), ))); } diff --git a/src/script/src/python/builtins/testcases.ron b/src/script/src/python/builtins/testcases.ron index 994c50b811..5254678ebf 100644 --- a/src/script/src/python/builtins/testcases.ron +++ b/src/script/src/python/builtins/testcases.ron @@ -1,4 +1,4 @@ -// This is the file for UDF&UDAF binding from datafusion, +// This is the file for UDF&UDAF binding from datafusion, // including most test for those function(except ApproxMedian which datafusion didn't implement) // check src/script/builtins/test.rs::run_builtin_fn_testcases() for more information [ @@ -632,7 +632,7 @@ variance_pop(values)"#, ty: Float64 )) ), - + // GrepTime's own UDF TestCase( @@ -782,7 +782,7 @@ sin(num)"#, script: r#" from greptime import * sin(num)"#, - expect: Err("Can't cast object of type str into vector or scalar") + expect: Err("TypeError: Can't cast type Utf8 to Float64") ), TestCase( input: {}, diff --git a/src/script/src/python/coprocessor.rs b/src/script/src/python/coprocessor.rs index 6b1bb7d145..c47fd7578f 100644 --- a/src/script/src/python/coprocessor.rs +++ b/src/script/src/python/coprocessor.rs @@ -13,7 +13,7 @@ use datatypes::arrow::compute::cast::CastOptions; use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema}; use datatypes::schema::Schema; use datatypes::vectors::Helper; -use datatypes::vectors::{BooleanVector, Vector, VectorRef}; +use datatypes::vectors::{BooleanVector, StringVector, Vector, VectorRef}; use rustpython_bytecode::CodeObject; use rustpython_vm as vm; use rustpython_vm::{class::PyClassImpl, AsObject}; @@ -186,10 +186,11 @@ fn try_into_py_vector(fetch_args: Vec) -> Result> { DataType::Int16 => try_into_vector::(arg)?, DataType::Int32 => try_into_vector::(arg)?, DataType::Int64 => try_into_vector::(arg)?, + DataType::Utf8 => { + Arc::new(StringVector::try_from_arrow_array(arg).context(TypeCastSnafu)?) as _ + } DataType::Boolean => { - let v: VectorRef = - Arc::new(BooleanVector::try_from_arrow_array(arg).context(TypeCastSnafu)?); - v + Arc::new(BooleanVector::try_from_arrow_array(arg).context(TypeCastSnafu)?) as _ } _ => { return ret_other_error_with(format!( diff --git a/src/script/src/python/error.rs b/src/script/src/python/error.rs index d1482bfde3..43c7c46c13 100644 --- a/src/script/src/python/error.rs +++ b/src/script/src/python/error.rs @@ -1,5 +1,6 @@ use common_error::prelude::{ErrorCompat, ErrorExt, StatusCode}; use console::{style, Style}; +use datafusion::error::DataFusionError; use datatypes::arrow::error::ArrowError; use datatypes::error::Error as DataTypeError; use query::error::Error as QueryError; @@ -50,6 +51,12 @@ pub enum Error { source: ArrowError, }, + #[snafu(display("DataFusion error: {}", source))] + DataFusion { + backtrace: Backtrace, + source: DataFusionError, + }, + /// errors in coprocessors' parse check for types and etc. #[snafu(display("Coprocessor error: {} {}.", reason, if let Some(loc) = loc{ @@ -93,9 +100,10 @@ impl From for Error { impl ErrorExt for Error { fn status_code(&self) -> StatusCode { match self { - Error::Arrow { .. } | Error::PyRuntime { .. } | Error::Other { .. } => { - StatusCode::Internal - } + Error::DataFusion { .. } + | Error::Arrow { .. } + | Error::PyRuntime { .. } + | Error::Other { .. } => StatusCode::Internal, Error::RecordBatch { source } => source.status_code(), Error::DatabaseQuery { source } => source.status_code(), diff --git a/src/script/src/python/testcases.ron b/src/script/src/python/testcases.ron index 44301446a1..91d736070e 100644 --- a/src/script/src/python/testcases.ron +++ b/src/script/src/python/testcases.ron @@ -362,6 +362,39 @@ def a(cpu: vector[f32], mem: vector[f64])->(vector[f64|None], ] ) ), + ( + name: "constant_list", + code: r#" +@copr(args=["cpu", "mem"], returns=["what"]) +def a(cpu: vector[f32], mem: vector[f64]): + return ["apple" ,"banana", "cherry"] +"#, + predicate: ExecIsOk( + fields: [ + ( + datatype: Some(Utf8), + is_nullable: false, + ), + ], + columns: [ + ( + ty: Utf8, + len: 3 + ), + ] + ) + ), + ( + name: "constant_list_different_type", + code: r#" +@copr(args=["cpu", "mem"], returns=["what"]) +def a(cpu: vector[f32], mem: vector[f64]): + return ["apple" ,3, "cherry"] +"#, + predicate: ExecIsErr( + reason: "All elements in a list should be same type to cast to Datafusion list!", + ) + ), ( // expect 4 vector ,found 5 name: "ret_nums_wrong", diff --git a/src/script/src/python/utils.rs b/src/script/src/python/utils.rs index 810ff842f9..4b06e552f6 100644 --- a/src/script/src/python/utils.rs +++ b/src/script/src/python/utils.rs @@ -1,11 +1,16 @@ use std::sync::Arc; -use datafusion::arrow::array::{ArrayRef, BooleanArray, PrimitiveArray}; -use rustpython_vm::builtins::{PyBool, PyFloat, PyInt}; +use datafusion::arrow::array::{ArrayRef, BooleanArray, NullArray, PrimitiveArray, Utf8Array}; +use datafusion_common::ScalarValue; +use datafusion_expr::ColumnarValue as DFColValue; +use datatypes::arrow::datatypes::DataType; +use rustpython_vm::builtins::{PyBool, PyFloat, PyInt, PyList, PyStr}; use rustpython_vm::{builtins::PyBaseExceptionRef, PyObjectRef, PyPayload, PyRef, VirtualMachine}; use snafu::OptionExt; +use snafu::ResultExt; use snafu::{Backtrace, GenerateImplicitData}; +use crate::python::builtins::try_into_columnar_value; use crate::python::error; use crate::python::error::ret_other_error_with; use crate::python::PyVector; @@ -39,6 +44,7 @@ pub fn py_vec_obj_to_array( vm: &VirtualMachine, col_len: usize, ) -> Result { + // It's ugly, but we can't find a better way right now. if is_instance::(obj, vm) { let pyv = obj.payload::().with_context(|| { ret_other_error_with(format!("can't cast obj {:?} to PyVector", obj)) @@ -66,6 +72,30 @@ pub fn py_vec_obj_to_array( let ret = BooleanArray::from_iter(std::iter::repeat(Some(val)).take(col_len)); Ok(Arc::new(ret) as _) + } else if is_instance::(obj, vm) { + let val = obj + .to_owned() + .try_into_value::(vm) + .map_err(|e| format_py_error(e, vm))?; + + let ret = Utf8Array::::from_iter(std::iter::repeat(Some(val)).take(col_len)); + Ok(Arc::new(ret) as _) + } else if is_instance::(obj, vm) { + let columnar_value = + try_into_columnar_value(obj.clone(), vm).map_err(|e| format_py_error(e, vm))?; + + match columnar_value { + DFColValue::Scalar(ScalarValue::List(scalars, _datatype)) => match scalars { + Some(scalars) => { + let array = ScalarValue::iter_to_array(scalars.into_iter()) + .context(error::DataFusionSnafu)?; + + Ok(array) + } + None => Ok(Arc::new(NullArray::new(DataType::Null, 0))), + }, + _ => unreachable!(), + } } else { ret_other_error_with(format!("Expect a vector or a constant, found {:?}", obj)).fail() }