refactor: scripts perf and metrics (#1261)

* refactor: retrieve the PyVector datatype from the inner vector

* perf: replace all ok_or calls with ok_or_else (see the sketch below the commit metadata)

* feat: add metrics for script execution
dennis zhuang
2023-03-28 10:07:21 +08:00
committed by GitHub
parent 5edd2a3dbe
commit 0ffa628c22
11 changed files with 88 additions and 43 deletions
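
Why the ok_or to ok_or_else change is a perf win: Option::ok_or takes its error value as an eagerly evaluated argument, so the error (a snafu .build() or a format! allocation) is constructed on every call, even when the Option is Some. ok_or_else takes a closure instead and defers that work to the None path. A minimal, self-contained sketch of the pattern (illustrative names, not code from this repository):

fn lookup(name: &str, columns: &[&str]) -> Result<usize, String> {
    columns
        .iter()
        .position(|col| *col == name)
        // The format! allocation now happens only on the error path.
        .ok_or_else(|| format!("Can't find column by name {name}"))
}

fn main() {
    assert_eq!(lookup("b", &["a", "b"]), Ok(1));
    assert!(lookup("c", &["a", "b"]).is_err());
}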

View File

@@ -16,6 +16,7 @@
mod engine;
pub mod error;
+pub(crate) mod metric;
pub(crate) mod utils;
pub use self::engine::{PyEngine, PyScript};

View File

@@ -203,19 +203,19 @@ impl CoprStream {
) -> Result<Self> {
let mut schema = vec![];
for (ty, name) in copr.return_types.iter().zip(&copr.deco_args.ret_names) {
-let ty = ty.clone().ok_or(
+let ty = ty.clone().ok_or_else(|| {
PyRuntimeSnafu {
msg: "return type not annotated, can't generate schema",
}
-.build(),
-)?;
+.build()
+})?;
let is_nullable = ty.is_nullable;
-let ty = ty.datatype.ok_or(
+let ty = ty.datatype.ok_or_else(|| {
PyRuntimeSnafu {
msg: "return type not annotated, can't generate schema",
}
-.build(),
-)?;
+.build()
+})?;
let col_schema = ColumnSchema::new(name, ty, is_nullable);
schema.push(col_schema);
}

View File

@@ -20,7 +20,6 @@ use std::result::Result as StdResult;
use std::sync::{Arc, Weak};
use common_recordbatch::{RecordBatch, RecordBatches};
-use datatypes::arrow::array::Array;
use datatypes::arrow::compute;
use datatypes::data_type::{ConcreteDataType, DataType};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
@@ -243,15 +242,14 @@ pub(crate) fn check_args_anno_real_type(
);
for (idx, arg) in args.iter().enumerate() {
let anno_ty = copr.arg_types[idx].clone();
-let real_ty = arg.to_arrow_array().data_type().clone();
-let real_ty = ConcreteDataType::from_arrow_type(&real_ty);
+let real_ty = arg.data_type();
let arg_name = arg_names[idx].clone();
-let col_idx = rb.schema.column_index_by_name(&arg_name).ok_or(
+let col_idx = rb.schema.column_index_by_name(&arg_name).ok_or_else(|| {
OtherSnafu {
reason: format!("Can't find column by name {arg_name}"),
}
-.build(),
-)?;
+.build()
+})?;
let is_nullable: bool = rb.schema.column_schemas()[col_idx].is_nullable();
ensure!(
anno_ty

View File

@@ -25,7 +25,7 @@ use datatypes::arrow::compute::kernels::{arithmetic, comparison};
use datatypes::arrow::datatypes::DataType as ArrowDataType;
use datatypes::arrow::error::Result as ArrowResult;
use datatypes::data_type::DataType;
-use datatypes::prelude::Value;
+use datatypes::prelude::{ConcreteDataType, Value};
use datatypes::value::{self, OrderedFloat};
use datatypes::vectors::{Helper, NullVector, VectorRef};
#[cfg(feature = "pyo3_backend")]
@@ -136,6 +136,16 @@ impl AsRef<PyVector> for PyVector {
}
impl PyVector {
+#[inline]
+pub(crate) fn data_type(&self) -> ConcreteDataType {
+self.vector.data_type()
+}
+#[inline]
+pub(crate) fn arrow_data_type(&self) -> ArrowDataType {
+self.vector.data_type().as_arrow_type()
+}
pub(crate) fn vector_and(left: &Self, right: &Self) -> Result<Self, String> {
let left = left.to_arrow_array();
let right = right.to_arrow_array();
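
The two new helpers above let callers ask the wrapped vector for its logical and Arrow types directly, instead of going through to_arrow_array() first (which builds an ArrayRef just to inspect its type) — the rest of this commit rewrites every such call site. A stand-in sketch of the delegation pattern (types simplified so it compiles on its own; Vector here is a stub, not GreptimeDB's trait):

use std::sync::Arc;

// Stand-ins for GreptimeDB's Vector trait and ConcreteDataType.
trait Vector {
    fn data_type(&self) -> &'static str;
}

struct Int64Vector;

impl Vector for Int64Vector {
    fn data_type(&self) -> &'static str {
        "Int64"
    }
}

struct PyVector {
    vector: Arc<dyn Vector>,
}

impl PyVector {
    // Mirrors the new helper: a plain metadata read on the inner vector,
    // no array conversion in between.
    fn data_type(&self) -> &'static str {
        self.vector.data_type()
    }
}

fn main() {
    let v = PyVector {
        vector: Arc::new(Int64Vector),
    };
    assert_eq!(v.data_type(), "Int64");
}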

View File

@@ -0,0 +1,24 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Script engine metrics
pub static METRIC_RSPY_INIT_ELAPSED: &str = "script.rspy.init_elapsed";
pub static METRIC_RSPY_EXEC_ELAPSED: &str = "script.rspy.exec_elapsed";
pub static METRIC_RSPY_EXEC_TOTAL_ELAPSED: &str = "script.rspy.exec_total_elapsed";
#[cfg(feature = "pyo3_backend")]
pub static METRIC_PYO3_EXEC_ELAPSED: &str = "script.pyo3.exec_elapsed";
#[cfg(feature = "pyo3_backend")]
pub static METRIC_PYO3_INIT_ELAPSED: &str = "script.pyo3.init_elapsed";
#[cfg(feature = "pyo3_backend")]
pub static METRIC_PYO3_EXEC_TOTAL_ELAPSED: &str = "script.pyo3.exec_total_elapsed";
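
These constants are consumed by the timer! macro from common_telemetry in the hunks below. A minimal RAII sketch of how such a scope timer typically works (an assumption about timer!'s behavior, implemented here with std only; printing stands in for reporting to a metrics backend):

use std::time::Instant;

pub static METRIC_RSPY_EXEC_ELAPSED: &str = "script.rspy.exec_elapsed";

struct ScopeTimer {
    name: &'static str,
    start: Instant,
}

impl ScopeTimer {
    fn new(name: &'static str) -> Self {
        Self { name, start: Instant::now() }
    }
}

impl Drop for ScopeTimer {
    fn drop(&mut self) {
        println!("{}: {:?}", self.name, self.start.elapsed());
    }
}

fn main() {
    // Binding to _t rather than _ keeps the guard alive to the end of
    // the scope, which is why the patch writes let _t = timer!(...).
    let _t = ScopeTimer::new(METRIC_RSPY_EXEC_ELAPSED);
    // ... work being measured ...
}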

View File

@@ -131,7 +131,7 @@ fn dataframe(py: Python) -> PyResult<PyDataFrame> {
let globals = get_globals(py)?;
let df = globals
.get_item("__dataframe__")
-.ok_or(PyKeyError::new_err("No __dataframe__ variable is found"))?
+.ok_or_else(|| PyKeyError::new_err("No __dataframe__ variable is found"))?
.extract::<PyDataFrame>()?;
Ok(df)
}
@@ -142,7 +142,7 @@ pub(crate) fn query_engine(py: Python) -> PyResult<PyQueryEngine> {
let globals = get_globals(py)?;
let query = globals
.get_item("__query__")
-.ok_or(PyKeyError::new_err("No __query__ variable is found"))?
+.ok_or_else(|| PyKeyError::new_err("No __query__ variable is found"))?
.extract::<PyQueryEngine>()?;
Ok(query)
}
@@ -310,7 +310,7 @@ macro_rules! bind_aggr_expr {
Arc::new(expressions::Column::new(stringify!($EXPR), $idx)) as _,
)*
stringify!($AGGR_FUNC),
-$ARG_TY.to_arrow_array().data_type().to_owned()),
+$ARG_TY.arrow_data_type().to_owned()),
&[$($ARG.to_arrow_array()),*]
)
}
@@ -326,7 +326,7 @@ fn approx_distinct(py: Python<'_>, v0: &PyVector) -> PyResult<PyObject> {
expressions::ApproxDistinct::new(
Arc::new(expressions::Column::new("expr0", 0)) as _,
"ApproxDistinct",
-v0.to_arrow_array().data_type().to_owned(),
+v0.arrow_data_type().to_owned(),
),
&[v0.to_arrow_array()],
);
@@ -349,7 +349,7 @@ fn approx_percentile_cont(py: Python<'_>, values: &PyVector, percent: f64) -> Py
Arc::new(percent) as _,
],
"ApproxPercentileCont",
-(values.to_arrow_array().data_type()).to_owned(),
+values.arrow_data_type().to_owned(),
)
.map_err(|e| PyValueError::new_err(format!("{e:?}")))?,
&[values.to_arrow_array()],

View File

@@ -15,6 +15,7 @@
use std::collections::HashMap;
use common_recordbatch::RecordBatch;
+use common_telemetry::timer;
use datatypes::vectors::{Helper, VectorRef};
use pyo3::exceptions::{PyRuntimeError, PyValueError};
use pyo3::types::{PyDict, PyList, PyModule, PyTuple};
@@ -24,6 +25,7 @@ use snafu::{ensure, Backtrace, GenerateImplicitData, ResultExt};
use crate::python::error::{self, NewRecordBatchSnafu, OtherSnafu, Result};
use crate::python::ffi_types::copr::PyQueryEngine;
use crate::python::ffi_types::{check_args_anno_real_type, select_from_rb, Coprocessor, PyVector};
+use crate::python::metric;
use crate::python::pyo3::dataframe_impl::PyDataFrame;
use crate::python::pyo3::utils::{init_cpython_interpreter, pyo3_obj_try_to_typed_val};
@@ -62,6 +64,7 @@ pub(crate) fn pyo3_exec_parsed(
rb: &Option<RecordBatch>,
params: &HashMap<String, String>,
) -> Result<RecordBatch> {
+let _t = timer!(metric::METRIC_PYO3_EXEC_TOTAL_ELAPSED);
// i.e params or use `vector(..)` to construct a PyVector
let arg_names = &copr.deco_args.arg_names.clone().unwrap_or(vec![]);
let args: Vec<PyVector> = if let Some(rb) = rb {
@@ -74,6 +77,8 @@ pub(crate) fn pyo3_exec_parsed(
// Just in case cpython is not inited
init_cpython_interpreter().unwrap();
Python::with_gil(|py| -> Result<_> {
+let _t = timer!(metric::METRIC_PYO3_EXEC_ELAPSED);
let mut cols = (|| -> PyResult<_> {
let dummy_decorator = "
# Postponed evaluation of annotations(PEP 563) so annotation can be set freely
@@ -135,7 +140,7 @@ coprocessor = copr
// could generate a call in python code and use Python::run to run it, just like in RustPython
// Expect either: a PyVector Or a List/Tuple of PyVector
py.run(&script, Some(globals), Some(locals))?;
-let result = locals.get_item("_return_from_coprocessor").ok_or(PyValueError::new_err("Can't find return value of coprocessor function"))?;
+let result = locals.get_item("_return_from_coprocessor").ok_or_else(|| PyValueError::new_err("Can't find return value of coprocessor function"))?;
let col_len = rb.as_ref().map(|rb| rb.num_rows()).unwrap_or(1);
py_any_to_vec(result, col_len)
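
Note the two timers in this file: METRIC_PYO3_EXEC_TOTAL_ELAPSED spans the whole call, including argument marshalling, while METRIC_PYO3_EXEC_ELAPSED is started inside Python::with_gil and covers only the Python execution. A simplified, std-only sketch of that nesting (comments mark where the real work sits):

use std::time::Instant;

fn pyo3_exec_shape() {
    let total = Instant::now(); // METRIC_PYO3_EXEC_TOTAL_ELAPSED
    // ... build PyVector arguments from the RecordBatch ...
    let exec = Instant::now(); // METRIC_PYO3_EXEC_ELAPSED, taken under the GIL
    // ... run the coprocessor via Python::with_gil ...
    println!("exec_elapsed: {:?}", exec.elapsed());
    println!("exec_total_elapsed: {:?}", total.elapsed());
}

fn main() {
    pyo3_exec_shape();
}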

View File

@@ -15,7 +15,7 @@
use std::sync::Mutex;
use arrow::pyarrow::PyArrowException;
-use common_telemetry::info;
+use common_telemetry::{info, timer};
use datafusion_common::ScalarValue;
use datafusion_expr::ColumnarValue;
use datatypes::arrow::datatypes::DataType as ArrowDataType;
@@ -29,6 +29,7 @@ use pyo3::types::{PyBool, PyFloat, PyInt, PyList, PyTuple};
use crate::python::ffi_types::utils::{collect_diff_types_string, new_item_field};
use crate::python::ffi_types::PyVector;
+use crate::python::metric;
use crate::python::pyo3::builtins::greptime_builtins;
/// prevent race condition of init cpython
@@ -39,6 +40,7 @@ pub(crate) fn to_py_err(err: impl ToString) -> PyErr {
/// init cpython interpreter with `greptime` builtins, if already inited, do nothing
pub(crate) fn init_cpython_interpreter() -> PyResult<()> {
+let _t = timer!(metric::METRIC_PYO3_INIT_ELAPSED);
let mut start = START_PYO3.lock().unwrap();
if !*start {
pyo3::append_to_inittab!(greptime_builtins);
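
init_cpython_interpreter guards its one-time setup with a Mutex<bool>, so after the first call the new init timer mostly measures a lock acquisition. A self-contained sketch of that guarded-init shape (the flag update and the setup body are assumptions; the pyo3 registration is stubbed out):

use std::sync::Mutex;

static STARTED: Mutex<bool> = Mutex::new(false);

fn init_interpreter_once() {
    let mut started = STARTED.lock().unwrap();
    if !*started {
        // ... one-time setup, e.g. registering built-in modules ...
        println!("interpreter initialized");
        *started = true;
    }
    // Later callers find the flag set and fall straight through.
}

fn main() {
    init_interpreter_once();
    init_interpreter_once(); // no-op
}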

View File

@@ -610,7 +610,7 @@ pub(crate) mod greptime_builtin {
ApproxDistinct,
vm,
&[values.to_arrow_array()],
-values.to_arrow_array().data_type(),
+values.arrow_data_type(),
expr0
);
}
@@ -621,7 +621,7 @@ pub(crate) mod greptime_builtin {
Median,
vm,
&[values.to_arrow_array()],
-values.to_arrow_array().data_type(),
+values.arrow_data_type(),
expr0
);
}
@@ -635,7 +635,7 @@ pub(crate) mod greptime_builtin {
ApproxMedian,
vm,
&[values.to_arrow_array()],
-values.to_arrow_array().data_type(),
+values.arrow_data_type(),
expr0
);
}
@@ -656,7 +656,7 @@ pub(crate) mod greptime_builtin {
Arc::new(percent) as _,
],
"ApproxPercentileCont",
-(values.to_arrow_array().data_type()).clone(),
+values.arrow_data_type(),
)
.map_err(|err| from_df_err(err, vm))?,
&[values.to_arrow_array()],
@@ -671,7 +671,7 @@ pub(crate) mod greptime_builtin {
ArrayAgg,
vm,
&[values.to_arrow_array()],
-values.to_arrow_array().data_type(),
+values.arrow_data_type(),
expr0
);
}
@@ -683,7 +683,7 @@ pub(crate) mod greptime_builtin {
Avg,
vm,
&[values.to_arrow_array()],
-values.to_arrow_array().data_type(),
+values.arrow_data_type(),
expr0
);
}
@@ -698,7 +698,7 @@ pub(crate) mod greptime_builtin {
Correlation,
vm,
&[arg0.to_arrow_array(), arg1.to_arrow_array()],
-arg0.to_arrow_array().data_type(),
+arg0.arrow_data_type(),
expr0,
expr1
);
@@ -710,7 +710,7 @@ pub(crate) mod greptime_builtin {
Count,
vm,
&[values.to_arrow_array()],
-values.to_arrow_array().data_type(),
+values.arrow_data_type(),
expr0
);
}
@@ -725,7 +725,7 @@ pub(crate) mod greptime_builtin {
Covariance,
vm,
&[arg0.to_arrow_array(), arg1.to_arrow_array()],
-arg0.to_arrow_array().data_type(),
+arg0.arrow_data_type(),
expr0,
expr1
);
@@ -741,7 +741,7 @@ pub(crate) mod greptime_builtin {
CovariancePop,
vm,
&[arg0.to_arrow_array(), arg1.to_arrow_array()],
-arg0.to_arrow_array().data_type(),
+arg0.arrow_data_type(),
expr0,
expr1
);
@@ -753,7 +753,7 @@ pub(crate) mod greptime_builtin {
Max,
vm,
&[values.to_arrow_array()],
-values.to_arrow_array().data_type(),
+values.arrow_data_type(),
expr0
);
}
@@ -764,7 +764,7 @@ pub(crate) mod greptime_builtin {
Min,
vm,
&[values.to_arrow_array()],
-values.to_arrow_array().data_type(),
+values.arrow_data_type(),
expr0
);
}
@@ -775,7 +775,7 @@ pub(crate) mod greptime_builtin {
Stddev,
vm,
&[values.to_arrow_array()],
-values.to_arrow_array().data_type(),
+values.arrow_data_type(),
expr0
);
}
@@ -786,7 +786,7 @@ pub(crate) mod greptime_builtin {
StddevPop,
vm,
&[values.to_arrow_array()],
-values.to_arrow_array().data_type(),
+values.arrow_data_type(),
expr0
);
}
@@ -797,7 +797,7 @@ pub(crate) mod greptime_builtin {
Sum,
vm,
&[values.to_arrow_array()],
-values.to_arrow_array().data_type(),
+values.arrow_data_type(),
expr0
);
}
@@ -808,7 +808,7 @@ pub(crate) mod greptime_builtin {
Variance,
vm,
&[values.to_arrow_array()],
-values.to_arrow_array().data_type(),
+values.arrow_data_type(),
expr0
);
}
@@ -819,7 +819,7 @@ pub(crate) mod greptime_builtin {
VariancePop,
vm,
&[values.to_arrow_array()],
-values.to_arrow_array().data_type(),
+values.arrow_data_type(),
expr0
);
}

View File

@@ -239,7 +239,7 @@ impl PyValue {
let vec_f64 = vec_f64
.as_any()
.downcast_ref::<Float64Array>()
.ok_or(format!("Can't cast {vec_f64:#?} to Float64Array!"))?;
.ok_or_else(|| format!("Can't cast {vec_f64:#?} to Float64Array!"))?;
let ret = vec_f64.into_iter().collect::<Vec<_>>();
if ret.iter().all(|x| x.is_some()) {
Ok(Self::FloatVec(
@@ -255,14 +255,14 @@ impl PyValue {
let vec_i64 = vec_int
.as_any()
.downcast_ref::<Int64Array>()
.ok_or(format!("Can't cast {vec_int:#?} to Int64Array!"))?;
.ok_or_else(|| format!("Can't cast {vec_int:#?} to Int64Array!"))?;
let ret: Vec<i64> = vec_i64
.into_iter()
.enumerate()
.map(|(idx, v)| {
-v.ok_or(format!(
-"No null element expected, found one in {idx} position"
-))
+v.ok_or_else(|| {
+format!("No null element expected, found one in {idx} position")
+})
})
.collect::<Result<_, String>>()?;
Ok(Self::IntVec(ret))

View File

@@ -18,7 +18,7 @@ use std::result::Result as StdResult;
use std::sync::Arc;
use common_recordbatch::RecordBatch;
-use common_telemetry::info;
+use common_telemetry::{info, timer};
use datatypes::vectors::VectorRef;
use rustpython_vm::builtins::{PyBaseExceptionRef, PyDict, PyStr, PyTuple};
use rustpython_vm::class::PyClassImpl;
@@ -30,6 +30,7 @@ use snafu::{OptionExt, ResultExt};
use crate::python::error::{ensure, ret_other_error_with, NewRecordBatchSnafu, OtherSnafu, Result};
use crate::python::ffi_types::copr::PyQueryEngine;
use crate::python::ffi_types::{check_args_anno_real_type, select_from_rb, Coprocessor, PyVector};
+use crate::python::metric;
use crate::python::rspython::builtins::init_greptime_builtins;
use crate::python::rspython::dataframe_impl::data_frame::set_dataframe_in_scope;
use crate::python::rspython::dataframe_impl::init_data_frame;
@@ -43,6 +44,7 @@ pub(crate) fn rspy_exec_parsed(
rb: &Option<RecordBatch>,
params: &HashMap<String, String>,
) -> Result<RecordBatch> {
+let _t = timer!(metric::METRIC_RSPY_EXEC_TOTAL_ELAPSED);
// 3. get args from `rb`, and cast them into PyVector
let args: Vec<PyVector> = if let Some(rb) = rb {
let arg_names = copr.deco_args.arg_names.clone().unwrap_or(vec![]);
@@ -99,6 +101,8 @@ pub(crate) fn exec_with_cached_vm(
vm: &Arc<Interpreter>,
) -> Result<RecordBatch> {
vm.enter(|vm| -> Result<RecordBatch> {
+let _t = timer!(metric::METRIC_RSPY_EXEC_ELAPSED);
// set arguments with given name and values
let scope = vm.new_scope_with_builtins();
if let Some(rb) = rb {
@@ -184,6 +188,7 @@ fn try_into_columns(
/// init interpreter with type PyVector and Module: greptime
pub(crate) fn init_interpreter() -> Arc<Interpreter> {
+let _t = timer!(metric::METRIC_RSPY_INIT_ELAPSED);
INTERPRETER.with(|i| {
i.borrow_mut()
.get_or_insert_with(|| {