refactor: make sql function in scripts return a list of column vectors (#1243)

This commit is contained in:
dennis zhuang
2023-03-27 08:50:19 +08:00
committed by GitHub
parent 6f81717866
commit b4fc8c5b78
7 changed files with 51 additions and 31 deletions

View File

@@ -380,7 +380,7 @@ import greptime as gt
@copr(args=["number"], returns = ["number"], sql = "select * from numbers")
def test(number) -> vector[u32]:
from greptime import query
return query().sql("select * from numbers")[0][0]
return query().sql("select * from numbers")[0]
"#;
let script = script_engine
.compile(script, CompileContext::default())

View File

@@ -16,6 +16,6 @@ pub(crate) mod copr;
pub(crate) mod utils;
pub(crate) mod vector;
pub(crate) use copr::{check_args_anno_real_type, select_from_rb, Coprocessor};
pub(crate) use vector::PyVector;
pub(crate) use vector::{PyVector, PyVectorRef};
#[cfg(test)]
mod pair_tests;

View File

@@ -356,11 +356,9 @@ impl PyQueryEngine {
rbs.iter().map(|r| r.df_record_batch()),
)
.map_err(|e| format!("Concat batches failed for query {sql}: {e}"))?;
RecordBatch::try_from_df_record_batch(rbs.schema(), rb).map_err(|e|
format!(
"Convert datafusion record batch to record batch failed for query {sql}: {e}"
)
)
RecordBatch::try_from_df_record_batch(rbs.schema(), rb)
.map_err(|e| format!("Convert datafusion record batch to record batch failed for query {sql}: {e}"))
}
Either::AffectedRows(_) => Err(format!("Expect actual results from query {sql}")),
}
@@ -414,30 +412,36 @@ impl PyQueryEngine {
.map_err(|e| format!("Dedicated thread for sql query panic: {e:?}"))?
}
// TODO(discord9): find a better way to call sql query api, now we don't if we are in async context or not
/// return sql query results in List[List[PyVector]], or List[usize] for AffectedRows number if no recordbatches is returned
/// return sql query results in List[PyVector], or List[usize] for AffectedRows number if no recordbatches is returned
#[pymethod]
fn sql(&self, s: String, vm: &VirtualMachine) -> PyResult<PyListRef> {
self.query_with_new_thread(s)
.map_err(|e| vm.new_system_error(e))
.map(|rbs| match rbs {
Either::Rb(rbs) => {
let mut top_vec = Vec::with_capacity(rbs.iter().count());
for rb in rbs.iter() {
let mut vec_of_vec = Vec::with_capacity(rb.columns().len());
for v in rb.columns() {
let v = PyVector::from(v.clone());
vec_of_vec.push(v.to_pyobject(vm));
}
let vec_of_vec = PyList::new_ref(vec_of_vec, vm.as_ref()).to_pyobject(vm);
top_vec.push(vec_of_vec);
}
let top_vec = PyList::new_ref(top_vec, vm.as_ref());
top_vec
let rb = compute::concat_batches(
rbs.schema().arrow_schema(),
rbs.iter().map(|rb| rb.df_record_batch()),
)
.map_err(|e| {
vm.new_runtime_error(format!("Failed to concat batches: {e:#?}"))
})?;
let rb =
RecordBatch::try_from_df_record_batch(rbs.schema(), rb).map_err(|e| {
vm.new_runtime_error(format!("Failed to cast recordbatch: {e:#?}"))
})?;
let columns_vectors = rb
.columns()
.iter()
.map(|v| PyVector::from(v.clone()).to_pyobject(vm))
.collect::<Vec<_>>();
Ok(PyList::new_ref(columns_vectors, vm.as_ref()))
}
Either::AffectedRows(cnt) => {
PyList::new_ref(vec![vm.ctx.new_int(cnt).into()], vm.as_ref())
}
})
Either::AffectedRows(cnt) => Ok(PyList::new_ref(
vec![vm.ctx.new_int(cnt).into()],
vm.as_ref(),
)),
})?
}
}

View File

@@ -49,6 +49,8 @@ pub struct PyVector {
pub(crate) vector: VectorRef,
}
pub(crate) type PyVectorRef = PyRef<PyVector>;
impl From<VectorRef> for PyVector {
fn from(vector: VectorRef) -> Self {
Self { vector }

View File

@@ -236,6 +236,24 @@ impl PyVector {
fn __invert__(&self) -> PyResult<Self> {
Self::vector_invert(self).map_err(PyValueError::new_err)
}
#[pyo3(name = "concat")]
fn pyo3_concat(&self, py: Python<'_>, other: &Self) -> PyResult<Self> {
py.allow_threads(|| {
let left = self.to_arrow_array();
let right = other.to_arrow_array();
let res = compute::concat(&[left.as_ref(), right.as_ref()]);
let res = res.map_err(|err| PyValueError::new_err(format!("Arrow Error: {err:#?}")))?;
let ret = Helper::try_into_vector(res.clone()).map_err(|e| {
PyValueError::new_err(format!(
"Can't cast result into vector, result: {res:?}, err: {e:?}",
))
})?;
Ok(ret.into())
})
}
/// take a boolean array and filters the Array, returning elements matching the filter (i.e. where the values are true).
#[pyo3(name = "filter")]
fn pyo3_filter(&self, py: Python<'_>, other: &Self) -> PyResult<Self> {

View File

@@ -307,15 +307,13 @@ pub(crate) mod greptime_builtin {
use crate::python::ffi_types::copr::PyQueryEngine;
use crate::python::ffi_types::vector::val_to_pyobj;
use crate::python::ffi_types::PyVector;
use crate::python::ffi_types::{PyVector, PyVectorRef};
use crate::python::rspython::builtins::{
all_to_f64, eval_aggr_fn, from_df_err, try_into_columnar_value, try_into_py_obj,
type_cast_error,
};
use crate::python::rspython::dataframe_impl::data_frame::{PyExpr, PyExprRef};
use crate::python::rspython::utils::{
is_instance, py_obj_to_value, py_obj_to_vec, PyVectorRef,
};
use crate::python::rspython::utils::{is_instance, py_obj_to_value, py_obj_to_vec};
#[pyattr]
#[pyclass(module = "greptime_builtin", name = "PyDataFrame")]

View File

@@ -22,7 +22,7 @@ use datatypes::vectors::{
BooleanVector, Float64Vector, Helper, Int64Vector, NullVector, StringVector, VectorRef,
};
use rustpython_vm::builtins::{PyBaseExceptionRef, PyBool, PyFloat, PyInt, PyList, PyStr};
use rustpython_vm::{PyObjectRef, PyPayload, PyRef, PyResult, VirtualMachine};
use rustpython_vm::{PyObjectRef, PyPayload, PyResult, VirtualMachine};
use snafu::{Backtrace, GenerateImplicitData, OptionExt, ResultExt};
use crate::python::error;
@@ -30,8 +30,6 @@ use crate::python::error::ret_other_error_with;
use crate::python::ffi_types::PyVector;
use crate::python::rspython::builtins::try_into_columnar_value;
pub(crate) type PyVectorRef = PyRef<PyVector>;
/// use `rustpython`'s `is_instance` method to check if a PyObject is a instance of class.
/// if `PyResult` is Err, then this function return `false`
pub fn is_instance<T: PyPayload>(obj: &PyObjectRef, vm: &VirtualMachine) -> bool {