chore: expose symbols (#7417)

* refactor/expose-symbols:
 ## Refactor `bulk/part.rs` to Simplify Mutation Handling

 - Removed the `mutations_to_record_batch` function and its associated helper functions, including `ArraysSorter`, `timestamp_array_to_iter`, and `binary_array_to_dictionary`, to simplify the mutation handling logic in `bulk/part.rs`.
 - Deleted related test functions `check_binary_array_to_dictionary` and `check_mutations_to_record_batches` from the test module, along with their associated test cases.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* refactor/expose-symbols:
 ### Commit Message

 **Refactor and Enhance Deduplication Logic**

 - **`flush.rs`**: Refactored `maybe_dedup_one` function to accept `append_mode` and `merge_mode` as parameters instead of `RegionOptions`. This change enhances flexibility in deduplication logic.
 - **`memtable/bulk.rs`**: Made `BulkRangeIterBuilder` struct and its fields public to allow external access and modification, improving extensibility.
 - **`sst.rs`**: Corrected a typo in the schema documentation, changing `__prmary_key` to `__primary_key` for clarity and accuracy.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

---------

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
This commit is contained in:
Lei, HUANG
2025-12-17 09:29:36 +08:00
committed by GitHub
parent a35a39f726
commit da964880f5
5 changed files with 18 additions and 560 deletions

View File

@@ -774,7 +774,12 @@ fn memtable_flat_sources(
let iter = only_range.build_record_batch_iter(None)?;
// Dedup according to append mode and merge mode.
// Even single range may have duplicate rows.
let iter = maybe_dedup_one(options, field_column_start, iter);
let iter = maybe_dedup_one(
options.append_mode,
options.merge_mode(),
field_column_start,
iter,
);
flat_sources.sources.push(FlatSource::Iter(iter));
};
} else {
@@ -842,17 +847,18 @@ fn merge_and_dedup(
Ok(maybe_dedup)
}
fn maybe_dedup_one(
options: &RegionOptions,
pub fn maybe_dedup_one(
append_mode: bool,
merge_mode: MergeMode,
field_column_start: usize,
input_iter: BoxedRecordBatchIterator,
) -> BoxedRecordBatchIterator {
if options.append_mode {
if append_mode {
// No dedup in append mode
input_iter
} else {
// Dedup according to merge mode.
match options.merge_mode() {
match merge_mode {
MergeMode::LastRow => {
Box::new(FlatDedupIterator::new(input_iter, FlatLastRow::new(false)))
}

View File

@@ -55,10 +55,8 @@ pub mod time_partition;
pub mod time_series;
pub(crate) mod version;
#[cfg(any(test, feature = "test"))]
pub use bulk::part::BulkPart;
pub use bulk::part::{
BulkPartEncoder, BulkPartMeta, UnorderedPart, record_batch_estimated_size,
BulkPart, BulkPartEncoder, BulkPartMeta, UnorderedPart, record_batch_estimated_size,
sort_primary_key_record_batch,
};
#[cfg(any(test, feature = "test"))]

View File

@@ -668,10 +668,10 @@ impl BulkMemtable {
}
/// Iterator builder for bulk range
struct BulkRangeIterBuilder {
part: BulkPart,
context: Arc<BulkIterContext>,
sequence: Option<SequenceRange>,
pub struct BulkRangeIterBuilder {
pub part: BulkPart,
pub context: Arc<BulkIterContext>,
pub sequence: Option<SequenceRange>,
}
impl IterBuilder for BulkRangeIterBuilder {
@@ -1188,7 +1188,6 @@ impl MemtableBuilder for BulkMemtableBuilder {
#[cfg(test)]
mod tests {
use mito_codec::row_converter::build_primary_key_codec;
use super::*;

View File

@@ -1211,343 +1211,24 @@ impl BulkPartEncoder {
}
}
/// Converts mutations to record batches.
fn mutations_to_record_batch(
mutations: &[Mutation],
metadata: &RegionMetadataRef,
pk_encoder: &DensePrimaryKeyCodec,
dedup: bool,
) -> Result<Option<(RecordBatch, i64, i64)>> {
let total_rows: usize = mutations
.iter()
.map(|m| m.rows.as_ref().map(|r| r.rows.len()).unwrap_or(0))
.sum();
if total_rows == 0 {
return Ok(None);
}
let mut pk_builder = BinaryBuilder::with_capacity(total_rows, 0);
let mut ts_vector: Box<dyn MutableVector> = metadata
.time_index_column()
.column_schema
.data_type
.create_mutable_vector(total_rows);
let mut sequence_builder = UInt64Builder::with_capacity(total_rows);
let mut op_type_builder = UInt8Builder::with_capacity(total_rows);
let mut field_builders: Vec<Box<dyn MutableVector>> = metadata
.field_columns()
.map(|f| f.column_schema.data_type.create_mutable_vector(total_rows))
.collect();
let mut pk_buffer = vec![];
for m in mutations {
let Some(key_values) = KeyValuesRef::new(metadata, m) else {
continue;
};
for row in key_values.iter() {
pk_buffer.clear();
pk_encoder
.encode_to_vec(row.primary_keys(), &mut pk_buffer)
.context(EncodeSnafu)?;
pk_builder.append_value(pk_buffer.as_bytes());
ts_vector.push_value_ref(&row.timestamp());
sequence_builder.append_value(row.sequence());
op_type_builder.append_value(row.op_type() as u8);
for (builder, field) in field_builders.iter_mut().zip(row.fields()) {
builder.push_value_ref(&field);
}
}
}
let arrow_schema = to_sst_arrow_schema(metadata);
// safety: timestamp column must be valid, and values must not be None.
let timestamp_unit = metadata
.time_index_column()
.column_schema
.data_type
.as_timestamp()
.unwrap()
.unit();
let sorter = ArraysSorter {
encoded_primary_keys: pk_builder.finish(),
timestamp_unit,
timestamp: ts_vector.to_vector().to_arrow_array(),
sequence: sequence_builder.finish(),
op_type: op_type_builder.finish(),
fields: field_builders
.iter_mut()
.map(|f| f.to_vector().to_arrow_array()),
dedup,
arrow_schema,
};
sorter.sort().map(Some)
}
struct ArraysSorter<I> {
encoded_primary_keys: BinaryArray,
timestamp_unit: TimeUnit,
timestamp: ArrayRef,
sequence: UInt64Array,
op_type: UInt8Array,
fields: I,
dedup: bool,
arrow_schema: SchemaRef,
}
impl<I> ArraysSorter<I>
where
I: Iterator<Item = ArrayRef>,
{
/// Converts arrays to record batch.
fn sort(self) -> Result<(RecordBatch, i64, i64)> {
debug_assert!(!self.timestamp.is_empty());
debug_assert!(self.timestamp.len() == self.sequence.len());
debug_assert!(self.timestamp.len() == self.op_type.len());
debug_assert!(self.timestamp.len() == self.encoded_primary_keys.len());
let timestamp_iter = timestamp_array_to_iter(self.timestamp_unit, &self.timestamp);
let (mut min_timestamp, mut max_timestamp) = (i64::MAX, i64::MIN);
let mut to_sort = self
.encoded_primary_keys
.iter()
.zip(timestamp_iter)
.zip(self.sequence.iter())
.map(|((pk, timestamp), sequence)| {
max_timestamp = max_timestamp.max(*timestamp);
min_timestamp = min_timestamp.min(*timestamp);
(pk, timestamp, sequence)
})
.enumerate()
.collect::<Vec<_>>();
to_sort.sort_unstable_by(|(_, (l_pk, l_ts, l_seq)), (_, (r_pk, r_ts, r_seq))| {
l_pk.cmp(r_pk)
.then(l_ts.cmp(r_ts))
.then(l_seq.cmp(r_seq).reverse())
});
if self.dedup {
// Dedup by timestamps while ignore sequence.
to_sort.dedup_by(|(_, (l_pk, l_ts, _)), (_, (r_pk, r_ts, _))| {
l_pk == r_pk && l_ts == r_ts
});
}
let indices = UInt32Array::from_iter_values(to_sort.iter().map(|v| v.0 as u32));
let pk_dictionary = Arc::new(binary_array_to_dictionary(
// safety: pk must be BinaryArray
arrow::compute::take(
&self.encoded_primary_keys,
&indices,
Some(TakeOptions {
check_bounds: false,
}),
)
.context(ComputeArrowSnafu)?
.as_any()
.downcast_ref::<BinaryArray>()
.unwrap(),
)?) as ArrayRef;
let mut arrays = Vec::with_capacity(self.arrow_schema.fields.len());
for arr in self.fields {
arrays.push(
arrow::compute::take(
&arr,
&indices,
Some(TakeOptions {
check_bounds: false,
}),
)
.context(ComputeArrowSnafu)?,
);
}
let timestamp = arrow::compute::take(
&self.timestamp,
&indices,
Some(TakeOptions {
check_bounds: false,
}),
)
.context(ComputeArrowSnafu)?;
arrays.push(timestamp);
arrays.push(pk_dictionary);
arrays.push(
arrow::compute::take(
&self.sequence,
&indices,
Some(TakeOptions {
check_bounds: false,
}),
)
.context(ComputeArrowSnafu)?,
);
arrays.push(
arrow::compute::take(
&self.op_type,
&indices,
Some(TakeOptions {
check_bounds: false,
}),
)
.context(ComputeArrowSnafu)?,
);
let batch = RecordBatch::try_new(self.arrow_schema, arrays).context(NewRecordBatchSnafu)?;
Ok((batch, min_timestamp, max_timestamp))
}
}
/// Converts timestamp array to an iter of i64 values.
fn timestamp_array_to_iter(
timestamp_unit: TimeUnit,
timestamp: &ArrayRef,
) -> impl Iterator<Item = &i64> {
match timestamp_unit {
// safety: timestamp column must be valid.
TimeUnit::Second => timestamp
.as_any()
.downcast_ref::<TimestampSecondArray>()
.unwrap()
.values()
.iter(),
TimeUnit::Millisecond => timestamp
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap()
.values()
.iter(),
TimeUnit::Microsecond => timestamp
.as_any()
.downcast_ref::<TimestampMicrosecondArray>()
.unwrap()
.values()
.iter(),
TimeUnit::Nanosecond => timestamp
.as_any()
.downcast_ref::<TimestampNanosecondArray>()
.unwrap()
.values()
.iter(),
}
}
/// Converts a **sorted** [BinaryArray] to [DictionaryArray].
fn binary_array_to_dictionary(input: &BinaryArray) -> Result<PrimaryKeyArray> {
if input.is_empty() {
return Ok(DictionaryArray::new(
UInt32Array::from(Vec::<u32>::new()),
Arc::new(BinaryArray::from_vec(vec![])) as ArrayRef,
));
}
let mut keys = Vec::with_capacity(16);
let mut values = BinaryBuilder::new();
let mut prev: usize = 0;
keys.push(prev as u32);
values.append_value(input.value(prev));
for current_bytes in input.iter().skip(1) {
// safety: encoded pk must present.
let current_bytes = current_bytes.unwrap();
let prev_bytes = input.value(prev);
if current_bytes != prev_bytes {
values.append_value(current_bytes);
prev += 1;
}
keys.push(prev as u32);
}
Ok(DictionaryArray::new(
UInt32Array::from(keys),
Arc::new(values.finish()) as ArrayRef,
))
}
#[cfg(test)]
mod tests {
use std::collections::VecDeque;
use api::v1::{Row, SemanticType, WriteHint};
use datafusion_common::ScalarValue;
use datatypes::arrow::array::Float64Array;
use datatypes::prelude::{ConcreteDataType, ScalarVector, Value};
use datatypes::prelude::{ConcreteDataType, Value};
use datatypes::schema::ColumnSchema;
use datatypes::vectors::{Float64Vector, TimestampMillisecondVector};
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
use store_api::storage::RegionId;
use store_api::storage::consts::ReservedColumnId;
use super::*;
use crate::memtable::bulk::context::BulkIterContext;
use crate::sst::parquet::format::{PrimaryKeyReadFormat, ReadFormat};
use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
use crate::test_util::memtable_util::{
build_key_values_with_ts_seq_values, metadata_for_test, region_metadata_to_row_schema,
};
fn check_binary_array_to_dictionary(
input: &[&[u8]],
expected_keys: &[u32],
expected_values: &[&[u8]],
) {
let input = BinaryArray::from_iter_values(input.iter());
let array = binary_array_to_dictionary(&input).unwrap();
assert_eq!(
&expected_keys,
&array.keys().iter().map(|v| v.unwrap()).collect::<Vec<_>>()
);
assert_eq!(
expected_values,
&array
.values()
.as_any()
.downcast_ref::<BinaryArray>()
.unwrap()
.iter()
.map(|v| v.unwrap())
.collect::<Vec<_>>()
);
}
#[test]
fn test_binary_array_to_dictionary() {
check_binary_array_to_dictionary(&[], &[], &[]);
check_binary_array_to_dictionary(&["a".as_bytes()], &[0], &["a".as_bytes()]);
check_binary_array_to_dictionary(
&["a".as_bytes(), "a".as_bytes()],
&[0, 0],
&["a".as_bytes()],
);
check_binary_array_to_dictionary(
&["a".as_bytes(), "a".as_bytes(), "b".as_bytes()],
&[0, 0, 1],
&["a".as_bytes(), "b".as_bytes()],
);
check_binary_array_to_dictionary(
&[
"a".as_bytes(),
"a".as_bytes(),
"b".as_bytes(),
"c".as_bytes(),
],
&[0, 0, 1, 2],
&["a".as_bytes(), "b".as_bytes(), "c".as_bytes()],
);
}
struct MutationInput<'a> {
k0: &'a str,
k1: u32,
@@ -1563,232 +1244,6 @@ mod tests {
v1: &'a [Option<f64>],
}
fn check_mutations_to_record_batches(
input: &[MutationInput],
expected: &[BatchOutput],
expected_timestamp: (i64, i64),
dedup: bool,
) {
let metadata = metadata_for_test();
let mutations = input
.iter()
.map(|m| {
build_key_values_with_ts_seq_values(
&metadata,
m.k0.to_string(),
m.k1,
m.timestamps.iter().copied(),
m.v1.iter().copied(),
m.sequence,
)
.mutation
})
.collect::<Vec<_>>();
let total_rows: usize = mutations
.iter()
.flat_map(|m| m.rows.iter())
.map(|r| r.rows.len())
.sum();
let pk_encoder = DensePrimaryKeyCodec::new(&metadata);
let (batch, _, _) = mutations_to_record_batch(&mutations, &metadata, &pk_encoder, dedup)
.unwrap()
.unwrap();
let read_format = PrimaryKeyReadFormat::new_with_all_columns(metadata.clone());
let mut batches = VecDeque::new();
read_format
.convert_record_batch(&batch, None, &mut batches)
.unwrap();
if !dedup {
assert_eq!(
total_rows,
batches.iter().map(|b| { b.num_rows() }).sum::<usize>()
);
}
let batch_values = batches
.into_iter()
.map(|b| {
let pk_values = pk_encoder.decode(b.primary_key()).unwrap().into_dense();
let timestamps = b
.timestamps()
.as_any()
.downcast_ref::<TimestampMillisecondVector>()
.unwrap()
.iter_data()
.map(|v| v.unwrap().0.value())
.collect::<Vec<_>>();
let float_values = b.fields()[1]
.data
.as_any()
.downcast_ref::<Float64Vector>()
.unwrap()
.iter_data()
.collect::<Vec<_>>();
(pk_values, timestamps, float_values)
})
.collect::<Vec<_>>();
assert_eq!(expected.len(), batch_values.len());
for idx in 0..expected.len() {
assert_eq!(expected[idx].pk_values, &batch_values[idx].0);
assert_eq!(expected[idx].timestamps, &batch_values[idx].1);
assert_eq!(expected[idx].v1, &batch_values[idx].2);
}
}
#[test]
fn test_mutations_to_record_batch() {
check_mutations_to_record_batches(
&[MutationInput {
k0: "a",
k1: 0,
timestamps: &[0],
v1: &[Some(0.1)],
sequence: 0,
}],
&[BatchOutput {
pk_values: &[Value::String("a".into()), Value::UInt32(0)],
timestamps: &[0],
v1: &[Some(0.1)],
}],
(0, 0),
true,
);
check_mutations_to_record_batches(
&[
MutationInput {
k0: "a",
k1: 0,
timestamps: &[0],
v1: &[Some(0.1)],
sequence: 0,
},
MutationInput {
k0: "b",
k1: 0,
timestamps: &[0],
v1: &[Some(0.0)],
sequence: 0,
},
MutationInput {
k0: "a",
k1: 0,
timestamps: &[1],
v1: &[Some(0.2)],
sequence: 1,
},
MutationInput {
k0: "a",
k1: 1,
timestamps: &[1],
v1: &[Some(0.3)],
sequence: 2,
},
],
&[
BatchOutput {
pk_values: &[Value::String("a".into()), Value::UInt32(0)],
timestamps: &[0, 1],
v1: &[Some(0.1), Some(0.2)],
},
BatchOutput {
pk_values: &[Value::String("a".into()), Value::UInt32(1)],
timestamps: &[1],
v1: &[Some(0.3)],
},
BatchOutput {
pk_values: &[Value::String("b".into()), Value::UInt32(0)],
timestamps: &[0],
v1: &[Some(0.0)],
},
],
(0, 1),
true,
);
check_mutations_to_record_batches(
&[
MutationInput {
k0: "a",
k1: 0,
timestamps: &[0],
v1: &[Some(0.1)],
sequence: 0,
},
MutationInput {
k0: "b",
k1: 0,
timestamps: &[0],
v1: &[Some(0.0)],
sequence: 0,
},
MutationInput {
k0: "a",
k1: 0,
timestamps: &[0],
v1: &[Some(0.2)],
sequence: 1,
},
],
&[
BatchOutput {
pk_values: &[Value::String("a".into()), Value::UInt32(0)],
timestamps: &[0],
v1: &[Some(0.2)],
},
BatchOutput {
pk_values: &[Value::String("b".into()), Value::UInt32(0)],
timestamps: &[0],
v1: &[Some(0.0)],
},
],
(0, 0),
true,
);
check_mutations_to_record_batches(
&[
MutationInput {
k0: "a",
k1: 0,
timestamps: &[0],
v1: &[Some(0.1)],
sequence: 0,
},
MutationInput {
k0: "b",
k1: 0,
timestamps: &[0],
v1: &[Some(0.0)],
sequence: 0,
},
MutationInput {
k0: "a",
k1: 0,
timestamps: &[0],
v1: &[Some(0.2)],
sequence: 1,
},
],
&[
BatchOutput {
pk_values: &[Value::String("a".into()), Value::UInt32(0)],
timestamps: &[0, 0],
v1: &[Some(0.2), Some(0.1)],
},
BatchOutput {
pk_values: &[Value::String("b".into()), Value::UInt32(0)],
timestamps: &[0],
v1: &[Some(0.0)],
},
],
(0, 0),
false,
);
}
fn encode(input: &[MutationInput]) -> EncodedBulkPart {
let metadata = metadata_for_test();
let kvs = input

View File

@@ -121,7 +121,7 @@ impl FlatSchemaOptions {
///
/// The schema is:
/// ```text
/// primary key columns, field columns, time index, __prmary_key, __sequence, __op_type
/// primary key columns, field columns, time index, __primary_key, __sequence, __op_type
/// ```
///
/// # Panics