feat: Implement a converter to convert KeyValues into BulkPart (#6620)

* chore: add api to memtable to check bulk capability

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: Add a converter to convert KeyValues into BulkPart

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: move supports_bulk_insert to MemtableBuilder

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: benchmark

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: use write_bulk if the memtable benefits from it

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: test BulkPartConverter

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: add a flag to store unencoded primary keys

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: cache schema for converter

Implements to_flat_sst_arrow_schema

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: simplify tests

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: don't use bulk convert branch now

Signed-off-by: evenyag <realevenyag@gmail.com>

* style: fix clippy

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: address review comments

* simplify primary_key_column_builders check
* return error if value is not string

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: add FlatSchemaOptions::from_encoding and test sparse encoding

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
Author: Yingwen
Date: 2025-08-01 15:59:11 +08:00
Committed by: GitHub
Parent: 869f8bf68a
Commit: 52466fdd92
10 changed files with 1007 additions and 38 deletions
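For reviewers, the end-to-end flow this PR introduces (and which the benchmark and tests below exercise) is: build a flat SST Arrow schema, create a BulkPartConverter, append KeyValues, and convert them into a sorted BulkPart. A minimal sketch under the same paths the benchmark uses (mito2/mito_codec workspace crates); the metadata and KeyValues are assumed to come from helpers such as memtable_util:

use mito2::memtable::bulk::part::{BulkPart, BulkPartConverter};
use mito2::memtable::KeyValues;
use mito2::sst::{to_flat_sst_arrow_schema, FlatSchemaOptions};
use mito_codec::row_converter::build_primary_key_codec;
use store_api::metadata::RegionMetadataRef;

// Converts one batch of KeyValues into a sorted BulkPart. `metadata` and
// `key_values` are assumed to come from test helpers (hypothetical caller).
fn key_values_to_bulk_part(metadata: &RegionMetadataRef, key_values: &KeyValues) -> BulkPart {
    let codec = build_primary_key_codec(metadata);
    // Schema options follow the region's primary key encoding.
    let schema = to_flat_sst_arrow_schema(
        metadata,
        &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
    );
    // `true`: additionally store raw primary key columns (ignored for sparse encoding).
    let mut converter =
        BulkPartConverter::new(metadata, schema, key_values.num_rows(), codec, true);
    converter.append_key_values(key_values).unwrap();
    // The resulting batch is sorted by (primary key, timestamp, sequence desc).
    converter.convert().unwrap()
}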

View File

@@ -21,10 +21,12 @@ use datafusion_common::Column;
use datafusion_expr::{lit, Expr};
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use mito2::memtable::bulk::part::BulkPartConverter;
use mito2::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtable};
use mito2::memtable::time_series::TimeSeriesMemtable;
use mito2::memtable::{KeyValues, Memtable};
use mito2::region::options::MergeMode;
use mito2::sst::{to_flat_sst_arrow_schema, FlatSchemaOptions};
use mito2::test_util::memtable_util::{self, region_metadata_to_row_schema};
use mito_codec::row_converter::DensePrimaryKeyCodec;
use rand::rngs::ThreadRng;
@@ -38,7 +40,7 @@ use table::predicate::Predicate;
/// Writes rows.
fn write_rows(c: &mut Criterion) {
let metadata = memtable_util::metadata_with_primary_key(vec![1, 0], true);
let metadata = Arc::new(memtable_util::metadata_with_primary_key(vec![1, 0], true));
let timestamps = (0..100).collect::<Vec<_>>();
// Note that this test only generates one time series.
@@ -359,5 +361,71 @@ fn cpu_metadata() -> RegionMetadata {
builder.build().unwrap()
}
criterion_group!(benches, write_rows, full_scan, filter_1_host);
fn bulk_part_converter(c: &mut Criterion) {
let metadata = Arc::new(cpu_metadata());
let start_sec = 1710043200;
let mut group = c.benchmark_group("bulk_part_converter");
for &rows in &[1024, 2048, 4096, 8192] {
// Benchmark without storing primary key columns (baseline)
group.bench_with_input(format!("{}_rows_no_pk_columns", rows), &rows, |b, &rows| {
b.iter(|| {
let generator =
CpuDataGenerator::new(metadata.clone(), rows, start_sec, start_sec + 1);
let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
let schema = to_flat_sst_arrow_schema(
&metadata,
&FlatSchemaOptions {
raw_pk_columns: false,
string_pk_use_dict: false,
},
);
let mut converter = BulkPartConverter::new(&metadata, schema, rows, codec, false);
if let Some(kvs) = generator.iter().next() {
converter.append_key_values(&kvs).unwrap();
}
let _bulk_part = converter.convert().unwrap();
});
});
// Benchmark with storing primary key columns
group.bench_with_input(
format!("{}_rows_with_pk_columns", rows),
&rows,
|b, &rows| {
b.iter(|| {
let generator =
CpuDataGenerator::new(metadata.clone(), rows, start_sec, start_sec + 1);
let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
let schema = to_flat_sst_arrow_schema(
&metadata,
&FlatSchemaOptions {
raw_pk_columns: true,
string_pk_use_dict: true,
},
);
let mut converter =
BulkPartConverter::new(&metadata, schema, rows, codec, true);
if let Some(kvs) = generator.iter().next() {
converter.append_key_values(&kvs).unwrap();
}
let _bulk_part = converter.convert().unwrap();
});
},
);
}
}
criterion_group!(
benches,
write_rows,
full_scan,
filter_1_host,
bulk_part_converter,
);
criterion_main!(benches);

View File

@@ -204,6 +204,12 @@ pub type MemtableRef = Arc<dyn Memtable>;
pub trait MemtableBuilder: Send + Sync + fmt::Debug {
/// Builds a new memtable instance.
fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef;
/// Returns true if the memtable supports bulk insert and benefits from it.
fn use_bulk_insert(&self, metadata: &RegionMetadataRef) -> bool {
let _metadata = metadata;
false
}
}
pub type MemtableBuilderRef = Arc<dyn MemtableBuilder>;
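For context, TimePartitions (see the time_partition changes below) calls this hook to decide whether to convert incoming KeyValues into a BulkPart and route them through write_bulk. A minimal, hypothetical sketch of a wrapper builder that opts in while delegating construction (not part of this PR):

use store_api::metadata::RegionMetadataRef;

use crate::memtable::{MemtableBuilder, MemtableBuilderRef, MemtableId, MemtableRef};

/// Hypothetical wrapper that opts into bulk insert while delegating
/// memtable construction to an inner builder.
#[derive(Debug)]
struct BulkPreferringBuilder {
    inner: MemtableBuilderRef,
}

impl MemtableBuilder for BulkPreferringBuilder {
    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        self.inner.build(id, metadata)
    }

    fn use_bulk_insert(&self, _metadata: &RegionMetadataRef) -> bool {
        // Opting in makes TimePartitions::write convert KeyValues into a
        // BulkPart and call write_bulk instead of per-row writes.
        true
    }
}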

View File

@@ -29,7 +29,7 @@ use crate::memtable::{
#[allow(unused)]
mod context;
#[allow(unused)]
pub(crate) mod part;
pub mod part;
mod part_reader;
mod row_group_reader;

View File

@@ -24,41 +24,49 @@ use bytes::Bytes;
use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
use common_recordbatch::DfRecordBatch as RecordBatch;
use common_time::timestamp::TimeUnit;
use datafusion::arrow::array::{TimestampNanosecondArray, UInt64Builder};
use datatypes::arrow;
use datatypes::arrow::array::{
Array, ArrayRef, BinaryBuilder, DictionaryArray, TimestampMicrosecondArray,
TimestampMillisecondArray, TimestampSecondArray, UInt32Array, UInt64Array, UInt8Array,
UInt8Builder,
Array, ArrayRef, BinaryBuilder, BinaryDictionaryBuilder, DictionaryArray, StringBuilder,
StringDictionaryBuilder, TimestampMicrosecondArray, TimestampMillisecondArray,
TimestampNanosecondArray, TimestampSecondArray, UInt32Array, UInt64Array, UInt64Builder,
UInt8Array, UInt8Builder,
};
use datatypes::arrow::compute::TakeOptions;
use datatypes::arrow::datatypes::SchemaRef;
use datatypes::arrow::compute::{SortColumn, SortOptions, TakeOptions};
use datatypes::arrow::datatypes::{SchemaRef, UInt32Type};
use datatypes::arrow_array::BinaryArray;
use datatypes::data_type::DataType;
use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector};
use datatypes::value::Value;
use datatypes::value::{Value, ValueRef};
use datatypes::vectors::Helper;
use mito_codec::key_values::{KeyValue, KeyValuesRef};
use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt};
use mito_codec::key_values::{KeyValue, KeyValues, KeyValuesRef};
use mito_codec::row_converter::{
build_primary_key_codec, DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt,
};
use parquet::arrow::ArrowWriter;
use parquet::data_type::AsBytes;
use parquet::file::metadata::ParquetMetaData;
use parquet::file::properties::WriterProperties;
use snafu::{OptionExt, ResultExt, Snafu};
use store_api::metadata::RegionMetadataRef;
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;
use store_api::storage::SequenceNumber;
use table::predicate::Predicate;
use crate::error::{
self, ComputeArrowSnafu, EncodeMemtableSnafu, EncodeSnafu, NewRecordBatchSnafu, Result,
self, ColumnNotFoundSnafu, ComputeArrowSnafu, DataTypeMismatchSnafu, EncodeMemtableSnafu,
EncodeSnafu, NewRecordBatchSnafu, Result,
};
use crate::memtable::bulk::context::BulkIterContextRef;
use crate::memtable::bulk::part_reader::BulkPartIter;
use crate::memtable::time_series::{ValueBuilder, Values};
use crate::memtable::BoxedBatchIterator;
use crate::sst::parquet::format::{PrimaryKeyArray, ReadFormat};
use crate::sst::parquet::helper::parse_parquet_metadata;
use crate::sst::to_sst_arrow_schema;
const INIT_DICT_VALUE_CAPACITY: usize = 8;
#[derive(Clone)]
pub struct BulkPart {
pub batch: RecordBatch,
@@ -209,6 +217,281 @@ impl BulkPart {
}
}
/// Builder type for primary key dictionary array.
type PrimaryKeyArrayBuilder = BinaryDictionaryBuilder<UInt32Type>;
/// Primary key column builder for handling strings specially.
enum PrimaryKeyColumnBuilder {
/// String dictionary builder for string types.
StringDict(StringDictionaryBuilder<UInt32Type>),
/// Generic mutable vector for other types.
Vector(Box<dyn MutableVector>),
}
impl PrimaryKeyColumnBuilder {
/// Appends a value to the builder.
fn push_value_ref(&mut self, value: ValueRef) -> Result<()> {
match self {
PrimaryKeyColumnBuilder::StringDict(builder) => {
if let Some(s) = value.as_string().context(DataTypeMismatchSnafu)? {
// We know the value is a string.
builder.append_value(s);
} else {
builder.append_null();
}
}
PrimaryKeyColumnBuilder::Vector(builder) => {
builder.push_value_ref(value);
}
}
Ok(())
}
/// Converts the builder to an ArrayRef.
fn into_arrow_array(self) -> ArrayRef {
match self {
PrimaryKeyColumnBuilder::StringDict(mut builder) => Arc::new(builder.finish()),
PrimaryKeyColumnBuilder::Vector(mut builder) => builder.to_vector().to_arrow_array(),
}
}
}
/// Converter that converts [KeyValues] into a [BulkPart].
pub struct BulkPartConverter {
/// Region metadata.
region_metadata: RegionMetadataRef,
/// Schema of the converted batch.
schema: SchemaRef,
/// Primary key codec for encoding keys
primary_key_codec: Arc<dyn PrimaryKeyCodec>,
/// Buffer for encoding primary key.
key_buf: Vec<u8>,
/// Primary key array builder.
key_array_builder: PrimaryKeyArrayBuilder,
/// Builders for non-primary key columns.
value_builder: ValueBuilder,
/// Builders for individual primary key columns.
/// The order of builders is the same as the order of primary key columns in the region metadata.
primary_key_column_builders: Vec<PrimaryKeyColumnBuilder>,
/// Max timestamp value.
max_ts: i64,
/// Min timestamp value.
min_ts: i64,
/// Max sequence number.
max_sequence: SequenceNumber,
}
impl BulkPartConverter {
/// Creates a new converter.
///
/// If `store_primary_key_columns` is true and the encoding is not sparse, it additionally
/// stores the primary key columns in individual arrays.
pub fn new(
region_metadata: &RegionMetadataRef,
schema: SchemaRef,
capacity: usize,
primary_key_codec: Arc<dyn PrimaryKeyCodec>,
store_primary_key_columns: bool,
) -> Self {
debug_assert_eq!(
region_metadata.primary_key_encoding,
primary_key_codec.encoding()
);
let primary_key_column_builders = if store_primary_key_columns
&& region_metadata.primary_key_encoding != PrimaryKeyEncoding::Sparse
{
new_primary_key_column_builders(region_metadata, capacity)
} else {
Vec::new()
};
Self {
region_metadata: region_metadata.clone(),
schema,
primary_key_codec,
key_buf: Vec::new(),
key_array_builder: PrimaryKeyArrayBuilder::new(),
value_builder: ValueBuilder::new(region_metadata, capacity),
primary_key_column_builders,
min_ts: i64::MAX,
max_ts: i64::MIN,
max_sequence: SequenceNumber::MIN,
}
}
/// Appends a [KeyValues] into the converter.
pub fn append_key_values(&mut self, key_values: &KeyValues) -> Result<()> {
for kv in key_values.iter() {
self.append_key_value(&kv)?;
}
Ok(())
}
/// Appends a [KeyValue] to builders.
///
/// If the primary key uses sparse encoding, callers must have already encoded the primary key in the [KeyValue].
fn append_key_value(&mut self, kv: &KeyValue) -> Result<()> {
// Handles primary key based on encoding type
if self.primary_key_codec.encoding() == PrimaryKeyEncoding::Sparse {
// For sparse encoding, the primary key is already encoded in the KeyValue
// Gets the first (and only) primary key value which contains the encoded key
let mut primary_keys = kv.primary_keys();
if let Some(encoded) = primary_keys
.next()
.context(ColumnNotFoundSnafu {
column: PRIMARY_KEY_COLUMN_NAME,
})?
.as_binary()
.context(DataTypeMismatchSnafu)?
{
self.key_array_builder
.append(encoded)
.context(ComputeArrowSnafu)?;
} else {
self.key_array_builder
.append("")
.context(ComputeArrowSnafu)?;
}
} else {
// For dense encoding, we need to encode the primary key columns
self.key_buf.clear();
self.primary_key_codec
.encode_key_value(kv, &mut self.key_buf)
.context(EncodeSnafu)?;
self.key_array_builder
.append(&self.key_buf)
.context(ComputeArrowSnafu)?;
};
// If storing primary key columns, append values to individual builders
if !self.primary_key_column_builders.is_empty() {
for (builder, pk_value) in self
.primary_key_column_builders
.iter_mut()
.zip(kv.primary_keys())
{
builder.push_value_ref(pk_value)?;
}
}
// Pushes other columns.
self.value_builder.push(
kv.timestamp(),
kv.sequence(),
kv.op_type() as u8,
kv.fields(),
);
// Updates statistics
// Safety: timestamp of kv must be both present and a valid timestamp value.
let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
self.min_ts = self.min_ts.min(ts);
self.max_ts = self.max_ts.max(ts);
self.max_sequence = self.max_sequence.max(kv.sequence());
Ok(())
}
/// Converts buffered content into a [BulkPart].
///
/// It sorts the record batch by (primary key, timestamp, sequence desc).
pub fn convert(mut self) -> Result<BulkPart> {
let values = Values::from(self.value_builder);
let mut columns =
Vec::with_capacity(4 + values.fields.len() + self.primary_key_column_builders.len());
// Build primary key column arrays if enabled.
for builder in self.primary_key_column_builders {
columns.push(builder.into_arrow_array());
}
// Then fields columns.
columns.extend(values.fields.iter().map(|field| field.to_arrow_array()));
// Time index.
let timestamp_index = columns.len();
columns.push(values.timestamp.to_arrow_array());
// Primary key.
let pk_array = self.key_array_builder.finish();
columns.push(Arc::new(pk_array));
// Sequence and op type.
columns.push(values.sequence.to_arrow_array());
columns.push(values.op_type.to_arrow_array());
let batch = RecordBatch::try_new(self.schema, columns).context(NewRecordBatchSnafu)?;
// Sorts the record batch.
let batch = sort_primary_key_record_batch(&batch)?;
Ok(BulkPart {
batch,
max_ts: self.max_ts,
min_ts: self.min_ts,
sequence: self.max_sequence,
timestamp_index,
raw_data: None,
})
}
}
fn new_primary_key_column_builders(
metadata: &RegionMetadata,
capacity: usize,
) -> Vec<PrimaryKeyColumnBuilder> {
metadata
.primary_key_columns()
.map(|col| {
if col.column_schema.data_type.is_string() {
PrimaryKeyColumnBuilder::StringDict(StringDictionaryBuilder::with_capacity(
capacity,
INIT_DICT_VALUE_CAPACITY,
capacity,
))
} else {
PrimaryKeyColumnBuilder::Vector(
col.column_schema.data_type.create_mutable_vector(capacity),
)
}
})
.collect()
}
/// Sorts the record batch with primary key format.
fn sort_primary_key_record_batch(batch: &RecordBatch) -> Result<RecordBatch> {
let total_columns = batch.num_columns();
let sort_columns = vec![
// Primary key column (ascending)
SortColumn {
values: batch.column(total_columns - 3).clone(),
options: Some(SortOptions {
descending: false,
nulls_first: true,
}),
},
// Time index column (ascending)
SortColumn {
values: batch.column(total_columns - 4).clone(),
options: Some(SortOptions {
descending: false,
nulls_first: true,
}),
},
// Sequence column (descending)
SortColumn {
values: batch.column(total_columns - 2).clone(),
options: Some(SortOptions {
descending: true,
nulls_first: true,
}),
},
];
let indices = datatypes::arrow::compute::lexsort_to_indices(&sort_columns, None)
.context(ComputeArrowSnafu)?;
datatypes::arrow::compute::take_record_batch(batch, &indices).context(ComputeArrowSnafu)
}
#[derive(Debug)]
pub struct EncodedBulkPart {
data: Bytes,
@@ -596,14 +879,20 @@ fn binary_array_to_dictionary(input: &BinaryArray) -> Result<PrimaryKeyArray> {
mod tests {
use std::collections::VecDeque;
use api::v1::{Row, WriteHint};
use datafusion_common::ScalarValue;
use datatypes::prelude::{ScalarVector, Value};
use datatypes::prelude::{ConcreteDataType, ScalarVector, Value};
use datatypes::vectors::{Float64Vector, TimestampMillisecondVector};
use store_api::storage::consts::ReservedColumnId;
use super::*;
use crate::memtable::bulk::context::BulkIterContext;
use crate::sst::parquet::format::ReadFormat;
use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};
use crate::sst::{to_flat_sst_arrow_schema, FlatSchemaOptions};
use crate::test_util::memtable_util::{
build_key_values_with_ts_seq_values, metadata_for_test, metadata_with_primary_key,
region_metadata_to_row_schema,
};
fn check_binary_array_to_dictionary(
input: &[&[u8]],
@@ -1084,4 +1373,465 @@ mod tests {
1,
);
}
#[test]
fn test_bulk_part_converter_append_and_convert() {
let metadata = metadata_for_test();
let capacity = 100;
let primary_key_codec = build_primary_key_codec(&metadata);
let schema = to_flat_sst_arrow_schema(
&metadata,
&FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
);
let mut converter =
BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
let key_values1 = build_key_values_with_ts_seq_values(
&metadata,
"key1".to_string(),
1u32,
vec![1000, 2000].into_iter(),
vec![Some(1.0), Some(2.0)].into_iter(),
1,
);
let key_values2 = build_key_values_with_ts_seq_values(
&metadata,
"key2".to_string(),
2u32,
vec![1500].into_iter(),
vec![Some(3.0)].into_iter(),
2,
);
converter.append_key_values(&key_values1).unwrap();
converter.append_key_values(&key_values2).unwrap();
let bulk_part = converter.convert().unwrap();
assert_eq!(bulk_part.num_rows(), 3);
assert_eq!(bulk_part.min_ts, 1000);
assert_eq!(bulk_part.max_ts, 2000);
assert_eq!(bulk_part.sequence, 2);
assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);
// Validate primary key columns are stored
// Schema should include primary key columns k0 and k1 at the beginning
let schema = bulk_part.batch.schema();
let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
assert_eq!(
field_names,
vec![
"k0",
"k1",
"v0",
"v1",
"ts",
"__primary_key",
"__sequence",
"__op_type"
]
);
}
#[test]
fn test_bulk_part_converter_sorting() {
let metadata = metadata_for_test();
let capacity = 100;
let primary_key_codec = build_primary_key_codec(&metadata);
let schema = to_flat_sst_arrow_schema(
&metadata,
&FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
);
let mut converter =
BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
let key_values1 = build_key_values_with_ts_seq_values(
&metadata,
"z_key".to_string(),
3u32,
vec![3000].into_iter(),
vec![Some(3.0)].into_iter(),
3,
);
let key_values2 = build_key_values_with_ts_seq_values(
&metadata,
"a_key".to_string(),
1u32,
vec![1000].into_iter(),
vec![Some(1.0)].into_iter(),
1,
);
let key_values3 = build_key_values_with_ts_seq_values(
&metadata,
"m_key".to_string(),
2u32,
vec![2000].into_iter(),
vec![Some(2.0)].into_iter(),
2,
);
converter.append_key_values(&key_values1).unwrap();
converter.append_key_values(&key_values2).unwrap();
converter.append_key_values(&key_values3).unwrap();
let bulk_part = converter.convert().unwrap();
assert_eq!(bulk_part.num_rows(), 3);
let ts_column = bulk_part.batch.column(bulk_part.timestamp_index);
let seq_column = bulk_part.batch.column(bulk_part.batch.num_columns() - 2);
let ts_array = ts_column
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap();
let seq_array = seq_column.as_any().downcast_ref::<UInt64Array>().unwrap();
assert_eq!(ts_array.values(), &[1000, 2000, 3000]);
assert_eq!(seq_array.values(), &[1, 2, 3]);
// Validate primary key columns are stored
let schema = bulk_part.batch.schema();
let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
assert_eq!(
field_names,
vec![
"k0",
"k1",
"v0",
"v1",
"ts",
"__primary_key",
"__sequence",
"__op_type"
]
);
}
#[test]
fn test_bulk_part_converter_empty() {
let metadata = metadata_for_test();
let capacity = 10;
let primary_key_codec = build_primary_key_codec(&metadata);
let schema = to_flat_sst_arrow_schema(
&metadata,
&FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
);
let converter =
BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
let bulk_part = converter.convert().unwrap();
assert_eq!(bulk_part.num_rows(), 0);
assert_eq!(bulk_part.min_ts, i64::MAX);
assert_eq!(bulk_part.max_ts, i64::MIN);
assert_eq!(bulk_part.sequence, SequenceNumber::MIN);
// Validate primary key columns are present in schema even for empty batch
let schema = bulk_part.batch.schema();
let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
assert_eq!(
field_names,
vec![
"k0",
"k1",
"v0",
"v1",
"ts",
"__primary_key",
"__sequence",
"__op_type"
]
);
}
#[test]
fn test_bulk_part_converter_without_primary_key_columns() {
let metadata = metadata_for_test();
let primary_key_codec = build_primary_key_codec(&metadata);
let schema = to_flat_sst_arrow_schema(
&metadata,
&FlatSchemaOptions {
raw_pk_columns: false,
string_pk_use_dict: true,
},
);
let capacity = 100;
let mut converter =
BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, false);
let key_values1 = build_key_values_with_ts_seq_values(
&metadata,
"key1".to_string(),
1u32,
vec![1000, 2000].into_iter(),
vec![Some(1.0), Some(2.0)].into_iter(),
1,
);
let key_values2 = build_key_values_with_ts_seq_values(
&metadata,
"key2".to_string(),
2u32,
vec![1500].into_iter(),
vec![Some(3.0)].into_iter(),
2,
);
converter.append_key_values(&key_values1).unwrap();
converter.append_key_values(&key_values2).unwrap();
let bulk_part = converter.convert().unwrap();
assert_eq!(bulk_part.num_rows(), 3);
assert_eq!(bulk_part.min_ts, 1000);
assert_eq!(bulk_part.max_ts, 2000);
assert_eq!(bulk_part.sequence, 2);
assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);
// Validate primary key columns are NOT stored individually
let schema = bulk_part.batch.schema();
let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
assert_eq!(
field_names,
vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
);
}
#[allow(clippy::too_many_arguments)]
fn build_key_values_with_sparse_encoding(
metadata: &RegionMetadataRef,
primary_key_codec: &Arc<dyn PrimaryKeyCodec>,
table_id: u32,
tsid: u64,
k0: String,
k1: String,
timestamps: impl Iterator<Item = i64>,
values: impl Iterator<Item = Option<f64>>,
sequence: SequenceNumber,
) -> KeyValues {
// Encode the primary key (__table_id, __tsid, k0, k1) into binary format using the sparse codec
let pk_values = vec![
(ReservedColumnId::table_id(), Value::UInt32(table_id)),
(ReservedColumnId::tsid(), Value::UInt64(tsid)),
(0, Value::String(k0.clone().into())),
(1, Value::String(k1.clone().into())),
];
let mut encoded_key = Vec::new();
primary_key_codec
.encode_values(&pk_values, &mut encoded_key)
.unwrap();
assert!(!encoded_key.is_empty());
// Create schema for sparse encoding: __primary_key, ts, v0, v1
let column_schema = vec![
api::v1::ColumnSchema {
column_name: PRIMARY_KEY_COLUMN_NAME.to_string(),
datatype: api::helper::ColumnDataTypeWrapper::try_from(
ConcreteDataType::binary_datatype(),
)
.unwrap()
.datatype() as i32,
semantic_type: api::v1::SemanticType::Tag as i32,
..Default::default()
},
api::v1::ColumnSchema {
column_name: "ts".to_string(),
datatype: api::helper::ColumnDataTypeWrapper::try_from(
ConcreteDataType::timestamp_millisecond_datatype(),
)
.unwrap()
.datatype() as i32,
semantic_type: api::v1::SemanticType::Timestamp as i32,
..Default::default()
},
api::v1::ColumnSchema {
column_name: "v0".to_string(),
datatype: api::helper::ColumnDataTypeWrapper::try_from(
ConcreteDataType::int64_datatype(),
)
.unwrap()
.datatype() as i32,
semantic_type: api::v1::SemanticType::Field as i32,
..Default::default()
},
api::v1::ColumnSchema {
column_name: "v1".to_string(),
datatype: api::helper::ColumnDataTypeWrapper::try_from(
ConcreteDataType::float64_datatype(),
)
.unwrap()
.datatype() as i32,
semantic_type: api::v1::SemanticType::Field as i32,
..Default::default()
},
];
let rows = timestamps
.zip(values)
.map(|(ts, v)| Row {
values: vec![
api::v1::Value {
value_data: Some(api::v1::value::ValueData::BinaryValue(
encoded_key.clone(),
)),
},
api::v1::Value {
value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(ts)),
},
api::v1::Value {
value_data: Some(api::v1::value::ValueData::I64Value(ts)),
},
api::v1::Value {
value_data: v.map(api::v1::value::ValueData::F64Value),
},
],
})
.collect();
let mutation = api::v1::Mutation {
op_type: 1,
sequence,
rows: Some(api::v1::Rows {
schema: column_schema,
rows,
}),
write_hint: Some(WriteHint {
primary_key_encoding: api::v1::PrimaryKeyEncoding::Sparse.into(),
}),
};
KeyValues::new(metadata.as_ref(), mutation).unwrap()
}
#[test]
fn test_bulk_part_converter_sparse_primary_key_encoding() {
use api::v1::SemanticType;
use datatypes::schema::ColumnSchema;
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
use store_api::storage::RegionId;
let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
builder
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
semantic_type: SemanticType::Tag,
column_id: 0,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("k1", ConcreteDataType::string_datatype(), false),
semantic_type: SemanticType::Tag,
column_id: 1,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id: 2,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
semantic_type: SemanticType::Field,
column_id: 3,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
semantic_type: SemanticType::Field,
column_id: 4,
})
.primary_key(vec![0, 1])
.primary_key_encoding(PrimaryKeyEncoding::Sparse);
let metadata = Arc::new(builder.build().unwrap());
let primary_key_codec = build_primary_key_codec(&metadata);
let schema = to_flat_sst_arrow_schema(
&metadata,
&FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
);
assert_eq!(metadata.primary_key_encoding, PrimaryKeyEncoding::Sparse);
assert_eq!(primary_key_codec.encoding(), PrimaryKeyEncoding::Sparse);
let capacity = 100;
let mut converter =
BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec.clone(), true);
let key_values1 = build_key_values_with_sparse_encoding(
&metadata,
&primary_key_codec,
2048u32, // table_id
100u64, // tsid
"key11".to_string(),
"key21".to_string(),
vec![1000, 2000].into_iter(),
vec![Some(1.0), Some(2.0)].into_iter(),
1,
);
let key_values2 = build_key_values_with_sparse_encoding(
&metadata,
&primary_key_codec,
4096u32, // table_id
200u64, // tsid
"key12".to_string(),
"key22".to_string(),
vec![1500].into_iter(),
vec![Some(3.0)].into_iter(),
2,
);
converter.append_key_values(&key_values1).unwrap();
converter.append_key_values(&key_values2).unwrap();
let bulk_part = converter.convert().unwrap();
assert_eq!(bulk_part.num_rows(), 3);
assert_eq!(bulk_part.min_ts, 1000);
assert_eq!(bulk_part.max_ts, 2000);
assert_eq!(bulk_part.sequence, 2);
assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);
// For sparse encoding, primary key columns should NOT be stored individually
// even when store_primary_key_columns is true, because sparse encoding
// stores the encoded primary key in the __primary_key column
let schema = bulk_part.batch.schema();
let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
assert_eq!(
field_names,
vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
);
// Verify the __primary_key column contains encoded sparse keys
let primary_key_column = bulk_part.batch.column_by_name("__primary_key").unwrap();
let dict_array = primary_key_column
.as_any()
.downcast_ref::<DictionaryArray<UInt32Type>>()
.unwrap();
// Should have non-zero entries indicating encoded primary keys
assert!(!dict_array.is_empty());
assert_eq!(dict_array.len(), 3); // 3 rows total
// Verify values are properly encoded binary data (not empty)
let values = dict_array
.values()
.as_any()
.downcast_ref::<BinaryArray>()
.unwrap();
for i in 0..values.len() {
assert!(
!values.value(i).is_empty(),
"Encoded primary key should not be empty"
);
}
}
}

View File

@@ -350,6 +350,10 @@ impl MemtableBuilder for PartitionTreeMemtableBuilder {
&self.config,
))
}
fn use_bulk_insert(&self, _metadata: &RegionMetadataRef) -> bool {
false
}
}
struct PartitionTreeIterBuilder {
@@ -373,6 +377,7 @@ impl IterBuilder for PartitionTreeIterBuilder {
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::value::ValueData;
use api::v1::{Mutation, OpType, Row, Rows, SemanticType};
@@ -402,9 +407,9 @@ mod tests {
fn write_iter_sorted_input(has_pk: bool) {
let metadata = if has_pk {
memtable_util::metadata_with_primary_key(vec![1, 0], true)
Arc::new(memtable_util::metadata_with_primary_key(vec![1, 0], true))
} else {
memtable_util::metadata_with_primary_key(vec![], false)
Arc::new(memtable_util::metadata_with_primary_key(vec![], false))
};
let timestamps = (0..100).collect::<Vec<_>>();
let kvs =
@@ -447,9 +452,9 @@ mod tests {
fn write_iter_unsorted_input(has_pk: bool) {
let metadata = if has_pk {
memtable_util::metadata_with_primary_key(vec![1, 0], true)
Arc::new(memtable_util::metadata_with_primary_key(vec![1, 0], true))
} else {
memtable_util::metadata_with_primary_key(vec![], false)
Arc::new(memtable_util::metadata_with_primary_key(vec![], false))
};
let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
let memtable = PartitionTreeMemtable::new(
@@ -512,9 +517,9 @@ mod tests {
fn write_iter_projection(has_pk: bool) {
let metadata = if has_pk {
memtable_util::metadata_with_primary_key(vec![1, 0], true)
Arc::new(memtable_util::metadata_with_primary_key(vec![1, 0], true))
} else {
memtable_util::metadata_with_primary_key(vec![], false)
Arc::new(memtable_util::metadata_with_primary_key(vec![], false))
};
// Try to build a memtable via the builder.
let memtable = PartitionTreeMemtableBuilder::new(PartitionTreeConfig::default(), None)
@@ -552,7 +557,7 @@ mod tests {
}
fn write_iter_multi_keys(max_keys: usize, freeze_threshold: usize) {
let metadata = memtable_util::metadata_with_primary_key(vec![1, 0], true);
let metadata = Arc::new(memtable_util::metadata_with_primary_key(vec![1, 0], true));
let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
let memtable = PartitionTreeMemtable::new(
1,
@@ -602,7 +607,7 @@ mod tests {
#[test]
fn test_memtable_filter() {
let metadata = memtable_util::metadata_with_primary_key(vec![0, 1], false);
let metadata = Arc::new(memtable_util::metadata_with_primary_key(vec![0, 1], false));
// Try to build a memtable via the builder.
let memtable = PartitionTreeMemtableBuilder::new(
PartitionTreeConfig {

View File

@@ -28,17 +28,19 @@ use datatypes::arrow::array::{
TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
};
use datatypes::arrow::buffer::{BooleanBuffer, MutableBuffer};
use datatypes::arrow::datatypes::{DataType, Int64Type};
use datatypes::arrow::datatypes::{DataType, Int64Type, SchemaRef};
use mito_codec::key_values::KeyValue;
use mito_codec::row_converter::{build_primary_key_codec, PrimaryKeyCodec};
use smallvec::{smallvec, SmallVec};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use crate::error;
use crate::error::{InvalidRequestSnafu, Result};
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::bulk::part::{BulkPart, BulkPartConverter};
use crate::memtable::version::SmallMemtableVec;
use crate::memtable::{KeyValues, MemtableBuilderRef, MemtableId, MemtableRef};
use crate::sst::{to_flat_sst_arrow_schema, FlatSchemaOptions};
/// Initial time window if not specified.
const INITIAL_TIME_WINDOW: Duration = Duration::from_days(1);
@@ -208,6 +210,12 @@ pub struct TimePartitions {
metadata: RegionMetadataRef,
/// Builder of memtables.
builder: MemtableBuilderRef,
/// Primary key encoder.
primary_key_codec: Arc<dyn PrimaryKeyCodec>,
/// Cached schema for bulk insert.
/// This field is Some if the memtable uses bulk insert.
bulk_schema: Option<SchemaRef>,
}
pub type TimePartitionsRef = Arc<TimePartitions>;
@@ -221,11 +229,19 @@ impl TimePartitions {
part_duration: Option<Duration>,
) -> Self {
let inner = PartitionsInner::new(next_memtable_id);
let primary_key_codec = build_primary_key_codec(&metadata);
let bulk_schema = builder.use_bulk_insert(&metadata).then(|| {
let opts = FlatSchemaOptions::from_encoding(metadata.primary_key_encoding);
to_flat_sst_arrow_schema(&metadata, &opts)
});
Self {
inner: Mutex::new(inner),
part_duration: part_duration.unwrap_or(INITIAL_TIME_WINDOW),
metadata,
builder,
primary_key_codec,
bulk_schema,
}
}
@@ -233,6 +249,21 @@ impl TimePartitions {
///
/// It creates new partitions if necessary.
pub fn write(&self, kvs: &KeyValues) -> Result<()> {
if let Some(bulk_schema) = &self.bulk_schema {
let mut converter = BulkPartConverter::new(
&self.metadata,
bulk_schema.clone(),
kvs.num_rows(),
self.primary_key_codec.clone(),
// Always store primary keys for bulk mode.
true,
);
converter.append_key_values(kvs)?;
let part = converter.convert()?;
return self.write_bulk(part);
}
// Get all parts.
let parts = self.list_partitions();
@@ -413,6 +444,8 @@ impl TimePartitions {
part_duration,
metadata: metadata.clone(),
builder: self.builder.clone(),
primary_key_codec: self.primary_key_codec.clone(),
bulk_schema: self.bulk_schema.clone(),
}
}

View File

@@ -112,6 +112,12 @@ impl MemtableBuilder for TimeSeriesMemtableBuilder {
))
}
}
fn use_bulk_insert(&self, _metadata: &RegionMetadataRef) -> bool {
// Currently, if the simple bulk memtable can be used, the input request is already
// a bulk write request and this method won't be called.
false
}
}
/// Memtable implementation that groups rows by their primary key.
@@ -828,7 +834,7 @@ impl Series {
}
/// `ValueBuilder` holds all the vector builders for field columns.
struct ValueBuilder {
pub(crate) struct ValueBuilder {
timestamp: Vec<i64>,
timestamp_type: ConcreteDataType,
sequence: Vec<u64>,
@@ -872,7 +878,7 @@ impl ValueBuilder {
/// Returns the size of field values.
///
/// In this method, we don't check the data type of the value, because it is already checked in the caller.
fn push<'a>(
pub(crate) fn push<'a>(
&mut self,
ts: ValueRef,
sequence: u64,
@@ -1103,10 +1109,10 @@ impl ValueBuilder {
/// [Values] holds an immutable vectors of field columns, including `sequence` and `op_type`.
#[derive(Clone)]
pub struct Values {
timestamp: VectorRef,
sequence: Arc<UInt64Vector>,
op_type: Arc<UInt8Vector>,
fields: Vec<VectorRef>,
pub(crate) timestamp: VectorRef,
pub(crate) sequence: Arc<UInt64Vector>,
pub(crate) op_type: Arc<UInt8Vector>,
pub(crate) fields: Vec<VectorRef>,
}
impl Values {

View File

@@ -21,6 +21,7 @@ use common_base::readable_size::ReadableSize;
use datatypes::arrow::datatypes::{
DataType as ArrowDataType, Field, FieldRef, Fields, Schema, SchemaRef,
};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadata;
use store_api::storage::consts::{
OP_TYPE_COLUMN_NAME, PRIMARY_KEY_COLUMN_NAME, SEQUENCE_COLUMN_NAME,
@@ -63,6 +64,101 @@ pub fn to_sst_arrow_schema(metadata: &RegionMetadata) -> SchemaRef {
Arc::new(Schema::new(fields))
}
/// Options of flat schema.
pub struct FlatSchemaOptions {
/// Whether to store raw primary key columns in addition to the encoded primary key column.
pub raw_pk_columns: bool,
/// Whether to use dictionary encoding for string primary key columns
/// when storing primary key columns.
/// Only takes effect when `raw_pk_columns` is true.
pub string_pk_use_dict: bool,
}
impl Default for FlatSchemaOptions {
fn default() -> Self {
Self {
raw_pk_columns: true,
string_pk_use_dict: true,
}
}
}
impl FlatSchemaOptions {
/// Creates options according to the primary key encoding.
pub fn from_encoding(encoding: PrimaryKeyEncoding) -> Self {
if encoding == PrimaryKeyEncoding::Dense {
Self::default()
} else {
Self {
raw_pk_columns: false,
string_pk_use_dict: false,
}
}
}
}
/// Gets the arrow schema to store in parquet.
///
/// The schema is:
/// ```text
/// primary key columns, field columns, time index, __primary_key, __sequence, __op_type
/// ```
///
/// # Panics
/// Panics if the metadata is invalid.
pub fn to_flat_sst_arrow_schema(
metadata: &RegionMetadata,
options: &FlatSchemaOptions,
) -> SchemaRef {
let num_fields = if options.raw_pk_columns {
metadata.column_metadatas.len() + 3
} else {
metadata.column_metadatas.len() + 3 - metadata.primary_key.len()
};
let mut fields = Vec::with_capacity(num_fields);
let schema = metadata.schema.arrow_schema();
if options.raw_pk_columns {
for pk_id in &metadata.primary_key {
let pk_index = metadata.column_index_by_id(*pk_id).unwrap();
if options.string_pk_use_dict
&& metadata.column_metadatas[pk_index]
.column_schema
.data_type
.is_string()
{
let field = &schema.fields[pk_index];
let field = Arc::new(Field::new_dictionary(
field.name(),
datatypes::arrow::datatypes::DataType::UInt32,
field.data_type().clone(),
field.is_nullable(),
));
fields.push(field);
} else {
fields.push(schema.fields[pk_index].clone());
}
}
}
let remaining_fields = schema
.fields()
.iter()
.zip(&metadata.column_metadatas)
.filter_map(|(field, column_meta)| {
if column_meta.semantic_type == SemanticType::Field {
Some(field.clone())
} else {
None
}
})
.chain([metadata.time_index_field()])
.chain(internal_fields());
for field in remaining_fields {
fields.push(field);
}
Arc::new(Schema::new(fields))
}
/// Fields for internal columns.
fn internal_fields() -> [FieldRef; 3] {
// Internal columns are always not null.
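To make the resulting layouts concrete, a small sketch (assuming a RegionMetadataRef shaped like the `k0, k1, ts, v0, v1` test metadata used elsewhere in this PR) that prints the flat schema for both encodings:

use mito2::sst::{to_flat_sst_arrow_schema, FlatSchemaOptions};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadataRef;

fn print_flat_schemas(metadata: &RegionMetadataRef) {
    // Dense encoding keeps raw pk columns by default:
    // [k0, k1, v0, v1, ts, __primary_key, __sequence, __op_type]
    let dense = to_flat_sst_arrow_schema(
        metadata,
        &FlatSchemaOptions::from_encoding(PrimaryKeyEncoding::Dense),
    );
    println!("{:?}", dense.fields().iter().map(|f| f.name()).collect::<Vec<_>>());

    // Sparse encoding stores only the encoded key:
    // [v0, v1, ts, __primary_key, __sequence, __op_type]
    let sparse = to_flat_sst_arrow_schema(
        metadata,
        &FlatSchemaOptions::from_encoding(PrimaryKeyEncoding::Sparse),
    );
    println!("{:?}", sparse.fields().iter().map(|f| f.name()).collect::<Vec<_>>());
}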

View File

@@ -27,7 +27,9 @@ use datatypes::schema::ColumnSchema;
use datatypes::vectors::TimestampMillisecondVector;
use mito_codec::key_values::KeyValue;
use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField};
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef};
use store_api::metadata::{
ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef,
};
use store_api::storage::{ColumnId, RegionId, SequenceNumber};
use table::predicate::Predicate;
@@ -126,13 +128,17 @@ impl MemtableBuilder for EmptyMemtableBuilder {
fn build(&self, id: MemtableId, _metadata: &RegionMetadataRef) -> MemtableRef {
Arc::new(EmptyMemtable::new(id))
}
fn use_bulk_insert(&self, _metadata: &RegionMetadataRef) -> bool {
true
}
}
/// Creates a region metadata to test memtable with default pk.
///
/// The schema is `k0, k1, ts, v0, v1` and pk is `k0, k1`.
pub(crate) fn metadata_for_test() -> RegionMetadataRef {
metadata_with_primary_key(vec![0, 1], false)
Arc::new(metadata_with_primary_key(vec![0, 1], false))
}
/// Creates a region metadata to test memtable with a specific primary key.
@@ -142,7 +148,7 @@ pub(crate) fn metadata_for_test() -> RegionMetadataRef {
pub fn metadata_with_primary_key(
primary_key: Vec<ColumnId>,
enable_table_id: bool,
) -> RegionMetadataRef {
) -> RegionMetadata {
let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
let maybe_table_id = if enable_table_id { "__table_id" } else { "k1" };
builder
@@ -180,8 +186,7 @@ pub fn metadata_with_primary_key(
column_id: 4,
})
.primary_key(primary_key);
let region_metadata = builder.build().unwrap();
Arc::new(region_metadata)
builder.build().unwrap()
}
fn semantic_type_of_column(column_id: ColumnId, primary_key: &[ColumnId]) -> SemanticType {

View File

@@ -33,7 +33,7 @@ use crate::error::Result;
use crate::flush::FlushScheduler;
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::region::{ManifestContext, ManifestContextRef, RegionLeaderState, RegionRoleState};
use crate::request::{WorkerRequest, WorkerRequestWithTime};
use crate::request::WorkerRequestWithTime;
use crate::schedule::scheduler::{Job, LocalScheduler, Scheduler, SchedulerRef};
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;