perf: optimize time series memtable ingestion (#5451)

* initialize with capacity

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* avoid collect

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* optimize zip

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* rename variable

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* ignore type checking in the upper level

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* change to two-step capacity

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
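What ties these steps together is the shape of `push`: rather than collecting `kv.fields()` into a `Vec<ValueRef>` and summing `data_size()` over it in a separate pass, the field iterator is handed straight to the builder, which accumulates the byte size while consuming the values and returns it. A minimal sketch of the pattern, with a toy `Val` enum standing in for the real `ValueRef`:

    #[derive(Clone)]
    enum Val {
        Null,
        Int64(i64),
    }

    impl Val {
        fn data_size(&self) -> usize {
            match self {
                Val::Null => 0,
                Val::Int64(_) => 8,
            }
        }
    }

    struct Builder {
        // Flattened stand-in for the per-column mutable vectors.
        values: Vec<Val>,
    }

    impl Builder {
        // Before: push(&mut self, fields: Vec<Val>) forced one Vec allocation
        // per row. After: accept any iterator and return the accumulated size
        // so the caller can update its write stats in the same pass.
        fn push(&mut self, fields: impl Iterator<Item = Val>) -> usize {
            let mut size = 0;
            for v in fields {
                size += v.data_size();
                self.values.push(v);
            }
            size
        }
    }

    fn main() {
        let mut b = Builder { values: Vec::new() };
        let bytes = b.push([Val::Int64(1), Val::Null].into_iter());
        assert_eq!(bytes, 8);
    }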
Author: Ruihang Xia
Committed by: GitHub on 2025-02-06 17:12:29 +08:00
Parent: ab4663ec2b
Commit: fa09e181be
2 changed files with 60 additions and 27 deletions


@@ -56,7 +56,10 @@ use crate::region::options::MergeMode;
 use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
 
 /// Initial vector builder capacity.
-const INITIAL_BUILDER_CAPACITY: usize = 0;
+const INITIAL_BUILDER_CAPACITY: usize = 16;
+
+/// Vector builder capacity.
+const BUILDER_CAPACITY: usize = 512;
 
 /// Builder to build [TimeSeriesMemtable].
 #[derive(Debug, Default)]
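These two constants are the "two-step capacity" from the commit message: a fresh builder reserves only 16 slots (many series receive just a handful of rows between flushes), and once the active builder approaches 512 rows it is frozen rather than grown further, bounding both reallocation and the cost of each freeze. A standalone toy below, with a plain `Vec<u64>` standing in for the column builders and `len() + 10` mirroring the headroom check in `Series::push` further down; that freezing swaps in another small builder is an assumption from the constants, not code shown in this diff:

    const INITIAL_BUILDER_CAPACITY: usize = 16;
    const BUILDER_CAPACITY: usize = 512;

    fn main() {
        let mut active: Vec<u64> = Vec::with_capacity(INITIAL_BUILDER_CAPACITY);
        let mut frozen: Vec<Vec<u64>> = Vec::new();
        let mut reallocations = 0;

        for i in 0..2_000u64 {
            // Freeze early, leaving headroom so the active builder never
            // grows past the cap.
            if active.len() + 10 > BUILDER_CAPACITY {
                let small = Vec::with_capacity(INITIAL_BUILDER_CAPACITY);
                frozen.push(std::mem::replace(&mut active, small));
            }
            let cap_before = active.capacity();
            active.push(i);
            if active.capacity() != cap_before {
                reallocations += 1;
            }
        }
        // Each builder doubles at most 16 -> 32 -> ... -> 512 before it is
        // frozen, so reallocation stays bounded per series.
        println!("frozen: {}, reallocations: {}", frozen.len(), reallocations);
    }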
@@ -156,9 +159,7 @@ impl TimeSeriesMemtable {
         );
 
         let primary_key_encoded = self.row_codec.encode(kv.primary_keys())?;
-        let fields = kv.fields().collect::<Vec<_>>();
-        stats.value_bytes += fields.iter().map(|v| v.data_size()).sum::<usize>();
 
         let (series, series_allocated) = self.series_set.get_or_add_series(primary_key_encoded);
         stats.key_bytes += series_allocated;
@@ -168,7 +169,8 @@
         stats.max_ts = stats.max_ts.max(ts);
 
         let mut guard = series.write().unwrap();
-        guard.push(kv.timestamp(), kv.sequence(), kv.op_type(), fields);
+        let size = guard.push(kv.timestamp(), kv.sequence(), kv.op_type(), kv.fields());
+        stats.value_bytes += size;
 
         Ok(())
     }
@@ -619,6 +621,7 @@ struct Series {
     pk_cache: Option<Vec<Value>>,
     active: ValueBuilder,
     frozen: Vec<Values>,
+    region_metadata: RegionMetadataRef,
 }
 
 impl Series {
@@ -627,12 +630,24 @@ impl Series {
             pk_cache: None,
             active: ValueBuilder::new(region_metadata, INITIAL_BUILDER_CAPACITY),
             frozen: vec![],
+            region_metadata: region_metadata.clone(),
         }
     }
 
-    /// Pushes a row of values into Series.
-    fn push(&mut self, ts: ValueRef, sequence: u64, op_type: OpType, values: Vec<ValueRef>) {
-        self.active.push(ts, sequence, op_type as u8, values);
+    /// Pushes a row of values into Series. Returns the size of values.
+    fn push<'a>(
+        &mut self,
+        ts: ValueRef<'a>,
+        sequence: u64,
+        op_type: OpType,
+        values: impl Iterator<Item = ValueRef<'a>>,
+    ) -> usize {
+        // + 10 to avoid potential reallocation.
+        if self.active.len() + 10 > BUILDER_CAPACITY {
+            let region_metadata = self.region_metadata.clone();
+            self.freeze(&region_metadata);
+        }
+        self.active.push(ts, sequence, op_type as u8, values)
    }
 
    fn update_pk_cache(&mut self, pk_values: Vec<Value>) {
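One detail in the new `Series::push`: calling `self.freeze(&self.region_metadata)` directly would not compile, because `freeze` takes `&mut self` while `self.region_metadata` would hold an immutable borrow at the same time. Cloning the `RegionMetadataRef` (an `Arc` alias, so the clone is a cheap refcount bump) into a local ends the immutable borrow first. The borrow-checker pattern in isolation, with toy types:

    use std::sync::Arc;

    struct Meta;

    struct Series {
        meta: Arc<Meta>,
        active: Vec<u64>,
        frozen: Vec<Vec<u64>>,
    }

    impl Series {
        fn freeze(&mut self, _meta: &Arc<Meta>) {
            let full = std::mem::take(&mut self.active);
            self.frozen.push(full);
        }

        fn push(&mut self, v: u64) {
            // `self.freeze(&self.meta)` would borrow self mutably and
            // immutably at once; cloning the Arc ends the immutable borrow
            // before the &mut self call.
            let meta = self.meta.clone();
            self.freeze(&meta);
            self.active.push(v);
        }
    }

    fn main() {
        let mut s = Series { meta: Arc::new(Meta), active: vec![1], frozen: vec![] };
        s.push(2);
        assert_eq!(s.frozen.len(), 1);
        assert_eq!(s.active, vec![2]);
    }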
@@ -729,27 +744,45 @@ impl ValueBuilder {
     /// Pushes a new row to `ValueBuilder`.
     /// We don't need primary keys since they've already been encoded.
-    fn push(&mut self, ts: ValueRef, sequence: u64, op_type: u8, fields: Vec<ValueRef>) {
-        debug_assert_eq!(fields.len(), self.fields.len());
+    /// Returns the size of field values.
+    ///
+    /// In this method, we don't check the data type of the value because it is
+    /// already checked in the caller.
+    fn push<'a>(
+        &mut self,
+        ts: ValueRef,
+        sequence: u64,
+        op_type: u8,
+        fields: impl Iterator<Item = ValueRef<'a>>,
+    ) -> usize {
+        // Debug-only shape check: collect the iterator so its length can be
+        // asserted, then hand it back as an iterator.
+        #[cfg(debug_assertions)]
+        let fields = {
+            let field_vec = fields.collect::<Vec<_>>();
+            debug_assert_eq!(field_vec.len(), self.fields.len());
+            field_vec.into_iter()
+        };
         self.timestamp
             .push(ts.as_timestamp().unwrap().unwrap().value());
         self.sequence.push(sequence);
         self.op_type.push(op_type);
         let num_rows = self.timestamp.len();
-        for (idx, field_value) in fields.into_iter().enumerate() {
+        let mut size = 0;
+        for (idx, field_value) in fields.enumerate() {
+            size += field_value.data_size();
             if !field_value.is_null() || self.fields[idx].is_some() {
-                self.fields[idx]
-                    .get_or_insert_with(|| {
-                        // lazy initialize on first non-null value
-                        let mut mutable_vector =
-                            self.field_types[idx].create_mutable_vector(num_rows);
-                        // fill previous rows with nulls
-                        mutable_vector.push_nulls(num_rows - 1);
-                        mutable_vector
-                    })
-                    .push_value_ref(field_value);
+                if let Some(field) = self.fields[idx].as_mut() {
+                    let _ = field.try_push_value_ref(field_value);
+                } else {
+                    // Lazily initialize the vector on the first non-null value
+                    // and backfill previous rows with nulls.
+                    let mut mutable_vector = self.field_types[idx]
+                        .create_mutable_vector(num_rows.max(INITIAL_BUILDER_CAPACITY));
+                    mutable_vector.push_nulls(num_rows - 1);
+                    let _ = mutable_vector.try_push_value_ref(field_value);
+                    self.fields[idx] = Some(mutable_vector);
+                }
             }
         }
+
+        size
     }
 
     /// Returns the length of [ValueBuilder].
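Switching `fields` from `Vec` to `impl Iterator` loses the cheap upfront `fields.len()` assertion, so the diff re-creates it only in debug builds: under `#[cfg(debug_assertions)]` the iterator is collected once, its length checked, and handed back re-wrapped, while release builds skip both the allocation and the check. The same pattern in isolation (the `expected` parameter here is hypothetical):

    // `expected` is only read in debug builds.
    #[allow(unused_variables)]
    fn consume<'a>(expected: usize, fields: impl Iterator<Item = &'a str>) -> usize {
        #[cfg(debug_assertions)]
        let fields = {
            let field_vec = fields.collect::<Vec<_>>();
            debug_assert_eq!(field_vec.len(), expected);
            field_vec.into_iter()
        };
        // Consume the (possibly re-wrapped) iterator exactly once.
        fields.count()
    }

    fn main() {
        assert_eq!(consume(2, ["a", "b"].into_iter()), 2);
    }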
@@ -970,8 +1003,8 @@ mod tests {
         ValueRef::Timestamp(Timestamp::new_millisecond(val))
     }
 
-    fn field_value_ref(v0: i64, v1: f64) -> Vec<ValueRef<'static>> {
-        vec![ValueRef::Int64(v0), ValueRef::Float64(OrderedFloat(v1))]
+    fn field_value_ref(v0: i64, v1: f64) -> impl Iterator<Item = ValueRef<'static>> {
+        vec![ValueRef::Int64(v0), ValueRef::Float64(OrderedFloat(v1))].into_iter()
     }
 
     fn check_values(values: Values, expect: &[(i64, u64, u8, i64, f64)]) {
@@ -1033,20 +1066,20 @@
             ts_value_ref(1),
             0,
             OpType::Put,
-            vec![ValueRef::Null, ValueRef::Null],
+            vec![ValueRef::Null, ValueRef::Null].into_iter(),
         );
         series.push(
             ts_value_ref(1),
             0,
             OpType::Put,
-            vec![ValueRef::Int64(1), ValueRef::Null],
+            vec![ValueRef::Int64(1), ValueRef::Null].into_iter(),
         );
         series.push(ts_value_ref(1), 2, OpType::Put, field_value_ref(2, 10.2));
         series.push(
             ts_value_ref(1),
             3,
             OpType::Put,
-            vec![ValueRef::Int64(2), ValueRef::Null],
+            vec![ValueRef::Int64(2), ValueRef::Null].into_iter(),
         );
         assert_eq!(4, series.active.timestamp.len());
         assert_eq!(0, series.frozen.len());


@@ -348,8 +348,8 @@ impl DensePrimaryKeyCodec {
         I: Iterator<Item = ValueRef<'a>>,
     {
         let mut serializer = Serializer::new(buffer);
-        for (value, (_, field)) in row.zip(self.ordered_primary_key_columns.iter()) {
-            field.serialize(&mut serializer, &value)?;
+        for (idx, value) in row.enumerate() {
+            self.field_at(idx).serialize(&mut serializer, &value)?;
         }
         Ok(())
     }
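The serializer loop drops `zip`: instead of advancing the row iterator and the column iterator in lockstep, it enumerates the row and looks each column up by index. With the length validated by the caller, only one iterator stays live and the loop is simpler for the optimizer, which is presumably the commit's "optimize zip" step. A reduced sketch with toy types; `field_at` here is a plain slice index standing in for the codec's accessor:

    struct Field {
        name: String,
    }

    impl Field {
        fn serialize(&self, out: &mut String, value: &str) {
            // Toy serialization: "name=value;"
            out.push_str(&self.name);
            out.push('=');
            out.push_str(value);
            out.push(';');
        }
    }

    struct Codec {
        fields: Vec<Field>,
    }

    impl Codec {
        fn field_at(&self, idx: usize) -> &Field {
            &self.fields[idx]
        }

        // After: one live iterator plus an indexed lookup, instead of
        // row.zip(self.fields.iter()).
        fn encode<'a>(&self, row: impl Iterator<Item = &'a str>, out: &mut String) {
            for (idx, value) in row.enumerate() {
                self.field_at(idx).serialize(out, value);
            }
        }
    }

    fn main() {
        let codec = Codec {
            fields: vec![
                Field { name: "host".into() },
                Field { name: "dc".into() },
            ],
        };
        let mut buf = String::new();
        codec.encode(["a1", "us-west"].into_iter(), &mut buf);
        assert_eq!(buf, "host=a1;dc=us-west;");
    }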