Files
greptimedb/src/query/tests/mean_test.rs
evenyag 7f8195861e feat: Adds push_value_ref and extend_slice_of to MutableVector (#215)
* feat: Impl cmp_element() for Vector

* chore: Add doc comments to MutableVector

* feat: Add create_mutable() to DataType

Add `create_mutable()` to create a MutableVector for each DataType.
Implement ListVectorBuilder and NullVectorBuilder for ListType and
NullType.

* feat: Add ValueRef

ValueRef is a reference to a value that can be used to avoid some allocations
when getting data from a Vector. To support ValueRef, also implement
ListValueRef for ListValue; however, comparison of ListValueRef still
requires some allocation, due to the complexity of ListValue and
ListVector.

Impl some From trait for ValueRef

* feat: Implement get_ref for Vector

* feat: Remove cmp_element from Vector

`cmp_element` could be replaced by `get_ref` and then compare

* feat: Implement push/extend for PrimitiveVectorBuilder

Implement push_value_ref() and extend_slice_of() for
PrimitiveVectorBuilder.

Also refactor the DataTypeBuilder trait for
primitive types to PrimitiveElement trait, adds necessary cast helper
methods to it.
- Cast a reference to Vector to reference arrow's primitive array
- Cast a ValueRef to primitive type
- Also make PrimitiveElement super trait of Primitive

* feat: Implement push/extend for all vector builders

Implement push_value_ref() and extend_slice_of() for remaining vector
builders. Add some helpful cast method to ValueRef and a method to
cast Value to ValueRef.

Change the behavior of PrimitiveElement::cast_xxx to panic when unable
to cast, since push_value_ref() and extend_slice_of() always panic
when given invalid input data type.

* feat: MutableVector returns an error if the data type does not match

* test: Add tests for ValueRef

* feat: Add tests for Vector::get_ref

* feat: NullVector returns an error if the data type does not match

* test: Add tests for vector builders

* fix: Fix compile error in python coprocessor

* refactor: Add lifetime param to IntoValueRef

The Primitive trait just uses the `IntoValueRef<'static>` bound. Also
rename create_mutable to create_mutable_vector.

* chore: Address CR comments

* feat: Customize PartialOrd/Ord for Value/ValueRef

Panics if values/refs have different data type

* style: Fix clippy

* refactor: Use macro to generate body of ValueRef::as_xxx
2022-09-06 13:44:48 +08:00

88 lines
2.8 KiB
Rust

use std::sync::Arc;
mod function;
use common_recordbatch::error::Result as RecordResult;
use common_recordbatch::{util, RecordBatch};
use datafusion::field_util::FieldExt;
use datafusion::field_util::SchemaExt;
use datatypes::for_all_primitive_types;
use datatypes::prelude::*;
use datatypes::types::PrimitiveElement;
use datatypes::value::OrderedFloat;
use format_num::NumberFormat;
use function::{create_query_engine, get_numbers_from_table};
use num_traits::AsPrimitive;
use query::error::Result;
use query::query_engine::Output;
use query::QueryEngine;
/// Runs the `MEAN` aggregate check once for each type that
/// `for_all_primitive_types!` passes to the local macro.
///
/// For a type `T` the column under test is `"<type name of T>_number"` in the
/// `numbers` table; the per-type verification lives in `test_mean_success`.
#[tokio::test]
async fn test_mean_aggregator() -> Result<()> {
    common_telemetry::init_default_ut_logging();
    let engine = create_query_engine();

    // Callback macro: `for_all_primitive_types!` expands it with an empty
    // extra-argument list (`[]`) followed by the braced types to cover.
    macro_rules! check_mean_for_types {
        ([], $( { $T:ty } ),*) => {
            $(
                test_mean_success::<$T>(
                    &format!("{}_number", std::any::type_name::<$T>()),
                    "numbers",
                    engine.clone(),
                )
                .await?;
            )*
        }
    }

    for_all_primitive_types! { check_mean_for_types }
    Ok(())
}
/// Runs `MEAN(column_name)` on `table_name` and compares the result with a
/// mean computed independently by `inc_stats::mean` over the same numbers.
///
/// The query result must be exactly one record batch with exactly one column,
/// named `mean`, holding exactly one value. That value must be a
/// `Value::Float64`; both means are rendered with the `.6e` format specifier
/// and the rendered strings are compared (presumably to tolerate small
/// floating-point differences between the two computations — confirm).
///
/// # Panics
///
/// Panics if the query fails, if the result does not have the shape described
/// above, if the aggregated value is not a `Value::Float64`, or if the two
/// formatted means differ.
async fn test_mean_success<T>(
    column_name: &str,
    table_name: &str,
    engine: Arc<dyn QueryEngine>,
) -> Result<()>
where
    T: PrimitiveElement + AsPrimitive<f64>,
    for<'a> T: Scalar<RefType<'a> = T>,
{
    let result = execute_mean(column_name, table_name, engine.clone())
        .await
        .unwrap();

    // Shape checks: one batch, one column called "mean", one row.
    assert_eq!(1, result.len());
    assert_eq!(result[0].df_recordbatch.num_columns(), 1);
    assert_eq!(1, result[0].schema.arrow_schema().fields().len());
    assert_eq!("mean", result[0].schema.arrow_schema().field(0).name());

    let columns = result[0].df_recordbatch.columns();
    assert_eq!(1, columns.len());
    assert_eq!(columns[0].len(), 1);

    let v = VectorHelper::try_into_vector(&columns[0]).unwrap();
    assert_eq!(1, v.len());

    // Fail loudly on any non-Float64 result. A bare `if let` here would skip
    // the comparison entirely and let the test pass without checking anything.
    let value = match v.get(0) {
        Value::Float64(OrderedFloat(value)) => value,
        other => panic!(
            "MEAN({}) should produce a Float64 value, got {:?}",
            column_name, other
        ),
    };

    // Reference mean computed outside the query engine. This is the last use
    // of `engine`, so it is moved rather than cloned.
    let numbers = get_numbers_from_table::<T>(column_name, table_name, engine).await;
    let expected_value = numbers.iter().map(|&n| n.as_()).collect::<Vec<f64>>();
    let expected_value = inc_stats::mean(expected_value.iter().cloned()).unwrap();

    // Compare the formatted representations rather than the raw floats.
    let num = NumberFormat::new();
    let value = num.format(".6e", value);
    let expected_value = num.format(".6e", expected_value);
    assert_eq!(value, expected_value);

    Ok(())
}
/// Executes `select MEAN(<column_name>) as mean from <table_name>` on `engine`
/// and collects the resulting record batch stream via `util::collect`.
///
/// # Panics
///
/// Panics if planning or executing the query fails, or if the engine returns
/// an output other than `Output::RecordBatch`.
async fn execute_mean<'a>(
    column_name: &'a str,
    table_name: &'a str,
    engine: Arc<dyn QueryEngine>,
) -> RecordResult<Vec<RecordBatch>> {
    let query = format!("select MEAN({}) as mean from {}", column_name, table_name);
    let query_plan = engine.sql_to_plan(&query).unwrap();
    let query_output = engine.execute(&query_plan).await.unwrap();

    // A select statement is expected to produce a record batch stream.
    let stream = if let Output::RecordBatch(stream) = query_output {
        stream
    } else {
        unreachable!()
    };

    util::collect(stream).await
}