feat: bulk memtable codec (#4163)

* feat: introduce bulk memtable encoder/decoder

* chore: rebase main

* chore: resolve some comments

* refactor: only carries time unit in ArraysSorter

* fix: some comments
Lei, HUANG
2024-06-25 17:02:20 +08:00
committed by GitHub
parent 82f6373574
commit 120447779c
11 changed files with 943 additions and 61 deletions

View File

@@ -753,6 +753,13 @@ pub enum Error {
location: Location,
source: Arc<Error>,
},
#[snafu(display("Operation is not supported: {}", err_msg))]
UnsupportedOperation {
err_msg: String,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -871,6 +878,7 @@ impl ErrorExt for Error {
RegionStopped { .. } => StatusCode::RegionNotReady,
TimeRangePredicateOverflow { .. } => StatusCode::InvalidArguments,
BuildTimeRangeFilter { .. } => StatusCode::Unexpected,
UnsupportedOperation { .. } => StatusCode::Unsupported,
}
}

View File

@@ -18,6 +18,7 @@ use std::fmt;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
pub use bulk::part::BulkPart;
use common_time::Timestamp;
use serde::{Deserialize, Serialize};
use store_api::metadata::RegionMetadataRef;
@@ -35,6 +36,7 @@ use crate::metrics::WRITE_BUFFER_BYTES;
use crate::read::Batch;
use crate::region::options::MemtableOptions;
pub mod bulk;
pub mod key_values;
pub mod partition_tree;
pub mod time_partition;
@@ -101,6 +103,9 @@ pub trait Memtable: Send + Sync + fmt::Debug {
/// Writes one key value pair into the memtable.
fn write_one(&self, key_value: KeyValue) -> Result<()>;
/// Writes an encoded batch into the memtable.
fn write_bulk(&self, part: BulkPart) -> Result<()>;
/// Scans the memtable.
/// `projection` selects columns to read, `None` means reading all columns.
/// `filters` are the predicates to be pushed down to memtable.
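A minimal usage sketch (not part of the diff) of the new trait method: mutations are first encoded into a BulkPart, and the part is then handed to a memtable that supports bulk ingestion. The encoder value and the surrounding plumbing are assumptions for illustration; only encode_mutations and write_bulk come from this commit.

// Sketch only: `encoder` (a BulkPartEncoder) and `mutations` are assumed to be
// in scope; memtables that don't support bulk writes return the new
// UnsupportedOperation error instead.
let mut part = BulkPart::new(Vec::new(), BulkPartMeta::default());
if encoder.encode_mutations(&mutations, &mut part)? {
    memtable.write_bulk(part)?;
}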

View File

@@ -0,0 +1,92 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Memtable implementation for bulk load
use std::sync::{Arc, RwLock};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use table::predicate::Predicate;
use crate::error::Result;
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::key_values::KeyValue;
use crate::memtable::{
BoxedBatchIterator, KeyValues, Memtable, MemtableId, MemtableRange, MemtableRef, MemtableStats,
};
#[allow(unused)]
pub(crate) mod part;
#[derive(Debug)]
pub struct BulkMemtable {
id: MemtableId,
parts: RwLock<Vec<BulkPart>>,
}
impl Memtable for BulkMemtable {
fn id(&self) -> MemtableId {
self.id
}
fn write(&self, _kvs: &KeyValues) -> Result<()> {
unimplemented!()
}
fn write_one(&self, _key_value: KeyValue) -> Result<()> {
unimplemented!()
}
fn write_bulk(&self, fragment: BulkPart) -> Result<()> {
let mut parts = self.parts.write().unwrap();
parts.push(fragment);
Ok(())
}
fn iter(
&self,
_projection: Option<&[ColumnId]>,
_predicate: Option<Predicate>,
) -> Result<BoxedBatchIterator> {
todo!()
}
fn ranges(
&self,
_projection: Option<&[ColumnId]>,
_predicate: Option<Predicate>,
) -> Vec<MemtableRange> {
todo!()
}
fn is_empty(&self) -> bool {
self.parts.read().unwrap().is_empty()
}
fn freeze(&self) -> Result<()> {
Ok(())
}
fn stats(&self) -> MemtableStats {
todo!()
}
fn fork(&self, id: MemtableId, _metadata: &RegionMetadataRef) -> MemtableRef {
Arc::new(Self {
id,
parts: RwLock::new(vec![]),
})
}
}
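For orientation, a rough sketch of what BulkMemtable currently does with a part: it only accumulates encoded parts behind an RwLock, while iter, ranges and stats remain todo!(). Everything below except write_bulk, is_empty and fork is an assumption for illustration.

// Hypothetical construction for the sketch; the diff adds no constructor yet.
let memtable = BulkMemtable { id: 0, parts: RwLock::new(Vec::new()) };
memtable.write_bulk(part)?;                // appends the encoded part
assert!(!memtable.is_empty());             // is_empty just checks the parts vec
let _forked = memtable.fork(1, &metadata); // a fork starts with an empty parts list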

View File

@@ -0,0 +1,682 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Bulk part encoder/decoder.
use std::collections::VecDeque;
use std::sync::Arc;
use api::v1::Mutation;
use common_time::timestamp::TimeUnit;
use datafusion::arrow::array::{TimestampNanosecondArray, UInt64Builder};
use datatypes::arrow;
use datatypes::arrow::array::{
Array, ArrayRef, BinaryBuilder, DictionaryArray, RecordBatch, TimestampMicrosecondArray,
TimestampMillisecondArray, TimestampSecondArray, UInt16Array, UInt32Array, UInt64Array,
UInt8Array, UInt8Builder,
};
use datatypes::arrow::compute::TakeOptions;
use datatypes::arrow::datatypes::{DataType as ArrowDataType, SchemaRef, UInt16Type};
use datatypes::arrow_array::BinaryArray;
use datatypes::data_type::DataType;
use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector};
use datatypes::types::TimestampType;
use parquet::arrow::ArrowWriter;
use parquet::data_type::AsBytes;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use crate::error::{ComputeArrowSnafu, EncodeMemtableSnafu, NewRecordBatchSnafu, Result};
use crate::memtable::key_values::KeyValuesRef;
use crate::read::Batch;
use crate::row_converter::{McmpRowCodec, RowCodec};
use crate::sst::to_sst_arrow_schema;
#[derive(Debug)]
pub struct BulkPart {
data: Vec<u8>,
metadata: BulkPartMeta,
}
impl BulkPart {
pub fn new(data: Vec<u8>, metadata: BulkPartMeta) -> Self {
Self { data, metadata }
}
pub(crate) fn metadata(&self) -> &BulkPartMeta {
&self.metadata
}
}
#[derive(Debug)]
pub struct BulkPartMeta {
pub num_rows: usize,
pub max_timestamp: i64,
pub min_timestamp: i64,
}
impl Default for BulkPartMeta {
fn default() -> Self {
Self {
num_rows: 0,
max_timestamp: i64::MIN,
min_timestamp: i64::MAX,
}
}
}
pub struct BulkPartEncoder {
metadata: RegionMetadataRef,
arrow_schema: SchemaRef,
pk_encoder: McmpRowCodec,
dedup: bool,
}
impl BulkPartEncoder {
/// Encodes mutations to a [BulkPart], returns true if encoded data has been written to `dest`.
fn encode_mutations(&self, mutations: &[Mutation], dest: &mut BulkPart) -> Result<bool> {
let Some((arrow_record_batch, min_ts, max_ts)) =
mutations_to_record_batch(mutations, &self.metadata, &self.pk_encoder, false)?
else {
return Ok(false);
};
let arrow_schema = arrow_record_batch.schema();
{
let mut writer = ArrowWriter::try_new(&mut dest.data, arrow_schema, None)
.context(EncodeMemtableSnafu)?;
writer
.write(&arrow_record_batch)
.context(EncodeMemtableSnafu)?;
let _metadata = writer.finish().context(EncodeMemtableSnafu)?;
}
dest.metadata = BulkPartMeta {
num_rows: arrow_record_batch.num_rows(),
max_timestamp: max_ts,
min_timestamp: min_ts,
};
Ok(true)
}
/// Decodes [BulkPart] to [Batch]es.
fn decode_to_batches(&self, _part: &BulkPart, _dest: &mut VecDeque<Batch>) -> Result<()> {
todo!()
}
}
/// Converts mutations to a record batch along with its min/max timestamps, or `None` if the mutations contain no rows.
fn mutations_to_record_batch(
mutations: &[Mutation],
metadata: &RegionMetadataRef,
pk_encoder: &McmpRowCodec,
dedup: bool,
) -> Result<Option<(RecordBatch, i64, i64)>> {
let total_rows: usize = mutations
.iter()
.map(|m| m.rows.as_ref().map(|r| r.rows.len()).unwrap_or(0))
.sum();
if total_rows == 0 {
return Ok(None);
}
let mut pk_builder = BinaryBuilder::with_capacity(total_rows, 0);
let mut ts_vector: Box<dyn MutableVector> = metadata
.time_index_column()
.column_schema
.data_type
.create_mutable_vector(total_rows);
let mut sequence_builder = UInt64Builder::with_capacity(total_rows);
let mut op_type_builder = UInt8Builder::with_capacity(total_rows);
let mut field_builders: Vec<Box<dyn MutableVector>> = metadata
.field_columns()
.map(|f| f.column_schema.data_type.create_mutable_vector(total_rows))
.collect();
let mut pk_buffer = vec![];
for m in mutations {
let Some(key_values) = KeyValuesRef::new(metadata, m) else {
continue;
};
for row in key_values.iter() {
pk_buffer.clear();
pk_encoder.encode_to_vec(row.primary_keys(), &mut pk_buffer)?;
pk_builder.append_value(pk_buffer.as_bytes());
ts_vector.push_value_ref(row.timestamp());
sequence_builder.append_value(row.sequence());
op_type_builder.append_value(row.op_type() as u8);
for (builder, field) in field_builders.iter_mut().zip(row.fields()) {
builder.push_value_ref(field);
}
}
}
let arrow_schema = to_sst_arrow_schema(metadata);
// safety: timestamp column must be valid, and values must not be None.
let timestamp_unit = metadata
.time_index_column()
.column_schema
.data_type
.as_timestamp()
.unwrap()
.unit();
let sorter = ArraysSorter {
encoded_primary_keys: pk_builder.finish(),
timestamp_unit,
timestamp: ts_vector.to_vector().to_arrow_array(),
sequence: sequence_builder.finish(),
op_type: op_type_builder.finish(),
fields: field_builders
.iter_mut()
.map(|f| f.to_vector().to_arrow_array()),
dedup,
arrow_schema,
};
sorter.sort().map(Some)
}
struct ArraysSorter<I> {
encoded_primary_keys: BinaryArray,
timestamp_unit: TimeUnit,
timestamp: ArrayRef,
sequence: UInt64Array,
op_type: UInt8Array,
fields: I,
dedup: bool,
arrow_schema: SchemaRef,
}
impl<I> ArraysSorter<I>
where
I: Iterator<Item = ArrayRef>,
{
    /// Sorts the arrays and converts them to a record batch.
fn sort(self) -> Result<(RecordBatch, i64, i64)> {
debug_assert!(!self.timestamp.is_empty());
debug_assert!(self.timestamp.len() == self.sequence.len());
debug_assert!(self.timestamp.len() == self.op_type.len());
debug_assert!(self.timestamp.len() == self.encoded_primary_keys.len());
let timestamp_iter = timestamp_array_to_iter(self.timestamp_unit, &self.timestamp);
let (mut min_timestamp, mut max_timestamp) = (i64::MAX, i64::MIN);
let mut to_sort = self
.encoded_primary_keys
.iter()
.zip(timestamp_iter)
.zip(self.sequence.iter())
.map(|((pk, timestamp), sequence)| {
max_timestamp = max_timestamp.max(*timestamp);
min_timestamp = min_timestamp.min(*timestamp);
(pk, timestamp, sequence)
})
.enumerate()
.collect::<Vec<_>>();
to_sort.sort_unstable_by(|(_, (l_pk, l_ts, l_seq)), (_, (r_pk, r_ts, r_seq))| {
l_pk.cmp(r_pk)
.then(l_ts.cmp(r_ts))
.then(l_seq.cmp(r_seq).reverse())
});
if self.dedup {
            // Dedup by (primary key, timestamp) while ignoring the sequence.
to_sort.dedup_by(|(_, (l_pk, l_ts, _)), (_, (r_pk, r_ts, _))| {
l_pk == r_pk && l_ts == r_ts
});
}
let indices = UInt32Array::from_iter_values(to_sort.iter().map(|v| v.0 as u32));
let pk_dictionary = Arc::new(binary_array_to_dictionary(
// safety: pk must be BinaryArray
arrow::compute::take(
&self.encoded_primary_keys,
&indices,
Some(TakeOptions {
check_bounds: false,
}),
)
.context(ComputeArrowSnafu)?
.as_any()
.downcast_ref::<BinaryArray>()
.unwrap(),
)?) as ArrayRef;
let mut arrays = Vec::with_capacity(self.arrow_schema.fields.len());
for arr in self.fields {
arrays.push(
arrow::compute::take(
&arr,
&indices,
Some(TakeOptions {
check_bounds: false,
}),
)
.context(ComputeArrowSnafu)?,
);
}
let timestamp = arrow::compute::take(
&self.timestamp,
&indices,
Some(TakeOptions {
check_bounds: false,
}),
)
.context(ComputeArrowSnafu)?;
arrays.push(timestamp);
arrays.push(pk_dictionary);
arrays.push(
arrow::compute::take(
&self.sequence,
&indices,
Some(TakeOptions {
check_bounds: false,
}),
)
.context(ComputeArrowSnafu)?,
);
arrays.push(
arrow::compute::take(
&self.op_type,
&indices,
Some(TakeOptions {
check_bounds: false,
}),
)
.context(ComputeArrowSnafu)?,
);
let batch = RecordBatch::try_new(self.arrow_schema, arrays).context(NewRecordBatchSnafu)?;
Ok((batch, min_timestamp, max_timestamp))
}
}
/// Converts a timestamp array to an iterator of i64 values.
fn timestamp_array_to_iter(
timestamp_unit: TimeUnit,
timestamp: &ArrayRef,
) -> impl Iterator<Item = &i64> {
match timestamp_unit {
// safety: timestamp column must be valid.
TimeUnit::Second => timestamp
.as_any()
.downcast_ref::<TimestampSecondArray>()
.unwrap()
.values()
.iter(),
TimeUnit::Millisecond => timestamp
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap()
.values()
.iter(),
TimeUnit::Microsecond => timestamp
.as_any()
.downcast_ref::<TimestampMicrosecondArray>()
.unwrap()
.values()
.iter(),
TimeUnit::Nanosecond => timestamp
.as_any()
.downcast_ref::<TimestampNanosecondArray>()
.unwrap()
.values()
.iter(),
}
}
/// Converts a **sorted** [BinaryArray] to [DictionaryArray].
fn binary_array_to_dictionary(input: &BinaryArray) -> Result<DictionaryArray<UInt16Type>> {
if input.is_empty() {
return Ok(DictionaryArray::new(
UInt16Array::from(Vec::<u16>::new()),
Arc::new(BinaryArray::from_vec(vec![])) as ArrayRef,
));
}
let mut keys = Vec::with_capacity(16);
let mut values = BinaryBuilder::new();
let mut prev: usize = 0;
keys.push(prev as u16);
values.append_value(input.value(prev));
for current_bytes in input.iter().skip(1) {
        // safety: encoded pk must be present.
let current_bytes = current_bytes.unwrap();
let prev_bytes = input.value(prev);
if current_bytes != prev_bytes {
values.append_value(current_bytes);
prev += 1;
}
keys.push(prev as u16);
}
Ok(DictionaryArray::new(
UInt16Array::from(keys),
Arc::new(values.finish()) as ArrayRef,
))
}
#[cfg(test)]
mod tests {
use std::collections::VecDeque;
use datatypes::prelude::{ScalarVector, Value};
use datatypes::vectors::{Float64Vector, TimestampMillisecondVector};
use super::*;
use crate::sst::parquet::format::ReadFormat;
use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};
fn check_binary_array_to_dictionary(
input: &[&[u8]],
expected_keys: &[u16],
expected_values: &[&[u8]],
) {
let input = BinaryArray::from_iter_values(input.iter());
let array = binary_array_to_dictionary(&input).unwrap();
assert_eq!(
&expected_keys,
&array.keys().iter().map(|v| v.unwrap()).collect::<Vec<_>>()
);
assert_eq!(
expected_values,
&array
.values()
.as_any()
.downcast_ref::<BinaryArray>()
.unwrap()
.iter()
.map(|v| v.unwrap())
.collect::<Vec<_>>()
);
}
#[test]
fn test_binary_array_to_dictionary() {
check_binary_array_to_dictionary(&[], &[], &[]);
check_binary_array_to_dictionary(&["a".as_bytes()], &[0], &["a".as_bytes()]);
check_binary_array_to_dictionary(
&["a".as_bytes(), "a".as_bytes()],
&[0, 0],
&["a".as_bytes()],
);
check_binary_array_to_dictionary(
&["a".as_bytes(), "a".as_bytes(), "b".as_bytes()],
&[0, 0, 1],
&["a".as_bytes(), "b".as_bytes()],
);
check_binary_array_to_dictionary(
&[
"a".as_bytes(),
"a".as_bytes(),
"b".as_bytes(),
"c".as_bytes(),
],
&[0, 0, 1, 2],
&["a".as_bytes(), "b".as_bytes(), "c".as_bytes()],
);
}
struct MutationInput<'a> {
k0: &'a str,
k1: u32,
timestamps: &'a [i64],
v0: &'a [Option<f64>],
sequence: u64,
}
#[derive(Debug, PartialOrd, PartialEq)]
struct BatchOutput<'a> {
pk_values: &'a [Value],
timestamps: &'a [i64],
v0: &'a [Option<f64>],
}
fn check_mutations_to_record_batches(
input: &[MutationInput],
expected: &[BatchOutput],
expected_timestamp: (i64, i64),
dedup: bool,
) {
let metadata = metadata_for_test();
let mutations = input
.iter()
.map(|m| {
build_key_values_with_ts_seq_values(
&metadata,
m.k0.to_string(),
m.k1,
m.timestamps.iter().copied(),
m.v0.iter().copied(),
m.sequence,
)
.mutation
})
.collect::<Vec<_>>();
let total_rows: usize = mutations
.iter()
.flat_map(|m| m.rows.iter())
.map(|r| r.rows.len())
.sum();
let pk_encoder = McmpRowCodec::new_with_primary_keys(&metadata);
let (batch, _, _) = mutations_to_record_batch(&mutations, &metadata, &pk_encoder, dedup)
.unwrap()
.unwrap();
let read_format = ReadFormat::new_with_all_columns(metadata.clone());
let mut batches = VecDeque::new();
read_format
.convert_record_batch(&batch, &mut batches)
.unwrap();
if !dedup {
assert_eq!(
total_rows,
batches.iter().map(|b| { b.num_rows() }).sum::<usize>()
);
}
let batch_values = batches
.into_iter()
.map(|b| {
let pk_values = pk_encoder.decode(b.primary_key()).unwrap();
let timestamps = b
.timestamps()
.as_any()
.downcast_ref::<TimestampMillisecondVector>()
.unwrap()
.iter_data()
.map(|v| v.unwrap().0.value())
.collect::<Vec<_>>();
let float_values = b.fields()[1]
.data
.as_any()
.downcast_ref::<Float64Vector>()
.unwrap()
.iter_data()
.collect::<Vec<_>>();
(pk_values, timestamps, float_values)
})
.collect::<Vec<_>>();
assert_eq!(expected.len(), batch_values.len());
for idx in 0..expected.len() {
assert_eq!(expected[idx].pk_values, &batch_values[idx].0);
assert_eq!(expected[idx].timestamps, &batch_values[idx].1);
assert_eq!(expected[idx].v0, &batch_values[idx].2);
}
}
#[test]
fn test_mutations_to_record_batch() {
check_mutations_to_record_batches(
&[MutationInput {
k0: "a",
k1: 0,
timestamps: &[0],
v0: &[Some(0.1)],
sequence: 0,
}],
&[BatchOutput {
pk_values: &[Value::String("a".into()), Value::UInt32(0)],
timestamps: &[0],
v0: &[Some(0.1)],
}],
(0, 0),
true,
);
check_mutations_to_record_batches(
&[
MutationInput {
k0: "a",
k1: 0,
timestamps: &[0],
v0: &[Some(0.1)],
sequence: 0,
},
MutationInput {
k0: "b",
k1: 0,
timestamps: &[0],
v0: &[Some(0.0)],
sequence: 0,
},
MutationInput {
k0: "a",
k1: 0,
timestamps: &[1],
v0: &[Some(0.2)],
sequence: 1,
},
MutationInput {
k0: "a",
k1: 1,
timestamps: &[1],
v0: &[Some(0.3)],
sequence: 2,
},
],
&[
BatchOutput {
pk_values: &[Value::String("a".into()), Value::UInt32(0)],
timestamps: &[0, 1],
v0: &[Some(0.1), Some(0.2)],
},
BatchOutput {
pk_values: &[Value::String("a".into()), Value::UInt32(1)],
timestamps: &[1],
v0: &[Some(0.3)],
},
BatchOutput {
pk_values: &[Value::String("b".into()), Value::UInt32(0)],
timestamps: &[0],
v0: &[Some(0.0)],
},
],
(0, 1),
true,
);
check_mutations_to_record_batches(
&[
MutationInput {
k0: "a",
k1: 0,
timestamps: &[0],
v0: &[Some(0.1)],
sequence: 0,
},
MutationInput {
k0: "b",
k1: 0,
timestamps: &[0],
v0: &[Some(0.0)],
sequence: 0,
},
MutationInput {
k0: "a",
k1: 0,
timestamps: &[0],
v0: &[Some(0.2)],
sequence: 1,
},
],
&[
BatchOutput {
pk_values: &[Value::String("a".into()), Value::UInt32(0)],
timestamps: &[0],
v0: &[Some(0.2)],
},
BatchOutput {
pk_values: &[Value::String("b".into()), Value::UInt32(0)],
timestamps: &[0],
v0: &[Some(0.0)],
},
],
(0, 0),
true,
);
check_mutations_to_record_batches(
&[
MutationInput {
k0: "a",
k1: 0,
timestamps: &[0],
v0: &[Some(0.1)],
sequence: 0,
},
MutationInput {
k0: "b",
k1: 0,
timestamps: &[0],
v0: &[Some(0.0)],
sequence: 0,
},
MutationInput {
k0: "a",
k1: 0,
timestamps: &[0],
v0: &[Some(0.2)],
sequence: 1,
},
],
&[
BatchOutput {
pk_values: &[Value::String("a".into()), Value::UInt32(0)],
timestamps: &[0, 0],
v0: &[Some(0.2), Some(0.1)],
},
BatchOutput {
pk_values: &[Value::String("b".into()), Value::UInt32(0)],
timestamps: &[0],
v0: &[Some(0.0)],
},
],
(0, 0),
false,
);
}
}
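The ordering rule in ArraysSorter::sort is the core of the encoder: rows are sorted by (primary key, timestamp, sequence descending) and, when dedup is enabled, only the first row of each (primary key, timestamp) group, i.e. the one with the largest sequence, survives. A self-contained illustration with plain tuples (simplified types, not the actual code above):

let mut rows = vec![
    (b"a".to_vec(), 0_i64, 1_u64), // newer duplicate
    (b"a".to_vec(), 0_i64, 0_u64), // older duplicate
    (b"b".to_vec(), 0_i64, 0_u64),
];
rows.sort_unstable_by(|(l_pk, l_ts, l_seq), (r_pk, r_ts, r_seq)| {
    l_pk.cmp(r_pk)
        .then(l_ts.cmp(r_ts))
        .then(l_seq.cmp(r_seq).reverse())
});
// With dedup enabled, only the highest sequence per (pk, ts) remains.
rows.dedup_by(|l, r| l.0 == r.0 && l.1 == r.1);
assert_eq!(rows, vec![(b"a".to_vec(), 0, 1), (b"b".to_vec(), 0, 0)]);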

View File

@@ -26,7 +26,7 @@ pub struct KeyValues {
///
/// This mutation must be a valid mutation and rows in the mutation
/// must not be `None`.
mutation: Mutation,
pub(crate) mutation: Mutation,
/// Key value read helper.
helper: SparseReadRowHelper,
}
@@ -65,6 +65,52 @@ impl KeyValues {
}
}
/// Key value view of a mutation.
#[derive(Debug)]
pub struct KeyValuesRef<'a> {
/// Mutation to read.
///
/// This mutation must be a valid mutation and rows in the mutation
/// must not be `None`.
mutation: &'a Mutation,
/// Key value read helper.
helper: SparseReadRowHelper,
}
impl<'a> KeyValuesRef<'a> {
    /// Creates [KeyValuesRef] from the given `mutation`.
///
/// Returns `None` if `rows` of the `mutation` is `None`.
pub fn new(metadata: &RegionMetadata, mutation: &'a Mutation) -> Option<KeyValuesRef<'a>> {
let rows = mutation.rows.as_ref()?;
let helper = SparseReadRowHelper::new(metadata, rows);
Some(KeyValuesRef { mutation, helper })
}
/// Returns a key value iterator.
pub fn iter(&self) -> impl Iterator<Item = KeyValue> {
let rows = self.mutation.rows.as_ref().unwrap();
let schema = &rows.schema;
rows.rows.iter().enumerate().map(|(idx, row)| {
KeyValue {
row,
schema,
helper: &self.helper,
sequence: self.mutation.sequence + idx as u64, // Calculate sequence for each row.
// Safety: This is a valid mutation.
op_type: OpType::try_from(self.mutation.op_type).unwrap(),
}
})
}
/// Returns number of rows.
pub fn num_rows(&self) -> usize {
// Safety: rows is not None.
self.mutation.rows.as_ref().unwrap().rows.len()
}
}
/// Key value view of a row.
///
/// A key value view divides primary key columns and field (value) columns.
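A brief sketch of why KeyValuesRef is introduced: the bulk encoder iterates each mutation by reference instead of consuming it, which the owning KeyValues cannot offer. Usage mirrors the owned variant (the surrounding metadata and mutation values are assumed):

if let Some(kvs) = KeyValuesRef::new(&metadata, &mutation) {
    for row in kvs.iter() {
        // The same KeyValue view as before: primary keys, timestamp, sequence,
        // op type and fields are all borrowed from `mutation`.
        let _ = (row.timestamp(), row.sequence(), row.op_type());
    }
}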

View File

@@ -34,13 +34,13 @@ use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use table::predicate::Predicate;
use crate::error::Result;
use crate::error::{Result, UnsupportedOperationSnafu};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::key_values::KeyValue;
use crate::memtable::partition_tree::metrics::WriteMetrics;
use crate::memtable::partition_tree::tree::PartitionTree;
use crate::memtable::{
AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, Memtable, MemtableBuilder,
AllocTracker, BoxedBatchIterator, BulkPart, IterBuilder, KeyValues, Memtable, MemtableBuilder,
MemtableId, MemtableRange, MemtableRangeContext, MemtableRef, MemtableStats,
};
@@ -148,6 +148,13 @@ impl Memtable for PartitionTreeMemtable {
res
}
fn write_bulk(&self, _part: BulkPart) -> Result<()> {
UnsupportedOperationSnafu {
err_msg: "PartitionTreeMemtable does not support write_bulk",
}
.fail()
}
fn iter(
&self,
projection: Option<&[ColumnId]>,

View File

@@ -36,11 +36,14 @@ use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use table::predicate::Predicate;
use crate::error::{ComputeArrowSnafu, ConvertVectorSnafu, PrimaryKeyLengthMismatchSnafu, Result};
use crate::error::{
ComputeArrowSnafu, ConvertVectorSnafu, PrimaryKeyLengthMismatchSnafu, Result,
UnsupportedOperationSnafu,
};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::key_values::KeyValue;
use crate::memtable::{
AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, Memtable, MemtableBuilder,
AllocTracker, BoxedBatchIterator, BulkPart, IterBuilder, KeyValues, Memtable, MemtableBuilder,
MemtableId, MemtableRange, MemtableRangeContext, MemtableRef, MemtableStats,
};
use crate::metrics::{READ_ROWS_TOTAL, READ_STAGE_ELAPSED};
@@ -224,6 +227,13 @@ impl Memtable for TimeSeriesMemtable {
res
}
fn write_bulk(&self, _part: BulkPart) -> Result<()> {
UnsupportedOperationSnafu {
err_msg: "TimeSeriesMemtable does not support write_bulk",
}
.fail()
}
fn iter(
&self,
projection: Option<&[ColumnId]>,

View File

@@ -24,6 +24,7 @@ use memcomparable::{Deserializer, Serializer};
use paste::paste;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use store_api::metadata::RegionMetadata;
use crate::error;
use crate::error::{FieldTypeMismatchSnafu, NotSupportedFieldSnafu, Result, SerializeFieldSnafu};
@@ -279,6 +280,16 @@ pub struct McmpRowCodec {
}
impl McmpRowCodec {
/// Creates [McmpRowCodec] instance with all primary keys in given `metadata`.
pub fn new_with_primary_keys(metadata: &RegionMetadata) -> Self {
Self::new(
metadata
.primary_key_columns()
.map(|c| SortField::new(c.column_schema.data_type.clone()))
.collect(),
)
}
pub fn new(fields: Vec<SortField>) -> Self {
Self { fields }
}
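A hedged usage sketch of the new constructor, mirroring how part.rs uses the codec: build it from region metadata, encode a row's primary key columns, then decode them back into values (`metadata` and `row` are assumed to be in scope):

let codec = McmpRowCodec::new_with_primary_keys(&metadata);
let mut buf = Vec::new();
// Same calls as in mutations_to_record_batch and the part.rs tests.
codec.encode_to_vec(row.primary_keys(), &mut buf)?;
let pk_values = codec.decode(&buf)?; // Vec<Value> in primary key order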

View File

@@ -14,7 +14,17 @@
//! Sorted strings tables.
use std::sync::Arc;
use api::v1::SemanticType;
use common_base::readable_size::ReadableSize;
use datatypes::arrow::datatypes::{
DataType as ArrowDataType, Field, FieldRef, Fields, Schema, SchemaRef,
};
use store_api::metadata::RegionMetadata;
use store_api::storage::consts::{
OP_TYPE_COLUMN_NAME, PRIMARY_KEY_COLUMN_NAME, SEQUENCE_COLUMN_NAME,
};
pub mod file;
pub mod file_purger;
@@ -28,3 +38,46 @@ pub const DEFAULT_WRITE_BUFFER_SIZE: ReadableSize = ReadableSize::mb(8);
/// Default number of concurrent writes; it only takes effect on object store backends (e.g., S3).
pub const DEFAULT_WRITE_CONCURRENCY: usize = 8;
/// Gets the arrow schema to store in parquet.
pub fn to_sst_arrow_schema(metadata: &RegionMetadata) -> SchemaRef {
let fields = Fields::from_iter(
metadata
.schema
.arrow_schema()
.fields()
.iter()
.zip(&metadata.column_metadatas)
.filter_map(|(field, column_meta)| {
if column_meta.semantic_type == SemanticType::Field {
Some(field.clone())
} else {
// We have fixed positions for tags (primary key) and time index.
None
}
})
.chain([metadata.time_index_field()])
.chain(internal_fields()),
);
Arc::new(Schema::new(fields))
}
/// Fields for internal columns.
fn internal_fields() -> [FieldRef; 3] {
// Internal columns are always not null.
[
Arc::new(Field::new_dictionary(
PRIMARY_KEY_COLUMN_NAME,
ArrowDataType::UInt16,
ArrowDataType::Binary,
false,
)),
Arc::new(Field::new(
SEQUENCE_COLUMN_NAME,
ArrowDataType::UInt64,
false,
)),
Arc::new(Field::new(OP_TYPE_COLUMN_NAME, ArrowDataType::UInt8, false)),
]
}
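To make the fixed layout concrete, a small sketch of the column order to_sst_arrow_schema yields (for an assumed region metadata in scope): field columns first, then the time index, then the three internal columns.

let schema = to_sst_arrow_schema(&metadata);
let names: Vec<_> = schema.fields().iter().map(|f| f.name().as_str()).collect();
// names == [<field columns...>, <time index>,
//           PRIMARY_KEY_COLUMN_NAME, SEQUENCE_COLUMN_NAME, OP_TYPE_COLUMN_NAME]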

View File

@@ -33,19 +33,14 @@ use std::sync::Arc;
use api::v1::SemanticType;
use datafusion_common::ScalarValue;
use datatypes::arrow::array::{ArrayRef, BinaryArray, DictionaryArray, UInt16Array, UInt64Array};
use datatypes::arrow::datatypes::{
DataType as ArrowDataType, Field, FieldRef, Fields, Schema, SchemaRef, UInt16Type,
};
use datatypes::arrow::datatypes::{SchemaRef, UInt16Type};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::prelude::DataType;
use datatypes::vectors::{Helper, Vector};
use parquet::file::metadata::RowGroupMetaData;
use parquet::file::statistics::Statistics;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
use store_api::storage::consts::{
OP_TYPE_COLUMN_NAME, PRIMARY_KEY_COLUMN_NAME, SEQUENCE_COLUMN_NAME,
};
use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
use store_api::storage::ColumnId;
use crate::error::{
@@ -53,6 +48,7 @@ use crate::error::{
};
use crate::read::{Batch, BatchBuilder, BatchColumn};
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
use crate::sst::to_sst_arrow_schema;
/// Number of columns that have fixed positions.
///
@@ -114,7 +110,7 @@ impl WriteFormat {
}
/// Helper for reading the SST format.
pub(crate) struct ReadFormat {
pub struct ReadFormat {
metadata: RegionMetadataRef,
/// SST file schema.
arrow_schema: SchemaRef,
@@ -130,7 +126,7 @@ pub(crate) struct ReadFormat {
impl ReadFormat {
/// Creates a helper with existing `metadata` and `column_ids` to read.
pub(crate) fn new(
pub fn new(
metadata: RegionMetadataRef,
column_ids: impl Iterator<Item = ColumnId>,
) -> ReadFormat {
@@ -201,7 +197,7 @@ impl ReadFormat {
/// Converts an arrow record batch into `batches`.
///
/// Note that the `record_batch` may only contain a subset of columns if it is projected.
pub(crate) fn convert_record_batch(
pub fn convert_record_batch(
&self,
record_batch: &RecordBatch,
batches: &mut VecDeque<Batch>,
@@ -282,7 +278,7 @@ impl ReadFormat {
}
/// Returns min values of a specific column in row groups.
pub(crate) fn min_values(
pub fn min_values(
&self,
row_groups: &[impl Borrow<RowGroupMetaData>],
column_id: ColumnId,
@@ -302,7 +298,7 @@ impl ReadFormat {
}
/// Returns max values of a specific column in row groups.
pub(crate) fn max_values(
pub fn max_values(
&self,
row_groups: &[impl Borrow<RowGroupMetaData>],
column_id: ColumnId,
@@ -322,7 +318,7 @@ impl ReadFormat {
}
/// Returns null counts of a specific column in row groups.
pub(crate) fn null_counts(
pub fn null_counts(
&self,
row_groups: &[impl Borrow<RowGroupMetaData>],
column_id: ColumnId,
@@ -516,28 +512,15 @@ impl ReadFormat {
}
}
/// Gets the arrow schema to store in parquet.
fn to_sst_arrow_schema(metadata: &RegionMetadata) -> SchemaRef {
let fields = Fields::from_iter(
metadata
.schema
.arrow_schema()
.fields()
.iter()
.zip(&metadata.column_metadatas)
.filter_map(|(field, column_meta)| {
if column_meta.semantic_type == SemanticType::Field {
Some(field.clone())
} else {
// We have fixed positions for tags (primary key) and time index.
None
}
})
.chain([metadata.time_index_field()])
.chain(internal_fields()),
);
Arc::new(Schema::new(fields))
#[cfg(test)]
impl ReadFormat {
/// Creates a helper with existing `metadata` and all columns.
pub fn new_with_all_columns(metadata: RegionMetadataRef) -> ReadFormat {
Self::new(
Arc::clone(&metadata),
metadata.column_metadatas.iter().map(|c| c.column_id),
)
}
}
/// Compute offsets of different primary keys in the array.
@@ -563,25 +546,6 @@ fn primary_key_offsets(pk_dict_array: &DictionaryArray<UInt16Type>) -> Result<Ve
Ok(offsets)
}
/// Fields for internal columns.
fn internal_fields() -> [FieldRef; 3] {
// Internal columns are always not null.
[
Arc::new(Field::new_dictionary(
PRIMARY_KEY_COLUMN_NAME,
ArrowDataType::UInt16,
ArrowDataType::Binary,
false,
)),
Arc::new(Field::new(
SEQUENCE_COLUMN_NAME,
ArrowDataType::UInt64,
false,
)),
Arc::new(Field::new(OP_TYPE_COLUMN_NAME, ArrowDataType::UInt8, false)),
]
}
/// Creates a new array for specific `primary_key`.
fn new_primary_key_array(primary_key: &[u8], num_rows: usize) -> ArrayRef {
let values = Arc::new(BinaryArray::from_iter_values([primary_key]));
@@ -595,7 +559,7 @@ fn new_primary_key_array(primary_key: &[u8], num_rows: usize) -> ArrayRef {
mod tests {
use api::v1::OpType;
use datatypes::arrow::array::{Int64Array, TimestampMillisecondArray, UInt64Array, UInt8Array};
use datatypes::arrow::datatypes::TimeUnit;
use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field, Schema, TimeUnit};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, UInt64Vector, UInt8Vector};

View File

@@ -33,7 +33,7 @@ use crate::error::Result;
use crate::memtable::key_values::KeyValue;
use crate::memtable::partition_tree::data::{timestamp_array_to_i64_slice, DataBatch, DataBuffer};
use crate::memtable::{
BoxedBatchIterator, IterBuilder, KeyValues, Memtable, MemtableBuilder, MemtableId,
BoxedBatchIterator, BulkPart, IterBuilder, KeyValues, Memtable, MemtableBuilder, MemtableId,
MemtableRange, MemtableRangeContext, MemtableRef, MemtableStats,
};
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
@@ -76,6 +76,10 @@ impl Memtable for EmptyMemtable {
Ok(())
}
fn write_bulk(&self, _part: BulkPart) -> Result<()> {
Ok(())
}
fn iter(
&self,
_projection: Option<&[ColumnId]>,