mito2/memtable/
bulk.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Memtable implementation for bulk load
16
17#[allow(unused)]
18pub mod context;
19#[allow(unused)]
20pub mod part;
21pub mod part_reader;
22mod row_group_reader;
23
24use std::collections::{BTreeMap, HashSet};
25use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
26use std::sync::{Arc, LazyLock, Mutex, RwLock};
27use std::time::Instant;
28
29/// Reads an environment variable as usize, returning default if not set or invalid.
30fn env_usize(name: &str, default: usize) -> usize {
31    std::env::var(name)
32        .ok()
33        .and_then(|v| v.parse().ok())
34        .unwrap_or(default)
35}
36
37use datatypes::arrow::datatypes::SchemaRef;
38use mito_codec::key_values::KeyValue;
39use rayon::prelude::*;
40use store_api::metadata::RegionMetadataRef;
41use store_api::storage::{ColumnId, FileId, RegionId, SequenceRange};
42use tokio::sync::Semaphore;
43
44use crate::error::{Result, UnsupportedOperationSnafu};
45use crate::flush::WriteBufferManagerRef;
46use crate::memtable::bulk::context::BulkIterContext;
47use crate::memtable::bulk::part::{
48    BulkPart, BulkPartEncodeMetrics, BulkPartEncoder, MultiBulkPart, UnorderedPart,
49};
50use crate::memtable::bulk::part_reader::BulkPartBatchIter;
51use crate::memtable::stats::WriteMetrics;
52use crate::memtable::{
53    AllocTracker, BoxedBatchIterator, BoxedRecordBatchIterator, EncodedBulkPart, EncodedRange,
54    IterBuilder, KeyValues, MemScanMetrics, Memtable, MemtableBuilder, MemtableId, MemtableRange,
55    MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats, RangesOptions,
56};
57use crate::read::flat_dedup::{FlatDedupIterator, FlatLastNonNull, FlatLastRow};
58use crate::read::flat_merge::FlatMergeIterator;
59use crate::region::options::MergeMode;
60use crate::sst::parquet::flat_format::field_column_start;
61use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE};
62use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
63
64/// Default merge threshold for triggering compaction.
65const DEFAULT_MERGE_THRESHOLD: usize = 16;
66
67/// Threshold for triggering merge of parts. Configurable via `GREPTIME_BULK_MERGE_THRESHOLD`.
68static MERGE_THRESHOLD: LazyLock<usize> =
69    LazyLock::new(|| env_usize("GREPTIME_BULK_MERGE_THRESHOLD", DEFAULT_MERGE_THRESHOLD));
70
71/// Default maximum number of groups for parallel merging.
72const DEFAULT_MAX_MERGE_GROUPS: usize = 32;
73
74/// Maximum merge groups. Configurable via `GREPTIME_BULK_MAX_MERGE_GROUPS`.
75static MAX_MERGE_GROUPS: LazyLock<usize> =
76    LazyLock::new(|| env_usize("GREPTIME_BULK_MAX_MERGE_GROUPS", DEFAULT_MAX_MERGE_GROUPS));
77
78/// Row threshold for encoding parts. Configurable via `GREPTIME_BULK_ENCODE_ROW_THRESHOLD`.
79/// When estimated rows exceed this threshold, parts are encoded as EncodedBulkPart.
80pub(crate) static ENCODE_ROW_THRESHOLD: LazyLock<usize> = LazyLock::new(|| {
81    env_usize(
82        "GREPTIME_BULK_ENCODE_ROW_THRESHOLD",
83        10 * DEFAULT_ROW_GROUP_SIZE,
84    )
85});
86
87/// Default bytes threshold for encoding.
88const DEFAULT_ENCODE_BYTES_THRESHOLD: usize = 64 * 1024 * 1024;
89
90/// Bytes threshold for encoding parts. Configurable via `GREPTIME_BULK_ENCODE_BYTES_THRESHOLD`.
91/// When estimated bytes exceed this threshold, parts are encoded as EncodedBulkPart.
92static ENCODE_BYTES_THRESHOLD: LazyLock<usize> = LazyLock::new(|| {
93    env_usize(
94        "GREPTIME_BULK_ENCODE_BYTES_THRESHOLD",
95        DEFAULT_ENCODE_BYTES_THRESHOLD,
96    )
97});
98
99/// Configuration for bulk memtable.
100#[derive(Debug, Clone)]
101pub struct BulkMemtableConfig {
102    /// Threshold for triggering merge of parts.
103    pub merge_threshold: usize,
104    /// Row threshold for encoding parts.
105    pub encode_row_threshold: usize,
106    /// Bytes threshold for encoding parts.
107    pub encode_bytes_threshold: usize,
108    /// Maximum number of groups for parallel merging.
109    pub max_merge_groups: usize,
110}
111
112impl Default for BulkMemtableConfig {
113    fn default() -> Self {
114        Self {
115            merge_threshold: *MERGE_THRESHOLD,
116            encode_row_threshold: *ENCODE_ROW_THRESHOLD,
117            encode_bytes_threshold: *ENCODE_BYTES_THRESHOLD,
118            max_merge_groups: *MAX_MERGE_GROUPS,
119        }
120    }
121}
122
123/// Result of merging parts - either a MultiBulkPart or an EncodedBulkPart
124enum MergedPart {
125    /// Merged part stored as MultiBulkPart (when rows < DEFAULT_ROW_GROUP_SIZE)
126    Multi(MultiBulkPart),
127    /// Merged part stored as EncodedBulkPart (when rows >= DEFAULT_ROW_GROUP_SIZE)
128    Encoded(EncodedBulkPart),
129}
130
131/// Result of collecting parts to merge
132struct CollectedParts {
133    /// Groups of parts ready for merging (each group has up to 16 parts)
134    groups: Vec<Vec<PartToMerge>>,
135}
136
137/// All parts in a bulk memtable.
138#[derive(Default)]
139struct BulkParts {
140    /// Unordered small parts (< 1024 rows).
141    unordered_part: UnorderedPart,
142    /// All parts (raw and encoded).
143    parts: Vec<BulkPartWrapper>,
144}
145
146impl BulkParts {
147    /// Total number of parts (including unordered).
148    fn num_parts(&self) -> usize {
149        let unordered_count = if self.unordered_part.is_empty() { 0 } else { 1 };
150        self.parts.len() + unordered_count
151    }
152
153    /// Returns true if there is no part.
154    fn is_empty(&self) -> bool {
155        self.unordered_part.is_empty() && self.parts.is_empty()
156    }
157
158    /// Returns true if bulk parts or encoded parts should be merged.
159    /// Uses short-circuit counting to stop early once threshold is reached.
160    fn should_merge_parts(&self, merge_threshold: usize) -> bool {
161        let mut bulk_count = 0;
162        let mut encoded_count = 0;
163
164        for wrapper in &self.parts {
165            if wrapper.merging {
166                continue;
167            }
168
169            if wrapper.part.is_encoded() {
170                encoded_count += 1;
171            } else {
172                bulk_count += 1;
173            }
174
175            // Short-circuit: stop counting if either threshold is reached
176            if bulk_count >= merge_threshold || encoded_count >= merge_threshold {
177                return true;
178            }
179        }
180
181        false
182    }
183
184    /// Returns true if the unordered_part should be compacted into a BulkPart.
185    fn should_compact_unordered_part(&self) -> bool {
186        self.unordered_part.should_compact()
187    }
188
189    /// Collects unmerged parts and marks them as being merged.
190    /// Only collects parts of types that meet the threshold.
191    /// Parts are pre-grouped into chunks for parallel processing.
192    fn collect_parts_to_merge(
193        &mut self,
194        merge_threshold: usize,
195        max_merge_groups: usize,
196    ) -> CollectedParts {
197        // First pass: collect indices and row counts for each type
198        let mut bulk_indices: Vec<(usize, usize)> = Vec::new();
199        let mut encoded_indices: Vec<(usize, usize)> = Vec::new();
200
201        for (idx, wrapper) in self.parts.iter().enumerate() {
202            if wrapper.merging {
203                continue;
204            }
205            let num_rows = wrapper.part.num_rows();
206            if wrapper.part.is_encoded() {
207                encoded_indices.push((idx, num_rows));
208            } else {
209                bulk_indices.push((idx, num_rows));
210            }
211        }
212
213        let mut groups = Vec::new();
214
215        // Process bulk parts if threshold met
216        if bulk_indices.len() >= merge_threshold {
217            groups.extend(self.collect_and_group_parts(
218                bulk_indices,
219                merge_threshold,
220                max_merge_groups,
221            ));
222        }
223
224        // Process encoded parts if threshold met
225        if encoded_indices.len() >= merge_threshold {
226            groups.extend(self.collect_and_group_parts(
227                encoded_indices,
228                merge_threshold,
229                max_merge_groups,
230            ));
231        }
232
233        CollectedParts { groups }
234    }
235
236    /// Sorts indices by row count, groups into chunks, marks as merging, and returns groups.
237    fn collect_and_group_parts(
238        &mut self,
239        mut indices: Vec<(usize, usize)>,
240        merge_threshold: usize,
241        max_merge_groups: usize,
242    ) -> Vec<Vec<PartToMerge>> {
243        if indices.is_empty() {
244            return Vec::new();
245        }
246
247        // Sort by row count for better grouping
248        indices.sort_unstable_by_key(|(_, num_rows)| *num_rows);
249
250        // Group into chunks of merge_threshold size, limit to max_merge_groups
251        indices
252            .chunks(merge_threshold)
253            .take(max_merge_groups)
254            .map(|chunk| {
255                chunk
256                    .iter()
257                    .map(|(idx, _)| {
258                        let wrapper = &mut self.parts[*idx];
259                        wrapper.merging = true;
260                        wrapper.part.clone()
261                    })
262                    .collect()
263            })
264            .collect()
265    }
266
267    /// Installs merged parts and removes the original parts by file ids.
268    /// Returns the total number of rows in the merged parts.
269    fn install_merged_parts<I>(
270        &mut self,
271        merged_parts: I,
272        merged_file_ids: &HashSet<FileId>,
273    ) -> usize
274    where
275        I: IntoIterator<Item = MergedPart>,
276    {
277        let mut total_output_rows = 0;
278
279        for merged_part in merged_parts {
280            match merged_part {
281                MergedPart::Encoded(encoded_part) => {
282                    total_output_rows += encoded_part.metadata().num_rows;
283                    self.parts.push(BulkPartWrapper {
284                        part: PartToMerge::Encoded {
285                            part: encoded_part,
286                            file_id: FileId::random(),
287                        },
288                        merging: false,
289                    });
290                }
291                MergedPart::Multi(multi_part) => {
292                    total_output_rows += multi_part.num_rows();
293                    self.parts.push(BulkPartWrapper {
294                        part: PartToMerge::Multi {
295                            part: multi_part,
296                            file_id: FileId::random(),
297                        },
298                        merging: false,
299                    });
300                }
301            }
302        }
303
304        self.parts
305            .retain(|wrapper| !merged_file_ids.contains(&wrapper.file_id()));
306
307        total_output_rows
308    }
309
310    /// Resets merging flag for parts with the given file ids.
311    /// Used when merging fails or is cancelled.
312    fn reset_merging_flags(&mut self, file_ids: &HashSet<FileId>) {
313        for wrapper in &mut self.parts {
314            if file_ids.contains(&wrapper.file_id()) {
315                wrapper.merging = false;
316            }
317        }
318    }
319}
320
321/// RAII guard for managing merging flags.
322/// Automatically resets merging flags when dropped if the merge operation wasn't successful.
323struct MergingFlagsGuard<'a> {
324    bulk_parts: &'a RwLock<BulkParts>,
325    file_ids: &'a HashSet<FileId>,
326    success: bool,
327}
328
329impl<'a> MergingFlagsGuard<'a> {
330    /// Creates a new guard for the given file ids.
331    fn new(bulk_parts: &'a RwLock<BulkParts>, file_ids: &'a HashSet<FileId>) -> Self {
332        Self {
333            bulk_parts,
334            file_ids,
335            success: false,
336        }
337    }
338
339    /// Marks the merge operation as successful.
340    /// When this is called, the guard will not reset the flags on drop.
341    fn mark_success(&mut self) {
342        self.success = true;
343    }
344}
345
346impl<'a> Drop for MergingFlagsGuard<'a> {
347    fn drop(&mut self) {
348        if !self.success
349            && let Ok(mut parts) = self.bulk_parts.write()
350        {
351            parts.reset_merging_flags(self.file_ids);
352        }
353    }
354}
355
356/// Memtable that ingests and scans parts directly.
357pub struct BulkMemtable {
358    id: MemtableId,
359    /// Configuration for the bulk memtable.
360    config: BulkMemtableConfig,
361    parts: Arc<RwLock<BulkParts>>,
362    metadata: RegionMetadataRef,
363    alloc_tracker: AllocTracker,
364    max_timestamp: AtomicI64,
365    min_timestamp: AtomicI64,
366    max_sequence: AtomicU64,
367    num_rows: AtomicUsize,
368    /// Cached flat SST arrow schema for memtable compaction.
369    flat_arrow_schema: SchemaRef,
370    /// Compactor for merging bulk parts
371    compactor: Arc<Mutex<MemtableCompactor>>,
372    /// Dispatcher for scheduling compaction tasks
373    compact_dispatcher: Option<Arc<CompactDispatcher>>,
374    /// Whether the append mode is enabled
375    append_mode: bool,
376    /// Mode to handle duplicate rows while merging
377    merge_mode: MergeMode,
378}
379
380impl std::fmt::Debug for BulkMemtable {
381    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
382        f.debug_struct("BulkMemtable")
383            .field("id", &self.id)
384            .field("num_rows", &self.num_rows.load(Ordering::Relaxed))
385            .field("min_timestamp", &self.min_timestamp.load(Ordering::Relaxed))
386            .field("max_timestamp", &self.max_timestamp.load(Ordering::Relaxed))
387            .field("max_sequence", &self.max_sequence.load(Ordering::Relaxed))
388            .finish()
389    }
390}
391
392impl Memtable for BulkMemtable {
393    fn id(&self) -> MemtableId {
394        self.id
395    }
396
397    fn write(&self, _kvs: &KeyValues) -> Result<()> {
398        UnsupportedOperationSnafu {
399            err_msg: "write() is not supported for bulk memtable",
400        }
401        .fail()
402    }
403
404    fn write_one(&self, _key_value: KeyValue) -> Result<()> {
405        UnsupportedOperationSnafu {
406            err_msg: "write_one() is not supported for bulk memtable",
407        }
408        .fail()
409    }
410
411    fn write_bulk(&self, fragment: BulkPart) -> Result<()> {
412        let local_metrics = WriteMetrics {
413            key_bytes: 0,
414            value_bytes: fragment.estimated_size(),
415            min_ts: fragment.min_timestamp,
416            max_ts: fragment.max_timestamp,
417            num_rows: fragment.num_rows(),
418            max_sequence: fragment.sequence,
419        };
420
421        {
422            let mut bulk_parts = self.parts.write().unwrap();
423
424            // Routes small parts to unordered_part based on threshold
425            if bulk_parts.unordered_part.should_accept(fragment.num_rows()) {
426                bulk_parts.unordered_part.push(fragment);
427
428                // Compacts unordered_part if threshold is reached
429                if bulk_parts.should_compact_unordered_part()
430                    && let Some(bulk_part) = bulk_parts.unordered_part.to_bulk_part()?
431                {
432                    bulk_parts.parts.push(BulkPartWrapper {
433                        part: PartToMerge::Bulk {
434                            part: bulk_part,
435                            file_id: FileId::random(),
436                        },
437                        merging: false,
438                    });
439                    bulk_parts.unordered_part.clear();
440                }
441            } else {
442                bulk_parts.parts.push(BulkPartWrapper {
443                    part: PartToMerge::Bulk {
444                        part: fragment,
445                        file_id: FileId::random(),
446                    },
447                    merging: false,
448                });
449            }
450
451            // Since this operation should be fast, we do it in parts lock scope.
452            // This ensure the statistics in `ranges()` are correct. What's more,
453            // it guarantees no rows are out of the time range so we don't need to
454            // prune rows by time range again in the iterator of the MemtableRange.
455            self.update_stats(local_metrics);
456        }
457
458        if self.should_compact() {
459            self.schedule_compact();
460        }
461
462        Ok(())
463    }
464
465    fn ranges(
466        &self,
467        projection: Option<&[ColumnId]>,
468        options: RangesOptions,
469    ) -> Result<MemtableRanges> {
470        let predicate = options.predicate;
471        let sequence = options.sequence;
472        let mut ranges = BTreeMap::new();
473        let mut range_id = 0;
474
475        // TODO(yingwen): Filter ranges by sequence.
476        let context = Arc::new(BulkIterContext::new_with_pre_filter_mode(
477            self.metadata.clone(),
478            projection,
479            predicate.predicate().cloned(),
480            options.for_flush,
481            options.pre_filter_mode,
482        )?);
483
484        // Adds ranges for regular parts and encoded parts
485        {
486            let bulk_parts = self.parts.read().unwrap();
487
488            // Adds range for unordered part if not empty
489            if !bulk_parts.unordered_part.is_empty()
490                && let Some(unordered_bulk_part) = bulk_parts.unordered_part.to_bulk_part()?
491            {
492                let part_stats = unordered_bulk_part.to_memtable_stats(&self.metadata);
493                let range = MemtableRange::new(
494                    Arc::new(MemtableRangeContext::new(
495                        self.id,
496                        Box::new(BulkRangeIterBuilder {
497                            part: unordered_bulk_part,
498                            context: context.clone(),
499                            sequence,
500                        }),
501                        predicate.clone(),
502                    )),
503                    part_stats,
504                );
505                ranges.insert(range_id, range);
506                range_id += 1;
507            }
508
509            // Adds ranges for all parts
510            for part_wrapper in bulk_parts.parts.iter() {
511                // Skips empty parts
512                if part_wrapper.part.num_rows() == 0 {
513                    continue;
514                }
515
516                let part_stats = part_wrapper.part.to_memtable_stats(&self.metadata);
517                let iter_builder: Box<dyn IterBuilder> = match &part_wrapper.part {
518                    PartToMerge::Bulk { part, .. } => Box::new(BulkRangeIterBuilder {
519                        part: part.clone(),
520                        context: context.clone(),
521                        sequence,
522                    }),
523                    PartToMerge::Multi { part, .. } => Box::new(MultiBulkRangeIterBuilder {
524                        part: part.clone(),
525                        context: context.clone(),
526                        sequence,
527                    }),
528                    PartToMerge::Encoded { part, file_id } => {
529                        Box::new(EncodedBulkRangeIterBuilder {
530                            file_id: *file_id,
531                            part: part.clone(),
532                            context: context.clone(),
533                            sequence,
534                        })
535                    }
536                };
537
538                let range = MemtableRange::new(
539                    Arc::new(MemtableRangeContext::new(
540                        self.id,
541                        iter_builder,
542                        predicate.clone(),
543                    )),
544                    part_stats,
545                );
546                ranges.insert(range_id, range);
547                range_id += 1;
548            }
549        }
550
551        Ok(MemtableRanges { ranges })
552    }
553
554    fn is_empty(&self) -> bool {
555        let bulk_parts = self.parts.read().unwrap();
556        bulk_parts.is_empty()
557    }
558
559    fn freeze(&self) -> Result<()> {
560        self.alloc_tracker.done_allocating();
561        Ok(())
562    }
563
564    fn stats(&self) -> MemtableStats {
565        let estimated_bytes = self.alloc_tracker.bytes_allocated();
566
567        if estimated_bytes == 0 || self.num_rows.load(Ordering::Relaxed) == 0 {
568            return MemtableStats {
569                estimated_bytes,
570                time_range: None,
571                num_rows: 0,
572                num_ranges: 0,
573                max_sequence: 0,
574                series_count: 0,
575            };
576        }
577
578        let ts_type = self
579            .metadata
580            .time_index_column()
581            .column_schema
582            .data_type
583            .clone()
584            .as_timestamp()
585            .expect("Timestamp column must have timestamp type");
586        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
587        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
588
589        let num_ranges = self.parts.read().unwrap().num_parts();
590
591        MemtableStats {
592            estimated_bytes,
593            time_range: Some((min_timestamp, max_timestamp)),
594            num_rows: self.num_rows.load(Ordering::Relaxed),
595            num_ranges,
596            max_sequence: self.max_sequence.load(Ordering::Relaxed),
597            series_count: self.estimated_series_count(),
598        }
599    }
600
601    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
602        // Computes the new flat schema based on the new metadata.
603        let flat_arrow_schema = to_flat_sst_arrow_schema(
604            metadata,
605            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
606        );
607
608        Arc::new(Self {
609            id,
610            config: self.config.clone(),
611            parts: Arc::new(RwLock::new(BulkParts::default())),
612            metadata: metadata.clone(),
613            alloc_tracker: AllocTracker::new(self.alloc_tracker.write_buffer_manager()),
614            max_timestamp: AtomicI64::new(i64::MIN),
615            min_timestamp: AtomicI64::new(i64::MAX),
616            max_sequence: AtomicU64::new(0),
617            num_rows: AtomicUsize::new(0),
618            flat_arrow_schema,
619            compactor: Arc::new(Mutex::new(MemtableCompactor::new(
620                metadata.region_id,
621                id,
622                self.config.clone(),
623            ))),
624            compact_dispatcher: self.compact_dispatcher.clone(),
625            append_mode: self.append_mode,
626            merge_mode: self.merge_mode,
627        })
628    }
629
630    fn compact(&self, for_flush: bool) -> Result<()> {
631        let mut compactor = self.compactor.lock().unwrap();
632
633        if for_flush {
634            return Ok(());
635        }
636
637        // Unified merge for all parts
638        let should_merge = self
639            .parts
640            .read()
641            .unwrap()
642            .should_merge_parts(self.config.merge_threshold);
643        if should_merge {
644            compactor.merge_parts(
645                &self.flat_arrow_schema,
646                &self.parts,
647                &self.metadata,
648                !self.append_mode,
649                self.merge_mode,
650            )?;
651        }
652
653        Ok(())
654    }
655}
656
657impl BulkMemtable {
658    /// Creates a new BulkMemtable
659    pub fn new(
660        id: MemtableId,
661        config: BulkMemtableConfig,
662        metadata: RegionMetadataRef,
663        write_buffer_manager: Option<WriteBufferManagerRef>,
664        compact_dispatcher: Option<Arc<CompactDispatcher>>,
665        append_mode: bool,
666        merge_mode: MergeMode,
667    ) -> Self {
668        let flat_arrow_schema = to_flat_sst_arrow_schema(
669            &metadata,
670            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
671        );
672
673        let region_id = metadata.region_id;
674        Self {
675            id,
676            config: config.clone(),
677            parts: Arc::new(RwLock::new(BulkParts::default())),
678            metadata,
679            alloc_tracker: AllocTracker::new(write_buffer_manager),
680            max_timestamp: AtomicI64::new(i64::MIN),
681            min_timestamp: AtomicI64::new(i64::MAX),
682            max_sequence: AtomicU64::new(0),
683            num_rows: AtomicUsize::new(0),
684            flat_arrow_schema,
685            compactor: Arc::new(Mutex::new(MemtableCompactor::new(region_id, id, config))),
686            compact_dispatcher,
687            append_mode,
688            merge_mode,
689        }
690    }
691
692    /// Sets the unordered part threshold (for testing).
693    #[cfg(test)]
694    pub fn set_unordered_part_threshold(&self, threshold: usize) {
695        self.parts
696            .write()
697            .unwrap()
698            .unordered_part
699            .set_threshold(threshold);
700    }
701
702    /// Sets the unordered part compact threshold (for testing).
703    #[cfg(test)]
704    pub fn set_unordered_part_compact_threshold(&self, compact_threshold: usize) {
705        self.parts
706            .write()
707            .unwrap()
708            .unordered_part
709            .set_compact_threshold(compact_threshold);
710    }
711
712    /// Updates memtable stats.
713    ///
714    /// Please update this inside the write lock scope.
715    fn update_stats(&self, stats: WriteMetrics) {
716        self.alloc_tracker
717            .on_allocation(stats.key_bytes + stats.value_bytes);
718
719        self.max_timestamp
720            .fetch_max(stats.max_ts, Ordering::Relaxed);
721        self.min_timestamp
722            .fetch_min(stats.min_ts, Ordering::Relaxed);
723        self.max_sequence
724            .fetch_max(stats.max_sequence, Ordering::Relaxed);
725        self.num_rows.fetch_add(stats.num_rows, Ordering::Relaxed);
726    }
727
728    /// Returns the estimated time series count.
729    fn estimated_series_count(&self) -> usize {
730        let bulk_parts = self.parts.read().unwrap();
731        bulk_parts
732            .parts
733            .iter()
734            .map(|part_wrapper| part_wrapper.part.series_count())
735            .sum()
736    }
737
738    /// Returns whether the memtable should be compacted.
739    fn should_compact(&self) -> bool {
740        let parts = self.parts.read().unwrap();
741        parts.should_merge_parts(self.config.merge_threshold)
742    }
743
744    /// Schedules a compaction task using the CompactDispatcher.
745    fn schedule_compact(&self) {
746        if let Some(dispatcher) = &self.compact_dispatcher {
747            let task = MemCompactTask {
748                metadata: self.metadata.clone(),
749                parts: self.parts.clone(),
750                config: self.config.clone(),
751                flat_arrow_schema: self.flat_arrow_schema.clone(),
752                compactor: self.compactor.clone(),
753                append_mode: self.append_mode,
754                merge_mode: self.merge_mode,
755            };
756
757            dispatcher.dispatch_compact(task);
758        } else {
759            // Uses synchronous compaction if no dispatcher is available.
760            if let Err(e) = self.compact(false) {
761                common_telemetry::error!(e; "Failed to compact table");
762            }
763        }
764    }
765}
766
767/// Iterator builder for bulk range
768pub struct BulkRangeIterBuilder {
769    pub part: BulkPart,
770    pub context: Arc<BulkIterContext>,
771    pub sequence: Option<SequenceRange>,
772}
773
774/// Iterator builder for multi bulk range
775struct MultiBulkRangeIterBuilder {
776    part: MultiBulkPart,
777    context: Arc<BulkIterContext>,
778    sequence: Option<SequenceRange>,
779}
780
781impl IterBuilder for BulkRangeIterBuilder {
782    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
783        UnsupportedOperationSnafu {
784            err_msg: "BatchIterator is not supported for bulk memtable",
785        }
786        .fail()
787    }
788
789    fn is_record_batch(&self) -> bool {
790        true
791    }
792
793    fn build_record_batch(
794        &self,
795        metrics: Option<MemScanMetrics>,
796    ) -> Result<BoxedRecordBatchIterator> {
797        let series_count = self.part.estimated_series_count();
798        let iter = BulkPartBatchIter::from_single(
799            self.part.batch.clone(),
800            self.context.clone(),
801            self.sequence,
802            series_count,
803            metrics,
804        );
805
806        Ok(Box::new(iter))
807    }
808
809    fn encoded_range(&self) -> Option<EncodedRange> {
810        None
811    }
812}
813
814impl IterBuilder for MultiBulkRangeIterBuilder {
815    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
816        UnsupportedOperationSnafu {
817            err_msg: "BatchIterator is not supported for multi bulk memtable",
818        }
819        .fail()
820    }
821
822    fn is_record_batch(&self) -> bool {
823        true
824    }
825
826    fn build_record_batch(
827        &self,
828        metrics: Option<MemScanMetrics>,
829    ) -> Result<BoxedRecordBatchIterator> {
830        self.part
831            .read(self.context.clone(), self.sequence, metrics)?
832            .ok_or_else(|| {
833                UnsupportedOperationSnafu {
834                    err_msg: "Failed to create iterator for multi bulk part",
835                }
836                .build()
837            })
838    }
839
840    fn encoded_range(&self) -> Option<EncodedRange> {
841        None
842    }
843}
844
845/// Iterator builder for encoded bulk range
846struct EncodedBulkRangeIterBuilder {
847    file_id: FileId,
848    part: EncodedBulkPart,
849    context: Arc<BulkIterContext>,
850    sequence: Option<SequenceRange>,
851}
852
853impl IterBuilder for EncodedBulkRangeIterBuilder {
854    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
855        UnsupportedOperationSnafu {
856            err_msg: "BatchIterator is not supported for encoded bulk memtable",
857        }
858        .fail()
859    }
860
861    fn is_record_batch(&self) -> bool {
862        true
863    }
864
865    fn build_record_batch(
866        &self,
867        metrics: Option<MemScanMetrics>,
868    ) -> Result<BoxedRecordBatchIterator> {
869        if let Some(iter) = self
870            .part
871            .read(self.context.clone(), self.sequence, metrics)?
872        {
873            Ok(iter)
874        } else {
875            // Return an empty iterator if no data to read
876            Ok(Box::new(std::iter::empty()))
877        }
878    }
879
880    fn encoded_range(&self) -> Option<EncodedRange> {
881        Some(EncodedRange {
882            data: self.part.data().clone(),
883            sst_info: self.part.to_sst_info(self.file_id),
884        })
885    }
886}
887
888struct BulkPartWrapper {
889    /// The part to store. It already contains the file id.
890    part: PartToMerge,
891    /// Whether this part is currently being merged.
892    merging: bool,
893}
894
895impl BulkPartWrapper {
896    /// Returns the file id of this part.
897    fn file_id(&self) -> FileId {
898        self.part.file_id()
899    }
900}
901
902/// Enum to wrap different types of parts for unified merging.
903#[derive(Clone)]
904enum PartToMerge {
905    /// Raw bulk part.
906    Bulk { part: BulkPart, file_id: FileId },
907    /// Multiple bulk parts.
908    Multi {
909        part: MultiBulkPart,
910        file_id: FileId,
911    },
912    /// Encoded bulk part.
913    Encoded {
914        part: EncodedBulkPart,
915        file_id: FileId,
916    },
917}
918
919impl PartToMerge {
920    /// Gets the file ID of this part.
921    fn file_id(&self) -> FileId {
922        match self {
923            PartToMerge::Bulk { file_id, .. } => *file_id,
924            PartToMerge::Multi { file_id, .. } => *file_id,
925            PartToMerge::Encoded { file_id, .. } => *file_id,
926        }
927    }
928
929    /// Gets the minimum timestamp of this part.
930    fn min_timestamp(&self) -> i64 {
931        match self {
932            PartToMerge::Bulk { part, .. } => part.min_timestamp,
933            PartToMerge::Multi { part, .. } => part.min_timestamp(),
934            PartToMerge::Encoded { part, .. } => part.metadata().min_timestamp,
935        }
936    }
937
938    /// Gets the maximum timestamp of this part.
939    fn max_timestamp(&self) -> i64 {
940        match self {
941            PartToMerge::Bulk { part, .. } => part.max_timestamp,
942            PartToMerge::Multi { part, .. } => part.max_timestamp(),
943            PartToMerge::Encoded { part, .. } => part.metadata().max_timestamp,
944        }
945    }
946
947    /// Gets the number of rows in this part.
948    fn num_rows(&self) -> usize {
949        match self {
950            PartToMerge::Bulk { part, .. } => part.num_rows(),
951            PartToMerge::Multi { part, .. } => part.num_rows(),
952            PartToMerge::Encoded { part, .. } => part.metadata().num_rows,
953        }
954    }
955
956    /// Gets the maximum sequence number of this part.
957    fn max_sequence(&self) -> u64 {
958        match self {
959            PartToMerge::Bulk { part, .. } => part.sequence,
960            PartToMerge::Multi { part, .. } => part.max_sequence(),
961            PartToMerge::Encoded { part, .. } => part.metadata().max_sequence,
962        }
963    }
964
965    /// Gets the estimated series count in this part.
966    fn series_count(&self) -> usize {
967        match self {
968            PartToMerge::Bulk { part, .. } => part.estimated_series_count(),
969            PartToMerge::Multi { part, .. } => part.series_count(),
970            PartToMerge::Encoded { part, .. } => part.metadata().num_series as usize,
971        }
972    }
973
974    /// Returns true if this is an encoded part.
975    fn is_encoded(&self) -> bool {
976        matches!(self, PartToMerge::Encoded { .. })
977    }
978
979    /// Gets the estimated size in bytes of this part.
980    fn estimated_size(&self) -> usize {
981        match self {
982            PartToMerge::Bulk { part, .. } => part.estimated_size(),
983            PartToMerge::Multi { part, .. } => part.estimated_size(),
984            PartToMerge::Encoded { part, .. } => part.size_bytes(),
985        }
986    }
987
988    /// Converts this part to `MemtableStats`.
989    fn to_memtable_stats(&self, region_metadata: &RegionMetadataRef) -> MemtableStats {
990        match self {
991            PartToMerge::Bulk { part, .. } => part.to_memtable_stats(region_metadata),
992            PartToMerge::Multi { part, .. } => part.to_memtable_stats(region_metadata),
993            PartToMerge::Encoded { part, .. } => part.to_memtable_stats(),
994        }
995    }
996
997    /// Creates a record batch iterator for this part.
998    fn create_iterator(
999        self,
1000        context: Arc<BulkIterContext>,
1001    ) -> Result<Option<BoxedRecordBatchIterator>> {
1002        match self {
1003            PartToMerge::Bulk { part, .. } => {
1004                let series_count = part.estimated_series_count();
1005                let iter = BulkPartBatchIter::from_single(
1006                    part.batch,
1007                    context,
1008                    None, // No sequence filter for merging
1009                    series_count,
1010                    None, // No metrics for merging
1011                );
1012                Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
1013            }
1014            PartToMerge::Multi { part, .. } => part.read(context, None, None),
1015            PartToMerge::Encoded { part, .. } => part.read(context, None, None),
1016        }
1017    }
1018}
1019
1020struct MemtableCompactor {
1021    region_id: RegionId,
1022    memtable_id: MemtableId,
1023    /// Configuration for the bulk memtable.
1024    config: BulkMemtableConfig,
1025}
1026
1027impl MemtableCompactor {
1028    /// Creates a new MemtableCompactor.
1029    fn new(region_id: RegionId, memtable_id: MemtableId, config: BulkMemtableConfig) -> Self {
1030        Self {
1031            region_id,
1032            memtable_id,
1033            config,
1034        }
1035    }
1036
1037    /// Merges parts (bulk and encoded) and then encodes the result.
1038    fn merge_parts(
1039        &mut self,
1040        arrow_schema: &SchemaRef,
1041        bulk_parts: &RwLock<BulkParts>,
1042        metadata: &RegionMetadataRef,
1043        dedup: bool,
1044        merge_mode: MergeMode,
1045    ) -> Result<()> {
1046        let start = Instant::now();
1047
1048        // Collect pre-grouped parts
1049        let collected = bulk_parts
1050            .write()
1051            .unwrap()
1052            .collect_parts_to_merge(self.config.merge_threshold, self.config.max_merge_groups);
1053
1054        if collected.groups.is_empty() {
1055            return Ok(());
1056        }
1057
1058        // Collect all file IDs for tracking
1059        let merged_file_ids: HashSet<FileId> = collected
1060            .groups
1061            .iter()
1062            .flatten()
1063            .map(|part| part.file_id())
1064            .collect();
1065        let mut guard = MergingFlagsGuard::new(bulk_parts, &merged_file_ids);
1066
1067        let num_groups = collected.groups.len();
1068        let num_parts: usize = collected.groups.iter().map(|g| g.len()).sum();
1069
1070        let encode_row_threshold = self.config.encode_row_threshold;
1071        let encode_bytes_threshold = self.config.encode_bytes_threshold;
1072
1073        // Merge all groups in parallel
1074        let merged_parts = collected
1075            .groups
1076            .into_par_iter()
1077            .map(|group| {
1078                Self::merge_parts_group(
1079                    group,
1080                    arrow_schema,
1081                    metadata,
1082                    dedup,
1083                    merge_mode,
1084                    encode_row_threshold,
1085                    encode_bytes_threshold,
1086                )
1087            })
1088            .collect::<Result<Vec<Option<MergedPart>>>>()?;
1089
1090        // Install all merged parts
1091        let total_output_rows = {
1092            let mut parts = bulk_parts.write().unwrap();
1093            parts.install_merged_parts(merged_parts.into_iter().flatten(), &merged_file_ids)
1094        };
1095
1096        guard.mark_success();
1097
1098        common_telemetry::debug!(
1099            "BulkMemtable {} {} concurrent compact {} groups, {} parts, {} rows, cost: {:?}",
1100            self.region_id,
1101            self.memtable_id,
1102            num_groups,
1103            num_parts,
1104            total_output_rows,
1105            start.elapsed()
1106        );
1107
1108        Ok(())
1109    }
1110
1111    /// Merges a group of parts into a single part (either MultiBulkPart or EncodedBulkPart).
1112    fn merge_parts_group(
1113        parts_to_merge: Vec<PartToMerge>,
1114        arrow_schema: &SchemaRef,
1115        metadata: &RegionMetadataRef,
1116        dedup: bool,
1117        merge_mode: MergeMode,
1118        encode_row_threshold: usize,
1119        encode_bytes_threshold: usize,
1120    ) -> Result<Option<MergedPart>> {
1121        if parts_to_merge.is_empty() {
1122            return Ok(None);
1123        }
1124
1125        // Calculates timestamp bounds and statistics for merged data
1126        let min_timestamp = parts_to_merge
1127            .iter()
1128            .map(|p| p.min_timestamp())
1129            .min()
1130            .unwrap_or(i64::MAX);
1131        let max_timestamp = parts_to_merge
1132            .iter()
1133            .map(|p| p.max_timestamp())
1134            .max()
1135            .unwrap_or(i64::MIN);
1136        let max_sequence = parts_to_merge
1137            .iter()
1138            .map(|p| p.max_sequence())
1139            .max()
1140            .unwrap_or(0);
1141
1142        // Collects statistics from parts before creating iterators
1143        let estimated_total_rows: usize = parts_to_merge.iter().map(|p| p.num_rows()).sum();
1144        let estimated_total_bytes: usize = parts_to_merge.iter().map(|p| p.estimated_size()).sum();
1145        let estimated_series_count = parts_to_merge
1146            .iter()
1147            .map(|p| p.series_count())
1148            .max()
1149            .unwrap_or(0);
1150
1151        let context = Arc::new(BulkIterContext::new(
1152            metadata.clone(),
1153            None, // No column projection for merging
1154            None, // No predicate for merging
1155            true,
1156        )?);
1157
1158        // Creates iterators for all parts to merge.
1159        let iterators: Vec<BoxedRecordBatchIterator> = parts_to_merge
1160            .into_iter()
1161            .filter_map(|part| part.create_iterator(context.clone()).ok().flatten())
1162            .collect();
1163
1164        if iterators.is_empty() {
1165            return Ok(None);
1166        }
1167
1168        let merged_iter =
1169            FlatMergeIterator::new(arrow_schema.clone(), iterators, DEFAULT_READ_BATCH_SIZE)?;
1170
1171        let boxed_iter: BoxedRecordBatchIterator = if dedup {
1172            // Applies deduplication based on merge mode
1173            match merge_mode {
1174                MergeMode::LastRow => {
1175                    let dedup_iter = FlatDedupIterator::new(merged_iter, FlatLastRow::new(false));
1176                    Box::new(dedup_iter)
1177                }
1178                MergeMode::LastNonNull => {
1179                    let field_column_start =
1180                        field_column_start(metadata, arrow_schema.fields().len());
1181
1182                    let dedup_iter = FlatDedupIterator::new(
1183                        merged_iter,
1184                        FlatLastNonNull::new(field_column_start, false),
1185                    );
1186                    Box::new(dedup_iter)
1187                }
1188            }
1189        } else {
1190            Box::new(merged_iter)
1191        };
1192
1193        // Encode as EncodedBulkPart if rows exceed row threshold OR bytes exceed bytes threshold
1194        if estimated_total_rows > encode_row_threshold
1195            || estimated_total_bytes > encode_bytes_threshold
1196        {
1197            let encoder = BulkPartEncoder::new(metadata.clone(), DEFAULT_ROW_GROUP_SIZE)?;
1198            let mut metrics = BulkPartEncodeMetrics::default();
1199            let encoded_part = encoder.encode_record_batch_iter(
1200                boxed_iter,
1201                arrow_schema.clone(),
1202                min_timestamp,
1203                max_timestamp,
1204                max_sequence,
1205                &mut metrics,
1206            )?;
1207
1208            common_telemetry::trace!("merge_parts_group metrics: {:?}", metrics);
1209
1210            Ok(encoded_part.map(MergedPart::Encoded))
1211        } else {
1212            // Otherwise, collect into MultiBulkPart
1213            let mut batches = Vec::new();
1214            let mut actual_total_rows = 0;
1215
1216            for batch_result in boxed_iter {
1217                let batch = batch_result?;
1218                actual_total_rows += batch.num_rows();
1219                batches.push(batch);
1220            }
1221
1222            if actual_total_rows == 0 {
1223                return Ok(None);
1224            }
1225
1226            let multi_part = MultiBulkPart::new(
1227                batches,
1228                min_timestamp,
1229                max_timestamp,
1230                max_sequence,
1231                estimated_series_count,
1232            );
1233
1234            common_telemetry::trace!(
1235                "merge_parts_group created MultiBulkPart: rows={}, batches={}",
1236                actual_total_rows,
1237                multi_part.num_batches()
1238            );
1239
1240            Ok(Some(MergedPart::Multi(multi_part)))
1241        }
1242    }
1243}
1244
1245/// A memtable compact task to run in background.
1246struct MemCompactTask {
1247    metadata: RegionMetadataRef,
1248    parts: Arc<RwLock<BulkParts>>,
1249    /// Configuration for the bulk memtable.
1250    config: BulkMemtableConfig,
1251    /// Cached flat SST arrow schema
1252    flat_arrow_schema: SchemaRef,
1253    /// Compactor for merging bulk parts
1254    compactor: Arc<Mutex<MemtableCompactor>>,
1255    /// Whether the append mode is enabled
1256    append_mode: bool,
1257    /// Mode to handle duplicate rows while merging
1258    merge_mode: MergeMode,
1259}
1260
1261impl MemCompactTask {
1262    fn compact(&self) -> Result<()> {
1263        let mut compactor = self.compactor.lock().unwrap();
1264
1265        let should_merge = self
1266            .parts
1267            .read()
1268            .unwrap()
1269            .should_merge_parts(self.config.merge_threshold);
1270        if should_merge {
1271            compactor.merge_parts(
1272                &self.flat_arrow_schema,
1273                &self.parts,
1274                &self.metadata,
1275                !self.append_mode,
1276                self.merge_mode,
1277            )?;
1278        }
1279
1280        Ok(())
1281    }
1282}
1283
1284/// Scheduler to run compact tasks in background.
1285#[derive(Debug)]
1286pub struct CompactDispatcher {
1287    semaphore: Arc<Semaphore>,
1288}
1289
1290impl CompactDispatcher {
1291    /// Creates a new dispatcher with the given number of max concurrent tasks.
1292    pub fn new(permits: usize) -> Self {
1293        Self {
1294            semaphore: Arc::new(Semaphore::new(permits)),
1295        }
1296    }
1297
1298    /// Dispatches a compact task to run in background.
1299    fn dispatch_compact(&self, task: MemCompactTask) {
1300        let semaphore = self.semaphore.clone();
1301        common_runtime::spawn_global(async move {
1302            let Ok(_permit) = semaphore.acquire().await else {
1303                return;
1304            };
1305
1306            common_runtime::spawn_blocking_global(move || {
1307                if let Err(e) = task.compact() {
1308                    common_telemetry::error!(e; "Failed to compact memtable, region: {}", task.metadata.region_id);
1309                }
1310            });
1311        });
1312    }
1313}
1314
1315/// Builder to build a [BulkMemtable].
1316#[derive(Debug, Default)]
1317pub struct BulkMemtableBuilder {
1318    /// Configuration for the bulk memtable.
1319    config: BulkMemtableConfig,
1320    write_buffer_manager: Option<WriteBufferManagerRef>,
1321    compact_dispatcher: Option<Arc<CompactDispatcher>>,
1322    append_mode: bool,
1323    merge_mode: MergeMode,
1324}
1325
1326impl BulkMemtableBuilder {
1327    /// Creates a new builder with specific `write_buffer_manager`.
1328    pub fn new(
1329        write_buffer_manager: Option<WriteBufferManagerRef>,
1330        append_mode: bool,
1331        merge_mode: MergeMode,
1332    ) -> Self {
1333        Self {
1334            config: BulkMemtableConfig::default(),
1335            write_buffer_manager,
1336            compact_dispatcher: None,
1337            append_mode,
1338            merge_mode,
1339        }
1340    }
1341
1342    /// Sets the compact dispatcher.
1343    pub fn with_compact_dispatcher(mut self, compact_dispatcher: Arc<CompactDispatcher>) -> Self {
1344        self.compact_dispatcher = Some(compact_dispatcher);
1345        self
1346    }
1347}
1348
1349impl MemtableBuilder for BulkMemtableBuilder {
1350    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
1351        Arc::new(BulkMemtable::new(
1352            id,
1353            self.config.clone(),
1354            metadata.clone(),
1355            self.write_buffer_manager.clone(),
1356            self.compact_dispatcher.clone(),
1357            self.append_mode,
1358            self.merge_mode,
1359        ))
1360    }
1361
1362    fn use_bulk_insert(&self, _metadata: &RegionMetadataRef) -> bool {
1363        true
1364    }
1365}
1366
1367#[cfg(test)]
1368mod tests {
1369    use mito_codec::row_converter::build_primary_key_codec;
1370
1371    use super::*;
1372    use crate::memtable::bulk::part::BulkPartConverter;
1373    use crate::read::scan_region::PredicateGroup;
1374    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
1375    use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};
1376
1377    fn create_bulk_part_with_converter(
1378        k0: &str,
1379        k1: u32,
1380        timestamps: Vec<i64>,
1381        values: Vec<Option<f64>>,
1382        sequence: u64,
1383    ) -> Result<BulkPart> {
1384        let metadata = metadata_for_test();
1385        let capacity = 100;
1386        let primary_key_codec = build_primary_key_codec(&metadata);
1387        let schema = to_flat_sst_arrow_schema(
1388            &metadata,
1389            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1390        );
1391
1392        let mut converter =
1393            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
1394
1395        let key_values = build_key_values_with_ts_seq_values(
1396            &metadata,
1397            k0.to_string(),
1398            k1,
1399            timestamps.into_iter(),
1400            values.into_iter(),
1401            sequence,
1402        );
1403
1404        converter.append_key_values(&key_values)?;
1405        converter.convert()
1406    }
1407
1408    #[test]
1409    fn test_bulk_memtable_write_read() {
1410        let metadata = metadata_for_test();
1411        let memtable = BulkMemtable::new(
1412            999,
1413            BulkMemtableConfig::default(),
1414            metadata.clone(),
1415            None,
1416            None,
1417            false,
1418            MergeMode::LastRow,
1419        );
1420        // Disable unordered_part for this test
1421        memtable.set_unordered_part_threshold(0);
1422
1423        let test_data = [
1424            (
1425                "key_a",
1426                1u32,
1427                vec![1000i64, 2000i64],
1428                vec![Some(10.5), Some(20.5)],
1429                100u64,
1430            ),
1431            (
1432                "key_b",
1433                2u32,
1434                vec![1500i64, 2500i64],
1435                vec![Some(15.5), Some(25.5)],
1436                200u64,
1437            ),
1438            ("key_c", 3u32, vec![3000i64], vec![Some(30.5)], 300u64),
1439        ];
1440
1441        for (k0, k1, timestamps, values, seq) in test_data.iter() {
1442            let part =
1443                create_bulk_part_with_converter(k0, *k1, timestamps.clone(), values.clone(), *seq)
1444                    .unwrap();
1445            memtable.write_bulk(part).unwrap();
1446        }
1447
1448        let stats = memtable.stats();
1449        assert_eq!(5, stats.num_rows);
1450        assert_eq!(3, stats.num_ranges);
1451        assert_eq!(300, stats.max_sequence);
1452
1453        let (min_ts, max_ts) = stats.time_range.unwrap();
1454        assert_eq!(1000, min_ts.value());
1455        assert_eq!(3000, max_ts.value());
1456
1457        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1458        let ranges = memtable
1459            .ranges(
1460                None,
1461                RangesOptions::default().with_predicate(predicate_group),
1462            )
1463            .unwrap();
1464
1465        assert_eq!(3, ranges.ranges.len());
1466        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
1467        assert_eq!(5, total_rows);
1468
1469        for (_range_id, range) in ranges.ranges.iter() {
1470            assert!(range.num_rows() > 0);
1471            assert!(range.is_record_batch());
1472
1473            let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
1474
1475            let mut total_rows = 0;
1476            for batch_result in record_batch_iter {
1477                let batch = batch_result.unwrap();
1478                total_rows += batch.num_rows();
1479                assert!(batch.num_rows() > 0);
1480                assert_eq!(8, batch.num_columns());
1481            }
1482            assert_eq!(total_rows, range.num_rows());
1483        }
1484    }
1485
1486    #[test]
1487    fn test_bulk_memtable_ranges_with_projection() {
1488        let metadata = metadata_for_test();
1489        let memtable = BulkMemtable::new(
1490            111,
1491            BulkMemtableConfig::default(),
1492            metadata.clone(),
1493            None,
1494            None,
1495            false,
1496            MergeMode::LastRow,
1497        );
1498
1499        let bulk_part = create_bulk_part_with_converter(
1500            "projection_test",
1501            5,
1502            vec![5000, 6000, 7000],
1503            vec![Some(50.0), Some(60.0), Some(70.0)],
1504            500,
1505        )
1506        .unwrap();
1507
1508        memtable.write_bulk(bulk_part).unwrap();
1509
1510        let projection = vec![4u32];
1511        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1512        let ranges = memtable
1513            .ranges(
1514                Some(&projection),
1515                RangesOptions::default().with_predicate(predicate_group),
1516            )
1517            .unwrap();
1518
1519        assert_eq!(1, ranges.ranges.len());
1520        let range = ranges.ranges.get(&0).unwrap();
1521
1522        assert!(range.is_record_batch());
1523        let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
1524
1525        let mut total_rows = 0;
1526        for batch_result in record_batch_iter {
1527            let batch = batch_result.unwrap();
1528            assert!(batch.num_rows() > 0);
1529            assert_eq!(5, batch.num_columns());
1530            total_rows += batch.num_rows();
1531        }
1532        assert_eq!(3, total_rows);
1533    }
1534
1535    #[test]
1536    fn test_bulk_memtable_unsupported_operations() {
1537        let metadata = metadata_for_test();
1538        let memtable = BulkMemtable::new(
1539            111,
1540            BulkMemtableConfig::default(),
1541            metadata.clone(),
1542            None,
1543            None,
1544            false,
1545            MergeMode::LastRow,
1546        );
1547
1548        let key_values = build_key_values_with_ts_seq_values(
1549            &metadata,
1550            "test".to_string(),
1551            1,
1552            vec![1000].into_iter(),
1553            vec![Some(1.0)].into_iter(),
1554            1,
1555        );
1556
1557        let err = memtable.write(&key_values).unwrap_err();
1558        assert!(err.to_string().contains("not supported"));
1559
1560        let kv = key_values.iter().next().unwrap();
1561        let err = memtable.write_one(kv).unwrap_err();
1562        assert!(err.to_string().contains("not supported"));
1563    }
1564
1565    #[test]
1566    fn test_bulk_memtable_freeze() {
1567        let metadata = metadata_for_test();
1568        let memtable = BulkMemtable::new(
1569            222,
1570            BulkMemtableConfig::default(),
1571            metadata.clone(),
1572            None,
1573            None,
1574            false,
1575            MergeMode::LastRow,
1576        );
1577
1578        let bulk_part = create_bulk_part_with_converter(
1579            "freeze_test",
1580            10,
1581            vec![10000],
1582            vec![Some(100.0)],
1583            1000,
1584        )
1585        .unwrap();
1586
1587        memtable.write_bulk(bulk_part).unwrap();
1588        memtable.freeze().unwrap();
1589
1590        let stats_after_freeze = memtable.stats();
1591        assert_eq!(1, stats_after_freeze.num_rows);
1592    }
1593
1594    #[test]
1595    fn test_bulk_memtable_fork() {
1596        let metadata = metadata_for_test();
1597        let original_memtable = BulkMemtable::new(
1598            333,
1599            BulkMemtableConfig::default(),
1600            metadata.clone(),
1601            None,
1602            None,
1603            false,
1604            MergeMode::LastRow,
1605        );
1606
1607        let bulk_part =
1608            create_bulk_part_with_converter("fork_test", 15, vec![15000], vec![Some(150.0)], 1500)
1609                .unwrap();
1610
1611        original_memtable.write_bulk(bulk_part).unwrap();
1612
1613        let forked_memtable = original_memtable.fork(444, &metadata);
1614
1615        assert_eq!(forked_memtable.id(), 444);
1616        assert!(forked_memtable.is_empty());
1617        assert_eq!(0, forked_memtable.stats().num_rows);
1618
1619        assert!(!original_memtable.is_empty());
1620        assert_eq!(1, original_memtable.stats().num_rows);
1621    }
1622
1623    #[test]
1624    fn test_bulk_memtable_ranges_multiple_parts() {
1625        let metadata = metadata_for_test();
1626        let memtable = BulkMemtable::new(
1627            777,
1628            BulkMemtableConfig::default(),
1629            metadata.clone(),
1630            None,
1631            None,
1632            false,
1633            MergeMode::LastRow,
1634        );
1635        // Disable unordered_part for this test
1636        memtable.set_unordered_part_threshold(0);
1637
1638        let parts_data = vec![
1639            (
1640                "part1",
1641                1u32,
1642                vec![1000i64, 1100i64],
1643                vec![Some(10.0), Some(11.0)],
1644                100u64,
1645            ),
1646            (
1647                "part2",
1648                2u32,
1649                vec![2000i64, 2100i64],
1650                vec![Some(20.0), Some(21.0)],
1651                200u64,
1652            ),
1653            ("part3", 3u32, vec![3000i64], vec![Some(30.0)], 300u64),
1654        ];
1655
1656        for (k0, k1, timestamps, values, seq) in parts_data {
1657            let part = create_bulk_part_with_converter(k0, k1, timestamps, values, seq).unwrap();
1658            memtable.write_bulk(part).unwrap();
1659        }
1660
1661        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1662        let ranges = memtable
1663            .ranges(
1664                None,
1665                RangesOptions::default().with_predicate(predicate_group),
1666            )
1667            .unwrap();
1668
1669        assert_eq!(3, ranges.ranges.len());
1670        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
1671        assert_eq!(5, total_rows);
1672        assert_eq!(3, ranges.ranges.len());
1673
1674        for (range_id, range) in ranges.ranges.iter() {
1675            assert!(*range_id < 3);
1676            assert!(range.num_rows() > 0);
1677            assert!(range.is_record_batch());
1678        }
1679    }
1680
1681    #[test]
1682    fn test_bulk_memtable_ranges_with_sequence_filter() {
1683        let metadata = metadata_for_test();
1684        let memtable = BulkMemtable::new(
1685            888,
1686            BulkMemtableConfig::default(),
1687            metadata.clone(),
1688            None,
1689            None,
1690            false,
1691            MergeMode::LastRow,
1692        );
1693
1694        let part = create_bulk_part_with_converter(
1695            "seq_test",
1696            1,
1697            vec![1000, 2000, 3000],
1698            vec![Some(10.0), Some(20.0), Some(30.0)],
1699            500,
1700        )
1701        .unwrap();
1702
1703        memtable.write_bulk(part).unwrap();
1704
1705        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1706        let sequence_filter = Some(SequenceRange::LtEq { max: 400 }); // Filters out rows with sequence > 400
1707        let ranges = memtable
1708            .ranges(
1709                None,
1710                RangesOptions::default()
1711                    .with_predicate(predicate_group)
1712                    .with_sequence(sequence_filter),
1713            )
1714            .unwrap();
1715
1716        assert_eq!(1, ranges.ranges.len());
1717        let range = ranges.ranges.get(&0).unwrap();
1718
1719        let mut record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
1720        assert!(record_batch_iter.next().is_none());
1721    }
1722
1723    #[test]
1724    fn test_bulk_memtable_ranges_with_encoded_parts() {
1725        let metadata = metadata_for_test();
1726        let config = BulkMemtableConfig {
1727            merge_threshold: 8,
1728            ..Default::default()
1729        };
1730        let memtable = BulkMemtable::new(
1731            999,
1732            config,
1733            metadata.clone(),
1734            None,
1735            None,
1736            false,
1737            MergeMode::LastRow,
1738        );
1739        // Disable unordered_part for this test
1740        memtable.set_unordered_part_threshold(0);
1741
1742        // Adds enough bulk parts to trigger encoding
1743        for i in 0..10 {
1744            let part = create_bulk_part_with_converter(
1745                &format!("key_{}", i),
1746                i,
1747                vec![1000 + i as i64 * 100],
1748                vec![Some(i as f64 * 10.0)],
1749                100 + i as u64,
1750            )
1751            .unwrap();
1752            memtable.write_bulk(part).unwrap();
1753        }
1754
1755        memtable.compact(false).unwrap();
1756
1757        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1758        let ranges = memtable
1759            .ranges(
1760                None,
1761                RangesOptions::default().with_predicate(predicate_group),
1762            )
1763            .unwrap();
1764
1765        // Should have ranges for both bulk parts and encoded parts
1766        assert_eq!(3, ranges.ranges.len());
1767        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
1768        assert_eq!(10, total_rows);
1769
1770        for (_range_id, range) in ranges.ranges.iter() {
1771            assert!(range.num_rows() > 0);
1772            assert!(range.is_record_batch());
1773
1774            let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
1775            let mut total_rows = 0;
1776            for batch_result in record_batch_iter {
1777                let batch = batch_result.unwrap();
1778                total_rows += batch.num_rows();
1779                assert!(batch.num_rows() > 0);
1780            }
1781            assert_eq!(total_rows, range.num_rows());
1782        }
1783    }
1784
1785    #[test]
1786    fn test_bulk_memtable_unordered_part() {
1787        let metadata = metadata_for_test();
1788        let memtable = BulkMemtable::new(
1789            1001,
1790            BulkMemtableConfig::default(),
1791            metadata.clone(),
1792            None,
1793            None,
1794            false,
1795            MergeMode::LastRow,
1796        );
1797
1798        // Set smaller thresholds for testing with smaller inputs
1799        // Accept parts with < 5 rows into unordered_part
1800        memtable.set_unordered_part_threshold(5);
1801        // Compact when total rows >= 10
1802        memtable.set_unordered_part_compact_threshold(10);
1803
1804        // Write 3 small parts (each has 2 rows), should be collected in unordered_part
1805        for i in 0..3 {
1806            let part = create_bulk_part_with_converter(
1807                &format!("key_{}", i),
1808                i,
1809                vec![1000 + i as i64 * 100, 1100 + i as i64 * 100],
1810                vec![Some(i as f64 * 10.0), Some(i as f64 * 10.0 + 1.0)],
1811                100 + i as u64,
1812            )
1813            .unwrap();
1814            assert_eq!(2, part.num_rows());
1815            memtable.write_bulk(part).unwrap();
1816        }
1817
1818        // Total rows = 6, not yet reaching compact threshold
1819        let stats = memtable.stats();
1820        assert_eq!(6, stats.num_rows);
1821
1822        // Write 2 more small parts (each has 2 rows)
1823        // This should trigger compaction when total >= 10
1824        for i in 3..5 {
1825            let part = create_bulk_part_with_converter(
1826                &format!("key_{}", i),
1827                i,
1828                vec![1000 + i as i64 * 100, 1100 + i as i64 * 100],
1829                vec![Some(i as f64 * 10.0), Some(i as f64 * 10.0 + 1.0)],
1830                100 + i as u64,
1831            )
1832            .unwrap();
1833            memtable.write_bulk(part).unwrap();
1834        }
1835
1836        // Total rows = 10, should have compacted unordered_part into a regular part
1837        let stats = memtable.stats();
1838        assert_eq!(10, stats.num_rows);
1839
1840        // Verify we can read all data correctly
1841        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1842        let ranges = memtable
1843            .ranges(
1844                None,
1845                RangesOptions::default().with_predicate(predicate_group),
1846            )
1847            .unwrap();
1848
1849        // Should have at least 1 range (the compacted part)
1850        assert!(!ranges.ranges.is_empty());
1851        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
1852        assert_eq!(10, total_rows);
1853
1854        // Read all data and verify
1855        let mut total_rows_read = 0;
1856        for (_range_id, range) in ranges.ranges.iter() {
1857            assert!(range.is_record_batch());
1858            let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
1859
1860            for batch_result in record_batch_iter {
1861                let batch = batch_result.unwrap();
1862                total_rows_read += batch.num_rows();
1863            }
1864        }
1865        assert_eq!(10, total_rows_read);
1866    }
1867
1868    #[test]
1869    fn test_bulk_memtable_unordered_part_mixed_sizes() {
1870        let metadata = metadata_for_test();
1871        let memtable = BulkMemtable::new(
1872            1002,
1873            BulkMemtableConfig::default(),
1874            metadata.clone(),
1875            None,
1876            None,
1877            false,
1878            MergeMode::LastRow,
1879        );
1880
1881        // Set threshold to 4 rows - parts with < 4 rows go to unordered_part
1882        memtable.set_unordered_part_threshold(4);
1883        memtable.set_unordered_part_compact_threshold(8);
1884
1885        // Write small parts (3 rows each) - should go to unordered_part
1886        for i in 0..2 {
1887            let part = create_bulk_part_with_converter(
1888                &format!("small_{}", i),
1889                i,
1890                vec![1000 + i as i64, 2000 + i as i64, 3000 + i as i64],
1891                vec![Some(i as f64), Some(i as f64 + 1.0), Some(i as f64 + 2.0)],
1892                10 + i as u64,
1893            )
1894            .unwrap();
1895            assert_eq!(3, part.num_rows());
1896            memtable.write_bulk(part).unwrap();
1897        }
1898
1899        // Write a large part (5 rows) - should go directly to regular parts
1900        let large_part = create_bulk_part_with_converter(
1901            "large_key",
1902            100,
1903            vec![5000, 6000, 7000, 8000, 9000],
1904            vec![
1905                Some(100.0),
1906                Some(101.0),
1907                Some(102.0),
1908                Some(103.0),
1909                Some(104.0),
1910            ],
1911            50,
1912        )
1913        .unwrap();
1914        assert_eq!(5, large_part.num_rows());
1915        memtable.write_bulk(large_part).unwrap();
1916
1917        // Write another small part (2 rows) - should trigger compaction of unordered_part
1918        let part = create_bulk_part_with_converter(
1919            "small_2",
1920            2,
1921            vec![4000, 4100],
1922            vec![Some(20.0), Some(21.0)],
1923            30,
1924        )
1925        .unwrap();
1926        memtable.write_bulk(part).unwrap();
1927
1928        let stats = memtable.stats();
1929        assert_eq!(13, stats.num_rows); // 3 + 3 + 5 + 2 = 13
1930
1931        // Verify all data can be read
1932        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1933        let ranges = memtable
1934            .ranges(
1935                None,
1936                RangesOptions::default().with_predicate(predicate_group),
1937            )
1938            .unwrap();
1939
1940        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
1941        assert_eq!(13, total_rows);
1942
1943        let mut total_rows_read = 0;
1944        for (_range_id, range) in ranges.ranges.iter() {
1945            let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
1946            for batch_result in record_batch_iter {
1947                let batch = batch_result.unwrap();
1948                total_rows_read += batch.num_rows();
1949            }
1950        }
1951        assert_eq!(13, total_rows_read);
1952    }
1953
1954    #[test]
1955    fn test_bulk_memtable_unordered_part_with_ranges() {
1956        let metadata = metadata_for_test();
1957        let memtable = BulkMemtable::new(
1958            1003,
1959            BulkMemtableConfig::default(),
1960            metadata.clone(),
1961            None,
1962            None,
1963            false,
1964            MergeMode::LastRow,
1965        );
1966
1967        // Set small thresholds
1968        memtable.set_unordered_part_threshold(3);
1969        memtable.set_unordered_part_compact_threshold(100); // High threshold to prevent auto-compaction
1970
1971        // Write several small parts that stay in unordered_part
1972        for i in 0..3 {
1973            let part = create_bulk_part_with_converter(
1974                &format!("key_{}", i),
1975                i,
1976                vec![1000 + i as i64 * 100],
1977                vec![Some(i as f64 * 10.0)],
1978                100 + i as u64,
1979            )
1980            .unwrap();
1981            assert_eq!(1, part.num_rows());
1982            memtable.write_bulk(part).unwrap();
1983        }
1984
1985        let stats = memtable.stats();
1986        assert_eq!(3, stats.num_rows);
1987
1988        // Test that ranges() can correctly read from unordered_part
1989        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1990        let ranges = memtable
1991            .ranges(
1992                None,
1993                RangesOptions::default().with_predicate(predicate_group),
1994            )
1995            .unwrap();
1996
1997        // Should have 1 range for the unordered_part
1998        assert_eq!(1, ranges.ranges.len());
1999        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
2000        assert_eq!(3, total_rows);
2001
2002        // Verify data is sorted correctly in the range
2003        let range = ranges.ranges.get(&0).unwrap();
2004        let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
2005
2006        let mut total_rows = 0;
2007        for batch_result in record_batch_iter {
2008            let batch = batch_result.unwrap();
2009            total_rows += batch.num_rows();
2010            // Verify data is properly sorted by primary key
2011            assert!(batch.num_rows() > 0);
2012        }
2013        assert_eq!(3, total_rows);
2014    }
2015
2016    /// Helper to create a BulkPartWrapper from a BulkPart.
2017    fn create_bulk_part_wrapper(part: BulkPart) -> BulkPartWrapper {
2018        BulkPartWrapper {
2019            part: PartToMerge::Bulk {
2020                part,
2021                file_id: FileId::random(),
2022            },
2023            merging: false,
2024        }
2025    }
2026
2027    #[test]
2028    fn test_should_merge_parts_below_threshold() {
2029        let mut bulk_parts = BulkParts::default();
2030
2031        // Add 7 bulk parts (below DEFAULT_MERGE_THRESHOLD of 8)
2032        for i in 0..DEFAULT_MERGE_THRESHOLD - 1 {
2033            let part = create_bulk_part_with_converter(
2034                &format!("key_{}", i),
2035                i as u32,
2036                vec![1000 + i as i64 * 100],
2037                vec![Some(i as f64 * 10.0)],
2038                100 + i as u64,
2039            )
2040            .unwrap();
2041            bulk_parts.parts.push(create_bulk_part_wrapper(part));
2042        }
2043
2044        // Should not trigger merge since we have only 7 parts
2045        assert!(!bulk_parts.should_merge_parts(DEFAULT_MERGE_THRESHOLD));
2046    }
2047
2048    #[test]
2049    fn test_should_merge_parts_at_threshold() {
2050        let mut bulk_parts = BulkParts::default();
2051        let merge_threshold = 8;
2052
2053        // Add 8 bulk parts (at merge_threshold)
2054        for i in 0..merge_threshold {
2055            let part = create_bulk_part_with_converter(
2056                &format!("key_{}", i),
2057                i as u32,
2058                vec![1000 + i as i64 * 100],
2059                vec![Some(i as f64 * 10.0)],
2060                100 + i as u64,
2061            )
2062            .unwrap();
2063            bulk_parts.parts.push(create_bulk_part_wrapper(part));
2064        }
2065
2066        // Should trigger merge since we have 8 parts
2067        assert!(bulk_parts.should_merge_parts(merge_threshold));
2068    }
2069
2070    #[test]
2071    fn test_should_merge_parts_with_merging_flag() {
2072        let mut bulk_parts = BulkParts::default();
2073        let merge_threshold = 8;
2074
2075        // Add 10 bulk parts
2076        for i in 0..10 {
2077            let part = create_bulk_part_with_converter(
2078                &format!("key_{}", i),
2079                i as u32,
2080                vec![1000 + i as i64 * 100],
2081                vec![Some(i as f64 * 10.0)],
2082                100 + i as u64,
2083            )
2084            .unwrap();
2085            bulk_parts.parts.push(create_bulk_part_wrapper(part));
2086        }
2087
2088        // Should trigger merge since we have 10 parts
2089        assert!(bulk_parts.should_merge_parts(merge_threshold));
2090
2091        // Mark first 3 parts as merging
2092        for wrapper in bulk_parts.parts.iter_mut().take(3) {
2093            wrapper.merging = true;
2094        }
2095
2096        // Now only 7 parts are available for merging, should not trigger
2097        assert!(!bulk_parts.should_merge_parts(merge_threshold));
2098    }
2099
2100    #[test]
2101    fn test_collect_parts_to_merge_grouping() {
2102        let mut bulk_parts = BulkParts::default();
2103
2104        // Add 16 bulk parts with different row counts
2105        for i in 0..16 {
2106            let num_rows = (i % 4) + 1; // 1 to 4 rows
2107            let timestamps: Vec<i64> = (0..num_rows)
2108                .map(|j| 1000 + i as i64 * 100 + j as i64)
2109                .collect();
2110            let values: Vec<Option<f64>> =
2111                (0..num_rows).map(|j| Some((i * 10 + j) as f64)).collect();
2112            let part = create_bulk_part_with_converter(
2113                &format!("key_{}", i),
2114                i as u32,
2115                timestamps,
2116                values,
2117                100 + i as u64,
2118            )
2119            .unwrap();
2120            bulk_parts.parts.push(create_bulk_part_wrapper(part));
2121        }
2122
2123        // Should trigger merge since we have 16 parts
2124        assert!(bulk_parts.should_merge_parts(DEFAULT_MERGE_THRESHOLD));
2125
2126        // Collect parts to merge
2127        let collected =
2128            bulk_parts.collect_parts_to_merge(DEFAULT_MERGE_THRESHOLD, DEFAULT_MAX_MERGE_GROUPS);
2129
2130        // Should have groups
2131        assert!(!collected.groups.is_empty());
2132
2133        // All groups should have parts
2134        for group in &collected.groups {
2135            assert!(!group.is_empty());
2136        }
2137
2138        // Total parts collected should be 16
2139        let total_parts: usize = collected.groups.iter().map(|g| g.len()).sum();
2140        assert_eq!(16, total_parts);
2141    }
2142
2143    #[test]
2144    fn test_bulk_memtable_ranges_with_multi_bulk_part() {
2145        let metadata = metadata_for_test();
2146        let merge_threshold = 8;
2147        let config = BulkMemtableConfig {
2148            merge_threshold,
2149            ..Default::default()
2150        };
2151        let memtable = BulkMemtable::new(
2152            2005,
2153            config,
2154            metadata.clone(),
2155            None,
2156            None,
2157            false,
2158            MergeMode::LastRow,
2159        );
2160        // Disable unordered_part for this test
2161        memtable.set_unordered_part_threshold(0);
2162
2163        // Write enough bulk parts to trigger merge (merge_threshold = 8)
2164        // Each part has small number of rows so total < DEFAULT_ROW_GROUP_SIZE
2165        // This will result in MultiBulkPart after compaction
2166        for i in 0..merge_threshold {
2167            let part = create_bulk_part_with_converter(
2168                &format!("key_{}", i),
2169                i as u32,
2170                vec![1000 + i as i64 * 100, 2000 + i as i64 * 100],
2171                vec![Some(i as f64 * 10.0), Some(i as f64 * 10.0 + 1.0)],
2172                100 + i as u64,
2173            )
2174            .unwrap();
2175            memtable.write_bulk(part).unwrap();
2176        }
2177
2178        // Compact to trigger MultiBulkPart creation (since total rows < DEFAULT_ROW_GROUP_SIZE)
2179        memtable.compact(false).unwrap();
2180
2181        // Verify we can read from the memtable
2182        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
2183        let ranges = memtable
2184            .ranges(
2185                None,
2186                RangesOptions::default().with_predicate(predicate_group),
2187            )
2188            .unwrap();
2189
2190        assert_eq!(1, ranges.ranges.len());
2191        let expected_rows = merge_threshold * 2; // Each part has 2 rows
2192        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
2193        assert_eq!(expected_rows, total_rows);
2194
2195        // Read all data
2196        let mut total_rows_read = 0;
2197        for (_range_id, range) in ranges.ranges.iter() {
2198            assert!(range.is_record_batch());
2199            let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
2200
2201            for batch_result in record_batch_iter {
2202                let batch = batch_result.unwrap();
2203                total_rows_read += batch.num_rows();
2204            }
2205        }
2206        assert_eq!(expected_rows, total_rows_read);
2207    }
2208}