refactor: put dataframe & query into greptime module (#1172)

* feat: impl getitem for `vector`

* feat: mv `query`&`dataframe` into `greptime` for PyO3

* refactor: allow call dataframe&query

* refactor: pyo3 query&dataframe

* chore: CR advices
This commit is contained in:
discord9
2023-03-15 11:01:43 +08:00
committed by GitHub
parent 242ce5c2aa
commit cbf64e65b9
15 changed files with 193 additions and 35 deletions

View File

@@ -379,7 +379,8 @@ import greptime as gt
@copr(args=["number"], returns = ["number"], sql = "select * from numbers")
def test(number) -> vector[u32]:
return query.sql("select * from numbers")[0][0]
from greptime import query
return query().sql("select * from numbers")[0][0]
"#;
let script = script_engine
.compile(script, CompileContext::default())
@@ -438,7 +439,8 @@ from greptime import col
@copr(args=["number"], returns = ["number"], sql = "select * from numbers")
def test(number) -> vector[u32]:
return dataframe.filter(col("number")==col("number")).collect()[0][0]
from greptime import dataframe
return dataframe().filter(col("number")==col("number")).collect()[0][0]
"#;
let script = script_engine
.compile(script, CompileContext::default())

View File

@@ -140,7 +140,7 @@ impl Coprocessor {
cols.len() == names.len() && names.len() == anno.len(),
OtherSnafu {
reason: format!(
"Unmatched length for cols({}), names({}) and anno({})",
"Unmatched length for cols({}), names({}) and annotation({})",
cols.len(),
names.len(),
anno.len()
@@ -335,7 +335,7 @@ pub fn exec_coprocessor(script: &str, rb: &Option<RecordBatch>) -> Result<Record
#[cfg_attr(feature = "pyo3_backend", pyo3class(name = "query_engine"))]
#[rspyclass(module = false, name = "query_engine")]
#[derive(Debug, PyPayload)]
#[derive(Debug, PyPayload, Clone)]
pub struct PyQueryEngine {
inner: QueryEngineWeakRef,
}

View File

@@ -184,7 +184,7 @@ fn eval_rspy(case: CodeBlockTestCase) {
#[cfg(feature = "pyo3_backend")]
fn eval_pyo3(case: CodeBlockTestCase) {
init_cpython_interpreter();
init_cpython_interpreter().unwrap();
Python::with_gil(|py| {
let locals = {
let locals_dict = PyDict::new(py);

View File

@@ -38,6 +38,55 @@ pub(super) fn generate_copr_intgrate_tests() -> Vec<CoprTestCase> {
vec![
CoprTestCase {
script: r#"
from greptime import vector
@copr(returns=["value"])
def boolean_array() -> vector[f64]:
v = vector([1.0, 2.0, 3.0])
# This returns a vector([2.0])
return v[(v > 1) & (v < 3)]
"#
.to_string(),
expect: Some(ronish!("value": vector!(Float64Vector, [2.0f64]))),
},
#[cfg(feature = "pyo3_backend")]
CoprTestCase {
script: r#"
@copr(returns=["value"], backend="pyo3")
def boolean_array() -> vector[f64]:
from greptime import vector
from greptime import query, dataframe
try:
print("query()=", query())
except KeyError as e:
print("query()=", e)
try:
print("dataframe()=", dataframe())
except KeyError as e:
print("dataframe()=", e)
v = vector([1.0, 2.0, 3.0])
# This returns a vector([2.0])
return v[(v > 1) & (v < 3)]
"#
.to_string(),
expect: Some(ronish!("value": vector!(Float64Vector, [2.0f64]))),
},
#[cfg(feature = "pyo3_backend")]
CoprTestCase {
script: r#"
@copr(returns=["value"], backend="pyo3")
def boolean_array() -> vector[f64]:
from greptime import vector
v = vector([1.0, 2.0, 3.0])
# This returns a vector([2.0])
return v[(v > 1) & (v < 3)]
"#
.to_string(),
expect: Some(ronish!("value": vector!(Float64Vector, [2.0f64]))),
},
CoprTestCase {
script: r#"
@copr(args=["number", "number"],
returns=["value"],
sql="select number from numbers limit 5", backend="rspy")
@@ -107,9 +156,9 @@ def answer() -> vector[i64]:
script: r#"
@copr(args=[], returns = ["number"], sql = "select * from numbers", backend="rspy")
def answer() -> vector[i64]:
from greptime import vector, col, lit
from greptime import vector, col, lit, dataframe
expr_0 = (col("number")<lit(3)) & (col("number")>0)
ret = dataframe.select([col("number")]).filter(expr_0).collect()[0][0]
ret = dataframe().select([col("number")]).filter(expr_0).collect()[0][0]
return ret
"#
.to_string(),
@@ -120,10 +169,10 @@ def answer() -> vector[i64]:
script: r#"
@copr(args=[], returns = ["number"], sql = "select * from numbers", backend="pyo3")
def answer() -> vector[i64]:
from greptime import vector, col, lit
from greptime import vector, col, lit, dataframe
# Bitwise Operator pred comparison operator
expr_0 = (col("number")<lit(3)) & (col("number")>0)
ret = dataframe.select([col("number")]).filter(expr_0).collect()[0][0]
ret = dataframe().select([col("number")]).filter(expr_0).collect()[0][0]
return ret
"#
.to_string(),

View File

@@ -133,7 +133,7 @@ fn get_test_cases() -> Vec<TestCase> {
}
#[cfg(feature = "pyo3_backend")]
fn eval_pyo3(testcase: TestCase, locals: HashMap<String, PyVector>) {
init_cpython_interpreter();
init_cpython_interpreter().unwrap();
Python::with_gil(|py| {
let locals = {
let locals_dict = PyDict::new(py);

View File

@@ -20,10 +20,13 @@ use datafusion::physical_plan::expressions;
use datafusion_expr::ColumnarValue;
use datafusion_physical_expr::{math_expressions, AggregateExpr};
use datatypes::vectors::VectorRef;
use pyo3::exceptions::PyValueError;
use pyo3::exceptions::{PyKeyError, PyValueError};
use pyo3::prelude::*;
use pyo3::types::PyDict;
use super::dataframe_impl::PyDataFrame;
use super::utils::scalar_value_to_py_any;
use crate::python::ffi_types::copr::PyQueryEngine;
use crate::python::ffi_types::utils::all_to_f64;
use crate::python::ffi_types::PyVector;
use crate::python::pyo3::dataframe_impl::{col, lit};
@@ -58,9 +61,12 @@ macro_rules! batch_import {
#[pyo3(name = "greptime")]
pub(crate) fn greptime_builtins(_py: Python<'_>, m: &PyModule) -> PyResult<()> {
m.add_class::<PyVector>()?;
use self::query_engine;
batch_import!(
m,
[
dataframe,
query_engine,
lit,
col,
pow,
@@ -112,6 +118,34 @@ pub(crate) fn greptime_builtins(_py: Python<'_>, m: &PyModule) -> PyResult<()> {
Ok(())
}
fn get_globals(py: Python) -> PyResult<&PyDict> {
    // TODO(discord9): check if this is sound (in Python)
let py_main = PyModule::import(py, "__main__")?;
let globals = py_main.dict();
Ok(globals)
}
#[pyfunction]
fn dataframe(py: Python) -> PyResult<PyDataFrame> {
let globals = get_globals(py)?;
let df = globals
.get_item("__dataframe__")
.ok_or(PyKeyError::new_err("No __dataframe__ variable is found"))?
.extract::<PyDataFrame>()?;
Ok(df)
}
#[pyfunction]
#[pyo3(name = "query")]
fn query_engine(py: Python) -> PyResult<PyQueryEngine> {
let globals = get_globals(py)?;
let query = globals
.get_item("__query__")
.ok_or(PyKeyError::new_err("No __query__ variable is found"))?
.extract::<PyQueryEngine>()?;
Ok(query)
}
fn eval_func(py: Python<'_>, name: &str, v: &[&PyObject]) -> PyResult<PyVector> {
let v = to_array_of_py_vec(py, v)?;
py.allow_threads(|| {

View File

@@ -24,7 +24,6 @@ use snafu::{ensure, Backtrace, GenerateImplicitData, ResultExt};
use crate::python::error::{self, NewRecordBatchSnafu, OtherSnafu, Result};
use crate::python::ffi_types::copr::PyQueryEngine;
use crate::python::ffi_types::{check_args_anno_real_type, select_from_rb, Coprocessor, PyVector};
use crate::python::pyo3::builtins::greptime_builtins;
use crate::python::pyo3::dataframe_impl::PyDataFrame;
use crate::python::pyo3::utils::{init_cpython_interpreter, pyo3_obj_try_to_typed_val};
@@ -57,6 +56,7 @@ impl PyQueryEngine {
}
// TODO: put this into greptime module
}
/// Execute a `Coprocessor` with given `RecordBatch`
pub(crate) fn pyo3_exec_parsed(
copr: &Coprocessor,
@@ -73,7 +73,7 @@ pub(crate) fn pyo3_exec_parsed(
Vec::new()
};
    // Just in case CPython is not initialized
init_cpython_interpreter();
init_cpython_interpreter().unwrap();
Python::with_gil(|py| -> Result<_> {
let mut cols = (|| -> PyResult<_> {
let dummy_decorator = "
@@ -86,7 +86,6 @@ def copr(*dummy, **kwdummy):
return func
return inner
coprocessor = copr
from greptime import vector
";
let gen_call = format!("\n_return_from_coprocessor = {}(*_args_for_coprocessor, **_kwargs_for_coprocessor)", copr.name);
let script = format!("{}{}{}", dummy_decorator, copr.script, gen_call);
@@ -109,14 +108,11 @@ from greptime import vector
let globals = py_main.dict();
let locals = PyDict::new(py);
let greptime = PyModule::new(py, "greptime")?;
greptime_builtins(py, greptime)?;
locals.set_item("greptime", greptime)?;
if let Some(engine) = &copr.query_engine {
let query_engine = PyQueryEngine::from_weakref(engine.clone());
let query_engine = PyCell::new(py, query_engine)?;
globals.set_item("query", query_engine)?;
globals.set_item("__query__", query_engine)?;
}
// TODO(discord9): find out why `dataframe` is not in scope
@@ -129,12 +125,12 @@ from greptime import vector
)
)?;
let dataframe = PyCell::new(py, dataframe)?;
globals.set_item("dataframe", dataframe)?;
globals.set_item("__dataframe__", dataframe)?;
}
locals.set_item("_args_for_coprocessor", args)?;
locals.set_item("_kwargs_for_coprocessor", kwargs)?;
// `greptime` is already import when init interpreter, so no need to set in here
        // TODO(discord9): find a better way to set `dataframe` and `query` in scope, or set them into the module (the latter might be impossible and not idiomatic even in Python)
// set `dataframe` and `query` in scope/ or set it into module
@@ -219,10 +215,10 @@ mod copr_test {
@copr(args=["cpu", "mem"], returns=["ref"], backend="pyo3")
def a(cpu, mem, **kwargs):
import greptime as gt
from greptime import vector, log2, sum, pow, col, lit
from greptime import vector, log2, sum, pow, col, lit, dataframe
for k, v in kwargs.items():
print("%s == %s" % (k, v))
print(dataframe.select([col("cpu")<lit(0.3)]).collect())
print(dataframe().select([col("cpu")<lit(0.3)]).collect())
return (0.5 < cpu) & ~( cpu >= 0.75)
"#;
let cpu_array = Float32Vector::from_slice([0.9f32, 0.8, 0.7, 0.3]);

View File

@@ -26,6 +26,8 @@ use crate::python::ffi_types::PyVector;
use crate::python::pyo3::utils::pyo3_obj_try_to_typed_scalar_value;
use crate::python::utils::block_on_async;
type PyExprRef = Py<PyExpr>;
#[derive(Debug, Clone)]
#[pyclass]
pub(crate) struct PyDataFrame {
inner: DfDataFrame,
@@ -47,6 +49,9 @@ impl PyDataFrame {
#[pymethods]
impl PyDataFrame {
fn __call__(&self) -> PyResult<Self> {
Ok(self.clone())
}
fn select_columns(&self, columns: Vec<String>) -> PyResult<Self> {
Ok(self
.inner
@@ -252,6 +257,9 @@ pub(crate) fn col(name: String) -> PyExpr {
#[pymethods]
impl PyExpr {
fn __call__(&self) -> PyResult<Self> {
Ok(self.clone())
}
fn __richcmp__(&self, py: Python<'_>, other: PyObject, op: CompareOp) -> PyResult<Self> {
let other = other.extract::<Self>(py).or_else(|_| lit(py, other))?;
let op = match op {

View File

@@ -36,7 +36,9 @@ static START_PYO3: Lazy<Mutex<bool>> = Lazy::new(|| Mutex::new(false));
pub(crate) fn to_py_err(err: impl ToString) -> PyErr {
PyArrowException::new_err(err.to_string())
}
pub(crate) fn init_cpython_interpreter() {
/// init cpython interpreter with `greptime` builtins, if already inited, do nothing
pub(crate) fn init_cpython_interpreter() -> PyResult<()> {
let mut start = START_PYO3.lock().unwrap();
if !*start {
pyo3::append_to_inittab!(greptime_builtins);
@@ -44,6 +46,7 @@ pub(crate) fn init_cpython_interpreter() {
*start = true;
info!("Started CPython Interpreter");
}
Ok(())
}
pub fn val_to_py_any(py: Python<'_>, val: Value) -> PyResult<PyObject> {

View File

@@ -21,11 +21,12 @@ use datatypes::arrow::array::{Array, ArrayRef};
use datatypes::arrow::datatypes::DataType as ArrowDataType;
use datatypes::prelude::{ConcreteDataType, DataType};
use datatypes::vectors::Helper;
use pyo3::exceptions::PyValueError;
use pyo3::exceptions::{PyIndexError, PyRuntimeError, PyValueError};
use pyo3::prelude::*;
use pyo3::pyclass::CompareOp;
use pyo3::types::{PyBool, PyFloat, PyInt, PyList, PyString, PyType};
use super::utils::val_to_py_any;
use crate::python::ffi_types::vector::{arrow_rtruediv, wrap_bool_result, wrap_result, PyVector};
use crate::python::pyo3::utils::{pyo3_obj_try_to_typed_val, to_py_err};
@@ -278,6 +279,45 @@ impl PyVector {
let v = Helper::try_into_vector(array).map_err(to_py_err)?;
Ok(v.into())
}
/// PyO3's Magic Method for slicing and indexing
fn __getitem__(&self, py: Python, needle: PyObject) -> PyResult<PyObject> {
if let Ok(needle) = needle.extract::<PyVector>(py) {
let mask = needle.to_arrow_array();
let mask = mask
.as_any()
.downcast_ref::<BooleanArray>()
.ok_or_else(|| {
PyValueError::new_err(
"A Boolean Array is requested for slicing, found {mask:?}",
)
})?;
let result = compute::filter(&self.to_arrow_array(), mask)
.map_err(|err| PyRuntimeError::new_err(format!("Arrow Error: {err:#?}")))?;
let ret = Helper::try_into_vector(result.clone()).map_err(|e| {
PyRuntimeError::new_err(format!("Can't cast result into vector, err: {e:?}"))
})?;
let ret = Self::from(ret).into_py(py);
Ok(ret)
} else if let Ok(index) = needle.extract::<isize>(py) {
// deal with negative index
let len = self.len() as isize;
let index = if index < 0 { len + index } else { index };
if index < 0 || index >= len {
return Err(PyIndexError::new_err(format!(
"Index out of bound, index: {index}, len: {len}",
index = index,
len = len
)));
}
let val = self.as_vector_ref().get(index as usize);
val_to_py_any(py, val)
} else {
Err(PyValueError::new_err(
"{needle:?} is neither a Vector nor a int, can't use for slicing or indexing",
))
}
}
}
#[cfg(test)]
@@ -317,7 +357,7 @@ mod test {
}
#[test]
fn test_py_vector_api() {
init_cpython_interpreter();
init_cpython_interpreter().unwrap();
Python::with_gil(|py| {
let module = PyModule::new(py, "gt").unwrap();
module.add_class::<PyVector>().unwrap();

View File

@@ -306,13 +306,38 @@ pub(crate) mod greptime_builtin {
all_to_f64, eval_aggr_fn, from_df_err, try_into_columnar_value, try_into_py_obj,
type_cast_error,
};
use crate::python::ffi_types::copr::PyQueryEngine;
use crate::python::ffi_types::vector::val_to_pyobj;
use crate::python::ffi_types::PyVector;
use crate::python::rspython::dataframe_impl::data_frame::{PyExpr, PyExprRef};
use crate::python::rspython::dataframe_impl::data_frame::{PyDataFrame, PyExpr, PyExprRef};
use crate::python::rspython::utils::{
is_instance, py_obj_to_value, py_obj_to_vec, PyVectorRef,
};
/// get `__dataframe__` from globals and return it
/// TODO(discord9): this is a terrible hack, we should find a better way to get `__dataframe__`
#[pyfunction]
fn dataframe(vm: &VirtualMachine) -> PyResult<PyDataFrame> {
let df = vm.current_globals().get_item("__dataframe__", vm)?;
let df = df
.payload::<PyDataFrame>()
.ok_or_else(|| vm.new_type_error(format!("object {:?} is not a DataFrame", df)))?;
let df = df.clone();
Ok(df)
}
/// get `__query__` from globals and return it
    /// TODO(discord9): this is a terrible hack, we should find a better way to get `__query__`
#[pyfunction]
fn query(vm: &VirtualMachine) -> PyResult<PyQueryEngine> {
let query_engine = vm.current_globals().get_item("__query__", vm)?;
let query_engine = query_engine.payload::<PyQueryEngine>().ok_or_else(|| {
vm.new_type_error(format!("object {:?} is not a QueryEngine", query_engine))
})?;
let query_engine = query_engine.clone();
Ok(query_engine)
}
#[pyfunction]
fn vector(args: OptionalArg<PyObjectRef>, vm: &VirtualMachine) -> PyResult<PyVector> {
PyVector::new(args, vm)

View File

@@ -81,12 +81,13 @@ fn set_items_in_scope(
fn set_query_engine_in_scope(
scope: &Scope,
vm: &VirtualMachine,
name: &str,
query_engine: PyQueryEngine,
) -> Result<()> {
scope
.locals
.as_object()
.set_item("query", query_engine.to_pyobject(vm), vm)
.set_item(name, query_engine.to_pyobject(vm), vm)
.map_err(|e| format_py_error(e, vm))
}
@@ -101,7 +102,7 @@ pub(crate) fn exec_with_cached_vm(
// set arguments with given name and values
let scope = vm.new_scope_with_builtins();
if let Some(rb) = rb {
set_dataframe_in_scope(&scope, vm, "dataframe", rb)?;
set_dataframe_in_scope(&scope, vm, "__dataframe__", rb)?;
}
if let Some(arg_names) = &copr.deco_args.arg_names {
@@ -113,7 +114,7 @@ pub(crate) fn exec_with_cached_vm(
let query_engine = PyQueryEngine::from_weakref(engine.clone());
        // put an object named `__query__` of class PyQueryEngine in scope
set_query_engine_in_scope(&scope, vm, query_engine)?;
set_query_engine_in_scope(&scope, vm, "__query__", query_engine)?;
}
if let Some(kwarg) = &copr.kwarg {

View File

@@ -38,7 +38,7 @@ pub(crate) mod data_frame {
use crate::python::rspython::builtins::greptime_builtin::lit;
use crate::python::utils::block_on_async;
#[rspyclass(module = "data_frame", name = "DataFrame")]
#[derive(PyPayload, Debug)]
#[derive(PyPayload, Debug, Clone)]
pub struct PyDataFrame {
pub inner: DfDataFrame,
}

View File

@@ -287,7 +287,7 @@ def a(cpu, mem):
abc = vector([v[0] > v[1] for v in zip(cpu, mem)])
fed = cpu.filter(abc)
ref = log2(fed/prev(fed))
return (0.5 < cpu) & ~( cpu >= 0.75)
return cpu[(cpu > 0.5) & ~( cpu >= 0.75)]
"#;
let cpu_array = Float32Vector::from_slice([0.9f32, 0.8, 0.7, 0.3]);
let mem_array = Float64Vector::from_slice([0.1f64, 0.2, 0.3, 0.4]);

View File

@@ -559,11 +559,11 @@ def a(cpu: vector[f32], mem: vector[f64])->(vector[f64|None],
// constant column(int)
name: "test_data_frame",
code: r#"
from greptime import col
from greptime import col, dataframe
@copr(args=["cpu", "mem"], returns=["perf", "what"])
def a(cpu: vector[f32], mem: vector[f64])->(vector[f64|None],
vector[f32]):
ret = dataframe.select([col("cpu"), col("mem")]).collect()[0]
ret = dataframe().select([col("cpu"), col("mem")]).collect()[0]
return ret[0], ret[1]
"#,
predicate: ExecIsOk(
@@ -593,11 +593,11 @@ def a(cpu: vector[f32], mem: vector[f64])->(vector[f64|None],
// constant column(int)
name: "test_data_frame",
code: r#"
from greptime import col
from greptime import col, dataframe
@copr(args=["cpu", "mem"], returns=["perf", "what"])
def a(cpu: vector[f32], mem: vector[f64])->(vector[f64|None],
vector[f32]):
ret = dataframe.filter(col("cpu")>col("mem")).collect()[0]
ret = dataframe().filter(col("cpu")>col("mem")).collect()[0]
return ret[0], ret[1]
"#,
predicate: ExecIsOk(