Mirror of https://github.com/GreptimeTeam/greptimedb.git, synced 2025-12-23 06:30:05 +00:00

Compare commits: feat/index...feat/merge (2 commits)
| Author | SHA1 | Date |
|---|---|---|
| | 2ed98ff558 | |
| | b46386d52a | |
@@ -37,8 +37,8 @@ use crate::worker::WorkerId;
#[stack_trace_debug]
pub enum Error {
    #[snafu(display(
        "Failed to set region {} to writable, it was expected to replayed to {}, but actually replayed to {}",
        region_id, expected_last_entry_id, replayed_last_entry_id
    ))]
    UnexpectedReplay {
        location: Location,

@@ -559,6 +559,13 @@ pub enum Error {

    #[snafu(display("Encode null value"))]
    IndexEncodeNull { location: Location },

    #[snafu(display("Failed to encode memtable to Parquet bytes"))]
    EncodeMemtable {
        #[snafu(source)]
        error: parquet::errors::ParquetError,
        location: Location,
    },
}

pub type Result<T, E = Error> = std::result::Result<T, E>;

@@ -662,6 +669,7 @@ impl ErrorExt for Error {
            FilterRecordBatch { source, .. } => source.status_code(),
            Upload { .. } => StatusCode::StorageUnavailable,
            BiError { .. } => StatusCode::Internal,
            EncodeMemtable { .. } => StatusCode::Internal,
        }
    }
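
For context, a minimal sketch (editorial illustration, not part of the diff) of how the new `EncodeMemtable` variant gets attached through its snafu context selector. `encode_batch` is a hypothetical helper; the calls mirror the `DataPartEncoder::write` path added later in this change.

    use datatypes::arrow::array::RecordBatch;
    use datatypes::arrow::datatypes::SchemaRef;
    use parquet::arrow::ArrowWriter;
    use snafu::ResultExt;

    use crate::error::{self, Result};

    /// Hypothetical helper: encodes one RecordBatch into Parquet bytes, turning
    /// any parquet::errors::ParquetError into Error::EncodeMemtable via the
    /// context selector generated for the variant above.
    fn encode_batch(schema: SchemaRef, rb: &RecordBatch) -> Result<Vec<u8>> {
        let mut bytes = Vec::new();
        let mut writer =
            ArrowWriter::try_new(&mut bytes, schema, None).context(error::EncodeMemtableSnafu)?;
        writer.write(rb).context(error::EncodeMemtableSnafu)?;
        writer.close().context(error::EncodeMemtableSnafu)?;
        Ok(bytes)
    }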

@@ -14,8 +14,688 @@

//! Data part of a shard.

/// Buffer to store columns not in the primary key.
pub struct DataBuffer {}

use std::cmp::{Ordering, Reverse};
use std::ops::Range;
use std::sync::Arc;

use bytes::Bytes;
use datatypes::arrow;
use datatypes::arrow::array::{RecordBatch, UInt16Array, UInt32Array};
use datatypes::arrow::datatypes::{Field, Schema, SchemaRef};
use datatypes::data_type::DataType;
use datatypes::prelude::{ConcreteDataType, MutableVector, ScalarVectorBuilder, VectorRef};
use datatypes::schema::ColumnSchema;
use datatypes::types::TimestampType;
use datatypes::vectors::{
    TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector,
    TimestampSecondVector, UInt16Vector, UInt16VectorBuilder, UInt64Vector, UInt64VectorBuilder,
    UInt8VectorBuilder,
};
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::consts::{OP_TYPE_COLUMN_NAME, SEQUENCE_COLUMN_NAME};

use crate::error;
use crate::error::Result;
use crate::memtable::key_values::KeyValue;
use crate::memtable::merge_tree::{PkId, PkIndex};

const PK_INDEX_COLUMN_NAME: &str = "__pk_index";

/// Data part batches returned by `DataParts::read`.
#[derive(Debug, Clone)]
pub struct DataBatch {
    /// Primary key index of this batch.
    pk_index: PkIndex,
    /// Record batch of data.
    rb: Arc<RecordBatch>,
    /// Range of current primary key inside record batch.
    range: Range<usize>,
}

impl DataBatch {
    pub(crate) fn pk_index(&self) -> PkIndex {
        self.pk_index
    }

    pub(crate) fn record_batch(&self) -> &RecordBatch {
        &self.rb
    }

    pub(crate) fn range(&self) -> Range<usize> {
        self.range.clone()
    }

    pub(crate) fn slice_record_batch(&self) -> RecordBatch {
        self.rb.slice(self.range.start, self.range.len())
    }
}
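
As an illustration only (not part of the diff), `slice_record_batch` relies on Arrow's zero-copy `RecordBatch::slice(offset, length)`: each `DataBatch` shares the underlying `Arc<RecordBatch>` and exposes just the row range of one primary key. A minimal sketch with a hypothetical single-column batch:

    use std::sync::Arc;

    use datatypes::arrow::array::{ArrayRef, RecordBatch, UInt16Array};
    use datatypes::arrow::datatypes::{DataType, Field, Schema};

    fn slice_example() -> RecordBatch {
        // A single UInt16 column holding pk indexes 1, 1, 3, 3, 4.
        let schema = Arc::new(Schema::new(vec![Field::new(
            "__pk_index",
            DataType::UInt16,
            false,
        )]));
        let column: ArrayRef = Arc::new(UInt16Array::from_iter_values([1, 1, 3, 3, 4]));
        let rb = RecordBatch::try_new(schema, vec![column]).unwrap();
        // Rows 2..4 (the run of pk index 3) as a zero-copy view, which is what
        // DataBatch { range: 2..4, .. } exposes through slice_record_batch.
        rb.slice(2, 2)
    }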

/// Buffer for the value part (pk_index, ts, sequence, op_type, field columns) in a shard.
pub struct DataBuffer {
    metadata: RegionMetadataRef,
    /// Schema for data part (primary keys are replaced with pk_index).
    data_part_schema: SchemaRef,
    /// Data types for field columns.
    field_types: Vec<ConcreteDataType>,
    /// Builder for primary key index.
    pk_index_builder: UInt16VectorBuilder,
    /// Builder for timestamp column.
    ts_builder: Box<dyn MutableVector>,
    /// Builder for sequence column.
    sequence_builder: UInt64VectorBuilder,
    /// Builder for op_type column.
    op_type_builder: UInt8VectorBuilder,
    /// Builders for field columns.
    field_builders: Vec<Option<Box<dyn MutableVector>>>,
    /// Threshold for freezing data buffer.
    freeze_threshold: usize,
}

impl DataBuffer {
    pub fn with_capacity(
        metadata: RegionMetadataRef,
        init_capacity: usize,
        freeze_threshold: usize,
    ) -> Self {
        let ts_builder = metadata
            .time_index_column()
            .column_schema
            .data_type
            .create_mutable_vector(init_capacity);

        let pk_id_builder = UInt16VectorBuilder::with_capacity(init_capacity);
        let sequence_builder = UInt64VectorBuilder::with_capacity(init_capacity);
        let op_type_builder = UInt8VectorBuilder::with_capacity(init_capacity);

        let field_types = metadata
            .field_columns()
            .map(|c| c.column_schema.data_type.clone())
            .collect::<Vec<_>>();
        let field_builders = (0..field_types.len()).map(|_| None).collect();

        let data_part_schema = memtable_schema_to_encoded_schema(&metadata);
        Self {
            metadata,
            data_part_schema,
            field_types,
            pk_index_builder: pk_id_builder,
            ts_builder,
            sequence_builder,
            op_type_builder,
            field_builders,
            freeze_threshold,
        }
    }

    /// Writes a row to data buffer.
    pub fn write_row(&mut self, pk_id: PkId, kv: KeyValue) -> bool {
        self.ts_builder.push_value_ref(kv.timestamp());
        self.pk_index_builder.push(Some(pk_id.pk_index));
        self.sequence_builder.push(Some(kv.sequence()));
        self.op_type_builder.push(Some(kv.op_type() as u8));

        debug_assert_eq!(self.field_builders.len(), kv.num_fields());

        for (idx, field) in kv.fields().enumerate() {
            self.field_builders[idx]
                .get_or_insert_with(|| {
                    let mut builder =
                        self.field_types[idx].create_mutable_vector(self.ts_builder.len());
                    builder.push_nulls(self.ts_builder.len() - 1);
                    builder
                })
                .push_value_ref(field);
        }

        self.ts_builder.len() >= self.freeze_threshold
    }

    /// Freezes `DataBuffer` to bytes. Use `pk_weights` to convert pk_id to pk sort order.
    /// `freeze` clears the buffers of builders.
    pub fn freeze(&mut self, _pk_weights: &[u16]) -> Result<DataPart> {
        // We need to distinguish between `freeze` in `ShardWriter` and `Shard`.
        todo!()
    }

    /// Reads batches from data buffer without resetting builder's buffers.
    pub fn iter(&mut self, pk_weights: &[u16]) -> Result<DataBufferIter> {
        let batch =
            data_buffer_to_record_batches(self.data_part_schema.clone(), self, pk_weights, true)?;
        DataBufferIter::new(batch)
    }

    /// Returns the number of rows in the data buffer.
    pub fn num_rows(&self) -> usize {
        self.ts_builder.len()
    }

    /// Returns whether the buffer is empty.
    pub fn is_empty(&self) -> bool {
        self.num_rows() == 0
    }
}
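
A side note on `write_row` (illustration only, not part of the diff): field builders are created lazily on the first write that touches that column, and then backfilled with nulls for every row written before the column first appeared, so each field builder stays aligned with the timestamp builder. A dependency-free model of that backfill, using `Vec<Option<f64>>` in place of `MutableVector`:

    /// Plain-Rust model of the lazy field-column backfill in DataBuffer::write_row.
    fn write_field(rows_written: usize, column: &mut Option<Vec<Option<f64>>>, value: f64) {
        let builder = column.get_or_insert_with(|| {
            // The column appears for the first time: backfill nulls for the rows
            // written before it existed (rows_written - 1 of them, because the
            // current row's timestamp has already been pushed).
            vec![None; rows_written - 1]
        });
        builder.push(Some(value));
    }

    fn backfill_example() {
        let mut column = None;
        // Three rows were already pushed to the ts builder before this field shows up.
        write_field(3, &mut column, 42.0);
        assert_eq!(column.unwrap(), vec![None, None, Some(42.0)]);
    }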

/// Converts `DataBuffer` to record batches, with rows sorted according to pk_weights.
fn data_buffer_to_record_batches(
    schema: SchemaRef,
    buffer: &mut DataBuffer,
    pk_weights: &[u16],
    keep_data: bool,
) -> Result<RecordBatch> {
    let num_rows = buffer.ts_builder.len();

    let (pk_index_v, ts_v, sequence_v, op_type_v) = if keep_data {
        (
            buffer.pk_index_builder.finish_cloned(),
            buffer.ts_builder.to_vector_cloned(),
            buffer.sequence_builder.finish_cloned(),
            buffer.op_type_builder.finish_cloned(),
        )
    } else {
        (
            buffer.pk_index_builder.finish(),
            buffer.ts_builder.to_vector(),
            buffer.sequence_builder.finish(),
            buffer.op_type_builder.finish(),
        )
    };

    let mut rows = build_rows_to_sort(pk_weights, &pk_index_v, &ts_v, &sequence_v);

    // sort and dedup
    rows.sort_unstable_by(|l, r| l.1.cmp(&r.1));
    rows.dedup_by(|l, r| l.1.timestamp == r.1.timestamp);
    let indices_to_take = UInt32Array::from_iter_values(rows.into_iter().map(|v| v.0 as u32));

    let mut columns = Vec::with_capacity(4 + buffer.field_builders.len());

    columns.push(
        arrow::compute::take(&pk_index_v.as_arrow(), &indices_to_take, None)
            .context(error::ComputeArrowSnafu)?,
    );

    columns.push(
        arrow::compute::take(&ts_v.to_arrow_array(), &indices_to_take, None)
            .context(error::ComputeArrowSnafu)?,
    );

    columns.push(
        arrow::compute::take(&sequence_v.as_arrow(), &indices_to_take, None)
            .context(error::ComputeArrowSnafu)?,
    );

    columns.push(
        arrow::compute::take(&op_type_v.as_arrow(), &indices_to_take, None)
            .context(error::ComputeArrowSnafu)?,
    );

    for (idx, c) in buffer.field_builders.iter_mut().enumerate() {
        let array = match c {
            None => {
                let mut single_null = buffer.field_types[idx].create_mutable_vector(num_rows);
                single_null.push_nulls(num_rows);
                single_null.to_vector().to_arrow_array()
            }
            Some(v) => {
                if keep_data {
                    v.to_vector_cloned().to_arrow_array()
                } else {
                    v.to_vector().to_arrow_array()
                }
            }
        };

        columns.push(
            arrow::compute::take(&array, &indices_to_take, None)
                .context(error::ComputeArrowSnafu)?,
        );
    }

    RecordBatch::try_new(schema, columns).context(error::NewRecordBatchSnafu)
}

#[derive(Debug)]
pub(crate) struct DataBufferIter {
    batch: Arc<RecordBatch>,
    offset: usize,
    current_data_batch: Option<DataBatch>,
}

impl DataBufferIter {
    pub(crate) fn new(batch: RecordBatch) -> Result<Self> {
        let mut iter = Self {
            batch: Arc::new(batch),
            offset: 0,
            current_data_batch: None,
        };
        iter.next()?; // fill data batch for comparison and merge.
        Ok(iter)
    }

    pub(crate) fn is_valid(&self) -> bool {
        self.current_data_batch.is_some()
    }

    /// # Panics
    /// If the iterator is exhausted.
    pub(crate) fn current_data_batch(&self) -> DataBatch {
        self.current_data_batch.as_ref().unwrap().clone()
    }

    /// # Panics
    /// If the iterator is exhausted.
    pub(crate) fn current_pk_index(&self) -> PkIndex {
        self.current_data_batch.as_ref().unwrap().pk_index
    }

    /// Advances iterator to next data batch.
    pub(crate) fn next(&mut self) -> Result<()> {
        if self.offset >= self.batch.num_rows() {
            self.current_data_batch = None;
            return Ok(());
        }
        let pk_index_array = pk_index_array(&self.batch);
        if let Some((next_pk, range)) = search_next_pk_range(pk_index_array, self.offset) {
            self.offset = range.end;
            self.current_data_batch = Some(DataBatch {
                pk_index: next_pk,
                rb: self.batch.clone(),
                range,
            })
        } else {
            self.current_data_batch = None;
        }
        Ok(())
    }
}

/// Gets `pk_index` array from record batch.
/// # Panics
/// If pk index column is not the first column or the type is not `UInt16Array`.
fn pk_index_array(batch: &RecordBatch) -> &UInt16Array {
    batch
        .column(0)
        .as_any()
        .downcast_ref::<UInt16Array>()
        .unwrap()
}

/// Searches for the next pk index, and its offset range in a sorted `UInt16Array`.
fn search_next_pk_range(array: &UInt16Array, start: usize) -> Option<(PkIndex, Range<usize>)> {
    let num_rows = array.len();
    if start >= num_rows {
        return None;
    }

    let next_pk = array.value(start);
    for idx in start..num_rows {
        if array.value(idx) != next_pk {
            return Some((next_pk, start..idx));
        }
    }
    Some((next_pk, start..num_rows))
}
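
For reference (not part of the diff), the run-detection logic in `search_next_pk_range` is independent of Arrow. A dependency-free equivalent over a plain sorted slice behaves the same way the `test_search_next_pk_range` case further below demonstrates for `UInt16Array`:

    use std::ops::Range;

    /// Plain-slice equivalent of search_next_pk_range: returns the pk index at
    /// `start` and the half-open range of the run of equal values beginning there.
    fn next_pk_run(values: &[u16], start: usize) -> Option<(u16, Range<usize>)> {
        let first = *values.get(start)?;
        let end = values[start..]
            .iter()
            .position(|v| *v != first)
            .map(|offset| start + offset)
            .unwrap_or(values.len());
        Some((first, start..end))
    }

    fn run_example() {
        let values = [1u16, 1, 3, 3, 4, 6];
        assert_eq!(Some((1, 0..2)), next_pk_run(&values, 0));
        assert_eq!(Some((3, 2..4)), next_pk_run(&values, 2));
        assert_eq!(None, next_pk_run(&values, 6));
    }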

#[derive(Eq, PartialEq)]
struct InnerKey {
    pk_weight: u16,
    timestamp: i64,
    sequence: u64,
}

impl PartialOrd for InnerKey {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for InnerKey {
    fn cmp(&self, other: &Self) -> Ordering {
        (self.pk_weight, self.timestamp, Reverse(self.sequence)).cmp(&(
            other.pk_weight,
            other.timestamp,
            Reverse(other.sequence),
        ))
    }
}
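
A small standalone example (not part of the diff) of why the key uses `Reverse(sequence)`: rows with the same pk weight and timestamp sort newest-sequence first, so the `sort_unstable_by` plus `dedup_by` pair in `data_buffer_to_record_batches` keeps the latest write and drops the older duplicate:

    use std::cmp::Reverse;

    fn dedup_example() {
        // (row index, (pk_weight, timestamp, Reverse(sequence))), mirroring InnerKey.
        let mut rows = vec![
            (0usize, (1u16, 100i64, Reverse(1u64))), // older write of ts 100
            (1, (1, 100, Reverse(2))),               // newer write of ts 100
            (2, (1, 200, Reverse(1))),
        ];
        rows.sort_unstable_by(|l, r| l.1.cmp(&r.1));
        // Adjacent rows with equal timestamps collapse; the first one of each run
        // (the newest sequence, thanks to Reverse) survives.
        rows.dedup_by(|l, r| (l.1).1 == (r.1).1);
        let kept: Vec<usize> = rows.into_iter().map(|r| r.0).collect();
        assert_eq!(kept, vec![1, 2]);
    }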

fn build_rows_to_sort(
    pk_weights: &[u16],
    pk_index: &UInt16Vector,
    ts: &VectorRef,
    sequence: &UInt64Vector,
) -> Vec<(usize, InnerKey)> {
    let ts_values = match ts.data_type() {
        ConcreteDataType::Timestamp(t) => match t {
            TimestampType::Second(_) => ts
                .as_any()
                .downcast_ref::<TimestampSecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
            TimestampType::Millisecond(_) => ts
                .as_any()
                .downcast_ref::<TimestampMillisecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
            TimestampType::Microsecond(_) => ts
                .as_any()
                .downcast_ref::<TimestampMicrosecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
            TimestampType::Nanosecond(_) => ts
                .as_any()
                .downcast_ref::<TimestampNanosecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
        },
        other => unreachable!("Unexpected type {:?}", other),
    };
    let pk_index_values = pk_index.as_arrow().values();
    let sequence_values = sequence.as_arrow().values();
    debug_assert_eq!(ts_values.len(), pk_index_values.len());
    debug_assert_eq!(ts_values.len(), sequence_values.len());

    ts_values
        .iter()
        .zip(pk_index_values.iter())
        .zip(sequence_values.iter())
        .enumerate()
        .map(|(idx, ((timestamp, pk_index), sequence))| {
            (
                idx,
                InnerKey {
                    timestamp: *timestamp,
                    pk_weight: pk_weights[*pk_index as usize],
                    sequence: *sequence,
                },
            )
        })
        .collect()
}

fn memtable_schema_to_encoded_schema(schema: &RegionMetadataRef) -> SchemaRef {
    use datatypes::arrow::datatypes::DataType;
    let ColumnSchema {
        name: ts_name,
        data_type: ts_type,
        ..
    } = &schema.time_index_column().column_schema;

    let mut fields = vec![
        Field::new(PK_INDEX_COLUMN_NAME, DataType::UInt16, false),
        Field::new(ts_name, ts_type.as_arrow_type(), false),
        Field::new(SEQUENCE_COLUMN_NAME, DataType::UInt64, false),
        Field::new(OP_TYPE_COLUMN_NAME, DataType::UInt8, false),
    ];

    fields.extend(schema.field_columns().map(|c| {
        Field::new(
            &c.column_schema.name,
            c.column_schema.data_type.as_arrow_type(),
            c.column_schema.is_nullable(),
        )
    }));

    Arc::new(Schema::new(fields))
}

struct DataPartEncoder<'a> {
    schema: SchemaRef,
    pk_weights: &'a [u16],
    row_group_size: Option<usize>,
}

impl<'a> DataPartEncoder<'a> {
    pub fn new(
        metadata: &RegionMetadataRef,
        pk_weights: &'a [u16],
        row_group_size: Option<usize>,
    ) -> DataPartEncoder<'a> {
        let schema = memtable_schema_to_encoded_schema(metadata);
        Self {
            schema,
            pk_weights,
            row_group_size,
        }
    }

    fn writer_props(&self) -> Option<WriterProperties> {
        self.row_group_size.map(|size| {
            WriterProperties::builder()
                .set_max_row_group_size(size)
                .build()
        })
    }

    pub fn write(&self, source: &mut DataBuffer) -> Result<Bytes> {
        let mut bytes = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut bytes, self.schema.clone(), self.writer_props())
            .context(error::EncodeMemtableSnafu)?;
        let rb =
            data_buffer_to_record_batches(self.schema.clone(), source, self.pk_weights, false)?;
        writer.write(&rb).context(error::EncodeMemtableSnafu)?;
        let _file_meta = writer.close().context(error::EncodeMemtableSnafu)?;
        Ok(Bytes::from(bytes))
    }
}
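
As an aside (not part of the diff), the encoded `Bytes` form a complete Parquet file, so they can be read back with the same reader the `test_encode_data_buffer` case below uses. A minimal sketch of that round trip:

    use datatypes::arrow::array::RecordBatch;
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

    /// Decodes the Bytes produced by DataPartEncoder::write back into record
    /// batches; mirrors what test_encode_data_buffer below does.
    fn read_back(encoded: bytes::Bytes) -> Vec<RecordBatch> {
        let builder = ParquetRecordBatchReaderBuilder::try_new(encoded).unwrap();
        let reader = builder.build().unwrap();
        // Each item is a Result<RecordBatch, ArrowError>; unwrap for brevity.
        reader.map(|rb| rb.unwrap()).collect()
    }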

/// Format of immutable data part.
pub enum DataPart {
    Parquet(Bytes),
}

impl DataPart {
    fn is_empty(&self) -> bool {
        match self {
            DataPart::Parquet(data) => data.is_empty(),
        }
    }
}

/// Data parts under a shard.
pub struct DataParts {}

#[cfg(test)]
mod tests {
    use datafusion::arrow::array::Float64Array;
    use datatypes::arrow::array::{TimestampMillisecondArray, UInt16Array};
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    use parquet::data_type::AsBytes;

    use super::*;
    use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};

    fn check_test_data_buffer_to_record_batches(keep_data: bool) {
        let meta = metadata_for_test();
        let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, usize::MAX);

        write_rows_to_buffer(&mut buffer, &meta, 0, vec![1, 2], vec![Some(0.1), None], 1);
        write_rows_to_buffer(&mut buffer, &meta, 1, vec![1, 2], vec![Some(1.1), None], 2);
        write_rows_to_buffer(&mut buffer, &meta, 0, vec![2], vec![Some(1.1)], 3);
        assert_eq!(5, buffer.num_rows());
        let schema = memtable_schema_to_encoded_schema(&meta);
        let batch = data_buffer_to_record_batches(schema, &mut buffer, &[3, 1], keep_data).unwrap();

        assert_eq!(
            vec![1, 2, 1, 2],
            batch
                .column_by_name("ts")
                .unwrap()
                .as_any()
                .downcast_ref::<TimestampMillisecondArray>()
                .unwrap()
                .iter()
                .map(|v| v.unwrap())
                .collect::<Vec<_>>()
        );

        assert_eq!(
            vec![1, 1, 0, 0],
            batch
                .column_by_name(PK_INDEX_COLUMN_NAME)
                .unwrap()
                .as_any()
                .downcast_ref::<UInt16Array>()
                .unwrap()
                .iter()
                .map(|v| v.unwrap())
                .collect::<Vec<_>>()
        );

        assert_eq!(
            vec![Some(1.1), None, Some(0.1), Some(1.1)],
            batch
                .column_by_name("v1")
                .unwrap()
                .as_any()
                .downcast_ref::<Float64Array>()
                .unwrap()
                .iter()
                .collect::<Vec<_>>()
        );

        if keep_data {
            assert_eq!(5, buffer.num_rows());
        } else {
            assert_eq!(0, buffer.num_rows());
        }
    }

    #[test]
    fn test_data_buffer_to_record_batches() {
        check_test_data_buffer_to_record_batches(true);
        check_test_data_buffer_to_record_batches(false);
    }

    fn write_rows_to_buffer(
        buffer: &mut DataBuffer,
        schema: &RegionMetadataRef,
        pk_index: u16,
        ts: Vec<i64>,
        v0: Vec<Option<f64>>,
        sequence: u64,
    ) {
        let kvs = build_key_values_with_ts_seq_values(
            schema,
            "whatever".to_string(),
            1,
            ts.into_iter(),
            v0.into_iter(),
            sequence,
        );

        for kv in kvs.iter() {
            buffer.write_row(
                PkId {
                    shard_id: 0,
                    pk_index,
                },
                kv,
            );
        }
    }

    #[test]
    fn test_encode_data_buffer() {
        let meta = metadata_for_test();
        let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, usize::MAX);

        // write rows with null values.
        write_rows_to_buffer(
            &mut buffer,
            &meta,
            2,
            vec![0, 1, 2],
            vec![Some(1.0), None, Some(3.0)],
            2,
        );

        assert_eq!(3, buffer.num_rows());

        write_rows_to_buffer(&mut buffer, &meta, 2, vec![1], vec![Some(2.0)], 3);

        assert_eq!(4, buffer.num_rows());

        let encoder = DataPartEncoder::new(&meta, &[0, 1, 2], None);
        let encoded = encoder.write(&mut buffer).unwrap();
        let s = String::from_utf8_lossy(encoded.as_bytes());
        assert!(s.starts_with("PAR1"));
        assert!(s.ends_with("PAR1"));

        let builder = ParquetRecordBatchReaderBuilder::try_new(encoded).unwrap();
        let mut reader = builder.build().unwrap();
        let batch = reader.next().unwrap().unwrap();
        assert_eq!(3, batch.num_rows());
    }

    fn check_buffer_values_equal(iter: &mut DataBufferIter, expected_values: &[Vec<f64>]) {
        let mut output = Vec::with_capacity(expected_values.len());
        while iter.is_valid() {
            let batch = iter.current_data_batch().slice_record_batch();
            let values = batch
                .column_by_name("v1")
                .unwrap()
                .as_any()
                .downcast_ref::<Float64Array>()
                .unwrap()
                .iter()
                .map(|v| v.unwrap())
                .collect::<Vec<_>>();
            output.push(values);
            iter.next().unwrap();
        }
        assert_eq!(expected_values, output);
    }

    #[test]
    fn test_search_next_pk_range() {
        let a = UInt16Array::from_iter_values([1, 1, 3, 3, 4, 6]);
        assert_eq!((1, 0..2), search_next_pk_range(&a, 0).unwrap());
        assert_eq!((3, 2..4), search_next_pk_range(&a, 2).unwrap());
        assert_eq!((4, 4..5), search_next_pk_range(&a, 4).unwrap());
        assert_eq!((6, 5..6), search_next_pk_range(&a, 5).unwrap());

        assert_eq!(None, search_next_pk_range(&a, 6));
    }

    #[test]
    fn test_iter_data_buffer() {
        let meta = metadata_for_test();
        let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, usize::MAX);

        write_rows_to_buffer(
            &mut buffer,
            &meta,
            3,
            vec![1, 2, 3],
            vec![Some(1.1), Some(2.1), Some(3.1)],
            3,
        );

        write_rows_to_buffer(
            &mut buffer,
            &meta,
            2,
            vec![0, 1, 2],
            vec![Some(1.0), Some(2.0), Some(3.0)],
            2,
        );

        let mut iter = buffer.iter(&[0, 1, 3, 2]).unwrap();
        check_buffer_values_equal(&mut iter, &[vec![1.1, 2.1, 3.1], vec![1.0, 2.0, 3.0]]);
    }

    #[test]
    fn test_iter_empty_data_buffer() {
        let meta = metadata_for_test();
        let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, usize::MAX);
        let mut iter = buffer.iter(&[0, 1, 3, 2]).unwrap();
        check_buffer_values_equal(&mut iter, &[]);
    }
}

@@ -17,8 +17,13 @@
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;

use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use api::helper::ColumnDataTypeWrapper;
use api::v1::value::ValueData;
use api::v1::{Row, Rows, SemanticType};
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef};
use store_api::storage::{ColumnId, RegionId, SequenceNumber};
use table::predicate::Predicate;

use crate::error::Result;

@@ -83,3 +88,113 @@ impl MemtableBuilder for EmptyMemtableBuilder {
        ))
    }
}

/// Creates a region metadata to test memtable with the default primary key.
///
/// The schema is `k0, k1, ts, v0, v1` and pk is `k0, k1`.
pub(crate) fn metadata_for_test() -> RegionMetadataRef {
    metadata_with_primary_key(vec![0, 1])
}

/// Creates a region metadata to test memtable with a specific primary key.
///
/// The schema is `k0, k1, ts, v0, v1`.
pub(crate) fn metadata_with_primary_key(primary_key: Vec<ColumnId>) -> RegionMetadataRef {
    let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
    builder
        .push_column_metadata(ColumnMetadata {
            column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
            semantic_type: semantic_type_of_column(0, &primary_key),
            column_id: 0,
        })
        .push_column_metadata(ColumnMetadata {
            column_schema: ColumnSchema::new("k1", ConcreteDataType::int64_datatype(), false),
            semantic_type: semantic_type_of_column(1, &primary_key),
            column_id: 1,
        })
        .push_column_metadata(ColumnMetadata {
            column_schema: ColumnSchema::new(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            ),
            semantic_type: SemanticType::Timestamp,
            column_id: 2,
        })
        .push_column_metadata(ColumnMetadata {
            column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
            semantic_type: semantic_type_of_column(3, &primary_key),
            column_id: 3,
        })
        .push_column_metadata(ColumnMetadata {
            column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
            semantic_type: semantic_type_of_column(4, &primary_key),
            column_id: 4,
        })
        .primary_key(primary_key);
    let region_metadata = builder.build().unwrap();
    Arc::new(region_metadata)
}

fn semantic_type_of_column(column_id: ColumnId, primary_key: &[ColumnId]) -> SemanticType {
    if primary_key.contains(&column_id) {
        SemanticType::Tag
    } else {
        SemanticType::Field
    }
}

/// Builds key values with timestamps (ms) and sequences for test.
pub(crate) fn build_key_values_with_ts_seq_values(
    schema: &RegionMetadataRef,
    k0: String,
    k1: i64,
    timestamps: impl Iterator<Item = i64>,
    values: impl Iterator<Item = Option<f64>>,
    sequence: SequenceNumber,
) -> KeyValues {
    let column_schema = schema
        .column_metadatas
        .iter()
        .map(|c| api::v1::ColumnSchema {
            column_name: c.column_schema.name.clone(),
            datatype: ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())
                .unwrap()
                .datatype() as i32,
            semantic_type: c.semantic_type as i32,
            ..Default::default()
        })
        .collect();

    let rows = timestamps
        .zip(values)
        .map(|(ts, v)| Row {
            values: vec![
                api::v1::Value {
                    value_data: Some(ValueData::StringValue(k0.clone())),
                },
                api::v1::Value {
                    value_data: Some(ValueData::I64Value(k1)),
                },
                api::v1::Value {
                    value_data: Some(ValueData::TimestampMillisecondValue(ts)),
                },
                api::v1::Value {
                    value_data: Some(ValueData::I64Value(ts)),
                },
                api::v1::Value {
                    value_data: v.map(ValueData::F64Value),
                },
            ],
        })
        .collect();
    let mutation = api::v1::Mutation {
        op_type: 1,
        sequence,
        rows: Some(Rows {
            schema: column_schema,
            rows,
        }),
    };
    KeyValues::new(schema.as_ref(), mutation).unwrap()
}
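
For orientation (not part of the diff), these helpers are combined in the data-part tests above: build a test region metadata, turn timestamps and values into `KeyValues`, then feed each `KeyValue` into a `DataBuffer`. A condensed sketch of that flow:

    // Condensed from the usage in the data.rs tests in this change.
    fn write_two_rows_example() {
        let meta = metadata_for_test();
        let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, usize::MAX);

        // Two rows for key ("whatever", 1) at ts 1 and 2 (ms), sequence 42.
        let kvs = build_key_values_with_ts_seq_values(
            &meta,
            "whatever".to_string(),
            1,
            vec![1i64, 2].into_iter(),
            vec![Some(0.5), None].into_iter(),
            42,
        );
        for kv in kvs.iter() {
            buffer.write_row(PkId { shard_id: 0, pk_index: 0 }, kv);
        }
        assert_eq!(2, buffer.num_rows());
    }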