From 120447779c7d5c531faf36308ea94c8db5520e2e Mon Sep 17 00:00:00 2001
From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com>
Date: Tue, 25 Jun 2024 17:02:20 +0800
Subject: [PATCH] feat: bulk memtable codec (#4163)

* feat: introduce bulk memtable encoder/decoder

* chore: rebase main

* chore: resolve some comments

* refactor: only carries time unit in ArraysSorter

* fix: some comments

---
 src/mito2/src/error.rs                   |   8 +
 src/mito2/src/memtable.rs                |   5 +
 src/mito2/src/memtable/bulk.rs           |  92 +++
 src/mito2/src/memtable/bulk/part.rs      | 682 +++++++++++++++++++++++
 src/mito2/src/memtable/key_values.rs     |  48 +-
 src/mito2/src/memtable/partition_tree.rs |  11 +-
 src/mito2/src/memtable/time_series.rs    |  14 +-
 src/mito2/src/row_converter.rs           |  11 +
 src/mito2/src/sst.rs                     |  53 ++
 src/mito2/src/sst/parquet/format.rs      |  74 +--
 src/mito2/src/test_util/memtable_util.rs |   6 +-
 11 files changed, 943 insertions(+), 61 deletions(-)
 create mode 100644 src/mito2/src/memtable/bulk.rs
 create mode 100644 src/mito2/src/memtable/bulk/part.rs

diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs
index 7b31f7cfc0..2f6e3ace34 100644
--- a/src/mito2/src/error.rs
+++ b/src/mito2/src/error.rs
@@ -753,6 +753,13 @@ pub enum Error {
         location: Location,
         source: Arc,
     },
+
+    #[snafu(display("Operation is not supported: {}", err_msg))]
+    UnsupportedOperation {
+        err_msg: String,
+        #[snafu(implicit)]
+        location: Location,
+    },
 }
 
 pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -871,6 +878,7 @@ impl ErrorExt for Error {
             RegionStopped { .. } => StatusCode::RegionNotReady,
             TimeRangePredicateOverflow { .. } => StatusCode::InvalidArguments,
             BuildTimeRangeFilter { .. } => StatusCode::Unexpected,
+            UnsupportedOperation { .. } => StatusCode::Unsupported,
         }
     }
 
diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs
index 20ab1db69c..b807197f09 100644
--- a/src/mito2/src/memtable.rs
+++ b/src/mito2/src/memtable.rs
@@ -18,6 +18,7 @@ use std::fmt;
 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use std::sync::Arc;
 
+pub use bulk::part::BulkPart;
 use common_time::Timestamp;
 use serde::{Deserialize, Serialize};
 use store_api::metadata::RegionMetadataRef;
@@ -35,6 +36,7 @@ use crate::metrics::WRITE_BUFFER_BYTES;
 use crate::read::Batch;
 use crate::region::options::MemtableOptions;
 
+pub mod bulk;
 pub mod key_values;
 pub mod partition_tree;
 pub mod time_partition;
@@ -101,6 +103,9 @@ pub trait Memtable: Send + Sync + fmt::Debug {
     /// Writes one key value pair into the memtable.
     fn write_one(&self, key_value: KeyValue) -> Result<()>;
 
+    /// Writes an encoded batch of rows into the memtable.
+    fn write_bulk(&self, part: BulkPart) -> Result<()>;
+
     /// Scans the memtable.
     /// `projection` selects columns to read, `None` means reading all columns.
     /// `filters` are the predicates to be pushed down to memtable.
diff --git a/src/mito2/src/memtable/bulk.rs b/src/mito2/src/memtable/bulk.rs
new file mode 100644
index 0000000000..3ed2ed1347
--- /dev/null
+++ b/src/mito2/src/memtable/bulk.rs
@@ -0,0 +1,92 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Memtable implementation for bulk load
+
+use std::sync::{Arc, RwLock};
+
+use store_api::metadata::RegionMetadataRef;
+use store_api::storage::ColumnId;
+use table::predicate::Predicate;
+
+use crate::error::Result;
+use crate::memtable::bulk::part::BulkPart;
+use crate::memtable::key_values::KeyValue;
+use crate::memtable::{
+    BoxedBatchIterator, KeyValues, Memtable, MemtableId, MemtableRange, MemtableRef, MemtableStats,
+};
+
+#[allow(unused)]
+pub(crate) mod part;
+
+#[derive(Debug)]
+pub struct BulkMemtable {
+    id: MemtableId,
+    parts: RwLock<Vec<BulkPart>>,
+}
+
+impl Memtable for BulkMemtable {
+    fn id(&self) -> MemtableId {
+        self.id
+    }
+
+    fn write(&self, _kvs: &KeyValues) -> Result<()> {
+        unimplemented!()
+    }
+
+    fn write_one(&self, _key_value: KeyValue) -> Result<()> {
+        unimplemented!()
+    }
+
+    fn write_bulk(&self, fragment: BulkPart) -> Result<()> {
+        let mut parts = self.parts.write().unwrap();
+        parts.push(fragment);
+        Ok(())
+    }
+
+    fn iter(
+        &self,
+        _projection: Option<&[ColumnId]>,
+        _predicate: Option<Predicate>,
+    ) -> Result<BoxedBatchIterator> {
+        todo!()
+    }
+
+    fn ranges(
+        &self,
+        _projection: Option<&[ColumnId]>,
+        _predicate: Option<Predicate>,
+    ) -> Vec<MemtableRange> {
+        todo!()
+    }
+
+    fn is_empty(&self) -> bool {
+        self.parts.read().unwrap().is_empty()
+    }
+
+    fn freeze(&self) -> Result<()> {
+        Ok(())
+    }
+
+    fn stats(&self) -> MemtableStats {
+        todo!()
+    }
+
+    fn fork(&self, id: MemtableId, _metadata: &RegionMetadataRef) -> MemtableRef {
+        Arc::new(Self {
+            id,
+            parts: RwLock::new(vec![]),
+        })
+    }
+}
diff --git a/src/mito2/src/memtable/bulk/part.rs b/src/mito2/src/memtable/bulk/part.rs
new file mode 100644
index 0000000000..2f15968c6f
--- /dev/null
+++ b/src/mito2/src/memtable/bulk/part.rs
@@ -0,0 +1,682 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Bulk part encoder/decoder.
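+//!
+//! A [BulkPart] holds rows as a Parquet-encoded buffer in memory: mutations are first
+//! converted to an Arrow record batch in the SST schema (see [crate::sst::to_sst_arrow_schema]),
+//! sorted by (primary key, timestamp, sequence desc) and optionally deduplicated, then
+//! written through an `ArrowWriter`.
+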
+use std::collections::VecDeque;
+use std::sync::Arc;
+
+use api::v1::Mutation;
+use common_time::timestamp::TimeUnit;
+use datafusion::arrow::array::{TimestampNanosecondArray, UInt64Builder};
+use datatypes::arrow;
+use datatypes::arrow::array::{
+    Array, ArrayRef, BinaryBuilder, DictionaryArray, RecordBatch, TimestampMicrosecondArray,
+    TimestampMillisecondArray, TimestampSecondArray, UInt16Array, UInt32Array, UInt64Array,
+    UInt8Array, UInt8Builder,
+};
+use datatypes::arrow::compute::TakeOptions;
+use datatypes::arrow::datatypes::{DataType as ArrowDataType, SchemaRef, UInt16Type};
+use datatypes::arrow_array::BinaryArray;
+use datatypes::data_type::DataType;
+use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector};
+use datatypes::types::TimestampType;
+use parquet::arrow::ArrowWriter;
+use parquet::data_type::AsBytes;
+use snafu::ResultExt;
+use store_api::metadata::RegionMetadataRef;
+
+use crate::error::{ComputeArrowSnafu, EncodeMemtableSnafu, NewRecordBatchSnafu, Result};
+use crate::memtable::key_values::KeyValuesRef;
+use crate::read::Batch;
+use crate::row_converter::{McmpRowCodec, RowCodec};
+use crate::sst::to_sst_arrow_schema;
+
+#[derive(Debug)]
+pub struct BulkPart {
+    data: Vec<u8>,
+    metadata: BulkPartMeta,
+}
+
+impl BulkPart {
+    pub fn new(data: Vec<u8>, metadata: BulkPartMeta) -> Self {
+        Self { data, metadata }
+    }
+
+    pub(crate) fn metadata(&self) -> &BulkPartMeta {
+        &self.metadata
+    }
+}
+
+#[derive(Debug)]
+pub struct BulkPartMeta {
+    pub num_rows: usize,
+    pub max_timestamp: i64,
+    pub min_timestamp: i64,
+}
+
+impl Default for BulkPartMeta {
+    fn default() -> Self {
+        Self {
+            num_rows: 0,
+            max_timestamp: i64::MIN,
+            min_timestamp: i64::MAX,
+        }
+    }
+}
+
+pub struct BulkPartEncoder {
+    metadata: RegionMetadataRef,
+    arrow_schema: SchemaRef,
+    pk_encoder: McmpRowCodec,
+    dedup: bool,
+}
+
+impl BulkPartEncoder {
+    /// Encodes mutations to a [BulkPart], returns true if encoded data has been written to `dest`.
+    fn encode_mutations(&self, mutations: &[Mutation], dest: &mut BulkPart) -> Result<bool> {
+        let Some((arrow_record_batch, min_ts, max_ts)) =
+            mutations_to_record_batch(mutations, &self.metadata, &self.pk_encoder, false)?
+        else {
+            return Ok(false);
+        };
+
+        let arrow_schema = arrow_record_batch.schema();
+        {
+            let mut writer = ArrowWriter::try_new(&mut dest.data, arrow_schema, None)
+                .context(EncodeMemtableSnafu)?;
+            writer
+                .write(&arrow_record_batch)
+                .context(EncodeMemtableSnafu)?;
+            let _metadata = writer.finish().context(EncodeMemtableSnafu)?;
+        }
+
+        dest.metadata = BulkPartMeta {
+            num_rows: arrow_record_batch.num_rows(),
+            max_timestamp: max_ts,
+            min_timestamp: min_ts,
+        };
+        Ok(true)
+    }
+
+    /// Decodes [BulkPart] to [Batch]es.
+    fn decode_to_batches(&self, _part: &BulkPart, _dest: &mut VecDeque<Batch>) -> Result<()> {
+        todo!()
+    }
+}
+
+/// Converts mutations to record batches.
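+///
+/// Returns `None` if the mutations contain no rows. Otherwise the returned record batch
+/// is sorted by (primary key, timestamp, sequence desc) and is returned along with the
+/// min and max timestamps of the batch. When `dedup` is true, rows with the same primary
+/// key and timestamp are deduplicated, keeping the row with the largest sequence.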
+fn mutations_to_record_batch(
+    mutations: &[Mutation],
+    metadata: &RegionMetadataRef,
+    pk_encoder: &McmpRowCodec,
+    dedup: bool,
+) -> Result<Option<(RecordBatch, i64, i64)>> {
+    let total_rows: usize = mutations
+        .iter()
+        .map(|m| m.rows.as_ref().map(|r| r.rows.len()).unwrap_or(0))
+        .sum();
+
+    if total_rows == 0 {
+        return Ok(None);
+    }
+
+    let mut pk_builder = BinaryBuilder::with_capacity(total_rows, 0);
+
+    let mut ts_vector: Box<dyn MutableVector> = metadata
+        .time_index_column()
+        .column_schema
+        .data_type
+        .create_mutable_vector(total_rows);
+    let mut sequence_builder = UInt64Builder::with_capacity(total_rows);
+    let mut op_type_builder = UInt8Builder::with_capacity(total_rows);
+
+    let mut field_builders: Vec<Box<dyn MutableVector>> = metadata
+        .field_columns()
+        .map(|f| f.column_schema.data_type.create_mutable_vector(total_rows))
+        .collect();
+
+    let mut pk_buffer = vec![];
+    for m in mutations {
+        let Some(key_values) = KeyValuesRef::new(metadata, m) else {
+            continue;
+        };
+
+        for row in key_values.iter() {
+            pk_buffer.clear();
+            pk_encoder.encode_to_vec(row.primary_keys(), &mut pk_buffer)?;
+            pk_builder.append_value(pk_buffer.as_bytes());
+            ts_vector.push_value_ref(row.timestamp());
+            sequence_builder.append_value(row.sequence());
+            op_type_builder.append_value(row.op_type() as u8);
+            for (builder, field) in field_builders.iter_mut().zip(row.fields()) {
+                builder.push_value_ref(field);
+            }
+        }
+    }
+
+    let arrow_schema = to_sst_arrow_schema(metadata);
+    // safety: timestamp column must be valid, and values must not be None.
+    let timestamp_unit = metadata
+        .time_index_column()
+        .column_schema
+        .data_type
+        .as_timestamp()
+        .unwrap()
+        .unit();
+    let sorter = ArraysSorter {
+        encoded_primary_keys: pk_builder.finish(),
+        timestamp_unit,
+        timestamp: ts_vector.to_vector().to_arrow_array(),
+        sequence: sequence_builder.finish(),
+        op_type: op_type_builder.finish(),
+        fields: field_builders
+            .iter_mut()
+            .map(|f| f.to_vector().to_arrow_array()),
+        dedup,
+        arrow_schema,
+    };
+
+    sorter.sort().map(Some)
+}
+
+struct ArraysSorter<I> {
+    encoded_primary_keys: BinaryArray,
+    timestamp_unit: TimeUnit,
+    timestamp: ArrayRef,
+    sequence: UInt64Array,
+    op_type: UInt8Array,
+    fields: I,
+    dedup: bool,
+    arrow_schema: SchemaRef,
+}
+
+impl<I> ArraysSorter<I>
+where
+    I: Iterator<Item = ArrayRef>,
+{
+    /// Converts arrays to record batch.
+    fn sort(self) -> Result<(RecordBatch, i64, i64)> {
+        debug_assert!(!self.timestamp.is_empty());
+        debug_assert!(self.timestamp.len() == self.sequence.len());
+        debug_assert!(self.timestamp.len() == self.op_type.len());
+        debug_assert!(self.timestamp.len() == self.encoded_primary_keys.len());
+
+        let timestamp_iter = timestamp_array_to_iter(self.timestamp_unit, &self.timestamp);
+        let (mut min_timestamp, mut max_timestamp) = (i64::MAX, i64::MIN);
+        let mut to_sort = self
+            .encoded_primary_keys
+            .iter()
+            .zip(timestamp_iter)
+            .zip(self.sequence.iter())
+            .map(|((pk, timestamp), sequence)| {
+                max_timestamp = max_timestamp.max(*timestamp);
+                min_timestamp = min_timestamp.min(*timestamp);
+                (pk, timestamp, sequence)
+            })
+            .enumerate()
+            .collect::<Vec<_>>();
+
+        to_sort.sort_unstable_by(|(_, (l_pk, l_ts, l_seq)), (_, (r_pk, r_ts, r_seq))| {
+            l_pk.cmp(r_pk)
+                .then(l_ts.cmp(r_ts))
+                .then(l_seq.cmp(r_seq).reverse())
+        });
+
+        if self.dedup {
+            // Dedup by timestamps while ignoring sequence.
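+            // Entries are already sorted by (pk, ts, sequence desc) above, and `dedup_by`
+            // keeps the first of consecutive duplicates, so the row with the largest
+            // sequence wins for each (pk, ts) pair.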
+            to_sort.dedup_by(|(_, (l_pk, l_ts, _)), (_, (r_pk, r_ts, _))| {
+                l_pk == r_pk && l_ts == r_ts
+            });
+        }
+
+        let indices = UInt32Array::from_iter_values(to_sort.iter().map(|v| v.0 as u32));
+
+        let pk_dictionary = Arc::new(binary_array_to_dictionary(
+            // safety: pk must be BinaryArray
+            arrow::compute::take(
+                &self.encoded_primary_keys,
+                &indices,
+                Some(TakeOptions {
+                    check_bounds: false,
+                }),
+            )
+            .context(ComputeArrowSnafu)?
+            .as_any()
+            .downcast_ref::<BinaryArray>()
+            .unwrap(),
+        )?) as ArrayRef;
+
+        let mut arrays = Vec::with_capacity(self.arrow_schema.fields.len());
+        for arr in self.fields {
+            arrays.push(
+                arrow::compute::take(
+                    &arr,
+                    &indices,
+                    Some(TakeOptions {
+                        check_bounds: false,
+                    }),
+                )
+                .context(ComputeArrowSnafu)?,
+            );
+        }
+
+        let timestamp = arrow::compute::take(
+            &self.timestamp,
+            &indices,
+            Some(TakeOptions {
+                check_bounds: false,
+            }),
+        )
+        .context(ComputeArrowSnafu)?;
+
+        arrays.push(timestamp);
+        arrays.push(pk_dictionary);
+        arrays.push(
+            arrow::compute::take(
+                &self.sequence,
+                &indices,
+                Some(TakeOptions {
+                    check_bounds: false,
+                }),
+            )
+            .context(ComputeArrowSnafu)?,
+        );
+
+        arrays.push(
+            arrow::compute::take(
+                &self.op_type,
+                &indices,
+                Some(TakeOptions {
+                    check_bounds: false,
+                }),
+            )
+            .context(ComputeArrowSnafu)?,
+        );
+
+        let batch = RecordBatch::try_new(self.arrow_schema, arrays).context(NewRecordBatchSnafu)?;
+        Ok((batch, min_timestamp, max_timestamp))
+    }
+}
+
+/// Converts timestamp array to an iter of i64 values.
+fn timestamp_array_to_iter(
+    timestamp_unit: TimeUnit,
+    timestamp: &ArrayRef,
+) -> impl Iterator<Item = &i64> {
+    match timestamp_unit {
+        // safety: timestamp column must be valid.
+        TimeUnit::Second => timestamp
+            .as_any()
+            .downcast_ref::<TimestampSecondArray>()
+            .unwrap()
+            .values()
+            .iter(),
+        TimeUnit::Millisecond => timestamp
+            .as_any()
+            .downcast_ref::<TimestampMillisecondArray>()
+            .unwrap()
+            .values()
+            .iter(),
+        TimeUnit::Microsecond => timestamp
+            .as_any()
+            .downcast_ref::<TimestampMicrosecondArray>()
+            .unwrap()
+            .values()
+            .iter(),
+        TimeUnit::Nanosecond => timestamp
+            .as_any()
+            .downcast_ref::<TimestampNanosecondArray>()
+            .unwrap()
+            .values()
+            .iter(),
+    }
+}
+
+/// Converts a **sorted** [BinaryArray] to [DictionaryArray].
+fn binary_array_to_dictionary(input: &BinaryArray) -> Result<DictionaryArray<UInt16Type>> {
+    if input.is_empty() {
+        return Ok(DictionaryArray::new(
+            UInt16Array::from(Vec::<u16>::new()),
+            Arc::new(BinaryArray::from_vec(vec![])) as ArrayRef,
+        ));
+    }
+    let mut keys = Vec::with_capacity(16);
+    let mut values = BinaryBuilder::new();
+    let mut prev: usize = 0;
+    keys.push(prev as u16);
+    values.append_value(input.value(prev));
+
+    for current_bytes in input.iter().skip(1) {
+        // safety: encoded pk must be present.
+        let current_bytes = current_bytes.unwrap();
+        let prev_bytes = input.value(prev);
+        if current_bytes != prev_bytes {
+            values.append_value(current_bytes);
+            prev += 1;
+        }
+        keys.push(prev as u16);
+    }
+
+    Ok(DictionaryArray::new(
+        UInt16Array::from(keys),
+        Arc::new(values.finish()) as ArrayRef,
+    ))
+}
+
+#[cfg(test)]
+mod tests {
+    use std::collections::VecDeque;
+
+    use datatypes::prelude::{ScalarVector, Value};
+    use datatypes::vectors::{Float64Vector, TimestampMillisecondVector};
+
+    use super::*;
+    use crate::sst::parquet::format::ReadFormat;
+    use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};
+
+    fn check_binary_array_to_dictionary(
+        input: &[&[u8]],
+        expected_keys: &[u16],
+        expected_values: &[&[u8]],
+    ) {
+        let input = BinaryArray::from_iter_values(input.iter());
+        let array = binary_array_to_dictionary(&input).unwrap();
+        assert_eq!(
+            &expected_keys,
+            &array.keys().iter().map(|v| v.unwrap()).collect::<Vec<_>>()
+        );
+        assert_eq!(
+            expected_values,
+            &array
+                .values()
+                .as_any()
+                .downcast_ref::<BinaryArray>()
+                .unwrap()
+                .iter()
+                .map(|v| v.unwrap())
+                .collect::<Vec<_>>()
+        );
+    }
+
+    #[test]
+    fn test_binary_array_to_dictionary() {
+        check_binary_array_to_dictionary(&[], &[], &[]);
+
+        check_binary_array_to_dictionary(&["a".as_bytes()], &[0], &["a".as_bytes()]);
+
+        check_binary_array_to_dictionary(
+            &["a".as_bytes(), "a".as_bytes()],
+            &[0, 0],
+            &["a".as_bytes()],
+        );
+
+        check_binary_array_to_dictionary(
+            &["a".as_bytes(), "a".as_bytes(), "b".as_bytes()],
+            &[0, 0, 1],
+            &["a".as_bytes(), "b".as_bytes()],
+        );
+
+        check_binary_array_to_dictionary(
+            &[
+                "a".as_bytes(),
+                "a".as_bytes(),
+                "b".as_bytes(),
+                "c".as_bytes(),
+            ],
+            &[0, 0, 1, 2],
+            &["a".as_bytes(), "b".as_bytes(), "c".as_bytes()],
+        );
+    }
+
+    struct MutationInput<'a> {
+        k0: &'a str,
+        k1: u32,
+        timestamps: &'a [i64],
+        v0: &'a [Option<f64>],
+        sequence: u64,
+    }
+
+    #[derive(Debug, PartialOrd, PartialEq)]
+    struct BatchOutput<'a> {
+        pk_values: &'a [Value],
+        timestamps: &'a [i64],
+        v0: &'a [Option<f64>],
+    }
+
+    fn check_mutations_to_record_batches(
+        input: &[MutationInput],
+        expected: &[BatchOutput],
+        expected_timestamp: (i64, i64),
+        dedup: bool,
+    ) {
+        let metadata = metadata_for_test();
+        let mutations = input
+            .iter()
+            .map(|m| {
+                build_key_values_with_ts_seq_values(
+                    &metadata,
+                    m.k0.to_string(),
+                    m.k1,
+                    m.timestamps.iter().copied(),
+                    m.v0.iter().copied(),
+                    m.sequence,
+                )
+                .mutation
+            })
+            .collect::<Vec<_>>();
+        let total_rows: usize = mutations
+            .iter()
+            .flat_map(|m| m.rows.iter())
+            .map(|r| r.rows.len())
+            .sum();
+
+        let pk_encoder = McmpRowCodec::new_with_primary_keys(&metadata);
+
+        let (batch, _, _) = mutations_to_record_batch(&mutations, &metadata, &pk_encoder, dedup)
+            .unwrap()
+            .unwrap();
+        let read_format = ReadFormat::new_with_all_columns(metadata.clone());
+        let mut batches = VecDeque::new();
+        read_format
+            .convert_record_batch(&batch, &mut batches)
+            .unwrap();
+        if !dedup {
+            assert_eq!(
+                total_rows,
+                batches.iter().map(|b| { b.num_rows() }).sum::<usize>()
+            );
+        }
+        let batch_values = batches
+            .into_iter()
+            .map(|b| {
+                let pk_values = pk_encoder.decode(b.primary_key()).unwrap();
+                let timestamps = b
+                    .timestamps()
+                    .as_any()
+                    .downcast_ref::<TimestampMillisecondVector>()
+                    .unwrap()
+                    .iter_data()
+                    .map(|v| v.unwrap().0.value())
+                    .collect::<Vec<_>>();
+                let float_values = b.fields()[1]
+                    .data
+                    .as_any()
+                    .downcast_ref::<Float64Vector>()
+                    .unwrap()
+                    .iter_data()
+                    .collect::<Vec<_>>();
+
+                (pk_values, timestamps, float_values)
+            })
+            .collect::<Vec<_>>();
+        assert_eq!(expected.len(),
batch_values.len()); + + for idx in 0..expected.len() { + assert_eq!(expected[idx].pk_values, &batch_values[idx].0); + assert_eq!(expected[idx].timestamps, &batch_values[idx].1); + assert_eq!(expected[idx].v0, &batch_values[idx].2); + } + } + + #[test] + fn test_mutations_to_record_batch() { + check_mutations_to_record_batches( + &[MutationInput { + k0: "a", + k1: 0, + timestamps: &[0], + v0: &[Some(0.1)], + sequence: 0, + }], + &[BatchOutput { + pk_values: &[Value::String("a".into()), Value::UInt32(0)], + timestamps: &[0], + v0: &[Some(0.1)], + }], + (0, 0), + true, + ); + + check_mutations_to_record_batches( + &[ + MutationInput { + k0: "a", + k1: 0, + timestamps: &[0], + v0: &[Some(0.1)], + sequence: 0, + }, + MutationInput { + k0: "b", + k1: 0, + timestamps: &[0], + v0: &[Some(0.0)], + sequence: 0, + }, + MutationInput { + k0: "a", + k1: 0, + timestamps: &[1], + v0: &[Some(0.2)], + sequence: 1, + }, + MutationInput { + k0: "a", + k1: 1, + timestamps: &[1], + v0: &[Some(0.3)], + sequence: 2, + }, + ], + &[ + BatchOutput { + pk_values: &[Value::String("a".into()), Value::UInt32(0)], + timestamps: &[0, 1], + v0: &[Some(0.1), Some(0.2)], + }, + BatchOutput { + pk_values: &[Value::String("a".into()), Value::UInt32(1)], + timestamps: &[1], + v0: &[Some(0.3)], + }, + BatchOutput { + pk_values: &[Value::String("b".into()), Value::UInt32(0)], + timestamps: &[0], + v0: &[Some(0.0)], + }, + ], + (0, 1), + true, + ); + + check_mutations_to_record_batches( + &[ + MutationInput { + k0: "a", + k1: 0, + timestamps: &[0], + v0: &[Some(0.1)], + sequence: 0, + }, + MutationInput { + k0: "b", + k1: 0, + timestamps: &[0], + v0: &[Some(0.0)], + sequence: 0, + }, + MutationInput { + k0: "a", + k1: 0, + timestamps: &[0], + v0: &[Some(0.2)], + sequence: 1, + }, + ], + &[ + BatchOutput { + pk_values: &[Value::String("a".into()), Value::UInt32(0)], + timestamps: &[0], + v0: &[Some(0.2)], + }, + BatchOutput { + pk_values: &[Value::String("b".into()), Value::UInt32(0)], + timestamps: &[0], + v0: &[Some(0.0)], + }, + ], + (0, 0), + true, + ); + check_mutations_to_record_batches( + &[ + MutationInput { + k0: "a", + k1: 0, + timestamps: &[0], + v0: &[Some(0.1)], + sequence: 0, + }, + MutationInput { + k0: "b", + k1: 0, + timestamps: &[0], + v0: &[Some(0.0)], + sequence: 0, + }, + MutationInput { + k0: "a", + k1: 0, + timestamps: &[0], + v0: &[Some(0.2)], + sequence: 1, + }, + ], + &[ + BatchOutput { + pk_values: &[Value::String("a".into()), Value::UInt32(0)], + timestamps: &[0, 0], + v0: &[Some(0.2), Some(0.1)], + }, + BatchOutput { + pk_values: &[Value::String("b".into()), Value::UInt32(0)], + timestamps: &[0], + v0: &[Some(0.0)], + }, + ], + (0, 0), + false, + ); + } +} diff --git a/src/mito2/src/memtable/key_values.rs b/src/mito2/src/memtable/key_values.rs index f1734e5a36..59812b4619 100644 --- a/src/mito2/src/memtable/key_values.rs +++ b/src/mito2/src/memtable/key_values.rs @@ -26,7 +26,7 @@ pub struct KeyValues { /// /// This mutation must be a valid mutation and rows in the mutation /// must not be `None`. - mutation: Mutation, + pub(crate) mutation: Mutation, /// Key value read helper. helper: SparseReadRowHelper, } @@ -65,6 +65,52 @@ impl KeyValues { } } +/// Key value view of a mutation. +#[derive(Debug)] +pub struct KeyValuesRef<'a> { + /// Mutation to read. + /// + /// This mutation must be a valid mutation and rows in the mutation + /// must not be `None`. + mutation: &'a Mutation, + /// Key value read helper. 
+    helper: SparseReadRowHelper,
+}
+
+impl<'a> KeyValuesRef<'a> {
+    /// Creates [KeyValuesRef] from a specific `mutation`.
+    ///
+    /// Returns `None` if `rows` of the `mutation` is `None`.
+    pub fn new(metadata: &RegionMetadata, mutation: &'a Mutation) -> Option<KeyValuesRef<'a>> {
+        let rows = mutation.rows.as_ref()?;
+        let helper = SparseReadRowHelper::new(metadata, rows);
+
+        Some(KeyValuesRef { mutation, helper })
+    }
+
+    /// Returns a key value iterator.
+    pub fn iter(&self) -> impl Iterator<Item = KeyValue> {
+        let rows = self.mutation.rows.as_ref().unwrap();
+        let schema = &rows.schema;
+        rows.rows.iter().enumerate().map(|(idx, row)| {
+            KeyValue {
+                row,
+                schema,
+                helper: &self.helper,
+                sequence: self.mutation.sequence + idx as u64, // Calculate sequence for each row.
+                // Safety: This is a valid mutation.
+                op_type: OpType::try_from(self.mutation.op_type).unwrap(),
+            }
+        })
+    }
+
+    /// Returns number of rows.
+    pub fn num_rows(&self) -> usize {
+        // Safety: rows is not None.
+        self.mutation.rows.as_ref().unwrap().rows.len()
+    }
+}
+
 /// Key value view of a row.
 ///
 /// A key value view divides primary key columns and field (value) columns.
diff --git a/src/mito2/src/memtable/partition_tree.rs b/src/mito2/src/memtable/partition_tree.rs
index 541a34f701..af3b1e3437 100644
--- a/src/mito2/src/memtable/partition_tree.rs
+++ b/src/mito2/src/memtable/partition_tree.rs
@@ -34,13 +34,13 @@ use store_api::metadata::RegionMetadataRef;
 use store_api::storage::ColumnId;
 use table::predicate::Predicate;
 
-use crate::error::Result;
+use crate::error::{Result, UnsupportedOperationSnafu};
 use crate::flush::WriteBufferManagerRef;
 use crate::memtable::key_values::KeyValue;
 use crate::memtable::partition_tree::metrics::WriteMetrics;
 use crate::memtable::partition_tree::tree::PartitionTree;
 use crate::memtable::{
-    AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, Memtable, MemtableBuilder,
+    AllocTracker, BoxedBatchIterator, BulkPart, IterBuilder, KeyValues, Memtable, MemtableBuilder,
     MemtableId, MemtableRange, MemtableRangeContext, MemtableRef, MemtableStats,
 };
 
@@ -148,6 +148,13 @@ impl Memtable for PartitionTreeMemtable {
         res
     }
 
+    fn write_bulk(&self, _part: BulkPart) -> Result<()> {
+        UnsupportedOperationSnafu {
+            err_msg: "PartitionTreeMemtable does not support write_bulk",
+        }
+        .fail()
+    }
+
     fn iter(
         &self,
         projection: Option<&[ColumnId]>,
diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs
index 52ec7f60cb..ac3965f6f1 100644
--- a/src/mito2/src/memtable/time_series.rs
+++ b/src/mito2/src/memtable/time_series.rs
@@ -36,11 +36,14 @@ use store_api::metadata::RegionMetadataRef;
 use store_api::storage::ColumnId;
 use table::predicate::Predicate;
 
-use crate::error::{ComputeArrowSnafu, ConvertVectorSnafu, PrimaryKeyLengthMismatchSnafu, Result};
+use crate::error::{
+    ComputeArrowSnafu, ConvertVectorSnafu, PrimaryKeyLengthMismatchSnafu, Result,
+    UnsupportedOperationSnafu,
+};
 use crate::flush::WriteBufferManagerRef;
 use crate::memtable::key_values::KeyValue;
 use crate::memtable::{
-    AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, Memtable, MemtableBuilder,
+    AllocTracker, BoxedBatchIterator, BulkPart, IterBuilder, KeyValues, Memtable, MemtableBuilder,
     MemtableId, MemtableRange, MemtableRangeContext, MemtableRef, MemtableStats,
 };
 use crate::metrics::{READ_ROWS_TOTAL, READ_STAGE_ELAPSED};
@@ -224,6 +227,13 @@ impl Memtable for TimeSeriesMemtable {
         res
     }
 
+    fn write_bulk(&self, _part: BulkPart) -> Result<()> {
+        UnsupportedOperationSnafu {
+            err_msg:
"TimeSeriesMemtable does not support write_bulk", + } + .fail() + } + fn iter( &self, projection: Option<&[ColumnId]>, diff --git a/src/mito2/src/row_converter.rs b/src/mito2/src/row_converter.rs index 16c7ea8771..ae0ab1177d 100644 --- a/src/mito2/src/row_converter.rs +++ b/src/mito2/src/row_converter.rs @@ -24,6 +24,7 @@ use memcomparable::{Deserializer, Serializer}; use paste::paste; use serde::{Deserialize, Serialize}; use snafu::ResultExt; +use store_api::metadata::RegionMetadata; use crate::error; use crate::error::{FieldTypeMismatchSnafu, NotSupportedFieldSnafu, Result, SerializeFieldSnafu}; @@ -279,6 +280,16 @@ pub struct McmpRowCodec { } impl McmpRowCodec { + /// Creates [McmpRowCodec] instance with all primary keys in given `metadata`. + pub fn new_with_primary_keys(metadata: &RegionMetadata) -> Self { + Self::new( + metadata + .primary_key_columns() + .map(|c| SortField::new(c.column_schema.data_type.clone())) + .collect(), + ) + } + pub fn new(fields: Vec) -> Self { Self { fields } } diff --git a/src/mito2/src/sst.rs b/src/mito2/src/sst.rs index ad750c7709..8645470fa9 100644 --- a/src/mito2/src/sst.rs +++ b/src/mito2/src/sst.rs @@ -14,7 +14,17 @@ //! Sorted strings tables. +use std::sync::Arc; + +use api::v1::SemanticType; use common_base::readable_size::ReadableSize; +use datatypes::arrow::datatypes::{ + DataType as ArrowDataType, Field, FieldRef, Fields, Schema, SchemaRef, +}; +use store_api::metadata::RegionMetadata; +use store_api::storage::consts::{ + OP_TYPE_COLUMN_NAME, PRIMARY_KEY_COLUMN_NAME, SEQUENCE_COLUMN_NAME, +}; pub mod file; pub mod file_purger; @@ -28,3 +38,46 @@ pub const DEFAULT_WRITE_BUFFER_SIZE: ReadableSize = ReadableSize::mb(8); /// Default number of concurrent write, it only works on object store backend(e.g., S3). pub const DEFAULT_WRITE_CONCURRENCY: usize = 8; + +/// Gets the arrow schema to store in parquet. +pub fn to_sst_arrow_schema(metadata: &RegionMetadata) -> SchemaRef { + let fields = Fields::from_iter( + metadata + .schema + .arrow_schema() + .fields() + .iter() + .zip(&metadata.column_metadatas) + .filter_map(|(field, column_meta)| { + if column_meta.semantic_type == SemanticType::Field { + Some(field.clone()) + } else { + // We have fixed positions for tags (primary key) and time index. + None + } + }) + .chain([metadata.time_index_field()]) + .chain(internal_fields()), + ); + + Arc::new(Schema::new(fields)) +} + +/// Fields for internal columns. +fn internal_fields() -> [FieldRef; 3] { + // Internal columns are always not null. 
+ [ + Arc::new(Field::new_dictionary( + PRIMARY_KEY_COLUMN_NAME, + ArrowDataType::UInt16, + ArrowDataType::Binary, + false, + )), + Arc::new(Field::new( + SEQUENCE_COLUMN_NAME, + ArrowDataType::UInt64, + false, + )), + Arc::new(Field::new(OP_TYPE_COLUMN_NAME, ArrowDataType::UInt8, false)), + ] +} diff --git a/src/mito2/src/sst/parquet/format.rs b/src/mito2/src/sst/parquet/format.rs index aecabd971b..2c64800641 100644 --- a/src/mito2/src/sst/parquet/format.rs +++ b/src/mito2/src/sst/parquet/format.rs @@ -33,19 +33,14 @@ use std::sync::Arc; use api::v1::SemanticType; use datafusion_common::ScalarValue; use datatypes::arrow::array::{ArrayRef, BinaryArray, DictionaryArray, UInt16Array, UInt64Array}; -use datatypes::arrow::datatypes::{ - DataType as ArrowDataType, Field, FieldRef, Fields, Schema, SchemaRef, UInt16Type, -}; +use datatypes::arrow::datatypes::{SchemaRef, UInt16Type}; use datatypes::arrow::record_batch::RecordBatch; use datatypes::prelude::DataType; use datatypes::vectors::{Helper, Vector}; use parquet::file::metadata::RowGroupMetaData; use parquet::file::statistics::Statistics; use snafu::{ensure, OptionExt, ResultExt}; -use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef}; -use store_api::storage::consts::{ - OP_TYPE_COLUMN_NAME, PRIMARY_KEY_COLUMN_NAME, SEQUENCE_COLUMN_NAME, -}; +use store_api::metadata::{ColumnMetadata, RegionMetadataRef}; use store_api::storage::ColumnId; use crate::error::{ @@ -53,6 +48,7 @@ use crate::error::{ }; use crate::read::{Batch, BatchBuilder, BatchColumn}; use crate::row_converter::{McmpRowCodec, RowCodec, SortField}; +use crate::sst::to_sst_arrow_schema; /// Number of columns that have fixed positions. /// @@ -114,7 +110,7 @@ impl WriteFormat { } /// Helper for reading the SST format. -pub(crate) struct ReadFormat { +pub struct ReadFormat { metadata: RegionMetadataRef, /// SST file schema. arrow_schema: SchemaRef, @@ -130,7 +126,7 @@ pub(crate) struct ReadFormat { impl ReadFormat { /// Creates a helper with existing `metadata` and `column_ids` to read. - pub(crate) fn new( + pub fn new( metadata: RegionMetadataRef, column_ids: impl Iterator, ) -> ReadFormat { @@ -201,7 +197,7 @@ impl ReadFormat { /// Convert a arrow record batch into `batches`. /// /// Note that the `record_batch` may only contains a subset of columns if it is projected. - pub(crate) fn convert_record_batch( + pub fn convert_record_batch( &self, record_batch: &RecordBatch, batches: &mut VecDeque, @@ -282,7 +278,7 @@ impl ReadFormat { } /// Returns min values of specific column in row groups. - pub(crate) fn min_values( + pub fn min_values( &self, row_groups: &[impl Borrow], column_id: ColumnId, @@ -302,7 +298,7 @@ impl ReadFormat { } /// Returns max values of specific column in row groups. - pub(crate) fn max_values( + pub fn max_values( &self, row_groups: &[impl Borrow], column_id: ColumnId, @@ -322,7 +318,7 @@ impl ReadFormat { } /// Returns null counts of specific column in row groups. - pub(crate) fn null_counts( + pub fn null_counts( &self, row_groups: &[impl Borrow], column_id: ColumnId, @@ -516,28 +512,15 @@ impl ReadFormat { } } -/// Gets the arrow schema to store in parquet. 
-fn to_sst_arrow_schema(metadata: &RegionMetadata) -> SchemaRef {
-    let fields = Fields::from_iter(
-        metadata
-            .schema
-            .arrow_schema()
-            .fields()
-            .iter()
-            .zip(&metadata.column_metadatas)
-            .filter_map(|(field, column_meta)| {
-                if column_meta.semantic_type == SemanticType::Field {
-                    Some(field.clone())
-                } else {
-                    // We have fixed positions for tags (primary key) and time index.
-                    None
-                }
-            })
-            .chain([metadata.time_index_field()])
-            .chain(internal_fields()),
-    );
-
-    Arc::new(Schema::new(fields))
+#[cfg(test)]
+impl ReadFormat {
+    /// Creates a helper with existing `metadata` and all columns.
+    pub fn new_with_all_columns(metadata: RegionMetadataRef) -> ReadFormat {
+        Self::new(
+            Arc::clone(&metadata),
+            metadata.column_metadatas.iter().map(|c| c.column_id),
+        )
+    }
 }
 
 /// Compute offsets of different primary keys in the array.
@@ -563,25 +546,6 @@ fn primary_key_offsets(pk_dict_array: &DictionaryArray<UInt16Type>) -> Result<V
     Ok(offsets)
 }
 
-/// Fields for internal columns.
-fn internal_fields() -> [FieldRef; 3] {
-    // Internal columns are always not null.
-    [
-        Arc::new(Field::new_dictionary(
-            PRIMARY_KEY_COLUMN_NAME,
-            ArrowDataType::UInt16,
-            ArrowDataType::Binary,
-            false,
-        )),
-        Arc::new(Field::new(
-            SEQUENCE_COLUMN_NAME,
-            ArrowDataType::UInt64,
-            false,
-        )),
-        Arc::new(Field::new(OP_TYPE_COLUMN_NAME, ArrowDataType::UInt8, false)),
-    ]
-}
-
 /// Creates a new array for specific `primary_key`.
 fn new_primary_key_array(primary_key: &[u8], num_rows: usize) -> ArrayRef {
     let values = Arc::new(BinaryArray::from_iter_values([primary_key]));
@@ -595,7 +559,7 @@ fn new_primary_key_array(primary_key: &[u8], num_rows: usize) -> ArrayRef {
 mod tests {
     use api::v1::OpType;
     use datatypes::arrow::array::{Int64Array, TimestampMillisecondArray, UInt64Array, UInt8Array};
-    use datatypes::arrow::datatypes::TimeUnit;
+    use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field, Schema, TimeUnit};
     use datatypes::prelude::ConcreteDataType;
     use datatypes::schema::ColumnSchema;
     use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, UInt64Vector, UInt8Vector};
diff --git a/src/mito2/src/test_util/memtable_util.rs b/src/mito2/src/test_util/memtable_util.rs
index 69a9398975..235e9694c7 100644
--- a/src/mito2/src/test_util/memtable_util.rs
+++ b/src/mito2/src/test_util/memtable_util.rs
@@ -33,7 +33,7 @@ use crate::error::Result;
 use crate::memtable::key_values::KeyValue;
 use crate::memtable::partition_tree::data::{timestamp_array_to_i64_slice, DataBatch, DataBuffer};
 use crate::memtable::{
-    BoxedBatchIterator, IterBuilder, KeyValues, Memtable, MemtableBuilder, MemtableId,
+    BoxedBatchIterator, BulkPart, IterBuilder, KeyValues, Memtable, MemtableBuilder, MemtableId,
     MemtableRange, MemtableRangeContext, MemtableRef, MemtableStats,
 };
 use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
@@ -76,6 +76,10 @@ impl Memtable for EmptyMemtable {
         Ok(())
     }
 
+    fn write_bulk(&self, _part: BulkPart) -> Result<()> {
+        Ok(())
+    }
+
     fn iter(
         &self,
         _projection: Option<&[ColumnId]>,