From b4fc8c5b782231ce5ffac81d351f40e83d9f1680 Mon Sep 17 00:00:00 2001 From: dennis zhuang Date: Mon, 27 Mar 2023 08:50:19 +0800 Subject: [PATCH] refactor: make sql function in scripts return a list of column vectors (#1243) --- src/script/src/python/engine.rs | 2 +- src/script/src/python/ffi_types.rs | 2 +- src/script/src/python/ffi_types/copr.rs | 48 ++++++++++++---------- src/script/src/python/ffi_types/vector.rs | 2 + src/script/src/python/pyo3/vector_impl.rs | 18 ++++++++ src/script/src/python/rspython/builtins.rs | 6 +-- src/script/src/python/rspython/utils.rs | 4 +- 7 files changed, 51 insertions(+), 31 deletions(-) diff --git a/src/script/src/python/engine.rs b/src/script/src/python/engine.rs index 60e3f02e7d..73bdb7a575 100644 --- a/src/script/src/python/engine.rs +++ b/src/script/src/python/engine.rs @@ -380,7 +380,7 @@ import greptime as gt @copr(args=["number"], returns = ["number"], sql = "select * from numbers") def test(number) -> vector[u32]: from greptime import query - return query().sql("select * from numbers")[0][0] + return query().sql("select * from numbers")[0] "#; let script = script_engine .compile(script, CompileContext::default()) diff --git a/src/script/src/python/ffi_types.rs b/src/script/src/python/ffi_types.rs index 10e4c02a83..6506585e23 100644 --- a/src/script/src/python/ffi_types.rs +++ b/src/script/src/python/ffi_types.rs @@ -16,6 +16,6 @@ pub(crate) mod copr; pub(crate) mod utils; pub(crate) mod vector; pub(crate) use copr::{check_args_anno_real_type, select_from_rb, Coprocessor}; -pub(crate) use vector::PyVector; +pub(crate) use vector::{PyVector, PyVectorRef}; #[cfg(test)] mod pair_tests; diff --git a/src/script/src/python/ffi_types/copr.rs b/src/script/src/python/ffi_types/copr.rs index 1ca9f9b482..c62108175c 100644 --- a/src/script/src/python/ffi_types/copr.rs +++ b/src/script/src/python/ffi_types/copr.rs @@ -356,11 +356,9 @@ impl PyQueryEngine { rbs.iter().map(|r| r.df_record_batch()), ) .map_err(|e| format!("Concat batches 
failed for query {sql}: {e}"))?; - RecordBatch::try_from_df_record_batch(rbs.schema(), rb).map_err(|e| - format!( - "Convert datafusion record batch to record batch failed for query {sql}: {e}" - ) - ) + + RecordBatch::try_from_df_record_batch(rbs.schema(), rb) + .map_err(|e| format!("Convert datafusion record batch to record batch failed for query {sql}: {e}")) } Either::AffectedRows(_) => Err(format!("Expect actual results from query {sql}")), } @@ -414,30 +412,36 @@ impl PyQueryEngine { .map_err(|e| format!("Dedicated thread for sql query panic: {e:?}"))? } // TODO(discord9): find a better way to call sql query api, now we don't if we are in async context or not - /// return sql query results in List[List[PyVector]], or List[usize] for AffectedRows number if no recordbatches is returned + /// return sql query results in List[PyVector], or List[usize] for AffectedRows number if no recordbatches is returned #[pymethod] fn sql(&self, s: String, vm: &VirtualMachine) -> PyResult { self.query_with_new_thread(s) .map_err(|e| vm.new_system_error(e)) .map(|rbs| match rbs { Either::Rb(rbs) => { - let mut top_vec = Vec::with_capacity(rbs.iter().count()); - for rb in rbs.iter() { - let mut vec_of_vec = Vec::with_capacity(rb.columns().len()); - for v in rb.columns() { - let v = PyVector::from(v.clone()); - vec_of_vec.push(v.to_pyobject(vm)); - } - let vec_of_vec = PyList::new_ref(vec_of_vec, vm.as_ref()).to_pyobject(vm); - top_vec.push(vec_of_vec); - } - let top_vec = PyList::new_ref(top_vec, vm.as_ref()); - top_vec + let rb = compute::concat_batches( + rbs.schema().arrow_schema(), + rbs.iter().map(|rb| rb.df_record_batch()), + ) + .map_err(|e| { + vm.new_runtime_error(format!("Failed to concat batches: {e:#?}")) + })?; + let rb = + RecordBatch::try_from_df_record_batch(rbs.schema(), rb).map_err(|e| { + vm.new_runtime_error(format!("Failed to cast recordbatch: {e:#?}")) + })?; + let columns_vectors = rb + .columns() + .iter() + .map(|v| 
PyVector::from(v.clone()).to_pyobject(vm)) + .collect::<Vec<_>>(); + Ok(PyList::new_ref(columns_vectors, vm.as_ref())) } - Either::AffectedRows(cnt) => { - PyList::new_ref(vec![vm.ctx.new_int(cnt).into()], vm.as_ref()) - } - }) + Either::AffectedRows(cnt) => Ok(PyList::new_ref( + vec![vm.ctx.new_int(cnt).into()], + vm.as_ref(), + )), + })? } } diff --git a/src/script/src/python/ffi_types/vector.rs b/src/script/src/python/ffi_types/vector.rs index 3660e6c43d..fa1e0e4a4c 100644 --- a/src/script/src/python/ffi_types/vector.rs +++ b/src/script/src/python/ffi_types/vector.rs @@ -49,6 +49,8 @@ pub struct PyVector { pub(crate) vector: VectorRef, } +pub(crate) type PyVectorRef = PyRef<PyVector>; + impl From<VectorRef> for PyVector { fn from(vector: VectorRef) -> Self { Self { vector } diff --git a/src/script/src/python/pyo3/vector_impl.rs b/src/script/src/python/pyo3/vector_impl.rs index d26309aacd..ffbb570d60 100644 --- a/src/script/src/python/pyo3/vector_impl.rs +++ b/src/script/src/python/pyo3/vector_impl.rs @@ -236,6 +236,24 @@ impl PyVector { fn __invert__(&self) -> PyResult { Self::vector_invert(self).map_err(PyValueError::new_err) } + + #[pyo3(name = "concat")] + fn pyo3_concat(&self, py: Python<'_>, other: &Self) -> PyResult<Self> { + py.allow_threads(|| { + let left = self.to_arrow_array(); + let right = other.to_arrow_array(); + + let res = compute::concat(&[left.as_ref(), right.as_ref()]); + let res = res.map_err(|err| PyValueError::new_err(format!("Arrow Error: {err:#?}")))?; + let ret = Helper::try_into_vector(res.clone()).map_err(|e| { + PyValueError::new_err(format!( + "Can't cast result into vector, result: {res:?}, err: {e:?}", + )) + })?; + Ok(ret.into()) + }) + } + /// take a boolean array and filters the Array, returning elements matching the filter (i.e. where the values are true). 
#[pyo3(name = "filter")] fn pyo3_filter(&self, py: Python<'_>, other: &Self) -> PyResult { diff --git a/src/script/src/python/rspython/builtins.rs b/src/script/src/python/rspython/builtins.rs index 82a6d77b2b..c2d80f1471 100644 --- a/src/script/src/python/rspython/builtins.rs +++ b/src/script/src/python/rspython/builtins.rs @@ -307,15 +307,13 @@ pub(crate) mod greptime_builtin { use crate::python::ffi_types::copr::PyQueryEngine; use crate::python::ffi_types::vector::val_to_pyobj; - use crate::python::ffi_types::PyVector; + use crate::python::ffi_types::{PyVector, PyVectorRef}; use crate::python::rspython::builtins::{ all_to_f64, eval_aggr_fn, from_df_err, try_into_columnar_value, try_into_py_obj, type_cast_error, }; use crate::python::rspython::dataframe_impl::data_frame::{PyExpr, PyExprRef}; - use crate::python::rspython::utils::{ - is_instance, py_obj_to_value, py_obj_to_vec, PyVectorRef, - }; + use crate::python::rspython::utils::{is_instance, py_obj_to_value, py_obj_to_vec}; #[pyattr] #[pyclass(module = "greptime_builtin", name = "PyDataFrame")] diff --git a/src/script/src/python/rspython/utils.rs b/src/script/src/python/rspython/utils.rs index 7482067f25..24f34894a6 100644 --- a/src/script/src/python/rspython/utils.rs +++ b/src/script/src/python/rspython/utils.rs @@ -22,7 +22,7 @@ use datatypes::vectors::{ BooleanVector, Float64Vector, Helper, Int64Vector, NullVector, StringVector, VectorRef, }; use rustpython_vm::builtins::{PyBaseExceptionRef, PyBool, PyFloat, PyInt, PyList, PyStr}; -use rustpython_vm::{PyObjectRef, PyPayload, PyRef, PyResult, VirtualMachine}; +use rustpython_vm::{PyObjectRef, PyPayload, PyResult, VirtualMachine}; use snafu::{Backtrace, GenerateImplicitData, OptionExt, ResultExt}; use crate::python::error; @@ -30,8 +30,6 @@ use crate::python::error::ret_other_error_with; use crate::python::ffi_types::PyVector; use crate::python::rspython::builtins::try_into_columnar_value; -pub(crate) type PyVectorRef = PyRef; - /// use `rustpython`'s 
`is_instance` method to check if a PyObject is a instance of class. /// if `PyResult` is Err, then this function return `false` pub fn is_instance(obj: &PyObjectRef, vm: &VirtualMachine) -> bool {