Skip to main content

mito2/memtable/bulk/
part_reader.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::VecDeque;
16use std::time::Instant;
17
18use datatypes::arrow::array::BooleanArray;
19use datatypes::arrow::record_batch::RecordBatch;
20use mito_codec::row_converter::PrimaryKeyFilter;
21use parquet::arrow::ProjectionMask;
22use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
23use snafu::ResultExt;
24use store_api::storage::SequenceRange;
25
26use crate::error::{self, ComputeArrowSnafu, DecodeArrowRowGroupSnafu};
27use crate::memtable::bulk::context::{BulkIterContext, BulkIterContextRef};
28use crate::memtable::bulk::part::EncodedBulkPart;
29use crate::memtable::bulk::row_group_reader::MemtableRowGroupReaderBuilder;
30use crate::memtable::{MemScanMetrics, MemScanMetricsData};
31use crate::metrics::{READ_ROWS_TOTAL, READ_STAGE_ELAPSED};
32use crate::sst::parquet::file_range::{PreFilterMode, TagDecodeState};
33use crate::sst::parquet::flat_format::{primary_key_column_index, sequence_column_index};
34use crate::sst::parquet::prefilter::{CachedPrimaryKeyFilter, prefilter_flat_batch_by_primary_key};
35
36/// Iterator for reading data inside a bulk part.
37pub struct EncodedBulkPartIter {
38    context: BulkIterContextRef,
39    row_groups_to_read: VecDeque<usize>,
40    current_reader: Option<ParquetRecordBatchReader>,
41    builder: MemtableRowGroupReaderBuilder,
42    /// Sequence number filter.
43    sequence: Option<SequenceRange>,
44    /// Cached skip_fields for current row group.
45    current_skip_fields: bool,
46    /// Primary key filter for prefiltering before convert_batch.
47    pk_filter: Option<CachedPrimaryKeyFilter>,
48    /// Metrics for this iterator.
49    metrics: MemScanMetricsData,
50    /// Optional memory scan metrics to report to.
51    mem_scan_metrics: Option<MemScanMetrics>,
52}
53
54impl EncodedBulkPartIter {
55    /// Creates a new [BulkPartIter].
56    pub fn try_new(
57        encoded_part: &EncodedBulkPart,
58        context: BulkIterContextRef,
59        mut row_groups_to_read: VecDeque<usize>,
60        sequence: Option<SequenceRange>,
61        mem_scan_metrics: Option<MemScanMetrics>,
62    ) -> error::Result<Self> {
63        assert!(context.read_format().as_flat().is_some());
64
65        let parquet_meta = encoded_part.metadata().parquet_metadata.clone();
66        let data = encoded_part.data().clone();
67        let series_count = encoded_part.metadata().num_series as usize;
68
69        let projection_mask = ProjectionMask::roots(
70            parquet_meta.file_metadata().schema_descr(),
71            context.read_format().projection_indices().iter().copied(),
72        );
73        let builder =
74            MemtableRowGroupReaderBuilder::try_new(&context, projection_mask, parquet_meta, data)?;
75
76        // Build PK filter if applicable (flat format with dictionary-encoded PKs).
77        let pk_filter = context.build_pk_filter();
78
79        let (init_reader, current_skip_fields) = match row_groups_to_read.pop_front() {
80            Some(first_row_group) => {
81                let skip_fields = builder.compute_skip_fields(&context, first_row_group);
82                let reader = builder.build_row_group_reader(first_row_group, None)?;
83                (Some(reader), skip_fields)
84            }
85            None => (None, false),
86        };
87
88        Ok(Self {
89            context,
90            row_groups_to_read,
91            current_reader: init_reader,
92            builder,
93            sequence,
94            current_skip_fields,
95            pk_filter,
96            metrics: MemScanMetricsData {
97                total_series: series_count,
98                ..Default::default()
99            },
100            mem_scan_metrics,
101        })
102    }
103
104    fn report_mem_scan_metrics(&mut self) {
105        if let Some(mem_scan_metrics) = self.mem_scan_metrics.take() {
106            mem_scan_metrics.merge_inner(&self.metrics);
107        }
108    }
109
110    /// Fetches next non-empty record batch.
111    pub(crate) fn next_record_batch(&mut self) -> error::Result<Option<RecordBatch>> {
112        let start = Instant::now();
113
114        let Some(current) = &mut self.current_reader else {
115            // All row group exhausted.
116            self.metrics.scan_cost += start.elapsed();
117            return Ok(None);
118        };
119
120        for batch in current {
121            let batch = batch.context(DecodeArrowRowGroupSnafu)?;
122            if let Some(batch) = apply_combined_filters(
123                &self.context,
124                &self.sequence,
125                batch,
126                self.current_skip_fields,
127                self.pk_filter
128                    .as_mut()
129                    .map(|f| f as &mut dyn PrimaryKeyFilter),
130                &mut self.metrics,
131            )? {
132                // Update metrics
133                self.metrics.num_batches += 1;
134                self.metrics.num_rows += batch.num_rows();
135                self.metrics.scan_cost += start.elapsed();
136                return Ok(Some(batch));
137            }
138        }
139
140        // Previous row group exhausted, read next row group
141        while let Some(next_row_group) = self.row_groups_to_read.pop_front() {
142            // Compute skip_fields for this row group
143            self.current_skip_fields = self
144                .builder
145                .compute_skip_fields(&self.context, next_row_group);
146
147            let next_reader = self.builder.build_row_group_reader(next_row_group, None)?;
148            let current = self.current_reader.insert(next_reader);
149
150            for batch in current {
151                let batch = batch.context(DecodeArrowRowGroupSnafu)?;
152                if let Some(batch) = apply_combined_filters(
153                    &self.context,
154                    &self.sequence,
155                    batch,
156                    self.current_skip_fields,
157                    self.pk_filter
158                        .as_mut()
159                        .map(|f| f as &mut dyn PrimaryKeyFilter),
160                    &mut self.metrics,
161                )? {
162                    // Update metrics
163                    self.metrics.num_batches += 1;
164                    self.metrics.num_rows += batch.num_rows();
165                    self.metrics.scan_cost += start.elapsed();
166                    return Ok(Some(batch));
167                }
168            }
169        }
170
171        self.metrics.scan_cost += start.elapsed();
172        Ok(None)
173    }
174}
175
176impl Iterator for EncodedBulkPartIter {
177    type Item = error::Result<RecordBatch>;
178
179    fn next(&mut self) -> Option<Self::Item> {
180        let result = self.next_record_batch().transpose();
181
182        // Report metrics when iteration is complete
183        if result.is_none() {
184            self.report_mem_scan_metrics();
185        }
186
187        result
188    }
189}
190
191impl Drop for EncodedBulkPartIter {
192    fn drop(&mut self) {
193        common_telemetry::debug!(
194            "EncodedBulkPartIter region: {}, metrics: total_series={}, num_rows={}, num_batches={}, scan_cost={:?}, prefilter_cost={:?}, prefilter_rows_filtered={}",
195            self.context.region_id(),
196            self.metrics.total_series,
197            self.metrics.num_rows,
198            self.metrics.num_batches,
199            self.metrics.scan_cost,
200            self.metrics.prefilter_cost,
201            self.metrics.prefilter_rows_filtered
202        );
203
204        // Report MemScanMetrics if not already reported
205        self.report_mem_scan_metrics();
206
207        READ_ROWS_TOTAL
208            .with_label_values(&["bulk_memtable"])
209            .inc_by(self.metrics.num_rows as u64);
210        READ_STAGE_ELAPSED
211            .with_label_values(&["scan_memtable"])
212            .observe(self.metrics.scan_cost.as_secs_f64());
213    }
214}
215
216/// Iterator for reading record batches from a bulk part.
217///
218/// Iterates through one or more RecordBatches, applying filters and projections.
219pub struct BulkPartBatchIter {
220    /// Queue of RecordBatches to process.
221    batches: VecDeque<RecordBatch>,
222    /// Iterator context for filtering and projection.
223    context: BulkIterContextRef,
224    /// Sequence number filter.
225    sequence: Option<SequenceRange>,
226    /// Primary key filter for prefiltering before convert_batch.
227    pk_filter: Option<CachedPrimaryKeyFilter>,
228    /// Metrics for this iterator.
229    metrics: MemScanMetricsData,
230    /// Optional memory scan metrics to report to.
231    mem_scan_metrics: Option<MemScanMetrics>,
232}
233
234impl BulkPartBatchIter {
235    /// Creates a new [BulkPartBatchIter] from multiple RecordBatches.
236    pub fn new(
237        batches: Vec<RecordBatch>,
238        context: BulkIterContextRef,
239        sequence: Option<SequenceRange>,
240        series_count: usize,
241        mem_scan_metrics: Option<MemScanMetrics>,
242    ) -> Self {
243        assert!(context.read_format().as_flat().is_some());
244
245        let pk_filter = context.build_pk_filter();
246
247        Self {
248            batches: VecDeque::from(batches),
249            context,
250            sequence,
251            pk_filter,
252            metrics: MemScanMetricsData {
253                total_series: series_count,
254                ..Default::default()
255            },
256            mem_scan_metrics,
257        }
258    }
259
260    /// Creates a new [BulkPartBatchIter] from a single RecordBatch.
261    pub fn from_single(
262        record_batch: RecordBatch,
263        context: BulkIterContextRef,
264        sequence: Option<SequenceRange>,
265        series_count: usize,
266        mem_scan_metrics: Option<MemScanMetrics>,
267    ) -> Self {
268        Self::new(
269            vec![record_batch],
270            context,
271            sequence,
272            series_count,
273            mem_scan_metrics,
274        )
275    }
276
277    fn report_mem_scan_metrics(&mut self) {
278        if let Some(mem_scan_metrics) = self.mem_scan_metrics.take() {
279            mem_scan_metrics.merge_inner(&self.metrics);
280        }
281    }
282
283    /// Applies projection to the RecordBatch if needed.
284    fn apply_projection(&self, record_batch: RecordBatch) -> error::Result<RecordBatch> {
285        let projection_indices = self.context.read_format().projection_indices();
286        if projection_indices.len() == record_batch.num_columns() {
287            return Ok(record_batch);
288        }
289
290        record_batch
291            .project(projection_indices)
292            .context(ComputeArrowSnafu)
293    }
294
295    fn process_batch(&mut self, record_batch: RecordBatch) -> error::Result<Option<RecordBatch>> {
296        let start = Instant::now();
297
298        // Apply projection first.
299        let projected_batch = self.apply_projection(record_batch)?;
300
301        // Apply combined filtering (both predicate and sequence filters)
302        let skip_fields = match self.context.pre_filter_mode() {
303            PreFilterMode::All => false,
304            PreFilterMode::SkipFields => true,
305            PreFilterMode::SkipFieldsOnDelete => true,
306        };
307
308        let Some(filtered_batch) = apply_combined_filters(
309            &self.context,
310            &self.sequence,
311            projected_batch,
312            skip_fields,
313            self.pk_filter
314                .as_mut()
315                .map(|f| f as &mut dyn PrimaryKeyFilter),
316            &mut self.metrics,
317        )?
318        else {
319            self.metrics.scan_cost += start.elapsed();
320            return Ok(None);
321        };
322
323        // Update metrics
324        self.metrics.num_batches += 1;
325        self.metrics.num_rows += filtered_batch.num_rows();
326        self.metrics.scan_cost += start.elapsed();
327
328        Ok(Some(filtered_batch))
329    }
330}
331
332impl Iterator for BulkPartBatchIter {
333    type Item = error::Result<RecordBatch>;
334
335    fn next(&mut self) -> Option<Self::Item> {
336        // Process batches until we find a non-empty one or run out
337        while let Some(batch) = self.batches.pop_front() {
338            match self.process_batch(batch) {
339                Ok(Some(result)) => return Some(Ok(result)),
340                Ok(None) => continue, // This batch was filtered out, try next
341                Err(e) => {
342                    self.report_mem_scan_metrics();
343                    return Some(Err(e));
344                }
345            }
346        }
347
348        // No more batches
349        self.report_mem_scan_metrics();
350        None
351    }
352}
353
354impl Drop for BulkPartBatchIter {
355    fn drop(&mut self) {
356        common_telemetry::debug!(
357            "BulkPartBatchIter region: {}, metrics: total_series={}, num_rows={}, num_batches={}, scan_cost={:?}, prefilter_cost={:?}, prefilter_rows_filtered={}",
358            self.context.region_id(),
359            self.metrics.total_series,
360            self.metrics.num_rows,
361            self.metrics.num_batches,
362            self.metrics.scan_cost,
363            self.metrics.prefilter_cost,
364            self.metrics.prefilter_rows_filtered
365        );
366
367        // Report MemScanMetrics if not already reported
368        self.report_mem_scan_metrics();
369
370        READ_ROWS_TOTAL
371            .with_label_values(&["bulk_memtable"])
372            .inc_by(self.metrics.num_rows as u64);
373        READ_STAGE_ELAPSED
374            .with_label_values(&["scan_memtable"])
375            .observe(self.metrics.scan_cost.as_secs_f64());
376    }
377}
378
379/// Applies both predicate filtering and sequence filtering in a single pass.
380/// Returns None if the filtered batch is empty.
381///
382/// # Panics
383/// Panics if the format is not flat.
384fn apply_combined_filters(
385    context: &BulkIterContext,
386    sequence: &Option<SequenceRange>,
387    record_batch: RecordBatch,
388    skip_fields: bool,
389    pk_filter: Option<&mut dyn PrimaryKeyFilter>,
390    metrics: &mut MemScanMetricsData,
391) -> error::Result<Option<RecordBatch>> {
392    // Apply PK prefilter on raw batch before convert_batch to reduce conversion overhead.
393    let has_pk_prefilter = pk_filter.is_some();
394    let record_batch = if let Some(pk_filter) = pk_filter {
395        let rows_before = record_batch.num_rows();
396        let prefilter_start = Instant::now();
397        let pk_col_idx = primary_key_column_index(record_batch.num_columns());
398        match prefilter_flat_batch_by_primary_key(record_batch, pk_col_idx, pk_filter)? {
399            Some(batch) => {
400                metrics.prefilter_cost += prefilter_start.elapsed();
401                metrics.prefilter_rows_filtered += rows_before - batch.num_rows();
402                batch
403            }
404            None => {
405                metrics.prefilter_cost += prefilter_start.elapsed();
406                metrics.prefilter_rows_filtered += rows_before;
407                return Ok(None);
408            }
409        }
410    } else {
411        record_batch
412    };
413
414    // Converts the format to the flat format.
415    let format = context.read_format().as_flat().unwrap();
416    let record_batch = format.convert_batch(record_batch, None)?;
417
418    let num_rows = record_batch.num_rows();
419    let mut combined_filter = None;
420    let mut tag_decode_state = TagDecodeState::new();
421
422    // Apply predicate filters using the shared method.
423    if !context.base.filters.is_empty() {
424        let predicate_mask = context.base.compute_filter_mask_flat(
425            &record_batch,
426            skip_fields,
427            has_pk_prefilter,
428            &mut tag_decode_state,
429        )?;
430        // If predicate filters out the entire batch, return None early
431        let Some(mask) = predicate_mask else {
432            return Ok(None);
433        };
434        combined_filter = Some(BooleanArray::from(mask));
435    }
436
437    // Filters rows by the given `sequence`. Only preserves rows with sequence less than or equal to `sequence`.
438    if let Some(sequence) = sequence {
439        let sequence_column =
440            record_batch.column(sequence_column_index(record_batch.num_columns()));
441        let sequence_filter = sequence
442            .filter(&sequence_column)
443            .context(ComputeArrowSnafu)?;
444        // Combine with existing filter using AND operation
445        combined_filter = match combined_filter {
446            None => Some(sequence_filter),
447            Some(existing_filter) => {
448                let and_result = datatypes::arrow::compute::and(&existing_filter, &sequence_filter)
449                    .context(ComputeArrowSnafu)?;
450                Some(and_result)
451            }
452        };
453    }
454
455    // Apply the combined filter if any filters were applied
456    let Some(filter_array) = combined_filter else {
457        // No filters applied, return original batch
458        return Ok(Some(record_batch));
459    };
460    let select_count = filter_array.true_count();
461    if select_count == 0 {
462        return Ok(None);
463    }
464    if select_count == num_rows {
465        return Ok(Some(record_batch));
466    }
467    let filtered_batch =
468        datatypes::arrow::compute::filter_record_batch(&record_batch, &filter_array)
469            .context(ComputeArrowSnafu)?;
470
471    Ok(Some(filtered_batch))
472}
473
474#[cfg(test)]
475mod tests {
476    use std::sync::Arc;
477
478    use api::v1::SemanticType;
479    use datafusion_expr::{col, lit};
480    use datatypes::arrow::array::{
481        ArrayRef, BinaryArray, DictionaryArray, Int64Array, StringArray, UInt8Array, UInt32Array,
482        UInt64Array,
483    };
484    use datatypes::arrow::datatypes::{DataType, Field, Schema};
485    use datatypes::data_type::ConcreteDataType;
486    use datatypes::schema::ColumnSchema;
487    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
488    use store_api::storage::RegionId;
489    use table::predicate::Predicate;
490
491    use super::*;
492    use crate::memtable::bulk::context::BulkIterContext;
493    use crate::test_util::sst_util::new_primary_key;
494
495    #[test]
496    fn test_bulk_part_batch_iter() {
497        // Create a simple schema
498        let schema = Arc::new(Schema::new(vec![
499            Field::new("key1", DataType::Utf8, false),
500            Field::new("field1", DataType::Int64, false),
501            Field::new(
502                "timestamp",
503                DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Millisecond, None),
504                false,
505            ),
506            Field::new(
507                "__primary_key",
508                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
509                false,
510            ),
511            Field::new("__sequence", DataType::UInt64, false),
512            Field::new("__op_type", DataType::UInt8, false),
513        ]));
514
515        // Create test data
516        let key1 = Arc::new(StringArray::from_iter_values(["key1", "key2", "key3"]));
517        let field1 = Arc::new(Int64Array::from(vec![11, 12, 13]));
518        let timestamp = Arc::new(datatypes::arrow::array::TimestampMillisecondArray::from(
519            vec![1000, 2000, 3000],
520        ));
521
522        // Create primary key dictionary array with properly encoded PKs
523        use datatypes::arrow::array::{BinaryArray, DictionaryArray, UInt32Array};
524        let pk1 = new_primary_key(&["key1"]);
525        let pk2 = new_primary_key(&["key2"]);
526        let pk3 = new_primary_key(&["key3"]);
527        let values = Arc::new(BinaryArray::from_iter_values([
528            pk1.as_slice(),
529            pk2.as_slice(),
530            pk3.as_slice(),
531        ]));
532        let keys = UInt32Array::from(vec![0, 1, 2]);
533        let primary_key = Arc::new(DictionaryArray::new(keys, values));
534
535        let sequence = Arc::new(UInt64Array::from(vec![1, 2, 3]));
536        let op_type = Arc::new(UInt8Array::from(vec![1, 1, 1])); // PUT operations
537
538        let record_batch = RecordBatch::try_new(
539            schema,
540            vec![
541                key1,
542                field1,
543                timestamp,
544                primary_key.clone(),
545                sequence,
546                op_type,
547            ],
548        )
549        .unwrap();
550
551        // Create a minimal region metadata for testing
552        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
553        builder
554            .push_column_metadata(ColumnMetadata {
555                column_schema: ColumnSchema::new(
556                    "key1",
557                    ConcreteDataType::string_datatype(),
558                    false,
559                ),
560                semantic_type: SemanticType::Tag,
561                column_id: 0,
562            })
563            .push_column_metadata(ColumnMetadata {
564                column_schema: ColumnSchema::new(
565                    "field1",
566                    ConcreteDataType::int64_datatype(),
567                    false,
568                ),
569                semantic_type: SemanticType::Field,
570                column_id: 1,
571            })
572            .push_column_metadata(ColumnMetadata {
573                column_schema: ColumnSchema::new(
574                    "timestamp",
575                    ConcreteDataType::timestamp_millisecond_datatype(),
576                    false,
577                ),
578                semantic_type: SemanticType::Timestamp,
579                column_id: 2,
580            })
581            .primary_key(vec![0]);
582
583        let region_metadata = builder.build().unwrap();
584
585        // Create context
586        let context = Arc::new(
587            BulkIterContext::new(
588                Arc::new(region_metadata.clone()),
589                None, // No projection
590                None, // No predicate
591                false,
592            )
593            .unwrap(),
594        );
595        // Iterates all rows.
596        let iter =
597            BulkPartBatchIter::from_single(record_batch.clone(), context.clone(), None, 0, None);
598        let result: Vec<_> = iter.map(|rb| rb.unwrap()).collect();
599        assert_eq!(1, result.len());
600        assert_eq!(3, result[0].num_rows());
601        assert_eq!(6, result[0].num_columns(),);
602
603        // Creates iter with sequence filter (only include sequences <= 2)
604        let iter = BulkPartBatchIter::from_single(
605            record_batch.clone(),
606            context,
607            Some(SequenceRange::LtEq { max: 2 }),
608            0,
609            None,
610        );
611        let result: Vec<_> = iter.map(|rb| rb.unwrap()).collect();
612        assert_eq!(1, result.len());
613        let expect_sequence = Arc::new(UInt64Array::from(vec![1, 2])) as ArrayRef;
614        assert_eq!(
615            &expect_sequence,
616            result[0].column(result[0].num_columns() - 2)
617        );
618        assert_eq!(6, result[0].num_columns());
619
620        let context = Arc::new(
621            BulkIterContext::new(
622                Arc::new(region_metadata),
623                Some(&[0, 2]),
624                Some(Predicate::new(vec![col("key1").eq(lit("key2"))])),
625                false,
626            )
627            .unwrap(),
628        );
629        // Creates iter with projection and predicate.
630        let iter =
631            BulkPartBatchIter::from_single(record_batch.clone(), context.clone(), None, 0, None);
632        let result: Vec<_> = iter.map(|rb| rb.unwrap()).collect();
633        assert_eq!(1, result.len());
634        assert_eq!(1, result[0].num_rows());
635        assert_eq!(5, result[0].num_columns());
636        let expect_sequence = Arc::new(UInt64Array::from(vec![2])) as ArrayRef;
637        assert_eq!(
638            &expect_sequence,
639            result[0].column(result[0].num_columns() - 2)
640        );
641    }
642
643    #[test]
644    fn test_bulk_part_batch_iter_multiple_batches() {
645        // Create a simple schema
646        let schema = Arc::new(Schema::new(vec![
647            Field::new("key1", DataType::Utf8, false),
648            Field::new("field1", DataType::Int64, false),
649            Field::new(
650                "timestamp",
651                DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Millisecond, None),
652                false,
653            ),
654            Field::new(
655                "__primary_key",
656                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
657                false,
658            ),
659            Field::new("__sequence", DataType::UInt64, false),
660            Field::new("__op_type", DataType::UInt8, false),
661        ]));
662
663        // Create first batch with 2 rows
664        let pk1 = new_primary_key(&["key1"]);
665        let pk2 = new_primary_key(&["key2"]);
666        let key1_1 = Arc::new(StringArray::from_iter_values(["key1", "key2"]));
667        let field1_1 = Arc::new(Int64Array::from(vec![11, 12]));
668        let timestamp_1 = Arc::new(datatypes::arrow::array::TimestampMillisecondArray::from(
669            vec![1000, 2000],
670        ));
671        let values_1 = Arc::new(BinaryArray::from_iter_values([
672            pk1.as_slice(),
673            pk2.as_slice(),
674        ]));
675        let keys_1 = UInt32Array::from(vec![0, 1]);
676        let primary_key_1 = Arc::new(DictionaryArray::new(keys_1, values_1));
677        let sequence_1 = Arc::new(UInt64Array::from(vec![1, 2]));
678        let op_type_1 = Arc::new(UInt8Array::from(vec![1, 1]));
679
680        let batch1 = RecordBatch::try_new(
681            schema.clone(),
682            vec![
683                key1_1,
684                field1_1,
685                timestamp_1,
686                primary_key_1,
687                sequence_1,
688                op_type_1,
689            ],
690        )
691        .unwrap();
692
693        // Create second batch with 3 rows
694        let pk3 = new_primary_key(&["key3"]);
695        let pk4 = new_primary_key(&["key4"]);
696        let pk5 = new_primary_key(&["key5"]);
697        let key1_2 = Arc::new(StringArray::from_iter_values(["key3", "key4", "key5"]));
698        let field1_2 = Arc::new(Int64Array::from(vec![13, 14, 15]));
699        let timestamp_2 = Arc::new(datatypes::arrow::array::TimestampMillisecondArray::from(
700            vec![3000, 4000, 5000],
701        ));
702        let values_2 = Arc::new(BinaryArray::from_iter_values([
703            pk3.as_slice(),
704            pk4.as_slice(),
705            pk5.as_slice(),
706        ]));
707        let keys_2 = UInt32Array::from(vec![0, 1, 2]);
708        let primary_key_2 = Arc::new(DictionaryArray::new(keys_2, values_2));
709        let sequence_2 = Arc::new(UInt64Array::from(vec![3, 4, 5]));
710        let op_type_2 = Arc::new(UInt8Array::from(vec![1, 1, 1]));
711
712        let batch2 = RecordBatch::try_new(
713            schema.clone(),
714            vec![
715                key1_2,
716                field1_2,
717                timestamp_2,
718                primary_key_2,
719                sequence_2,
720                op_type_2,
721            ],
722        )
723        .unwrap();
724
725        // Create region metadata
726        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
727        builder
728            .push_column_metadata(ColumnMetadata {
729                column_schema: ColumnSchema::new(
730                    "key1",
731                    ConcreteDataType::string_datatype(),
732                    false,
733                ),
734                semantic_type: SemanticType::Tag,
735                column_id: 0,
736            })
737            .push_column_metadata(ColumnMetadata {
738                column_schema: ColumnSchema::new(
739                    "field1",
740                    ConcreteDataType::int64_datatype(),
741                    false,
742                ),
743                semantic_type: SemanticType::Field,
744                column_id: 1,
745            })
746            .push_column_metadata(ColumnMetadata {
747                column_schema: ColumnSchema::new(
748                    "timestamp",
749                    ConcreteDataType::timestamp_millisecond_datatype(),
750                    false,
751                ),
752                semantic_type: SemanticType::Timestamp,
753                column_id: 2,
754            })
755            .primary_key(vec![0]);
756
757        let region_metadata = builder.build().unwrap();
758
759        // Create context
760        let context = Arc::new(
761            BulkIterContext::new(
762                Arc::new(region_metadata),
763                None, // No projection
764                None, // No predicate
765                false,
766            )
767            .unwrap(),
768        );
769
770        // Create iterator with multiple batches
771        let expect_batches = vec![batch1, batch2];
772        let iter = BulkPartBatchIter::new(expect_batches.clone(), context.clone(), None, 0, None);
773
774        // Collect all results
775        let result: Vec<_> = iter.map(|rb| rb.unwrap()).collect();
776        assert_eq!(expect_batches, result);
777    }
778}