fix: freeze data buffer in shard (#3468)

* feat: call freeze if the active data buffer in a shard is full

* chore: more metrics

* chore: print metrics

* chore: enlarge freeze threshold

* test: test freeze

* test: fix config test
Yingwen
2024-03-11 22:51:06 +08:00
committed by GitHub
parent 0a4444a43a
commit 06dcd0f6ed
7 changed files with 151 additions and 42 deletions
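Taken together, the patches below implement a freeze-when-full write path: each shard keeps one mutable active buffer plus a list of immutable frozen parts, and a write first freezes the active buffer once it already holds data_freeze_threshold rows. The following is a minimal, self-contained Rust sketch of that pattern; the types are simplified stand-ins for illustration only, not the actual mito2 Shard/DataParts code shown in the diffs.

// Simplified stand-ins for the real Shard/DataParts types, for illustration only.
struct DataParts {
    active: Vec<i64>,      // rows in the mutable active buffer
    frozen: Vec<Vec<i64>>, // immutable, already-frozen parts
}

impl DataParts {
    fn num_active_rows(&self) -> usize {
        self.active.len()
    }

    // Freezes the active buffer and starts a new, empty one.
    fn freeze(&mut self) {
        let part = std::mem::take(&mut self.active);
        self.frozen.push(part);
    }
}

struct Shard {
    parts: DataParts,
    data_freeze_threshold: usize,
}

impl Shard {
    // Writes one row, freezing the active buffer first if it is already full.
    fn write(&mut self, row: i64) {
        if self.parts.num_active_rows() >= self.data_freeze_threshold {
            self.parts.freeze();
        }
        self.parts.active.push(row);
    }
}

fn main() {
    let mut shard = Shard {
        parts: DataParts { active: Vec::new(), frozen: Vec::new() },
        data_freeze_threshold: 50,
    };
    for ts in 0..200 {
        shard.write(ts);
    }
    assert_eq!(3, shard.parts.frozen.len());
    assert_eq!(50, shard.parts.num_active_rows());
}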

View File

@@ -85,7 +85,7 @@ impl Default for MergeTreeConfig {
         Self {
             index_max_keys_per_shard: 8192,
-            data_freeze_threshold: 32768,
+            data_freeze_threshold: 131072,
             dedup: true,
             fork_dictionary_bytes,
         }

View File

@@ -957,6 +957,18 @@ impl DataParts {
         self.active.write_row(pk_index, kv)
     }
 
+    /// Returns the number of rows in the active buffer.
+    pub fn num_active_rows(&self) -> usize {
+        self.active.num_rows()
+    }
+
+    /// Freezes active buffer and creates a new active buffer.
+    pub fn freeze(&mut self) -> Result<()> {
+        let part = self.active.freeze(None, false)?;
+        self.frozen.push(part);
+        Ok(())
+    }
+
     /// Reads data from all parts including active and frozen parts.
     /// The returned iterator yields a record batch of one primary key at a time.
     /// The order of yielding primary keys is determined by provided weights.
@@ -976,6 +988,11 @@ impl DataParts {
     pub(crate) fn is_empty(&self) -> bool {
         self.active.is_empty() && self.frozen.iter().all(|part| part.is_empty())
     }
+
+    #[cfg(test)]
+    pub(crate) fn frozen_len(&self) -> usize {
+        self.frozen.len()
+    }
 }
 
 pub struct DataPartsReaderBuilder {

View File

@@ -78,7 +78,7 @@ impl Partition {
         // Finds key in shards, now we ensure one key only exists in one shard.
         if let Some(pk_id) = inner.find_key_in_shards(primary_key) {
-            inner.write_to_shard(pk_id, &key_value);
+            inner.write_to_shard(pk_id, &key_value)?;
             inner.num_rows += 1;
             return Ok(());
         }
@@ -106,7 +106,7 @@ impl Partition {
     }
 
     /// Writes to the partition without a primary key.
-    pub fn write_no_key(&self, key_value: KeyValue) {
+    pub fn write_no_key(&self, key_value: KeyValue) -> Result<()> {
         let mut inner = self.inner.write().unwrap();
         // If no primary key, always write to the first shard.
         debug_assert!(!inner.shards.is_empty());
@@ -117,12 +117,15 @@ impl Partition {
             shard_id: 0,
             pk_index: 0,
         };
-        inner.shards[0].write_with_pk_id(pk_id, &key_value);
+        inner.shards[0].write_with_pk_id(pk_id, &key_value)?;
         inner.num_rows += 1;
+
+        Ok(())
     }
 
     /// Scans data in the partition.
     pub fn read(&self, mut context: ReadPartitionContext) -> Result<PartitionReader> {
+        let start = Instant::now();
         let key_filter = if context.need_prune_key {
             Some(PrimaryKeyFilter::new(
                 context.metadata.clone(),
@@ -150,7 +153,7 @@ impl Partition {
             (builder_reader, shard_source)
         };
 
-        context.metrics.num_shards = shard_reader_builders.len();
+        context.metrics.num_shards += shard_reader_builders.len();
         let mut nodes = shard_reader_builders
             .into_iter()
             .map(|builder| {
@@ -161,7 +164,7 @@ impl Partition {
             .collect::<Result<Vec<_>>>()?;
 
         if let Some(builder) = builder_source {
-            context.metrics.read_builder = true;
+            context.metrics.num_builder += 1;
             // Move the initialization of ShardBuilderReader out of read lock.
             let shard_builder_reader =
                 builder.build(Some(&context.pk_weights), key_filter.clone())?;
@@ -172,8 +175,10 @@ impl Partition {
         let merger = ShardMerger::try_new(nodes)?;
         if self.dedup {
             let source = DedupReader::try_new(merger)?;
+            context.metrics.build_partition_reader += start.elapsed();
             PartitionReader::new(context, Box::new(source))
         } else {
+            context.metrics.build_partition_reader += start.elapsed();
             PartitionReader::new(context, Box::new(merger))
         }
     }
@@ -282,9 +287,10 @@ pub(crate) struct PartitionStats {
 #[derive(Default)]
 struct PartitionReaderMetrics {
+    build_partition_reader: Duration,
     read_source: Duration,
     data_batch_to_batch: Duration,
-    read_builder: bool,
+    num_builder: usize,
     num_shards: usize,
 }
@@ -440,9 +446,15 @@ impl Drop for ReadPartitionContext {
             .observe(partition_data_batch_to_batch);
         common_telemetry::debug!(
-            "TreeIter partitions metrics, read_builder: {}, num_shards: {}, partition_read_source: {}s, partition_data_batch_to_batch: {}s",
-            self.metrics.read_builder,
+            "TreeIter partitions metrics, \
+            num_builder: {}, \
+            num_shards: {}, \
+            build_partition_reader: {}s, \
+            partition_read_source: {}s, \
+            partition_data_batch_to_batch: {}s",
+            self.metrics.num_builder,
             self.metrics.num_shards,
+            self.metrics.build_partition_reader.as_secs_f64(),
             partition_read_source,
             partition_data_batch_to_batch,
         );
@@ -549,7 +561,16 @@ impl Inner {
     fn new(metadata: RegionMetadataRef, config: &MergeTreeConfig) -> Self {
         let (shards, current_shard_id) = if metadata.primary_key.is_empty() {
             let data_parts = DataParts::new(metadata.clone(), DATA_INIT_CAP, config.dedup);
-            (vec![Shard::new(0, None, data_parts, config.dedup)], 1)
+            (
+                vec![Shard::new(
+                    0,
+                    None,
+                    data_parts,
+                    config.dedup,
+                    config.data_freeze_threshold,
+                )],
+                1,
+            )
         } else {
             (Vec::new(), 0)
         };
@@ -569,18 +590,22 @@ impl Inner {
         self.pk_to_pk_id.get(primary_key).copied()
     }
 
-    fn write_to_shard(&mut self, pk_id: PkId, key_value: &KeyValue) {
+    fn write_to_shard(&mut self, pk_id: PkId, key_value: &KeyValue) -> Result<()> {
         if pk_id.shard_id == self.shard_builder.current_shard_id() {
             self.shard_builder.write_with_pk_id(pk_id, key_value);
-            return;
-        }
-        for shard in &mut self.shards {
-            if shard.shard_id == pk_id.shard_id {
-                shard.write_with_pk_id(pk_id, key_value);
-                self.num_rows += 1;
-                return;
-            }
+            return Ok(());
         }
+
+        // Safety: We find the shard by shard id.
+        let shard = self
+            .shards
+            .iter_mut()
+            .find(|shard| shard.shard_id == pk_id.shard_id)
+            .unwrap();
+        shard.write_with_pk_id(pk_id, key_value)?;
+        self.num_rows += 1;
+        Ok(())
     }
 
     fn freeze_active_shard(&mut self) -> Result<()> {
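The read-path changes above replace the one-shot read_builder flag and num_shards assignment with accumulating counters and add a build_partition_reader timer. This is ordinary Instant/Duration accumulation; the sketch below shows the pattern with a hypothetical ReadMetrics struct rather than the real PartitionReaderMetrics.

use std::time::{Duration, Instant};

// Hypothetical metrics struct mirroring the shape of PartitionReaderMetrics.
#[derive(Debug, Default)]
struct ReadMetrics {
    build_partition_reader: Duration,
    num_builder: usize,
    num_shards: usize,
}

fn build_reader(metrics: &mut ReadMetrics, num_shards: usize, has_builder: bool) {
    let start = Instant::now();

    // `+=` lets several partitions contribute to the same totals, which is why
    // the commit replaces `=` and the `read_builder` bool with counters.
    metrics.num_shards += num_shards;
    if has_builder {
        metrics.num_builder += 1;
    }

    // ... building shard readers, the merger and the dedup reader would go here ...

    metrics.build_partition_reader += start.elapsed();
}

fn main() {
    let mut metrics = ReadMetrics::default();
    build_reader(&mut metrics, 2, true);
    build_reader(&mut metrics, 1, false);
    println!(
        "num_builder: {}, num_shards: {}, build_partition_reader: {}s",
        metrics.num_builder,
        metrics.num_shards,
        metrics.build_partition_reader.as_secs_f64()
    );
}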

View File

@@ -39,6 +39,8 @@ pub struct Shard {
     /// Data in the shard.
     data_parts: DataParts,
     dedup: bool,
+    /// Number of rows to freeze a data part.
+    data_freeze_threshold: usize,
 }
 
 impl Shard {
@@ -48,20 +50,29 @@ impl Shard {
         key_dict: Option<KeyDictRef>,
         data_parts: DataParts,
         dedup: bool,
+        data_freeze_threshold: usize,
     ) -> Shard {
         Shard {
             shard_id,
             key_dict,
             data_parts,
             dedup,
+            data_freeze_threshold,
         }
     }
 
     /// Writes a key value into the shard.
-    pub fn write_with_pk_id(&mut self, pk_id: PkId, key_value: &KeyValue) {
+    ///
+    /// It freezes the active buffer if it is full.
+    pub fn write_with_pk_id(&mut self, pk_id: PkId, key_value: &KeyValue) -> Result<()> {
         debug_assert_eq!(self.shard_id, pk_id.shard_id);
+
+        if self.data_parts.num_active_rows() >= self.data_freeze_threshold {
+            self.data_parts.freeze()?;
+        }
+
         self.data_parts.write_row(pk_id.pk_index, key_value);
+        Ok(())
     }
 
     /// Scans the shard.
@@ -83,6 +94,7 @@ impl Shard {
             key_dict: self.key_dict.clone(),
             data_parts: DataParts::new(metadata, DATA_INIT_CAP, self.dedup),
             dedup: self.dedup,
+            data_freeze_threshold: self.data_freeze_threshold,
         }
     }
@@ -467,6 +479,7 @@ mod tests {
         shard_id: ShardId,
         metadata: RegionMetadataRef,
         input: &[(KeyValues, PkIndex)],
+        data_freeze_threshold: usize,
     ) -> Shard {
         let mut dict_builder = KeyDictBuilder::new(1024);
         let mut metrics = WriteMetrics::default();
@@ -481,26 +494,16 @@ mod tests {
         let dict = dict_builder.finish(&mut BTreeMap::new()).unwrap();
         let data_parts = DataParts::new(metadata, DATA_INIT_CAP, true);
 
-        Shard::new(shard_id, Some(Arc::new(dict)), data_parts, true)
+        Shard::new(
+            shard_id,
+            Some(Arc::new(dict)),
+            data_parts,
+            true,
+            data_freeze_threshold,
+        )
     }
 
-    #[test]
-    fn test_write_read_shard() {
-        let metadata = metadata_for_test();
-        let input = input_with_key(&metadata);
-        let mut shard = new_shard_with_dict(8, metadata, &input);
-        assert!(shard.is_empty());
-        for (key_values, pk_index) in &input {
-            for kv in key_values.iter() {
-                let pk_id = PkId {
-                    shard_id: shard.shard_id,
-                    pk_index: *pk_index,
-                };
-                shard.write_with_pk_id(pk_id, &kv);
-            }
-        }
-        assert!(!shard.is_empty());
-
+    fn collect_timestamps(shard: &Shard) -> Vec<i64> {
         let mut reader = shard.read().unwrap().build(None).unwrap();
         let mut timestamps = Vec::new();
         while reader.is_valid() {
@@ -511,6 +514,64 @@ mod tests {
             reader.next().unwrap();
         }
+
+        timestamps
+    }
+
+    #[test]
+    fn test_write_read_shard() {
+        let metadata = metadata_for_test();
+        let input = input_with_key(&metadata);
+        let mut shard = new_shard_with_dict(8, metadata, &input, 100);
+        assert!(shard.is_empty());
+        for (key_values, pk_index) in &input {
+            for kv in key_values.iter() {
+                let pk_id = PkId {
+                    shard_id: shard.shard_id,
+                    pk_index: *pk_index,
+                };
+                shard.write_with_pk_id(pk_id, &kv).unwrap();
+            }
+        }
+        assert!(!shard.is_empty());
+
+        let timestamps = collect_timestamps(&shard);
         assert_eq!(vec![0, 1, 10, 11, 20, 21], timestamps);
     }
+
+    #[test]
+    fn test_shard_freeze() {
+        let metadata = metadata_for_test();
+        let kvs = build_key_values_with_ts_seq_values(
+            &metadata,
+            "shard".to_string(),
+            0,
+            [0].into_iter(),
+            [Some(0.0)].into_iter(),
+            0,
+        );
+        let mut shard = new_shard_with_dict(8, metadata.clone(), &[(kvs, 0)], 50);
+        let expected: Vec<_> = (0..200).collect();
+        for i in &expected {
+            let kvs = build_key_values_with_ts_seq_values(
+                &metadata,
+                "shard".to_string(),
+                0,
+                [*i].into_iter(),
+                [Some(0.0)].into_iter(),
+                *i as u64,
+            );
+            let pk_id = PkId {
+                shard_id: shard.shard_id,
+                pk_index: *i as PkIndex,
+            };
+            for kv in kvs.iter() {
+                shard.write_with_pk_id(pk_id, &kv).unwrap();
+            }
+        }
+        assert!(!shard.is_empty());
+        assert_eq!(3, shard.data_parts.frozen_len());
+
+        let timestamps = collect_timestamps(&shard);
+        assert_eq!(expected, timestamps);
+    }
 }
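A quick way to sanity-check the frozen_len assertion in test_shard_freeze: because the threshold check runs before each write, writing n rows with threshold t leaves (n - 1) / t frozen parts (integer division) and the remaining rows in the active buffer. The small sketch below walks through that arithmetic with a hypothetical helper function.

// Hypothetical helper, for illustration only: frozen parts produced by writing
// `n` rows into a shard whose active buffer is frozen whenever it already holds
// `threshold` rows before a write.
fn expected_frozen_parts(n: usize, threshold: usize) -> usize {
    assert!(threshold > 0);
    if n == 0 {
        0
    } else {
        (n - 1) / threshold
    }
}

fn main() {
    // Mirrors test_shard_freeze: 200 rows with a threshold of 50 => 3 frozen parts.
    assert_eq!(3, expected_frozen_parts(200, 50));
    // Exactly one threshold's worth of rows never triggers a freeze.
    assert_eq!(0, expected_frozen_parts(50, 50));
    // One extra row freezes the first part.
    assert_eq!(1, expected_frozen_parts(51, 50));
}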

View File

@@ -138,7 +138,13 @@ impl ShardBuilder {
         let shard_id = self.current_shard_id;
         self.current_shard_id += 1;
 
-        Ok(Some(Shard::new(shard_id, key_dict, data_parts, self.dedup)))
+        Ok(Some(Shard::new(
+            shard_id,
+            key_dict,
+            data_parts,
+            self.dedup,
+            self.data_freeze_threshold,
+        )))
     }
 
     /// Scans the shard builder.

View File

@@ -124,7 +124,7 @@ impl MergeTree {
             if !has_pk {
                 // No primary key.
-                self.write_no_key(kv);
+                self.write_no_key(kv)?;
                 continue;
             }
@@ -299,7 +299,7 @@ impl MergeTree {
         )
     }
 
-    fn write_no_key(&self, key_value: KeyValue) {
+    fn write_no_key(&self, key_value: KeyValue) -> Result<()> {
         let partition_key = Partition::get_partition_key(&key_value, self.is_partitioned);
         let partition = self.get_or_create_partition(partition_key);
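The signature changes in this file and in partition.rs are mechanical error propagation: write_no_key and write_to_shard now return Result<()> so a failed freeze can bubble up through the ? operator instead of being swallowed. A minimal sketch of that pattern, using hypothetical error and function names:

// Hypothetical error and result types standing in for the crate's own Result.
#[derive(Debug)]
struct FreezeError;
type Result<T> = std::result::Result<T, FreezeError>;

fn freeze() -> Result<()> {
    // Pretend freezing can fail (e.g. an allocation or encoding error).
    Ok(())
}

// Before the change this would have been `fn write_row(row: i64)`, unable to
// report a freeze failure; returning Result lets `?` bubble it up.
fn write_row(row: i64) -> Result<()> {
    if row % 50 == 0 {
        freeze()?;
    }
    Ok(())
}

fn write_batch(rows: &[i64]) -> Result<()> {
    for row in rows {
        write_row(*row)?;
    }
    Ok(())
}

fn main() {
    write_batch(&[1, 2, 3, 50]).expect("writes should succeed");
}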

View File

@@ -789,7 +789,7 @@ intermediate_path = ""
 [datanode.region_engine.mito.memtable]
 type = "experimental"
 index_max_keys_per_shard = 8192
-data_freeze_threshold = 32768
+data_freeze_threshold = 131072
 dedup = true
 fork_dictionary_bytes = "1GiB"
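For reference, the memtable options exercised by this config test map field by field onto the memtable config struct. Below is a hedged sketch of parsing such a section with serde and the toml crate; MemtableOptions is a hypothetical subset of the real MergeTreeConfig, and fork_dictionary_bytes is omitted because its byte-size string needs a dedicated type.

use serde::Deserialize;

// Hypothetical subset of the memtable options above; the real MergeTreeConfig
// lives in the mito2 crate and carries more fields and custom types.
#[derive(Debug, Deserialize)]
struct MemtableOptions {
    #[serde(rename = "type")]
    kind: String,
    index_max_keys_per_shard: usize,
    data_freeze_threshold: usize,
    dedup: bool,
}

fn main() {
    let section = r#"
        type = "experimental"
        index_max_keys_per_shard = 8192
        data_freeze_threshold = 131072
        dedup = true
    "#;
    let opts: MemtableOptions = toml::from_str(section).expect("valid memtable options");
    assert_eq!(131072, opts.data_freeze_threshold);
    println!("{opts:?}");
}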