Skip to main content

mito2/memtable/bulk/
part.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Bulk part encoder/decoder.
16
17use std::collections::{HashMap, HashSet};
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20
21use api::helper::{ColumnDataTypeWrapper, to_grpc_value};
22use api::v1::bulk_wal_entry::Body;
23use api::v1::{ArrowIpc, BulkWalEntry, Mutation, OpType};
24use bytes::Bytes;
25use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
26use common_recordbatch::DfRecordBatch as RecordBatch;
27use common_time::Timestamp;
28use datafusion_common::Column;
29use datafusion_common::pruning::PruningStatistics;
30use datafusion_expr::utils::expr_to_columns;
31use datatypes::arrow;
32use datatypes::arrow::array::{
33    Array, ArrayRef, BinaryArray, BooleanArray, StringDictionaryBuilder, UInt8Array, UInt64Array,
34};
35use datatypes::arrow::compute::{SortColumn, SortOptions};
36use datatypes::arrow::datatypes::{
37    DataType as ArrowDataType, Field, Schema, SchemaRef, UInt32Type,
38};
39use datatypes::data_type::DataType;
40use datatypes::extension::json::is_json_extension_type;
41use datatypes::prelude::{MutableVector, Vector};
42use datatypes::schema::ext::ArrowSchemaExt;
43use datatypes::types::JsonType;
44use datatypes::value::ValueRef;
45use datatypes::vectors::Helper;
46use datatypes::vectors::json::array::JsonArray;
47use mito_codec::key_values::{KeyValue, KeyValues};
48use mito_codec::row_converter::{PrimaryKeyCodec, SortField, build_primary_key_codec_with_fields};
49use parquet::arrow::ArrowWriter;
50use parquet::basic::{Compression, ZstdLevel};
51use parquet::file::metadata::ParquetMetaData;
52use parquet::file::properties::WriterProperties;
53use smallvec::SmallVec;
54use snafu::{OptionExt, ResultExt};
55use store_api::codec::PrimaryKeyEncoding;
56use store_api::metadata::{RegionMetadata, RegionMetadataRef};
57use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;
58use store_api::storage::{ColumnId, FileId, SequenceNumber, SequenceRange};
59
60use crate::error::{
61    self, ColumnNotFoundSnafu, ComputeArrowSnafu, ConvertValueSnafu, CreateDefaultSnafu,
62    DataTypeMismatchSnafu, EncodeMemtableSnafu, EncodeSnafu, InvalidMetadataSnafu,
63    InvalidRequestSnafu, NewRecordBatchSnafu, Result,
64};
65use crate::memtable::bulk::context::{BulkIterContext, BulkIterContextRef};
66use crate::memtable::bulk::part_reader::EncodedBulkPartIter;
67use crate::memtable::time_series::{ValueBuilder, Values};
68use crate::memtable::{BoxedRecordBatchIterator, MemScanMetrics, MemtableStats};
69use crate::sst::SeriesEstimator;
70use crate::sst::index::IndexOutput;
71use crate::sst::parquet::flat_format::primary_key_column_index;
72use crate::sst::parquet::format::{PrimaryKeyArray, PrimaryKeyArrayBuilder};
73use crate::sst::parquet::{PARQUET_METADATA_KEY, SstInfo};
74
/// Initial capacity for the values of the string dictionary builders created for
/// primary key columns.
const INIT_DICT_VALUE_CAPACITY: usize = 8;
76
/// A raw bulk part in the memtable.
///
/// It wraps a [RecordBatch] together with the statistics carried alongside the batch.
#[derive(Clone)]
pub struct BulkPart {
    /// Rows of this part.
    pub batch: RecordBatch,
    /// Max timestamp value recorded for this part.
    pub max_timestamp: i64,
    /// Min timestamp value recorded for this part.
    pub min_timestamp: i64,
    /// Sequence number recorded for this part.
    pub sequence: u64,
    /// Index of the timestamp column in `batch`.
    pub timestamp_index: usize,
    /// Arrow IPC message this part was decoded from, if any. It is reused as is
    /// when converting the part into a [BulkWalEntry].
    pub raw_data: Option<ArrowIpc>,
}
87
88impl TryFrom<BulkWalEntry> for BulkPart {
89    type Error = error::Error;
90
91    fn try_from(value: BulkWalEntry) -> std::result::Result<Self, Self::Error> {
92        match value.body.expect("Entry payload should be present") {
93            Body::ArrowIpc(ipc) => {
94                let mut decoder = FlightDecoder::try_from_schema_bytes(&ipc.schema)
95                    .context(error::ConvertBulkWalEntrySnafu)?;
96                let batch = decoder
97                    .try_decode_record_batch(&ipc.data_header, &ipc.payload)
98                    .context(error::ConvertBulkWalEntrySnafu)?;
99                Ok(Self {
100                    batch,
101                    max_timestamp: value.max_ts,
102                    min_timestamp: value.min_ts,
103                    sequence: value.sequence,
104                    timestamp_index: value.timestamp_index as usize,
105                    raw_data: Some(ipc),
106                })
107            }
108        }
109    }
110}
111
112impl TryFrom<&BulkPart> for BulkWalEntry {
113    type Error = error::Error;
114
115    fn try_from(value: &BulkPart) -> Result<Self> {
116        if let Some(ipc) = &value.raw_data {
117            Ok(BulkWalEntry {
118                sequence: value.sequence,
119                max_ts: value.max_timestamp,
120                min_ts: value.min_timestamp,
121                timestamp_index: value.timestamp_index as u32,
122                body: Some(Body::ArrowIpc(ipc.clone())),
123            })
124        } else {
125            let mut encoder = FlightEncoder::default();
126            let schema_bytes = encoder
127                .encode_schema(value.batch.schema().as_ref())
128                .data_header;
129            let [rb_data] = encoder
130                .encode(FlightMessage::RecordBatch(value.batch.clone()))
131                .try_into()
132                .map_err(|_| {
133                    error::UnsupportedOperationSnafu {
134                        err_msg: "create BulkWalEntry from RecordBatch with dictionary arrays",
135                    }
136                    .build()
137                })?;
138            Ok(BulkWalEntry {
139                sequence: value.sequence,
140                max_ts: value.max_timestamp,
141                min_ts: value.min_timestamp,
142                timestamp_index: value.timestamp_index as u32,
143                body: Some(Body::ArrowIpc(ArrowIpc {
144                    schema: schema_bytes,
145                    data_header: rb_data.data_header,
146                    payload: rb_data.data_body,
147                })),
148            })
149        }
150    }
151}
152
153impl BulkPart {
154    pub(crate) fn estimated_size(&self) -> usize {
155        record_batch_estimated_size(&self.batch)
156    }
157
158    /// Returns the estimated series count in this BulkPart.
159    /// This is calculated from the dictionary values count of the PrimaryKeyArray.
160    pub fn estimated_series_count(&self) -> usize {
161        let pk_column_idx = primary_key_column_index(self.batch.num_columns());
162        let pk_column = self.batch.column(pk_column_idx);
163        if let Some(dict_array) = pk_column.as_any().downcast_ref::<PrimaryKeyArray>() {
164            dict_array.values().len()
165        } else {
166            0
167        }
168    }
169
170    /// Creates MemtableStats from this BulkPart.
171    pub fn to_memtable_stats(&self, region_metadata: &RegionMetadataRef) -> MemtableStats {
172        let ts_type = region_metadata.time_index_type();
173        let min_ts = ts_type.create_timestamp(self.min_timestamp);
174        let max_ts = ts_type.create_timestamp(self.max_timestamp);
175
176        MemtableStats {
177            estimated_bytes: self.estimated_size(),
178            time_range: Some((min_ts, max_ts)),
179            num_rows: self.num_rows(),
180            num_ranges: 1,
181            max_sequence: self.sequence,
182            series_count: self.estimated_series_count(),
183        }
184    }
185
186    /// Fills missing columns in the BulkPart batch with default values.
187    ///
188    /// This function checks if the batch schema matches the region metadata schema,
189    /// and if there are missing columns, it fills them with default values (or null
190    /// for nullable columns).
191    ///
192    /// # Arguments
193    ///
194    /// * `region_metadata` - The region metadata containing the expected schema
195    pub fn fill_missing_columns(&mut self, region_metadata: &RegionMetadata) -> Result<()> {
196        // Builds a map of existing columns in the batch
197        let batch_schema = self.batch.schema();
198        let batch_columns: HashSet<_> = batch_schema
199            .fields()
200            .iter()
201            .map(|f| f.name().as_str())
202            .collect();
203
204        // Finds columns that need to be filled
205        let mut columns_to_fill = Vec::new();
206        for column_meta in &region_metadata.column_metadatas {
207            // TODO(yingwen): Returns error if it is impure default after we support filling
208            // bulk insert request in the frontend
209            if !batch_columns.contains(column_meta.column_schema.name.as_str()) {
210                columns_to_fill.push(column_meta);
211            }
212        }
213
214        if columns_to_fill.is_empty() {
215            return Ok(());
216        }
217
218        let num_rows = self.batch.num_rows();
219
220        let mut new_columns = Vec::new();
221        let mut new_fields = Vec::new();
222
223        // First, adds all existing columns
224        new_fields.extend(batch_schema.fields().iter().cloned());
225        new_columns.extend_from_slice(self.batch.columns());
226
227        let region_id = region_metadata.region_id;
228        // Then adds the missing columns with default values
229        for column_meta in columns_to_fill {
230            let default_vector = column_meta
231                .column_schema
232                .create_default_vector(num_rows)
233                .context(CreateDefaultSnafu {
234                    region_id,
235                    column: &column_meta.column_schema.name,
236                })?
237                .with_context(|| InvalidRequestSnafu {
238                    region_id,
239                    reason: format!(
240                        "column {} does not have default value",
241                        column_meta.column_schema.name
242                    ),
243                })?;
244            let arrow_array = default_vector.to_arrow_array();
245            column_meta.column_schema.data_type.as_arrow_type();
246
247            new_fields.push(Arc::new(Field::new(
248                column_meta.column_schema.name.clone(),
249                column_meta.column_schema.data_type.as_arrow_type(),
250                column_meta.column_schema.is_nullable(),
251            )));
252            new_columns.push(arrow_array);
253        }
254
255        // Create a new schema and batch with the filled columns
256        let new_schema = Arc::new(Schema::new(new_fields));
257        let new_batch =
258            RecordBatch::try_new(new_schema, new_columns).context(NewRecordBatchSnafu)?;
259
260        // Update the batch
261        self.batch = new_batch;
262
263        Ok(())
264    }
265
266    /// Converts [BulkPart] to [Mutation] for fallback `write_bulk` implementation.
267    pub(crate) fn to_mutation(&self, region_metadata: &RegionMetadataRef) -> Result<Mutation> {
268        let vectors = region_metadata
269            .schema
270            .column_schemas()
271            .iter()
272            .map(|col| match self.batch.column_by_name(&col.name) {
273                None => Ok(None),
274                Some(col) => Helper::try_into_vector(col).map(Some),
275            })
276            .collect::<datatypes::error::Result<Vec<_>>>()
277            .context(error::ComputeVectorSnafu)?;
278
279        let rows = (0..self.num_rows())
280            .map(|row_idx| {
281                let values = (0..self.batch.num_columns())
282                    .map(|col_idx| {
283                        if let Some(v) = &vectors[col_idx] {
284                            to_grpc_value(v.get(row_idx))
285                        } else {
286                            api::v1::Value { value_data: None }
287                        }
288                    })
289                    .collect::<Vec<_>>();
290                api::v1::Row { values }
291            })
292            .collect::<Vec<_>>();
293
294        let schema = region_metadata
295            .column_metadatas
296            .iter()
297            .map(|c| {
298                let data_type_wrapper =
299                    ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())?;
300                Ok(api::v1::ColumnSchema {
301                    column_name: c.column_schema.name.clone(),
302                    datatype: data_type_wrapper.datatype() as i32,
303                    semantic_type: c.semantic_type as i32,
304                    ..Default::default()
305                })
306            })
307            .collect::<api::error::Result<Vec<_>>>()
308            .context(error::ConvertColumnDataTypeSnafu {
309                reason: "failed to convert region metadata to column schema",
310            })?;
311
312        let rows = api::v1::Rows { schema, rows };
313
314        Ok(Mutation {
315            op_type: OpType::Put as i32,
316            sequence: self.sequence,
317            rows: Some(rows),
318            write_hint: None,
319        })
320    }
321
322    pub fn timestamps(&self) -> &ArrayRef {
323        self.batch.column(self.timestamp_index)
324    }
325
326    pub fn num_rows(&self) -> usize {
327        self.batch.num_rows()
328    }
329}
330
/// A collection of small unordered bulk parts.
/// Used to batch small parts together before merging them into a sorted part.
pub struct UnorderedPart {
    /// Small bulk parts that haven't been sorted yet.
    parts: Vec<BulkPart>,
    /// Total number of rows across all parts.
    total_rows: usize,
    /// Minimum timestamp across all parts (`i64::MAX` if there is no part).
    min_timestamp: i64,
    /// Maximum timestamp across all parts (`i64::MIN` if there is no part).
    max_timestamp: i64,
    /// Maximum sequence number across all parts (0 if there is no part).
    max_sequence: u64,
    /// Row count threshold for accepting parts (default: 1024).
    /// A part is accepted if it has fewer rows than this threshold.
    threshold: usize,
    /// Row count threshold for compacting (default: 4096).
    /// The collection should be compacted once the total rows reach this threshold.
    compact_threshold: usize,
}
349
350impl Default for UnorderedPart {
351    fn default() -> Self {
352        Self::new()
353    }
354}
355
356impl UnorderedPart {
357    /// Creates a new empty UnorderedPart.
358    pub fn new() -> Self {
359        Self {
360            parts: Vec::new(),
361            total_rows: 0,
362            min_timestamp: i64::MAX,
363            max_timestamp: i64::MIN,
364            max_sequence: 0,
365            threshold: 1024,
366            compact_threshold: 4096,
367        }
368    }
369
370    /// Sets the threshold for accepting parts into unordered_part.
371    pub fn set_threshold(&mut self, threshold: usize) {
372        self.threshold = threshold;
373    }
374
375    /// Sets the threshold for compacting unordered_part.
376    pub fn set_compact_threshold(&mut self, compact_threshold: usize) {
377        self.compact_threshold = compact_threshold;
378    }
379
380    /// Returns the threshold for accepting parts.
381    pub fn threshold(&self) -> usize {
382        self.threshold
383    }
384
385    /// Returns the compact threshold.
386    pub fn compact_threshold(&self) -> usize {
387        self.compact_threshold
388    }
389
390    /// Returns true if this part should accept the given row count.
391    pub fn should_accept(&self, num_rows: usize) -> bool {
392        num_rows < self.threshold
393    }
394
395    /// Returns true if this part should be compacted.
396    pub fn should_compact(&self) -> bool {
397        self.total_rows >= self.compact_threshold
398    }
399
400    /// Adds a BulkPart to this unordered collection.
401    pub fn push(&mut self, part: BulkPart) {
402        self.total_rows += part.num_rows();
403        self.min_timestamp = self.min_timestamp.min(part.min_timestamp);
404        self.max_timestamp = self.max_timestamp.max(part.max_timestamp);
405        self.max_sequence = self.max_sequence.max(part.sequence);
406        self.parts.push(part);
407    }
408
409    /// Returns the total number of rows across all parts.
410    pub fn num_rows(&self) -> usize {
411        self.total_rows
412    }
413
414    /// Returns true if there are no parts.
415    pub fn is_empty(&self) -> bool {
416        self.parts.is_empty()
417    }
418
419    /// Returns the number of parts in this collection.
420    pub fn num_parts(&self) -> usize {
421        self.parts.len()
422    }
423
424    /// Concatenates and sorts all parts into a single RecordBatch.
425    /// Returns None if the collection is empty.
426    pub fn concat_and_sort(&self) -> Result<Option<RecordBatch>> {
427        if self.parts.is_empty() {
428            return Ok(None);
429        }
430
431        if self.parts.len() == 1 {
432            // If there's only one part, return its batch directly
433            return Ok(Some(self.parts[0].batch.clone()));
434        }
435
436        // Get the schema from the first part
437        let schema = self.parts[0].batch.schema();
438        let concatenated = if schema.has_json_extension_field() {
439            let (schema, batches) = align_parts(&self.parts)?;
440            arrow::compute::concat_batches(&schema, &batches).context(ComputeArrowSnafu)?
441        } else {
442            arrow::compute::concat_batches(&schema, self.parts.iter().map(|x| &x.batch))
443                .context(ComputeArrowSnafu)?
444        };
445
446        // Sort the concatenated batch
447        let sorted_batch = sort_primary_key_record_batch(&concatenated)?;
448
449        Ok(Some(sorted_batch))
450    }
451
452    /// Converts all parts into a single sorted BulkPart.
453    /// Returns None if the collection is empty.
454    pub fn to_bulk_part(&self) -> Result<Option<BulkPart>> {
455        let Some(sorted_batch) = self.concat_and_sort()? else {
456            return Ok(None);
457        };
458
459        let timestamp_index = self.parts[0].timestamp_index;
460
461        Ok(Some(BulkPart {
462            batch: sorted_batch,
463            max_timestamp: self.max_timestamp,
464            min_timestamp: self.min_timestamp,
465            sequence: self.max_sequence,
466            timestamp_index,
467            raw_data: None,
468        }))
469    }
470
471    /// Clears all parts from this collection.
472    pub fn clear(&mut self) {
473        self.parts.clear();
474        self.total_rows = 0;
475        self.min_timestamp = i64::MAX;
476        self.max_timestamp = i64::MIN;
477        self.max_sequence = 0;
478    }
479}
480
481/// Align the JSON columns in [BulkPart]s, to unified Arrow arrays. So that we can compute (concat,
482/// sort, etc.) on them.
483fn align_parts(parts: &[BulkPart]) -> Result<(SchemaRef, Vec<RecordBatch>)> {
484    debug_assert!(
485        !parts.is_empty()
486            && parts
487                .windows(2)
488                .all(|w| w[0].batch.schema_ref().fields().len()
489                    == w[1].batch.schema_ref().fields().len())
490    );
491
492    let first = &parts[0];
493    let base_schema = first.batch.schema_ref();
494    let rest = &parts[1..];
495
496    let mut merged_types = HashMap::new();
497    let mut aligned_fields = Vec::with_capacity(base_schema.fields().len());
498    for (i, field) in base_schema.fields().iter().enumerate() {
499        if is_json_extension_type(field) {
500            let mut merged = JsonType::from(field.data_type());
501            rest.iter()
502                .try_fold(&mut merged, |acc, x| {
503                    acc.merge(&JsonType::from(x.batch.schema_ref().field(i).data_type()))?;
504                    Ok(acc)
505                })
506                .context(DataTypeMismatchSnafu)?;
507            merged_types.insert(i, merged.as_arrow_type());
508
509            aligned_fields.push(Arc::new(
510                Field::new(
511                    field.name().clone(),
512                    merged.as_arrow_type(),
513                    field.is_nullable(),
514                )
515                .with_metadata(field.metadata().clone()),
516            ));
517        } else {
518            aligned_fields.push(field.clone())
519        };
520    }
521    let aligned_schema = Arc::new(Schema::new_with_metadata(
522        aligned_fields,
523        base_schema.metadata().clone(),
524    ));
525
526    let mut aligned_batches = Vec::with_capacity(parts.len());
527    for part in parts {
528        let mut columns = Vec::with_capacity(part.batch.num_columns());
529        for (i, column) in part.batch.columns().iter().enumerate() {
530            if let Some(expect) = merged_types.get(&i) {
531                columns.push(
532                    JsonArray::from(column)
533                        .try_align(expect)
534                        .context(ConvertValueSnafu)?,
535                );
536            } else {
537                columns.push(column.clone());
538            }
539        }
540        aligned_batches.push(
541            RecordBatch::try_new(aligned_schema.clone(), columns).context(NewRecordBatchSnafu)?,
542        );
543    }
544
545    Ok((aligned_schema, aligned_batches))
546}
547
548/// More accurate estimation of the size of a record batch.
549pub fn record_batch_estimated_size(batch: &RecordBatch) -> usize {
550    batch
551        .columns()
552        .iter()
553        // If can not get slice memory size, assume 0 here.
554        .map(|c| c.to_data().get_slice_memory_size().unwrap_or(0))
555        .sum()
556}
557
/// Primary key column builder for handling strings specially.
///
/// String columns are built as dictionary arrays, other columns are built by the
/// mutable vector of their data type.
enum PrimaryKeyColumnBuilder {
    /// String dictionary builder for string types.
    StringDict(StringDictionaryBuilder<UInt32Type>),
    /// Generic mutable vector for other types.
    Vector(Box<dyn MutableVector>),
}
565
566impl PrimaryKeyColumnBuilder {
567    /// Appends a value to the builder.
568    fn push_value_ref(&mut self, value: ValueRef) -> Result<()> {
569        match self {
570            PrimaryKeyColumnBuilder::StringDict(builder) => {
571                if let Some(s) = value.try_into_string().context(DataTypeMismatchSnafu)? {
572                    // We know the value is a string.
573                    builder.append_value(s);
574                } else {
575                    builder.append_null();
576                }
577            }
578            PrimaryKeyColumnBuilder::Vector(builder) => {
579                builder.push_value_ref(&value);
580            }
581        }
582        Ok(())
583    }
584
585    /// Converts the builder to an ArrayRef.
586    fn into_arrow_array(self) -> ArrayRef {
587        match self {
588            PrimaryKeyColumnBuilder::StringDict(mut builder) => Arc::new(builder.finish()),
589            PrimaryKeyColumnBuilder::Vector(mut builder) => builder.to_vector().to_arrow_array(),
590        }
591    }
592}
593
/// Converter that converts structs into [BulkPart].
///
/// Rows are appended to the builders of the converter and `convert()` turns the
/// buffered rows into a sorted [BulkPart].
pub struct BulkPartConverter {
    /// Schema of the converted batch.
    schema: SchemaRef,
    /// Primary key codec for encoding keys
    primary_key_codec: Arc<dyn PrimaryKeyCodec>,
    /// Buffer for encoding primary key. It is cleared and reused for each row.
    key_buf: Vec<u8>,
    /// Primary key array builder.
    key_array_builder: PrimaryKeyArrayBuilder,
    /// Builders for non-primary key columns.
    value_builder: ValueBuilder,
    /// Builders for individual primary key columns.
    /// The order of builders is the same as the order of primary key columns in the region metadata.
    /// It is empty if the converter doesn't store primary key columns.
    primary_key_column_builders: Vec<PrimaryKeyColumnBuilder>,

    /// Max timestamp value.
    max_ts: i64,
    /// Min timestamp value.
    min_ts: i64,
    /// Max sequence number.
    max_sequence: SequenceNumber,
}
617
618impl BulkPartConverter {
619    /// Creates a new converter.
620    ///
621    /// If `store_primary_key_columns` is true and the encoding is not sparse encoding, it
622    /// stores primary key columns in arrays additionally.
623    pub fn new(
624        region_metadata: &RegionMetadataRef,
625        schema: SchemaRef,
626        capacity: usize,
627        primary_key_codec: Arc<dyn PrimaryKeyCodec>,
628        store_primary_key_columns: bool,
629    ) -> Self {
630        debug_assert_eq!(
631            region_metadata.primary_key_encoding,
632            primary_key_codec.encoding()
633        );
634
635        let primary_key_column_builders = if store_primary_key_columns
636            && region_metadata.primary_key_encoding != PrimaryKeyEncoding::Sparse
637        {
638            new_primary_key_column_builders(region_metadata, capacity)
639        } else {
640            Vec::new()
641        };
642
643        Self {
644            schema,
645            primary_key_codec,
646            key_buf: Vec::new(),
647            key_array_builder: PrimaryKeyArrayBuilder::new(),
648            value_builder: ValueBuilder::new(region_metadata, capacity),
649            primary_key_column_builders,
650            min_ts: i64::MAX,
651            max_ts: i64::MIN,
652            max_sequence: SequenceNumber::MIN,
653        }
654    }
655
656    /// Appends a [KeyValues] into the converter.
657    pub fn append_key_values(&mut self, key_values: &KeyValues) -> Result<()> {
658        for kv in key_values.iter() {
659            self.append_key_value(&kv)?;
660        }
661
662        Ok(())
663    }
664
665    /// Appends a [KeyValue] to builders.
666    ///
667    /// If the primary key uses sparse encoding, callers must encoded the primary key in the [KeyValue].
668    fn append_key_value(&mut self, kv: &KeyValue) -> Result<()> {
669        // Handles primary key based on encoding type
670        if self.primary_key_codec.encoding() == PrimaryKeyEncoding::Sparse {
671            // For sparse encoding, the primary key is already encoded in the KeyValue
672            // Gets the first (and only) primary key value which contains the encoded key
673            let mut primary_keys = kv.primary_keys();
674            if let Some(encoded) = primary_keys
675                .next()
676                .context(ColumnNotFoundSnafu {
677                    column: PRIMARY_KEY_COLUMN_NAME,
678                })?
679                .try_into_binary()
680                .context(DataTypeMismatchSnafu)?
681            {
682                self.key_array_builder
683                    .append(encoded)
684                    .context(ComputeArrowSnafu)?;
685            } else {
686                self.key_array_builder
687                    .append("")
688                    .context(ComputeArrowSnafu)?;
689            }
690        } else {
691            // For dense encoding, we need to encode the primary key columns
692            self.key_buf.clear();
693            self.primary_key_codec
694                .encode_key_value(kv, &mut self.key_buf)
695                .context(EncodeSnafu)?;
696            self.key_array_builder
697                .append(&self.key_buf)
698                .context(ComputeArrowSnafu)?;
699        };
700
701        // If storing primary key columns, append values to individual builders
702        if !self.primary_key_column_builders.is_empty() {
703            for (builder, pk_value) in self
704                .primary_key_column_builders
705                .iter_mut()
706                .zip(kv.primary_keys())
707            {
708                builder.push_value_ref(pk_value)?;
709            }
710        }
711
712        // Pushes other columns.
713        self.value_builder.push(
714            kv.timestamp(),
715            kv.sequence(),
716            kv.op_type() as u8,
717            kv.fields(),
718        );
719
720        // Updates statistics
721        // Safety: timestamp of kv must be both present and a valid timestamp value.
722        let ts = kv
723            .timestamp()
724            .try_into_timestamp()
725            .unwrap()
726            .unwrap()
727            .value();
728        self.min_ts = self.min_ts.min(ts);
729        self.max_ts = self.max_ts.max(ts);
730        self.max_sequence = self.max_sequence.max(kv.sequence());
731
732        Ok(())
733    }
734
735    /// Converts buffered content into a [BulkPart].
736    ///
737    /// It sorts the record batch by (primary key, timestamp, sequence desc).
738    pub fn convert(mut self) -> Result<BulkPart> {
739        let values = Values::from(self.value_builder);
740        let mut columns =
741            Vec::with_capacity(4 + values.fields.len() + self.primary_key_column_builders.len());
742
743        // Build primary key column arrays if enabled.
744        for builder in self.primary_key_column_builders {
745            columns.push(builder.into_arrow_array());
746        }
747        // Then fields columns.
748        columns.extend(values.fields.iter().map(|field| field.to_arrow_array()));
749        // Time index.
750        let timestamp_index = columns.len();
751        columns.push(values.timestamp.to_arrow_array());
752        // Primary key.
753        let pk_array = self.key_array_builder.finish();
754        columns.push(Arc::new(pk_array));
755        // Sequence and op type.
756        columns.push(values.sequence.to_arrow_array());
757        columns.push(values.op_type.to_arrow_array());
758
759        // The actual datatype of JSON array is data oriented, not to be derived from the Region
760        // metadata, which is static. So here we have to align the schema.
761        let schema = align_schema_with_json_array(self.schema, &columns);
762        let batch = RecordBatch::try_new(schema, columns).context(NewRecordBatchSnafu)?;
763        // Sorts the record batch.
764        let batch = sort_primary_key_record_batch(&batch)?;
765
766        Ok(BulkPart {
767            batch,
768            max_timestamp: self.max_ts,
769            min_timestamp: self.min_ts,
770            sequence: self.max_sequence,
771            timestamp_index,
772            raw_data: None,
773        })
774    }
775}
776
777fn align_schema_with_json_array(schema: SchemaRef, columns: &[ArrayRef]) -> SchemaRef {
778    if schema.fields().iter().all(|f| !is_json_extension_type(f)) {
779        return schema;
780    }
781
782    let mut fields = Vec::with_capacity(schema.fields().len());
783    for (field, array) in schema.fields().iter().zip(columns) {
784        if !is_json_extension_type(field) {
785            fields.push(field.clone());
786            continue;
787        }
788
789        let mut field = field.as_ref().clone();
790        field.set_data_type(array.data_type().clone());
791        fields.push(Arc::new(field));
792    }
793
794    Arc::new(Schema::new_with_metadata(fields, schema.metadata().clone()))
795}
796
797fn new_primary_key_column_builders(
798    metadata: &RegionMetadata,
799    capacity: usize,
800) -> Vec<PrimaryKeyColumnBuilder> {
801    metadata
802        .primary_key_columns()
803        .map(|col| {
804            if col.column_schema.data_type.is_string() {
805                PrimaryKeyColumnBuilder::StringDict(StringDictionaryBuilder::with_capacity(
806                    capacity,
807                    INIT_DICT_VALUE_CAPACITY,
808                    capacity,
809                ))
810            } else {
811                PrimaryKeyColumnBuilder::Vector(
812                    col.column_schema.data_type.create_mutable_vector(capacity),
813                )
814            }
815        })
816        .collect()
817}
818
819/// Sorts the record batch with primary key format.
820pub fn sort_primary_key_record_batch(batch: &RecordBatch) -> Result<RecordBatch> {
821    let total_columns = batch.num_columns();
822    let sort_columns = vec![
823        // Primary key column (ascending)
824        SortColumn {
825            values: batch.column(total_columns - 3).clone(),
826            options: Some(SortOptions {
827                descending: false,
828                nulls_first: true,
829            }),
830        },
831        // Time index column (ascending)
832        SortColumn {
833            values: batch.column(total_columns - 4).clone(),
834            options: Some(SortOptions {
835                descending: false,
836                nulls_first: true,
837            }),
838        },
839        // Sequence column (descending)
840        SortColumn {
841            values: batch.column(total_columns - 2).clone(),
842            options: Some(SortOptions {
843                descending: true,
844                nulls_first: true,
845            }),
846        },
847    ];
848
849    let indices = datatypes::arrow::compute::lexsort_to_indices(&sort_columns, None)
850        .context(ComputeArrowSnafu)?;
851
852    datatypes::arrow::compute::take_record_batch(batch, &indices).context(ComputeArrowSnafu)
853}
854
855/// Converts a `BulkPart` that is unordered and without encoded primary keys into a `BulkPart`
856/// with the same format as produced by [BulkPartConverter].
857///
858/// This function takes a `BulkPart` where:
859/// - For dense encoding: Primary key columns may be stored as individual columns
860/// - For sparse encoding: The `__primary_key` column should already be present with encoded keys
861/// - The batch may not be sorted
862///
863/// And produces a `BulkPart` where:
864/// - Primary key columns are optionally stored (depending on `store_primary_key_columns` and encoding)
865/// - An encoded `__primary_key` dictionary column is present
866/// - The batch is sorted by (primary_key, timestamp, sequence desc)
867///
868/// # Arguments
869///
870/// * `part` - The input `BulkPart` to convert
871/// * `region_metadata` - Region metadata containing schema information
872/// * `primary_key_codec` - Codec for encoding primary keys
873/// * `schema` - Target schema for the output batch
874/// * `store_primary_key_columns` - If true and encoding is not sparse, stores individual primary key columns
875///
876/// # Returns
877///
878/// Returns `None` if the input part has no rows, otherwise returns a new `BulkPart` with
879/// encoded primary keys and sorted data.
880pub fn convert_bulk_part(
881    part: BulkPart,
882    region_metadata: &RegionMetadataRef,
883    primary_key_codec: Arc<dyn PrimaryKeyCodec>,
884    schema: SchemaRef,
885    store_primary_key_columns: bool,
886) -> Result<Option<BulkPart>> {
887    if part.num_rows() == 0 {
888        return Ok(None);
889    }
890
891    let num_rows = part.num_rows();
892    let is_sparse = region_metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse;
893
894    // Builds a column name-to-index map for efficient lookups
895    let input_schema = part.batch.schema();
896    let column_indices: HashMap<&str, usize> = input_schema
897        .fields()
898        .iter()
899        .enumerate()
900        .map(|(idx, field)| (field.name().as_str(), idx))
901        .collect();
902
903    // Determines the structure of the input batch by looking up columns by name
904    let mut output_columns = Vec::new();
905
906    // Extracts primary key columns if we need to encode them (dense encoding)
907    let pk_array = if is_sparse {
908        // For sparse encoding, the input should already have the __primary_key column
909        // We need to find it in the input batch
910        None
911    } else {
912        // For dense encoding, extract and encode primary key columns by name
913        let pk_vectors: Result<Vec<_>> = region_metadata
914            .primary_key_columns()
915            .map(|col_meta| {
916                let col_idx = column_indices
917                    .get(col_meta.column_schema.name.as_str())
918                    .context(ColumnNotFoundSnafu {
919                        column: &col_meta.column_schema.name,
920                    })?;
921                let col = part.batch.column(*col_idx);
922                Helper::try_into_vector(col).context(error::ComputeVectorSnafu)
923            })
924            .collect();
925        let pk_vectors = pk_vectors?;
926
927        let mut key_array_builder = PrimaryKeyArrayBuilder::new();
928        let mut encode_buf = Vec::new();
929
930        for row_idx in 0..num_rows {
931            encode_buf.clear();
932
933            // Collects primary key values with column IDs for this row
934            let pk_values_with_ids: Vec<_> = region_metadata
935                .primary_key
936                .iter()
937                .zip(pk_vectors.iter())
938                .map(|(col_id, vector)| (*col_id, vector.get_ref(row_idx)))
939                .collect();
940
941            // Encodes the primary key
942            primary_key_codec
943                .encode_value_refs(&pk_values_with_ids, &mut encode_buf)
944                .context(EncodeSnafu)?;
945
946            key_array_builder
947                .append(&encode_buf)
948                .context(ComputeArrowSnafu)?;
949        }
950
951        Some(key_array_builder.finish())
952    };
953
954    // Adds primary key columns if storing them (only for dense encoding)
955    if store_primary_key_columns && !is_sparse {
956        for col_meta in region_metadata.primary_key_columns() {
957            let col_idx = column_indices
958                .get(col_meta.column_schema.name.as_str())
959                .context(ColumnNotFoundSnafu {
960                    column: &col_meta.column_schema.name,
961                })?;
962            let col = part.batch.column(*col_idx);
963
964            // Converts to dictionary if needed for string types
965            let col = if col_meta.column_schema.data_type.is_string() {
966                let target_type = ArrowDataType::Dictionary(
967                    Box::new(ArrowDataType::UInt32),
968                    Box::new(ArrowDataType::Utf8),
969                );
970                arrow::compute::cast(col, &target_type).context(ComputeArrowSnafu)?
971            } else {
972                col.clone()
973            };
974            output_columns.push(col);
975        }
976    }
977
978    // Adds field columns
979    for col_meta in region_metadata.field_columns() {
980        let col_idx = column_indices
981            .get(col_meta.column_schema.name.as_str())
982            .context(ColumnNotFoundSnafu {
983                column: &col_meta.column_schema.name,
984            })?;
985        output_columns.push(part.batch.column(*col_idx).clone());
986    }
987
988    // Adds timestamp column
989    let new_timestamp_index = output_columns.len();
990    let ts_col_idx = column_indices
991        .get(
992            region_metadata
993                .time_index_column()
994                .column_schema
995                .name
996                .as_str(),
997        )
998        .context(ColumnNotFoundSnafu {
999            column: &region_metadata.time_index_column().column_schema.name,
1000        })?;
1001    output_columns.push(part.batch.column(*ts_col_idx).clone());
1002
1003    // Adds encoded primary key dictionary column
1004    let pk_dictionary = if let Some(pk_dict_array) = pk_array {
1005        Arc::new(pk_dict_array) as ArrayRef
1006    } else {
1007        let pk_col_idx =
1008            column_indices
1009                .get(PRIMARY_KEY_COLUMN_NAME)
1010                .context(ColumnNotFoundSnafu {
1011                    column: PRIMARY_KEY_COLUMN_NAME,
1012                })?;
1013        let col = part.batch.column(*pk_col_idx);
1014
1015        // Casts to dictionary type if needed
1016        let target_type = ArrowDataType::Dictionary(
1017            Box::new(ArrowDataType::UInt32),
1018            Box::new(ArrowDataType::Binary),
1019        );
1020        arrow::compute::cast(col, &target_type).context(ComputeArrowSnafu)?
1021    };
1022    output_columns.push(pk_dictionary);
1023
1024    let sequence_array = UInt64Array::from(vec![part.sequence; num_rows]);
1025    output_columns.push(Arc::new(sequence_array) as ArrayRef);
1026
1027    let op_type_array = UInt8Array::from(vec![OpType::Put as u8; num_rows]);
1028    output_columns.push(Arc::new(op_type_array) as ArrayRef);
1029
1030    let batch = RecordBatch::try_new(schema, output_columns).context(NewRecordBatchSnafu)?;
1031
1032    // Sorts the batch by (primary_key, timestamp, sequence desc)
1033    let sorted_batch = sort_primary_key_record_batch(&batch)?;
1034
1035    Ok(Some(BulkPart {
1036        batch: sorted_batch,
1037        max_timestamp: part.max_timestamp,
1038        min_timestamp: part.min_timestamp,
1039        sequence: part.sequence,
1040        timestamp_index: new_timestamp_index,
1041        raw_data: None,
1042    }))
1043}
1044
/// A bulk part held as encoded bytes together with the metadata describing it.
#[derive(Debug, Clone)]
pub struct EncodedBulkPart {
    /// Encoded bytes of the part.
    data: Bytes,
    /// Metadata of the part (row count, time range, parquet metadata, ...).
    metadata: BulkPartMeta,
}
1050
1051impl EncodedBulkPart {
1052    pub fn new(data: Bytes, metadata: BulkPartMeta) -> Self {
1053        Self { data, metadata }
1054    }
1055
1056    pub fn metadata(&self) -> &BulkPartMeta {
1057        &self.metadata
1058    }
1059
1060    /// Returns the size of the encoded data in bytes
1061    pub(crate) fn size_bytes(&self) -> usize {
1062        self.data.len()
1063    }
1064
1065    /// Returns the encoded data.
1066    pub fn data(&self) -> &Bytes {
1067        &self.data
1068    }
1069
1070    /// Creates MemtableStats from this EncodedBulkPart.
1071    pub fn to_memtable_stats(&self) -> MemtableStats {
1072        let meta = &self.metadata;
1073        let ts_type = meta.region_metadata.time_index_type();
1074        let min_ts = ts_type.create_timestamp(meta.min_timestamp);
1075        let max_ts = ts_type.create_timestamp(meta.max_timestamp);
1076
1077        MemtableStats {
1078            estimated_bytes: self.size_bytes(),
1079            time_range: Some((min_ts, max_ts)),
1080            num_rows: meta.num_rows,
1081            num_ranges: 1,
1082            max_sequence: meta.max_sequence,
1083            series_count: meta.num_series as usize,
1084        }
1085    }
1086
1087    /// Converts this `EncodedBulkPart` to `SstInfo`.
1088    ///
1089    /// # Arguments
1090    /// * `file_id` - The SST file ID to assign to this part
1091    ///
1092    /// # Returns
1093    /// Returns a `SstInfo` instance with information derived from this bulk part's metadata
1094    pub(crate) fn to_sst_info(&self, file_id: FileId) -> SstInfo {
1095        let unit = self.metadata.region_metadata.time_index_type().unit();
1096        let max_row_group_uncompressed_size: u64 = self
1097            .metadata
1098            .parquet_metadata
1099            .row_groups()
1100            .iter()
1101            .map(|rg| {
1102                rg.columns()
1103                    .iter()
1104                    .map(|c| c.uncompressed_size() as u64)
1105                    .sum::<u64>()
1106            })
1107            .max()
1108            .unwrap_or(0);
1109        SstInfo {
1110            file_id,
1111            time_range: (
1112                Timestamp::new(self.metadata.min_timestamp, unit),
1113                Timestamp::new(self.metadata.max_timestamp, unit),
1114            ),
1115            file_size: self.data.len() as u64,
1116            max_row_group_uncompressed_size,
1117            num_rows: self.metadata.num_rows,
1118            num_row_groups: self.metadata.parquet_metadata.num_row_groups() as u64,
1119            file_metadata: Some(self.metadata.parquet_metadata.clone()),
1120            index_metadata: IndexOutput::default(),
1121            num_series: self.metadata.num_series,
1122        }
1123    }
1124
1125    pub(crate) fn read(
1126        &self,
1127        context: BulkIterContextRef,
1128        sequence: Option<SequenceRange>,
1129        mem_scan_metrics: Option<MemScanMetrics>,
1130    ) -> Result<Option<BoxedRecordBatchIterator>> {
1131        // Compute skip_fields for row group pruning from the configured pre-filter mode.
1132        let skip_fields_for_pruning = context.pre_filter_mode().skip_fields();
1133
1134        // use predicate to find row groups to read.
1135        let row_groups_to_read =
1136            context.row_groups_to_read(&self.metadata.parquet_metadata, skip_fields_for_pruning);
1137
1138        if row_groups_to_read.is_empty() {
1139            // All row groups are filtered.
1140            return Ok(None);
1141        }
1142
1143        let iter = EncodedBulkPartIter::try_new(
1144            self,
1145            context,
1146            row_groups_to_read,
1147            sequence,
1148            mem_scan_metrics,
1149        )?;
1150        Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
1151    }
1152}
1153
/// Metadata of an encoded bulk part.
#[derive(Debug, Clone)]
pub struct BulkPartMeta {
    /// Total rows in part.
    pub num_rows: usize,
    /// Max timestamp in part.
    pub max_timestamp: i64,
    /// Min timestamp in part.
    pub min_timestamp: i64,
    /// Part file metadata.
    pub parquet_metadata: Arc<ParquetMetaData>,
    /// Part region schema.
    pub region_metadata: RegionMetadataRef,
    /// Number of series.
    pub num_series: u64,
    /// Maximum sequence number in part.
    pub max_sequence: u64,
}
1172
/// Metrics for encoding a part.
///
/// The encoder adds to these counters instead of overwriting them, so one instance
/// can accumulate the cost of several encode calls.
#[derive(Default, Debug)]
pub struct BulkPartEncodeMetrics {
    /// Cost of iterating over the data.
    pub iter_cost: Duration,
    /// Cost of writing the data, including closing the writer.
    pub write_cost: Duration,
    /// Estimated size of data before encoding, in bytes.
    pub raw_size: usize,
    /// Size of data after encoding, in bytes.
    pub encoded_size: usize,
    /// Number of rows in part.
    pub num_rows: usize,
}
1187
/// Encoder that writes bulk parts into [EncodedBulkPart]s.
pub struct BulkPartEncoder {
    /// Metadata of the region, attached to every encoded part.
    metadata: RegionMetadataRef,
    /// Properties passed to the arrow writer.
    writer_props: Option<WriterProperties>,
}
1192
1193impl BulkPartEncoder {
1194    pub fn new(metadata: RegionMetadataRef, row_group_size: usize) -> Result<BulkPartEncoder> {
1195        // TODO(yingwen): Skip arrow schema if needed.
1196        let json = metadata.to_json().context(InvalidMetadataSnafu)?;
1197        let key_value_meta =
1198            parquet::file::metadata::KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);
1199
1200        // TODO(yingwen): Do we need compression?
1201        let writer_props = Some(
1202            WriterProperties::builder()
1203                .set_key_value_metadata(Some(vec![key_value_meta]))
1204                .set_write_batch_size(row_group_size)
1205                .set_max_row_group_row_count(Some(row_group_size))
1206                .set_compression(Compression::ZSTD(ZstdLevel::default()))
1207                .set_column_index_truncate_length(None)
1208                .set_statistics_truncate_length(None)
1209                .build(),
1210        );
1211
1212        Ok(Self {
1213            metadata,
1214            writer_props,
1215        })
1216    }
1217}
1218
1219impl BulkPartEncoder {
1220    /// Encodes [BoxedRecordBatchIterator] into [EncodedBulkPart] with min/max timestamps.
1221    pub fn encode_record_batch_iter(
1222        &self,
1223        iter: BoxedRecordBatchIterator,
1224        arrow_schema: SchemaRef,
1225        min_timestamp: i64,
1226        max_timestamp: i64,
1227        max_sequence: u64,
1228        metrics: &mut BulkPartEncodeMetrics,
1229    ) -> Result<Option<EncodedBulkPart>> {
1230        let mut buf = Vec::with_capacity(4096);
1231        let mut writer = ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
1232            .context(EncodeMemtableSnafu)?;
1233        let mut total_rows = 0;
1234        let mut series_estimator = SeriesEstimator::default();
1235
1236        // Process each batch from the iterator
1237        let mut iter_start = Instant::now();
1238        for batch_result in iter {
1239            metrics.iter_cost += iter_start.elapsed();
1240            let batch = batch_result?;
1241            if batch.num_rows() == 0 {
1242                continue;
1243            }
1244
1245            series_estimator.update_flat(&batch);
1246            metrics.raw_size += record_batch_estimated_size(&batch);
1247            let write_start = Instant::now();
1248            writer.write(&batch).context(EncodeMemtableSnafu)?;
1249            metrics.write_cost += write_start.elapsed();
1250            total_rows += batch.num_rows();
1251            iter_start = Instant::now();
1252        }
1253        metrics.iter_cost += iter_start.elapsed();
1254
1255        if total_rows == 0 {
1256            return Ok(None);
1257        }
1258
1259        let close_start = Instant::now();
1260        let file_metadata = writer.close().context(EncodeMemtableSnafu)?;
1261        metrics.write_cost += close_start.elapsed();
1262        metrics.encoded_size += buf.len();
1263        metrics.num_rows += total_rows;
1264
1265        let buf = Bytes::from(buf);
1266        let parquet_metadata = Arc::new(file_metadata);
1267        let num_series = series_estimator.finish();
1268
1269        Ok(Some(EncodedBulkPart {
1270            data: buf,
1271            metadata: BulkPartMeta {
1272                num_rows: total_rows,
1273                max_timestamp,
1274                min_timestamp,
1275                parquet_metadata,
1276                region_metadata: self.metadata.clone(),
1277                num_series,
1278                max_sequence,
1279            },
1280        }))
1281    }
1282
1283    /// Encodes bulk part to a [EncodedBulkPart], returns the encoded data.
1284    pub fn encode_part(&self, part: &BulkPart) -> Result<Option<EncodedBulkPart>> {
1285        if part.batch.num_rows() == 0 {
1286            return Ok(None);
1287        }
1288
1289        let mut buf = Vec::with_capacity(4096);
1290        let arrow_schema = part.batch.schema();
1291
1292        let file_metadata = {
1293            let mut writer =
1294                ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
1295                    .context(EncodeMemtableSnafu)?;
1296            writer.write(&part.batch).context(EncodeMemtableSnafu)?;
1297            writer.finish().context(EncodeMemtableSnafu)?
1298        };
1299
1300        let buf = Bytes::from(buf);
1301        let parquet_metadata = Arc::new(file_metadata);
1302
1303        Ok(Some(EncodedBulkPart {
1304            data: buf,
1305            metadata: BulkPartMeta {
1306                num_rows: part.batch.num_rows(),
1307                max_timestamp: part.max_timestamp,
1308                min_timestamp: part.min_timestamp,
1309                parquet_metadata,
1310                region_metadata: self.metadata.clone(),
1311                num_series: part.estimated_series_count() as u64,
1312                max_sequence: part.sequence,
1313            },
1314        }))
1315    }
1316}
1317
/// Per-batch min/max statistics for the first tag column in a `MultiBulkPart`.
///
/// Since batches are sorted by primary key, we can extract the min/max of the first tag
/// from the first/last row's encoded primary key in each batch. These statistics enable
/// batch-level pruning using predicates, analogous to row-group pruning in parquet.
///
/// The entry of a batch is null in both arrays when its bounds cannot be extracted.
#[derive(Debug, Clone)]
struct BatchStats {
    /// Number of batches.
    num_batches: usize,
    /// Column id of the first tag.
    first_tag_id: ColumnId,
    /// Min values of the first tag, one element per batch.
    min_values: ArrayRef,
    /// Max values of the first tag, one element per batch.
    max_values: ArrayRef,
}
1334
1335impl BatchStats {
1336    /// Computes batch statistics from a slice of record batches.
1337    ///
1338    /// Returns `None` if there is no primary key (no first tag to collect stats for)
1339    /// or if extracting statistics fails.
1340    fn compute(batches: &[RecordBatch], metadata: &RegionMetadata) -> Option<Self> {
1341        // `primary_key.first()` is correct for both dense and sparse encodings.
1342        // For dense, values follow the order of `metadata.primary_key`.
1343        // For sparse, `decode_leftmost` decodes the first value which also
1344        // corresponds to `primary_key.first()`. See `SparsePrimaryKeyCodec` for format details.
1345        let first_tag_id = *metadata.primary_key.first()?;
1346        let first_tag_column = metadata.column_by_id(first_tag_id)?;
1347        let data_type = &first_tag_column.column_schema.data_type;
1348
1349        let converter = build_primary_key_codec_with_fields(
1350            metadata.primary_key_encoding,
1351            [(first_tag_id, SortField::new(data_type.clone()))].into_iter(),
1352        );
1353        let pk_index = primary_key_column_index(batches.first()?.num_columns());
1354
1355        let mut min_builder = data_type.create_mutable_vector(batches.len());
1356        let mut max_builder = data_type.create_mutable_vector(batches.len());
1357
1358        for batch in batches {
1359            match Self::extract_first_tag_bounds(batch, pk_index, &*converter) {
1360                Some((min_val, max_val)) => {
1361                    min_builder.push_value_ref(&min_val.as_value_ref());
1362                    max_builder.push_value_ref(&max_val.as_value_ref());
1363                }
1364                None => {
1365                    min_builder.push_null();
1366                    max_builder.push_null();
1367                }
1368            }
1369        }
1370
1371        Some(Self {
1372            num_batches: batches.len(),
1373            first_tag_id,
1374            min_values: min_builder.to_vector().to_arrow_array(),
1375            max_values: max_builder.to_vector().to_arrow_array(),
1376        })
1377    }
1378
1379    /// Extracts the first tag value from the first and last rows of a batch.
1380    fn extract_first_tag_bounds(
1381        batch: &RecordBatch,
1382        pk_index: usize,
1383        converter: &dyn PrimaryKeyCodec,
1384    ) -> Option<(datatypes::value::Value, datatypes::value::Value)> {
1385        if batch.num_rows() == 0 {
1386            return None;
1387        }
1388
1389        let pk_dict = batch
1390            .column(pk_index)
1391            .as_any()
1392            .downcast_ref::<PrimaryKeyArray>()?;
1393        let pk_values = pk_dict.values().as_any().downcast_ref::<BinaryArray>()?;
1394
1395        let keys = pk_dict.keys();
1396        let min_key = keys.value(0);
1397        let max_key = keys.value(batch.num_rows() - 1);
1398        let min_bytes = pk_values.value(min_key as usize);
1399        let max_bytes = pk_values.value(max_key as usize);
1400
1401        Some((
1402            converter.decode_leftmost(min_bytes).ok()??,
1403            converter.decode_leftmost(max_bytes).ok()??,
1404        ))
1405    }
1406}
1407
/// Adapter implementing `PruningStatistics` for `BatchStats`.
///
/// Used with `Predicate::prune_with_stats()` to skip batches whose first-tag
/// min/max range does not match the query predicate.
struct BatchPruningStats<'a> {
    /// Per-batch statistics of the first tag.
    stats: &'a BatchStats,
    /// Region metadata used to resolve column names to column ids.
    metadata: &'a RegionMetadataRef,
}
1416
1417impl PruningStatistics for BatchPruningStats<'_> {
1418    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
1419        let col = self.metadata.column_by_name(&column.name)?;
1420        if col.column_id == self.stats.first_tag_id {
1421            Some(self.stats.min_values.clone())
1422        } else {
1423            None
1424        }
1425    }
1426
1427    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
1428        let col = self.metadata.column_by_name(&column.name)?;
1429        if col.column_id == self.stats.first_tag_id {
1430            Some(self.stats.max_values.clone())
1431        } else {
1432            None
1433        }
1434    }
1435
1436    fn num_containers(&self) -> usize {
1437        self.stats.num_batches
1438    }
1439
1440    fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
1441        None
1442    }
1443
1444    fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
1445        None
1446    }
1447
1448    fn contained(
1449        &self,
1450        _column: &Column,
1451        _values: &std::collections::HashSet<datafusion_common::ScalarValue>,
1452    ) -> Option<BooleanArray> {
1453        None
1454    }
1455}
1456
1457/// Returns true if the predicate references the given column name.
1458fn predicate_references_column(predicate: &table::predicate::Predicate, column_name: &str) -> bool {
1459    let mut columns = HashSet::new();
1460    for expr in predicate.exprs() {
1461        let _ = expr_to_columns(expr, &mut columns);
1462    }
1463    columns.iter().any(|col| col.name == column_name)
1464}
1465
1466/// Returns true if the batch should be pruned (skipped) based on the first-tag min/max
1467/// statistics and the predicate in the context. Returns false if no pruning is possible
1468/// (no primary key, no predicate, or the batch matches the predicate).
1469pub(crate) fn should_prune_bulk_part(
1470    batch: &RecordBatch,
1471    context: &BulkIterContext,
1472    metadata: &RegionMetadata,
1473) -> bool {
1474    let predicate = match &context.predicate {
1475        Some(p) => p,
1476        None => return false,
1477    };
1478    // Check if the predicate references the first tag column to avoid computing
1479    // expensive batch statistics when they won't help with pruning.
1480    let first_tag_id = match metadata.primary_key.first() {
1481        Some(id) => *id,
1482        None => return false,
1483    };
1484    // Safety: `first_tag_id` comes from `metadata.primary_key` so the column always exists.
1485    let first_tag_name = &metadata
1486        .column_by_id(first_tag_id)
1487        .unwrap()
1488        .column_schema
1489        .name;
1490    if !predicate_references_column(predicate, first_tag_name) {
1491        return false;
1492    }
1493    let stats = match BatchStats::compute(std::slice::from_ref(batch), metadata) {
1494        Some(s) => s,
1495        None => return false,
1496    };
1497    let region_meta = context.read_format().metadata();
1498    let pruning_stats = BatchPruningStats {
1499        stats: &stats,
1500        metadata: region_meta,
1501    };
1502    let mask = predicate.prune_with_stats(&pruning_stats, region_meta.schema.arrow_schema());
1503    !mask.first().copied().unwrap_or(true)
1504}
1505
/// A collection of ordered RecordBatches representing a bulk part without parquet encoding.
///
/// Similar to `EncodedBulkPart` but stores raw RecordBatches instead of encoded parquet data.
/// The RecordBatches must be ordered by (primary key, timestamp, sequence desc).
/// Uses SmallVec to optimize for the common case of few batches while avoiding heap allocation.
#[derive(Debug, Clone)]
pub struct MultiBulkPart {
    /// Ordered record batches. SmallVec optimized for up to 4 batches inline.
    batches: SmallVec<[RecordBatch; 4]>,
    /// Total rows across all batches.
    total_rows: usize,
    /// Max timestamp in part.
    max_timestamp: i64,
    /// Min timestamp in part.
    min_timestamp: i64,
    /// Max sequence number in part.
    max_sequence: SequenceNumber,
    /// Number of series.
    series_count: usize,
    /// Pre-computed per-batch statistics for the first tag column.
    /// `None` if the statistics are unavailable, e.g. there is no primary key.
    batch_stats: Option<BatchStats>,
}
1529
1530impl MultiBulkPart {
1531    /// Creates a new MultiBulkPart from a single BulkPart.
1532    pub fn from_bulk_part(part: BulkPart, metadata: &RegionMetadata) -> Self {
1533        let num_rows = part.num_rows();
1534        let series_count = part.estimated_series_count();
1535        let batch_stats = BatchStats::compute(std::slice::from_ref(&part.batch), metadata);
1536        let mut batches = SmallVec::new();
1537        batches.push(part.batch);
1538
1539        Self {
1540            batches,
1541            total_rows: num_rows,
1542            max_timestamp: part.max_timestamp,
1543            min_timestamp: part.min_timestamp,
1544            max_sequence: part.sequence,
1545            series_count,
1546            batch_stats,
1547        }
1548    }
1549
1550    /// Creates a new MultiBulkPart from multiple ordered RecordBatches.
1551    ///
1552    /// # Arguments
1553    /// * `batches` - Ordered record batches
1554    /// * `min_timestamp` - Minimum timestamp across all batches
1555    /// * `max_timestamp` - Maximum timestamp across all batches
1556    /// * `max_sequence` - Maximum sequence number across all batches
1557    /// * `series_count` - Number of series in the batches
1558    /// * `metadata` - Region metadata for computing batch statistics
1559    ///
1560    /// # Panics
1561    /// Panics if batches is empty.
1562    pub fn new(
1563        batches: Vec<RecordBatch>,
1564        min_timestamp: i64,
1565        max_timestamp: i64,
1566        max_sequence: SequenceNumber,
1567        series_count: usize,
1568        metadata: &RegionMetadata,
1569    ) -> Self {
1570        assert!(!batches.is_empty(), "batches must not be empty");
1571
1572        let total_rows = batches.iter().map(|b| b.num_rows()).sum();
1573        let batch_stats = BatchStats::compute(&batches, metadata);
1574
1575        Self {
1576            batches: SmallVec::from_vec(batches),
1577            total_rows,
1578            max_timestamp,
1579            min_timestamp,
1580            max_sequence,
1581            series_count,
1582            batch_stats,
1583        }
1584    }
1585
1586    /// Returns the total number of rows across all batches.
1587    pub fn num_rows(&self) -> usize {
1588        self.total_rows
1589    }
1590
1591    /// Returns the minimum timestamp.
1592    pub fn min_timestamp(&self) -> i64 {
1593        self.min_timestamp
1594    }
1595
1596    /// Returns the maximum timestamp.
1597    pub fn max_timestamp(&self) -> i64 {
1598        self.max_timestamp
1599    }
1600
1601    /// Returns the maximum sequence number.
1602    pub fn max_sequence(&self) -> SequenceNumber {
1603        self.max_sequence
1604    }
1605
1606    /// Returns the number of series.
1607    pub fn series_count(&self) -> usize {
1608        self.series_count
1609    }
1610
1611    /// Returns the number of record batches in this part.
1612    pub fn num_batches(&self) -> usize {
1613        self.batches.len()
1614    }
1615
1616    /// Returns the estimated memory size of all batches.
1617    pub(crate) fn estimated_size(&self) -> usize {
1618        self.batches.iter().map(record_batch_estimated_size).sum()
1619    }
1620
1621    /// Reads data from this part with the given context and filters.
1622    ///
1623    /// If batch-level statistics are available and a predicate is set, prunes
1624    /// batches whose first-tag min/max range doesn't match the predicate before
1625    /// creating the iterator.
1626    pub(crate) fn read(
1627        &self,
1628        context: BulkIterContextRef,
1629        sequence: Option<SequenceRange>,
1630        mem_scan_metrics: Option<MemScanMetrics>,
1631    ) -> Result<Option<BoxedRecordBatchIterator>> {
1632        if self.batches.is_empty() {
1633            return Ok(None);
1634        }
1635
1636        let batches_to_read = self.prune_batches(&context);
1637
1638        if batches_to_read.is_empty() {
1639            return Ok(None);
1640        }
1641
1642        let iter = crate::memtable::bulk::part_reader::BulkPartBatchIter::new(
1643            batches_to_read,
1644            context,
1645            sequence,
1646            self.series_count,
1647            mem_scan_metrics,
1648        );
1649        Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
1650    }
1651
1652    /// Prunes batches using the first-tag min/max statistics and the predicate.
1653    /// Returns all batches if no stats or no predicate is available.
1654    fn prune_batches(&self, context: &BulkIterContextRef) -> Vec<RecordBatch> {
1655        if let Some(stats) = &self.batch_stats
1656            && let Some(predicate) = &context.predicate
1657        {
1658            let region_meta = context.read_format().metadata();
1659            let pruning_stats = BatchPruningStats {
1660                stats,
1661                metadata: region_meta,
1662            };
1663            let mask =
1664                predicate.prune_with_stats(&pruning_stats, region_meta.schema.arrow_schema());
1665            self.batches
1666                .iter()
1667                .zip(mask.iter())
1668                .filter_map(
1669                    |(batch, &selected)| {
1670                        if selected { Some(batch.clone()) } else { None }
1671                    },
1672                )
1673                .collect()
1674        } else {
1675            self.batches.iter().cloned().collect()
1676        }
1677    }
1678
1679    /// Converts this `MultiBulkPart` to `MemtableStats`.
1680    pub fn to_memtable_stats(&self, region_metadata: &RegionMetadataRef) -> MemtableStats {
1681        let ts_type = region_metadata.time_index_type();
1682        let min_ts = ts_type.create_timestamp(self.min_timestamp);
1683        let max_ts = ts_type.create_timestamp(self.max_timestamp);
1684
1685        MemtableStats {
1686            estimated_bytes: self.estimated_size(),
1687            time_range: Some((min_ts, max_ts)),
1688            num_rows: self.num_rows(),
1689            num_ranges: 1,
1690            max_sequence: self.max_sequence,
1691            series_count: self.series_count,
1692        }
1693    }
1694}
1695
1696#[cfg(test)]
1697mod tests {
1698    use api::v1::{Row, SemanticType, WriteHint};
1699    use datafusion_common::ScalarValue;
1700    use datatypes::arrow::array::{
1701        BinaryArray, DictionaryArray, Float64Array, TimestampMillisecondArray,
1702    };
1703    use datatypes::arrow::datatypes::UInt32Type;
1704    use datatypes::prelude::{ConcreteDataType, Value};
1705    use datatypes::schema::ColumnSchema;
1706    use mito_codec::row_converter::build_primary_key_codec;
1707    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
1708    use store_api::storage::RegionId;
1709    use store_api::storage::consts::ReservedColumnId;
1710    use table::predicate::Predicate;
1711
1712    use super::*;
1713    use crate::memtable::bulk::context::BulkIterContext;
1714    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
1715    use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};
1716
1717    struct MutationInput<'a> {
1718        k0: &'a str,
1719        k1: u32,
1720        timestamps: &'a [i64],
1721        v1: &'a [Option<f64>],
1722        sequence: u64,
1723    }
1724
1725    fn encode(input: &[MutationInput]) -> EncodedBulkPart {
1726        let metadata = metadata_for_test();
1727        let kvs = input
1728            .iter()
1729            .map(|m| {
1730                build_key_values_with_ts_seq_values(
1731                    &metadata,
1732                    m.k0.to_string(),
1733                    m.k1,
1734                    m.timestamps.iter().copied(),
1735                    m.v1.iter().copied(),
1736                    m.sequence,
1737                )
1738            })
1739            .collect::<Vec<_>>();
1740        let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
1741        let primary_key_codec = build_primary_key_codec(&metadata);
1742        let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
1743        for kv in kvs {
1744            converter.append_key_values(&kv).unwrap();
1745        }
1746        let part = converter.convert().unwrap();
1747        let encoder = BulkPartEncoder::new(metadata, 1024).unwrap();
1748        encoder.encode_part(&part).unwrap().unwrap()
1749    }
1750
1751    #[test]
1752    fn test_write_and_read_part_projection() {
1753        let part = encode(&[
1754            MutationInput {
1755                k0: "a",
1756                k1: 0,
1757                timestamps: &[1],
1758                v1: &[Some(0.1)],
1759                sequence: 0,
1760            },
1761            MutationInput {
1762                k0: "b",
1763                k1: 0,
1764                timestamps: &[1],
1765                v1: &[Some(0.0)],
1766                sequence: 0,
1767            },
1768            MutationInput {
1769                k0: "a",
1770                k1: 0,
1771                timestamps: &[2],
1772                v1: &[Some(0.2)],
1773                sequence: 1,
1774            },
1775        ]);
1776
1777        let projection = &[4u32];
1778        let reader = part
1779            .read(
1780                Arc::new(
1781                    BulkIterContext::new(
1782                        part.metadata.region_metadata.clone(),
1783                        Some(projection.as_slice()),
1784                        None,
1785                        false,
1786                    )
1787                    .unwrap(),
1788                ),
1789                None,
1790                None,
1791            )
1792            .unwrap()
1793            .expect("expect at least one row group");
1794
1795        let mut total_rows_read = 0;
1796        let mut field: Vec<f64> = vec![];
1797        for res in reader {
1798            let batch = res.unwrap();
1799            assert_eq!(5, batch.num_columns());
1800            field.extend_from_slice(
1801                batch
1802                    .column(0)
1803                    .as_any()
1804                    .downcast_ref::<Float64Array>()
1805                    .unwrap()
1806                    .values(),
1807            );
1808            total_rows_read += batch.num_rows();
1809        }
1810        assert_eq!(3, total_rows_read);
1811        assert_eq!(vec![0.1, 0.2, 0.0], field);
1812    }
1813
1814    fn prepare(key_values: Vec<(&str, u32, (i64, i64), u64)>) -> EncodedBulkPart {
1815        let metadata = metadata_for_test();
1816        let kvs = key_values
1817            .into_iter()
1818            .map(|(k0, k1, (start, end), sequence)| {
1819                let ts = start..end;
1820                let v1 = (start..end).map(|_| None);
1821                build_key_values_with_ts_seq_values(&metadata, k0.to_string(), k1, ts, v1, sequence)
1822            })
1823            .collect::<Vec<_>>();
1824        let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
1825        let primary_key_codec = build_primary_key_codec(&metadata);
1826        let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
1827        for kv in kvs {
1828            converter.append_key_values(&kv).unwrap();
1829        }
1830        let part = converter.convert().unwrap();
1831        let encoder = BulkPartEncoder::new(metadata, 1024).unwrap();
1832        encoder.encode_part(&part).unwrap().unwrap()
1833    }
1834
1835    fn check_prune_row_group(
1836        part: &EncodedBulkPart,
1837        predicate: Option<Predicate>,
1838        expected_rows: usize,
1839    ) {
1840        let context = Arc::new(
1841            BulkIterContext::new(
1842                part.metadata.region_metadata.clone(),
1843                None,
1844                predicate,
1845                false,
1846            )
1847            .unwrap(),
1848        );
1849        let reader = part
1850            .read(context, None, None)
1851            .unwrap()
1852            .expect("expect at least one row group");
1853        let mut total_rows_read = 0;
1854        for res in reader {
1855            let batch = res.unwrap();
1856            total_rows_read += batch.num_rows();
1857        }
1858        // Should only read row group 1.
1859        assert_eq!(expected_rows, total_rows_read);
1860    }
1861
1862    #[test]
1863    fn test_prune_row_groups() {
1864        let part = prepare(vec![
1865            ("a", 0, (0, 40), 1),
1866            ("a", 1, (0, 60), 1),
1867            ("b", 0, (0, 100), 2),
1868            ("b", 1, (100, 180), 3),
1869            ("b", 1, (180, 210), 4),
1870        ]);
1871
1872        let context = Arc::new(
1873            BulkIterContext::new(
1874                part.metadata.region_metadata.clone(),
1875                None,
1876                Some(Predicate::new(vec![datafusion_expr::col("ts").eq(
1877                    datafusion_expr::lit(ScalarValue::TimestampMillisecond(Some(300), None)),
1878                )])),
1879                false,
1880            )
1881            .unwrap(),
1882        );
1883        assert!(part.read(context, None, None).unwrap().is_none());
1884
1885        check_prune_row_group(&part, None, 310);
1886
1887        check_prune_row_group(
1888            &part,
1889            Some(Predicate::new(vec![
1890                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
1891                datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
1892            ])),
1893            40,
1894        );
1895
1896        check_prune_row_group(
1897            &part,
1898            Some(Predicate::new(vec![
1899                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
1900                datafusion_expr::col("k1").eq(datafusion_expr::lit(1u32)),
1901            ])),
1902            60,
1903        );
1904
1905        check_prune_row_group(
1906            &part,
1907            Some(Predicate::new(vec![
1908                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
1909            ])),
1910            100,
1911        );
1912
1913        check_prune_row_group(
1914            &part,
1915            Some(Predicate::new(vec![
1916                datafusion_expr::col("k0").eq(datafusion_expr::lit("b")),
1917                datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
1918            ])),
1919            100,
1920        );
1921
1922        // Predicates over field column can do precise filtering.
1923        check_prune_row_group(
1924            &part,
1925            Some(Predicate::new(vec![
1926                datafusion_expr::col("v0").eq(datafusion_expr::lit(150i64)),
1927            ])),
1928            1,
1929        );
1930    }
1931
1932    #[test]
1933    fn test_bulk_part_converter_append_and_convert() {
1934        let metadata = metadata_for_test();
1935        let capacity = 100;
1936        let primary_key_codec = build_primary_key_codec(&metadata);
1937        let schema = to_flat_sst_arrow_schema(
1938            &metadata,
1939            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1940        );
1941
1942        let mut converter =
1943            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
1944
1945        let key_values1 = build_key_values_with_ts_seq_values(
1946            &metadata,
1947            "key1".to_string(),
1948            1u32,
1949            vec![1000, 2000].into_iter(),
1950            vec![Some(1.0), Some(2.0)].into_iter(),
1951            1,
1952        );
1953
1954        let key_values2 = build_key_values_with_ts_seq_values(
1955            &metadata,
1956            "key2".to_string(),
1957            2u32,
1958            vec![1500].into_iter(),
1959            vec![Some(3.0)].into_iter(),
1960            2,
1961        );
1962
1963        converter.append_key_values(&key_values1).unwrap();
1964        converter.append_key_values(&key_values2).unwrap();
1965
1966        let bulk_part = converter.convert().unwrap();
1967
1968        assert_eq!(bulk_part.num_rows(), 3);
1969        assert_eq!(bulk_part.min_timestamp, 1000);
1970        assert_eq!(bulk_part.max_timestamp, 2000);
1971        assert_eq!(bulk_part.sequence, 2);
1972        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);
1973
1974        // Validate primary key columns are stored
1975        // Schema should include primary key columns k0 and k1 at the beginning
1976        let schema = bulk_part.batch.schema();
1977        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1978        assert_eq!(
1979            field_names,
1980            vec![
1981                "k0",
1982                "k1",
1983                "v0",
1984                "v1",
1985                "ts",
1986                "__primary_key",
1987                "__sequence",
1988                "__op_type"
1989            ]
1990        );
1991    }
1992
1993    #[test]
1994    fn test_bulk_part_converter_sorting() {
1995        let metadata = metadata_for_test();
1996        let capacity = 100;
1997        let primary_key_codec = build_primary_key_codec(&metadata);
1998        let schema = to_flat_sst_arrow_schema(
1999            &metadata,
2000            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2001        );
2002
2003        let mut converter =
2004            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
2005
2006        let key_values1 = build_key_values_with_ts_seq_values(
2007            &metadata,
2008            "z_key".to_string(),
2009            3u32,
2010            vec![3000].into_iter(),
2011            vec![Some(3.0)].into_iter(),
2012            3,
2013        );
2014
2015        let key_values2 = build_key_values_with_ts_seq_values(
2016            &metadata,
2017            "a_key".to_string(),
2018            1u32,
2019            vec![1000].into_iter(),
2020            vec![Some(1.0)].into_iter(),
2021            1,
2022        );
2023
2024        let key_values3 = build_key_values_with_ts_seq_values(
2025            &metadata,
2026            "m_key".to_string(),
2027            2u32,
2028            vec![2000].into_iter(),
2029            vec![Some(2.0)].into_iter(),
2030            2,
2031        );
2032
2033        converter.append_key_values(&key_values1).unwrap();
2034        converter.append_key_values(&key_values2).unwrap();
2035        converter.append_key_values(&key_values3).unwrap();
2036
2037        let bulk_part = converter.convert().unwrap();
2038
2039        assert_eq!(bulk_part.num_rows(), 3);
2040
2041        let ts_column = bulk_part.batch.column(bulk_part.timestamp_index);
2042        let seq_column = bulk_part.batch.column(bulk_part.batch.num_columns() - 2);
2043
2044        let ts_array = ts_column
2045            .as_any()
2046            .downcast_ref::<TimestampMillisecondArray>()
2047            .unwrap();
2048        let seq_array = seq_column.as_any().downcast_ref::<UInt64Array>().unwrap();
2049
2050        assert_eq!(ts_array.values(), &[1000, 2000, 3000]);
2051        assert_eq!(seq_array.values(), &[1, 2, 3]);
2052
2053        // Validate primary key columns are stored
2054        let schema = bulk_part.batch.schema();
2055        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2056        assert_eq!(
2057            field_names,
2058            vec![
2059                "k0",
2060                "k1",
2061                "v0",
2062                "v1",
2063                "ts",
2064                "__primary_key",
2065                "__sequence",
2066                "__op_type"
2067            ]
2068        );
2069    }
2070
2071    #[test]
2072    fn test_bulk_part_converter_empty() {
2073        let metadata = metadata_for_test();
2074        let capacity = 10;
2075        let primary_key_codec = build_primary_key_codec(&metadata);
2076        let schema = to_flat_sst_arrow_schema(
2077            &metadata,
2078            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2079        );
2080
2081        let converter =
2082            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
2083
2084        let bulk_part = converter.convert().unwrap();
2085
2086        assert_eq!(bulk_part.num_rows(), 0);
2087        assert_eq!(bulk_part.min_timestamp, i64::MAX);
2088        assert_eq!(bulk_part.max_timestamp, i64::MIN);
2089        assert_eq!(bulk_part.sequence, SequenceNumber::MIN);
2090
2091        // Validate primary key columns are present in schema even for empty batch
2092        let schema = bulk_part.batch.schema();
2093        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2094        assert_eq!(
2095            field_names,
2096            vec![
2097                "k0",
2098                "k1",
2099                "v0",
2100                "v1",
2101                "ts",
2102                "__primary_key",
2103                "__sequence",
2104                "__op_type"
2105            ]
2106        );
2107    }
2108
2109    #[test]
2110    fn test_bulk_part_converter_without_primary_key_columns() {
2111        let metadata = metadata_for_test();
2112        let primary_key_codec = build_primary_key_codec(&metadata);
2113        let schema = to_flat_sst_arrow_schema(
2114            &metadata,
2115            &FlatSchemaOptions {
2116                raw_pk_columns: false,
2117                string_pk_use_dict: true,
2118                ..Default::default()
2119            },
2120        );
2121
2122        let capacity = 100;
2123        let mut converter =
2124            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, false);
2125
2126        let key_values1 = build_key_values_with_ts_seq_values(
2127            &metadata,
2128            "key1".to_string(),
2129            1u32,
2130            vec![1000, 2000].into_iter(),
2131            vec![Some(1.0), Some(2.0)].into_iter(),
2132            1,
2133        );
2134
2135        let key_values2 = build_key_values_with_ts_seq_values(
2136            &metadata,
2137            "key2".to_string(),
2138            2u32,
2139            vec![1500].into_iter(),
2140            vec![Some(3.0)].into_iter(),
2141            2,
2142        );
2143
2144        converter.append_key_values(&key_values1).unwrap();
2145        converter.append_key_values(&key_values2).unwrap();
2146
2147        let bulk_part = converter.convert().unwrap();
2148
2149        assert_eq!(bulk_part.num_rows(), 3);
2150        assert_eq!(bulk_part.min_timestamp, 1000);
2151        assert_eq!(bulk_part.max_timestamp, 2000);
2152        assert_eq!(bulk_part.sequence, 2);
2153        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);
2154
2155        // Validate primary key columns are NOT stored individually
2156        let schema = bulk_part.batch.schema();
2157        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2158        assert_eq!(
2159            field_names,
2160            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
2161        );
2162    }
2163
2164    #[allow(clippy::too_many_arguments)]
2165    fn build_key_values_with_sparse_encoding(
2166        metadata: &RegionMetadataRef,
2167        primary_key_codec: &Arc<dyn PrimaryKeyCodec>,
2168        table_id: u32,
2169        tsid: u64,
2170        k0: String,
2171        k1: String,
2172        timestamps: impl Iterator<Item = i64>,
2173        values: impl Iterator<Item = Option<f64>>,
2174        sequence: SequenceNumber,
2175    ) -> KeyValues {
2176        // Encode the primary key (__table_id, __tsid, k0, k1) into binary format using the sparse codec
2177        let pk_values = vec![
2178            (ReservedColumnId::table_id(), Value::UInt32(table_id)),
2179            (ReservedColumnId::tsid(), Value::UInt64(tsid)),
2180            (0, Value::String(k0.clone().into())),
2181            (1, Value::String(k1.clone().into())),
2182        ];
2183        let mut encoded_key = Vec::new();
2184        primary_key_codec
2185            .encode_values(&pk_values, &mut encoded_key)
2186            .unwrap();
2187        assert!(!encoded_key.is_empty());
2188
2189        // Create schema for sparse encoding: __primary_key, ts, v0, v1
2190        let column_schema = vec![
2191            api::v1::ColumnSchema {
2192                column_name: PRIMARY_KEY_COLUMN_NAME.to_string(),
2193                datatype: api::helper::ColumnDataTypeWrapper::try_from(
2194                    ConcreteDataType::binary_datatype(),
2195                )
2196                .unwrap()
2197                .datatype() as i32,
2198                semantic_type: api::v1::SemanticType::Tag as i32,
2199                ..Default::default()
2200            },
2201            api::v1::ColumnSchema {
2202                column_name: "ts".to_string(),
2203                datatype: api::helper::ColumnDataTypeWrapper::try_from(
2204                    ConcreteDataType::timestamp_millisecond_datatype(),
2205                )
2206                .unwrap()
2207                .datatype() as i32,
2208                semantic_type: api::v1::SemanticType::Timestamp as i32,
2209                ..Default::default()
2210            },
2211            api::v1::ColumnSchema {
2212                column_name: "v0".to_string(),
2213                datatype: api::helper::ColumnDataTypeWrapper::try_from(
2214                    ConcreteDataType::int64_datatype(),
2215                )
2216                .unwrap()
2217                .datatype() as i32,
2218                semantic_type: api::v1::SemanticType::Field as i32,
2219                ..Default::default()
2220            },
2221            api::v1::ColumnSchema {
2222                column_name: "v1".to_string(),
2223                datatype: api::helper::ColumnDataTypeWrapper::try_from(
2224                    ConcreteDataType::float64_datatype(),
2225                )
2226                .unwrap()
2227                .datatype() as i32,
2228                semantic_type: api::v1::SemanticType::Field as i32,
2229                ..Default::default()
2230            },
2231        ];
2232
2233        let rows = timestamps
2234            .zip(values)
2235            .map(|(ts, v)| Row {
2236                values: vec![
2237                    api::v1::Value {
2238                        value_data: Some(api::v1::value::ValueData::BinaryValue(
2239                            encoded_key.clone(),
2240                        )),
2241                    },
2242                    api::v1::Value {
2243                        value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(ts)),
2244                    },
2245                    api::v1::Value {
2246                        value_data: Some(api::v1::value::ValueData::I64Value(ts)),
2247                    },
2248                    api::v1::Value {
2249                        value_data: v.map(api::v1::value::ValueData::F64Value),
2250                    },
2251                ],
2252            })
2253            .collect();
2254
2255        let mutation = api::v1::Mutation {
2256            op_type: 1,
2257            sequence,
2258            rows: Some(api::v1::Rows {
2259                schema: column_schema,
2260                rows,
2261            }),
2262            write_hint: Some(WriteHint {
2263                primary_key_encoding: api::v1::PrimaryKeyEncoding::Sparse.into(),
2264            }),
2265        };
2266        KeyValues::new(metadata.as_ref(), mutation).unwrap()
2267    }
2268
2269    #[test]
2270    fn test_bulk_part_converter_sparse_primary_key_encoding() {
2271        use api::v1::SemanticType;
2272        use datatypes::schema::ColumnSchema;
2273        use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
2274        use store_api::storage::RegionId;
2275
2276        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
2277        builder
2278            .push_column_metadata(ColumnMetadata {
2279                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
2280                semantic_type: SemanticType::Tag,
2281                column_id: 0,
2282            })
2283            .push_column_metadata(ColumnMetadata {
2284                column_schema: ColumnSchema::new("k1", ConcreteDataType::string_datatype(), false),
2285                semantic_type: SemanticType::Tag,
2286                column_id: 1,
2287            })
2288            .push_column_metadata(ColumnMetadata {
2289                column_schema: ColumnSchema::new(
2290                    "ts",
2291                    ConcreteDataType::timestamp_millisecond_datatype(),
2292                    false,
2293                ),
2294                semantic_type: SemanticType::Timestamp,
2295                column_id: 2,
2296            })
2297            .push_column_metadata(ColumnMetadata {
2298                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
2299                semantic_type: SemanticType::Field,
2300                column_id: 3,
2301            })
2302            .push_column_metadata(ColumnMetadata {
2303                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
2304                semantic_type: SemanticType::Field,
2305                column_id: 4,
2306            })
2307            .primary_key(vec![0, 1])
2308            .primary_key_encoding(PrimaryKeyEncoding::Sparse);
2309        let metadata = Arc::new(builder.build().unwrap());
2310
2311        let primary_key_codec = build_primary_key_codec(&metadata);
2312        let schema = to_flat_sst_arrow_schema(
2313            &metadata,
2314            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2315        );
2316
2317        assert_eq!(metadata.primary_key_encoding, PrimaryKeyEncoding::Sparse);
2318        assert_eq!(primary_key_codec.encoding(), PrimaryKeyEncoding::Sparse);
2319
2320        let capacity = 100;
2321        let mut converter =
2322            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec.clone(), true);
2323
2324        let key_values1 = build_key_values_with_sparse_encoding(
2325            &metadata,
2326            &primary_key_codec,
2327            2048u32, // table_id
2328            100u64,  // tsid
2329            "key11".to_string(),
2330            "key21".to_string(),
2331            vec![1000, 2000].into_iter(),
2332            vec![Some(1.0), Some(2.0)].into_iter(),
2333            1,
2334        );
2335
2336        let key_values2 = build_key_values_with_sparse_encoding(
2337            &metadata,
2338            &primary_key_codec,
2339            4096u32, // table_id
2340            200u64,  // tsid
2341            "key12".to_string(),
2342            "key22".to_string(),
2343            vec![1500].into_iter(),
2344            vec![Some(3.0)].into_iter(),
2345            2,
2346        );
2347
2348        converter.append_key_values(&key_values1).unwrap();
2349        converter.append_key_values(&key_values2).unwrap();
2350
2351        let bulk_part = converter.convert().unwrap();
2352
2353        assert_eq!(bulk_part.num_rows(), 3);
2354        assert_eq!(bulk_part.min_timestamp, 1000);
2355        assert_eq!(bulk_part.max_timestamp, 2000);
2356        assert_eq!(bulk_part.sequence, 2);
2357        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);
2358
2359        // For sparse encoding, primary key columns should NOT be stored individually
2360        // even when store_primary_key_columns is true, because sparse encoding
2361        // stores the encoded primary key in the __primary_key column
2362        let schema = bulk_part.batch.schema();
2363        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2364        assert_eq!(
2365            field_names,
2366            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
2367        );
2368
2369        // Verify the __primary_key column contains encoded sparse keys
2370        let primary_key_column = bulk_part.batch.column_by_name("__primary_key").unwrap();
2371        let dict_array = primary_key_column
2372            .as_any()
2373            .downcast_ref::<DictionaryArray<UInt32Type>>()
2374            .unwrap();
2375
2376        // Should have non-zero entries indicating encoded primary keys
2377        assert!(!dict_array.is_empty());
2378        assert_eq!(dict_array.len(), 3); // 3 rows total
2379
2380        // Verify values are properly encoded binary data (not empty)
2381        let values = dict_array
2382            .values()
2383            .as_any()
2384            .downcast_ref::<BinaryArray>()
2385            .unwrap();
2386        for i in 0..values.len() {
2387            assert!(
2388                !values.value(i).is_empty(),
2389                "Encoded primary key should not be empty"
2390            );
2391        }
2392    }
2393
2394    #[test]
2395    fn test_convert_bulk_part_empty() {
2396        let metadata = metadata_for_test();
2397        let schema = to_flat_sst_arrow_schema(
2398            &metadata,
2399            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2400        );
2401        let primary_key_codec = build_primary_key_codec(&metadata);
2402
2403        // Create empty batch
2404        let empty_batch = RecordBatch::new_empty(schema.clone());
2405        let empty_part = BulkPart {
2406            batch: empty_batch,
2407            max_timestamp: 0,
2408            min_timestamp: 0,
2409            sequence: 0,
2410            timestamp_index: 0,
2411            raw_data: None,
2412        };
2413
2414        let result =
2415            convert_bulk_part(empty_part, &metadata, primary_key_codec, schema, true).unwrap();
2416        assert!(result.is_none());
2417    }
2418
2419    #[test]
2420    fn test_convert_bulk_part_dense_with_pk_columns() {
2421        let metadata = metadata_for_test();
2422        let primary_key_codec = build_primary_key_codec(&metadata);
2423
2424        let k0_array = Arc::new(arrow::array::StringArray::from(vec![
2425            "key1", "key2", "key1",
2426        ]));
2427        let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![1, 2, 1]));
2428        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200, 300]));
2429        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0, 3.0]));
2430        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000, 1500]));
2431
2432        let input_schema = Arc::new(Schema::new(vec![
2433            Field::new("k0", ArrowDataType::Utf8, false),
2434            Field::new("k1", ArrowDataType::UInt32, false),
2435            Field::new("v0", ArrowDataType::Int64, true),
2436            Field::new("v1", ArrowDataType::Float64, true),
2437            Field::new(
2438                "ts",
2439                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2440                false,
2441            ),
2442        ]));
2443
2444        let input_batch = RecordBatch::try_new(
2445            input_schema,
2446            vec![k0_array, k1_array, v0_array, v1_array, ts_array],
2447        )
2448        .unwrap();
2449
2450        let part = BulkPart {
2451            batch: input_batch,
2452            max_timestamp: 2000,
2453            min_timestamp: 1000,
2454            sequence: 5,
2455            timestamp_index: 4,
2456            raw_data: None,
2457        };
2458
2459        let output_schema = to_flat_sst_arrow_schema(
2460            &metadata,
2461            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2462        );
2463
2464        let result = convert_bulk_part(
2465            part,
2466            &metadata,
2467            primary_key_codec,
2468            output_schema,
2469            true, // store primary key columns
2470        )
2471        .unwrap();
2472
2473        let converted = result.unwrap();
2474
2475        assert_eq!(converted.num_rows(), 3);
2476        assert_eq!(converted.max_timestamp, 2000);
2477        assert_eq!(converted.min_timestamp, 1000);
2478        assert_eq!(converted.sequence, 5);
2479
2480        let schema = converted.batch.schema();
2481        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2482        assert_eq!(
2483            field_names,
2484            vec![
2485                "k0",
2486                "k1",
2487                "v0",
2488                "v1",
2489                "ts",
2490                "__primary_key",
2491                "__sequence",
2492                "__op_type"
2493            ]
2494        );
2495
2496        let k0_col = converted.batch.column_by_name("k0").unwrap();
2497        assert!(matches!(
2498            k0_col.data_type(),
2499            ArrowDataType::Dictionary(_, _)
2500        ));
2501
2502        let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
2503        let dict_array = pk_col
2504            .as_any()
2505            .downcast_ref::<DictionaryArray<UInt32Type>>()
2506            .unwrap();
2507        let keys = dict_array.keys();
2508
2509        assert_eq!(keys.len(), 3);
2510    }
2511
2512    #[test]
2513    fn test_convert_bulk_part_dense_without_pk_columns() {
2514        let metadata = metadata_for_test();
2515        let primary_key_codec = build_primary_key_codec(&metadata);
2516
2517        // Create input batch with primary key columns (k0, k1)
2518        let k0_array = Arc::new(arrow::array::StringArray::from(vec!["key1", "key2"]));
2519        let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![1, 2]));
2520        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200]));
2521        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0]));
2522        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000]));
2523
2524        let input_schema = Arc::new(Schema::new(vec![
2525            Field::new("k0", ArrowDataType::Utf8, false),
2526            Field::new("k1", ArrowDataType::UInt32, false),
2527            Field::new("v0", ArrowDataType::Int64, true),
2528            Field::new("v1", ArrowDataType::Float64, true),
2529            Field::new(
2530                "ts",
2531                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2532                false,
2533            ),
2534        ]));
2535
2536        let input_batch = RecordBatch::try_new(
2537            input_schema,
2538            vec![k0_array, k1_array, v0_array, v1_array, ts_array],
2539        )
2540        .unwrap();
2541
2542        let part = BulkPart {
2543            batch: input_batch,
2544            max_timestamp: 2000,
2545            min_timestamp: 1000,
2546            sequence: 3,
2547            timestamp_index: 4,
2548            raw_data: None,
2549        };
2550
2551        let output_schema = to_flat_sst_arrow_schema(
2552            &metadata,
2553            &FlatSchemaOptions {
2554                raw_pk_columns: false,
2555                string_pk_use_dict: true,
2556                ..Default::default()
2557            },
2558        );
2559
2560        let result = convert_bulk_part(
2561            part,
2562            &metadata,
2563            primary_key_codec,
2564            output_schema,
2565            false, // don't store primary key columns
2566        )
2567        .unwrap();
2568
2569        let converted = result.unwrap();
2570
2571        assert_eq!(converted.num_rows(), 2);
2572        assert_eq!(converted.max_timestamp, 2000);
2573        assert_eq!(converted.min_timestamp, 1000);
2574        assert_eq!(converted.sequence, 3);
2575
2576        // Verify schema does NOT include individual primary key columns
2577        let schema = converted.batch.schema();
2578        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2579        assert_eq!(
2580            field_names,
2581            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
2582        );
2583
2584        // Verify __primary_key column is present and is a dictionary
2585        let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
2586        assert!(matches!(
2587            pk_col.data_type(),
2588            ArrowDataType::Dictionary(_, _)
2589        ));
2590    }
2591
2592    #[test]
2593    fn test_convert_bulk_part_sparse_encoding() {
2594        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
2595        builder
2596            .push_column_metadata(ColumnMetadata {
2597                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
2598                semantic_type: SemanticType::Tag,
2599                column_id: 0,
2600            })
2601            .push_column_metadata(ColumnMetadata {
2602                column_schema: ColumnSchema::new("k1", ConcreteDataType::string_datatype(), false),
2603                semantic_type: SemanticType::Tag,
2604                column_id: 1,
2605            })
2606            .push_column_metadata(ColumnMetadata {
2607                column_schema: ColumnSchema::new(
2608                    "ts",
2609                    ConcreteDataType::timestamp_millisecond_datatype(),
2610                    false,
2611                ),
2612                semantic_type: SemanticType::Timestamp,
2613                column_id: 2,
2614            })
2615            .push_column_metadata(ColumnMetadata {
2616                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
2617                semantic_type: SemanticType::Field,
2618                column_id: 3,
2619            })
2620            .push_column_metadata(ColumnMetadata {
2621                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
2622                semantic_type: SemanticType::Field,
2623                column_id: 4,
2624            })
2625            .primary_key(vec![0, 1])
2626            .primary_key_encoding(PrimaryKeyEncoding::Sparse);
2627        let metadata = Arc::new(builder.build().unwrap());
2628
2629        let primary_key_codec = build_primary_key_codec(&metadata);
2630
2631        // Create input batch with __primary_key column (sparse encoding)
2632        let pk_array = Arc::new(arrow::array::BinaryArray::from(vec![
2633            b"encoded_key_1".as_slice(),
2634            b"encoded_key_2".as_slice(),
2635        ]));
2636        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200]));
2637        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0]));
2638        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000]));
2639
2640        let input_schema = Arc::new(Schema::new(vec![
2641            Field::new("__primary_key", ArrowDataType::Binary, false),
2642            Field::new("v0", ArrowDataType::Int64, true),
2643            Field::new("v1", ArrowDataType::Float64, true),
2644            Field::new(
2645                "ts",
2646                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2647                false,
2648            ),
2649        ]));
2650
2651        let input_batch =
2652            RecordBatch::try_new(input_schema, vec![pk_array, v0_array, v1_array, ts_array])
2653                .unwrap();
2654
2655        let part = BulkPart {
2656            batch: input_batch,
2657            max_timestamp: 2000,
2658            min_timestamp: 1000,
2659            sequence: 7,
2660            timestamp_index: 3,
2661            raw_data: None,
2662        };
2663
2664        let output_schema = to_flat_sst_arrow_schema(
2665            &metadata,
2666            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2667        );
2668
2669        let result = convert_bulk_part(
2670            part,
2671            &metadata,
2672            primary_key_codec,
2673            output_schema,
2674            true, // store_primary_key_columns (ignored for sparse)
2675        )
2676        .unwrap();
2677
2678        let converted = result.unwrap();
2679
2680        assert_eq!(converted.num_rows(), 2);
2681        assert_eq!(converted.max_timestamp, 2000);
2682        assert_eq!(converted.min_timestamp, 1000);
2683        assert_eq!(converted.sequence, 7);
2684
2685        // Verify schema does NOT include individual primary key columns (sparse encoding)
2686        let schema = converted.batch.schema();
2687        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2688        assert_eq!(
2689            field_names,
2690            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
2691        );
2692
2693        // Verify __primary_key is dictionary encoded
2694        let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
2695        assert!(matches!(
2696            pk_col.data_type(),
2697            ArrowDataType::Dictionary(_, _)
2698        ));
2699    }
2700
2701    #[test]
2702    fn test_convert_bulk_part_sorting_with_multiple_series() {
2703        let metadata = metadata_for_test();
2704        let primary_key_codec = build_primary_key_codec(&metadata);
2705
2706        // Create unsorted batch with multiple series and timestamps
2707        let k0_array = Arc::new(arrow::array::StringArray::from(vec![
2708            "series_b", "series_a", "series_b", "series_a",
2709        ]));
2710        let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![2, 1, 2, 1]));
2711        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![200, 100, 400, 300]));
2712        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![2.0, 1.0, 4.0, 3.0]));
2713        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![
2714            2000, 1000, 4000, 3000,
2715        ]));
2716
2717        let input_schema = Arc::new(Schema::new(vec![
2718            Field::new("k0", ArrowDataType::Utf8, false),
2719            Field::new("k1", ArrowDataType::UInt32, false),
2720            Field::new("v0", ArrowDataType::Int64, true),
2721            Field::new("v1", ArrowDataType::Float64, true),
2722            Field::new(
2723                "ts",
2724                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2725                false,
2726            ),
2727        ]));
2728
2729        let input_batch = RecordBatch::try_new(
2730            input_schema,
2731            vec![k0_array, k1_array, v0_array, v1_array, ts_array],
2732        )
2733        .unwrap();
2734
2735        let part = BulkPart {
2736            batch: input_batch,
2737            max_timestamp: 4000,
2738            min_timestamp: 1000,
2739            sequence: 10,
2740            timestamp_index: 4,
2741            raw_data: None,
2742        };
2743
2744        let output_schema = to_flat_sst_arrow_schema(
2745            &metadata,
2746            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2747        );
2748
2749        let result =
2750            convert_bulk_part(part, &metadata, primary_key_codec, output_schema, true).unwrap();
2751
2752        let converted = result.unwrap();
2753
2754        assert_eq!(converted.num_rows(), 4);
2755
2756        // Verify data is sorted by (primary_key, timestamp, sequence desc)
2757        let ts_col = converted.batch.column(converted.timestamp_index);
2758        let ts_array = ts_col
2759            .as_any()
2760            .downcast_ref::<TimestampMillisecondArray>()
2761            .unwrap();
2762
2763        // After sorting by (pk, ts), we should have:
2764        // series_a,1: ts=1000, 3000
2765        // series_b,2: ts=2000, 4000
2766        let timestamps: Vec<i64> = ts_array.values().to_vec();
2767        assert_eq!(timestamps, vec![1000, 3000, 2000, 4000]);
2768    }
2769
2770    /// Helper to create a converted BulkPart (with __primary_key column) from MutationInputs.
2771    fn build_converted_bulk_part(inputs: &[MutationInput]) -> BulkPart {
2772        let metadata = metadata_for_test();
2773        let kvs = inputs
2774            .iter()
2775            .map(|m| {
2776                build_key_values_with_ts_seq_values(
2777                    &metadata,
2778                    m.k0.to_string(),
2779                    m.k1,
2780                    m.timestamps.iter().copied(),
2781                    m.v1.iter().copied(),
2782                    m.sequence,
2783                )
2784            })
2785            .collect::<Vec<_>>();
2786        let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
2787        let primary_key_codec = build_primary_key_codec(&metadata);
2788        let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
2789        for kv in kvs {
2790            converter.append_key_values(&kv).unwrap();
2791        }
2792        converter.convert().unwrap()
2793    }
2794
2795    /// Helper to create a MultiBulkPart where each group becomes a separate batch.
2796    fn build_multi_bulk_part(groups: &[&[MutationInput]]) -> (MultiBulkPart, RegionMetadataRef) {
2797        let metadata = metadata_for_test();
2798        let mut all_batches = Vec::new();
2799        let mut min_ts = i64::MAX;
2800        let mut max_ts = i64::MIN;
2801        let mut max_seq = 0u64;
2802
2803        for inputs in groups {
2804            let part = build_converted_bulk_part(inputs);
2805            min_ts = min_ts.min(part.min_timestamp);
2806            max_ts = max_ts.max(part.max_timestamp);
2807            max_seq = max_seq.max(part.sequence);
2808            all_batches.push(part.batch);
2809        }
2810
2811        let multi = MultiBulkPart::new(
2812            all_batches,
2813            min_ts,
2814            max_ts,
2815            max_seq,
2816            groups.len(),
2817            &metadata,
2818        );
2819        (multi, metadata)
2820    }
2821
2822    #[test]
2823    fn test_multi_bulk_part_prune_batches() {
2824        // Three batches with distinct k0 ranges: ["a"], ["m"], ["z"].
2825        let (multi, metadata) = build_multi_bulk_part(&[
2826            &[MutationInput {
2827                k0: "a",
2828                k1: 0,
2829                timestamps: &[1, 2],
2830                v1: &[Some(1.0), Some(2.0)],
2831                sequence: 0,
2832            }],
2833            &[MutationInput {
2834                k0: "m",
2835                k1: 0,
2836                timestamps: &[3, 4],
2837                v1: &[Some(3.0), Some(4.0)],
2838                sequence: 1,
2839            }],
2840            &[MutationInput {
2841                k0: "z",
2842                k1: 0,
2843                timestamps: &[5, 6],
2844                v1: &[Some(5.0), Some(6.0)],
2845                sequence: 2,
2846            }],
2847        ]);
2848        assert_eq!(multi.num_rows(), 6);
2849        assert_eq!(multi.num_batches(), 3);
2850
2851        // k0 = "m" => only middle batch (2 rows).
2852        let context = Arc::new(
2853            BulkIterContext::new(
2854                metadata.clone(),
2855                None,
2856                Some(Predicate::new(vec![
2857                    datafusion_expr::col("k0").eq(datafusion_expr::lit("m")),
2858                ])),
2859                false,
2860            )
2861            .unwrap(),
2862        );
2863        let reader = multi
2864            .read(context, None, None)
2865            .unwrap()
2866            .expect("should have results");
2867        let total_rows: usize = reader.map(|r| r.unwrap().num_rows()).sum();
2868        assert_eq!(total_rows, 2);
2869
2870        // k0 = "nonexistent" => all pruned, returns None.
2871        let context = Arc::new(
2872            BulkIterContext::new(
2873                metadata.clone(),
2874                None,
2875                Some(Predicate::new(vec![
2876                    datafusion_expr::col("k0").eq(datafusion_expr::lit("nonexistent")),
2877                ])),
2878                false,
2879            )
2880            .unwrap(),
2881        );
2882        assert!(multi.read(context, None, None).unwrap().is_none());
2883
2884        // No predicate => all 6 rows.
2885        let context = Arc::new(BulkIterContext::new(metadata.clone(), None, None, false).unwrap());
2886        let reader = multi
2887            .read(context, None, None)
2888            .unwrap()
2889            .expect("should have results");
2890        let total_rows: usize = reader.map(|r| r.unwrap().num_rows()).sum();
2891        assert_eq!(total_rows, 6);
2892    }
2893}