diff --git a/src/mito2/src/memtable/merge_tree.rs b/src/mito2/src/memtable/merge_tree.rs
index 5f80ba746a..1789959adf 100644
--- a/src/mito2/src/memtable/merge_tree.rs
+++ b/src/mito2/src/memtable/merge_tree.rs
@@ -85,7 +85,7 @@ impl Default for MergeTreeConfig {
         Self {
             index_max_keys_per_shard: 8192,
-            data_freeze_threshold: 32768,
+            data_freeze_threshold: 131072,
             dedup: true,
             fork_dictionary_bytes,
         }
diff --git a/src/mito2/src/memtable/merge_tree/data.rs b/src/mito2/src/memtable/merge_tree/data.rs
index e43d06a223..3f6557ee03 100644
--- a/src/mito2/src/memtable/merge_tree/data.rs
+++ b/src/mito2/src/memtable/merge_tree/data.rs
@@ -957,6 +957,18 @@ impl DataParts {
         self.active.write_row(pk_index, kv)
     }

+    /// Returns the number of rows in the active buffer.
+    pub fn num_active_rows(&self) -> usize {
+        self.active.num_rows()
+    }
+
+    /// Freezes active buffer and creates a new active buffer.
+    pub fn freeze(&mut self) -> Result<()> {
+        let part = self.active.freeze(None, false)?;
+        self.frozen.push(part);
+        Ok(())
+    }
+
     /// Reads data from all parts including active and frozen parts.
     /// The returned iterator yields a record batch of one primary key at a time.
     /// The order of yielding primary keys is determined by provided weights.
@@ -976,6 +988,11 @@
     pub(crate) fn is_empty(&self) -> bool {
         self.active.is_empty() && self.frozen.iter().all(|part| part.is_empty())
     }
+
+    #[cfg(test)]
+    pub(crate) fn frozen_len(&self) -> usize {
+        self.frozen.len()
+    }
 }

 pub struct DataPartsReaderBuilder {
diff --git a/src/mito2/src/memtable/merge_tree/partition.rs b/src/mito2/src/memtable/merge_tree/partition.rs
index d4bd0644b5..f031de57eb 100644
--- a/src/mito2/src/memtable/merge_tree/partition.rs
+++ b/src/mito2/src/memtable/merge_tree/partition.rs
@@ -78,7 +78,7 @@ impl Partition {
         // Finds key in shards, now we ensure one key only exists in one shard.
         if let Some(pk_id) = inner.find_key_in_shards(primary_key) {
-            inner.write_to_shard(pk_id, &key_value);
+            inner.write_to_shard(pk_id, &key_value)?;
             inner.num_rows += 1;
             return Ok(());
         }
@@ -106,7 +106,7 @@
     }

     /// Writes to the partition without a primary key.
-    pub fn write_no_key(&self, key_value: KeyValue) {
+    pub fn write_no_key(&self, key_value: KeyValue) -> Result<()> {
         let mut inner = self.inner.write().unwrap();
         // If no primary key, always write to the first shard.
         debug_assert!(!inner.shards.is_empty());
@@ -117,12 +117,15 @@
             shard_id: 0,
             pk_index: 0,
         };
-        inner.shards[0].write_with_pk_id(pk_id, &key_value);
+        inner.shards[0].write_with_pk_id(pk_id, &key_value)?;
         inner.num_rows += 1;
+
+        Ok(())
     }

     /// Scans data in the partition.
     pub fn read(&self, mut context: ReadPartitionContext) -> Result<PartitionReader> {
+        let start = Instant::now();
         let key_filter = if context.need_prune_key {
             Some(PrimaryKeyFilter::new(
                 context.metadata.clone(),
@@ -150,7 +153,7 @@
             (builder_reader, shard_source)
         };

-        context.metrics.num_shards = shard_reader_builders.len();
+        context.metrics.num_shards += shard_reader_builders.len();
         let mut nodes = shard_reader_builders
             .into_iter()
             .map(|builder| {
@@ -161,7 +164,7 @@
             .collect::<Result<Vec<_>>>()?;

         if let Some(builder) = builder_source {
-            context.metrics.read_builder = true;
+            context.metrics.num_builder += 1;
             // Move the initialization of ShardBuilderReader out of read lock.
             let shard_builder_reader =
                 builder.build(Some(&context.pk_weights), key_filter.clone())?;
@@ -172,8 +175,10 @@
         let merger = ShardMerger::try_new(nodes)?;
         if self.dedup {
             let source = DedupReader::try_new(merger)?;
+            context.metrics.build_partition_reader += start.elapsed();
             PartitionReader::new(context, Box::new(source))
         } else {
+            context.metrics.build_partition_reader += start.elapsed();
             PartitionReader::new(context, Box::new(merger))
         }
     }
@@ -282,9 +287,10 @@ pub(crate) struct PartitionStats {

 #[derive(Default)]
 struct PartitionReaderMetrics {
+    build_partition_reader: Duration,
     read_source: Duration,
     data_batch_to_batch: Duration,
-    read_builder: bool,
+    num_builder: usize,
     num_shards: usize,
 }

@@ -440,9 +446,15 @@
             .observe(partition_data_batch_to_batch);

         common_telemetry::debug!(
-            "TreeIter partitions metrics, read_builder: {}, num_shards: {}, partition_read_source: {}s, partition_data_batch_to_batch: {}s",
-            self.metrics.read_builder,
+            "TreeIter partitions metrics, \
+            num_builder: {}, \
+            num_shards: {}, \
+            build_partition_reader: {}s, \
+            partition_read_source: {}s, \
+            partition_data_batch_to_batch: {}s",
+            self.metrics.num_builder,
             self.metrics.num_shards,
+            self.metrics.build_partition_reader.as_secs_f64(),
             partition_read_source,
             partition_data_batch_to_batch,
         );
@@ -549,7 +561,16 @@ impl Inner {
     fn new(metadata: RegionMetadataRef, config: &MergeTreeConfig) -> Self {
         let (shards, current_shard_id) = if metadata.primary_key.is_empty() {
             let data_parts = DataParts::new(metadata.clone(), DATA_INIT_CAP, config.dedup);
-            (vec![Shard::new(0, None, data_parts, config.dedup)], 1)
+            (
+                vec![Shard::new(
+                    0,
+                    None,
+                    data_parts,
+                    config.dedup,
+                    config.data_freeze_threshold,
+                )],
+                1,
+            )
         } else {
             (Vec::new(), 0)
         };
@@ -569,18 +590,22 @@ impl Inner {
         self.pk_to_pk_id.get(primary_key).copied()
     }

-    fn write_to_shard(&mut self, pk_id: PkId, key_value: &KeyValue) {
+    fn write_to_shard(&mut self, pk_id: PkId, key_value: &KeyValue) -> Result<()> {
         if pk_id.shard_id == self.shard_builder.current_shard_id() {
             self.shard_builder.write_with_pk_id(pk_id, key_value);
-            return;
-        }
-        for shard in &mut self.shards {
-            if shard.shard_id == pk_id.shard_id {
-                shard.write_with_pk_id(pk_id, key_value);
-                self.num_rows += 1;
-                return;
-            }
+            return Ok(());
         }
+
+        // Safety: We find the shard by shard id.
+        let shard = self
+            .shards
+            .iter_mut()
+            .find(|shard| shard.shard_id == pk_id.shard_id)
+            .unwrap();
+        shard.write_with_pk_id(pk_id, key_value)?;
+        self.num_rows += 1;
+
+        Ok(())
     }

     fn freeze_active_shard(&mut self) -> Result<()> {
diff --git a/src/mito2/src/memtable/merge_tree/shard.rs b/src/mito2/src/memtable/merge_tree/shard.rs
index 7f981f9162..2ac1ee90bc 100644
--- a/src/mito2/src/memtable/merge_tree/shard.rs
+++ b/src/mito2/src/memtable/merge_tree/shard.rs
@@ -39,6 +39,8 @@ pub struct Shard {
     /// Data in the shard.
     data_parts: DataParts,
     dedup: bool,
+    /// Number of rows to freeze a data part.
+    data_freeze_threshold: usize,
 }

 impl Shard {
@@ -48,20 +50,29 @@
         key_dict: Option<KeyDictRef>,
         data_parts: DataParts,
         dedup: bool,
+        data_freeze_threshold: usize,
     ) -> Shard {
         Shard {
             shard_id,
             key_dict,
             data_parts,
             dedup,
+            data_freeze_threshold,
         }
     }

     /// Writes a key value into the shard.
-    pub fn write_with_pk_id(&mut self, pk_id: PkId, key_value: &KeyValue) {
+    ///
+    /// It will freeze the active buffer if it is full.
+    pub fn write_with_pk_id(&mut self, pk_id: PkId, key_value: &KeyValue) -> Result<()> {
         debug_assert_eq!(self.shard_id, pk_id.shard_id);
+        if self.data_parts.num_active_rows() >= self.data_freeze_threshold {
+            self.data_parts.freeze()?;
+        }
+
         self.data_parts.write_row(pk_id.pk_index, key_value);
+        Ok(())
     }

     /// Scans the shard.
@@ -83,6 +94,7 @@
             key_dict: self.key_dict.clone(),
             data_parts: DataParts::new(metadata, DATA_INIT_CAP, self.dedup),
             dedup: self.dedup,
+            data_freeze_threshold: self.data_freeze_threshold,
         }
     }

@@ -467,6 +479,7 @@
         shard_id: ShardId,
         metadata: RegionMetadataRef,
         input: &[(KeyValues, PkIndex)],
+        data_freeze_threshold: usize,
     ) -> Shard {
         let mut dict_builder = KeyDictBuilder::new(1024);
         let mut metrics = WriteMetrics::default();
@@ -481,26 +494,16 @@
         let dict = dict_builder.finish(&mut BTreeMap::new()).unwrap();
         let data_parts = DataParts::new(metadata, DATA_INIT_CAP, true);

-        Shard::new(shard_id, Some(Arc::new(dict)), data_parts, true)
+        Shard::new(
+            shard_id,
+            Some(Arc::new(dict)),
+            data_parts,
+            true,
+            data_freeze_threshold,
+        )
     }

-    #[test]
-    fn test_write_read_shard() {
-        let metadata = metadata_for_test();
-        let input = input_with_key(&metadata);
-        let mut shard = new_shard_with_dict(8, metadata, &input);
-        assert!(shard.is_empty());
-        for (key_values, pk_index) in &input {
-            for kv in key_values.iter() {
-                let pk_id = PkId {
-                    shard_id: shard.shard_id,
-                    pk_index: *pk_index,
-                };
-                shard.write_with_pk_id(pk_id, &kv);
-            }
-        }
-        assert!(!shard.is_empty());
-
+    fn collect_timestamps(shard: &Shard) -> Vec<i64> {
         let mut reader = shard.read().unwrap().build(None).unwrap();
         let mut timestamps = Vec::new();
         while reader.is_valid() {
@@ -511,6 +514,64 @@
             reader.next().unwrap();
         }

+        timestamps
+    }
+
+    #[test]
+    fn test_write_read_shard() {
+        let metadata = metadata_for_test();
+        let input = input_with_key(&metadata);
+        let mut shard = new_shard_with_dict(8, metadata, &input, 100);
+        assert!(shard.is_empty());
+        for (key_values, pk_index) in &input {
+            for kv in key_values.iter() {
+                let pk_id = PkId {
+                    shard_id: shard.shard_id,
+                    pk_index: *pk_index,
+                };
+                shard.write_with_pk_id(pk_id, &kv).unwrap();
+            }
+        }
+        assert!(!shard.is_empty());
+
+        let timestamps = collect_timestamps(&shard);
         assert_eq!(vec![0, 1, 10, 11, 20, 21], timestamps);
     }
+
+    #[test]
+    fn test_shard_freeze() {
+        let metadata = metadata_for_test();
+        let kvs = build_key_values_with_ts_seq_values(
+            &metadata,
+            "shard".to_string(),
+            0,
+            [0].into_iter(),
+            [Some(0.0)].into_iter(),
+            0,
+        );
+        let mut shard = new_shard_with_dict(8, metadata.clone(), &[(kvs, 0)], 50);
+        let expected: Vec<_> = (0..200).collect();
+        for i in &expected {
+            let kvs = build_key_values_with_ts_seq_values(
+                &metadata,
+                "shard".to_string(),
+                0,
+                [*i].into_iter(),
+                [Some(0.0)].into_iter(),
+                *i as u64,
+            );
+            let pk_id = PkId {
+                shard_id: shard.shard_id,
+                pk_index: *i as PkIndex,
+            };
+            for kv in kvs.iter() {
+                shard.write_with_pk_id(pk_id, &kv).unwrap();
+            }
+        }
+        assert!(!shard.is_empty());
+        assert_eq!(3, shard.data_parts.frozen_len());
+
+        let timestamps = collect_timestamps(&shard);
+        assert_eq!(expected, timestamps);
+    }
 }
diff --git a/src/mito2/src/memtable/merge_tree/shard_builder.rs b/src/mito2/src/memtable/merge_tree/shard_builder.rs
index 2b007ebd87..01cb2de25a 100644
--- a/src/mito2/src/memtable/merge_tree/shard_builder.rs
+++ b/src/mito2/src/memtable/merge_tree/shard_builder.rs
@@ -138,7 +138,13 @@ impl ShardBuilder {
         let shard_id = self.current_shard_id;
         self.current_shard_id += 1;

-        Ok(Some(Shard::new(shard_id, key_dict, data_parts, self.dedup)))
+        Ok(Some(Shard::new(
+            shard_id,
+            key_dict,
+            data_parts,
+            self.dedup,
+            self.data_freeze_threshold,
+        )))
     }

     /// Scans the shard builder.
diff --git a/src/mito2/src/memtable/merge_tree/tree.rs b/src/mito2/src/memtable/merge_tree/tree.rs
index 94c87f7583..0a42e13fde 100644
--- a/src/mito2/src/memtable/merge_tree/tree.rs
+++ b/src/mito2/src/memtable/merge_tree/tree.rs
@@ -124,7 +124,7 @@ impl MergeTree {

             if !has_pk {
                 // No primary key.
-                self.write_no_key(kv);
+                self.write_no_key(kv)?;
                 continue;
             }
@@ -299,7 +299,7 @@ impl MergeTree {
         )
     }

-    fn write_no_key(&self, key_value: KeyValue) {
+    fn write_no_key(&self, key_value: KeyValue) -> Result<()> {
         let partition_key = Partition::get_partition_key(&key_value, self.is_partitioned);
         let partition = self.get_or_create_partition(partition_key);
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index 32feebe161..a75e9e2196 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -789,7 +789,7 @@ intermediate_path = ""

 [datanode.region_engine.mito.memtable]
 type = "experimental"
 index_max_keys_per_shard = 8192
-data_freeze_threshold = 32768
+data_freeze_threshold = 131072
 dedup = true
 fork_dictionary_bytes = "1GiB"
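For reference, the sketch below distills the freeze-on-threshold write path this patch adds via `Shard::write_with_pk_id` and `DataParts::freeze`. It is a minimal standalone illustration, not the mito2 implementation: `Buffer`, `Parts`, and `write_row` are hypothetical stand-ins for the real `DataBuffer`/`DataParts` types, and only the `data_freeze_threshold` bookkeeping mirrors the patch.

// Minimal sketch of freeze-on-threshold writes. `Buffer` and `Parts` are
// hypothetical stand-ins; only the threshold bookkeeping mirrors the patch.

/// Simplified active buffer that just stores rows.
#[derive(Default)]
struct Buffer {
    rows: Vec<i64>,
}

impl Buffer {
    fn num_rows(&self) -> usize {
        self.rows.len()
    }
}

/// Simplified stand-in for `DataParts`: one mutable active buffer
/// plus a list of immutable frozen parts.
struct Parts {
    active: Buffer,
    frozen: Vec<Buffer>,
    /// Mirrors `data_freeze_threshold` (default 131072 after this patch).
    data_freeze_threshold: usize,
}

impl Parts {
    fn new(data_freeze_threshold: usize) -> Self {
        Parts {
            active: Buffer::default(),
            frozen: Vec::new(),
            data_freeze_threshold,
        }
    }

    /// Mirrors the new `Shard::write_with_pk_id`: freeze the active buffer
    /// first if it is full, then write into the (possibly fresh) buffer.
    fn write_row(&mut self, ts: i64) {
        if self.active.num_rows() >= self.data_freeze_threshold {
            // Mirrors `DataParts::freeze`: move the full active buffer into
            // the frozen list and start a new empty active buffer.
            let part = std::mem::take(&mut self.active);
            self.frozen.push(part);
        }
        self.active.rows.push(ts);
    }
}

fn main() {
    let mut parts = Parts::new(50);
    for ts in 0..200 {
        parts.write_row(ts);
    }
    // 200 writes with a threshold of 50 leave 3 frozen parts and 50 active
    // rows, matching the expectation in `test_shard_freeze` above.
    assert_eq!(3, parts.frozen.len());
    assert_eq!(50, parts.active.num_rows());
}

Freezing on the write path bounds the active buffer at `data_freeze_threshold` rows, so a hot shard accumulates immutable frozen parts instead of one ever-growing buffer; readers are unaffected because `DataParts::read` already merges the active buffer with all frozen parts.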