poc-write-path:

**feat(memtable): Add BulkMemtableBuilder and BulkMemtable**

 - Introduced `BulkMemtableBuilder` and `BulkMemtable` in `memtable.rs` and `bulk.rs` to support bulk operations.
 - Added environment variable check for `enable_bulk_memtable` to conditionally use `BulkMemtableBuilder`.
 - Implemented `MemtableBuilder` for `BulkMemtableBuilder` and `Memtable` for `BulkMemtable`.
 - Included new fields `dedup` and `merge_mode` in `BulkMemtable` to handle deduplication and merge operations.
 - Temporarily disabled reads in `BulkMemtable` with `EmptyIter` as a placeholder iterator.
This commit is contained in:
Lei, HUANG
2025-02-06 07:36:37 +00:00
parent 5ad1436a8f
commit 65a88a63db
2 changed files with 73 additions and 3 deletions

View File

@@ -16,6 +16,7 @@
use std::collections::BTreeMap;
use std::fmt;
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
@@ -29,6 +30,7 @@ use table::predicate::Predicate;
use crate::config::MitoConfig;
use crate::error::Result;
use crate::flush::WriteBufferManagerRef;
use crate::memtable::bulk::BulkMemtableBuilder;
use crate::memtable::key_values::KeyValue;
pub use crate::memtable::key_values::KeyValues;
use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtableBuilder};
@@ -290,6 +292,19 @@ impl MemtableBuilderProvider {
dedup: bool,
merge_mode: MergeMode,
) -> MemtableBuilderRef {
// todo(hl): make it an option
if std::env::var("enable_bulk_memtable")
.ok()
.and_then(|v| bool::from_str(&v).ok())
.unwrap_or(false)
{
return Arc::new(BulkMemtableBuilder::new(
self.write_buffer_manager.clone(),
dedup,
merge_mode,
));
}
match options {
Some(MemtableOptions::TimeSeries) => Arc::new(TimeSeriesMemtableBuilder::new(
self.write_buffer_manager.clone(),

View File

@@ -26,9 +26,11 @@ use crate::flush::WriteBufferManagerRef;
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::key_values::KeyValue;
use crate::memtable::{
AllocTracker, BoxedBatchIterator, KeyValues, Memtable, MemtableId, MemtableRanges, MemtableRef,
MemtableStats,
AllocTracker, BoxedBatchIterator, KeyValues, Memtable, MemtableBuilder, MemtableId,
MemtableRanges, MemtableRef, MemtableStats,
};
use crate::read::Batch;
use crate::region::options::MergeMode;
#[allow(unused)]
mod context;
@@ -37,6 +39,39 @@ pub(crate) mod part;
mod part_reader;
mod row_group_reader;
#[derive(Debug)]
pub struct BulkMemtableBuilder {
write_buffer_manager: Option<WriteBufferManagerRef>,
dedup: bool,
merge_mode: MergeMode,
}
impl MemtableBuilder for BulkMemtableBuilder {
fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
Arc::new(BulkMemtable::new(
metadata.clone(),
id,
self.write_buffer_manager.clone(),
self.dedup,
self.merge_mode,
))
}
}
impl BulkMemtableBuilder {
pub fn new(
write_buffer_manager: Option<WriteBufferManagerRef>,
dedup: bool,
merge_mode: MergeMode,
) -> Self {
Self {
write_buffer_manager,
dedup,
merge_mode,
}
}
}
#[derive(Debug)]
pub struct BulkMemtable {
id: MemtableId,
@@ -47,6 +82,8 @@ pub struct BulkMemtable {
min_timestamp: AtomicI64,
max_sequence: AtomicU64,
num_rows: AtomicUsize,
dedup: bool,
merge_mode: MergeMode,
}
impl BulkMemtable {
@@ -54,6 +91,8 @@ impl BulkMemtable {
region_metadata: RegionMetadataRef,
id: MemtableId,
write_buffer_manager: Option<WriteBufferManagerRef>,
dedup: bool,
merge_mode: MergeMode,
) -> Self {
Self {
id,
@@ -64,10 +103,22 @@ impl BulkMemtable {
min_timestamp: AtomicI64::new(i64::MAX),
max_sequence: Default::default(),
num_rows: Default::default(),
dedup,
merge_mode,
}
}
}
struct EmptyIter;
impl Iterator for EmptyIter {
type Item = Result<Batch>;
fn next(&mut self) -> Option<Self::Item> {
None
}
}
impl Memtable for BulkMemtable {
fn id(&self) -> MemtableId {
self.id
@@ -93,7 +144,9 @@ impl Memtable for BulkMemtable {
_predicate: Option<Predicate>,
_sequence: Option<SequenceNumber>,
) -> Result<BoxedBatchIterator> {
todo!()
//todo(hl): temporarily disable reads.
//todo(hl): we should also consider dedup and merge mode when reading bulk parts,
Ok(Box::new(EmptyIter))
}
fn ranges(
@@ -151,6 +204,8 @@ impl Memtable for BulkMemtable {
metadata.clone(),
id,
self.alloc_tracker.write_buffer_manager(),
self.dedup,
self.merge_mode,
))
}
}