feat: some improvements on python coprocessor (#423)

* feat: supports list array in arrow_array_get

* feat: supports string and list type conversions in python coprocessor

* test: add test cases for returning list in coprocessor
This commit is contained in:
dennis zhuang
2022-11-10 11:53:27 +08:00
committed by GitHub
parent 6288fdb6bc
commit e62b302fb2
7 changed files with 145 additions and 26 deletions

View File

@@ -1,6 +1,6 @@
use arrow::array::{
self, Array, BinaryArray as ArrowBinaryArray, MutableBinaryArray as ArrowMutableBinaryArray,
MutableUtf8Array, PrimitiveArray, Utf8Array,
self, Array, BinaryArray as ArrowBinaryArray, ListArray,
MutableBinaryArray as ArrowMutableBinaryArray, MutableUtf8Array, PrimitiveArray, Utf8Array,
};
use arrow::datatypes::DataType as ArrowDataType;
use common_time::timestamp::Timestamp;
@@ -8,7 +8,7 @@ use snafu::OptionExt;
use crate::error::{ConversionSnafu, Result};
use crate::prelude::ConcreteDataType;
use crate::value::Value;
use crate::value::{ListValue, Value};
pub type BinaryArray = ArrowBinaryArray<i64>;
pub type MutableBinaryArray = ArrowMutableBinaryArray<i64>;
@@ -69,7 +69,14 @@ pub fn arrow_array_get(array: &dyn Array, idx: usize) -> Result<Value> {
};
Value::Timestamp(Timestamp::new(value, unit))
}
// TODO(sunng87): List
ArrowDataType::List(_) => {
let array = cast_array!(array, ListArray::<i32>).value(idx);
let inner_datatype = ConcreteDataType::try_from(array.data_type())?;
let values = (0..array.len())
.map(|i| arrow_array_get(&*array, i))
.collect::<Result<Vec<Value>>>()?;
Value::List(ListValue::new(Some(Box::new(values)), inner_datatype))
}
_ => unimplemented!("Arrow array datatype: {:?}", array.data_type()),
};
@@ -80,6 +87,7 @@ pub fn arrow_array_get(array: &dyn Array, idx: usize) -> Result<Value> {
mod test {
use arrow::array::Int64Array as ArrowI64Array;
use arrow::array::*;
use arrow::array::{MutableListArray, MutablePrimitiveArray, TryExtend};
use arrow::buffer::Buffer;
use arrow::datatypes::{DataType, TimeUnit as ArrowTimeUnit};
use common_time::timestamp::{TimeUnit, Timestamp};
@@ -164,5 +172,40 @@ mod test {
Value::Timestamp(Timestamp::new(1, TimeUnit::Nanosecond)),
arrow_array_get(&array4, 0).unwrap()
);
// test list array
let data = vec![
Some(vec![Some(1i32), Some(2), Some(3)]),
None,
Some(vec![Some(4), None, Some(6)]),
];
let mut arrow_array = MutableListArray::<i32, MutablePrimitiveArray<i32>>::new();
arrow_array.try_extend(data).unwrap();
let arrow_array: ListArray<i32> = arrow_array.into();
let v0 = arrow_array_get(&arrow_array, 0).unwrap();
match v0 {
Value::List(list) => {
assert!(matches!(list.datatype(), ConcreteDataType::Int32(_)));
let items = list.items().as_ref().unwrap();
assert_eq!(
**items,
vec![Value::Int32(1), Value::Int32(2), Value::Int32(3)]
);
}
_ => unreachable!(),
}
assert_eq!(Value::Null, arrow_array_get(&arrow_array, 1).unwrap());
let v2 = arrow_array_get(&arrow_array, 2).unwrap();
match v2 {
Value::List(list) => {
assert!(matches!(list.datatype(), ConcreteDataType::Int32(_)));
let items = list.items().as_ref().unwrap();
assert_eq!(**items, vec![Value::Int32(4), Value::Null, Value::Int32(6)]);
}
_ => unreachable!(),
}
}
}

View File

@@ -15,7 +15,7 @@ use datatypes::vectors::Helper as HelperVec;
use rustpython_vm::builtins::PyList;
use rustpython_vm::pymodule;
use rustpython_vm::{
builtins::{PyBaseExceptionRef, PyBool, PyFloat, PyInt},
builtins::{PyBaseExceptionRef, PyBool, PyFloat, PyInt, PyStr},
AsObject, PyObjectRef, PyPayload, PyResult, VirtualMachine,
};
@@ -50,14 +50,15 @@ fn collect_diff_types_string(values: &[ScalarValue], ty: &DataType) -> String {
///
/// Supported scalars are (left side is the Python data type, right side is the Rust type):
///
/// | Python | Rust |
/// | ------ | ---- |
/// | integer| i64 |
/// | float | f64 |
/// | bool | bool |
/// | vector | array|
/// | Python | Rust |
/// | ------ | ------ |
/// | integer| i64 |
/// | float | f64 |
/// | str | String |
/// | bool | bool |
/// | vector | array |
/// | list | `ScalarValue::List` |
fn try_into_columnar_value(obj: PyObjectRef, vm: &VirtualMachine) -> PyResult<DFColValue> {
pub fn try_into_columnar_value(obj: PyObjectRef, vm: &VirtualMachine) -> PyResult<DFColValue> {
if is_instance::<PyVector>(&obj, vm) {
let ret = obj
.payload::<PyVector>()
@@ -67,6 +68,9 @@ fn try_into_columnar_value(obj: PyObjectRef, vm: &VirtualMachine) -> PyResult<DF
// Note that a `PyBool` is also a `PyInt`, so check if it is a bool first to get a more precise type
let ret = obj.try_into_value::<bool>(vm)?;
Ok(DFColValue::Scalar(ScalarValue::Boolean(Some(ret))))
} else if is_instance::<PyStr>(&obj, vm) {
let ret = obj.try_into_value::<String>(vm)?;
Ok(DFColValue::Scalar(ScalarValue::Utf8(Some(ret))))
} else if is_instance::<PyInt>(&obj, vm) {
let ret = obj.try_into_value::<i64>(vm)?;
Ok(DFColValue::Scalar(ScalarValue::Int64(Some(ret))))
@@ -92,10 +96,10 @@ fn try_into_columnar_value(obj: PyObjectRef, vm: &VirtualMachine) -> PyResult<DF
.collect::<Result<_, _>>()?;
if ret.is_empty() {
//TODO(dennis): empty list, we set type as f64.
// TODO(dennis): empty list, we set type as null.
return Ok(DFColValue::Scalar(ScalarValue::List(
None,
Box::new(DataType::Float64),
Box::new(DataType::Null),
)));
}

View File

@@ -1,4 +1,4 @@
// This is the file for UDF&UDAF binding from datafusion,
// This is the file for UDF&UDAF binding from datafusion,
// including most tests for those functions (except ApproxMedian, which datafusion didn't implement)
// check src/script/builtins/test.rs::run_builtin_fn_testcases() for more information
[
@@ -632,7 +632,7 @@ variance_pop(values)"#,
ty: Float64
))
),
// GrepTime's own UDF
TestCase(
@@ -782,7 +782,7 @@ sin(num)"#,
script: r#"
from greptime import *
sin(num)"#,
expect: Err("Can't cast object of type str into vector or scalar")
expect: Err("TypeError: Can't cast type Utf8 to Float64")
),
TestCase(
input: {},

View File

@@ -13,7 +13,7 @@ use datatypes::arrow::compute::cast::CastOptions;
use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
use datatypes::schema::Schema;
use datatypes::vectors::Helper;
use datatypes::vectors::{BooleanVector, Vector, VectorRef};
use datatypes::vectors::{BooleanVector, StringVector, Vector, VectorRef};
use rustpython_bytecode::CodeObject;
use rustpython_vm as vm;
use rustpython_vm::{class::PyClassImpl, AsObject};
@@ -186,10 +186,11 @@ fn try_into_py_vector(fetch_args: Vec<ArrayRef>) -> Result<Vec<PyVector>> {
DataType::Int16 => try_into_vector::<i16>(arg)?,
DataType::Int32 => try_into_vector::<i32>(arg)?,
DataType::Int64 => try_into_vector::<i64>(arg)?,
DataType::Utf8 => {
Arc::new(StringVector::try_from_arrow_array(arg).context(TypeCastSnafu)?) as _
}
DataType::Boolean => {
let v: VectorRef =
Arc::new(BooleanVector::try_from_arrow_array(arg).context(TypeCastSnafu)?);
v
Arc::new(BooleanVector::try_from_arrow_array(arg).context(TypeCastSnafu)?) as _
}
_ => {
return ret_other_error_with(format!(

View File

@@ -1,5 +1,6 @@
use common_error::prelude::{ErrorCompat, ErrorExt, StatusCode};
use console::{style, Style};
use datafusion::error::DataFusionError;
use datatypes::arrow::error::ArrowError;
use datatypes::error::Error as DataTypeError;
use query::error::Error as QueryError;
@@ -50,6 +51,12 @@ pub enum Error {
source: ArrowError,
},
#[snafu(display("DataFusion error: {}", source))]
DataFusion {
backtrace: Backtrace,
source: DataFusionError,
},
/// errors in coprocessors' parse check for types and etc.
#[snafu(display("Coprocessor error: {} {}.", reason,
if let Some(loc) = loc{
@@ -93,9 +100,10 @@ impl From<QueryError> for Error {
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Error::Arrow { .. } | Error::PyRuntime { .. } | Error::Other { .. } => {
StatusCode::Internal
}
Error::DataFusion { .. }
| Error::Arrow { .. }
| Error::PyRuntime { .. }
| Error::Other { .. } => StatusCode::Internal,
Error::RecordBatch { source } => source.status_code(),
Error::DatabaseQuery { source } => source.status_code(),

View File

@@ -362,6 +362,39 @@ def a(cpu: vector[f32], mem: vector[f64])->(vector[f64|None],
]
)
),
(
name: "constant_list",
code: r#"
@copr(args=["cpu", "mem"], returns=["what"])
def a(cpu: vector[f32], mem: vector[f64]):
return ["apple" ,"banana", "cherry"]
"#,
predicate: ExecIsOk(
fields: [
(
datatype: Some(Utf8),
is_nullable: false,
),
],
columns: [
(
ty: Utf8,
len: 3
),
]
)
),
(
name: "constant_list_different_type",
code: r#"
@copr(args=["cpu", "mem"], returns=["what"])
def a(cpu: vector[f32], mem: vector[f64]):
return ["apple" ,3, "cherry"]
"#,
predicate: ExecIsErr(
reason: "All elements in a list should be same type to cast to Datafusion list!",
)
),
(
// expect 4 vectors, found 5
name: "ret_nums_wrong",

View File

@@ -1,11 +1,16 @@
use std::sync::Arc;
use datafusion::arrow::array::{ArrayRef, BooleanArray, PrimitiveArray};
use rustpython_vm::builtins::{PyBool, PyFloat, PyInt};
use datafusion::arrow::array::{ArrayRef, BooleanArray, NullArray, PrimitiveArray, Utf8Array};
use datafusion_common::ScalarValue;
use datafusion_expr::ColumnarValue as DFColValue;
use datatypes::arrow::datatypes::DataType;
use rustpython_vm::builtins::{PyBool, PyFloat, PyInt, PyList, PyStr};
use rustpython_vm::{builtins::PyBaseExceptionRef, PyObjectRef, PyPayload, PyRef, VirtualMachine};
use snafu::OptionExt;
use snafu::ResultExt;
use snafu::{Backtrace, GenerateImplicitData};
use crate::python::builtins::try_into_columnar_value;
use crate::python::error;
use crate::python::error::ret_other_error_with;
use crate::python::PyVector;
@@ -39,6 +44,7 @@ pub fn py_vec_obj_to_array(
vm: &VirtualMachine,
col_len: usize,
) -> Result<ArrayRef, error::Error> {
// It's ugly, but we can't find a better way right now.
if is_instance::<PyVector>(obj, vm) {
let pyv = obj.payload::<PyVector>().with_context(|| {
ret_other_error_with(format!("can't cast obj {:?} to PyVector", obj))
@@ -66,6 +72,30 @@ pub fn py_vec_obj_to_array(
let ret = BooleanArray::from_iter(std::iter::repeat(Some(val)).take(col_len));
Ok(Arc::new(ret) as _)
} else if is_instance::<PyStr>(obj, vm) {
let val = obj
.to_owned()
.try_into_value::<String>(vm)
.map_err(|e| format_py_error(e, vm))?;
let ret = Utf8Array::<i32>::from_iter(std::iter::repeat(Some(val)).take(col_len));
Ok(Arc::new(ret) as _)
} else if is_instance::<PyList>(obj, vm) {
let columnar_value =
try_into_columnar_value(obj.clone(), vm).map_err(|e| format_py_error(e, vm))?;
match columnar_value {
DFColValue::Scalar(ScalarValue::List(scalars, _datatype)) => match scalars {
Some(scalars) => {
let array = ScalarValue::iter_to_array(scalars.into_iter())
.context(error::DataFusionSnafu)?;
Ok(array)
}
None => Ok(Arc::new(NullArray::new(DataType::Null, 0))),
},
_ => unreachable!(),
}
} else {
ret_other_error_with(format!("Expect a vector or a constant, found {:?}", obj)).fail()
}