fix: implement bulk write for time partitions and bulk memtable (#7293)

* feat: implement convert_bulk_part

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: convert bulk part in TimePartitions

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: fill missing columns for bulk parts

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: update comments

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: cast to dictionary type

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: add unit tests

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: update comment

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: do not convert part if bulk is written by write()

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
Author: Yingwen
Date: 2025-11-27 16:01:45 +08:00
Committed by: GitHub
Parent: e44323c433
Commit: afefc0c604
5 changed files with 693 additions and 19 deletions

View File

@@ -14,7 +14,7 @@
//! Bulk part encoder/decoder.
use std::collections::VecDeque;
use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::Arc;
use std::time::{Duration, Instant};
@@ -34,7 +34,9 @@ use datatypes::arrow::array::{
UInt64Array, UInt64Builder,
};
use datatypes::arrow::compute::{SortColumn, SortOptions, TakeOptions};
use datatypes::arrow::datatypes::{SchemaRef, UInt32Type};
use datatypes::arrow::datatypes::{
DataType as ArrowDataType, Field, Schema, SchemaRef, UInt32Type,
};
use datatypes::arrow_array::BinaryArray;
use datatypes::data_type::DataType;
use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector};
@@ -51,14 +53,15 @@ use parquet::file::metadata::ParquetMetaData;
use parquet::file::properties::WriterProperties;
use snafu::{OptionExt, ResultExt, Snafu};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;
use store_api::storage::{FileId, SequenceNumber, SequenceRange};
use store_api::storage::{FileId, RegionId, SequenceNumber, SequenceRange};
use table::predicate::Predicate;
use crate::error::{
self, ColumnNotFoundSnafu, ComputeArrowSnafu, DataTypeMismatchSnafu, EncodeMemtableSnafu,
EncodeSnafu, InvalidMetadataSnafu, NewRecordBatchSnafu, Result,
self, ColumnNotFoundSnafu, ComputeArrowSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu,
DataTypeMismatchSnafu, EncodeMemtableSnafu, EncodeSnafu, InvalidMetadataSnafu,
InvalidRequestSnafu, NewRecordBatchSnafu, Result, UnexpectedSnafu,
};
use crate::memtable::bulk::context::BulkIterContextRef;
use crate::memtable::bulk::part_reader::EncodedBulkPartIter;
@@ -167,6 +170,86 @@ impl BulkPart {
}
}
/// Fills missing columns in the BulkPart batch with default values.
///
/// This function compares the batch schema against the region metadata schema and
/// appends any missing columns, filling them with their default values (or nulls
/// for nullable columns without an explicit default).
///
/// # Arguments
///
/// * `region_metadata` - The region metadata containing the expected schema
pub fn fill_missing_columns(&mut self, region_metadata: &RegionMetadata) -> Result<()> {
// Builds a map of existing columns in the batch
let batch_schema = self.batch.schema();
let batch_columns: HashSet<_> = batch_schema
.fields()
.iter()
.map(|f| f.name().as_str())
.collect();
// Finds columns that need to be filled
let mut columns_to_fill = Vec::new();
for column_meta in &region_metadata.column_metadatas {
// TODO(yingwen): Return an error for impure defaults once the frontend supports
// filling bulk insert requests
if !batch_columns.contains(column_meta.column_schema.name.as_str()) {
columns_to_fill.push(column_meta);
}
}
if columns_to_fill.is_empty() {
return Ok(());
}
let num_rows = self.batch.num_rows();
let mut new_columns = Vec::new();
let mut new_fields = Vec::new();
// First, adds all existing columns
new_fields.extend(batch_schema.fields().iter().cloned());
new_columns.extend_from_slice(self.batch.columns());
let region_id = region_metadata.region_id;
// Then adds the missing columns with default values
for column_meta in columns_to_fill {
let default_vector = column_meta
.column_schema
.create_default_vector(num_rows)
.context(CreateDefaultSnafu {
region_id,
column: &column_meta.column_schema.name,
})?
.with_context(|| InvalidRequestSnafu {
region_id,
reason: format!(
"column {} does not have default value",
column_meta.column_schema.name
),
})?;
let arrow_array = default_vector.to_arrow_array();
new_fields.push(Arc::new(Field::new(
column_meta.column_schema.name.clone(),
column_meta.column_schema.data_type.as_arrow_type(),
column_meta.column_schema.is_nullable(),
)));
new_columns.push(arrow_array);
}
// Creates a new schema and batch with the filled columns
let new_schema = Arc::new(Schema::new(new_fields));
let new_batch =
RecordBatch::try_new(new_schema, new_columns).context(NewRecordBatchSnafu)?;
// Updates the batch
self.batch = new_batch;
Ok(())
}
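
A hedged usage sketch of `fill_missing_columns` (not part of this commit), written in the style of the unit tests added later in this file: it assumes the `metadata_for_test()` helper those tests use and that its `v1` field column is nullable, so a batch that omits `v1` gets it appended as a null-filled column.

// Hypothetical sketch, not in this commit; assumes `metadata_for_test()` from the
// tests below, where `v1` is a nullable field column without an explicit default.
let metadata = metadata_for_test();
let input_schema = Arc::new(Schema::new(vec![
    Field::new("k0", ArrowDataType::Utf8, false),
    Field::new("k1", ArrowDataType::UInt32, false),
    Field::new("v0", ArrowDataType::Int64, true),
    Field::new(
        "ts",
        ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
        false,
    ),
]));
let batch = RecordBatch::try_new(
    input_schema,
    vec![
        Arc::new(arrow::array::StringArray::from(vec!["a", "b"])),
        Arc::new(arrow::array::UInt32Array::from(vec![1u32, 2])),
        Arc::new(arrow::array::Int64Array::from(vec![10i64, 20])),
        Arc::new(TimestampMillisecondArray::from(vec![1000, 2000])),
    ],
)
.unwrap();
let mut part = BulkPart {
    batch,
    max_timestamp: 2000,
    min_timestamp: 1000,
    sequence: 1,
    timestamp_index: 3,
    raw_data: None,
};
// `v1` is missing from the batch, so it is appended as a null-filled column.
part.fill_missing_columns(&metadata).unwrap();
assert!(part.batch.column_by_name("v1").is_some());
assert_eq!(part.batch.num_columns(), 5);
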
/// Converts [BulkPart] to [Mutation] for fallback `write_bulk` implementation.
pub(crate) fn to_mutation(&self, region_metadata: &RegionMetadataRef) -> Result<Mutation> {
let vectors = region_metadata
@@ -667,6 +750,196 @@ fn sort_primary_key_record_batch(batch: &RecordBatch) -> Result<RecordBatch> {
datatypes::arrow::compute::take_record_batch(batch, &indices).context(ComputeArrowSnafu)
}
/// Converts a `BulkPart` that is unordered and without encoded primary keys into a `BulkPart`
/// with the same format as produced by [BulkPartConverter].
///
/// This function takes a `BulkPart` where:
/// - For dense encoding: Primary key columns may be stored as individual columns
/// - For sparse encoding: The `__primary_key` column should already be present with encoded keys
/// - The batch may not be sorted
///
/// And produces a `BulkPart` where:
/// - Primary key columns are optionally stored (depending on `store_primary_key_columns` and encoding)
/// - An encoded `__primary_key` dictionary column is present
/// - The batch is sorted by (primary_key, timestamp, sequence desc)
///
/// # Arguments
///
/// * `part` - The input `BulkPart` to convert
/// * `region_metadata` - Region metadata containing schema information
/// * `primary_key_codec` - Codec for encoding primary keys
/// * `schema` - Target schema for the output batch
/// * `store_primary_key_columns` - If true and encoding is not sparse, stores individual primary key columns
///
/// # Returns
///
/// Returns `None` if the input part has no rows, otherwise returns a new `BulkPart` with
/// encoded primary keys and sorted data.
pub fn convert_bulk_part(
part: BulkPart,
region_metadata: &RegionMetadataRef,
primary_key_codec: Arc<dyn PrimaryKeyCodec>,
schema: SchemaRef,
store_primary_key_columns: bool,
) -> Result<Option<BulkPart>> {
if part.num_rows() == 0 {
return Ok(None);
}
let num_rows = part.num_rows();
let is_sparse = region_metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse;
// Builds a column name-to-index map for efficient lookups
let input_schema = part.batch.schema();
let column_indices: HashMap<&str, usize> = input_schema
.fields()
.iter()
.enumerate()
.map(|(idx, field)| (field.name().as_str(), idx))
.collect();
// Determines the structure of the input batch by looking up columns by name
let mut output_columns = Vec::new();
// Extracts primary key columns if we need to encode them (dense encoding)
let pk_array = if is_sparse {
// For sparse encoding, the input should already have the __primary_key column
// We need to find it in the input batch
None
} else {
// For dense encoding, extract and encode primary key columns by name
let pk_vectors: Result<Vec<_>> = region_metadata
.primary_key_columns()
.map(|col_meta| {
let col_idx = column_indices
.get(col_meta.column_schema.name.as_str())
.context(ColumnNotFoundSnafu {
column: &col_meta.column_schema.name,
})?;
let col = part.batch.column(*col_idx);
Helper::try_into_vector(col).context(error::ComputeVectorSnafu)
})
.collect();
let pk_vectors = pk_vectors?;
let mut key_array_builder = PrimaryKeyArrayBuilder::new();
let mut encode_buf = Vec::new();
for row_idx in 0..num_rows {
encode_buf.clear();
// Collects primary key values with column IDs for this row
let pk_values_with_ids: Vec<_> = region_metadata
.primary_key
.iter()
.zip(pk_vectors.iter())
.map(|(col_id, vector)| (*col_id, vector.get_ref(row_idx)))
.collect();
// Encodes the primary key
primary_key_codec
.encode_value_refs(&pk_values_with_ids, &mut encode_buf)
.context(EncodeSnafu)?;
key_array_builder
.append(&encode_buf)
.context(ComputeArrowSnafu)?;
}
Some(key_array_builder.finish())
};
// Adds primary key columns if storing them (only for dense encoding)
if store_primary_key_columns && !is_sparse {
for col_meta in region_metadata.primary_key_columns() {
let col_idx = column_indices
.get(col_meta.column_schema.name.as_str())
.context(ColumnNotFoundSnafu {
column: &col_meta.column_schema.name,
})?;
let col = part.batch.column(*col_idx);
// Converts to dictionary if needed for string types
let col = if col_meta.column_schema.data_type.is_string() {
let target_type = ArrowDataType::Dictionary(
Box::new(ArrowDataType::UInt32),
Box::new(ArrowDataType::Utf8),
);
arrow::compute::cast(col, &target_type).context(ComputeArrowSnafu)?
} else {
col.clone()
};
output_columns.push(col);
}
}
// Adds field columns
for col_meta in region_metadata.field_columns() {
let col_idx = column_indices
.get(col_meta.column_schema.name.as_str())
.context(ColumnNotFoundSnafu {
column: &col_meta.column_schema.name,
})?;
output_columns.push(part.batch.column(*col_idx).clone());
}
// Adds timestamp column
let new_timestamp_index = output_columns.len();
let ts_col_idx = column_indices
.get(
region_metadata
.time_index_column()
.column_schema
.name
.as_str(),
)
.context(ColumnNotFoundSnafu {
column: &region_metadata.time_index_column().column_schema.name,
})?;
output_columns.push(part.batch.column(*ts_col_idx).clone());
// Adds encoded primary key dictionary column
let pk_dictionary = if let Some(pk_dict_array) = pk_array {
Arc::new(pk_dict_array) as ArrayRef
} else {
let pk_col_idx =
column_indices
.get(PRIMARY_KEY_COLUMN_NAME)
.context(ColumnNotFoundSnafu {
column: PRIMARY_KEY_COLUMN_NAME,
})?;
let col = part.batch.column(*pk_col_idx);
// Casts to dictionary type if needed
let target_type = ArrowDataType::Dictionary(
Box::new(ArrowDataType::UInt32),
Box::new(ArrowDataType::Binary),
);
arrow::compute::cast(col, &target_type).context(ComputeArrowSnafu)?
};
output_columns.push(pk_dictionary);
let sequence_array = UInt64Array::from(vec![part.sequence; num_rows]);
output_columns.push(Arc::new(sequence_array) as ArrayRef);
let op_type_array = UInt8Array::from(vec![OpType::Put as u8; num_rows]);
output_columns.push(Arc::new(op_type_array) as ArrayRef);
let batch = RecordBatch::try_new(schema, output_columns).context(NewRecordBatchSnafu)?;
// Sorts the batch by (primary_key, timestamp, sequence desc)
let sorted_batch = sort_primary_key_record_batch(&batch)?;
Ok(Some(BulkPart {
batch: sorted_batch,
max_timestamp: part.max_timestamp,
min_timestamp: part.min_timestamp,
sequence: part.sequence,
timestamp_index: new_timestamp_index,
raw_data: None,
}))
}
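
The unit tests added at the end of this file exercise `convert_bulk_part` end to end. The snippet below is only a hedged, standalone illustration of the dictionary cast the function applies to string tag columns and to a plain binary `__primary_key` column (values are de-duplicated and rows become dictionary keys); names such as `tags` are illustrative.

// Standalone sketch of the dictionary cast used above; not part of this commit.
use std::sync::Arc;
use arrow::array::{ArrayRef, StringArray};
use arrow::compute::cast;
use arrow::datatypes::DataType;
let tags: ArrayRef = Arc::new(StringArray::from(vec!["host-1", "host-2", "host-1"]));
let dict_type = DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8));
// Plain Utf8 -> Dictionary(UInt32, Utf8): repeated values are stored once.
let dict = cast(&tags, &dict_type).unwrap();
assert_eq!(dict.data_type(), &dict_type);
assert_eq!(dict.len(), 3);
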
#[derive(Debug, Clone)]
pub struct EncodedBulkPart {
data: Bytes,
@@ -1189,11 +1462,14 @@ fn binary_array_to_dictionary(input: &BinaryArray) -> Result<PrimaryKeyArray> {
mod tests {
use std::collections::VecDeque;
use api::v1::{Row, WriteHint};
use api::v1::{Row, SemanticType, WriteHint};
use datafusion_common::ScalarValue;
use datatypes::arrow::array::Float64Array;
use datatypes::prelude::{ConcreteDataType, ScalarVector, Value};
use datatypes::schema::ColumnSchema;
use datatypes::vectors::{Float64Vector, TimestampMillisecondVector};
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
use store_api::storage::RegionId;
use store_api::storage::consts::ReservedColumnId;
use super::*;
@@ -2166,4 +2442,379 @@ mod tests {
);
}
}
#[test]
fn test_convert_bulk_part_empty() {
let metadata = metadata_for_test();
let schema = to_flat_sst_arrow_schema(
&metadata,
&FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
);
let primary_key_codec = build_primary_key_codec(&metadata);
// Create empty batch
let empty_batch = RecordBatch::new_empty(schema.clone());
let empty_part = BulkPart {
batch: empty_batch,
max_timestamp: 0,
min_timestamp: 0,
sequence: 0,
timestamp_index: 0,
raw_data: None,
};
let result =
convert_bulk_part(empty_part, &metadata, primary_key_codec, schema, true).unwrap();
assert!(result.is_none());
}
#[test]
fn test_convert_bulk_part_dense_with_pk_columns() {
let metadata = metadata_for_test();
let primary_key_codec = build_primary_key_codec(&metadata);
let k0_array = Arc::new(arrow::array::StringArray::from(vec![
"key1", "key2", "key1",
]));
let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![1, 2, 1]));
let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200, 300]));
let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0, 3.0]));
let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000, 1500]));
let input_schema = Arc::new(Schema::new(vec![
Field::new("k0", ArrowDataType::Utf8, false),
Field::new("k1", ArrowDataType::UInt32, false),
Field::new("v0", ArrowDataType::Int64, true),
Field::new("v1", ArrowDataType::Float64, true),
Field::new(
"ts",
ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
false,
),
]));
let input_batch = RecordBatch::try_new(
input_schema,
vec![k0_array, k1_array, v0_array, v1_array, ts_array],
)
.unwrap();
let part = BulkPart {
batch: input_batch,
max_timestamp: 2000,
min_timestamp: 1000,
sequence: 5,
timestamp_index: 4,
raw_data: None,
};
let output_schema = to_flat_sst_arrow_schema(
&metadata,
&FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
);
let result = convert_bulk_part(
part,
&metadata,
primary_key_codec,
output_schema,
true, // store primary key columns
)
.unwrap();
let converted = result.unwrap();
assert_eq!(converted.num_rows(), 3);
assert_eq!(converted.max_timestamp, 2000);
assert_eq!(converted.min_timestamp, 1000);
assert_eq!(converted.sequence, 5);
let schema = converted.batch.schema();
let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
assert_eq!(
field_names,
vec![
"k0",
"k1",
"v0",
"v1",
"ts",
"__primary_key",
"__sequence",
"__op_type"
]
);
let k0_col = converted.batch.column_by_name("k0").unwrap();
assert!(matches!(
k0_col.data_type(),
ArrowDataType::Dictionary(_, _)
));
let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
let dict_array = pk_col
.as_any()
.downcast_ref::<DictionaryArray<UInt32Type>>()
.unwrap();
let keys = dict_array.keys();
assert_eq!(keys.len(), 3);
}
#[test]
fn test_convert_bulk_part_dense_without_pk_columns() {
let metadata = metadata_for_test();
let primary_key_codec = build_primary_key_codec(&metadata);
// Create input batch with primary key columns (k0, k1)
let k0_array = Arc::new(arrow::array::StringArray::from(vec!["key1", "key2"]));
let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![1, 2]));
let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200]));
let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0]));
let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000]));
let input_schema = Arc::new(Schema::new(vec![
Field::new("k0", ArrowDataType::Utf8, false),
Field::new("k1", ArrowDataType::UInt32, false),
Field::new("v0", ArrowDataType::Int64, true),
Field::new("v1", ArrowDataType::Float64, true),
Field::new(
"ts",
ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
false,
),
]));
let input_batch = RecordBatch::try_new(
input_schema,
vec![k0_array, k1_array, v0_array, v1_array, ts_array],
)
.unwrap();
let part = BulkPart {
batch: input_batch,
max_timestamp: 2000,
min_timestamp: 1000,
sequence: 3,
timestamp_index: 4,
raw_data: None,
};
let output_schema = to_flat_sst_arrow_schema(
&metadata,
&FlatSchemaOptions {
raw_pk_columns: false,
string_pk_use_dict: true,
},
);
let result = convert_bulk_part(
part,
&metadata,
primary_key_codec,
output_schema,
false, // don't store primary key columns
)
.unwrap();
let converted = result.unwrap();
assert_eq!(converted.num_rows(), 2);
assert_eq!(converted.max_timestamp, 2000);
assert_eq!(converted.min_timestamp, 1000);
assert_eq!(converted.sequence, 3);
// Verify schema does NOT include individual primary key columns
let schema = converted.batch.schema();
let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
assert_eq!(
field_names,
vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
);
// Verify __primary_key column is present and is a dictionary
let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
assert!(matches!(
pk_col.data_type(),
ArrowDataType::Dictionary(_, _)
));
}
#[test]
fn test_convert_bulk_part_sparse_encoding() {
let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
builder
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
semantic_type: SemanticType::Tag,
column_id: 0,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("k1", ConcreteDataType::string_datatype(), false),
semantic_type: SemanticType::Tag,
column_id: 1,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id: 2,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
semantic_type: SemanticType::Field,
column_id: 3,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
semantic_type: SemanticType::Field,
column_id: 4,
})
.primary_key(vec![0, 1])
.primary_key_encoding(PrimaryKeyEncoding::Sparse);
let metadata = Arc::new(builder.build().unwrap());
let primary_key_codec = build_primary_key_codec(&metadata);
// Create input batch with __primary_key column (sparse encoding)
let pk_array = Arc::new(arrow::array::BinaryArray::from(vec![
b"encoded_key_1".as_slice(),
b"encoded_key_2".as_slice(),
]));
let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200]));
let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0]));
let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000]));
let input_schema = Arc::new(Schema::new(vec![
Field::new("__primary_key", ArrowDataType::Binary, false),
Field::new("v0", ArrowDataType::Int64, true),
Field::new("v1", ArrowDataType::Float64, true),
Field::new(
"ts",
ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
false,
),
]));
let input_batch =
RecordBatch::try_new(input_schema, vec![pk_array, v0_array, v1_array, ts_array])
.unwrap();
let part = BulkPart {
batch: input_batch,
max_timestamp: 2000,
min_timestamp: 1000,
sequence: 7,
timestamp_index: 3,
raw_data: None,
};
let output_schema = to_flat_sst_arrow_schema(
&metadata,
&FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
);
let result = convert_bulk_part(
part,
&metadata,
primary_key_codec,
output_schema,
true, // store_primary_key_columns (ignored for sparse)
)
.unwrap();
let converted = result.unwrap();
assert_eq!(converted.num_rows(), 2);
assert_eq!(converted.max_timestamp, 2000);
assert_eq!(converted.min_timestamp, 1000);
assert_eq!(converted.sequence, 7);
// Verify schema does NOT include individual primary key columns (sparse encoding)
let schema = converted.batch.schema();
let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
assert_eq!(
field_names,
vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
);
// Verify __primary_key is dictionary encoded
let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
assert!(matches!(
pk_col.data_type(),
ArrowDataType::Dictionary(_, _)
));
}
#[test]
fn test_convert_bulk_part_sorting_with_multiple_series() {
let metadata = metadata_for_test();
let primary_key_codec = build_primary_key_codec(&metadata);
// Create unsorted batch with multiple series and timestamps
let k0_array = Arc::new(arrow::array::StringArray::from(vec![
"series_b", "series_a", "series_b", "series_a",
]));
let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![2, 1, 2, 1]));
let v0_array = Arc::new(arrow::array::Int64Array::from(vec![200, 100, 400, 300]));
let v1_array = Arc::new(arrow::array::Float64Array::from(vec![2.0, 1.0, 4.0, 3.0]));
let ts_array = Arc::new(TimestampMillisecondArray::from(vec![
2000, 1000, 4000, 3000,
]));
let input_schema = Arc::new(Schema::new(vec![
Field::new("k0", ArrowDataType::Utf8, false),
Field::new("k1", ArrowDataType::UInt32, false),
Field::new("v0", ArrowDataType::Int64, true),
Field::new("v1", ArrowDataType::Float64, true),
Field::new(
"ts",
ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
false,
),
]));
let input_batch = RecordBatch::try_new(
input_schema,
vec![k0_array, k1_array, v0_array, v1_array, ts_array],
)
.unwrap();
let part = BulkPart {
batch: input_batch,
max_timestamp: 4000,
min_timestamp: 1000,
sequence: 10,
timestamp_index: 4,
raw_data: None,
};
let output_schema = to_flat_sst_arrow_schema(
&metadata,
&FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
);
let result =
convert_bulk_part(part, &metadata, primary_key_codec, output_schema, true).unwrap();
let converted = result.unwrap();
assert_eq!(converted.num_rows(), 4);
// Verify data is sorted by (primary_key, timestamp, sequence desc)
let ts_col = converted.batch.column(converted.timestamp_index);
let ts_array = ts_col
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap();
// After sorting by (pk, ts), we should have:
// series_a,1: ts=1000, 3000
// series_b,2: ts=2000, 4000
let timestamps: Vec<i64> = ts_array.values().to_vec();
assert_eq!(timestamps, vec![1000, 3000, 2000, 4000]);
}
}

View File

@@ -261,7 +261,7 @@ impl TimePartitions {
converter.append_key_values(kvs)?;
let part = converter.convert()?;
return self.write_bulk(part);
return self.write_bulk_inner(part);
}
// Get all parts.
@@ -291,7 +291,31 @@ impl TimePartitions {
self.write_multi_parts(kvs, &parts)
}
/// Writes a bulk part.
pub fn write_bulk(&self, part: BulkPart) -> Result<()> {
// Converts the bulk part if `bulk_schema` is set
let part = if let Some(bulk_schema) = &self.bulk_schema {
let converted = crate::memtable::bulk::part::convert_bulk_part(
part,
&self.metadata,
self.primary_key_codec.clone(),
bulk_schema.clone(),
// Always store primary keys for bulk mode.
true,
)?;
match converted {
Some(p) => p,
None => return Ok(()),
}
} else {
part
};
self.write_bulk_inner(part)
}
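
For orientation, a hedged sketch of how a target `bulk_schema` for this path could be built, mirroring the way the unit tests in the bulk part module construct theirs; whether `TimePartitions` builds its `bulk_schema` exactly like this is not shown in this diff.

// Hypothetical sketch, not part of this commit: derives the flat SST arrow schema
// for the region, as the bulk part unit tests do.
let bulk_schema = to_flat_sst_arrow_schema(
    &metadata,
    &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
);

Because `write_bulk` always passes `true` for `store_primary_key_columns`, converted parts keep the individual tag columns alongside the encoded `__primary_key` dictionary column.
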
/// Writes a bulk part without converting.
fn write_bulk_inner(&self, part: BulkPart) -> Result<()> {
let time_type = self
.metadata
.time_index_column()

View File

@@ -388,17 +388,14 @@ impl<S> RegionWorkerLoop<S> {
let need_fill_missing_columns = region_ctx.version().metadata.schema_version
!= bulk_req.region_metadata.schema_version;
// Only fill missing columns if primary key is dense encoded.
if need_fill_missing_columns {
// todo(hl): support filling default columns
bulk_req.sender.send(
InvalidRequestSnafu {
region_id,
reason: "Schema mismatch",
}
.fail(),
);
return;
// Fill missing columns if needed
if need_fill_missing_columns
&& let Err(e) = bulk_req
.request
.fill_missing_columns(&region_ctx.version().metadata)
{
bulk_req.sender.send(Err(e));
continue;
}
// Collect requests by region.

View File

@@ -66,6 +66,7 @@ impl Inserter {
return Ok(0);
}
// TODO(yingwen): Fill impure default values in the record batch.
// notify flownode to update dirty timestamps if flow is configured.
self.maybe_update_flow_dirty_window(table_info.clone(), record_batch.clone());

View File

@@ -36,6 +36,7 @@ pub fn find_all_impure_columns(table_info: &TableInfo) -> Vec<ColumnSchema> {
.collect()
}
// TODO(yingwen): Support Bulk insert request.
/// Fill impure default values in the request
pub struct ImpureDefaultFiller {
impure_columns: HashMap<String, (api::v1::ColumnSchema, api::v1::Value)>,