mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-22 22:20:02 +00:00
chore: expose symbols (#7451)
* chore/expose-symbols:

  ### Commit Message

  Enhance `merge_and_dedup` Functionality in `flush.rs`

  - **Function Signature Update**: Modified the `merge_and_dedup` function to accept `append_mode` and `merge_mode` as separate parameters instead of using `options`.
  - **Function Accessibility**: Changed the visibility of `merge_and_dedup` to `pub` to allow external access.
  - **Function Calls Update**: Updated calls to `merge_and_dedup` within `memtable_flat_sources` to align with the new function signature, passing `options.append_mode` and `options.merge_mode()` directly.

  Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* chore/expose-symbols:

  ### Add Merge and Deduplication Functionality

  - **File**: `src/mito2/src/flush.rs`
    - Introduced `merge_and_dedup` function to merge multiple record batch iterators and apply deduplication based on specified modes.
    - Added detailed documentation for the function, explaining its arguments, behavior, and usage examples.

  Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

---------

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
This commit is contained in:
@@ -801,7 +801,8 @@ fn memtable_flat_sources(
|
|||||||
if last_iter_rows > min_flush_rows {
|
if last_iter_rows > min_flush_rows {
|
||||||
let maybe_dedup = merge_and_dedup(
|
let maybe_dedup = merge_and_dedup(
|
||||||
&schema,
|
&schema,
|
||||||
options,
|
options.append_mode,
|
||||||
|
options.merge_mode(),
|
||||||
field_column_start,
|
field_column_start,
|
||||||
std::mem::replace(&mut input_iters, Vec::with_capacity(num_ranges)),
|
std::mem::replace(&mut input_iters, Vec::with_capacity(num_ranges)),
|
||||||
)?;
|
)?;
|
||||||
@@ -813,7 +814,13 @@ fn memtable_flat_sources(
|
|||||||
|
|
||||||
// Handle remaining iters.
|
// Handle remaining iters.
|
||||||
if !input_iters.is_empty() {
|
if !input_iters.is_empty() {
|
||||||
let maybe_dedup = merge_and_dedup(&schema, options, field_column_start, input_iters)?;
|
let maybe_dedup = merge_and_dedup(
|
||||||
|
&schema,
|
||||||
|
options.append_mode,
|
||||||
|
options.merge_mode(),
|
||||||
|
field_column_start,
|
||||||
|
input_iters,
|
||||||
|
)?;
|
||||||
|
|
||||||
flat_sources.sources.push(FlatSource::Iter(maybe_dedup));
|
flat_sources.sources.push(FlatSource::Iter(maybe_dedup));
|
||||||
}
|
}
|
||||||
@@ -822,19 +829,64 @@ fn memtable_flat_sources(
|
|||||||
Ok(flat_sources)
|
Ok(flat_sources)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn merge_and_dedup(
|
/// Merges multiple record batch iterators and applies deduplication based on the specified mode.
|
||||||
|
///
|
||||||
|
/// This function is used during the flush process to combine data from multiple memtable ranges
|
||||||
|
/// into a single stream while handling duplicate records according to the configured merge strategy.
|
||||||
|
///
|
||||||
|
/// # Arguments
|
||||||
|
///
|
||||||
|
/// * `schema` - The Arrow schema reference that defines the structure of the record batches
|
||||||
|
/// * `append_mode` - When true, no deduplication is performed and all records are preserved.
|
||||||
|
/// This is used for append-only workloads where duplicate handling is not required.
|
||||||
|
/// * `merge_mode` - The strategy used for deduplication when not in append mode:
|
||||||
|
/// - `MergeMode::LastRow`: Keeps the last record for each primary key
|
||||||
|
/// - `MergeMode::LastNonNull`: Keeps the last non-null values for each field
|
||||||
|
/// * `field_column_start` - The starting column index for fields in the record batch.
|
||||||
|
/// Used when the merge mode is `MergeMode::LastNonNull` to identify which columns
|
||||||
|
/// contain field values versus primary key columns.
|
||||||
|
/// * `input_iters` - A vector of record batch iterators to be merged and deduplicated
|
||||||
|
///
|
||||||
|
/// # Returns
|
||||||
|
///
|
||||||
|
/// Returns a boxed record batch iterator that yields the merged and potentially deduplicated
|
||||||
|
/// record batches.
|
||||||
|
///
|
||||||
|
/// # Behavior
|
||||||
|
///
|
||||||
|
/// 1. Creates a `FlatMergeIterator` to merge all input iterators in sorted order based on
|
||||||
|
/// primary key and timestamp
|
||||||
|
/// 2. If `append_mode` is true, returns the merge iterator directly without deduplication
|
||||||
|
/// 3. If `append_mode` is false, wraps the merge iterator with a `FlatDedupIterator` that
|
||||||
|
/// applies the specified merge mode:
|
||||||
|
/// - `LastRow`: Removes duplicate rows, keeping only the last one
|
||||||
|
/// - `LastNonNull`: Removes duplicates but preserves the last non-null value for each field
|
||||||
|
///
|
||||||
|
/// # Examples
|
||||||
|
///
|
||||||
|
/// ```ignore
|
||||||
|
/// let merged_iter = merge_and_dedup(
|
||||||
|
/// &schema,
|
||||||
|
/// false, // not append mode, apply dedup
|
||||||
|
/// MergeMode::LastRow,
|
||||||
|
/// 2, // fields start at column 2 after primary key columns
|
||||||
|
/// vec![iter1, iter2, iter3],
|
||||||
|
/// )?;
|
||||||
|
/// ```
|
||||||
|
pub fn merge_and_dedup(
|
||||||
schema: &SchemaRef,
|
schema: &SchemaRef,
|
||||||
options: &RegionOptions,
|
append_mode: bool,
|
||||||
|
merge_mode: MergeMode,
|
||||||
field_column_start: usize,
|
field_column_start: usize,
|
||||||
input_iters: Vec<BoxedRecordBatchIterator>,
|
input_iters: Vec<BoxedRecordBatchIterator>,
|
||||||
) -> Result<BoxedRecordBatchIterator> {
|
) -> Result<BoxedRecordBatchIterator> {
|
||||||
let merge_iter = FlatMergeIterator::new(schema.clone(), input_iters, DEFAULT_READ_BATCH_SIZE)?;
|
let merge_iter = FlatMergeIterator::new(schema.clone(), input_iters, DEFAULT_READ_BATCH_SIZE)?;
|
||||||
let maybe_dedup = if options.append_mode {
|
let maybe_dedup = if append_mode {
|
||||||
// No dedup in append mode
|
// No dedup in append mode
|
||||||
Box::new(merge_iter) as _
|
Box::new(merge_iter) as _
|
||||||
} else {
|
} else {
|
||||||
// Dedup according to merge mode.
|
// Dedup according to merge mode.
|
||||||
match options.merge_mode() {
|
match merge_mode {
|
||||||
MergeMode::LastRow => {
|
MergeMode::LastRow => {
|
||||||
Box::new(FlatDedupIterator::new(merge_iter, FlatLastRow::new(false))) as _
|
Box::new(FlatDedupIterator::new(merge_iter, FlatLastRow::new(false))) as _
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user