Compare commits

...

1 Commit

Author SHA1 Message Date
luofucong
34da9ad838 refactor: memtable extend kvs 2025-03-24 21:03:37 +08:00
4 changed files with 116 additions and 16 deletions

View File

@@ -183,7 +183,7 @@ mod tests {
let expected = regions
.into_iter()
.zip(vec![encoded_wal_options; num_regions as usize])
.collect();
.collect::<HashMap<_, _>>();
assert_eq!(got, expected);
}

View File

@@ -18,11 +18,15 @@ use api::v1::{ColumnSchema, Mutation, OpType, Row, Rows};
use datatypes::prelude::ConcreteDataType;
use datatypes::value::ValueRef;
use memcomparable::Deserializer;
use snafu::ensure;
use store_api::codec::{infer_primary_key_encoding_from_hint, PrimaryKeyEncoding};
use store_api::metadata::RegionMetadata;
use store_api::storage::SequenceNumber;
use crate::row_converter::{SortField, COLUMN_ID_ENCODE_SIZE};
use crate::error::{PrimaryKeyLengthMismatchSnafu, Result};
use crate::row_converter::{
DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField, COLUMN_ID_ENCODE_SIZE,
};
/// Key value view of a mutation.
#[derive(Debug)]
@@ -72,6 +76,41 @@ impl KeyValues {
})
}
/// Groups the rows of this mutation by their encoded primary key.
///
/// Each row is wrapped in a [`KeyValue`] carrying a consecutive sequence
/// number starting at the mutation's base sequence, then bucketed under the
/// primary key encoded by `row_codec`. Returns an empty map when the
/// mutation carries no rows.
///
/// # Errors
/// Returns a `PrimaryKeyLengthMismatch` error when a row's primary key
/// column count differs from the codec's field count, and propagates any
/// encoding error from `row_codec`.
///
/// # Panics
/// Panics if the mutation's `op_type` is not a valid [`OpType`] (a broken
/// invariant of the mutation, consistent with the rest of this module).
pub fn chunk_by_primary_key(
    &self,
    row_codec: &DensePrimaryKeyCodec,
) -> Result<HashMap<Vec<u8>, Vec<KeyValue>>> {
    let Some(rows) = &self.mutation.rows else {
        return Ok(HashMap::new());
    };
    let schema = &rows.schema;
    let rows = &rows.rows;

    // The op type is shared by every row of the mutation; convert (and
    // validate) it once instead of on every loop iteration.
    let op_type = OpType::try_from(self.mutation.op_type).expect("illegal op_type");

    let mut chunks: HashMap<Vec<u8>, Vec<KeyValue>> = HashMap::new();
    for (row, sequence) in rows.iter().zip((self.mutation.sequence..).take(rows.len())) {
        let kv = KeyValue {
            row,
            schema,
            helper: &self.helper,
            sequence,
            op_type,
            primary_key_encoding: self.primary_key_encoding,
        };
        ensure!(
            row_codec.num_fields() == kv.num_primary_keys(),
            PrimaryKeyLengthMismatchSnafu {
                expect: row_codec.num_fields(),
                actual: kv.num_primary_keys(),
            }
        );
        let primary_key = row_codec.encode(kv.primary_keys())?;
        chunks.entry(primary_key).or_default().push(kv);
    }
    Ok(chunks)
}
/// Returns number of rows.
pub fn num_rows(&self) -> usize {
// Safety: rows is not None.

View File

@@ -71,6 +71,13 @@ impl WriteMetrics {
}
}
}
/// Accumulates `other` into `self`: byte counters are summed and the
/// timestamp range is widened to cover both sets of metrics.
pub(crate) fn merge(&mut self, other: Self) {
    self.min_ts = std::cmp::min(self.min_ts, other.min_ts);
    self.max_ts = std::cmp::max(self.max_ts, other.max_ts);
    self.key_bytes += other.key_bytes;
    self.value_bytes += other.value_bytes;
}
}
impl Default for WriteMetrics {

View File

@@ -39,8 +39,8 @@ use store_api::storage::{ColumnId, SequenceNumber};
use table::predicate::Predicate;
use crate::error::{
ComputeArrowSnafu, ConvertVectorSnafu, PrimaryKeyLengthMismatchSnafu, Result,
UnsupportedOperationSnafu,
ComputeArrowSnafu, ConvertVectorSnafu, FieldTypeMismatchSnafu, PrimaryKeyLengthMismatchSnafu,
Result, UnsupportedOperationSnafu,
};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::key_values::KeyValue;
@@ -158,7 +158,11 @@ impl TimeSeriesMemtable {
}
);
let primary_key_encoded = self.row_codec.encode(kv.primary_keys())?;
let primary_key_encoded = if kv.num_primary_keys() == 0 {
vec![]
} else {
self.row_codec.encode(kv.primary_keys())?
};
let (series, series_allocated) = self.series_set.get_or_add_series(primary_key_encoded);
stats.key_bytes += series_allocated;
@@ -194,8 +198,14 @@ impl Memtable for TimeSeriesMemtable {
let mut local_stats = WriteMetrics::default();
for kv in kvs.iter() {
self.write_key_value(kv, &mut local_stats)?;
let chunks = kvs.chunk_by_primary_key(&self.row_codec)?;
for (primary_key, kvs) in chunks {
let (series, series_allocated) = self.series_set.get_or_add_series(primary_key);
local_stats.key_bytes += series_allocated;
let mut guard = series.write().unwrap();
let stats = guard.extend(kvs)?;
local_stats.merge(stats);
}
local_stats.value_bytes += kvs.num_rows() * std::mem::size_of::<Timestamp>();
local_stats.value_bytes += kvs.num_rows() * std::mem::size_of::<OpType>();
@@ -650,6 +660,15 @@ impl Series {
self.active.push(ts, sequence, op_type as u8, values)
}
/// Appends `kvs` to the active builder of this series, freezing the active
/// part once it reaches `BUILDER_CAPACITY` rows.
///
/// Returns the write metrics accumulated by the underlying builder.
fn extend(&mut self, kvs: Vec<KeyValue>) -> Result<WriteMetrics> {
let metrics = self.active.extend(kvs)?;
if self.active.len() >= BUILDER_CAPACITY {
// Clone the metadata handle (presumably a cheap `Arc` clone — the `Ref`
// suffix suggests it; confirm) so the immutable borrow of
// `self.region_metadata` ends before the `&mut self` call to `freeze`.
let region_metadata = self.region_metadata.clone();
self.freeze(&region_metadata);
}
Ok(metrics)
}
fn update_pk_cache(&mut self, pk_values: Vec<Value>) {
self.pk_cache = Some(pk_values);
}
@@ -665,17 +684,15 @@ impl Series {
/// Freezes active part to frozen part and compact frozen part to reduce memory fragmentation.
/// Returns the frozen and compacted values.
fn compact(&mut self, region_metadata: &RegionMetadataRef) -> Result<Values> {
fn compact(&mut self, region_metadata: &RegionMetadataRef) -> Result<&Values> {
self.freeze(region_metadata);
let mut frozen = self.frozen.clone();
let frozen = &self.frozen;
// Each series must contain at least one row
debug_assert!(!frozen.is_empty());
let values = if frozen.len() == 1 {
frozen.pop().unwrap()
} else {
if frozen.len() > 1 {
// TODO(hl): We should keep track of min/max timestamps for each values and avoid
// cloning and sorting when values do not overlap with each other.
@@ -699,10 +716,9 @@ impl Series {
debug_assert_eq!(concatenated.len(), column_size);
let values = Values::from_columns(&concatenated)?;
self.frozen = vec![values.clone()];
values
self.frozen = vec![values];
};
Ok(values)
Ok(&self.frozen[0])
}
}
@@ -785,6 +801,44 @@ impl ValueBuilder {
size
}
/// Pushes every row in `kvs` into this builder.
///
/// Tracks the minimum/maximum timestamp seen and the accumulated byte size
/// of the field values, and returns them as [`WriteMetrics`]. Field columns
/// are built lazily: a column whose values have all been null so far has no
/// builder, and nulls are backfilled only when the first non-null value for
/// that column arrives.
///
/// # Errors
/// Returns a `FieldTypeMismatch` error when a field value cannot be pushed
/// into its column's mutable vector.
fn extend(&mut self, kvs: Vec<KeyValue>) -> Result<WriteMetrics> {
// One entry per key value is appended to each of the fixed columns below.
self.timestamp.reserve(kvs.len());
self.sequence.reserve(kvs.len());
self.op_type.reserve(kvs.len());
let mut metrics = WriteMetrics::default();
for kv in kvs {
// safety: timestamp of kv must be both present and valid until here
let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
metrics.min_ts = metrics.min_ts.min(ts);
metrics.max_ts = metrics.max_ts.max(ts);
self.timestamp.push(ts);
self.sequence.push(kv.sequence());
self.op_type.push(kv.op_type() as u8);
for (idx, field) in kv.fields().enumerate() {
metrics.value_bytes += field.data_size();
if field.is_null() && self.fields[idx].is_none() {
// do nothing, postpone nulls filling to the next non-null value insertion
continue;
}
// First non-null value for this column: create the builder sized for
// the rows seen so far and backfill nulls for every earlier row
// (timestamp already holds the current row, hence `len() - 1`).
let fields = self.fields[idx].get_or_insert_with(|| {
let mut fields = self.field_types[idx]
.create_mutable_vector(self.timestamp.len().max(INITIAL_BUILDER_CAPACITY));
fields.push_nulls(self.timestamp.len() - 1);
fields
});
fields
.try_push_value_ref(field)
.context(FieldTypeMismatchSnafu)?;
}
}
Ok(metrics)
}
/// Returns the length of [ValueBuilder]
fn len(&self) -> usize {
let sequence_len = self.sequence.len();
@@ -1007,7 +1061,7 @@ mod tests {
vec![ValueRef::Int64(v0), ValueRef::Float64(OrderedFloat(v1))].into_iter()
}
fn check_values(values: Values, expect: &[(i64, u64, u8, i64, f64)]) {
fn check_values(values: &Values, expect: &[(i64, u64, u8, i64, f64)]) {
let ts = values
.timestamp
.as_any()