diff --git a/src/mito2/src/memtable/merge_tree/data.rs b/src/mito2/src/memtable/merge_tree/data.rs
index e3214ee71a..e3c0e0a8ab 100644
--- a/src/mito2/src/memtable/merge_tree/data.rs
+++ b/src/mito2/src/memtable/merge_tree/data.rs
@@ -253,26 +253,39 @@ impl DataBuffer {
         Ok(parts)
     }
 
-    /// Reads batches from data buffer without resetting builder's buffers.
-    /// If pk_weights is present, yielded rows are sorted according to weights,
-    /// otherwise rows are sorted by "pk_weights" values as they are actually weights.
-    pub fn read(&self, pk_weights: Option<&[u16]>) -> Result<DataBufferReader> {
-        let batch = {
-            let _timer = MERGE_TREE_READ_STAGE_ELAPSED
-                .with_label_values(&["read_data_buffer_to_batch"])
-                .start_timer();
-            read_data_buffer_to_record_batches(
-                self.data_part_schema.clone(),
-                self,
-                pk_weights,
-                self.dedup,
-                // replace_pk_index is always set to false since:
-                // - for DataBuffer in ShardBuilder, pk dict is not frozen
-                // - for DataBuffer in Shard, values in pk_index column has already been replaced during `freeze`.
-                false,
-            )?
-        };
-        DataBufferReader::new(batch)
+    /// Builds a lazily initialized data buffer reader from [DataBuffer]
+    pub fn read(&self) -> Result<DataBufferReaderBuilder> {
+        let _timer = MERGE_TREE_READ_STAGE_ELAPSED
+            .with_label_values(&["read_data_buffer"])
+            .start_timer();
+
+        let (pk_index, timestamp, sequence, op_type) = (
+            self.pk_index_builder.finish_cloned(),
+            self.ts_builder.to_vector_cloned(),
+            self.sequence_builder.finish_cloned(),
+            self.op_type_builder.finish_cloned(),
+        );
+
+        let mut fields = Vec::with_capacity(self.field_builders.len());
+        for b in self.field_builders.iter() {
+            let field = match b {
+                LazyMutableVectorBuilder::Type(ty) => LazyFieldVector::Type(ty.clone()),
+                LazyMutableVectorBuilder::Builder(builder) => {
+                    LazyFieldVector::Vector(builder.to_vector_cloned())
+                }
+            };
+            fields.push(field);
+        }
+
+        Ok(DataBufferReaderBuilder {
+            schema: self.data_part_schema.clone(),
+            pk_index,
+            timestamp,
+            sequence,
+            op_type,
+            fields,
+            dedup: self.dedup,
+        })
     }
 
     /// Returns num of rows in data buffer.
@@ -356,56 +369,6 @@ fn drain_data_buffer_to_record_batches(
     RecordBatch::try_new(schema, columns).context(error::NewRecordBatchSnafu)
 }
 
-/// Reads `DataBuffer` to record batches, with rows sorted according to pk_weights without resetting `DataBuffer`.
-/// `dedup`: whether to true to remove the duplicated rows inside `DataBuffer`.
-/// `replace_pk_index`: whether to replace the pk_index values with corresponding pk weight.
-fn read_data_buffer_to_record_batches(
-    schema: SchemaRef,
-    buffer: &DataBuffer,
-    pk_weights: Option<&[u16]>,
-    dedup: bool,
-    replace_pk_index: bool,
-) -> Result<RecordBatch> {
-    let num_rows = buffer.ts_builder.len();
-
-    let (pk_index_v, ts_v, sequence_v, op_type_v) = (
-        buffer.pk_index_builder.finish_cloned(),
-        buffer.ts_builder.to_vector_cloned(),
-        buffer.sequence_builder.finish_cloned(),
-        buffer.op_type_builder.finish_cloned(),
-    );
-
-    let (indices_to_take, mut columns) = build_row_sort_indices_and_columns(
-        pk_weights,
-        pk_index_v,
-        ts_v,
-        sequence_v,
-        op_type_v,
-        replace_pk_index,
-        dedup,
-        buffer.field_builders.len() + 4,
-    )?;
-
-    for b in buffer.field_builders.iter() {
-        let array = match b {
-            LazyMutableVectorBuilder::Type(ty) => {
-                let mut single_null = ty.create_mutable_vector(num_rows);
-                single_null.push_nulls(num_rows);
-                single_null.to_vector().to_arrow_array()
-            }
-            LazyMutableVectorBuilder::Builder(builder) => {
-                builder.to_vector_cloned().to_arrow_array()
-            }
-        };
-        columns.push(
-            arrow::compute::take(&array, &indices_to_take, None)
-                .context(error::ComputeArrowSnafu)?,
-        );
-    }
-
-    RecordBatch::try_new(schema, columns).context(error::NewRecordBatchSnafu)
-}
-
 #[allow(clippy::too_many_arguments)]
 fn build_row_sort_indices_and_columns(
     pk_weights: Option<&[u16]>,
@@ -495,6 +458,61 @@ pub(crate) fn timestamp_array_to_i64_slice(arr: &ArrayRef) -> &[i64] {
     }
 }
 
+enum LazyFieldVector {
+    Type(ConcreteDataType),
+    Vector(VectorRef),
+}
+
+pub(crate) struct DataBufferReaderBuilder {
+    schema: SchemaRef,
+    pk_index: UInt16Vector,
+    timestamp: VectorRef,
+    sequence: UInt64Vector,
+    op_type: UInt8Vector,
+    fields: Vec<LazyFieldVector>,
+    dedup: bool,
+}
+
+impl DataBufferReaderBuilder {
+    fn build_record_batch(self, pk_weights: Option<&[u16]>) -> Result<RecordBatch> {
+        let num_rows = self.timestamp.len();
+        let (indices_to_take, mut columns) = build_row_sort_indices_and_columns(
+            pk_weights,
+            self.pk_index,
+            self.timestamp,
+            self.sequence,
+            self.op_type,
+            // replace_pk_index is always set to false since:
+            // - for DataBuffer in ShardBuilder, pk dict is not frozen
+            // - for DataBuffer in Shard, values in pk_index column has already been replaced during `freeze`.
+            false,
+            self.dedup,
+            self.fields.len() + 4,
+        )?;
+
+        for b in self.fields.iter() {
+            let array = match b {
+                LazyFieldVector::Type(ty) => {
+                    let mut single_null = ty.create_mutable_vector(num_rows);
+                    single_null.push_nulls(num_rows);
+                    single_null.to_vector().to_arrow_array()
+                }
+                LazyFieldVector::Vector(vector) => vector.to_arrow_array(),
+            };
+            columns.push(
+                arrow::compute::take(&array, &indices_to_take, None)
+                    .context(error::ComputeArrowSnafu)?,
+            );
+        }
+        RecordBatch::try_new(self.schema, columns).context(error::NewRecordBatchSnafu)
+    }
+
+    pub fn build(self, pk_weights: Option<&[u16]>) -> Result<DataBufferReader> {
+        self.build_record_batch(pk_weights)
+            .and_then(DataBufferReader::new)
+    }
+}
+
 #[derive(Debug)]
 pub(crate) struct DataBufferReader {
     batch: RecordBatch,
@@ -942,19 +960,39 @@ impl DataParts {
     /// Reads data from all parts including active and frozen parts.
     /// The returned iterator yields a record batch of one primary key at a time.
     /// The order of yielding primary keys is determined by provided weights.
-    pub fn read(&self) -> Result<DataPartsReader> {
+    pub fn read(&self) -> Result<DataPartsReaderBuilder> {
         let _timer = MERGE_TREE_READ_STAGE_ELAPSED
             .with_label_values(&["build_data_parts_reader"])
             .start_timer();
 
-        let mut nodes = Vec::with_capacity(self.frozen.len() + 1);
+        let buffer = self.active.read()?;
+        let mut parts = Vec::with_capacity(self.frozen.len());
+        for p in &self.frozen {
+            parts.push(p.read()?);
+        }
+        Ok(DataPartsReaderBuilder { buffer, parts })
+    }
+
+    pub(crate) fn is_empty(&self) -> bool {
+        self.active.is_empty() && self.frozen.iter().all(|part| part.is_empty())
+    }
+}
+
+pub struct DataPartsReaderBuilder {
+    buffer: DataBufferReaderBuilder,
+    parts: Vec<DataPartReader>,
+}
+
+impl DataPartsReaderBuilder {
+    pub(crate) fn build(self) -> Result<DataPartsReader> {
+        let mut nodes = Vec::with_capacity(self.parts.len() + 1);
         nodes.push(DataNode::new(DataSource::Buffer(
             // `DataParts::read` ensures that all pk_index inside `DataBuffer` are replaced by weights,
             // so we pass None to sort rows directly according to pk_index.
-            self.active.read(None)?,
+            self.buffer.build(None)?,
         )));
-        for p in &self.frozen {
-            nodes.push(DataNode::new(DataSource::Part(p.read()?)));
+        for p in self.parts {
+            nodes.push(DataNode::new(DataSource::Part(p)));
         }
         let merger = Merger::try_new(nodes)?;
         Ok(DataPartsReader {
@@ -962,10 +1000,6 @@ impl DataParts {
             elapsed: Default::default(),
         })
     }
-
-    pub(crate) fn is_empty(&self) -> bool {
-        self.active.is_empty() && self.frozen.iter().all(|part| part.is_empty())
-    }
 }
 
 /// Reader for all parts inside a `DataParts`.
@@ -1003,7 +1037,7 @@ impl DataPartsReader {
 
 #[cfg(test)]
 mod tests {
     use datafusion::arrow::array::Float64Array;
-    use datatypes::arrow::array::{TimestampMillisecondArray, UInt16Array, UInt64Array};
+    use datatypes::arrow::array::UInt16Array;
     use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
     use parquet::data_type::AsBytes;
@@ -1032,73 +1066,6 @@ mod tests {
         }
     }
 
-    fn check_test_data_buffer_to_record_batches(keep_data: bool) {
-        let meta = metadata_for_test();
-        let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, true);
-
-        write_rows_to_buffer(&mut buffer, &meta, 0, vec![1, 2], vec![Some(0.1), None], 1);
-        write_rows_to_buffer(&mut buffer, &meta, 1, vec![1, 2], vec![Some(1.1), None], 2);
-        write_rows_to_buffer(&mut buffer, &meta, 0, vec![2], vec![Some(1.1)], 3);
-        assert_eq!(5, buffer.num_rows());
-        let schema = memtable_schema_to_encoded_schema(&meta);
-        let batch = if keep_data {
-            read_data_buffer_to_record_batches(schema, &buffer, Some(&[3, 1]), true, true).unwrap()
-        } else {
-            drain_data_buffer_to_record_batches(schema, &mut buffer, Some(&[3, 1]), true, true)
-                .unwrap()
-        };
-
-        assert_eq!(
-            vec![1, 2, 1, 2],
-            batch
-                .column_by_name("ts")
-                .unwrap()
-                .as_any()
-                .downcast_ref::<TimestampMillisecondArray>()
-                .unwrap()
-                .iter()
-                .map(|v| v.unwrap())
-                .collect::<Vec<_>>()
-        );
-
-        assert_eq!(
-            vec![1, 1, 3, 3],
-            batch
-                .column_by_name(PK_INDEX_COLUMN_NAME)
-                .unwrap()
-                .as_any()
-                .downcast_ref::<UInt16Array>()
-                .unwrap()
-                .iter()
-                .map(|v| v.unwrap())
-                .collect::<Vec<_>>()
-        );
-
-        assert_eq!(
-            vec![Some(1.1), None, Some(0.1), Some(1.1)],
-            batch
-                .column_by_name("v1")
-                .unwrap()
-                .as_any()
-                .downcast_ref::<Float64Array>()
-                .unwrap()
-                .iter()
-                .collect::<Vec<_>>()
-        );
-
-        if keep_data {
-            assert_eq!(5, buffer.num_rows());
-        } else {
-            assert_eq!(0, buffer.num_rows());
-        }
-    }
-
-    #[test]
-    fn test_data_buffer_to_record_batches() {
-        check_test_data_buffer_to_record_batches(true);
-        check_test_data_buffer_to_record_batches(false);
-    }
-
     fn check_data_buffer_dedup(dedup: bool) {
         let metadata = metadata_for_test();
         let mut buffer = DataBuffer::with_capacity(metadata.clone(), 10, dedup);
@@ -1119,7 +1086,7 @@ mod tests {
             2,
         );
 
-        let mut reader = buffer.read(Some(&[0])).unwrap();
+        let mut reader = buffer.read().unwrap().build(Some(&[0])).unwrap();
         let mut res = vec![];
         while reader.is_valid() {
             let batch = reader.current_data_batch();
@@ -1139,100 +1106,6 @@ mod tests {
         check_data_buffer_dedup(false);
     }
 
-    #[test]
-    fn test_data_buffer_to_record_batches_with_dedup() {
-        let meta = metadata_for_test();
-        let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, true);
-
-        write_rows_to_buffer(&mut buffer, &meta, 0, vec![1, 2], vec![Some(0.1), None], 1);
-        write_rows_to_buffer(&mut buffer, &meta, 1, vec![2], vec![Some(1.1)], 2);
-        write_rows_to_buffer(&mut buffer, &meta, 0, vec![2], vec![Some(1.1)], 3);
-        assert_eq!(4, buffer.num_rows());
-        let schema = memtable_schema_to_encoded_schema(&meta);
-        let batch =
-            read_data_buffer_to_record_batches(schema, &buffer, Some(&[0, 1]), true, true).unwrap();
-
-        assert_eq!(3, batch.num_rows());
-        assert_eq!(
-            vec![0, 0, 1],
-            batch
-                .column_by_name(PK_INDEX_COLUMN_NAME)
-                .unwrap()
-                .as_any()
-                .downcast_ref::<UInt16Array>()
-                .unwrap()
-                .iter()
-                .map(|v| v.unwrap())
-                .collect::<Vec<_>>()
-        );
-
-        assert_eq!(
-            vec![1, 2, 2],
-            batch
-                .column_by_name("ts")
-                .unwrap()
-                .as_any()
-                .downcast_ref::<TimestampMillisecondArray>()
-                .unwrap()
-                .iter()
-                .map(|v| v.unwrap())
-                .collect::<Vec<_>>()
-        );
-
-        assert_eq!(
-            vec![1, 3, 2],
-            batch
-                .column_by_name(SEQUENCE_COLUMN_NAME)
-                .unwrap()
-                .as_any()
-                .downcast_ref::<UInt64Array>()
-                .unwrap()
-                .iter()
-                .map(|v| v.unwrap())
-                .collect::<Vec<_>>()
-        );
-    }
-
-    #[test]
-    fn test_data_buffer_to_record_batches_without_dedup() {
-        let meta = metadata_for_test();
-        let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, true);
-
-        write_rows_to_buffer(&mut buffer, &meta, 0, vec![1, 2], vec![Some(0.1), None], 1);
-        write_rows_to_buffer(&mut buffer, &meta, 1, vec![1, 2], vec![Some(1.1), None], 2);
-        write_rows_to_buffer(&mut buffer, &meta, 0, vec![2], vec![Some(1.1)], 3);
-        assert_eq!(5, buffer.num_rows());
-        let schema = memtable_schema_to_encoded_schema(&meta);
-        let batch = read_data_buffer_to_record_batches(schema, &buffer, Some(&[3, 1]), false, true)
-            .unwrap();
-
-        assert_eq!(
-            vec![1, 1, 3, 3, 3],
-            batch
-                .column_by_name(PK_INDEX_COLUMN_NAME)
-                .unwrap()
-                .as_any()
-                .downcast_ref::<UInt16Array>()
-                .unwrap()
-                .iter()
-                .map(|v| v.unwrap())
-                .collect::<Vec<_>>()
-        );
-
-        assert_eq!(
-            vec![1, 2, 1, 2, 2],
-            batch
-                .column_by_name("ts")
-                .unwrap()
-                .as_any()
-                .downcast_ref::<TimestampMillisecondArray>()
-                .unwrap()
-                .iter()
-                .map(|v| v.unwrap())
-                .collect::<Vec<_>>()
-        );
-    }
-
     fn check_data_buffer_freeze(
         pk_weights: Option<&[u16]>,
         replace_pk_weights: bool,
@@ -1388,7 +1261,7 @@ mod tests {
             2,
         );
 
-        let mut iter = buffer.read(pk_weights).unwrap();
+        let mut iter = buffer.read().unwrap().build(pk_weights).unwrap();
         check_buffer_values_equal(&mut iter, expected);
     }
 
@@ -1409,7 +1282,7 @@ mod tests {
     fn test_iter_empty_data_buffer() {
         let meta = metadata_for_test();
         let buffer = DataBuffer::with_capacity(meta.clone(), 10, true);
-        let mut iter = buffer.read(Some(&[0, 1, 3, 2])).unwrap();
+        let mut iter = buffer.read().unwrap().build(Some(&[0, 1, 3, 2])).unwrap();
         check_buffer_values_equal(&mut iter, &[]);
     }
 
diff --git a/src/mito2/src/memtable/merge_tree/dedup.rs b/src/mito2/src/memtable/merge_tree/dedup.rs
index a955e3b33d..6f98601821 100644
--- a/src/mito2/src/memtable/merge_tree/dedup.rs
+++ b/src/mito2/src/memtable/merge_tree/dedup.rs
@@ -179,7 +179,8 @@ mod tests {
         let parts = DataParts::new(meta, 10, true).with_frozen(frozens);
 
         let mut res = Vec::with_capacity(expected.len());
-        let mut reader = DedupReader::try_new(MockSource(parts.read().unwrap())).unwrap();
+        let mut reader =
+            DedupReader::try_new(MockSource(parts.read().unwrap().build().unwrap())).unwrap();
         while reader.is_valid() {
             let batch = reader.current_data_batch();
             res.push(extract_data_batch(&batch));
diff --git a/src/mito2/src/memtable/merge_tree/merger.rs b/src/mito2/src/memtable/merge_tree/merger.rs
index c5012e5ee8..5ba5f5aae4 100644
--- a/src/mito2/src/memtable/merge_tree/merger.rs
+++ b/src/mito2/src/memtable/merge_tree/merger.rs
@@ -466,7 +466,9 @@ mod tests {
         let weight = &[0, 1, 2];
         let mut seq = 0;
         write_rows_to_buffer(&mut buffer1, &metadata, 0, vec![1, 2, 3], &mut seq);
-        let node1 = DataNode::new(DataSource::Buffer(buffer1.read(Some(weight)).unwrap()));
+        let node1 = DataNode::new(DataSource::Buffer(
+            buffer1.read().unwrap().build(Some(weight)).unwrap(),
+        ));
         let mut buffer2 = DataBuffer::with_capacity(metadata.clone(), 10, true);
         write_rows_to_buffer(&mut buffer2, &metadata, 1, vec![2, 3], &mut seq);
diff --git a/src/mito2/src/memtable/merge_tree/partition.rs b/src/mito2/src/memtable/merge_tree/partition.rs
index f12764a672..4d2675917a 100644
--- a/src/mito2/src/memtable/merge_tree/partition.rs
+++ b/src/mito2/src/memtable/merge_tree/partition.rs
@@ -108,22 +108,35 @@ impl Partition {
 
     /// Scans data in the partition.
     pub fn read(&self, mut context: ReadPartitionContext) -> Result<PartitionReader> {
-        let nodes = {
+        let (builder_source, shard_reader_builders) = {
             let inner = self.inner.read().unwrap();
-            let mut nodes = Vec::with_capacity(inner.shards.len() + 1);
-            if !inner.shard_builder.is_empty() {
+            let mut shard_source = Vec::with_capacity(inner.shards.len() + 1);
+            let builder_reader = if !inner.shard_builder.is_empty() {
                 let builder_reader = inner.shard_builder.read(&mut context.pk_weights)?;
-                nodes.push(ShardNode::new(ShardSource::Builder(builder_reader)));
-            }
+                Some(builder_reader)
+            } else {
+                None
+            };
             for shard in &inner.shards {
                 if !shard.is_empty() {
-                    let shard_reader = shard.read()?;
-                    nodes.push(ShardNode::new(ShardSource::Shard(shard_reader)));
+                    let shard_reader_builder = shard.read()?;
+                    shard_source.push(shard_reader_builder);
                 }
             }
-            nodes
+            (builder_reader, shard_source)
         };
+        let mut nodes = shard_reader_builders
+            .into_iter()
+            .map(|builder| Ok(ShardNode::new(ShardSource::Shard(builder.build()?))))
+            .collect::<Result<Vec<_>>>()?;
+
+        if let Some(builder) = builder_source {
+            // Move the initialization of ShardBuilderReader out of the read lock.
+            let shard_builder_reader = builder.build(Some(&context.pk_weights))?;
+            nodes.push(ShardNode::new(ShardSource::Builder(shard_builder_reader)));
+        }
+
         // Creating a shard merger will invoke next so we do it outside the lock.
         let merger = ShardMerger::try_new(nodes)?;
 
         if self.dedup {
diff --git a/src/mito2/src/memtable/merge_tree/shard.rs b/src/mito2/src/memtable/merge_tree/shard.rs
index 9d832e9ada..30f0a80daa 100644
--- a/src/mito2/src/memtable/merge_tree/shard.rs
+++ b/src/mito2/src/memtable/merge_tree/shard.rs
@@ -20,7 +20,9 @@ use store_api::metadata::RegionMetadataRef;
 
 use crate::error::Result;
 use crate::memtable::key_values::KeyValue;
-use crate::memtable::merge_tree::data::{DataBatch, DataParts, DataPartsReader, DATA_INIT_CAP};
+use crate::memtable::merge_tree::data::{
+    DataBatch, DataParts, DataPartsReader, DataPartsReaderBuilder, DATA_INIT_CAP,
+};
 use crate::memtable::merge_tree::dict::KeyDictRef;
 use crate::memtable::merge_tree::merger::{Merger, Node};
 use crate::memtable::merge_tree::shard_builder::ShardBuilderReader;
@@ -61,13 +63,13 @@ impl Shard {
 
     /// Scans the shard.
     // TODO(yingwen): Push down projection to data parts.
-    pub fn read(&self) -> Result<ShardReader> {
+    pub fn read(&self) -> Result<ShardReaderBuilder> {
         let parts_reader = self.data_parts.read()?;
-        Ok(ShardReader {
+        Ok(ShardReaderBuilder {
             shard_id: self.shard_id,
             key_dict: self.key_dict.clone(),
-            parts_reader,
+            inner: parts_reader,
         })
     }
@@ -122,6 +124,28 @@ pub trait DataBatchSource {
 
 pub type BoxedDataBatchSource = Box<dyn DataBatchSource + Send>;
 
+pub struct ShardReaderBuilder {
+    shard_id: ShardId,
+    key_dict: Option<KeyDictRef>,
+    inner: DataPartsReaderBuilder,
+}
+
+impl ShardReaderBuilder {
+    pub(crate) fn build(self) -> Result<ShardReader> {
+        let ShardReaderBuilder {
+            shard_id,
+            key_dict,
+            inner,
+        } = self;
+        let parts_reader = inner.build()?;
+        Ok(ShardReader {
+            shard_id,
+            key_dict,
+            parts_reader,
+        })
+    }
+}
+
 /// Reader to read rows in a shard.
 pub struct ShardReader {
     shard_id: ShardId,
@@ -398,7 +422,7 @@ mod tests {
         }
         assert!(!shard.is_empty());
 
-        let mut reader = shard.read().unwrap();
+        let mut reader = shard.read().unwrap().build().unwrap();
         let mut timestamps = Vec::new();
         while reader.is_valid() {
             let rb = reader.current_data_batch().slice_record_batch();
diff --git a/src/mito2/src/memtable/merge_tree/shard_builder.rs b/src/mito2/src/memtable/merge_tree/shard_builder.rs
index c2185f2d35..0ffaa91e0e 100644
--- a/src/mito2/src/memtable/merge_tree/shard_builder.rs
+++ b/src/mito2/src/memtable/merge_tree/shard_builder.rs
@@ -22,7 +22,7 @@ use store_api::metadata::RegionMetadataRef;
 use crate::error::Result;
 use crate::memtable::key_values::KeyValue;
 use crate::memtable::merge_tree::data::{
-    DataBatch, DataBuffer, DataBufferReader, DataParts, DATA_INIT_CAP,
+    DataBatch, DataBuffer, DataBufferReader, DataBufferReaderBuilder, DataParts, DATA_INIT_CAP,
 };
 use crate::memtable::merge_tree::dict::{DictBuilderReader, KeyDictBuilder};
 use crate::memtable::merge_tree::metrics::WriteMetrics;
@@ -125,7 +125,7 @@ impl ShardBuilder {
     }
 
     /// Scans the shard builder.
-    pub fn read(&self, pk_weights_buffer: &mut Vec<u16>) -> Result<ShardBuilderReader> {
+    pub fn read(&self, pk_weights_buffer: &mut Vec<u16>) -> Result<ShardBuilderReaderBuilder> {
         let dict_reader = {
             let _timer = MERGE_TREE_READ_STAGE_ELAPSED
                 .with_label_values(&["shard_builder_read_pk"])
                 .start_timer();
@@ -140,8 +140,8 @@ impl ShardBuilder {
             dict_reader.pk_weights_to_sort_data(pk_weights_buffer);
         }
 
-        let data_reader = self.data_buffer.read(Some(pk_weights_buffer))?;
-        Ok(ShardBuilderReader {
+        let data_reader = self.data_buffer.read()?;
+        Ok(ShardBuilderReaderBuilder {
             shard_id: self.current_shard_id,
             dict_reader,
             data_reader,
@@ -154,6 +154,23 @@ impl ShardBuilder {
     }
 }
 
+pub(crate) struct ShardBuilderReaderBuilder {
+    shard_id: ShardId,
+    dict_reader: DictBuilderReader,
+    data_reader: DataBufferReaderBuilder,
+}
+
+impl ShardBuilderReaderBuilder {
+    pub(crate) fn build(self, pk_weights: Option<&[u16]>) -> Result<ShardBuilderReader> {
+        let data_reader = self.data_reader.build(pk_weights)?;
+        Ok(ShardBuilderReader {
+            shard_id: self.shard_id,
+            dict_reader: self.dict_reader,
+            data_reader,
+        })
+    }
+}
+
 /// Reader to scan a shard builder.
 pub struct ShardBuilderReader {
     shard_id: ShardId,
@@ -271,7 +288,11 @@ mod tests {
         }
 
         let mut pk_weights = Vec::new();
-        let mut reader = shard_builder.read(&mut pk_weights).unwrap();
+        let mut reader = shard_builder
+            .read(&mut pk_weights)
+            .unwrap()
+            .build(Some(&pk_weights))
+            .unwrap();
         let mut timestamps = Vec::new();
         while reader.is_valid() {
            let rb = reader.current_data_batch().slice_record_batch();
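
Note on the new read path: every reader now has a two-stage construction. The `read` methods only clone cheap builder state (mutable vectors, dictionaries), so they can run while a read lock is held; the expensive sorting and record-batch materialization moves into the separate `build` step, which `Partition::read` invokes only after the lock guard is dropped. A minimal sketch of the resulting call shape, assuming a `DataParts` value populated elsewhere and the `is_valid`/`current_data_batch`/`next` reader methods that the tests in this diff rely on:

```rust
use crate::error::Result;
use crate::memtable::merge_tree::data::DataParts;

// Sketch only: `parts` is a hypothetical, already-populated DataParts.
fn scan_parts(parts: &DataParts) -> Result<()> {
    // Cheap stage: clones the active buffer's builders and collects
    // frozen part readers; safe to do while a read lock is held.
    let builder = parts.read()?;

    // Expensive stage: sorts rows and materializes record batches,
    // intended to run after the lock is released.
    let mut reader = builder.build()?;
    while reader.is_valid() {
        let _batch = reader.current_data_batch();
        reader.next()?;
    }
    Ok(())
}
```

The same split repeats at every level: `Shard::read` returns a `ShardReaderBuilder` and `ShardBuilder::read` a `ShardBuilderReaderBuilder`, so `Partition::read` can gather all builders inside the lock and call `build` on each of them outside it.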