From a8b512dded4dbf220de8a353c801a37f5a7d27a8 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Mon, 22 Dec 2025 13:39:03 +0800 Subject: [PATCH] chore: expose symbols (#7451) * chore/expose-symbols: ### Commit Message Enhance `merge_and_dedup` Functionality in `flush.rs` - **Function Signature Update**: Modified the `merge_and_dedup` function to accept `append_mode` and `merge_mode` as separate parameters instead of using `options`. - **Function Accessibility**: Changed the visibility of `merge_and_dedup` to `pub` to allow external access. - **Function Calls Update**: Updated calls to `merge_and_dedup` within `memtable_flat_sources` to align with the new function signature, passing `options.append_mode` and `options.merge_mode()` directly. Signed-off-by: Lei, HUANG * chore/expose-symbols: ### Add Merge and Deduplication Functionality - **File**: `src/mito2/src/flush.rs` - Introduced `merge_and_dedup` function to merge multiple record batch iterators and apply deduplication based on specified modes. - Added detailed documentation for the function, explaining its arguments, behavior, and usage examples. Signed-off-by: Lei, HUANG --------- Signed-off-by: Lei, HUANG --- src/mito2/src/flush.rs | 64 ++++++++++++++++++++++++++++++++++++++---- 1 file changed, 58 insertions(+), 6 deletions(-) diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index 058c5272c2..17bdc9aaeb 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -801,7 +801,8 @@ fn memtable_flat_sources( if last_iter_rows > min_flush_rows { let maybe_dedup = merge_and_dedup( &schema, - options, + options.append_mode, + options.merge_mode(), field_column_start, std::mem::replace(&mut input_iters, Vec::with_capacity(num_ranges)), )?; @@ -813,7 +814,13 @@ fn memtable_flat_sources( // Handle remaining iters. if !input_iters.is_empty() { - let maybe_dedup = merge_and_dedup(&schema, options, field_column_start, input_iters)?; + let maybe_dedup = merge_and_dedup( + &schema, + options.append_mode, + options.merge_mode(), + field_column_start, + input_iters, + )?; flat_sources.sources.push(FlatSource::Iter(maybe_dedup)); } @@ -822,19 +829,64 @@ fn memtable_flat_sources( Ok(flat_sources) } -fn merge_and_dedup( +/// Merges multiple record batch iterators and applies deduplication based on the specified mode. +/// +/// This function is used during the flush process to combine data from multiple memtable ranges +/// into a single stream while handling duplicate records according to the configured merge strategy. +/// +/// # Arguments +/// +/// * `schema` - The Arrow schema reference that defines the structure of the record batches +/// * `append_mode` - When true, no deduplication is performed and all records are preserved. +/// This is used for append-only workloads where duplicate handling is not required. +/// * `merge_mode` - The strategy used for deduplication when not in append mode: +/// - `MergeMode::LastRow`: Keeps the last record for each primary key +/// - `MergeMode::LastNonNull`: Keeps the last non-null values for each field +/// * `field_column_start` - The starting column index for fields in the record batch. +/// Used when `MergeMode::LastNonNull` to identify which columns +/// contain field values versus primary key columns. +/// * `input_iters` - A vector of record batch iterators to be merged and deduplicated +/// +/// # Returns +/// +/// Returns a boxed record batch iterator that yields the merged and potentially deduplicated +/// record batches. +/// +/// # Behavior +/// +/// 1. Creates a `FlatMergeIterator` to merge all input iterators in sorted order based on +/// primary key and timestamp +/// 2. If `append_mode` is true, returns the merge iterator directly without deduplication +/// 3. If `append_mode` is false, wraps the merge iterator with a `FlatDedupIterator` that +/// applies the specified merge mode: +/// - `LastRow`: Removes duplicate rows, keeping only the last one +/// - `LastNonNull`: Removes duplicates but preserves the last non-null value for each field +/// +/// # Examples +/// +/// ```ignore +/// let merged_iter = merge_and_dedup( +/// &schema, +/// false, // not append mode, apply dedup +/// MergeMode::LastRow, +/// 2, // fields start at column 2 after primary key columns +/// vec![iter1, iter2, iter3], +/// )?; +/// ``` +pub fn merge_and_dedup( schema: &SchemaRef, - options: &RegionOptions, + append_mode: bool, + merge_mode: MergeMode, field_column_start: usize, input_iters: Vec, ) -> Result { let merge_iter = FlatMergeIterator::new(schema.clone(), input_iters, DEFAULT_READ_BATCH_SIZE)?; - let maybe_dedup = if options.append_mode { + let maybe_dedup = if append_mode { // No dedup in append mode Box::new(merge_iter) as _ } else { // Dedup according to merge mode. - match options.merge_mode() { + match merge_mode { MergeMode::LastRow => { Box::new(FlatDedupIterator::new(merge_iter, FlatLastRow::new(false))) as _ }