Compare commits

...

2 Commits

Author      SHA1        Message                                 Date
Lei, HUANG  2ed98ff558  fix: some cr comments                   2024-02-20 14:10:57 +08:00
Lei, HUANG  b46386d52a  feat: data buffer and related structs   2024-02-19 22:57:25 +08:00
3 changed files with 809 additions and 6 deletions

View File

@@ -37,8 +37,8 @@ use crate::worker::WorkerId;
#[stack_trace_debug]
pub enum Error {
#[snafu(display(
"Failed to set region {} to writable, it was expected to replayed to {}, but actually replayed to {}",
region_id, expected_last_entry_id, replayed_last_entry_id
"Failed to set region {} to writable, it was expected to replayed to {}, but actually replayed to {}",
region_id, expected_last_entry_id, replayed_last_entry_id
))]
UnexpectedReplay {
location: Location,
@@ -559,6 +559,13 @@ pub enum Error {
#[snafu(display("Encode null value"))]
IndexEncodeNull { location: Location },
#[snafu(display("Failed to encode memtable to Parquet bytes"))]
EncodeMemtable {
#[snafu(source)]
error: parquet::errors::ParquetError,
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -662,6 +669,7 @@ impl ErrorExt for Error {
FilterRecordBatch { source, .. } => source.status_code(),
Upload { .. } => StatusCode::StorageUnavailable,
BiError { .. } => StatusCode::Internal,
EncodeMemtable { .. } => StatusCode::Internal,
}
}

View File

@@ -14,8 +14,688 @@
//! Data part of a shard.
/// Buffer to store columns not in the primary key.
pub struct DataBuffer {}
use std::cmp::{Ordering, Reverse};
use std::ops::Range;
use std::sync::Arc;
use bytes::Bytes;
use datatypes::arrow;
use datatypes::arrow::array::{RecordBatch, UInt16Array, UInt32Array};
use datatypes::arrow::datatypes::{Field, Schema, SchemaRef};
use datatypes::data_type::DataType;
use datatypes::prelude::{ConcreteDataType, MutableVector, ScalarVectorBuilder, VectorRef};
use datatypes::schema::ColumnSchema;
use datatypes::types::TimestampType;
use datatypes::vectors::{
TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector,
TimestampSecondVector, UInt16Vector, UInt16VectorBuilder, UInt64Vector, UInt64VectorBuilder,
UInt8VectorBuilder,
};
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::consts::{OP_TYPE_COLUMN_NAME, SEQUENCE_COLUMN_NAME};
use crate::error;
use crate::error::Result;
use crate::memtable::key_values::KeyValue;
use crate::memtable::merge_tree::{PkId, PkIndex};
const PK_INDEX_COLUMN_NAME: &str = "__pk_index";
/// Data part batch returned by `DataParts::read`.
#[derive(Debug, Clone)]
pub struct DataBatch {
/// Primary key index of this batch.
pk_index: PkIndex,
/// Record batch of data.
rb: Arc<RecordBatch>,
/// Range of rows belonging to the current primary key inside the record batch.
range: Range<usize>,
}
impl DataBatch {
pub(crate) fn pk_index(&self) -> PkIndex {
self.pk_index
}
pub(crate) fn record_batch(&self) -> &RecordBatch {
&self.rb
}
pub(crate) fn range(&self) -> Range<usize> {
self.range.clone()
}
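/// Returns a record batch that only contains the rows in this batch's range.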
pub(crate) fn slice_record_batch(&self) -> RecordBatch {
self.rb.slice(self.range.start, self.range.len())
}
}
/// Buffer for the value part (pk_index, ts, sequence, op_type, field columns) in a shard.
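/// Rows are appended with `write_row`; once it returns `true` the buffer has
/// reached `freeze_threshold` rows and can be frozen into an immutable `DataPart`.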
pub struct DataBuffer {
metadata: RegionMetadataRef,
/// Schema for data part (primary keys are replaced with pk_index)
data_part_schema: SchemaRef,
/// Data types for field columns.
field_types: Vec<ConcreteDataType>,
/// Builder for primary key index.
pk_index_builder: UInt16VectorBuilder,
/// Builder for timestamp column.
ts_builder: Box<dyn MutableVector>,
/// Builder for sequence column.
sequence_builder: UInt64VectorBuilder,
/// Builder for op_type column.
op_type_builder: UInt8VectorBuilder,
/// Builders for field columns.
field_builders: Vec<Option<Box<dyn MutableVector>>>,
/// Threshold for freezing data buffer.
freeze_threshold: usize,
}
impl DataBuffer {
pub fn with_capacity(
metadata: RegionMetadataRef,
init_capacity: usize,
freeze_threshold: usize,
) -> Self {
let ts_builder = metadata
.time_index_column()
.column_schema
.data_type
.create_mutable_vector(init_capacity);
let pk_id_builder = UInt16VectorBuilder::with_capacity(init_capacity);
let sequence_builder = UInt64VectorBuilder::with_capacity(init_capacity);
let op_type_builder = UInt8VectorBuilder::with_capacity(init_capacity);
let field_types = metadata
.field_columns()
.map(|c| c.column_schema.data_type.clone())
.collect::<Vec<_>>();
let field_builders = (0..field_types.len()).map(|_| None).collect();
let data_part_schema = memtable_schema_to_encoded_schema(&metadata);
Self {
metadata,
data_part_schema,
field_types,
pk_index_builder: pk_id_builder,
ts_builder,
sequence_builder,
op_type_builder,
field_builders,
freeze_threshold,
}
}
/// Writes a row to the data buffer. Returns `true` when the number of buffered rows reaches `freeze_threshold`.
pub fn write_row(&mut self, pk_id: PkId, kv: KeyValue) -> bool {
self.ts_builder.push_value_ref(kv.timestamp());
self.pk_index_builder.push(Some(pk_id.pk_index));
self.sequence_builder.push(Some(kv.sequence()));
self.op_type_builder.push(Some(kv.op_type() as u8));
debug_assert_eq!(self.field_builders.len(), kv.num_fields());
for (idx, field) in kv.fields().enumerate() {
self.field_builders[idx]
.get_or_insert_with(|| {
let mut builder =
self.field_types[idx].create_mutable_vector(self.ts_builder.len());
builder.push_nulls(self.ts_builder.len() - 1);
builder
})
.push_value_ref(field);
}
self.ts_builder.len() >= self.freeze_threshold
}
/// Freezes `DataBuffer` to bytes. Use `pk_weights` to convert pk_id to pk sort order.
/// `freeze` clears the buffers of builders.
pub fn freeze(&mut self, _pk_weights: &[u16]) -> Result<DataPart> {
// We need to distinguish between `freeze` in `ShardWriter` and `Shard`.
todo!()
}
/// Reads batches from the data buffer without resetting the builders' buffers.
pub fn iter(&mut self, pk_weights: &[u16]) -> Result<DataBufferIter> {
let batch =
data_buffer_to_record_batches(self.data_part_schema.clone(), self, pk_weights, true)?;
DataBufferIter::new(batch)
}
/// Returns num of rows in data buffer.
pub fn num_rows(&self) -> usize {
self.ts_builder.len()
}
/// Returns whether the buffer is empty.
pub fn is_empty(&self) -> bool {
self.num_rows() == 0
}
}
/// Converts `DataBuffer` to record batches, with rows sorted according to pk_weights.
fn data_buffer_to_record_batches(
schema: SchemaRef,
buffer: &mut DataBuffer,
pk_weights: &[u16],
keep_data: bool,
) -> Result<RecordBatch> {
let num_rows = buffer.ts_builder.len();
let (pk_index_v, ts_v, sequence_v, op_type_v) = if keep_data {
(
buffer.pk_index_builder.finish_cloned(),
buffer.ts_builder.to_vector_cloned(),
buffer.sequence_builder.finish_cloned(),
buffer.op_type_builder.finish_cloned(),
)
} else {
(
buffer.pk_index_builder.finish(),
buffer.ts_builder.to_vector(),
buffer.sequence_builder.finish(),
buffer.op_type_builder.finish(),
)
};
let mut rows = build_rows_to_sort(pk_weights, &pk_index_v, &ts_v, &sequence_v);
// Sort rows by (pk_weight, timestamp, sequence desc) and dedup rows sharing the
// same primary key and timestamp, keeping the row with the largest sequence.
rows.sort_unstable_by(|l, r| l.1.cmp(&r.1));
rows.dedup_by(|l, r| l.1.pk_weight == r.1.pk_weight && l.1.timestamp == r.1.timestamp);
let indices_to_take = UInt32Array::from_iter_values(rows.into_iter().map(|v| v.0 as u32));
let mut columns = Vec::with_capacity(4 + buffer.field_builders.len());
columns.push(
arrow::compute::take(&pk_index_v.as_arrow(), &indices_to_take, None)
.context(error::ComputeArrowSnafu)?,
);
columns.push(
arrow::compute::take(&ts_v.to_arrow_array(), &indices_to_take, None)
.context(error::ComputeArrowSnafu)?,
);
columns.push(
arrow::compute::take(&sequence_v.as_arrow(), &indices_to_take, None)
.context(error::ComputeArrowSnafu)?,
);
columns.push(
arrow::compute::take(&op_type_v.as_arrow(), &indices_to_take, None)
.context(error::ComputeArrowSnafu)?,
);
for (idx, c) in buffer.field_builders.iter_mut().enumerate() {
let array = match c {
None => {
let mut single_null = buffer.field_types[idx].create_mutable_vector(num_rows);
single_null.push_nulls(num_rows);
single_null.to_vector().to_arrow_array()
}
Some(v) => {
if keep_data {
v.to_vector_cloned().to_arrow_array()
} else {
v.to_vector().to_arrow_array()
}
}
};
columns.push(
arrow::compute::take(&array, &indices_to_take, None)
.context(error::ComputeArrowSnafu)?,
);
}
RecordBatch::try_new(schema, columns).context(error::NewRecordBatchSnafu)
}
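/// Iterator over a sorted snapshot of a `DataBuffer`, yielding one `DataBatch`
/// per consecutive run of rows that share the same `pk_index`.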
#[derive(Debug)]
pub(crate) struct DataBufferIter {
batch: Arc<RecordBatch>,
offset: usize,
current_data_batch: Option<DataBatch>,
}
impl DataBufferIter {
pub(crate) fn new(batch: RecordBatch) -> Result<Self> {
let mut iter = Self {
batch: Arc::new(batch),
offset: 0,
current_data_batch: None,
};
iter.next()?; // fill data batch for comparison and merge.
Ok(iter)
}
pub(crate) fn is_valid(&self) -> bool {
self.current_data_batch.is_some()
}
/// # Panics
/// Panics if the iterator is exhausted.
pub(crate) fn current_data_batch(&self) -> DataBatch {
self.current_data_batch.as_ref().unwrap().clone()
}
/// # Panics
/// Panics if the iterator is exhausted.
pub(crate) fn current_pk_index(&self) -> PkIndex {
self.current_data_batch.as_ref().unwrap().pk_index
}
/// Advances iterator to next data batch.
pub(crate) fn next(&mut self) -> Result<()> {
if self.offset >= self.batch.num_rows() {
self.current_data_batch = None;
return Ok(());
}
let pk_index_array = pk_index_array(&self.batch);
if let Some((next_pk, range)) = search_next_pk_range(pk_index_array, self.offset) {
self.offset = range.end;
self.current_data_batch = Some(DataBatch {
pk_index: next_pk,
rb: self.batch.clone(),
range,
})
} else {
self.current_data_batch = None;
}
Ok(())
}
}
/// Gets `pk_index` array from record batch.
/// # Panics
/// If pk index column is not the first column or the type is not `UInt16Array`.
fn pk_index_array(batch: &RecordBatch) -> &UInt16Array {
batch
.column(0)
.as_any()
.downcast_ref::<UInt16Array>()
.unwrap()
}
/// Searches for the next pk index and its offset range in a sorted `UInt16Array`.
fn search_next_pk_range(array: &UInt16Array, start: usize) -> Option<(PkIndex, Range<usize>)> {
let num_rows = array.len();
if start >= num_rows {
return None;
}
let next_pk = array.value(start);
for idx in start..num_rows {
if array.value(idx) != next_pk {
return Some((next_pk, start..idx));
}
}
Some((next_pk, start..num_rows))
}
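/// Composite sort key for rows in a `DataBuffer`: rows order by pk weight, then
/// timestamp, then by sequence in descending order.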
#[derive(Eq, PartialEq)]
struct InnerKey {
pk_weight: u16,
timestamp: i64,
sequence: u64,
}
impl PartialOrd for InnerKey {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for InnerKey {
fn cmp(&self, other: &Self) -> Ordering {
(self.pk_weight, self.timestamp, Reverse(self.sequence)).cmp(&(
other.pk_weight,
other.timestamp,
Reverse(other.sequence),
))
}
}
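/// Builds a `(row index, InnerKey)` pair for every row so rows can be sorted
/// and deduplicated without moving the underlying column data.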
fn build_rows_to_sort(
pk_weights: &[u16],
pk_index: &UInt16Vector,
ts: &VectorRef,
sequence: &UInt64Vector,
) -> Vec<(usize, InnerKey)> {
let ts_values = match ts.data_type() {
ConcreteDataType::Timestamp(t) => match t {
TimestampType::Second(_) => ts
.as_any()
.downcast_ref::<TimestampSecondVector>()
.unwrap()
.as_arrow()
.values(),
TimestampType::Millisecond(_) => ts
.as_any()
.downcast_ref::<TimestampMillisecondVector>()
.unwrap()
.as_arrow()
.values(),
TimestampType::Microsecond(_) => ts
.as_any()
.downcast_ref::<TimestampMicrosecondVector>()
.unwrap()
.as_arrow()
.values(),
TimestampType::Nanosecond(_) => ts
.as_any()
.downcast_ref::<TimestampNanosecondVector>()
.unwrap()
.as_arrow()
.values(),
},
other => unreachable!("Unexpected type {:?}", other),
};
let pk_index_values = pk_index.as_arrow().values();
let sequence_values = sequence.as_arrow().values();
debug_assert_eq!(ts_values.len(), pk_index_values.len());
debug_assert_eq!(ts_values.len(), sequence_values.len());
ts_values
.iter()
.zip(pk_index_values.iter())
.zip(sequence_values.iter())
.enumerate()
.map(|(idx, ((timestamp, pk_index), sequence))| {
(
idx,
InnerKey {
timestamp: *timestamp,
pk_weight: pk_weights[*pk_index as usize],
sequence: *sequence,
},
)
})
.collect()
}
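/// Derives the arrow schema of the encoded data part: `__pk_index`, the time
/// index column, the sequence column and the op type column, followed by all
/// field columns of the region.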
fn memtable_schema_to_encoded_schema(schema: &RegionMetadataRef) -> SchemaRef {
use datatypes::arrow::datatypes::DataType;
let ColumnSchema {
name: ts_name,
data_type: ts_type,
..
} = &schema.time_index_column().column_schema;
let mut fields = vec![
Field::new(PK_INDEX_COLUMN_NAME, DataType::UInt16, false),
Field::new(ts_name, ts_type.as_arrow_type(), false),
Field::new(SEQUENCE_COLUMN_NAME, DataType::UInt64, false),
Field::new(OP_TYPE_COLUMN_NAME, DataType::UInt8, false),
];
fields.extend(schema.field_columns().map(|c| {
Field::new(
&c.column_schema.name,
c.column_schema.data_type.as_arrow_type(),
c.column_schema.is_nullable(),
)
}));
Arc::new(Schema::new(fields))
}
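/// Encoder that serializes the contents of a `DataBuffer` into a Parquet-encoded
/// immutable `DataPart`.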
struct DataPartEncoder<'a> {
schema: SchemaRef,
pk_weights: &'a [u16],
row_group_size: Option<usize>,
}
impl<'a> DataPartEncoder<'a> {
pub fn new(
metadata: &RegionMetadataRef,
pk_weights: &'a [u16],
row_group_size: Option<usize>,
) -> DataPartEncoder<'a> {
let schema = memtable_schema_to_encoded_schema(metadata);
Self {
schema,
pk_weights,
row_group_size,
}
}
fn writer_props(&self) -> Option<WriterProperties> {
self.row_group_size.map(|size| {
WriterProperties::builder()
.set_max_row_group_size(size)
.build()
})
}
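/// Sorts and drains the buffer according to `pk_weights` and writes it as a
/// single in-memory Parquet buffer.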
pub fn write(&self, source: &mut DataBuffer) -> Result<Bytes> {
let mut bytes = Vec::with_capacity(1024);
let mut writer = ArrowWriter::try_new(&mut bytes, self.schema.clone(), self.writer_props())
.context(error::EncodeMemtableSnafu)?;
let rb =
data_buffer_to_record_batches(self.schema.clone(), source, self.pk_weights, false)?;
writer.write(&rb).context(error::EncodeMemtableSnafu)?;
let _file_meta = writer.close().context(error::EncodeMemtableSnafu)?;
Ok(Bytes::from(bytes))
}
}
/// Format of immutable data part.
pub enum DataPart {
Parquet(Bytes),
}
impl DataPart {
fn is_empty(&self) -> bool {
match self {
DataPart::Parquet(data) => data.is_empty(),
}
}
}
/// Data parts under a shard.
pub struct DataParts {}
#[cfg(test)]
mod tests {
use datafusion::arrow::array::Float64Array;
use datatypes::arrow::array::{TimestampMillisecondArray, UInt16Array};
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::data_type::AsBytes;
use super::*;
use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};
fn check_test_data_buffer_to_record_batches(keep_data: bool) {
let meta = metadata_for_test();
let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, usize::MAX);
write_rows_to_buffer(&mut buffer, &meta, 0, vec![1, 2], vec![Some(0.1), None], 1);
write_rows_to_buffer(&mut buffer, &meta, 1, vec![1, 2], vec![Some(1.1), None], 2);
write_rows_to_buffer(&mut buffer, &meta, 0, vec![2], vec![Some(1.1)], 3);
assert_eq!(5, buffer.num_rows());
let schema = memtable_schema_to_encoded_schema(&meta);
let batch = data_buffer_to_record_batches(schema, &mut buffer, &[3, 1], keep_data).unwrap();
assert_eq!(
vec![1, 2, 1, 2],
batch
.column_by_name("ts")
.unwrap()
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap()
.iter()
.map(|v| v.unwrap())
.collect::<Vec<_>>()
);
assert_eq!(
vec![1, 1, 0, 0],
batch
.column_by_name(PK_INDEX_COLUMN_NAME)
.unwrap()
.as_any()
.downcast_ref::<UInt16Array>()
.unwrap()
.iter()
.map(|v| v.unwrap())
.collect::<Vec<_>>()
);
assert_eq!(
vec![Some(1.1), None, Some(0.1), Some(1.1)],
batch
.column_by_name("v1")
.unwrap()
.as_any()
.downcast_ref::<Float64Array>()
.unwrap()
.iter()
.collect::<Vec<_>>()
);
if keep_data {
assert_eq!(5, buffer.num_rows());
} else {
assert_eq!(0, buffer.num_rows());
}
}
#[test]
fn test_data_buffer_to_record_batches() {
check_test_data_buffer_to_record_batches(true);
check_test_data_buffer_to_record_batches(false);
}
fn write_rows_to_buffer(
buffer: &mut DataBuffer,
schema: &RegionMetadataRef,
pk_index: u16,
ts: Vec<i64>,
v0: Vec<Option<f64>>,
sequence: u64,
) {
let kvs = build_key_values_with_ts_seq_values(
schema,
"whatever".to_string(),
1,
ts.into_iter(),
v0.into_iter(),
sequence,
);
for kv in kvs.iter() {
buffer.write_row(
PkId {
shard_id: 0,
pk_index,
},
kv,
);
}
}
#[test]
fn test_encode_data_buffer() {
let meta = metadata_for_test();
let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, usize::MAX);
// write rows with null values.
write_rows_to_buffer(
&mut buffer,
&meta,
2,
vec![0, 1, 2],
vec![Some(1.0), None, Some(3.0)],
2,
);
assert_eq!(3, buffer.num_rows());
write_rows_to_buffer(&mut buffer, &meta, 2, vec![1], vec![Some(2.0)], 3);
assert_eq!(4, buffer.num_rows());
let encoder = DataPartEncoder::new(&meta, &[0, 1, 2], None);
let encoded = encoder.write(&mut buffer).unwrap();
let s = String::from_utf8_lossy(encoded.as_bytes());
assert!(s.starts_with("PAR1"));
assert!(s.ends_with("PAR1"));
let builder = ParquetRecordBatchReaderBuilder::try_new(encoded).unwrap();
let mut reader = builder.build().unwrap();
let batch = reader.next().unwrap().unwrap();
assert_eq!(3, batch.num_rows());
}
fn check_buffer_values_equal(iter: &mut DataBufferIter, expected_values: &[Vec<f64>]) {
let mut output = Vec::with_capacity(expected_values.len());
while iter.is_valid() {
let batch = iter.current_data_batch().slice_record_batch();
let values = batch
.column_by_name("v1")
.unwrap()
.as_any()
.downcast_ref::<Float64Array>()
.unwrap()
.iter()
.map(|v| v.unwrap())
.collect::<Vec<_>>();
output.push(values);
iter.next().unwrap();
}
assert_eq!(expected_values, output);
}
#[test]
fn test_search_next_pk_range() {
let a = UInt16Array::from_iter_values([1, 1, 3, 3, 4, 6]);
assert_eq!((1, 0..2), search_next_pk_range(&a, 0).unwrap());
assert_eq!((3, 2..4), search_next_pk_range(&a, 2).unwrap());
assert_eq!((4, 4..5), search_next_pk_range(&a, 4).unwrap());
assert_eq!((6, 5..6), search_next_pk_range(&a, 5).unwrap());
assert_eq!(None, search_next_pk_range(&a, 6));
}
#[test]
fn test_iter_data_buffer() {
let meta = metadata_for_test();
let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, usize::MAX);
write_rows_to_buffer(
&mut buffer,
&meta,
3,
vec![1, 2, 3],
vec![Some(1.1), Some(2.1), Some(3.1)],
3,
);
write_rows_to_buffer(
&mut buffer,
&meta,
2,
vec![0, 1, 2],
vec![Some(1.0), Some(2.0), Some(3.0)],
2,
);
let mut iter = buffer.iter(&[0, 1, 3, 2]).unwrap();
check_buffer_values_equal(&mut iter, &[vec![1.1, 2.1, 3.1], vec![1.0, 2.0, 3.0]]);
}
#[test]
fn test_iter_empty_data_buffer() {
let meta = metadata_for_test();
let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, usize::MAX);
let mut iter = buffer.iter(&[0, 1, 3, 2]).unwrap();
check_buffer_values_equal(&mut iter, &[]);
}
}

View File

@@ -17,8 +17,13 @@
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use api::helper::ColumnDataTypeWrapper;
use api::v1::value::ValueData;
use api::v1::{Row, Rows, SemanticType};
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef};
use store_api::storage::{ColumnId, RegionId, SequenceNumber};
use table::predicate::Predicate;
use crate::error::Result;
@@ -83,3 +88,113 @@ impl MemtableBuilder for EmptyMemtableBuilder {
))
}
}
/// Creates a region metadata for testing memtables with the default primary key.
///
/// The schema is `k0, k1, ts, v0, v1` and pk is `k0, k1`.
pub(crate) fn metadata_for_test() -> RegionMetadataRef {
metadata_with_primary_key(vec![0, 1])
}
/// Creates a region metadata for testing memtables with the given primary key.
///
/// The schema is `k0, k1, ts, v0, v1`.
pub(crate) fn metadata_with_primary_key(primary_key: Vec<ColumnId>) -> RegionMetadataRef {
let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
builder
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
semantic_type: semantic_type_of_column(0, &primary_key),
column_id: 0,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("k1", ConcreteDataType::int64_datatype(), false),
semantic_type: semantic_type_of_column(1, &primary_key),
column_id: 1,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id: 2,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
semantic_type: semantic_type_of_column(3, &primary_key),
column_id: 3,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
semantic_type: semantic_type_of_column(4, &primary_key),
column_id: 4,
})
.primary_key(primary_key);
let region_metadata = builder.build().unwrap();
Arc::new(region_metadata)
}
fn semantic_type_of_column(column_id: ColumnId, primary_key: &[ColumnId]) -> SemanticType {
if primary_key.contains(&column_id) {
SemanticType::Tag
} else {
SemanticType::Field
}
}
/// Builds key values with timestamps (ms) and sequences for test.
pub(crate) fn build_key_values_with_ts_seq_values(
schema: &RegionMetadataRef,
k0: String,
k1: i64,
timestamps: impl Iterator<Item = i64>,
values: impl Iterator<Item = Option<f64>>,
sequence: SequenceNumber,
) -> KeyValues {
let column_schema = schema
.column_metadatas
.iter()
.map(|c| api::v1::ColumnSchema {
column_name: c.column_schema.name.clone(),
datatype: ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())
.unwrap()
.datatype() as i32,
semantic_type: c.semantic_type as i32,
..Default::default()
})
.collect();
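// Each row is (k0, k1, ts, v0 = ts, v1 = value), matching the test schema above.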
let rows = timestamps
.zip(values)
.map(|(ts, v)| Row {
values: vec![
api::v1::Value {
value_data: Some(ValueData::StringValue(k0.clone())),
},
api::v1::Value {
value_data: Some(ValueData::I64Value(k1)),
},
api::v1::Value {
value_data: Some(ValueData::TimestampMillisecondValue(ts)),
},
api::v1::Value {
value_data: Some(ValueData::I64Value(ts)),
},
api::v1::Value {
value_data: v.map(ValueData::F64Value),
},
],
})
.collect();
let mutation = api::v1::Mutation {
op_type: 1,
sequence,
rows: Some(Rows {
schema: column_schema,
rows,
}),
};
KeyValues::new(schema.as_ref(), mutation).unwrap()
}