feat: distinguish between different read paths (#3369)

* feat: distinguish between different read paths

* fix: reformat code
Author: Lei, HUANG
Date: 2024-02-23 20:40:39 +08:00
Committed by: GitHub
Parent: b144836935
Commit: 1f1d1b4f57
3 changed files with 192 additions and 51 deletions


@@ -146,16 +146,24 @@ impl DataBuffer {
}
}
/// Freezes `DataBuffer` to bytes. Use `pk_weights` to sort rows and replace pk_index to pk_weights.
/// Freezes `DataBuffer` to bytes.
/// If `pk_weights` is present, it will be used to sort rows.
///
/// `freeze` clears the buffers of builders.
pub fn freeze(&mut self, pk_weights: &[u16]) -> Result<DataPart> {
let encoder = DataPartEncoder::new(&self.metadata, pk_weights, None);
pub fn freeze(
&mut self,
pk_weights: Option<&[u16]>,
replace_pk_index: bool,
) -> Result<DataPart> {
let encoder = DataPartEncoder::new(&self.metadata, pk_weights, None, replace_pk_index);
let parts = encoder.write(self)?;
Ok(parts)
}
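The two call patterns the new signature supports, as they appear elsewhere in this change; a minimal sketch with error handling elided:

```rust
// ShardBuilder path: sort rows by the dictionary's weights and rewrite the
// pk_index column to hold those weights.
let part = buffer.freeze(Some(&pk_weights), true)?;

// DataParts path: the pk_index column already holds weights from an earlier
// freeze, so neither weights nor a rewrite are needed.
let part = buffer.freeze(None, false)?;
```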
/// Reads batches from data buffer without resetting builder's buffers.
pub fn read(&mut self, pk_weights: &[u16]) -> Result<DataBufferReader> {
/// If `pk_weights` is present, yielded rows are sorted according to those weights;
/// otherwise rows are sorted by their `pk_index` values, which are assumed to already be weights.
pub fn read(&mut self, pk_weights: Option<&[u16]>) -> Result<DataBufferReader> {
// todo(hl): control whether to dedup while invoking `read`.
let batch = data_buffer_to_record_batches(
self.data_part_schema.clone(),
@@ -163,7 +171,10 @@ impl DataBuffer {
pk_weights,
true,
true,
true,
// replace_pk_index is always set to false since:
// - for DataBuffer in ShardBuilder, the pk dict is not frozen yet;
// - for DataBuffer in Shard, values in the pk_index column have already been replaced during `freeze`.
false,
)?;
DataBufferReader::new(batch)
}
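A sketch of driving the returned `DataBufferReader`, following the loop shape used in the tests below (setup assumed):

```rust
// With raw pk_index values, pass weights so rows come out in weight order;
// with weight-encoded pk_index values, pass `None` instead.
let mut reader = buffer.read(Some(&pk_weights))?;
while reader.is_valid() {
    let batch = reader.current_data_batch();
    // consume the record batch for the current pk_index...
    reader.next()?;
}
```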
@@ -208,7 +219,7 @@ impl LazyMutableVectorBuilder {
fn data_buffer_to_record_batches(
schema: SchemaRef,
buffer: &mut DataBuffer,
pk_weights: &[u16],
pk_weights: Option<&[u16]>,
keep_data: bool,
dedup: bool,
replace_pk_index: bool,
@@ -408,7 +419,7 @@ impl Ord for InnerKey {
}
fn build_rows_to_sort(
pk_weights: &[u16],
pk_weights: Option<&[u16]>,
pk_index: &UInt16Vector,
ts: &VectorRef,
sequence: &UInt64Vector,
@@ -453,11 +464,16 @@ fn build_rows_to_sort(
.zip(sequence_values.iter())
.enumerate()
.map(|(idx, ((timestamp, pk_index), sequence))| {
let pk_weight = if let Some(weights) = pk_weights {
weights[*pk_index as usize] // if pk_weights is present, sort according to weight.
} else {
*pk_index // otherwise pk_index has already been replaced by weights.
};
(
idx,
InnerKey {
timestamp: *timestamp,
pk_weight: pk_weights[*pk_index as usize],
pk_weight,
sequence: *sequence,
},
)
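The weight resolution above, pulled out into a hypothetical standalone helper to make the two paths explicit:

```rust
// Hypothetical helper mirroring the closure logic in build_rows_to_sort.
fn resolve_pk_weight(pk_weights: Option<&[u16]>, pk_index: u16) -> u16 {
    match pk_weights {
        Some(weights) => weights[pk_index as usize], // explicit weight lookup
        None => pk_index, // pk_index was already rewritten to a weight
    }
}

fn main() {
    assert_eq!(resolve_pk_weight(Some(&[3, 2]), 1), 2);
    assert_eq!(resolve_pk_weight(None, 3), 3);
}
```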
@@ -493,21 +509,24 @@ fn memtable_schema_to_encoded_schema(schema: &RegionMetadataRef) -> SchemaRef {
struct DataPartEncoder<'a> {
schema: SchemaRef,
pk_weights: &'a [u16],
pk_weights: Option<&'a [u16]>,
row_group_size: Option<usize>,
replace_pk_index: bool,
}
impl<'a> DataPartEncoder<'a> {
pub fn new(
metadata: &RegionMetadataRef,
pk_weights: &'a [u16],
pk_weights: Option<&'a [u16]>,
row_group_size: Option<usize>,
replace_pk_index: bool,
) -> DataPartEncoder<'a> {
let schema = memtable_schema_to_encoded_schema(metadata);
Self {
schema,
pk_weights,
row_group_size,
replace_pk_index,
}
}
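Constructing the encoder with the two new knobs; a sketch mirroring the argument shapes used in the tests further down:

```rust
// Explicit weights, default row group size, pk_index replacement enabled.
let encoder = DataPartEncoder::new(&metadata, Some(&[0, 1, 2]), None, true);
let part = encoder.write(&mut buffer)?;
```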
@@ -528,7 +547,7 @@ impl<'a> DataPartEncoder<'a> {
self.pk_weights,
false,
true,
true,
self.replace_pk_index,
)?;
writer.write(&rb).context(error::EncodeMemtableSnafu)?;
let _metadata = writer.close().context(error::EncodeMemtableSnafu)?;
@@ -689,19 +708,20 @@ impl DataParts {
}
/// Freezes the active data buffer into frozen data parts.
pub fn freeze(&mut self, pk_weights: &[u16]) -> Result<()> {
self.frozen.push(self.active.freeze(pk_weights)?);
pub fn freeze(&mut self) -> Result<()> {
self.frozen.push(self.active.freeze(None, false)?);
Ok(())
}
/// Reads data from all parts including active and frozen parts.
/// The returned iterator yields a record batch of one primary key at a time.
/// The order of yielding primary keys is determined by provided weights.
/// todo(hl): read may not take any pk weights if it is read by `Shard`.
pub fn read(&mut self, pk_weights: &[u16]) -> Result<DataPartsReader> {
pub fn read(&mut self) -> Result<DataPartsReader> {
let mut nodes = Vec::with_capacity(self.frozen.len() + 1);
nodes.push(DataNode::new(DataSource::Buffer(
self.active.read(pk_weights)?,
// `DataParts::read` assumes that all pk_index values inside `DataBuffer` have already been replaced by weights,
// so we pass `None` to sort rows directly according to pk_index.
self.active.read(None)?,
)));
for p in &self.frozen {
nodes.push(DataNode::new(DataSource::Part(p.read()?)));
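A sketch of the invariant this hunk encodes, assuming every buffer that reaches `DataParts` was weight-encoded by `ShardBuilder`'s freeze:

```rust
// No weights cross this boundary anymore: frozen parts and the active
// buffer are all ordered by the weights already baked into pk_index.
data_parts.freeze()?; // calls active.freeze(None, false) internally
let mut reader = data_parts.read()?; // reads the active buffer with None
```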
@@ -742,6 +762,7 @@ mod tests {
use parquet::data_type::AsBytes;
use super::*;
use crate::memtable::merge_tree::merger::timestamp_array_to_i64_slice;
use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};
#[test]
@@ -773,9 +794,15 @@ mod tests {
write_rows_to_buffer(&mut buffer, &meta, 0, vec![2], vec![Some(1.1)], 3);
assert_eq!(5, buffer.num_rows());
let schema = memtable_schema_to_encoded_schema(&meta);
let batch =
data_buffer_to_record_batches(schema, &mut buffer, &[3, 1], keep_data, true, true)
.unwrap();
let batch = data_buffer_to_record_batches(
schema,
&mut buffer,
Some(&[3, 1]),
keep_data,
true,
true,
)
.unwrap();
assert_eq!(
vec![1, 2, 1, 2],
@@ -839,7 +866,8 @@ mod tests {
assert_eq!(4, buffer.num_rows());
let schema = memtable_schema_to_encoded_schema(&meta);
let batch =
data_buffer_to_record_batches(schema, &mut buffer, &[0, 1], true, true, true).unwrap();
data_buffer_to_record_batches(schema, &mut buffer, Some(&[0, 1]), true, true, true)
.unwrap();
assert_eq!(3, batch.num_rows());
assert_eq!(
@@ -893,7 +921,8 @@ mod tests {
assert_eq!(5, buffer.num_rows());
let schema = memtable_schema_to_encoded_schema(&meta);
let batch =
data_buffer_to_record_batches(schema, &mut buffer, &[3, 1], true, false, true).unwrap();
data_buffer_to_record_batches(schema, &mut buffer, Some(&[3, 1]), true, false, true)
.unwrap();
assert_eq!(
vec![1, 1, 3, 3, 3],
@@ -944,6 +973,80 @@ mod tests {
}
}
fn check_data_buffer_freeze(
pk_weights: Option<&[u16]>,
replace_pk_weights: bool,
expected: &[(u16, Vec<(i64, u64)>)],
) {
let meta = metadata_for_test();
let mut buffer = DataBuffer::with_capacity(meta.clone(), 10);
// write rows with null values.
write_rows_to_buffer(
&mut buffer,
&meta,
0,
vec![0, 1, 2],
vec![Some(1.0), None, Some(3.0)],
0,
);
write_rows_to_buffer(&mut buffer, &meta, 1, vec![1], vec![Some(2.0)], 3);
let mut res = Vec::with_capacity(3);
let mut reader = buffer
.freeze(pk_weights, replace_pk_weights)
.unwrap()
.read()
.unwrap();
while reader.is_valid() {
let batch = reader.current_data_batch();
let rb = batch.slice_record_batch();
let ts = timestamp_array_to_i64_slice(rb.column(1));
let sequence = rb
.column(2)
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap()
.values();
let ts_and_seq = ts
.iter()
.zip(sequence.iter())
.map(|(ts, seq)| (*ts, *seq))
.collect::<Vec<_>>();
res.push((batch.pk_index, ts_and_seq));
reader.next().unwrap();
}
assert_eq!(expected, res);
}
#[test]
fn test_data_buffer_freeze() {
check_data_buffer_freeze(
None,
false,
&[(0, vec![(0, 0), (1, 1), (2, 2)]), (1, vec![(1, 3)])],
);
check_data_buffer_freeze(
Some(&[1, 2]),
false,
&[(0, vec![(0, 0), (1, 1), (2, 2)]), (1, vec![(1, 3)])],
);
check_data_buffer_freeze(
Some(&[3, 2]),
true,
&[(2, vec![(1, 3)]), (3, vec![(0, 0), (1, 1), (2, 2)])],
);
check_data_buffer_freeze(
Some(&[3, 2]),
false,
&[(1, vec![(1, 3)]), (0, vec![(0, 0), (1, 1), (2, 2)])],
);
}
#[test]
fn test_encode_data_buffer() {
let meta = metadata_for_test();
@@ -965,7 +1068,7 @@ mod tests {
assert_eq!(4, buffer.num_rows());
let encoder = DataPartEncoder::new(&meta, &[0, 1, 2], None);
let encoder = DataPartEncoder::new(&meta, Some(&[0, 1, 2]), None, true);
let encoded = match encoder.write(&mut buffer).unwrap() {
DataPart::Parquet(data) => data.data,
};
@@ -1010,8 +1113,7 @@ mod tests {
assert_eq!(None, search_next_pk_range(&a, 6));
}
#[test]
fn test_iter_data_buffer() {
fn check_iter_data_buffer(pk_weights: Option<&[u16]>, expected: &[Vec<f64>]) {
let meta = metadata_for_test();
let mut buffer = DataBuffer::with_capacity(meta.clone(), 10);
@@ -1033,15 +1135,28 @@ mod tests {
2,
);
let mut iter = buffer.read(&[0, 1, 3, 2]).unwrap();
check_buffer_values_equal(&mut iter, &[vec![1.1, 2.1, 3.1], vec![1.0, 2.0, 3.0]]);
let mut iter = buffer.read(pk_weights).unwrap();
check_buffer_values_equal(&mut iter, expected);
}
#[test]
fn test_iter_data_buffer() {
check_iter_data_buffer(None, &[vec![1.0, 2.0, 3.0], vec![1.1, 2.1, 3.1]]);
check_iter_data_buffer(
Some(&[0, 1, 2, 3]),
&[vec![1.0, 2.0, 3.0], vec![1.1, 2.1, 3.1]],
);
check_iter_data_buffer(
Some(&[3, 2, 1, 0]),
&[vec![1.1, 2.1, 3.1], vec![1.0, 2.0, 3.0]],
);
}
#[test]
fn test_iter_empty_data_buffer() {
let meta = metadata_for_test();
let mut buffer = DataBuffer::with_capacity(meta.clone(), 10);
let mut iter = buffer.read(&[0, 1, 3, 2]).unwrap();
let mut iter = buffer.read(Some(&[0, 1, 3, 2])).unwrap();
check_buffer_values_equal(&mut iter, &[]);
}
@@ -1095,7 +1210,7 @@ mod tests {
4,
);
let encoder = DataPartEncoder::new(&meta, weights, Some(4));
let encoder = DataPartEncoder::new(&meta, Some(weights), Some(4), true);
let encoded = encoder.write(&mut buffer).unwrap();
let mut iter = encoded.read().unwrap();


@@ -372,7 +372,7 @@ impl Node for DataNode {
}
}
fn timestamp_array_to_i64_slice(arr: &ArrayRef) -> &[i64] {
pub(crate) fn timestamp_array_to_i64_slice(arr: &ArrayRef) -> &[i64] {
match arr.data_type() {
DataType::Timestamp(t, _) => match t {
TimeUnit::Second => arr
@@ -470,12 +470,16 @@ mod tests {
let mut seq = 0;
write_rows_to_buffer(&mut buffer1, &metadata, 1, vec![2, 3], &mut seq);
write_rows_to_buffer(&mut buffer1, &metadata, 0, vec![1, 2], &mut seq);
let node1 = DataNode::new(DataSource::Buffer(buffer1.read(weight).unwrap()));
let node1 = DataNode::new(DataSource::Part(
buffer1.freeze(Some(weight), true).unwrap().read().unwrap(),
));
let mut buffer2 = DataBuffer::with_capacity(metadata.clone(), 10);
write_rows_to_buffer(&mut buffer2, &metadata, 1, vec![3], &mut seq);
write_rows_to_buffer(&mut buffer2, &metadata, 0, vec![1], &mut seq);
let node2 = DataNode::new(DataSource::Buffer(buffer2.read(weight).unwrap()));
let node2 = DataNode::new(DataSource::Part(
buffer2.freeze(Some(weight), true).unwrap().read().unwrap(),
));
check_merger_read(
vec![node1, node2],
@@ -497,15 +501,21 @@ mod tests {
let mut seq = 0;
write_rows_to_buffer(&mut buffer1, &metadata, 1, vec![2, 3], &mut seq);
write_rows_to_buffer(&mut buffer1, &metadata, 0, vec![1, 2], &mut seq);
let node1 = DataNode::new(DataSource::Buffer(buffer1.read(weight).unwrap()));
let node1 = DataNode::new(DataSource::Part(
buffer1.freeze(Some(weight), true).unwrap().read().unwrap(),
));
let mut buffer2 = DataBuffer::with_capacity(metadata.clone(), 10);
write_rows_to_buffer(&mut buffer2, &metadata, 1, vec![3], &mut seq);
let node2 = DataNode::new(DataSource::Buffer(buffer2.read(weight).unwrap()));
let node2 = DataNode::new(DataSource::Part(
buffer2.freeze(Some(weight), true).unwrap().read().unwrap(),
));
let mut buffer3 = DataBuffer::with_capacity(metadata.clone(), 10);
write_rows_to_buffer(&mut buffer3, &metadata, 0, vec![2, 3], &mut seq);
let node3 = DataNode::new(DataSource::Buffer(buffer3.read(weight).unwrap()));
let node3 = DataNode::new(DataSource::Part(
buffer3.freeze(Some(weight), true).unwrap().read().unwrap(),
));
check_merger_read(
vec![node1, node3, node2],
@@ -528,15 +538,21 @@ mod tests {
let weight = &[0, 1, 2];
let mut seq = 0;
write_rows_to_buffer(&mut buffer1, &metadata, 0, vec![1, 2, 3], &mut seq);
let node1 = DataNode::new(DataSource::Buffer(buffer1.read(weight).unwrap()));
let node1 = DataNode::new(DataSource::Part(
buffer1.freeze(Some(weight), true).unwrap().read().unwrap(),
));
let mut buffer2 = DataBuffer::with_capacity(metadata.clone(), 10);
write_rows_to_buffer(&mut buffer2, &metadata, 1, vec![2, 3], &mut seq);
let node2 = DataNode::new(DataSource::Buffer(buffer2.read(weight).unwrap()));
let node2 = DataNode::new(DataSource::Part(
buffer2.freeze(Some(weight), true).unwrap().read().unwrap(),
));
let mut buffer3 = DataBuffer::with_capacity(metadata.clone(), 10);
write_rows_to_buffer(&mut buffer3, &metadata, 0, vec![2, 3], &mut seq);
let node3 = DataNode::new(DataSource::Buffer(buffer3.read(weight).unwrap()));
let node3 = DataNode::new(DataSource::Part(
buffer3.freeze(Some(weight), true).unwrap().read().unwrap(),
));
check_merger_read(
vec![node1, node3, node2],
@@ -558,18 +574,18 @@ mod tests {
let weight = &[0, 1, 2];
let mut seq = 0;
write_rows_to_buffer(&mut buffer1, &metadata, 0, vec![1, 2, 3], &mut seq);
let node1 = DataNode::new(DataSource::Buffer(buffer1.read(weight).unwrap()));
let node1 = DataNode::new(DataSource::Buffer(buffer1.read(Some(weight)).unwrap()));
let mut buffer2 = DataBuffer::with_capacity(metadata.clone(), 10);
write_rows_to_buffer(&mut buffer2, &metadata, 1, vec![2, 3], &mut seq);
let node2 = DataNode::new(DataSource::Part(
buffer2.freeze(weight).unwrap().read().unwrap(),
buffer2.freeze(Some(weight), true).unwrap().read().unwrap(),
));
let mut buffer3 = DataBuffer::with_capacity(metadata.clone(), 10);
write_rows_to_buffer(&mut buffer3, &metadata, 0, vec![2, 3], &mut seq);
let node3 = DataNode::new(DataSource::Part(
buffer3.freeze(weight).unwrap().read().unwrap(),
buffer3.freeze(Some(weight), true).unwrap().read().unwrap(),
));
check_merger_read(
@@ -592,18 +608,24 @@ mod tests {
let weight = &[0, 1, 2];
let mut seq = 0;
write_rows_to_buffer(&mut buffer1, &metadata, 0, vec![1, 2, 2], &mut seq);
let node1 = DataNode::new(DataSource::Buffer(buffer1.read(weight).unwrap()));
let node1 = DataNode::new(DataSource::Part(
buffer1.freeze(Some(weight), true).unwrap().read().unwrap(),
));
let mut buffer2 = DataBuffer::with_capacity(metadata.clone(), 10);
write_rows_to_buffer(&mut buffer2, &metadata, 0, vec![2], &mut seq);
let node2 = DataNode::new(DataSource::Part(
buffer2.freeze(Some(weight), true).unwrap().read().unwrap(),
));
let mut buffer3 = DataBuffer::with_capacity(metadata.clone(), 10);
write_rows_to_buffer(&mut buffer3, &metadata, 0, vec![2], &mut seq);
let node3 = DataNode::new(DataSource::Buffer(buffer3.read(weight).unwrap()));
let mut buffer4 = DataBuffer::with_capacity(metadata.clone(), 10);
write_rows_to_buffer(&mut buffer4, &metadata, 0, vec![2], &mut seq);
let node4 = DataNode::new(DataSource::Buffer(buffer4.read(weight).unwrap()));
let node3 = DataNode::new(DataSource::Part(
buffer3.freeze(Some(weight), true).unwrap().read().unwrap(),
));
check_merger_read(
vec![node1, node3, node4],
vec![node1, node2, node3],
&[
(0, vec![(1, 0)]),
(0, vec![(2, 4)]),
@@ -620,11 +642,15 @@ mod tests {
let weight = &[0, 1, 2];
let mut seq = 0;
write_rows_to_buffer(&mut buffer1, &metadata, 0, vec![0, 1], &mut seq);
let node1 = DataNode::new(DataSource::Buffer(buffer1.read(weight).unwrap()));
let node1 = DataNode::new(DataSource::Part(
buffer1.freeze(Some(weight), true).unwrap().read().unwrap(),
));
let mut buffer2 = DataBuffer::with_capacity(metadata.clone(), 10);
write_rows_to_buffer(&mut buffer2, &metadata, 0, vec![1], &mut seq);
let node2 = DataNode::new(DataSource::Buffer(buffer2.read(weight).unwrap()));
let node2 = DataNode::new(DataSource::Part(
buffer2.freeze(Some(weight), true).unwrap().read().unwrap(),
));
check_merger_read(
vec![node1, node2],


@@ -78,11 +78,11 @@ impl ShardBuilder {
let data_part = match &key_dict {
Some(dict) => {
let pk_weights = dict.pk_weights_to_sort_data();
self.data_buffer.freeze(&pk_weights)?
self.data_buffer.freeze(Some(&pk_weights), true)?
}
None => {
let pk_weights = [0];
self.data_buffer.freeze(&pk_weights)?
self.data_buffer.freeze(Some(&pk_weights), true)?
}
};
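A sketch of the flow above, assuming a populated key dictionary `dict`:

```rust
// With a dictionary: derive weights from the frozen dict and bake them into
// the pk_index column, so downstream reads can pass `None`.
let pk_weights = dict.pk_weights_to_sort_data();
let part = self.data_buffer.freeze(Some(&pk_weights), true)?;

// Without a dictionary: a single zero weight keeps ordering trivially stable.
let part = self.data_buffer.freeze(Some(&[0]), true)?;
```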