mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-24 07:00:00 +00:00
Compare commits
1 Commits
feature/df
...
refactor/m
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
34da9ad838 |
@@ -183,7 +183,7 @@ mod tests {
|
||||
let expected = regions
|
||||
.into_iter()
|
||||
.zip(vec![encoded_wal_options; num_regions as usize])
|
||||
.collect();
|
||||
.collect::<HashMap<_, _>>();
|
||||
assert_eq!(got, expected);
|
||||
}
|
||||
|
||||
|
||||
@@ -18,11 +18,15 @@ use api::v1::{ColumnSchema, Mutation, OpType, Row, Rows};
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::value::ValueRef;
|
||||
use memcomparable::Deserializer;
|
||||
use snafu::ensure;
|
||||
use store_api::codec::{infer_primary_key_encoding_from_hint, PrimaryKeyEncoding};
|
||||
use store_api::metadata::RegionMetadata;
|
||||
use store_api::storage::SequenceNumber;
|
||||
|
||||
use crate::row_converter::{SortField, COLUMN_ID_ENCODE_SIZE};
|
||||
use crate::error::{PrimaryKeyLengthMismatchSnafu, Result};
|
||||
use crate::row_converter::{
|
||||
DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField, COLUMN_ID_ENCODE_SIZE,
|
||||
};
|
||||
|
||||
/// Key value view of a mutation.
|
||||
#[derive(Debug)]
|
||||
@@ -72,6 +76,41 @@ impl KeyValues {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn chunk_by_primary_key(
|
||||
&self,
|
||||
row_codec: &DensePrimaryKeyCodec,
|
||||
) -> Result<HashMap<Vec<u8>, Vec<KeyValue>>> {
|
||||
let Some(rows) = &self.mutation.rows else {
|
||||
return Ok(HashMap::new());
|
||||
};
|
||||
let schema = &rows.schema;
|
||||
let rows = &rows.rows;
|
||||
|
||||
let mut chunks: HashMap<Vec<u8>, Vec<KeyValue>> = HashMap::new();
|
||||
for (row, sequence) in rows.iter().zip((self.mutation.sequence..).take(rows.len())) {
|
||||
let kv = KeyValue {
|
||||
row,
|
||||
schema,
|
||||
helper: &self.helper,
|
||||
sequence,
|
||||
op_type: OpType::try_from(self.mutation.op_type).expect("illegal op_type"),
|
||||
primary_key_encoding: self.primary_key_encoding,
|
||||
};
|
||||
|
||||
ensure!(
|
||||
row_codec.num_fields() == kv.num_primary_keys(),
|
||||
PrimaryKeyLengthMismatchSnafu {
|
||||
expect: row_codec.num_fields(),
|
||||
actual: kv.num_primary_keys(),
|
||||
}
|
||||
);
|
||||
|
||||
let primary_key = row_codec.encode(kv.primary_keys())?;
|
||||
chunks.entry(primary_key).or_default().push(kv);
|
||||
}
|
||||
Ok(chunks)
|
||||
}
|
||||
|
||||
/// Returns number of rows.
|
||||
pub fn num_rows(&self) -> usize {
|
||||
// Safety: rows is not None.
|
||||
|
||||
@@ -71,6 +71,13 @@ impl WriteMetrics {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn merge(&mut self, other: Self) {
|
||||
self.key_bytes += other.key_bytes;
|
||||
self.value_bytes += other.value_bytes;
|
||||
self.min_ts = self.min_ts.min(other.min_ts);
|
||||
self.max_ts = self.max_ts.max(other.max_ts);
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for WriteMetrics {
|
||||
|
||||
@@ -39,8 +39,8 @@ use store_api::storage::{ColumnId, SequenceNumber};
|
||||
use table::predicate::Predicate;
|
||||
|
||||
use crate::error::{
|
||||
ComputeArrowSnafu, ConvertVectorSnafu, PrimaryKeyLengthMismatchSnafu, Result,
|
||||
UnsupportedOperationSnafu,
|
||||
ComputeArrowSnafu, ConvertVectorSnafu, FieldTypeMismatchSnafu, PrimaryKeyLengthMismatchSnafu,
|
||||
Result, UnsupportedOperationSnafu,
|
||||
};
|
||||
use crate::flush::WriteBufferManagerRef;
|
||||
use crate::memtable::key_values::KeyValue;
|
||||
@@ -158,7 +158,11 @@ impl TimeSeriesMemtable {
|
||||
}
|
||||
);
|
||||
|
||||
let primary_key_encoded = self.row_codec.encode(kv.primary_keys())?;
|
||||
let primary_key_encoded = if kv.num_primary_keys() == 0 {
|
||||
vec![]
|
||||
} else {
|
||||
self.row_codec.encode(kv.primary_keys())?
|
||||
};
|
||||
|
||||
let (series, series_allocated) = self.series_set.get_or_add_series(primary_key_encoded);
|
||||
stats.key_bytes += series_allocated;
|
||||
@@ -194,8 +198,14 @@ impl Memtable for TimeSeriesMemtable {
|
||||
|
||||
let mut local_stats = WriteMetrics::default();
|
||||
|
||||
for kv in kvs.iter() {
|
||||
self.write_key_value(kv, &mut local_stats)?;
|
||||
let chunks = kvs.chunk_by_primary_key(&self.row_codec)?;
|
||||
for (primary_key, kvs) in chunks {
|
||||
let (series, series_allocated) = self.series_set.get_or_add_series(primary_key);
|
||||
local_stats.key_bytes += series_allocated;
|
||||
|
||||
let mut guard = series.write().unwrap();
|
||||
let stats = guard.extend(kvs)?;
|
||||
local_stats.merge(stats);
|
||||
}
|
||||
local_stats.value_bytes += kvs.num_rows() * std::mem::size_of::<Timestamp>();
|
||||
local_stats.value_bytes += kvs.num_rows() * std::mem::size_of::<OpType>();
|
||||
@@ -650,6 +660,15 @@ impl Series {
|
||||
self.active.push(ts, sequence, op_type as u8, values)
|
||||
}
|
||||
|
||||
fn extend(&mut self, kvs: Vec<KeyValue>) -> Result<WriteMetrics> {
|
||||
let metrics = self.active.extend(kvs)?;
|
||||
if self.active.len() >= BUILDER_CAPACITY {
|
||||
let region_metadata = self.region_metadata.clone();
|
||||
self.freeze(®ion_metadata);
|
||||
}
|
||||
Ok(metrics)
|
||||
}
|
||||
|
||||
fn update_pk_cache(&mut self, pk_values: Vec<Value>) {
|
||||
self.pk_cache = Some(pk_values);
|
||||
}
|
||||
@@ -665,17 +684,15 @@ impl Series {
|
||||
|
||||
/// Freezes active part to frozen part and compact frozen part to reduce memory fragmentation.
|
||||
/// Returns the frozen and compacted values.
|
||||
fn compact(&mut self, region_metadata: &RegionMetadataRef) -> Result<Values> {
|
||||
fn compact(&mut self, region_metadata: &RegionMetadataRef) -> Result<&Values> {
|
||||
self.freeze(region_metadata);
|
||||
|
||||
let mut frozen = self.frozen.clone();
|
||||
let frozen = &self.frozen;
|
||||
|
||||
// Each series must contain at least one row
|
||||
debug_assert!(!frozen.is_empty());
|
||||
|
||||
let values = if frozen.len() == 1 {
|
||||
frozen.pop().unwrap()
|
||||
} else {
|
||||
if frozen.len() > 1 {
|
||||
// TODO(hl): We should keep track of min/max timestamps for each values and avoid
|
||||
// cloning and sorting when values do not overlap with each other.
|
||||
|
||||
@@ -699,10 +716,9 @@ impl Series {
|
||||
|
||||
debug_assert_eq!(concatenated.len(), column_size);
|
||||
let values = Values::from_columns(&concatenated)?;
|
||||
self.frozen = vec![values.clone()];
|
||||
values
|
||||
self.frozen = vec![values];
|
||||
};
|
||||
Ok(values)
|
||||
Ok(&self.frozen[0])
|
||||
}
|
||||
}
|
||||
|
||||
@@ -785,6 +801,44 @@ impl ValueBuilder {
|
||||
size
|
||||
}
|
||||
|
||||
fn extend(&mut self, kvs: Vec<KeyValue>) -> Result<WriteMetrics> {
|
||||
self.timestamp.reserve(kvs.len());
|
||||
self.sequence.reserve(kvs.len());
|
||||
self.op_type.reserve(kvs.len());
|
||||
|
||||
let mut metrics = WriteMetrics::default();
|
||||
for kv in kvs {
|
||||
// safety: timestamp of kv must be both present and valid until here
|
||||
let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
|
||||
metrics.min_ts = metrics.min_ts.min(ts);
|
||||
metrics.max_ts = metrics.max_ts.max(ts);
|
||||
|
||||
self.timestamp.push(ts);
|
||||
self.sequence.push(kv.sequence());
|
||||
self.op_type.push(kv.op_type() as u8);
|
||||
|
||||
for (idx, field) in kv.fields().enumerate() {
|
||||
metrics.value_bytes += field.data_size();
|
||||
|
||||
if field.is_null() && self.fields[idx].is_none() {
|
||||
// do nothing, postpone nulls filling to the next non-null value insertion
|
||||
continue;
|
||||
}
|
||||
|
||||
let fields = self.fields[idx].get_or_insert_with(|| {
|
||||
let mut fields = self.field_types[idx]
|
||||
.create_mutable_vector(self.timestamp.len().max(INITIAL_BUILDER_CAPACITY));
|
||||
fields.push_nulls(self.timestamp.len() - 1);
|
||||
fields
|
||||
});
|
||||
fields
|
||||
.try_push_value_ref(field)
|
||||
.context(FieldTypeMismatchSnafu)?;
|
||||
}
|
||||
}
|
||||
Ok(metrics)
|
||||
}
|
||||
|
||||
/// Returns the length of [ValueBuilder]
|
||||
fn len(&self) -> usize {
|
||||
let sequence_len = self.sequence.len();
|
||||
@@ -1007,7 +1061,7 @@ mod tests {
|
||||
vec![ValueRef::Int64(v0), ValueRef::Float64(OrderedFloat(v1))].into_iter()
|
||||
}
|
||||
|
||||
fn check_values(values: Values, expect: &[(i64, u64, u8, i64, f64)]) {
|
||||
fn check_values(values: &Values, expect: &[(i64, u64, u8, i64, f64)]) {
|
||||
let ts = values
|
||||
.timestamp
|
||||
.as_any()
|
||||
|
||||
Reference in New Issue
Block a user