mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-05 21:02:58 +00:00
refactor: Rename value_type to op_type (#185)
This commit is contained in:
@@ -14,7 +14,7 @@ use datatypes::{
|
||||
};
|
||||
use rand::{distributions::Alphanumeric, prelude::ThreadRng, Rng};
|
||||
use storage::memtable::KeyValues;
|
||||
use store_api::storage::{SequenceNumber, ValueType};
|
||||
use store_api::storage::{OpType, SequenceNumber};
|
||||
|
||||
static NEXT_SEQUENCE: AtomicU64 = AtomicU64::new(0);
|
||||
|
||||
@@ -50,7 +50,7 @@ fn random_kvs(len: usize, value_size: usize) -> (Vec<KeyTuple>, Vec<ValueTuple>)
|
||||
|
||||
fn kvs_with_index(
|
||||
sequence: SequenceNumber,
|
||||
value_type: ValueType,
|
||||
op_type: OpType,
|
||||
start_index_in_batch: usize,
|
||||
keys: &[(i64, u64)],
|
||||
values: &[(Option<u64>, String)],
|
||||
@@ -81,7 +81,7 @@ fn kvs_with_index(
|
||||
];
|
||||
KeyValues {
|
||||
sequence,
|
||||
value_type,
|
||||
op_type,
|
||||
start_index_in_batch,
|
||||
keys: row_keys,
|
||||
values: row_values,
|
||||
@@ -92,7 +92,7 @@ fn generate_kv(kv_size: usize, start_index_in_batch: usize, value_size: usize) -
|
||||
let (keys, values) = random_kvs(kv_size, value_size);
|
||||
kvs_with_index(
|
||||
get_sequence(),
|
||||
ValueType::Put,
|
||||
OpType::Put,
|
||||
start_index_in_batch,
|
||||
&keys,
|
||||
&values,
|
||||
|
||||
@@ -7,7 +7,7 @@ mod version;
|
||||
use std::sync::Arc;
|
||||
|
||||
use datatypes::vectors::VectorRef;
|
||||
use store_api::storage::{consts, SequenceNumber, ValueType};
|
||||
use store_api::storage::{consts, OpType, SequenceNumber};
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::memtable::btree::BTreeMemtable;
|
||||
@@ -100,7 +100,7 @@ pub type MemtableBuilderRef = Arc<dyn MemtableBuilder>;
|
||||
/// Key-value pairs in columnar format.
|
||||
pub struct KeyValues {
|
||||
pub sequence: SequenceNumber,
|
||||
pub value_type: ValueType,
|
||||
pub op_type: OpType,
|
||||
/// Start index of these key-value pairs in batch. Each row in the same batch has
|
||||
/// a unique index to identify it.
|
||||
pub start_index_in_batch: usize,
|
||||
@@ -110,8 +110,8 @@ pub struct KeyValues {
|
||||
|
||||
impl KeyValues {
|
||||
// Note that `sequence` is not reset.
|
||||
fn reset(&mut self, value_type: ValueType, index_in_batch: usize) {
|
||||
self.value_type = value_type;
|
||||
fn reset(&mut self, op_type: OpType, index_in_batch: usize) {
|
||||
self.op_type = op_type;
|
||||
self.start_index_in_batch = index_in_batch;
|
||||
self.keys.clear();
|
||||
self.values.clear();
|
||||
|
||||
@@ -11,7 +11,7 @@ use datatypes::value::Value;
|
||||
use datatypes::vectors::{
|
||||
UInt64Vector, UInt64VectorBuilder, UInt8Vector, UInt8VectorBuilder, VectorBuilder,
|
||||
};
|
||||
use store_api::storage::{SequenceNumber, ValueType};
|
||||
use store_api::storage::{OpType, SequenceNumber};
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::memtable::{
|
||||
@@ -122,7 +122,7 @@ impl BTreeIterator {
|
||||
map.range(..)
|
||||
};
|
||||
|
||||
let (keys, sequences, value_types, values) = if self.ctx.for_flush {
|
||||
let (keys, sequences, op_types, values) = if self.ctx.for_flush {
|
||||
collect_iter(iter, self.ctx.batch_size)
|
||||
} else {
|
||||
let iter = MapIterWrapper::new(iter, self.ctx.visible_sequence);
|
||||
@@ -150,7 +150,7 @@ impl BTreeIterator {
|
||||
Some(Batch {
|
||||
keys: rows_to_vectors(key_data_types, keys.as_slice()),
|
||||
sequences,
|
||||
value_types,
|
||||
op_types,
|
||||
values: rows_to_vectors(value_data_types, values.as_slice()),
|
||||
})
|
||||
}
|
||||
@@ -167,16 +167,16 @@ fn collect_iter<'a, I: Iterator<Item = (&'a InnerKey, &'a RowValue)>>(
|
||||
) {
|
||||
let mut keys = Vec::with_capacity(batch_size);
|
||||
let mut sequences = UInt64VectorBuilder::with_capacity(batch_size);
|
||||
let mut value_types = UInt8VectorBuilder::with_capacity(batch_size);
|
||||
let mut op_types = UInt8VectorBuilder::with_capacity(batch_size);
|
||||
let mut values = Vec::with_capacity(batch_size);
|
||||
for (inner_key, row_value) in iter.take(batch_size) {
|
||||
keys.push(inner_key);
|
||||
sequences.push(Some(inner_key.sequence));
|
||||
value_types.push(Some(inner_key.value_type.as_u8()));
|
||||
op_types.push(Some(inner_key.op_type.as_u8()));
|
||||
values.push(row_value);
|
||||
}
|
||||
|
||||
(keys, sequences.finish(), value_types.finish(), values)
|
||||
(keys, sequences.finish(), op_types.finish(), values)
|
||||
}
|
||||
|
||||
/// `MapIterWrapper` removes same user key with invisible sequence.
|
||||
@@ -260,7 +260,7 @@ impl<'a> IterRow<'a> {
|
||||
row_key,
|
||||
sequence: self.kvs.sequence,
|
||||
index_in_batch: self.kvs.start_index_in_batch + self.index,
|
||||
value_type: self.kvs.value_type,
|
||||
op_type: self.kvs.op_type,
|
||||
};
|
||||
|
||||
let row_value = RowValue {
|
||||
@@ -299,18 +299,18 @@ struct InnerKey {
|
||||
row_key: Vec<Value>,
|
||||
sequence: SequenceNumber,
|
||||
index_in_batch: usize,
|
||||
value_type: ValueType,
|
||||
op_type: OpType,
|
||||
}
|
||||
|
||||
impl Ord for InnerKey {
|
||||
fn cmp(&self, other: &InnerKey) -> Ordering {
|
||||
// Order by (row_key asc, sequence desc, index_in_batch desc, value type desc), though (key,
|
||||
// Order by (row_key asc, sequence desc, index_in_batch desc, op_type desc), though (key,
|
||||
// sequence, index_in_batch) should be enough to disambiguate.
|
||||
self.row_key
|
||||
.cmp(&other.row_key)
|
||||
.then_with(|| other.sequence.cmp(&self.sequence))
|
||||
.then_with(|| other.index_in_batch.cmp(&self.index_in_batch))
|
||||
.then_with(|| other.value_type.cmp(&self.value_type))
|
||||
.then_with(|| other.op_type.cmp(&self.op_type))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -334,12 +334,12 @@ impl InnerKey {
|
||||
/// Reset the `InnerKey` so that we can use it to seek next key that
|
||||
/// has different row key.
|
||||
fn reset_for_seek(&mut self) {
|
||||
// sequence, index_in_batch, value_type are ordered in desc order, so
|
||||
// sequence, index_in_batch, op_type are ordered in desc order, so
|
||||
// we can represent the last inner key with same row key by setting them
|
||||
// to zero (Minimum value).
|
||||
self.sequence = 0;
|
||||
self.index_in_batch = 0;
|
||||
self.value_type = ValueType::min_type();
|
||||
self.op_type = OpType::min_type();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -7,7 +7,7 @@ use datatypes::prelude::ScalarVector;
|
||||
use datatypes::schema::SchemaRef;
|
||||
use datatypes::vectors::{Int64Vector, NullVector, VectorRef};
|
||||
use snafu::{ensure, OptionExt};
|
||||
use store_api::storage::{ColumnDescriptor, SequenceNumber, ValueType};
|
||||
use store_api::storage::{ColumnDescriptor, OpType, SequenceNumber};
|
||||
|
||||
use crate::error::{self, Result};
|
||||
use crate::memtable::{KeyValues, Memtable, MemtableSet};
|
||||
@@ -65,7 +65,7 @@ impl Inserter {
|
||||
// Reusable KeyValues buffer.
|
||||
let mut kvs = KeyValues {
|
||||
sequence: self.sequence,
|
||||
value_type: ValueType::Put,
|
||||
op_type: OpType::Put,
|
||||
start_index_in_batch: self.index_in_batch,
|
||||
keys: Vec::with_capacity(total_column_num),
|
||||
values: Vec::with_capacity(total_column_num),
|
||||
@@ -108,7 +108,7 @@ impl Inserter {
|
||||
let schema = memtable.schema();
|
||||
let num_rows = put_data.num_rows();
|
||||
|
||||
kvs.reset(ValueType::Put, self.index_in_batch);
|
||||
kvs.reset(OpType::Put, self.index_in_batch);
|
||||
|
||||
for key_col in schema.row_key_columns() {
|
||||
clone_put_data_column_to(put_data, &key_col.desc, &mut kvs.keys)?;
|
||||
|
||||
@@ -26,7 +26,7 @@ pub fn schema_for_test() -> RegionSchemaRef {
|
||||
|
||||
fn kvs_for_test_with_index(
|
||||
sequence: SequenceNumber,
|
||||
value_type: ValueType,
|
||||
op_type: OpType,
|
||||
start_index_in_batch: usize,
|
||||
keys: &[(i64, u64)],
|
||||
values: &[Option<u64>],
|
||||
@@ -54,7 +54,7 @@ fn kvs_for_test_with_index(
|
||||
|
||||
let kvs = KeyValues {
|
||||
sequence,
|
||||
value_type,
|
||||
op_type,
|
||||
start_index_in_batch,
|
||||
keys: row_keys,
|
||||
values: row_values,
|
||||
@@ -68,21 +68,21 @@ fn kvs_for_test_with_index(
|
||||
|
||||
fn kvs_for_test(
|
||||
sequence: SequenceNumber,
|
||||
value_type: ValueType,
|
||||
op_type: OpType,
|
||||
keys: &[(i64, u64)],
|
||||
values: &[Option<u64>],
|
||||
) -> KeyValues {
|
||||
kvs_for_test_with_index(sequence, value_type, 0, keys, values)
|
||||
kvs_for_test_with_index(sequence, op_type, 0, keys, values)
|
||||
}
|
||||
|
||||
pub fn write_kvs(
|
||||
memtable: &dyn Memtable,
|
||||
sequence: SequenceNumber,
|
||||
value_type: ValueType,
|
||||
op_type: OpType,
|
||||
keys: &[(i64, u64)],
|
||||
values: &[Option<u64>],
|
||||
) {
|
||||
let kvs = kvs_for_test(sequence, value_type, keys, values);
|
||||
let kvs = kvs_for_test(sequence, op_type, keys, values);
|
||||
|
||||
memtable.write(&kvs).unwrap();
|
||||
}
|
||||
@@ -93,7 +93,7 @@ fn check_batch_valid(batch: &Batch) {
|
||||
let row_num = batch.keys[0].len();
|
||||
assert_eq!(row_num, batch.keys[1].len());
|
||||
assert_eq!(row_num, batch.sequences.len());
|
||||
assert_eq!(row_num, batch.value_types.len());
|
||||
assert_eq!(row_num, batch.op_types.len());
|
||||
assert_eq!(row_num, batch.values[0].len());
|
||||
}
|
||||
|
||||
@@ -101,7 +101,7 @@ fn check_iter_content(
|
||||
iter: &mut dyn BatchIterator,
|
||||
keys: &[(i64, u64)],
|
||||
sequences: &[u64],
|
||||
value_types: &[ValueType],
|
||||
op_types: &[OpType],
|
||||
values: &[Option<u64>],
|
||||
) {
|
||||
let mut index = 0;
|
||||
@@ -113,13 +113,13 @@ fn check_iter_content(
|
||||
for i in 0..row_num {
|
||||
let (k0, k1) = (batch.keys[0].get(i), batch.keys[1].get(i));
|
||||
let sequence = batch.sequences.get_data(i).unwrap();
|
||||
let value_type = batch.value_types.get_data(i).unwrap();
|
||||
let op_type = batch.op_types.get_data(i).unwrap();
|
||||
let v = batch.values[0].get(i);
|
||||
|
||||
assert_eq!(Value::from(keys[index].0), k0);
|
||||
assert_eq!(Value::from(keys[index].1), k1);
|
||||
assert_eq!(sequences[index], sequence);
|
||||
assert_eq!(value_types[index].as_u8(), value_type);
|
||||
assert_eq!(op_types[index].as_u8(), op_type);
|
||||
assert_eq!(Value::from(values[index]), v);
|
||||
|
||||
index += 1;
|
||||
@@ -187,7 +187,7 @@ fn write_iter_memtable_case(ctx: &TestContext) {
|
||||
write_kvs(
|
||||
&*ctx.memtable,
|
||||
10, // sequence
|
||||
ValueType::Put,
|
||||
OpType::Put,
|
||||
&[
|
||||
(1000, 1),
|
||||
(1000, 2),
|
||||
@@ -201,7 +201,7 @@ fn write_iter_memtable_case(ctx: &TestContext) {
|
||||
write_kvs(
|
||||
&*ctx.memtable,
|
||||
11, // sequence
|
||||
ValueType::Put,
|
||||
OpType::Put,
|
||||
&[(1002, 1), (1003, 1), (1004, 1)], // keys
|
||||
&[None, Some(5), None], // values
|
||||
);
|
||||
@@ -233,16 +233,16 @@ fn write_iter_memtable_case(ctx: &TestContext) {
|
||||
], // keys
|
||||
&[10, 10, 10, 11, 11, 11, 10, 10, 10], // sequences
|
||||
&[
|
||||
ValueType::Put,
|
||||
ValueType::Put,
|
||||
ValueType::Put,
|
||||
ValueType::Put,
|
||||
ValueType::Put,
|
||||
ValueType::Put,
|
||||
ValueType::Put,
|
||||
ValueType::Put,
|
||||
ValueType::Put,
|
||||
], // value types
|
||||
OpType::Put,
|
||||
OpType::Put,
|
||||
OpType::Put,
|
||||
OpType::Put,
|
||||
OpType::Put,
|
||||
OpType::Put,
|
||||
OpType::Put,
|
||||
OpType::Put,
|
||||
OpType::Put,
|
||||
], // op_types
|
||||
&[
|
||||
Some(1),
|
||||
Some(2),
|
||||
@@ -292,7 +292,7 @@ fn test_iter_batch_size() {
|
||||
write_kvs(
|
||||
&*ctx.memtable,
|
||||
10, // sequence
|
||||
ValueType::Put,
|
||||
OpType::Put,
|
||||
&[
|
||||
(1000, 1),
|
||||
(1000, 2),
|
||||
@@ -326,7 +326,7 @@ fn test_duplicate_key_across_batch() {
|
||||
write_kvs(
|
||||
&*ctx.memtable,
|
||||
10, // sequence
|
||||
ValueType::Put,
|
||||
OpType::Put,
|
||||
&[(1000, 1), (1000, 2), (2000, 1), (2001, 2)], // keys
|
||||
&[Some(1), None, None, None], // values
|
||||
);
|
||||
@@ -334,7 +334,7 @@ fn test_duplicate_key_across_batch() {
|
||||
write_kvs(
|
||||
&*ctx.memtable,
|
||||
11, // sequence
|
||||
ValueType::Put,
|
||||
OpType::Put,
|
||||
&[(1000, 1), (2001, 2)], // keys
|
||||
&[Some(1231), Some(1232)], // values
|
||||
);
|
||||
@@ -351,12 +351,7 @@ fn test_duplicate_key_across_batch() {
|
||||
&mut *iter,
|
||||
&[(1000, 1), (1000, 2), (2000, 1), (2001, 2)], // keys
|
||||
&[11, 10, 10, 11], // sequences
|
||||
&[
|
||||
ValueType::Put,
|
||||
ValueType::Put,
|
||||
ValueType::Put,
|
||||
ValueType::Put,
|
||||
], // value types
|
||||
&[OpType::Put, OpType::Put, OpType::Put, OpType::Put], // op_types
|
||||
&[Some(1231), None, None, Some(1232)], // values
|
||||
);
|
||||
}
|
||||
@@ -370,7 +365,7 @@ fn test_duplicate_key_in_batch() {
|
||||
write_kvs(
|
||||
&*ctx.memtable,
|
||||
10, // sequence
|
||||
ValueType::Put,
|
||||
OpType::Put,
|
||||
&[(1000, 1), (1000, 2), (1000, 1), (2001, 2)], // keys
|
||||
&[None, None, Some(1234), None], // values
|
||||
);
|
||||
@@ -385,10 +380,10 @@ fn test_duplicate_key_in_batch() {
|
||||
let mut iter = ctx.memtable.iter(iter_ctx).unwrap();
|
||||
check_iter_content(
|
||||
&mut *iter,
|
||||
&[(1000, 1), (1000, 2), (2001, 2)], // keys
|
||||
&[10, 10, 10], // sequences
|
||||
&[ValueType::Put, ValueType::Put, ValueType::Put], // value types
|
||||
&[Some(1234), None, None, None], // values
|
||||
&[(1000, 1), (1000, 2), (2001, 2)], // keys
|
||||
&[10, 10, 10], // sequences
|
||||
&[OpType::Put, OpType::Put, OpType::Put], // op_types
|
||||
&[Some(1234), None, None, None], // values
|
||||
);
|
||||
}
|
||||
});
|
||||
@@ -401,7 +396,7 @@ fn test_sequence_visibility() {
|
||||
write_kvs(
|
||||
&*ctx.memtable,
|
||||
10, // sequence
|
||||
ValueType::Put,
|
||||
OpType::Put,
|
||||
&[(1000, 1), (1000, 2)], // keys
|
||||
&[Some(1), Some(2)], // values
|
||||
);
|
||||
@@ -409,7 +404,7 @@ fn test_sequence_visibility() {
|
||||
write_kvs(
|
||||
&*ctx.memtable,
|
||||
11, // sequence
|
||||
ValueType::Put,
|
||||
OpType::Put,
|
||||
&[(1000, 1), (1000, 2)], // keys
|
||||
&[Some(11), Some(12)], // values
|
||||
);
|
||||
@@ -417,7 +412,7 @@ fn test_sequence_visibility() {
|
||||
write_kvs(
|
||||
&*ctx.memtable,
|
||||
12, // sequence
|
||||
ValueType::Put,
|
||||
OpType::Put,
|
||||
&[(1000, 1), (1000, 2)], // keys
|
||||
&[Some(21), Some(22)], // values
|
||||
);
|
||||
@@ -434,7 +429,7 @@ fn test_sequence_visibility() {
|
||||
&mut *iter,
|
||||
&[], // keys
|
||||
&[], // sequences
|
||||
&[], // value types
|
||||
&[], // op_types
|
||||
&[], // values
|
||||
);
|
||||
}
|
||||
@@ -449,10 +444,10 @@ fn test_sequence_visibility() {
|
||||
let mut iter = ctx.memtable.iter(iter_ctx).unwrap();
|
||||
check_iter_content(
|
||||
&mut *iter,
|
||||
&[(1000, 1), (1000, 2)], // keys
|
||||
&[10, 10], // sequences
|
||||
&[ValueType::Put, ValueType::Put], // value types
|
||||
&[Some(1), Some(2)], // values
|
||||
&[(1000, 1), (1000, 2)], // keys
|
||||
&[10, 10], // sequences
|
||||
&[OpType::Put, OpType::Put], // op_types
|
||||
&[Some(1), Some(2)], // values
|
||||
);
|
||||
}
|
||||
|
||||
@@ -466,10 +461,10 @@ fn test_sequence_visibility() {
|
||||
let mut iter = ctx.memtable.iter(iter_ctx).unwrap();
|
||||
check_iter_content(
|
||||
&mut *iter,
|
||||
&[(1000, 1), (1000, 2)], // keys
|
||||
&[11, 11], // sequences
|
||||
&[ValueType::Put, ValueType::Put], // value types
|
||||
&[Some(11), Some(12)], // values
|
||||
&[(1000, 1), (1000, 2)], // keys
|
||||
&[11, 11], // sequences
|
||||
&[OpType::Put, OpType::Put], // op_types
|
||||
&[Some(11), Some(12)], // values
|
||||
);
|
||||
}
|
||||
});
|
||||
@@ -482,7 +477,7 @@ fn test_iter_after_none() {
|
||||
write_kvs(
|
||||
&*ctx.memtable,
|
||||
10, // sequence
|
||||
ValueType::Put,
|
||||
OpType::Put,
|
||||
&[(1000, 0), (1001, 1), (1002, 2)], // keys
|
||||
&[Some(0), Some(1), Some(2)], // values
|
||||
);
|
||||
|
||||
@@ -227,7 +227,7 @@ impl MemtableSet {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use store_api::storage::ValueType;
|
||||
use store_api::storage::OpType;
|
||||
|
||||
use super::*;
|
||||
use crate::memtable::tests;
|
||||
@@ -258,7 +258,7 @@ mod tests {
|
||||
tests::write_kvs(
|
||||
&*memtable,
|
||||
10, // sequence
|
||||
ValueType::Put,
|
||||
OpType::Put,
|
||||
&[
|
||||
(1000, 1),
|
||||
(1000, 2),
|
||||
|
||||
@@ -500,8 +500,8 @@ fn internal_column_descs() -> [ColumnDescriptor; 2] {
|
||||
.build()
|
||||
.unwrap(),
|
||||
ColumnDescriptorBuilder::new(
|
||||
ReservedColumnId::value_type(),
|
||||
consts::VALUE_TYPE_COLUMN_NAME.to_string(),
|
||||
ReservedColumnId::op_type(),
|
||||
consts::OP_TYPE_COLUMN_NAME.to_string(),
|
||||
ConcreteDataType::uint8_datatype(),
|
||||
)
|
||||
.is_nullable(false)
|
||||
@@ -515,7 +515,7 @@ fn internal_column_descs() -> [ColumnDescriptor; 2] {
|
||||
fn is_internal_value_column(column_name: &str) -> bool {
|
||||
matches!(
|
||||
column_name,
|
||||
consts::SEQUENCE_COLUMN_NAME | consts::VALUE_TYPE_COLUMN_NAME
|
||||
consts::SEQUENCE_COLUMN_NAME | consts::OP_TYPE_COLUMN_NAME
|
||||
)
|
||||
}
|
||||
|
||||
@@ -589,7 +589,7 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_build_metadata_internal_name() {
|
||||
let names = [consts::SEQUENCE_COLUMN_NAME, consts::VALUE_TYPE_COLUMN_NAME];
|
||||
let names = [consts::SEQUENCE_COLUMN_NAME, consts::OP_TYPE_COLUMN_NAME];
|
||||
for name in names {
|
||||
let cf = ColumnFamilyDescriptorBuilder::default()
|
||||
.push_column(
|
||||
|
||||
@@ -5,13 +5,13 @@ use datatypes::vectors::{UInt64Vector, UInt8Vector, VectorRef};
|
||||
|
||||
use crate::error::Result;
|
||||
|
||||
// TODO(yingwen): Maybe pack value_type with sequence (reserve 8bits in u64 for value type) like RocksDB.
|
||||
// TODO(yingwen): Maybe pack op_type with sequence (reserve 8bits in u64 for op_type) like RocksDB.
|
||||
/// Storage internal representation of a batch of rows.
|
||||
pub struct Batch {
|
||||
// Now the structure of `Batch` is still unstable, all pub fields may be changed.
|
||||
pub keys: Vec<VectorRef>,
|
||||
pub sequences: UInt64Vector,
|
||||
pub value_types: UInt8Vector,
|
||||
pub op_types: UInt8Vector,
|
||||
pub values: Vec<VectorRef>,
|
||||
}
|
||||
|
||||
|
||||
@@ -76,7 +76,7 @@ pub type Result<T> = std::result::Result<T, Error>;
|
||||
/// special usage. Reserved columns except the version columns are also
|
||||
/// called internal columns (though the version could also be thought as a
|
||||
/// special kind of internal column), are not visible to user, such as our
|
||||
/// internal sequence, value_type columns.
|
||||
/// internal sequence, op_type columns.
|
||||
///
|
||||
/// The user schema is the schema that only contains columns that user could visit,
|
||||
/// as well as what the schema user created.
|
||||
@@ -183,7 +183,7 @@ impl SstSchema {
|
||||
schema.column_schemas()[user_column_end].name
|
||||
);
|
||||
assert_eq!(
|
||||
consts::VALUE_TYPE_COLUMN_NAME,
|
||||
consts::OP_TYPE_COLUMN_NAME,
|
||||
schema.column_schemas()[user_column_end + 1].name
|
||||
);
|
||||
|
||||
@@ -212,7 +212,7 @@ impl SstSchema {
|
||||
pub fn batch_to_arrow_chunk(&self, batch: &Batch) -> Chunk<Arc<dyn Array>> {
|
||||
assert_eq!(
|
||||
self.schema.num_columns(),
|
||||
// key columns + value columns + sequence + value_type
|
||||
// key columns + value columns + sequence + op_type
|
||||
batch.keys.len() + batch.values.len() + 2
|
||||
);
|
||||
|
||||
@@ -223,7 +223,7 @@ impl SstSchema {
|
||||
.map(|v| v.to_arrow_array())
|
||||
.chain(batch.values.iter().map(|v| v.to_arrow_array()))
|
||||
.chain(std::iter::once(batch.sequences.to_arrow_array()))
|
||||
.chain(std::iter::once(batch.value_types.to_arrow_array()))
|
||||
.chain(std::iter::once(batch.op_types.to_arrow_array()))
|
||||
.collect(),
|
||||
)
|
||||
}
|
||||
@@ -241,12 +241,10 @@ impl SstSchema {
|
||||
.context(ConvertChunkSnafu {
|
||||
name: consts::SEQUENCE_COLUMN_NAME,
|
||||
})?;
|
||||
let value_types = UInt8Vector::try_from_arrow_array(
|
||||
&chunk[self.value_type_index()].clone(),
|
||||
)
|
||||
.context(ConvertChunkSnafu {
|
||||
name: consts::VALUE_TYPE_COLUMN_NAME,
|
||||
})?;
|
||||
let op_types = UInt8Vector::try_from_arrow_array(&chunk[self.op_type_index()].clone())
|
||||
.context(ConvertChunkSnafu {
|
||||
name: consts::OP_TYPE_COLUMN_NAME,
|
||||
})?;
|
||||
let values = self
|
||||
.value_indices()
|
||||
.map(|i| {
|
||||
@@ -259,7 +257,7 @@ impl SstSchema {
|
||||
Ok(Batch {
|
||||
keys,
|
||||
sequences,
|
||||
value_types,
|
||||
op_types,
|
||||
values,
|
||||
})
|
||||
}
|
||||
@@ -270,7 +268,7 @@ impl SstSchema {
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn value_type_index(&self) -> usize {
|
||||
fn op_type_index(&self) -> usize {
|
||||
self.user_column_end + 1
|
||||
}
|
||||
|
||||
@@ -299,13 +297,13 @@ impl TryFrom<ArrowSchema> for SstSchema {
|
||||
let row_key_end = parse_index_from_metadata(schema.metadata(), ROW_KEY_END_KEY)?;
|
||||
let user_column_end = parse_index_from_metadata(schema.metadata(), USER_COLUMN_END_KEY)?;
|
||||
|
||||
// There should be sequence and value type columns.
|
||||
// There should be sequence and op_type columns.
|
||||
ensure!(
|
||||
consts::SEQUENCE_COLUMN_NAME == schema.column_schemas()[user_column_end].name,
|
||||
InvalidIndexSnafu
|
||||
);
|
||||
ensure!(
|
||||
consts::VALUE_TYPE_COLUMN_NAME == schema.column_schemas()[user_column_end + 1].name,
|
||||
consts::OP_TYPE_COLUMN_NAME == schema.column_schemas()[user_column_end + 1].name,
|
||||
InvalidIndexSnafu
|
||||
);
|
||||
|
||||
@@ -354,7 +352,7 @@ mod tests {
|
||||
keys: vec![Arc::new(k1), Arc::new(timestamp)],
|
||||
values: vec![Arc::new(v1)],
|
||||
sequences: UInt64Vector::from_slice(&[100, 100, 100]),
|
||||
value_types: UInt8Vector::from_slice(&[0, 0, 0]),
|
||||
op_types: UInt8Vector::from_slice(&[0, 0, 0]),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -367,7 +365,7 @@ mod tests {
|
||||
}
|
||||
assert_eq!(chunk[2], batch.values[0].to_arrow_array());
|
||||
assert_eq!(chunk[3], batch.sequences.to_arrow_array());
|
||||
assert_eq!(chunk[4], batch.value_types.to_arrow_array());
|
||||
assert_eq!(chunk[4], batch.op_types.to_arrow_array());
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -422,7 +420,7 @@ mod tests {
|
||||
("timestamp", LogicalTypeId::Int64, false),
|
||||
("v1", LogicalTypeId::Int64, true),
|
||||
(consts::SEQUENCE_COLUMN_NAME, LogicalTypeId::UInt64, false),
|
||||
(consts::VALUE_TYPE_COLUMN_NAME, LogicalTypeId::UInt8, false),
|
||||
(consts::OP_TYPE_COLUMN_NAME, LogicalTypeId::UInt8, false),
|
||||
],
|
||||
Some(1),
|
||||
);
|
||||
@@ -431,7 +429,7 @@ mod tests {
|
||||
sst_schema.schema().column_schemas()
|
||||
);
|
||||
assert_eq!(3, sst_schema.sequence_index());
|
||||
assert_eq!(4, sst_schema.value_type_index());
|
||||
assert_eq!(4, sst_schema.op_type_index());
|
||||
let row_key_indices: Vec<_> = sst_schema.row_key_indices().collect();
|
||||
assert_eq!([0, 1], &row_key_indices[..]);
|
||||
let value_indices: Vec<_> = sst_schema.value_indices().collect();
|
||||
|
||||
@@ -260,7 +260,7 @@ mod tests {
|
||||
use datatypes::arrow::array::{Array, Int64Array, UInt64Array, UInt8Array};
|
||||
use datatypes::arrow::io::parquet::read::FileReader;
|
||||
use object_store::backend::fs::Backend;
|
||||
use store_api::storage::ValueType;
|
||||
use store_api::storage::OpType;
|
||||
use tempdir::TempDir;
|
||||
|
||||
use super::*;
|
||||
@@ -275,7 +275,7 @@ mod tests {
|
||||
memtable_tests::write_kvs(
|
||||
&*memtable,
|
||||
10, // sequence
|
||||
ValueType::Put,
|
||||
OpType::Put,
|
||||
&[
|
||||
(1000, 1),
|
||||
(1000, 2),
|
||||
@@ -305,7 +305,7 @@ mod tests {
|
||||
let reader = std::fs::File::open(dir.path().join(sst_file_name)).unwrap();
|
||||
let mut file_reader = FileReader::try_new(reader, None, Some(128), None, None).unwrap();
|
||||
|
||||
// chunk schema: timestamp, __version, v1, __sequence, __value_type
|
||||
// chunk schema: timestamp, __version, v1, __sequence, __op_type
|
||||
let chunk = file_reader.next().unwrap().unwrap();
|
||||
assert_eq!(5, chunk.arrays().len());
|
||||
|
||||
@@ -335,7 +335,7 @@ mod tests {
|
||||
chunk.arrays()[3]
|
||||
);
|
||||
|
||||
// value type
|
||||
// op_type
|
||||
assert_eq!(
|
||||
Arc::new(UInt8Array::from_slice(&[0, 0, 0, 0, 0, 0])) as Arc<dyn Array>,
|
||||
chunk.arrays()[4]
|
||||
|
||||
@@ -7,17 +7,17 @@ use datatypes::vectors::{Int64Vector, UInt64Vector, UInt8Vector};
|
||||
use crate::error::Result;
|
||||
use crate::read::{Batch, BatchReader, BoxedBatchReader};
|
||||
|
||||
/// Build a new batch, with 0 sequence and value type.
|
||||
/// Build a new batch, with 0 sequence and op_type.
|
||||
fn new_kv_batch(key_values: &[(i64, Option<i64>)]) -> Batch {
|
||||
let key = Arc::new(Int64Vector::from_values(key_values.iter().map(|v| v.0)));
|
||||
let value = Arc::new(Int64Vector::from_iter(key_values.iter().map(|v| v.1)));
|
||||
let sequences = UInt64Vector::from_vec(vec![0; key_values.len()]);
|
||||
let value_types = UInt8Vector::from_vec(vec![0; key_values.len()]);
|
||||
let op_types = UInt8Vector::from_vec(vec![0; key_values.len()]);
|
||||
|
||||
Batch {
|
||||
keys: vec![key],
|
||||
sequences,
|
||||
value_types,
|
||||
op_types,
|
||||
values: vec![value],
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,4 +22,4 @@ pub use self::region::{Region, WriteContext};
|
||||
pub use self::requests::{GetRequest, PutOperation, ScanRequest, WriteRequest};
|
||||
pub use self::responses::{GetResponse, ScanResponse, WriteResponse};
|
||||
pub use self::snapshot::{ReadContext, Snapshot};
|
||||
pub use self::types::{SequenceNumber, ValueType};
|
||||
pub use self::types::{OpType, SequenceNumber};
|
||||
|
||||
@@ -22,7 +22,7 @@ pub const DEFAULT_CF_ID: ColumnFamilyId = 1;
|
||||
enum ReservedColumnType {
|
||||
Version = 0,
|
||||
Sequence,
|
||||
ValueType,
|
||||
OpType,
|
||||
}
|
||||
|
||||
/// Column id reserved by the engine.
|
||||
@@ -48,9 +48,9 @@ impl ReservedColumnId {
|
||||
Self::BASE | ReservedColumnType::Sequence as ColumnId
|
||||
}
|
||||
|
||||
/// Id for `__value_type` column.
|
||||
pub const fn value_type() -> ColumnId {
|
||||
Self::BASE | ReservedColumnType::ValueType as ColumnId
|
||||
/// Id for `__op_type` column.
|
||||
pub const fn op_type() -> ColumnId {
|
||||
Self::BASE | ReservedColumnType::OpType as ColumnId
|
||||
}
|
||||
}
|
||||
|
||||
@@ -70,9 +70,8 @@ pub const SEQUENCE_COLUMN_NAME: &str = "__sequence";
|
||||
/// Name for time index constraint name.
|
||||
pub const TIME_INDEX_NAME: &str = "__time_index";
|
||||
|
||||
// TODO(yingwen): `__op_type` might be proper than `__value_type`.
|
||||
/// Name for reserved column: value_type
|
||||
pub const VALUE_TYPE_COLUMN_NAME: &str = "__value_type";
|
||||
/// Name for reserved column: op_type
|
||||
pub const OP_TYPE_COLUMN_NAME: &str = "__op_type";
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
|
||||
@@ -90,6 +89,6 @@ mod tests {
|
||||
fn test_reserved_id() {
|
||||
assert_eq!(0x80000000, ReservedColumnId::version());
|
||||
assert_eq!(0x80000001, ReservedColumnId::sequence());
|
||||
assert_eq!(0x80000002, ReservedColumnId::value_type());
|
||||
assert_eq!(0x80000002, ReservedColumnId::op_type());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,19 +6,19 @@ pub type SequenceNumber = u64;
|
||||
|
||||
/// Operation type of the value to write to storage.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
|
||||
pub enum ValueType {
|
||||
pub enum OpType {
|
||||
/// Put operation.
|
||||
Put,
|
||||
}
|
||||
|
||||
impl ValueType {
|
||||
impl OpType {
|
||||
pub fn as_u8(&self) -> u8 {
|
||||
*self as u8
|
||||
}
|
||||
|
||||
/// Minimum value type after casting to u8.
|
||||
pub const fn min_type() -> ValueType {
|
||||
ValueType::Put
|
||||
/// Minimal op type after casting to u8.
|
||||
pub const fn min_type() -> OpType {
|
||||
OpType::Put
|
||||
}
|
||||
}
|
||||
|
||||
@@ -27,8 +27,8 @@ mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_value_type() {
|
||||
assert_eq!(0, ValueType::Put.as_u8());
|
||||
assert_eq!(0, ValueType::min_type().as_u8());
|
||||
fn test_op_type() {
|
||||
assert_eq!(0, OpType::Put.as_u8());
|
||||
assert_eq!(0, OpType::min_type().as_u8());
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user