mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-08 06:12:55 +00:00
feat: lazy initialize vector builder on write (#3210)
* feat: lazy initialize vector builder on write
  Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
* avoid using ConstantVector
  Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
* fix clippy
  Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
* simplify expression
  Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
* Update src/metric-engine/src/engine/create.rs
  Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com>
* fix clippy
  Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
---------
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com>
This commit is contained in:
@@ -211,9 +211,16 @@ pub trait MutableVector: Send + Sync {
|
||||
});
|
||||
}
|
||||
|
||||
// Push null to this mutable vector.
|
||||
/// Push null to this mutable vector.
|
||||
fn push_null(&mut self);
|
||||
|
||||
/// Push nulls to this mutable vector.
|
||||
fn push_nulls(&mut self, num_nulls: usize) {
|
||||
for _ in 0..num_nulls {
|
||||
self.push_null();
|
||||
}
|
||||
}
|
||||
|
||||
/// Extend this mutable vector by slice of `vector`.
|
||||
///
|
||||
/// Returns error if data types mismatch.
|
||||
|
||||
@@ -28,7 +28,7 @@ use datafusion_expr::ColumnarValue;
|
||||
use datatypes::arrow;
|
||||
use datatypes::arrow::array::{ArrayRef, BooleanArray};
|
||||
use datatypes::arrow::record_batch::RecordBatch;
|
||||
use datatypes::data_type::DataType;
|
||||
use datatypes::data_type::{ConcreteDataType, DataType};
|
||||
use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector, VectorRef};
|
||||
use datatypes::value::ValueRef;
|
||||
use datatypes::vectors::{
|
||||
@@ -53,7 +53,7 @@ use crate::read::{Batch, BatchBuilder, BatchColumn};
|
||||
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
|
||||
|
||||
/// Initial vector builder capacity.
|
||||
const INITIAL_BUILDER_CAPACITY: usize = 32;
|
||||
const INITIAL_BUILDER_CAPACITY: usize = 0;
|
||||
|
||||
/// Builder to build [TimeSeriesMemtable].
|
||||
#[derive(Debug, Default)]
|
||||
@@ -607,7 +607,8 @@ struct ValueBuilder {
|
||||
timestamp: Box<dyn MutableVector>,
|
||||
sequence: UInt64VectorBuilder,
|
||||
op_type: UInt8VectorBuilder,
|
||||
fields: Vec<Box<dyn MutableVector>>,
|
||||
fields: Vec<Option<Box<dyn MutableVector>>>,
|
||||
field_types: Vec<ConcreteDataType>,
|
||||
}
|
||||
|
||||
impl ValueBuilder {
|
||||
@@ -620,16 +621,18 @@ impl ValueBuilder {
|
||||
let sequence = UInt64VectorBuilder::with_capacity(capacity);
|
||||
let op_type = UInt8VectorBuilder::with_capacity(capacity);
|
||||
|
||||
let fields = region_metadata
|
||||
let field_types = region_metadata
|
||||
.field_columns()
|
||||
.map(|c| c.column_schema.data_type.create_mutable_vector(capacity))
|
||||
.collect();
|
||||
.map(|c| c.column_schema.data_type.clone())
|
||||
.collect::<Vec<_>>();
|
||||
let fields = (0..field_types.len()).map(|_| None).collect();
|
||||
|
||||
Self {
|
||||
timestamp,
|
||||
sequence,
|
||||
op_type,
|
||||
fields,
|
||||
field_types,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -640,8 +643,20 @@ impl ValueBuilder {
|
||||
self.timestamp.push_value_ref(ts);
|
||||
self.sequence.push_value_ref(ValueRef::UInt64(sequence));
|
||||
self.op_type.push_value_ref(ValueRef::UInt8(op_type));
|
||||
let num_rows = self.timestamp.len();
|
||||
for (idx, field_value) in fields.into_iter().enumerate() {
|
||||
self.fields[idx].push_value_ref(field_value);
|
||||
if !field_value.is_null() || self.fields[idx].is_some() {
|
||||
self.fields[idx]
|
||||
.get_or_insert_with(|| {
|
||||
// lazy initialize on first non-null value
|
||||
let mut mutable_vector =
|
||||
self.field_types[idx].create_mutable_vector(num_rows);
|
||||
// fill previous rows with nulls
|
||||
mutable_vector.push_nulls(num_rows - 1);
|
||||
mutable_vector
|
||||
})
|
||||
.push_value_ref(field_value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -726,10 +741,20 @@ impl Values {
|
||||
|
||||
impl From<ValueBuilder> for Values {
|
||||
fn from(mut value: ValueBuilder) -> Self {
|
||||
let num_rows = value.len();
|
||||
let fields = value
|
||||
.fields
|
||||
.iter_mut()
|
||||
.map(|v| v.to_vector())
|
||||
.enumerate()
|
||||
.map(|(i, v)| {
|
||||
if let Some(v) = v {
|
||||
v.to_vector()
|
||||
} else {
|
||||
let mut single_null = value.field_types[i].create_mutable_vector(num_rows);
|
||||
single_null.push_nulls(num_rows);
|
||||
single_null.to_vector()
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
let sequence = Arc::new(value.sequence.finish());
|
||||
let op_type = Arc::new(value.op_type.finish());
|
||||
@@ -863,6 +888,41 @@ mod tests {
|
||||
assert_eq!(1, series.frozen.len());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_series_with_nulls() {
|
||||
let region_metadata = schema_for_test();
|
||||
let mut series = Series::new(&region_metadata);
|
||||
// col1: NULL 1 2 3
|
||||
// col2: NULL NULL 10.2 NULL
|
||||
series.push(
|
||||
ts_value_ref(1),
|
||||
0,
|
||||
OpType::Put,
|
||||
vec![ValueRef::Null, ValueRef::Null],
|
||||
);
|
||||
series.push(
|
||||
ts_value_ref(1),
|
||||
0,
|
||||
OpType::Put,
|
||||
vec![ValueRef::Int64(1), ValueRef::Null],
|
||||
);
|
||||
series.push(ts_value_ref(1), 2, OpType::Put, field_value_ref(2, 10.2));
|
||||
series.push(
|
||||
ts_value_ref(1),
|
||||
3,
|
||||
OpType::Put,
|
||||
vec![ValueRef::Int64(2), ValueRef::Null],
|
||||
);
|
||||
assert_eq!(4, series.active.timestamp.len());
|
||||
assert_eq!(0, series.frozen.len());
|
||||
|
||||
let values = series.compact(&region_metadata).unwrap();
|
||||
assert_eq!(values.fields[0].null_count(), 1);
|
||||
assert_eq!(values.fields[1].null_count(), 3);
|
||||
assert_eq!(0, series.active.timestamp.len());
|
||||
assert_eq!(1, series.frozen.len());
|
||||
}
|
||||
|
||||
fn check_value(batch: &Batch, expect: Vec<Vec<Value>>) {
|
||||
let ts_len = batch.timestamps().len();
|
||||
assert_eq!(batch.sequences().len(), ts_len);
|
||||
|
||||
Reference in New Issue
Block a user