From 65a88a63db2b578568de650d05bd639fa316ee05 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Thu, 6 Feb 2025 07:36:37 +0000 Subject: [PATCH] poc-write-path: **feat(memtable): Add BulkMemtableBuilder and BulkMemtable** - Introduced `BulkMemtableBuilder` and `BulkMemtable` in `memtable.rs` and `bulk.rs` to support bulk operations. - Added environment variable check for `enable_bulk_memtable` to conditionally use `BulkMemtableBuilder`. - Implemented `MemtableBuilder` for `BulkMemtableBuilder` and `Memtable` for `BulkMemtable`. - Included new fields `dedup` and `merge_mode` in `BulkMemtable` to handle deduplication and merge operations. - Temporarily disabled reads in `BulkMemtable` with `EmptyIter` as a placeholder iterator. --- src/mito2/src/memtable.rs | 15 +++++++++ src/mito2/src/memtable/bulk.rs | 61 ++++++++++++++++++++++++++++++++-- 2 files changed, 73 insertions(+), 3 deletions(-) diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs index 7c6e51509b..a223e06178 100644 --- a/src/mito2/src/memtable.rs +++ b/src/mito2/src/memtable.rs @@ -16,6 +16,7 @@ use std::collections::BTreeMap; use std::fmt; +use std::str::FromStr; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Arc; @@ -29,6 +30,7 @@ use table::predicate::Predicate; use crate::config::MitoConfig; use crate::error::Result; use crate::flush::WriteBufferManagerRef; +use crate::memtable::bulk::BulkMemtableBuilder; use crate::memtable::key_values::KeyValue; pub use crate::memtable::key_values::KeyValues; use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtableBuilder}; @@ -290,6 +292,19 @@ impl MemtableBuilderProvider { dedup: bool, merge_mode: MergeMode, ) -> MemtableBuilderRef { + // todo(hl): make it an option + if std::env::var("enable_bulk_memtable") + .ok() + .and_then(|v| bool::from_str(&v).ok()) + .unwrap_or(false) + { + return Arc::new(BulkMemtableBuilder::new( + self.write_buffer_manager.clone(), + dedup, + merge_mode, + )); + } + match 
options { Some(MemtableOptions::TimeSeries) => Arc::new(TimeSeriesMemtableBuilder::new( self.write_buffer_manager.clone(), diff --git a/src/mito2/src/memtable/bulk.rs b/src/mito2/src/memtable/bulk.rs index 6e748739d3..482bc9e503 100644 --- a/src/mito2/src/memtable/bulk.rs +++ b/src/mito2/src/memtable/bulk.rs @@ -26,9 +26,11 @@ use crate::flush::WriteBufferManagerRef; use crate::memtable::bulk::part::BulkPart; use crate::memtable::key_values::KeyValue; use crate::memtable::{ - AllocTracker, BoxedBatchIterator, KeyValues, Memtable, MemtableId, MemtableRanges, MemtableRef, - MemtableStats, + AllocTracker, BoxedBatchIterator, KeyValues, Memtable, MemtableBuilder, MemtableId, + MemtableRanges, MemtableRef, MemtableStats, }; +use crate::read::Batch; +use crate::region::options::MergeMode; #[allow(unused)] mod context; @@ -37,6 +39,39 @@ pub(crate) mod part; mod part_reader; mod row_group_reader; +#[derive(Debug)] +pub struct BulkMemtableBuilder { + write_buffer_manager: Option<WriteBufferManagerRef>, + dedup: bool, + merge_mode: MergeMode, +} + +impl MemtableBuilder for BulkMemtableBuilder { + fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef { + Arc::new(BulkMemtable::new( + metadata.clone(), + id, + self.write_buffer_manager.clone(), + self.dedup, + self.merge_mode, + )) + } +} + +impl BulkMemtableBuilder { + pub fn new( + write_buffer_manager: Option<WriteBufferManagerRef>, + dedup: bool, + merge_mode: MergeMode, + ) -> Self { + Self { + write_buffer_manager, + dedup, + merge_mode, + } + } +} + #[derive(Debug)] pub struct BulkMemtable { id: MemtableId, @@ -47,6 +82,8 @@ pub struct BulkMemtable { min_timestamp: AtomicI64, max_sequence: AtomicU64, num_rows: AtomicUsize, + dedup: bool, + merge_mode: MergeMode, } impl BulkMemtable { @@ -54,6 +91,8 @@ impl BulkMemtable { region_metadata: RegionMetadataRef, id: MemtableId, write_buffer_manager: Option<WriteBufferManagerRef>, + dedup: bool, + merge_mode: MergeMode, ) -> Self { Self { id, @@ -64,10 +103,22 @@ impl BulkMemtable { min_timestamp: 
AtomicI64::new(i64::MAX), max_sequence: Default::default(), num_rows: Default::default(), + dedup, + merge_mode, } } } +struct EmptyIter; + +impl Iterator for EmptyIter { + type Item = Result<Batch>; + + fn next(&mut self) -> Option<Self::Item> { + None + } +} + impl Memtable for BulkMemtable { fn id(&self) -> MemtableId { self.id @@ -93,7 +144,9 @@ impl Memtable for BulkMemtable { _predicate: Option<Predicate>, _sequence: Option<SequenceNumber>, ) -> Result<BoxedBatchIterator> { - todo!() + //todo(hl): temporarily disable reads. + //todo(hl): we should also consider dedup and merge mode when reading bulk parts, + Ok(Box::new(EmptyIter)) } fn ranges( @@ -151,6 +204,8 @@ impl Memtable for BulkMemtable { metadata.clone(), id, self.alloc_tracker.write_buffer_manager(), + self.dedup, + self.merge_mode, )) } }