From 52466fdd92c209ae4d921d28457c1aa9cfb3c8e8 Mon Sep 17 00:00:00 2001
From: Yingwen
Date: Fri, 1 Aug 2025 15:59:11 +0800
Subject: [PATCH] feat: Implement a converter to convert KeyValues into BulkPart (#6620)

* chore: add api to memtable to check bulk capability

Signed-off-by: evenyag

* feat: Add a converter to convert KeyValues into BulkPart

Signed-off-by: evenyag

* feat: move supports_bulk_insert to MemtableBuilder

Signed-off-by: evenyag

* chore: benchmark

Signed-off-by: evenyag

* feat: use write_bulk if the memtable benefits from it

Signed-off-by: evenyag

* test: test BulkPartConverter

Signed-off-by: evenyag

* feat: add a flag to store unencoded primary keys

Signed-off-by: evenyag

* feat: cache schema for converter

Implements to_flat_sst_arrow_schema

Signed-off-by: evenyag

* chore: simplify tests

Signed-off-by: evenyag

* fix: don't use bulk convert branch now

Signed-off-by: evenyag

* style: fix clippy

Signed-off-by: evenyag

* chore: address review comments

* simplify primary_key_column_builders check
* return error if value is not string

Signed-off-by: evenyag

* feat: add FlatSchemaOptions::from_encoding and test sparse encoding

Signed-off-by: evenyag

---------

Signed-off-by: evenyag
---
 src/mito2/benches/memtable_bench.rs       |  72 +-
 src/mito2/src/memtable.rs                 |   6 +
 src/mito2/src/memtable/bulk.rs            |   2 +-
 src/mito2/src/memtable/bulk/part.rs       | 776 +++++++++++++++++++++-
 src/mito2/src/memtable/partition_tree.rs  |  21 +-
 src/mito2/src/memtable/time_partition.rs  |  37 +-
 src/mito2/src/memtable/time_series.rs     |  18 +-
 src/mito2/src/sst.rs                      |  96 +++
 src/mito2/src/test_util/memtable_util.rs  |  15 +-
 src/mito2/src/test_util/scheduler_util.rs |   2 +-
 10 files changed, 1007 insertions(+), 38 deletions(-)

diff --git a/src/mito2/benches/memtable_bench.rs b/src/mito2/benches/memtable_bench.rs
index 040355116b..76a5a0ca48 100644
--- a/src/mito2/benches/memtable_bench.rs
+++ b/src/mito2/benches/memtable_bench.rs
@@ -21,10 +21,12 @@ use datafusion_common::Column;
 use datafusion_expr::{lit, Expr};
 use datatypes::data_type::ConcreteDataType;
 use datatypes::schema::ColumnSchema;
+use mito2::memtable::bulk::part::BulkPartConverter;
 use mito2::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtable};
 use mito2::memtable::time_series::TimeSeriesMemtable;
 use mito2::memtable::{KeyValues, Memtable};
 use mito2::region::options::MergeMode;
+use mito2::sst::{to_flat_sst_arrow_schema, FlatSchemaOptions};
 use mito2::test_util::memtable_util::{self, region_metadata_to_row_schema};
 use mito_codec::row_converter::DensePrimaryKeyCodec;
 use rand::rngs::ThreadRng;
@@ -38,7 +40,7 @@ use table::predicate::Predicate;
 
 /// Writes rows.
 fn write_rows(c: &mut Criterion) {
-    let metadata = memtable_util::metadata_with_primary_key(vec![1, 0], true);
+    let metadata = Arc::new(memtable_util::metadata_with_primary_key(vec![1, 0], true));
     let timestamps = (0..100).collect::>();
 
     // Note that this test only generates one time series.
@@ -359,5 +361,71 @@ fn cpu_metadata() -> RegionMetadata { builder.build().unwrap() } -criterion_group!(benches, write_rows, full_scan, filter_1_host); +fn bulk_part_converter(c: &mut Criterion) { + let metadata = Arc::new(cpu_metadata()); + let start_sec = 1710043200; + + let mut group = c.benchmark_group("bulk_part_converter"); + + for &rows in &[1024, 2048, 4096, 8192] { + // Benchmark without storing primary key columns (baseline) + group.bench_with_input(format!("{}_rows_no_pk_columns", rows), &rows, |b, &rows| { + b.iter(|| { + let generator = + CpuDataGenerator::new(metadata.clone(), rows, start_sec, start_sec + 1); + let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata)); + let schema = to_flat_sst_arrow_schema( + &metadata, + &FlatSchemaOptions { + raw_pk_columns: false, + string_pk_use_dict: false, + }, + ); + let mut converter = BulkPartConverter::new(&metadata, schema, rows, codec, false); + + if let Some(kvs) = generator.iter().next() { + converter.append_key_values(&kvs).unwrap(); + } + + let _bulk_part = converter.convert().unwrap(); + }); + }); + + // Benchmark with storing primary key columns + group.bench_with_input( + format!("{}_rows_with_pk_columns", rows), + &rows, + |b, &rows| { + b.iter(|| { + let generator = + CpuDataGenerator::new(metadata.clone(), rows, start_sec, start_sec + 1); + let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata)); + let schema = to_flat_sst_arrow_schema( + &metadata, + &FlatSchemaOptions { + raw_pk_columns: true, + string_pk_use_dict: true, + }, + ); + let mut converter = + BulkPartConverter::new(&metadata, schema, rows, codec, true); + + if let Some(kvs) = generator.iter().next() { + converter.append_key_values(&kvs).unwrap(); + } + + let _bulk_part = converter.convert().unwrap(); + }); + }, + ); + } +} + +criterion_group!( + benches, + write_rows, + full_scan, + filter_1_host, + bulk_part_converter, +); criterion_main!(benches); diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs index cf2aa4c576..b5d4000d9e 100644 --- a/src/mito2/src/memtable.rs +++ b/src/mito2/src/memtable.rs @@ -204,6 +204,12 @@ pub type MemtableRef = Arc; pub trait MemtableBuilder: Send + Sync + fmt::Debug { /// Builds a new memtable instance. fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef; + + /// Returns true if the memtable supports bulk insert and benefits from it. 
+ fn use_bulk_insert(&self, metadata: &RegionMetadataRef) -> bool { + let _metadata = metadata; + false + } } pub type MemtableBuilderRef = Arc; diff --git a/src/mito2/src/memtable/bulk.rs b/src/mito2/src/memtable/bulk.rs index c15b070546..706f6f0fe2 100644 --- a/src/mito2/src/memtable/bulk.rs +++ b/src/mito2/src/memtable/bulk.rs @@ -29,7 +29,7 @@ use crate::memtable::{ #[allow(unused)] mod context; #[allow(unused)] -pub(crate) mod part; +pub mod part; mod part_reader; mod row_group_reader; diff --git a/src/mito2/src/memtable/bulk/part.rs b/src/mito2/src/memtable/bulk/part.rs index 17ea2987d8..284ca5a3d9 100644 --- a/src/mito2/src/memtable/bulk/part.rs +++ b/src/mito2/src/memtable/bulk/part.rs @@ -24,41 +24,49 @@ use bytes::Bytes; use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage}; use common_recordbatch::DfRecordBatch as RecordBatch; use common_time::timestamp::TimeUnit; -use datafusion::arrow::array::{TimestampNanosecondArray, UInt64Builder}; use datatypes::arrow; use datatypes::arrow::array::{ - Array, ArrayRef, BinaryBuilder, DictionaryArray, TimestampMicrosecondArray, - TimestampMillisecondArray, TimestampSecondArray, UInt32Array, UInt64Array, UInt8Array, - UInt8Builder, + Array, ArrayRef, BinaryBuilder, BinaryDictionaryBuilder, DictionaryArray, StringBuilder, + StringDictionaryBuilder, TimestampMicrosecondArray, TimestampMillisecondArray, + TimestampNanosecondArray, TimestampSecondArray, UInt32Array, UInt64Array, UInt64Builder, + UInt8Array, UInt8Builder, }; -use datatypes::arrow::compute::TakeOptions; -use datatypes::arrow::datatypes::SchemaRef; +use datatypes::arrow::compute::{SortColumn, SortOptions, TakeOptions}; +use datatypes::arrow::datatypes::{SchemaRef, UInt32Type}; use datatypes::arrow_array::BinaryArray; use datatypes::data_type::DataType; use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector}; -use datatypes::value::Value; +use datatypes::value::{Value, ValueRef}; use datatypes::vectors::Helper; -use mito_codec::key_values::{KeyValue, KeyValuesRef}; -use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt}; +use mito_codec::key_values::{KeyValue, KeyValues, KeyValuesRef}; +use mito_codec::row_converter::{ + build_primary_key_codec, DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt, +}; use parquet::arrow::ArrowWriter; use parquet::data_type::AsBytes; use parquet::file::metadata::ParquetMetaData; use parquet::file::properties::WriterProperties; use snafu::{OptionExt, ResultExt, Snafu}; -use store_api::metadata::RegionMetadataRef; +use store_api::codec::PrimaryKeyEncoding; +use store_api::metadata::{RegionMetadata, RegionMetadataRef}; +use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME; use store_api::storage::SequenceNumber; use table::predicate::Predicate; use crate::error::{ - self, ComputeArrowSnafu, EncodeMemtableSnafu, EncodeSnafu, NewRecordBatchSnafu, Result, + self, ColumnNotFoundSnafu, ComputeArrowSnafu, DataTypeMismatchSnafu, EncodeMemtableSnafu, + EncodeSnafu, NewRecordBatchSnafu, Result, }; use crate::memtable::bulk::context::BulkIterContextRef; use crate::memtable::bulk::part_reader::BulkPartIter; +use crate::memtable::time_series::{ValueBuilder, Values}; use crate::memtable::BoxedBatchIterator; use crate::sst::parquet::format::{PrimaryKeyArray, ReadFormat}; use crate::sst::parquet::helper::parse_parquet_metadata; use crate::sst::to_sst_arrow_schema; +const INIT_DICT_VALUE_CAPACITY: usize = 8; + #[derive(Clone)] pub struct BulkPart { pub batch: RecordBatch, @@ -209,6 +217,281 @@ 
impl BulkPart { } } +/// Builder type for primary key dictionary array. +type PrimaryKeyArrayBuilder = BinaryDictionaryBuilder; + +/// Primary key column builder for handling strings specially. +enum PrimaryKeyColumnBuilder { + /// String dictionary builder for string types. + StringDict(StringDictionaryBuilder), + /// Generic mutable vector for other types. + Vector(Box), +} + +impl PrimaryKeyColumnBuilder { + /// Appends a value to the builder. + fn push_value_ref(&mut self, value: ValueRef) -> Result<()> { + match self { + PrimaryKeyColumnBuilder::StringDict(builder) => { + if let Some(s) = value.as_string().context(DataTypeMismatchSnafu)? { + // We know the value is a string. + builder.append_value(s); + } else { + builder.append_null(); + } + } + PrimaryKeyColumnBuilder::Vector(builder) => { + builder.push_value_ref(value); + } + } + Ok(()) + } + + /// Converts the builder to an ArrayRef. + fn into_arrow_array(self) -> ArrayRef { + match self { + PrimaryKeyColumnBuilder::StringDict(mut builder) => Arc::new(builder.finish()), + PrimaryKeyColumnBuilder::Vector(mut builder) => builder.to_vector().to_arrow_array(), + } + } +} + +/// Converter that converts structs into [BulkPart]. +pub struct BulkPartConverter { + /// Region metadata. + region_metadata: RegionMetadataRef, + /// Schema of the converted batch. + schema: SchemaRef, + /// Primary key codec for encoding keys + primary_key_codec: Arc, + /// Buffer for encoding primary key. + key_buf: Vec, + /// Primary key array builder. + key_array_builder: PrimaryKeyArrayBuilder, + /// Builders for non-primary key columns. + value_builder: ValueBuilder, + /// Builders for individual primary key columns. + /// The order of builders is the same as the order of primary key columns in the region metadata. + primary_key_column_builders: Vec, + + /// Max timestamp value. + max_ts: i64, + /// Min timestamp value. + min_ts: i64, + /// Max sequence number. + max_sequence: SequenceNumber, +} + +impl BulkPartConverter { + /// Creates a new converter. + /// + /// If `store_primary_key_columns` is true and the encoding is not sparse encoding, it + /// stores primary key columns in arrays additionally. + pub fn new( + region_metadata: &RegionMetadataRef, + schema: SchemaRef, + capacity: usize, + primary_key_codec: Arc, + store_primary_key_columns: bool, + ) -> Self { + debug_assert_eq!( + region_metadata.primary_key_encoding, + primary_key_codec.encoding() + ); + + let primary_key_column_builders = if store_primary_key_columns + && region_metadata.primary_key_encoding != PrimaryKeyEncoding::Sparse + { + new_primary_key_column_builders(region_metadata, capacity) + } else { + Vec::new() + }; + + Self { + region_metadata: region_metadata.clone(), + schema, + primary_key_codec, + key_buf: Vec::new(), + key_array_builder: PrimaryKeyArrayBuilder::new(), + value_builder: ValueBuilder::new(region_metadata, capacity), + primary_key_column_builders, + min_ts: i64::MAX, + max_ts: i64::MIN, + max_sequence: SequenceNumber::MIN, + } + } + + /// Appends a [KeyValues] into the converter. + pub fn append_key_values(&mut self, key_values: &KeyValues) -> Result<()> { + for kv in key_values.iter() { + self.append_key_value(&kv)?; + } + + Ok(()) + } + + /// Appends a [KeyValue] to builders. + /// + /// If the primary key uses sparse encoding, callers must encoded the primary key in the [KeyValue]. 
+ fn append_key_value(&mut self, kv: &KeyValue) -> Result<()> { + // Handles primary key based on encoding type + if self.primary_key_codec.encoding() == PrimaryKeyEncoding::Sparse { + // For sparse encoding, the primary key is already encoded in the KeyValue + // Gets the first (and only) primary key value which contains the encoded key + let mut primary_keys = kv.primary_keys(); + if let Some(encoded) = primary_keys + .next() + .context(ColumnNotFoundSnafu { + column: PRIMARY_KEY_COLUMN_NAME, + })? + .as_binary() + .context(DataTypeMismatchSnafu)? + { + self.key_array_builder + .append(encoded) + .context(ComputeArrowSnafu)?; + } else { + self.key_array_builder + .append("") + .context(ComputeArrowSnafu)?; + } + } else { + // For dense encoding, we need to encode the primary key columns + self.key_buf.clear(); + self.primary_key_codec + .encode_key_value(kv, &mut self.key_buf) + .context(EncodeSnafu)?; + self.key_array_builder + .append(&self.key_buf) + .context(ComputeArrowSnafu)?; + }; + + // If storing primary key columns, append values to individual builders + if !self.primary_key_column_builders.is_empty() { + for (builder, pk_value) in self + .primary_key_column_builders + .iter_mut() + .zip(kv.primary_keys()) + { + builder.push_value_ref(pk_value)?; + } + } + + // Pushes other columns. + self.value_builder.push( + kv.timestamp(), + kv.sequence(), + kv.op_type() as u8, + kv.fields(), + ); + + // Updates statistics + // Safety: timestamp of kv must be both present and a valid timestamp value. + let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value(); + self.min_ts = self.min_ts.min(ts); + self.max_ts = self.max_ts.max(ts); + self.max_sequence = self.max_sequence.max(kv.sequence()); + + Ok(()) + } + + /// Converts buffered content into a [BulkPart]. + /// + /// It sorts the record batch by (primary key, timestamp, sequence desc). + pub fn convert(mut self) -> Result { + let values = Values::from(self.value_builder); + let mut columns = + Vec::with_capacity(4 + values.fields.len() + self.primary_key_column_builders.len()); + + // Build primary key column arrays if enabled. + for builder in self.primary_key_column_builders { + columns.push(builder.into_arrow_array()); + } + // Then fields columns. + columns.extend(values.fields.iter().map(|field| field.to_arrow_array())); + // Time index. + let timestamp_index = columns.len(); + columns.push(values.timestamp.to_arrow_array()); + // Primary key. + let pk_array = self.key_array_builder.finish(); + columns.push(Arc::new(pk_array)); + // Sequence and op type. + columns.push(values.sequence.to_arrow_array()); + columns.push(values.op_type.to_arrow_array()); + + let batch = RecordBatch::try_new(self.schema, columns).context(NewRecordBatchSnafu)?; + // Sorts the record batch. + let batch = sort_primary_key_record_batch(&batch)?; + + Ok(BulkPart { + batch, + max_ts: self.max_ts, + min_ts: self.min_ts, + sequence: self.max_sequence, + timestamp_index, + raw_data: None, + }) + } +} + +fn new_primary_key_column_builders( + metadata: &RegionMetadata, + capacity: usize, +) -> Vec { + metadata + .primary_key_columns() + .map(|col| { + if col.column_schema.data_type.is_string() { + PrimaryKeyColumnBuilder::StringDict(StringDictionaryBuilder::with_capacity( + capacity, + INIT_DICT_VALUE_CAPACITY, + capacity, + )) + } else { + PrimaryKeyColumnBuilder::Vector( + col.column_schema.data_type.create_mutable_vector(capacity), + ) + } + }) + .collect() +} + +/// Sorts the record batch with primary key format. 
+fn sort_primary_key_record_batch(batch: &RecordBatch) -> Result { + let total_columns = batch.num_columns(); + let sort_columns = vec![ + // Primary key column (ascending) + SortColumn { + values: batch.column(total_columns - 3).clone(), + options: Some(SortOptions { + descending: false, + nulls_first: true, + }), + }, + // Time index column (ascending) + SortColumn { + values: batch.column(total_columns - 4).clone(), + options: Some(SortOptions { + descending: false, + nulls_first: true, + }), + }, + // Sequence column (descending) + SortColumn { + values: batch.column(total_columns - 2).clone(), + options: Some(SortOptions { + descending: true, + nulls_first: true, + }), + }, + ]; + + let indices = datatypes::arrow::compute::lexsort_to_indices(&sort_columns, None) + .context(ComputeArrowSnafu)?; + + datatypes::arrow::compute::take_record_batch(batch, &indices).context(ComputeArrowSnafu) +} + #[derive(Debug)] pub struct EncodedBulkPart { data: Bytes, @@ -596,14 +879,20 @@ fn binary_array_to_dictionary(input: &BinaryArray) -> Result { mod tests { use std::collections::VecDeque; + use api::v1::{Row, WriteHint}; use datafusion_common::ScalarValue; - use datatypes::prelude::{ScalarVector, Value}; + use datatypes::prelude::{ConcreteDataType, ScalarVector, Value}; use datatypes::vectors::{Float64Vector, TimestampMillisecondVector}; + use store_api::storage::consts::ReservedColumnId; use super::*; use crate::memtable::bulk::context::BulkIterContext; use crate::sst::parquet::format::ReadFormat; - use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test}; + use crate::sst::{to_flat_sst_arrow_schema, FlatSchemaOptions}; + use crate::test_util::memtable_util::{ + build_key_values_with_ts_seq_values, metadata_for_test, metadata_with_primary_key, + region_metadata_to_row_schema, + }; fn check_binary_array_to_dictionary( input: &[&[u8]], @@ -1084,4 +1373,465 @@ mod tests { 1, ); } + + #[test] + fn test_bulk_part_converter_append_and_convert() { + let metadata = metadata_for_test(); + let capacity = 100; + let primary_key_codec = build_primary_key_codec(&metadata); + let schema = to_flat_sst_arrow_schema( + &metadata, + &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding), + ); + + let mut converter = + BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true); + + let key_values1 = build_key_values_with_ts_seq_values( + &metadata, + "key1".to_string(), + 1u32, + vec![1000, 2000].into_iter(), + vec![Some(1.0), Some(2.0)].into_iter(), + 1, + ); + + let key_values2 = build_key_values_with_ts_seq_values( + &metadata, + "key2".to_string(), + 2u32, + vec![1500].into_iter(), + vec![Some(3.0)].into_iter(), + 2, + ); + + converter.append_key_values(&key_values1).unwrap(); + converter.append_key_values(&key_values2).unwrap(); + + let bulk_part = converter.convert().unwrap(); + + assert_eq!(bulk_part.num_rows(), 3); + assert_eq!(bulk_part.min_ts, 1000); + assert_eq!(bulk_part.max_ts, 2000); + assert_eq!(bulk_part.sequence, 2); + assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4); + + // Validate primary key columns are stored + // Schema should include primary key columns k0 and k1 at the beginning + let schema = bulk_part.batch.schema(); + let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect(); + assert_eq!( + field_names, + vec![ + "k0", + "k1", + "v0", + "v1", + "ts", + "__primary_key", + "__sequence", + "__op_type" + ] + ); + } + + #[test] + fn test_bulk_part_converter_sorting() { + let 
metadata = metadata_for_test(); + let capacity = 100; + let primary_key_codec = build_primary_key_codec(&metadata); + let schema = to_flat_sst_arrow_schema( + &metadata, + &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding), + ); + + let mut converter = + BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true); + + let key_values1 = build_key_values_with_ts_seq_values( + &metadata, + "z_key".to_string(), + 3u32, + vec![3000].into_iter(), + vec![Some(3.0)].into_iter(), + 3, + ); + + let key_values2 = build_key_values_with_ts_seq_values( + &metadata, + "a_key".to_string(), + 1u32, + vec![1000].into_iter(), + vec![Some(1.0)].into_iter(), + 1, + ); + + let key_values3 = build_key_values_with_ts_seq_values( + &metadata, + "m_key".to_string(), + 2u32, + vec![2000].into_iter(), + vec![Some(2.0)].into_iter(), + 2, + ); + + converter.append_key_values(&key_values1).unwrap(); + converter.append_key_values(&key_values2).unwrap(); + converter.append_key_values(&key_values3).unwrap(); + + let bulk_part = converter.convert().unwrap(); + + assert_eq!(bulk_part.num_rows(), 3); + + let ts_column = bulk_part.batch.column(bulk_part.timestamp_index); + let seq_column = bulk_part.batch.column(bulk_part.batch.num_columns() - 2); + + let ts_array = ts_column + .as_any() + .downcast_ref::() + .unwrap(); + let seq_array = seq_column.as_any().downcast_ref::().unwrap(); + + assert_eq!(ts_array.values(), &[1000, 2000, 3000]); + assert_eq!(seq_array.values(), &[1, 2, 3]); + + // Validate primary key columns are stored + let schema = bulk_part.batch.schema(); + let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect(); + assert_eq!( + field_names, + vec![ + "k0", + "k1", + "v0", + "v1", + "ts", + "__primary_key", + "__sequence", + "__op_type" + ] + ); + } + + #[test] + fn test_bulk_part_converter_empty() { + let metadata = metadata_for_test(); + let capacity = 10; + let primary_key_codec = build_primary_key_codec(&metadata); + let schema = to_flat_sst_arrow_schema( + &metadata, + &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding), + ); + + let converter = + BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true); + + let bulk_part = converter.convert().unwrap(); + + assert_eq!(bulk_part.num_rows(), 0); + assert_eq!(bulk_part.min_ts, i64::MAX); + assert_eq!(bulk_part.max_ts, i64::MIN); + assert_eq!(bulk_part.sequence, SequenceNumber::MIN); + + // Validate primary key columns are present in schema even for empty batch + let schema = bulk_part.batch.schema(); + let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect(); + assert_eq!( + field_names, + vec![ + "k0", + "k1", + "v0", + "v1", + "ts", + "__primary_key", + "__sequence", + "__op_type" + ] + ); + } + + #[test] + fn test_bulk_part_converter_without_primary_key_columns() { + let metadata = metadata_for_test(); + let primary_key_codec = build_primary_key_codec(&metadata); + let schema = to_flat_sst_arrow_schema( + &metadata, + &FlatSchemaOptions { + raw_pk_columns: false, + string_pk_use_dict: true, + }, + ); + + let capacity = 100; + let mut converter = + BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, false); + + let key_values1 = build_key_values_with_ts_seq_values( + &metadata, + "key1".to_string(), + 1u32, + vec![1000, 2000].into_iter(), + vec![Some(1.0), Some(2.0)].into_iter(), + 1, + ); + + let key_values2 = build_key_values_with_ts_seq_values( + &metadata, + "key2".to_string(), + 2u32, + 
vec![1500].into_iter(), + vec![Some(3.0)].into_iter(), + 2, + ); + + converter.append_key_values(&key_values1).unwrap(); + converter.append_key_values(&key_values2).unwrap(); + + let bulk_part = converter.convert().unwrap(); + + assert_eq!(bulk_part.num_rows(), 3); + assert_eq!(bulk_part.min_ts, 1000); + assert_eq!(bulk_part.max_ts, 2000); + assert_eq!(bulk_part.sequence, 2); + assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4); + + // Validate primary key columns are NOT stored individually + let schema = bulk_part.batch.schema(); + let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect(); + assert_eq!( + field_names, + vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"] + ); + } + + #[allow(clippy::too_many_arguments)] + fn build_key_values_with_sparse_encoding( + metadata: &RegionMetadataRef, + primary_key_codec: &Arc, + table_id: u32, + tsid: u64, + k0: String, + k1: String, + timestamps: impl Iterator, + values: impl Iterator>, + sequence: SequenceNumber, + ) -> KeyValues { + // Encode the primary key (__table_id, __tsid, k0, k1) into binary format using the sparse codec + let pk_values = vec![ + (ReservedColumnId::table_id(), Value::UInt32(table_id)), + (ReservedColumnId::tsid(), Value::UInt64(tsid)), + (0, Value::String(k0.clone().into())), + (1, Value::String(k1.clone().into())), + ]; + let mut encoded_key = Vec::new(); + primary_key_codec + .encode_values(&pk_values, &mut encoded_key) + .unwrap(); + assert!(!encoded_key.is_empty()); + + // Create schema for sparse encoding: __primary_key, ts, v0, v1 + let column_schema = vec![ + api::v1::ColumnSchema { + column_name: PRIMARY_KEY_COLUMN_NAME.to_string(), + datatype: api::helper::ColumnDataTypeWrapper::try_from( + ConcreteDataType::binary_datatype(), + ) + .unwrap() + .datatype() as i32, + semantic_type: api::v1::SemanticType::Tag as i32, + ..Default::default() + }, + api::v1::ColumnSchema { + column_name: "ts".to_string(), + datatype: api::helper::ColumnDataTypeWrapper::try_from( + ConcreteDataType::timestamp_millisecond_datatype(), + ) + .unwrap() + .datatype() as i32, + semantic_type: api::v1::SemanticType::Timestamp as i32, + ..Default::default() + }, + api::v1::ColumnSchema { + column_name: "v0".to_string(), + datatype: api::helper::ColumnDataTypeWrapper::try_from( + ConcreteDataType::int64_datatype(), + ) + .unwrap() + .datatype() as i32, + semantic_type: api::v1::SemanticType::Field as i32, + ..Default::default() + }, + api::v1::ColumnSchema { + column_name: "v1".to_string(), + datatype: api::helper::ColumnDataTypeWrapper::try_from( + ConcreteDataType::float64_datatype(), + ) + .unwrap() + .datatype() as i32, + semantic_type: api::v1::SemanticType::Field as i32, + ..Default::default() + }, + ]; + + let rows = timestamps + .zip(values) + .map(|(ts, v)| Row { + values: vec![ + api::v1::Value { + value_data: Some(api::v1::value::ValueData::BinaryValue( + encoded_key.clone(), + )), + }, + api::v1::Value { + value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(ts)), + }, + api::v1::Value { + value_data: Some(api::v1::value::ValueData::I64Value(ts)), + }, + api::v1::Value { + value_data: v.map(api::v1::value::ValueData::F64Value), + }, + ], + }) + .collect(); + + let mutation = api::v1::Mutation { + op_type: 1, + sequence, + rows: Some(api::v1::Rows { + schema: column_schema, + rows, + }), + write_hint: Some(WriteHint { + primary_key_encoding: api::v1::PrimaryKeyEncoding::Sparse.into(), + }), + }; + KeyValues::new(metadata.as_ref(), 
mutation).unwrap() + } + + #[test] + fn test_bulk_part_converter_sparse_primary_key_encoding() { + use api::v1::SemanticType; + use datatypes::schema::ColumnSchema; + use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder}; + use store_api::storage::RegionId; + + let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456)); + builder + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false), + semantic_type: SemanticType::Tag, + column_id: 0, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new("k1", ConcreteDataType::string_datatype(), false), + semantic_type: SemanticType::Tag, + column_id: 1, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + semantic_type: SemanticType::Timestamp, + column_id: 2, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true), + semantic_type: SemanticType::Field, + column_id: 3, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true), + semantic_type: SemanticType::Field, + column_id: 4, + }) + .primary_key(vec![0, 1]) + .primary_key_encoding(PrimaryKeyEncoding::Sparse); + let metadata = Arc::new(builder.build().unwrap()); + + let primary_key_codec = build_primary_key_codec(&metadata); + let schema = to_flat_sst_arrow_schema( + &metadata, + &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding), + ); + + assert_eq!(metadata.primary_key_encoding, PrimaryKeyEncoding::Sparse); + assert_eq!(primary_key_codec.encoding(), PrimaryKeyEncoding::Sparse); + + let capacity = 100; + let mut converter = + BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec.clone(), true); + + let key_values1 = build_key_values_with_sparse_encoding( + &metadata, + &primary_key_codec, + 2048u32, // table_id + 100u64, // tsid + "key11".to_string(), + "key21".to_string(), + vec![1000, 2000].into_iter(), + vec![Some(1.0), Some(2.0)].into_iter(), + 1, + ); + + let key_values2 = build_key_values_with_sparse_encoding( + &metadata, + &primary_key_codec, + 4096u32, // table_id + 200u64, // tsid + "key12".to_string(), + "key22".to_string(), + vec![1500].into_iter(), + vec![Some(3.0)].into_iter(), + 2, + ); + + converter.append_key_values(&key_values1).unwrap(); + converter.append_key_values(&key_values2).unwrap(); + + let bulk_part = converter.convert().unwrap(); + + assert_eq!(bulk_part.num_rows(), 3); + assert_eq!(bulk_part.min_ts, 1000); + assert_eq!(bulk_part.max_ts, 2000); + assert_eq!(bulk_part.sequence, 2); + assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4); + + // For sparse encoding, primary key columns should NOT be stored individually + // even when store_primary_key_columns is true, because sparse encoding + // stores the encoded primary key in the __primary_key column + let schema = bulk_part.batch.schema(); + let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect(); + assert_eq!( + field_names, + vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"] + ); + + // Verify the __primary_key column contains encoded sparse keys + let primary_key_column = bulk_part.batch.column_by_name("__primary_key").unwrap(); + let dict_array = primary_key_column + .as_any() + .downcast_ref::>() + .unwrap(); + + // Should have non-zero entries 
indicating encoded primary keys + assert!(!dict_array.is_empty()); + assert_eq!(dict_array.len(), 3); // 3 rows total + + // Verify values are properly encoded binary data (not empty) + let values = dict_array + .values() + .as_any() + .downcast_ref::() + .unwrap(); + for i in 0..values.len() { + assert!( + !values.value(i).is_empty(), + "Encoded primary key should not be empty" + ); + } + } } diff --git a/src/mito2/src/memtable/partition_tree.rs b/src/mito2/src/memtable/partition_tree.rs index 50f8e4cc8e..fcd9376a8f 100644 --- a/src/mito2/src/memtable/partition_tree.rs +++ b/src/mito2/src/memtable/partition_tree.rs @@ -350,6 +350,10 @@ impl MemtableBuilder for PartitionTreeMemtableBuilder { &self.config, )) } + + fn use_bulk_insert(&self, _metadata: &RegionMetadataRef) -> bool { + false + } } struct PartitionTreeIterBuilder { @@ -373,6 +377,7 @@ impl IterBuilder for PartitionTreeIterBuilder { #[cfg(test)] mod tests { use std::collections::HashMap; + use std::sync::Arc; use api::v1::value::ValueData; use api::v1::{Mutation, OpType, Row, Rows, SemanticType}; @@ -402,9 +407,9 @@ mod tests { fn write_iter_sorted_input(has_pk: bool) { let metadata = if has_pk { - memtable_util::metadata_with_primary_key(vec![1, 0], true) + Arc::new(memtable_util::metadata_with_primary_key(vec![1, 0], true)) } else { - memtable_util::metadata_with_primary_key(vec![], false) + Arc::new(memtable_util::metadata_with_primary_key(vec![], false)) }; let timestamps = (0..100).collect::>(); let kvs = @@ -447,9 +452,9 @@ mod tests { fn write_iter_unsorted_input(has_pk: bool) { let metadata = if has_pk { - memtable_util::metadata_with_primary_key(vec![1, 0], true) + Arc::new(memtable_util::metadata_with_primary_key(vec![1, 0], true)) } else { - memtable_util::metadata_with_primary_key(vec![], false) + Arc::new(memtable_util::metadata_with_primary_key(vec![], false)) }; let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata)); let memtable = PartitionTreeMemtable::new( @@ -512,9 +517,9 @@ mod tests { fn write_iter_projection(has_pk: bool) { let metadata = if has_pk { - memtable_util::metadata_with_primary_key(vec![1, 0], true) + Arc::new(memtable_util::metadata_with_primary_key(vec![1, 0], true)) } else { - memtable_util::metadata_with_primary_key(vec![], false) + Arc::new(memtable_util::metadata_with_primary_key(vec![], false)) }; // Try to build a memtable via the builder. let memtable = PartitionTreeMemtableBuilder::new(PartitionTreeConfig::default(), None) @@ -552,7 +557,7 @@ mod tests { } fn write_iter_multi_keys(max_keys: usize, freeze_threshold: usize) { - let metadata = memtable_util::metadata_with_primary_key(vec![1, 0], true); + let metadata = Arc::new(memtable_util::metadata_with_primary_key(vec![1, 0], true)); let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata)); let memtable = PartitionTreeMemtable::new( 1, @@ -602,7 +607,7 @@ mod tests { #[test] fn test_memtable_filter() { - let metadata = memtable_util::metadata_with_primary_key(vec![0, 1], false); + let metadata = Arc::new(memtable_util::metadata_with_primary_key(vec![0, 1], false)); // Try to build a memtable via the builder. 
let memtable = PartitionTreeMemtableBuilder::new( PartitionTreeConfig { diff --git a/src/mito2/src/memtable/time_partition.rs b/src/mito2/src/memtable/time_partition.rs index fd7e0e1a17..ce33beb329 100644 --- a/src/mito2/src/memtable/time_partition.rs +++ b/src/mito2/src/memtable/time_partition.rs @@ -28,17 +28,19 @@ use datatypes::arrow::array::{ TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, }; use datatypes::arrow::buffer::{BooleanBuffer, MutableBuffer}; -use datatypes::arrow::datatypes::{DataType, Int64Type}; +use datatypes::arrow::datatypes::{DataType, Int64Type, SchemaRef}; use mito_codec::key_values::KeyValue; +use mito_codec::row_converter::{build_primary_key_codec, PrimaryKeyCodec}; use smallvec::{smallvec, SmallVec}; use snafu::{OptionExt, ResultExt}; use store_api::metadata::RegionMetadataRef; use crate::error; use crate::error::{InvalidRequestSnafu, Result}; -use crate::memtable::bulk::part::BulkPart; +use crate::memtable::bulk::part::{BulkPart, BulkPartConverter}; use crate::memtable::version::SmallMemtableVec; use crate::memtable::{KeyValues, MemtableBuilderRef, MemtableId, MemtableRef}; +use crate::sst::{to_flat_sst_arrow_schema, FlatSchemaOptions}; /// Initial time window if not specified. const INITIAL_TIME_WINDOW: Duration = Duration::from_days(1); @@ -208,6 +210,12 @@ pub struct TimePartitions { metadata: RegionMetadataRef, /// Builder of memtables. builder: MemtableBuilderRef, + /// Primary key encoder. + primary_key_codec: Arc, + + /// Cached schema for bulk insert. + /// This field is Some if the memtable uses bulk insert. + bulk_schema: Option, } pub type TimePartitionsRef = Arc; @@ -221,11 +229,19 @@ impl TimePartitions { part_duration: Option, ) -> Self { let inner = PartitionsInner::new(next_memtable_id); + let primary_key_codec = build_primary_key_codec(&metadata); + let bulk_schema = builder.use_bulk_insert(&metadata).then(|| { + let opts = FlatSchemaOptions::from_encoding(metadata.primary_key_encoding); + to_flat_sst_arrow_schema(&metadata, &opts) + }); + Self { inner: Mutex::new(inner), part_duration: part_duration.unwrap_or(INITIAL_TIME_WINDOW), metadata, builder, + primary_key_codec, + bulk_schema, } } @@ -233,6 +249,21 @@ impl TimePartitions { /// /// It creates new partitions if necessary. pub fn write(&self, kvs: &KeyValues) -> Result<()> { + if let Some(bulk_schema) = &self.bulk_schema { + let mut converter = BulkPartConverter::new( + &self.metadata, + bulk_schema.clone(), + kvs.num_rows(), + self.primary_key_codec.clone(), + // Always store primary keys for bulk mode. + true, + ); + converter.append_key_values(kvs)?; + let part = converter.convert()?; + + return self.write_bulk(part); + } + // Get all parts. let parts = self.list_partitions(); @@ -413,6 +444,8 @@ impl TimePartitions { part_duration, metadata: metadata.clone(), builder: self.builder.clone(), + primary_key_codec: self.primary_key_codec.clone(), + bulk_schema: self.bulk_schema.clone(), } } diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs index cdae1abdc6..59a6078cf8 100644 --- a/src/mito2/src/memtable/time_series.rs +++ b/src/mito2/src/memtable/time_series.rs @@ -112,6 +112,12 @@ impl MemtableBuilder for TimeSeriesMemtableBuilder { )) } } + + fn use_bulk_insert(&self, _metadata: &RegionMetadataRef) -> bool { + // Now if we can use simple bulk memtable, the input request is already + // a bulk write request and won't call this method. + false + } } /// Memtable implementation that groups rows by their primary key. 
@@ -828,7 +834,7 @@ impl Series {
 }
 
 /// `ValueBuilder` holds all the vector builders for field columns.
-struct ValueBuilder {
+pub(crate) struct ValueBuilder {
     timestamp: Vec,
     timestamp_type: ConcreteDataType,
     sequence: Vec,
@@ -872,7 +878,7 @@ impl ValueBuilder {
     /// Returns the size of field values.
     ///
     /// In this method, we don't check the data type of the value, because it is already checked in the caller.
-    fn push<'a>(
+    pub(crate) fn push<'a>(
         &mut self,
         ts: ValueRef,
         sequence: u64,
@@ -1103,10 +1109,10 @@ impl ValueBuilder {
 /// [Values] holds immutable vectors of field columns, including `sequence` and `op_type`.
 #[derive(Clone)]
 pub struct Values {
-    timestamp: VectorRef,
-    sequence: Arc,
-    op_type: Arc,
-    fields: Vec,
+    pub(crate) timestamp: VectorRef,
+    pub(crate) sequence: Arc,
+    pub(crate) op_type: Arc,
+    pub(crate) fields: Vec,
 }
 
 impl Values {
diff --git a/src/mito2/src/sst.rs b/src/mito2/src/sst.rs
index 6368e08e43..d7f7d28f5b 100644
--- a/src/mito2/src/sst.rs
+++ b/src/mito2/src/sst.rs
@@ -21,6 +21,7 @@ use common_base::readable_size::ReadableSize;
 use datatypes::arrow::datatypes::{
     DataType as ArrowDataType, Field, FieldRef, Fields, Schema, SchemaRef,
 };
+use store_api::codec::PrimaryKeyEncoding;
 use store_api::metadata::RegionMetadata;
 use store_api::storage::consts::{
     OP_TYPE_COLUMN_NAME, PRIMARY_KEY_COLUMN_NAME, SEQUENCE_COLUMN_NAME,
@@ -63,6 +64,101 @@ pub fn to_sst_arrow_schema(metadata: &RegionMetadata) -> SchemaRef {
     Arc::new(Schema::new(fields))
 }
 
+/// Options for the flat schema.
+pub struct FlatSchemaOptions {
+    /// Whether to store raw primary key columns in addition to the encoded primary key column.
+    pub raw_pk_columns: bool,
+    /// Whether to use dictionary encoding for string primary key columns
+    /// when storing primary key columns.
+    /// Only takes effect when `raw_pk_columns` is true.
+    pub string_pk_use_dict: bool,
+}
+
+impl Default for FlatSchemaOptions {
+    fn default() -> Self {
+        Self {
+            raw_pk_columns: true,
+            string_pk_use_dict: true,
+        }
+    }
+}
+
+impl FlatSchemaOptions {
+    /// Creates options according to the primary key encoding.
+    pub fn from_encoding(encoding: PrimaryKeyEncoding) -> Self {
+        if encoding == PrimaryKeyEncoding::Dense {
+            Self::default()
+        } else {
+            Self {
+                raw_pk_columns: false,
+                string_pk_use_dict: false,
+            }
+        }
+    }
+}
+
+/// Gets the arrow schema to store in parquet.
+///
+/// The schema is:
+/// ```text
+/// primary key columns, field columns, time index, __primary_key, __sequence, __op_type
+/// ```
+///
+/// # Panics
+/// Panics if the metadata is invalid.
+pub fn to_flat_sst_arrow_schema( + metadata: &RegionMetadata, + options: &FlatSchemaOptions, +) -> SchemaRef { + let num_fields = if options.raw_pk_columns { + metadata.column_metadatas.len() + 3 + } else { + metadata.column_metadatas.len() + 3 - metadata.primary_key.len() + }; + let mut fields = Vec::with_capacity(num_fields); + let schema = metadata.schema.arrow_schema(); + if options.raw_pk_columns { + for pk_id in &metadata.primary_key { + let pk_index = metadata.column_index_by_id(*pk_id).unwrap(); + if options.string_pk_use_dict + && metadata.column_metadatas[pk_index] + .column_schema + .data_type + .is_string() + { + let field = &schema.fields[pk_index]; + let field = Arc::new(Field::new_dictionary( + field.name(), + datatypes::arrow::datatypes::DataType::UInt32, + field.data_type().clone(), + field.is_nullable(), + )); + fields.push(field); + } else { + fields.push(schema.fields[pk_index].clone()); + } + } + } + let remaining_fields = schema + .fields() + .iter() + .zip(&metadata.column_metadatas) + .filter_map(|(field, column_meta)| { + if column_meta.semantic_type == SemanticType::Field { + Some(field.clone()) + } else { + None + } + }) + .chain([metadata.time_index_field()]) + .chain(internal_fields()); + for field in remaining_fields { + fields.push(field); + } + + Arc::new(Schema::new(fields)) +} + /// Fields for internal columns. fn internal_fields() -> [FieldRef; 3] { // Internal columns are always not null. diff --git a/src/mito2/src/test_util/memtable_util.rs b/src/mito2/src/test_util/memtable_util.rs index aefe1528fb..cb55a87c80 100644 --- a/src/mito2/src/test_util/memtable_util.rs +++ b/src/mito2/src/test_util/memtable_util.rs @@ -27,7 +27,9 @@ use datatypes::schema::ColumnSchema; use datatypes::vectors::TimestampMillisecondVector; use mito_codec::key_values::KeyValue; use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField}; -use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef}; +use store_api::metadata::{ + ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef, +}; use store_api::storage::{ColumnId, RegionId, SequenceNumber}; use table::predicate::Predicate; @@ -126,13 +128,17 @@ impl MemtableBuilder for EmptyMemtableBuilder { fn build(&self, id: MemtableId, _metadata: &RegionMetadataRef) -> MemtableRef { Arc::new(EmptyMemtable::new(id)) } + + fn use_bulk_insert(&self, _metadata: &RegionMetadataRef) -> bool { + true + } } /// Creates a region metadata to test memtable with default pk. /// /// The schema is `k0, k1, ts, v0, v1` and pk is `k0, k1`. pub(crate) fn metadata_for_test() -> RegionMetadataRef { - metadata_with_primary_key(vec![0, 1], false) + Arc::new(metadata_with_primary_key(vec![0, 1], false)) } /// Creates a region metadata to test memtable and specific primary key. 
@@ -142,7 +148,7 @@ pub(crate) fn metadata_for_test() -> RegionMetadataRef { pub fn metadata_with_primary_key( primary_key: Vec, enable_table_id: bool, -) -> RegionMetadataRef { +) -> RegionMetadata { let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456)); let maybe_table_id = if enable_table_id { "__table_id" } else { "k1" }; builder @@ -180,8 +186,7 @@ pub fn metadata_with_primary_key( column_id: 4, }) .primary_key(primary_key); - let region_metadata = builder.build().unwrap(); - Arc::new(region_metadata) + builder.build().unwrap() } fn semantic_type_of_column(column_id: ColumnId, primary_key: &[ColumnId]) -> SemanticType { diff --git a/src/mito2/src/test_util/scheduler_util.rs b/src/mito2/src/test_util/scheduler_util.rs index a8d5e47cf0..3709148408 100644 --- a/src/mito2/src/test_util/scheduler_util.rs +++ b/src/mito2/src/test_util/scheduler_util.rs @@ -33,7 +33,7 @@ use crate::error::Result; use crate::flush::FlushScheduler; use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions}; use crate::region::{ManifestContext, ManifestContextRef, RegionLeaderState, RegionRoleState}; -use crate::request::{WorkerRequest, WorkerRequestWithTime}; +use crate::request::WorkerRequestWithTime; use crate::schedule::scheduler::{Job, LocalScheduler, Scheduler, SchedulerRef}; use crate::sst::index::intermediate::IntermediateManager; use crate::sst::index::puffin_manager::PuffinManagerFactory;
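
Usage sketch: how the pieces added by this patch fit together for a dense-encoded region, mirroring the bulk_part_converter benchmark above. This is an illustration only; metadata_for_test and build_key_values_with_ts_seq_values are the crate-internal test helpers from memtable_util, so the snippet only compiles inside mito2's own test code, and the function name is made up for the example.

use mito_codec::row_converter::build_primary_key_codec;

use crate::memtable::bulk::part::BulkPartConverter;
use crate::sst::{to_flat_sst_arrow_schema, FlatSchemaOptions};
use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};

fn bulk_convert_sketch() {
    // Region with primary key `k0, k1`, from the crate's test helper.
    let metadata = metadata_for_test();
    // Pick the codec and the flat SST schema that match the region's primary key encoding.
    let codec = build_primary_key_codec(&metadata);
    let schema = to_flat_sst_arrow_schema(
        &metadata,
        &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
    );

    // `true`: additionally store raw primary key columns next to the encoded
    // `__primary_key` column (only takes effect for dense encoding).
    let mut converter = BulkPartConverter::new(&metadata, schema, 1024, codec, true);

    // Append a batch of KeyValues (two rows of one series, built with a test helper),
    // then convert; the result is sorted by (primary key, timestamp, sequence desc).
    let kvs = build_key_values_with_ts_seq_values(
        &metadata,
        "key1".to_string(),
        1u32,
        vec![1000, 2000].into_iter(),
        vec![Some(1.0), Some(2.0)].into_iter(),
        1,
    );
    converter.append_key_values(&kvs).unwrap();
    let bulk_part = converter.convert().unwrap();
    assert_eq!(bulk_part.num_rows(), 2);
}

For sparse-encoded regions the same flow applies, but FlatSchemaOptions::from_encoding disables raw primary key columns, so the converter keeps only the encoded __primary_key column, as test_bulk_part_converter_sparse_primary_key_encoding verifies.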