diff --git a/src/mito2/src/memtable/bulk/part.rs b/src/mito2/src/memtable/bulk/part.rs
index 9d6577f1d4..d820a35226 100644
--- a/src/mito2/src/memtable/bulk/part.rs
+++ b/src/mito2/src/memtable/bulk/part.rs
@@ -14,7 +14,7 @@
 //! Bulk part encoder/decoder.
 
-use std::collections::VecDeque;
+use std::collections::{HashMap, HashSet, VecDeque};
 use std::sync::Arc;
 use std::time::{Duration, Instant};
 
@@ -34,7 +34,9 @@ use datatypes::arrow::array::{
     UInt64Array, UInt64Builder,
 };
 use datatypes::arrow::compute::{SortColumn, SortOptions, TakeOptions};
-use datatypes::arrow::datatypes::{SchemaRef, UInt32Type};
+use datatypes::arrow::datatypes::{
+    DataType as ArrowDataType, Field, Schema, SchemaRef, UInt32Type,
+};
 use datatypes::arrow_array::BinaryArray;
 use datatypes::data_type::DataType;
 use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector};
@@ -51,14 +53,15 @@ use parquet::file::metadata::ParquetMetaData;
 use parquet::file::properties::WriterProperties;
 use snafu::{OptionExt, ResultExt, Snafu};
 use store_api::codec::PrimaryKeyEncoding;
-use store_api::metadata::{RegionMetadata, RegionMetadataRef};
+use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
 use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;
-use store_api::storage::{FileId, SequenceNumber, SequenceRange};
+use store_api::storage::{FileId, RegionId, SequenceNumber, SequenceRange};
 use table::predicate::Predicate;
 
 use crate::error::{
-    self, ColumnNotFoundSnafu, ComputeArrowSnafu, DataTypeMismatchSnafu, EncodeMemtableSnafu,
-    EncodeSnafu, InvalidMetadataSnafu, NewRecordBatchSnafu, Result,
+    self, ColumnNotFoundSnafu, ComputeArrowSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu,
+    DataTypeMismatchSnafu, EncodeMemtableSnafu, EncodeSnafu, InvalidMetadataSnafu,
+    InvalidRequestSnafu, NewRecordBatchSnafu, Result, UnexpectedSnafu,
};
 use crate::memtable::bulk::context::BulkIterContextRef;
 use crate::memtable::bulk::part_reader::EncodedBulkPartIter;
@@ -167,6 +170,86 @@ impl BulkPart {
         }
     }
 
+    /// Fills missing columns in the BulkPart batch with default values.
+    ///
+    /// This function checks if the batch schema matches the region metadata schema,
+    /// and if there are missing columns, it fills them with default values (or null
+    /// for nullable columns).
+    ///
+    /// # Arguments
+    ///
+    /// * `region_metadata` - The region metadata containing the expected schema
+    pub fn fill_missing_columns(&mut self, region_metadata: &RegionMetadata) -> Result<()> {
+        // Builds a set of existing column names in the batch
+        let batch_schema = self.batch.schema();
+        let batch_columns: HashSet<_> = batch_schema
+            .fields()
+            .iter()
+            .map(|f| f.name().as_str())
+            .collect();
+
+        // Finds columns that need to be filled
+        let mut columns_to_fill = Vec::new();
+        for column_meta in &region_metadata.column_metadatas {
+            // TODO(yingwen): Returns error if it is impure default after we support filling
+            // bulk insert request in the frontend
+            if !batch_columns.contains(column_meta.column_schema.name.as_str()) {
+                columns_to_fill.push(column_meta);
+            }
+        }
+
+        if columns_to_fill.is_empty() {
+            return Ok(());
+        }
+
+        let num_rows = self.batch.num_rows();
+
+        let mut new_columns = Vec::new();
+        let mut new_fields = Vec::new();
+
+        // First, adds all existing columns
+        new_fields.extend(batch_schema.fields().iter().cloned());
+        new_columns.extend_from_slice(self.batch.columns());
+
+        let region_id = region_metadata.region_id;
+        // Then adds the missing columns with default values
+        for column_meta in columns_to_fill {
+            let default_vector = column_meta
+                .column_schema
+                .create_default_vector(num_rows)
+                .context(CreateDefaultSnafu {
+                    region_id,
+                    column: &column_meta.column_schema.name,
+                })?
+                .with_context(|| InvalidRequestSnafu {
+                    region_id,
+                    reason: format!(
+                        "column {} does not have default value",
+                        column_meta.column_schema.name
+                    ),
+                })?;
+            let arrow_array = default_vector.to_arrow_array();
+
+            new_fields.push(Arc::new(Field::new(
+                column_meta.column_schema.name.clone(),
+                column_meta.column_schema.data_type.as_arrow_type(),
+                column_meta.column_schema.is_nullable(),
+            )));
+            new_columns.push(arrow_array);
+        }
+
+        // Create a new schema and batch with the filled columns
+        let new_schema = Arc::new(Schema::new(new_fields));
+        let new_batch =
+            RecordBatch::try_new(new_schema, new_columns).context(NewRecordBatchSnafu)?;
+
+        // Update the batch
+        self.batch = new_batch;
+
+        Ok(())
+    }
+
     /// Converts [BulkPart] to [Mutation] for fallback `write_bulk` implementation.
     pub(crate) fn to_mutation(&self, region_metadata: &RegionMetadataRef) -> Result<Mutation> {
         let vectors = region_metadata
@@ -667,6 +750,196 @@ fn sort_primary_key_record_batch(batch: &RecordBatch) -> Result<RecordBatch> {
     datatypes::arrow::compute::take_record_batch(batch, &indices).context(ComputeArrowSnafu)
 }
 
+/// Converts a `BulkPart` that is unordered and without encoded primary keys into a `BulkPart`
+/// with the same format as produced by [BulkPartConverter].
+///
+/// This function takes a `BulkPart` where:
+/// - For dense encoding: Primary key columns may be stored as individual columns
+/// - For sparse encoding: The `__primary_key` column should already be present with encoded keys
+/// - The batch may not be sorted
+///
+/// And produces a `BulkPart` where:
+/// - Primary key columns are optionally stored (depending on `store_primary_key_columns` and encoding)
+/// - An encoded `__primary_key` dictionary column is present
+/// - The batch is sorted by (primary_key, timestamp, sequence desc)
+///
+/// # Arguments
+///
+/// * `part` - The input `BulkPart` to convert
+/// * `region_metadata` - Region metadata containing schema information
+/// * `primary_key_codec` - Codec for encoding primary keys
+/// * `schema` - Target schema for the output batch
+/// * `store_primary_key_columns` - If true and encoding is not sparse, stores individual primary key columns
+///
+/// # Returns
+///
+/// Returns `None` if the input part has no rows, otherwise returns a new `BulkPart` with
+/// encoded primary keys and sorted data.
+pub fn convert_bulk_part(
+    part: BulkPart,
+    region_metadata: &RegionMetadataRef,
+    primary_key_codec: Arc<dyn PrimaryKeyCodec>,
+    schema: SchemaRef,
+    store_primary_key_columns: bool,
+) -> Result<Option<BulkPart>> {
+    if part.num_rows() == 0 {
+        return Ok(None);
+    }
+
+    let num_rows = part.num_rows();
+    let is_sparse = region_metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse;
+
+    // Builds a column name-to-index map for efficient lookups
+    let input_schema = part.batch.schema();
+    let column_indices: HashMap<&str, usize> = input_schema
+        .fields()
+        .iter()
+        .enumerate()
+        .map(|(idx, field)| (field.name().as_str(), idx))
+        .collect();
+
+    // Determines the structure of the input batch by looking up columns by name
+    let mut output_columns = Vec::new();
+
+    // Extracts primary key columns if we need to encode them (dense encoding)
+    let pk_array = if is_sparse {
+        // For sparse encoding, the input should already have the __primary_key column
+        // We need to find it in the input batch
+        None
+    } else {
+        // For dense encoding, extract and encode primary key columns by name
+        let pk_vectors: Result<Vec<_>> = region_metadata
+            .primary_key_columns()
+            .map(|col_meta| {
+                let col_idx = column_indices
+                    .get(col_meta.column_schema.name.as_str())
+                    .context(ColumnNotFoundSnafu {
+                        column: &col_meta.column_schema.name,
+                    })?;
+                let col = part.batch.column(*col_idx);
+                Helper::try_into_vector(col).context(error::ComputeVectorSnafu)
+            })
+            .collect();
+        let pk_vectors = pk_vectors?;
+
+        let mut key_array_builder = PrimaryKeyArrayBuilder::new();
+        let mut encode_buf = Vec::new();
+
+        for row_idx in 0..num_rows {
+            encode_buf.clear();
+
+            // Collects primary key values with column IDs for this row
+            let pk_values_with_ids: Vec<_> = region_metadata
+                .primary_key
+                .iter()
+                .zip(pk_vectors.iter())
+                .map(|(col_id, vector)| (*col_id, vector.get_ref(row_idx)))
+                .collect();
+
+            // Encodes the primary key
+            primary_key_codec
+                .encode_value_refs(&pk_values_with_ids, &mut encode_buf)
+                .context(EncodeSnafu)?;
+
+            key_array_builder
+                .append(&encode_buf)
+                .context(ComputeArrowSnafu)?;
+        }
+
+        Some(key_array_builder.finish())
+    };
+
+    // Adds primary key columns if storing them (only for dense encoding)
+    if store_primary_key_columns && !is_sparse {
+        for col_meta in region_metadata.primary_key_columns() {
+            let col_idx = column_indices
+                .get(col_meta.column_schema.name.as_str())
+                .context(ColumnNotFoundSnafu {
+                    column: &col_meta.column_schema.name,
+                })?;
+            let col = part.batch.column(*col_idx);
+
+            // Converts to dictionary if needed for string types
+            let col = if col_meta.column_schema.data_type.is_string() {
+                let target_type = ArrowDataType::Dictionary(
+                    Box::new(ArrowDataType::UInt32),
+                    Box::new(ArrowDataType::Utf8),
+                );
+                arrow::compute::cast(col, &target_type).context(ComputeArrowSnafu)?
+            } else {
+                col.clone()
+            };
+            output_columns.push(col);
+        }
+    }
+
+    // Adds field columns
+    for col_meta in region_metadata.field_columns() {
+        let col_idx = column_indices
+            .get(col_meta.column_schema.name.as_str())
+            .context(ColumnNotFoundSnafu {
+                column: &col_meta.column_schema.name,
+            })?;
+        output_columns.push(part.batch.column(*col_idx).clone());
+    }
+
+    // Adds timestamp column
+    let new_timestamp_index = output_columns.len();
+    let ts_col_idx = column_indices
+        .get(
+            region_metadata
+                .time_index_column()
+                .column_schema
+                .name
+                .as_str(),
+        )
+        .context(ColumnNotFoundSnafu {
+            column: &region_metadata.time_index_column().column_schema.name,
+        })?;
+    output_columns.push(part.batch.column(*ts_col_idx).clone());
+
+    // Adds encoded primary key dictionary column
+    let pk_dictionary = if let Some(pk_dict_array) = pk_array {
+        Arc::new(pk_dict_array) as ArrayRef
+    } else {
+        let pk_col_idx =
+            column_indices
+                .get(PRIMARY_KEY_COLUMN_NAME)
+                .context(ColumnNotFoundSnafu {
+                    column: PRIMARY_KEY_COLUMN_NAME,
+                })?;
+        let col = part.batch.column(*pk_col_idx);
+
+        // Casts to dictionary type if needed
+        let target_type = ArrowDataType::Dictionary(
+            Box::new(ArrowDataType::UInt32),
+            Box::new(ArrowDataType::Binary),
+        );
+        arrow::compute::cast(col, &target_type).context(ComputeArrowSnafu)?
+    };
+    output_columns.push(pk_dictionary);
+
+    let sequence_array = UInt64Array::from(vec![part.sequence; num_rows]);
+    output_columns.push(Arc::new(sequence_array) as ArrayRef);
+
+    let op_type_array = UInt8Array::from(vec![OpType::Put as u8; num_rows]);
+    output_columns.push(Arc::new(op_type_array) as ArrayRef);
+
+    let batch = RecordBatch::try_new(schema, output_columns).context(NewRecordBatchSnafu)?;
+
+    // Sorts the batch by (primary_key, timestamp, sequence desc)
+    let sorted_batch = sort_primary_key_record_batch(&batch)?;
+
+    Ok(Some(BulkPart {
+        batch: sorted_batch,
+        max_timestamp: part.max_timestamp,
+        min_timestamp: part.min_timestamp,
+        sequence: part.sequence,
+        timestamp_index: new_timestamp_index,
+        raw_data: None,
+    }))
+}
+
 #[derive(Debug, Clone)]
 pub struct EncodedBulkPart {
     data: Bytes,
@@ -1189,11 +1462,14 @@ fn binary_array_to_dictionary(input: &BinaryArray) -> Result<DictionaryArray<UInt32Type>> {
 mod tests {
     use std::collections::VecDeque;
 
-    use api::v1::{Row, WriteHint};
+    use api::v1::{Row, SemanticType, WriteHint};
     use datafusion_common::ScalarValue;
     use datatypes::arrow::array::Float64Array;
     use datatypes::prelude::{ConcreteDataType, ScalarVector, Value};
+    use datatypes::schema::ColumnSchema;
     use datatypes::vectors::{Float64Vector, TimestampMillisecondVector};
+    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
+    use store_api::storage::RegionId;
     use store_api::storage::consts::ReservedColumnId;
 
     use super::*;
@@ -2166,4 +2442,379 @@
             );
         }
     }
+
+    #[test]
+    fn test_convert_bulk_part_empty() {
+        let metadata = metadata_for_test();
+        let schema = to_flat_sst_arrow_schema(
+            &metadata,
+            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
+        );
+        let primary_key_codec = build_primary_key_codec(&metadata);
+
+        // Create empty batch
+        let empty_batch = RecordBatch::new_empty(schema.clone());
+        let empty_part = BulkPart {
+            batch: empty_batch,
+            max_timestamp: 0,
+            min_timestamp: 0,
+            sequence: 0,
+            timestamp_index: 0,
+            raw_data: None,
+        };
+
+        let result =
+            convert_bulk_part(empty_part, &metadata, primary_key_codec, schema, true).unwrap();
+        assert!(result.is_none());
+    }
+
+    #[test]
+    fn test_convert_bulk_part_dense_with_pk_columns() {
+        let metadata = metadata_for_test();
+        let primary_key_codec = build_primary_key_codec(&metadata);
+
+        let k0_array = Arc::new(arrow::array::StringArray::from(vec![
+            "key1", "key2", "key1",
+        ]));
+        let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![1, 2, 1]));
+        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200, 300]));
+        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0, 3.0]));
+        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000, 1500]));
+
+        let input_schema = Arc::new(Schema::new(vec![
+            Field::new("k0", ArrowDataType::Utf8, false),
+            Field::new("k1", ArrowDataType::UInt32, false),
+            Field::new("v0", ArrowDataType::Int64, true),
+            Field::new("v1", ArrowDataType::Float64, true),
+            Field::new(
+                "ts",
+                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
+                false,
+            ),
+        ]));
+
+        let input_batch = RecordBatch::try_new(
+            input_schema,
+            vec![k0_array, k1_array, v0_array, v1_array, ts_array],
+        )
+        .unwrap();
+
+        let part = BulkPart {
+            batch: input_batch,
+            max_timestamp: 2000,
+            min_timestamp: 1000,
+            sequence: 5,
+            timestamp_index: 4,
+            raw_data: None,
+        };
+
+        let output_schema = to_flat_sst_arrow_schema(
+            &metadata,
+            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
+        );
+
+        let result = convert_bulk_part(
+            part,
+            &metadata,
+            primary_key_codec,
+            output_schema,
+            true, // store primary key columns
+        )
+        .unwrap();
+
+        let converted = result.unwrap();
+
+        assert_eq!(converted.num_rows(), 3);
+        assert_eq!(converted.max_timestamp, 2000);
+        assert_eq!(converted.min_timestamp, 1000);
+        assert_eq!(converted.sequence, 5);
+
+        let schema = converted.batch.schema();
+        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
+        assert_eq!(
+            field_names,
+            vec![
+                "k0",
+                "k1",
+                "v0",
+                "v1",
+                "ts",
+                "__primary_key",
+                "__sequence",
+                "__op_type"
+            ]
+        );
+
+        let k0_col = converted.batch.column_by_name("k0").unwrap();
+        assert!(matches!(
+            k0_col.data_type(),
+            ArrowDataType::Dictionary(_, _)
+        ));
+
+        let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
+        let dict_array = pk_col
+            .as_any()
+            .downcast_ref::<DictionaryArray<UInt32Type>>()
+            .unwrap();
+        let keys = dict_array.keys();
+
+        assert_eq!(keys.len(), 3);
+    }
+
+    #[test]
+    fn test_convert_bulk_part_dense_without_pk_columns() {
+        let metadata = metadata_for_test();
+        let primary_key_codec = build_primary_key_codec(&metadata);
+
+        // Create input batch with primary key columns (k0, k1)
+        let k0_array = Arc::new(arrow::array::StringArray::from(vec!["key1", "key2"]));
+        let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![1, 2]));
+        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200]));
+        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0]));
+        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000]));
+
+        let input_schema = Arc::new(Schema::new(vec![
+            Field::new("k0", ArrowDataType::Utf8, false),
+            Field::new("k1", ArrowDataType::UInt32, false),
+            Field::new("v0", ArrowDataType::Int64, true),
+            Field::new("v1", ArrowDataType::Float64, true),
+            Field::new(
+                "ts",
+                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
+                false,
+            ),
+        ]));
+
+        let input_batch = RecordBatch::try_new(
+            input_schema,
+            vec![k0_array, k1_array, v0_array, v1_array, ts_array],
+        )
+        .unwrap();
+
+        let part = BulkPart {
+            batch: input_batch,
+            max_timestamp: 2000,
+            min_timestamp: 1000,
+            sequence: 3,
+            timestamp_index: 4,
+            raw_data: None,
+        };
+
+        let output_schema = to_flat_sst_arrow_schema(
+            &metadata,
+            &FlatSchemaOptions {
+                raw_pk_columns: false,
+                string_pk_use_dict: true,
+            },
+        );
+
+        let result = convert_bulk_part(
+            part,
+            &metadata,
+            primary_key_codec,
+            output_schema,
+            false, // don't store primary key columns
+        )
+        .unwrap();
+
+        let converted = result.unwrap();
+
+        assert_eq!(converted.num_rows(), 2);
+        assert_eq!(converted.max_timestamp, 2000);
+        assert_eq!(converted.min_timestamp, 1000);
+        assert_eq!(converted.sequence, 3);
+
+        // Verify schema does NOT include individual primary key columns
+        let schema = converted.batch.schema();
+        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
+        assert_eq!(
+            field_names,
+            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
+        );
+
+        // Verify __primary_key column is present and is a dictionary
+        let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
+        assert!(matches!(
+            pk_col.data_type(),
+            ArrowDataType::Dictionary(_, _)
+        ));
+    }
+
+    #[test]
+    fn test_convert_bulk_part_sparse_encoding() {
+        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
+        builder
+            .push_column_metadata(ColumnMetadata {
+                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
+                semantic_type: SemanticType::Tag,
+                column_id: 0,
+            })
+            .push_column_metadata(ColumnMetadata {
+                column_schema: ColumnSchema::new("k1", ConcreteDataType::string_datatype(), false),
+                semantic_type: SemanticType::Tag,
+                column_id: 1,
+            })
+            .push_column_metadata(ColumnMetadata {
+                column_schema: ColumnSchema::new(
+                    "ts",
+                    ConcreteDataType::timestamp_millisecond_datatype(),
+                    false,
+                ),
+                semantic_type: SemanticType::Timestamp,
+                column_id: 2,
+            })
+            .push_column_metadata(ColumnMetadata {
+                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
+                semantic_type: SemanticType::Field,
+                column_id: 3,
+            })
+            .push_column_metadata(ColumnMetadata {
+                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
+                semantic_type: SemanticType::Field,
+                column_id: 4,
+            })
+            .primary_key(vec![0, 1])
+            .primary_key_encoding(PrimaryKeyEncoding::Sparse);
+        let metadata = Arc::new(builder.build().unwrap());
+
+        let primary_key_codec = build_primary_key_codec(&metadata);
+
+        // Create input batch with __primary_key column (sparse encoding)
+        let pk_array = Arc::new(arrow::array::BinaryArray::from(vec![
+            b"encoded_key_1".as_slice(),
+            b"encoded_key_2".as_slice(),
+        ]));
+        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200]));
+        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0]));
+        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000]));
+
+        let input_schema = Arc::new(Schema::new(vec![
+            Field::new("__primary_key", ArrowDataType::Binary, false),
+            Field::new("v0", ArrowDataType::Int64, true),
+            Field::new("v1", ArrowDataType::Float64, true),
+            Field::new(
+                "ts",
+                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
+                false,
+            ),
+        ]));
+
+        let input_batch =
+            RecordBatch::try_new(input_schema, vec![pk_array, v0_array, v1_array, ts_array])
+                .unwrap();
+
+        let part = BulkPart {
+            batch: input_batch,
+            max_timestamp: 2000,
+            min_timestamp: 1000,
+            sequence: 7,
+            timestamp_index: 3,
+            raw_data: None,
+        };
+
+        let output_schema = to_flat_sst_arrow_schema(
+            &metadata,
+            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
+        );
+
+        let result = convert_bulk_part(
+            part,
+            &metadata,
+            primary_key_codec,
+            output_schema,
+            true, // store_primary_key_columns (ignored for sparse)
+        )
+        .unwrap();
+
+        let converted = result.unwrap();
+
+        assert_eq!(converted.num_rows(), 2);
+        assert_eq!(converted.max_timestamp, 2000);
+        assert_eq!(converted.min_timestamp, 1000);
+        assert_eq!(converted.sequence, 7);
+
+        // Verify schema does NOT include individual primary key columns (sparse encoding)
+        let schema = converted.batch.schema();
+        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
+        assert_eq!(
+            field_names,
+            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
+        );
+
+        // Verify __primary_key is dictionary encoded
+        let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
+        assert!(matches!(
+            pk_col.data_type(),
+            ArrowDataType::Dictionary(_, _)
+        ));
+    }
+
+    #[test]
+    fn test_convert_bulk_part_sorting_with_multiple_series() {
+        let metadata = metadata_for_test();
+        let primary_key_codec = build_primary_key_codec(&metadata);
+
+        // Create unsorted batch with multiple series and timestamps
+        let k0_array = Arc::new(arrow::array::StringArray::from(vec![
+            "series_b", "series_a", "series_b", "series_a",
+        ]));
+        let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![2, 1, 2, 1]));
+        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![200, 100, 400, 300]));
+        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![2.0, 1.0, 4.0, 3.0]));
+        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![
+            2000, 1000, 4000, 3000,
+        ]));
+
+        let input_schema = Arc::new(Schema::new(vec![
+            Field::new("k0", ArrowDataType::Utf8, false),
+            Field::new("k1", ArrowDataType::UInt32, false),
+            Field::new("v0", ArrowDataType::Int64, true),
+            Field::new("v1", ArrowDataType::Float64, true),
+            Field::new(
+                "ts",
+                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
+                false,
+            ),
+        ]));
+
+        let input_batch = RecordBatch::try_new(
+            input_schema,
+            vec![k0_array, k1_array, v0_array, v1_array, ts_array],
+        )
+        .unwrap();
+
+        let part = BulkPart {
+            batch: input_batch,
+            max_timestamp: 4000,
+            min_timestamp: 1000,
+            sequence: 10,
+            timestamp_index: 4,
+            raw_data: None,
+        };
+
+        let output_schema = to_flat_sst_arrow_schema(
+            &metadata,
+            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
+        );
+
+        let result =
+            convert_bulk_part(part, &metadata, primary_key_codec, output_schema, true).unwrap();
+
+        let converted = result.unwrap();
+
+        assert_eq!(converted.num_rows(), 4);
+
+        // Verify data is sorted by (primary_key, timestamp, sequence desc)
+        let ts_col = converted.batch.column(converted.timestamp_index);
+        let ts_array = ts_col
+            .as_any()
+            .downcast_ref::<TimestampMillisecondArray>()
+            .unwrap();
+
+        // After sorting by (pk, ts), we should have:
+        // series_a,1: ts=1000, 3000
+        // series_b,2: ts=2000, 4000
+        let timestamps: Vec<i64> = ts_array.values().to_vec();
+        assert_eq!(timestamps, vec![1000, 3000, 2000, 4000]);
+    }
 }
diff --git a/src/mito2/src/memtable/time_partition.rs b/src/mito2/src/memtable/time_partition.rs
index 9131de32a5..6f11c813cb 100644
--- a/src/mito2/src/memtable/time_partition.rs
+++ b/src/mito2/src/memtable/time_partition.rs
@@ -261,7 +261,7 @@ impl TimePartitions {
             converter.append_key_values(kvs)?;
             let part = converter.convert()?;
 
-            return self.write_bulk(part);
+            return self.write_bulk_inner(part);
         }
 
         // Get all parts.
@@ -291,7 +291,31 @@ impl TimePartitions {
         self.write_multi_parts(kvs, &parts)
     }
 
+    /// Writes a bulk part.
     pub fn write_bulk(&self, part: BulkPart) -> Result<()> {
+        // Convert the bulk part if bulk_schema is Some
+        let part = if let Some(bulk_schema) = &self.bulk_schema {
+            let converted = crate::memtable::bulk::part::convert_bulk_part(
+                part,
+                &self.metadata,
+                self.primary_key_codec.clone(),
+                bulk_schema.clone(),
+                // Always store primary keys for bulk mode.
+                true,
+            )?;
+            match converted {
+                Some(p) => p,
+                None => return Ok(()),
+            }
+        } else {
+            part
+        };
+
+        self.write_bulk_inner(part)
+    }
+
+    /// Writes a bulk part without converting.
+    fn write_bulk_inner(&self, part: BulkPart) -> Result<()> {
         let time_type = self
             .metadata
             .time_index_column()
diff --git a/src/mito2/src/worker/handle_write.rs b/src/mito2/src/worker/handle_write.rs
index e86aa67630..1a23d4487f 100644
--- a/src/mito2/src/worker/handle_write.rs
+++ b/src/mito2/src/worker/handle_write.rs
@@ -388,17 +388,14 @@ impl RegionWorkerLoop {
             let need_fill_missing_columns = region_ctx.version().metadata.schema_version
                 != bulk_req.region_metadata.schema_version;
 
-            // Only fill missing columns if primary key is dense encoded.
-            if need_fill_missing_columns {
-                // todo(hl): support filling default columns
-                bulk_req.sender.send(
-                    InvalidRequestSnafu {
-                        region_id,
-                        reason: "Schema mismatch",
-                    }
-                    .fail(),
-                );
-                return;
+            // Fill missing columns if needed
+            if need_fill_missing_columns
+                && let Err(e) = bulk_req
+                    .request
+                    .fill_missing_columns(&region_ctx.version().metadata)
+            {
+                bulk_req.sender.send(Err(e));
+                continue;
             }
 
             // Collect requests by region.
diff --git a/src/operator/src/bulk_insert.rs b/src/operator/src/bulk_insert.rs
index 15b92958b4..a06cc9503c 100644
--- a/src/operator/src/bulk_insert.rs
+++ b/src/operator/src/bulk_insert.rs
@@ -66,6 +66,7 @@ impl Inserter {
             return Ok(0);
         }
 
+        // TODO(yingwen): Fill record batch impure default values.
         // notify flownode to update dirty timestamps if flow is configured.
         self.maybe_update_flow_dirty_window(table_info.clone(), record_batch.clone());
 
diff --git a/src/operator/src/req_convert/insert/fill_impure_default.rs b/src/operator/src/req_convert/insert/fill_impure_default.rs
index 0de49611d9..2029b8b96f 100644
--- a/src/operator/src/req_convert/insert/fill_impure_default.rs
+++ b/src/operator/src/req_convert/insert/fill_impure_default.rs
@@ -36,6 +36,7 @@ pub fn find_all_impure_columns(table_info: &TableInfo) -> Vec {
         .collect()
 }
 
+// TODO(yingwen): Support Bulk insert request.
 /// Fill impure default values in the request
 pub struct ImpureDefaultFiller {
     impure_columns: HashMap,
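
Not part of the patch: a minimal standalone sketch of the two arrow operations that `convert_bulk_part` builds on, namely casting a key column to a `Dictionary(UInt32, Utf8)` array and reordering a `RecordBatch` with `lexsort_to_indices` plus `take_record_batch`. It assumes only a plain `arrow` crate dependency; the `key`/`ts` schema and the column names are invented for the illustration and are not the region schema used in the patch.

// Standalone sketch (assumes the `arrow` crate; not taken from the patch above).
use std::sync::Arc;

use arrow::array::{ArrayRef, StringArray, TimestampMillisecondArray};
use arrow::compute::{cast, lexsort_to_indices, take_record_batch, SortColumn, SortOptions};
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), ArrowError> {
    // Unsorted input: a plain string key column and a millisecond timestamp column.
    let keys: ArrayRef = Arc::new(StringArray::from(vec!["b", "a", "b", "a"]));
    let ts: ArrayRef = Arc::new(TimestampMillisecondArray::from(vec![2000, 1000, 4000, 3000]));

    // Step 1: dictionary-encode the key column (UInt32 keys, Utf8 values), analogous to
    // how string primary key columns and `__primary_key` are kept dictionary-encoded.
    let dict_type = DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8));
    let dict_keys = cast(&keys, &dict_type)?;

    // Build a batch whose key column is already dictionary-encoded.
    let schema = Arc::new(Schema::new(vec![
        Field::new("key", dict_type, false),
        Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), false),
    ]));
    let batch = RecordBatch::try_new(schema, vec![dict_keys.clone(), ts.clone()])?;

    // Step 2: compute lexicographic sort indices over (key asc, ts asc) and reorder
    // every column of the batch with a single take.
    let sort_columns = vec![
        SortColumn {
            values: dict_keys,
            options: Some(SortOptions {
                descending: false,
                nulls_first: true,
            }),
        },
        SortColumn {
            values: ts,
            options: Some(SortOptions {
                descending: false,
                nulls_first: true,
            }),
        },
    ];
    let indices = lexsort_to_indices(&sort_columns, None)?;
    let sorted = take_record_batch(&batch, &indices)?;

    // Rows are now grouped by key and ordered by timestamp within each key:
    // ("a", 1000), ("a", 3000), ("b", 2000), ("b", 4000).
    assert_eq!(sorted.num_rows(), 4);
    Ok(())
}

Sorting on the dictionary-encoded key column rather than on decoded strings mirrors `sort_primary_key_record_batch`, which sorts the `__primary_key` dictionary column directly before the converted part is handed to the memtable.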