Mirror of https://github.com/GreptimeTeam/greptimedb.git, synced 2026-01-15 09:42:58 +00:00
fix(metric engine): label mismatch in metric engine (#3927)
* fix: label mismatch
* test: add unit test
* chore: avoid updating full primary keys
* fix: style
* chore: add some doc for PkIndexMap
* chore: update some doc
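In rough terms, as the hunks below show: the partition tree's key dictionary indexes full mcmp-encoded primary keys, while the metric engine addresses series through a sparse encoding, and `Partition` keeps its `pk_to_pk_id` map keyed by the sparse bytes. Before this change, `KeyDictBuilder::finish` re-sorted pk indices against the full keys only, so the sparse-keyed map could end up pointing at another series, i.e. the wrong label values. The fix threads an optional sparse key through `insert_key` so that `finish` can hand back a sparse-key-to-new-index map. A minimal, self-contained sketch of the two mappings (toy byte encodings, not GreptimeDB's real codecs):

```rust
// A minimal sketch of the mapping problem this commit fixes. The names
// (PkIndex, sparse/full keys) mirror the diff; the byte encodings below are
// hypothetical stand-ins, not GreptimeDB's real row codecs.
use std::collections::BTreeMap;

type PkIndex = u16;

fn main() {
    // The dictionary is keyed by the *full* mcmp-encoded primary key...
    let mut dict: BTreeMap<Vec<u8>, (PkIndex, Option<Vec<u8>>)> = BTreeMap::new();
    // ...while the partition remembers pk ids under the *sparse* key.
    let mut pk_to_pk_id: BTreeMap<Vec<u8>, PkIndex> = BTreeMap::new();

    let full_key = b"table=1025,tsid=42,label=10min".to_vec(); // toy encoding
    let sparse_key = b"table=1025,tsid=42".to_vec(); // toy encoding

    // insert_key now records the sparse key next to the pk index, so that
    // finish() can return a sparse-key -> new-index map...
    dict.insert(full_key, (0, Some(sparse_key.clone())));

    // ...which keeps lookups through the sparse key pointing at the right row.
    for (_full, (idx, sparse)) in &dict {
        if let Some(s) = sparse {
            pk_to_pk_id.insert(s.clone(), *idx);
        }
    }
    assert_eq!(pk_to_pk_id.get(&sparse_key), Some(&0));
}
```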
@@ -311,14 +311,24 @@ impl MemtableBuilder for PartitionTreeMemtableBuilder {
 #[cfg(test)]
 mod tests {
+    use api::v1::value::ValueData;
+    use api::v1::{Row, Rows, SemanticType};
     use common_time::Timestamp;
     use datafusion_common::{Column, ScalarValue};
     use datafusion_expr::{BinaryExpr, Expr, Operator};
+    use datatypes::data_type::ConcreteDataType;
     use datatypes::scalars::ScalarVector;
+    use datatypes::schema::ColumnSchema;
+    use datatypes::value::Value;
     use datatypes::vectors::Int64Vector;
+    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
+    use store_api::storage::RegionId;

     use super::*;
-    use crate::test_util::memtable_util::{self, collect_iter_timestamps};
+    use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
+    use crate::test_util::memtable_util::{
+        self, collect_iter_timestamps, region_metadata_to_row_schema,
+    };

     #[test]
     fn test_memtable_sorted_input() {
@@ -562,4 +572,166 @@ mod tests {
         assert!(config.dedup);
         assert_eq!(PartitionTreeConfig::default(), config);
     }
+
+    fn metadata_for_metric_engine() -> RegionMetadataRef {
+        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
+        builder
+            .push_column_metadata(ColumnMetadata {
+                column_schema: ColumnSchema::new(
+                    "__table_id",
+                    ConcreteDataType::uint32_datatype(),
+                    false,
+                ),
+                semantic_type: SemanticType::Tag,
+                column_id: 2147483652,
+            })
+            .push_column_metadata(ColumnMetadata {
+                column_schema: ColumnSchema::new(
+                    "__tsid",
+                    ConcreteDataType::uint64_datatype(),
+                    false,
+                ),
+                semantic_type: SemanticType::Tag,
+                column_id: 2147483651,
+            })
+            .push_column_metadata(ColumnMetadata {
+                column_schema: ColumnSchema::new(
+                    "test_label",
+                    ConcreteDataType::string_datatype(),
+                    false,
+                ),
+                semantic_type: SemanticType::Tag,
+                column_id: 2,
+            })
+            .push_column_metadata(ColumnMetadata {
+                column_schema: ColumnSchema::new(
+                    "greptime_timestamp",
+                    ConcreteDataType::timestamp_millisecond_datatype(),
+                    false,
+                ),
+                semantic_type: SemanticType::Timestamp,
+                column_id: 0,
+            })
+            .push_column_metadata(ColumnMetadata {
+                column_schema: ColumnSchema::new(
+                    "greptime_value",
+                    ConcreteDataType::float64_datatype(),
+                    true,
+                ),
+                semantic_type: SemanticType::Field,
+                column_id: 1,
+            })
+            .primary_key(vec![2147483652, 2147483651, 2]);
+        let region_metadata = builder.build().unwrap();
+        Arc::new(region_metadata)
+    }
+
+    fn build_key_values(
+        metadata: RegionMetadataRef,
+        labels: &[&str],
+        table_id: &[u32],
+        ts_id: &[u64],
+        ts: &[i64],
+        values: &[f64],
+        sequence: u64,
+    ) -> KeyValues {
+        let column_schema = region_metadata_to_row_schema(&metadata);
+
+        let rows = ts
+            .iter()
+            .zip(table_id.iter())
+            .zip(ts_id.iter())
+            .zip(labels.iter())
+            .zip(values.iter())
+            .map(|((((ts, table_id), ts_id), label), val)| Row {
+                values: vec![
+                    api::v1::Value {
+                        value_data: Some(ValueData::U32Value(*table_id)),
+                    },
+                    api::v1::Value {
+                        value_data: Some(ValueData::U64Value(*ts_id)),
+                    },
+                    api::v1::Value {
+                        value_data: Some(ValueData::StringValue(label.to_string())),
+                    },
+                    api::v1::Value {
+                        value_data: Some(ValueData::TimestampMillisecondValue(*ts)),
+                    },
+                    api::v1::Value {
+                        value_data: Some(ValueData::F64Value(*val)),
+                    },
+                ],
+            })
+            .collect();
+        let mutation = api::v1::Mutation {
+            op_type: 1,
+            sequence,
+            rows: Some(Rows {
+                schema: column_schema,
+                rows,
+            }),
+        };
+        KeyValues::new(metadata.as_ref(), mutation).unwrap()
+    }
+
+    #[test]
+    fn test_write_freeze() {
+        let metadata = metadata_for_metric_engine();
+        let memtable = PartitionTreeMemtableBuilder::new(
+            PartitionTreeConfig {
+                index_max_keys_per_shard: 40,
+                ..Default::default()
+            },
+            None,
+        )
+        .build(1, &metadata);
+
+        let codec = McmpRowCodec::new(
+            metadata
+                .primary_key_columns()
+                .map(|c| SortField::new(c.column_schema.data_type.clone()))
+                .collect(),
+        );
+
+        memtable
+            .write(&build_key_values(
+                metadata.clone(),
+                &["daily", "10min", "daily", "10min"],
+                &[1025, 1025, 1025, 1025],
+                &[
+                    16442255374049317291,
+                    5686004715529701024,
+                    16442255374049317291,
+                    5686004715529701024,
+                ],
+                &[1712070000000, 1712717731000, 1712761200000, 1712761200000],
+                &[0.0, 0.0, 0.0, 0.0],
+                1,
+            ))
+            .unwrap();
+
+        memtable.freeze().unwrap();
+        let new_memtable = memtable.fork(2, &metadata);
+
+        new_memtable
+            .write(&build_key_values(
+                metadata.clone(),
+                &["10min"],
+                &[1025],
+                &[5686004715529701024],
+                &[1714643131000],
+                &[0.1],
+                2,
+            ))
+            .unwrap();
+
+        let mut reader = new_memtable.iter(None, None).unwrap();
+        let batch = reader.next().unwrap().unwrap();
+        let pk = codec.decode(batch.primary_key()).unwrap();
+        if let Value::String(s) = &pk[2] {
+            assert_eq!("10min", s.as_utf8());
+        } else {
+            unreachable!()
+        }
+    }
 }

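For reference, `build_key_values` above turns parallel column slices into rows by chaining `zip`s. This standalone sketch (a hypothetical `Row` struct, not the `api::v1` one) shows the same pattern and why the closure destructures nested tuples:

```rust
// A minimal sketch of the zip pattern build_key_values uses to turn parallel
// column slices into rows; the Row type here is a hypothetical stand-in.
struct Row {
    table_id: u32,
    ts_id: u64,
    label: String,
    ts: i64,
    value: f64,
}

fn rows(labels: &[&str], table_id: &[u32], ts_id: &[u64], ts: &[i64], values: &[f64]) -> Vec<Row> {
    ts.iter()
        .zip(table_id.iter())
        .zip(ts_id.iter())
        .zip(labels.iter())
        .zip(values.iter())
        // Each zip nests another tuple, hence the layered pattern below.
        .map(|((((ts, table_id), ts_id), label), val)| Row {
            table_id: *table_id,
            ts_id: *ts_id,
            label: label.to_string(),
            ts: *ts,
            value: *val,
        })
        .collect()
}

fn main() {
    let r = rows(&["daily", "10min"], &[1025, 1025], &[1, 2], &[0, 1], &[0.0, 0.1]);
    assert_eq!(r.len(), 2);
    assert_eq!(r[1].label, "10min");
}
```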
@@ -26,7 +26,9 @@ use crate::metrics::MEMTABLE_DICT_BYTES;
 /// Maximum keys in a [DictBlock].
 const MAX_KEYS_PER_BLOCK: u16 = 256;

-type PkIndexMap = BTreeMap<Vec<u8>, PkIndex>;
+/// The key is mcmp-encoded primary keys, while the values are the pk index and
+/// optionally sparsely encoded primary keys.
+type PkIndexMap = BTreeMap<Vec<u8>, (PkIndex, Option<Vec<u8>>)>;

 /// Builder to build a key dictionary.
 pub struct KeyDictBuilder {
@@ -66,10 +68,15 @@ impl KeyDictBuilder {
     ///
     /// # Panics
     /// Panics if the builder is full.
-    pub fn insert_key(&mut self, key: &[u8], metrics: &mut WriteMetrics) -> PkIndex {
+    pub fn insert_key(
+        &mut self,
+        full_primary_key: &[u8],
+        sparse_key: Option<&[u8]>,
+        metrics: &mut WriteMetrics,
+    ) -> PkIndex {
         assert!(!self.is_full());

-        if let Some(pk_index) = self.pk_to_index.get(key).copied() {
+        if let Some(pk_index) = self.pk_to_index.get(full_primary_key).map(|v| v.0) {
             // Already in the builder.
             return pk_index;
         }
@@ -81,16 +88,22 @@
         }

         // Safety: we have checked the buffer length.
-        let pk_index = self.key_buffer.push_key(key);
-        self.pk_to_index.insert(key.to_vec(), pk_index);
+        let pk_index = self.key_buffer.push_key(full_primary_key);
+        let (sparse_key, sparse_key_len) = if let Some(sparse_key) = sparse_key {
+            (Some(sparse_key.to_vec()), sparse_key.len())
+        } else {
+            (None, 0)
+        };
+        self.pk_to_index
+            .insert(full_primary_key.to_vec(), (pk_index, sparse_key));
         self.num_keys += 1;

         // Since we store the key twice so the bytes usage doubled.
-        metrics.key_bytes += key.len() * 2;
-        self.key_bytes_in_index += key.len();
+        metrics.key_bytes += full_primary_key.len() * 2 + sparse_key_len;
+        self.key_bytes_in_index += full_primary_key.len();

         // Adds key size of index to the metrics.
-        MEMTABLE_DICT_BYTES.add(key.len() as i64);
+        MEMTABLE_DICT_BYTES.add((full_primary_key.len() + sparse_key_len) as i64);

         pk_index
     }
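A toy model of the bookkeeping in this hunk, using only std: the full key is counted twice (once for the key buffer, once for the index map), and the sparse key, when present, adds its own length once. The `ToyBuilder` below is a stand-in, not the real `KeyDictBuilder`:

```rust
// Toy sketch of insert_key's dedup contract and byte accounting after the
// signature change. Not the real builder; fields mirror the diff's names.
use std::collections::BTreeMap;

type PkIndex = u16;

#[derive(Default)]
struct ToyBuilder {
    pk_to_index: BTreeMap<Vec<u8>, (PkIndex, Option<Vec<u8>>)>,
    key_bytes: usize,          // mirrors metrics.key_bytes
    key_bytes_in_index: usize, // mirrors self.key_bytes_in_index
}

impl ToyBuilder {
    fn insert_key(&mut self, full: &[u8], sparse: Option<&[u8]>) -> PkIndex {
        if let Some(idx) = self.pk_to_index.get(full).map(|v| v.0) {
            return idx; // Already in the builder.
        }
        let idx = self.pk_to_index.len() as PkIndex;
        let sparse_len = sparse.map_or(0, |s| s.len());
        self.pk_to_index
            .insert(full.to_vec(), (idx, sparse.map(|s| s.to_vec())));
        // The full key is stored twice, so its bytes count double; the sparse
        // key is stored once in the map's value.
        self.key_bytes += full.len() * 2 + sparse_len;
        self.key_bytes_in_index += full.len();
        idx
    }
}

fn main() {
    let mut b = ToyBuilder::default();
    assert_eq!(0, b.insert_key(b"key00", Some(b"s0")));
    assert_eq!(0, b.insert_key(b"key00", Some(b"s0"))); // deduplicated
    assert_eq!(5 * 2 + 2, b.key_bytes);
    assert_eq!(5, b.key_bytes_in_index);
}
```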
@@ -107,38 +120,47 @@
             .sum::<usize>()
     }

-    /// Finishes the builder.
-    pub fn finish(&mut self, pk_to_index: &mut BTreeMap<Vec<u8>, PkIndex>) -> Option<KeyDict> {
+    /// Finishes the builder. The key of the second BTreeMap is sparse-encoded bytes.
+    pub fn finish(&mut self) -> Option<(KeyDict, BTreeMap<Vec<u8>, PkIndex>)> {
         if self.key_buffer.is_empty() {
             return None;
         }
+        let mut pk_to_index_map = BTreeMap::new();

         // Finishes current dict block and resets the pk index.
         let dict_block = self.key_buffer.finish(true);
         self.dict_blocks.push(dict_block);
         // Computes key position and then alter pk index.
         let mut key_positions = vec![0; self.pk_to_index.len()];
-        for (i, pk_index) in self.pk_to_index.values_mut().enumerate() {
+
+        for (i, (_full_pk, (pk_index, sparse_key))) in (std::mem::take(&mut self.pk_to_index))
+            .into_iter()
+            .enumerate()
+        {
             // The position of the i-th key is the old pk index.
-            key_positions[i] = *pk_index;
-            // Overwrites the pk index.
-            *pk_index = i as PkIndex;
+            key_positions[i] = pk_index;
+            if let Some(sparse_key) = sparse_key {
+                pk_to_index_map.insert(sparse_key, i as PkIndex);
+            }
         }

         self.num_keys = 0;
         let key_bytes_in_index = self.key_bytes_in_index;
         self.key_bytes_in_index = 0;
-        *pk_to_index = std::mem::take(&mut self.pk_to_index);

-        Some(KeyDict {
-            dict_blocks: std::mem::take(&mut self.dict_blocks),
-            key_positions,
-            key_bytes_in_index,
-        })
+        Some((
+            KeyDict {
+                dict_blocks: std::mem::take(&mut self.dict_blocks),
+                key_positions,
+                key_bytes_in_index,
+            },
+            pk_to_index_map,
+        ))
     }

     /// Reads the builder.
     pub fn read(&self) -> DictBuilderReader {
-        let sorted_pk_indices = self.pk_to_index.values().copied().collect();
+        let sorted_pk_indices = self.pk_to_index.values().map(|v| v.0).collect();
         let block = self.key_buffer.finish_cloned();
         let mut blocks = Vec::with_capacity(self.dict_blocks.len() + 1);
         blocks.extend_from_slice(&self.dict_blocks);
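The heart of the fix is the re-indexing in `finish`: entries drain out of the `BTreeMap` in sorted full-key order, each entry's old pk index becomes its position, and any sparse key is re-pointed at the new sorted index. A self-contained sketch with toy types:

```rust
// Self-contained sketch of finish()'s re-indexing: iterate the map in key
// order, record each entry's old pk index as its position, and map sparse
// keys to the fresh, sorted index. Toy types only.
use std::collections::BTreeMap;

type PkIndex = u16;

fn finish(
    pk_to_index: &mut BTreeMap<Vec<u8>, (PkIndex, Option<Vec<u8>>)>,
) -> (Vec<PkIndex>, BTreeMap<Vec<u8>, PkIndex>) {
    let mut key_positions = vec![0; pk_to_index.len()];
    let mut sparse_map = BTreeMap::new();
    for (i, (_full_pk, (pk_index, sparse_key))) in
        std::mem::take(pk_to_index).into_iter().enumerate()
    {
        // The position of the i-th (sorted) key is its old pk index.
        key_positions[i] = pk_index;
        // Sparse keys are re-pointed at the *new*, sorted index.
        if let Some(sparse_key) = sparse_key {
            sparse_map.insert(sparse_key, i as PkIndex);
        }
    }
    (key_positions, sparse_map)
}

fn main() {
    let mut m = BTreeMap::new();
    // Inserted out of key order: "b" got index 0 first, then "a" got index 1.
    m.insert(b"b".to_vec(), (0, Some(b"sb".to_vec())));
    m.insert(b"a".to_vec(), (1, Some(b"sa".to_vec())));
    let (positions, sparse) = finish(&mut m);
    assert_eq!(positions, vec![1, 0]); // old indices in sorted-key order
    assert_eq!(sparse[b"sa".as_slice()], 0); // new index after sorting
    assert_eq!(sparse[b"sb".as_slice()], 1);
}
```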
@@ -394,7 +416,7 @@ mod tests {
         let mut metrics = WriteMetrics::default();
         for key in &keys {
             assert!(!builder.is_full());
-            let pk_index = builder.insert_key(key, &mut metrics);
+            let pk_index = builder.insert_key(key, None, &mut metrics);
             last_pk_index = Some(pk_index);
         }
         assert_eq!(num_keys - 1, last_pk_index.unwrap());
@@ -426,14 +448,14 @@
         for i in 0..num_keys {
             // Each key is 5 bytes.
             let key = format!("{i:05}");
-            builder.insert_key(key.as_bytes(), &mut metrics);
+            builder.insert_key(key.as_bytes(), None, &mut metrics);
         }
         let key_bytes = num_keys as usize * 5;
         assert_eq!(key_bytes * 2, metrics.key_bytes);
         assert_eq!(key_bytes, builder.key_bytes_in_index);
         assert_eq!(8850, builder.memory_size());

-        let dict = builder.finish(&mut BTreeMap::new()).unwrap();
+        let (dict, _) = builder.finish().unwrap();
         assert_eq!(0, builder.key_bytes_in_index);
         assert_eq!(key_bytes, dict.key_bytes_in_index);
         assert!(dict.shared_memory_size() > key_bytes);
@@ -446,12 +468,12 @@
         for i in 0..MAX_KEYS_PER_BLOCK * 2 {
             let key = format!("{i:010}");
             assert!(!builder.is_full());
-            builder.insert_key(key.as_bytes(), &mut metrics);
+            builder.insert_key(key.as_bytes(), None, &mut metrics);
         }
         assert!(builder.is_full());
-        builder.finish(&mut BTreeMap::new());
+        builder.finish();

         assert!(!builder.is_full());
-        assert_eq!(0, builder.insert_key(b"a0", &mut metrics));
+        assert_eq!(0, builder.insert_key(b"a0", None, &mut metrics));
     }
 }

@@ -89,15 +89,18 @@ impl Partition {
             let sparse_key = primary_key.clone();
             primary_key.clear();
             row_codec.encode_to_vec(key_value.primary_keys(), primary_key)?;
-            let pk_id = inner
-                .shard_builder
-                .write_with_key(primary_key, &key_value, metrics);
+            let pk_id = inner.shard_builder.write_with_key(
+                primary_key,
+                Some(&sparse_key),
+                &key_value,
+                metrics,
+            );
             inner.pk_to_pk_id.insert(sparse_key, pk_id);
         } else {
+            // `primary_key` is already the full primary key.
             let pk_id = inner
                 .shard_builder
-                .write_with_key(primary_key, &key_value, metrics);
+                .write_with_key(primary_key, None, &key_value, metrics);
             inner.pk_to_pk_id.insert(std::mem::take(primary_key), pk_id);
         };

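Caller-side, the metric-engine branch keeps the sparse bytes for the id map while handing the shard builder both encodings. A sketch of that flow, with a faked codec and a stand-in `write_with_key`:

```rust
// Sketch of the caller-side flow in Partition::write (names follow the diff;
// the codec is faked). The sparse key stays the lookup key, the full key is
// what the dictionary stores, and the shard builder now sees both.
use std::collections::HashMap;

type PkId = u32;

fn write_with_key(_full: &[u8], _sparse: Option<&[u8]>) -> PkId {
    7 // stand-in for ShardBuilder::write_with_key
}

fn main() {
    let mut pk_to_pk_id: HashMap<Vec<u8>, PkId> = HashMap::new();

    let mut primary_key = b"sparse:table=1025,tsid=42".to_vec();
    let sparse_key = primary_key.clone();

    // Re-encode the full primary key in place (faked here).
    primary_key.clear();
    primary_key.extend_from_slice(b"full:table=1025,tsid=42,label=10min");

    let pk_id = write_with_key(&primary_key, Some(&sparse_key));
    // The id map stays keyed by the sparse form.
    pk_to_pk_id.insert(sparse_key, pk_id);
    assert_eq!(pk_to_pk_id[b"sparse:table=1025,tsid=42".as_slice()], 7);
}
```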
@@ -423,7 +423,6 @@ impl Node for ShardNode {

 #[cfg(test)]
 mod tests {
-    use std::collections::BTreeMap;
     use std::sync::Arc;

     use super::*;
@@ -488,10 +487,10 @@ mod tests {
             encode_keys(&metadata, kvs, &mut keys);
         }
         for key in &keys {
-            dict_builder.insert_key(key, &mut metrics);
+            dict_builder.insert_key(key, None, &mut metrics);
         }

-        let dict = dict_builder.finish(&mut BTreeMap::new()).unwrap();
+        let (dict, _) = dict_builder.finish().unwrap();
         let data_parts = DataParts::new(metadata, DATA_INIT_CAP, true);

         Shard::new(

@@ -14,7 +14,7 @@

 //! Builder of a shard.

-use std::collections::{BTreeMap, HashMap};
+use std::collections::HashMap;
 use std::sync::Arc;
 use std::time::{Duration, Instant};

@@ -71,12 +71,15 @@ impl ShardBuilder {
     /// Write a key value with its encoded primary key.
     pub fn write_with_key(
         &mut self,
-        primary_key: &[u8],
+        full_primary_key: &[u8],
+        sparse_key: Option<&[u8]>,
         key_value: &KeyValue,
         metrics: &mut WriteMetrics,
    ) -> PkId {
         // Safety: we check whether the builder need to freeze before.
-        let pk_index = self.dict_builder.insert_key(primary_key, metrics);
+        let pk_index = self
+            .dict_builder
+            .insert_key(full_primary_key, sparse_key, metrics);
         self.data_buffer.write_row(pk_index, key_value);
         PkId {
             shard_id: self.current_shard_id,
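For context, the returned `PkId` pairs the current shard with the dictionary index the builder just produced. The struct below mirrors the fields visible in this hunk, with stand-in types:

```rust
// Toy sketch of how a PkId pairs the shard with the pk index returned by
// dict_builder.insert_key(...); the field names follow the hunk, the type
// aliases are hypothetical stand-ins.
type ShardId = u32;
type PkIndex = u16;

#[derive(Debug, PartialEq)]
struct PkId {
    shard_id: ShardId,
    pk_index: PkIndex,
}

fn main() {
    let current_shard_id: ShardId = 0;
    let pk_index: PkIndex = 3; // as returned by insert_key
    let pk_id = PkId { shard_id: current_shard_id, pk_index };
    assert_eq!(pk_id, PkId { shard_id: 0, pk_index: 3 });
}
```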
@@ -106,10 +109,8 @@
             return Ok(None);
         }

-        let mut pk_to_index = BTreeMap::new();
-        let key_dict = self.dict_builder.finish(&mut pk_to_index);
-        let data_part = match &key_dict {
-            Some(dict) => {
+        let (data_part, key_dict) = match self.dict_builder.finish() {
+            Some((dict, pk_to_index)) => {
                 // Adds mapping to the map.
                 pk_to_pk_id.reserve(pk_to_index.len());
                 for (k, pk_index) in pk_to_index {
@@ -123,11 +124,12 @@
                 }

                 let pk_weights = dict.pk_weights_to_sort_data();
-                self.data_buffer.freeze(Some(&pk_weights), true)?
+                let part = self.data_buffer.freeze(Some(&pk_weights), true)?;
+                (part, Some(dict))
             }
             None => {
                 let pk_weights = [0];
-                self.data_buffer.freeze(Some(&pk_weights), true)?
+                (self.data_buffer.freeze(Some(&pk_weights), true)?, None)
             }
         };

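The control-flow reshaping here is easier to see in isolation: `finish()` now yields the dict and the sparse-key map together, so the caller produces `(data_part, key_dict)` from one match instead of finishing into an out-parameter. A toy sketch:

```rust
// Sketch of the reshaped control flow in ShardBuilder::finish's caller:
// dict_finish stands in for self.dict_builder.finish(), and all types are
// toys, not the real KeyDict/DataPart.
use std::collections::BTreeMap;

type PkIndex = u16;
struct KeyDict;
struct DataPart;

fn dict_finish(has_keys: bool) -> Option<(KeyDict, BTreeMap<Vec<u8>, PkIndex>)> {
    has_keys.then(|| (KeyDict, BTreeMap::new()))
}

fn freeze(has_keys: bool) -> (DataPart, Option<KeyDict>) {
    match dict_finish(has_keys) {
        Some((dict, pk_to_index)) => {
            // Adds mapping to the map (elided): pk_to_pk_id.reserve(...), etc.
            let _ = pk_to_index;
            (DataPart, Some(dict))
        }
        None => (DataPart, None),
    }
}

fn main() {
    let (_part, dict) = freeze(true);
    assert!(dict.is_some());
    let (_part, dict) = freeze(false);
    assert!(dict.is_none());
}
```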
@@ -367,7 +369,7 @@ mod tests {
         for key_values in &input {
             for kv in key_values.iter() {
                 let key = encode_key_by_kv(&kv);
-                shard_builder.write_with_key(&key, &kv, &mut metrics);
+                shard_builder.write_with_key(&key, None, &kv, &mut metrics);
             }
         }
         let shard = shard_builder
@@ -389,7 +391,7 @@
         for key_values in &input {
             for kv in key_values.iter() {
                 let key = encode_key_by_kv(&kv);
-                shard_builder.write_with_key(&key, &kv, &mut metrics);
+                shard_builder.write_with_key(&key, None, &kv, &mut metrics);
             }
         }

@@ -115,7 +115,7 @@ pub fn metadata_with_primary_key(
     enable_table_id: bool,
 ) -> RegionMetadataRef {
     let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
-    let maybe_table_id = if enable_table_id { "table_id" } else { "k1" };
+    let maybe_table_id = if enable_table_id { "__table_id" } else { "k1" };
     builder
         .push_column_metadata(ColumnMetadata {
             column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),