refactor: remove version column (#1576)

This commit is contained in:
Lei, HUANG
2023-05-15 11:03:37 +08:00
committed by GitHub
parent 66903d42e1
commit cfcfc72681
24 changed files with 97 additions and 305 deletions

View File

@@ -27,7 +27,6 @@ pub const TIMESTAMP_NAME: &str = "timestamp";
pub fn schema_for_test() -> RegionSchemaRef {
let desc = RegionDescBuilder::new("bench")
.enable_version_column(true)
.push_field_column(("v1", LogicalTypeId::UInt64, true))
.push_field_column(("v2", LogicalTypeId::String, true))
.build();

View File

@@ -49,11 +49,6 @@ impl RegionDescBuilder {
}
}
pub fn enable_version_column(mut self, enable: bool) -> Self {
self.key_builder = self.key_builder.enable_version_column(enable);
self
}
pub fn push_field_column(mut self, column_def: ColumnDef) -> Self {
let column = self.new_column(column_def);
self.default_cf_builder = self.default_cf_builder.push_column(column);

View File

@@ -25,13 +25,12 @@ use datatypes::vectors::{
use rand::Rng;
use storage::proto;
use storage::write_batch::WriteBatch;
use store_api::storage::{consts, WriteRequest};
use store_api::storage::WriteRequest;
pub fn new_test_batch() -> WriteBatch {
write_batch_util::new_write_batch(
&[
("k1", LogicalTypeId::UInt64, false),
(consts::VERSION_COLUMN_NAME, LogicalTypeId::UInt64, false),
("ts", LogicalTypeId::TimestampMillisecond, false),
("v1", LogicalTypeId::Boolean, true),
("4", LogicalTypeId::Float64, false),
@@ -78,7 +77,6 @@ pub fn gen_new_batch_and_types(putdate_nums: usize) -> (WriteBatch, Vec<i32>) {
let svs = Arc::new(StringVector::from_slice(&svs)) as VectorRef;
let mut put_data = HashMap::with_capacity(11);
put_data.insert("k1".to_string(), intv.clone());
put_data.insert(consts::VERSION_COLUMN_NAME.to_string(), intv);
put_data.insert("v1".to_string(), boolv);
put_data.insert("ts".to_string(), tsv.clone());
put_data.insert("4".to_string(), fvs.clone());

View File

@@ -110,7 +110,6 @@ mod tests {
fn schema_for_test() -> RegionSchemaRef {
// Just build a region desc and use its columns metadata.
let desc = RegionDescBuilder::new("test")
.enable_version_column(false)
.push_field_column(("v", LogicalTypeId::UInt64, true))
.build();
let metadata: RegionMetadata = desc.try_into().unwrap();

View File

@@ -139,7 +139,7 @@ mod tests {
&*memtable,
10,
OpType::Put,
&[(1, 1), (2, 2)],
&[1, 2],
&[(Some(1), Some(1)), (Some(2), Some(2))],
);

View File

@@ -48,7 +48,6 @@ pub struct RawColumnsMetadata {
pub columns: Vec<ColumnMetadata>,
pub row_key_end: usize,
pub timestamp_key_index: usize,
pub enable_version_column: bool,
pub user_column_end: usize,
}
@@ -365,7 +364,6 @@ mod tests {
#[test]
fn test_region_manifest_builder() {
let desc = RegionDescBuilder::new("test_region_manifest_builder")
.enable_version_column(true)
.push_field_column(("v0", LogicalTypeId::Int64, true))
.build();
let region_metadata: RegionMetadata = desc.try_into().unwrap();

View File

@@ -355,7 +355,9 @@ impl<'a> Iterator for IterRow<'a> {
#[derive(Clone, Debug, PartialEq, Eq)]
struct InnerKey {
/// User defined primary keys
row_key: Vec<Value>,
/// Sequence number of row
sequence: SequenceNumber,
index_in_batch: usize,
op_type: OpType,

View File

@@ -151,7 +151,6 @@ mod tests {
let desc = RegionDescBuilder::new("test")
.timestamp(("ts", LogicalTypeId::TimestampMillisecond, false))
.push_field_column(("value", LogicalTypeId::Int64, true))
.enable_version_column(false)
.build();
let metadata: RegionMetadata = desc.try_into().unwrap();

View File

@@ -31,7 +31,6 @@ use crate::test_util::descriptor_util::RegionDescBuilder;
pub fn schema_for_test() -> RegionSchemaRef {
// Just build a region desc and use its columns metadata.
let desc = RegionDescBuilder::new("test")
.enable_version_column(true)
.push_field_column(("v0", LogicalTypeId::UInt64, true))
.push_field_column(("v1", LogicalTypeId::UInt64, true))
.build();
@@ -44,23 +43,16 @@ fn kvs_for_test_with_index(
sequence: SequenceNumber,
op_type: OpType,
start_index_in_batch: usize,
keys: &[(TimestampMillisecond, u64)],
keys: &[TimestampMillisecond],
values: &[(Option<u64>, Option<u64>)],
) -> KeyValues {
assert_eq!(keys.len(), values.len());
let mut key_builders = (
TimestampMillisecondVectorBuilder::with_capacity(keys.len()),
UInt64VectorBuilder::with_capacity(keys.len()),
);
let mut key_builders = TimestampMillisecondVectorBuilder::with_capacity(keys.len());
for key in keys {
key_builders.0.push(Some(key.0));
key_builders.1.push(Some(key.1));
key_builders.push(Some(*key));
}
let row_keys = vec![
Arc::new(key_builders.0.finish()) as _,
Arc::new(key_builders.1.finish()) as _,
];
let row_keys = vec![Arc::new(key_builders.finish()) as _];
let mut value_builders = (
UInt64VectorBuilder::with_capacity(values.len()),
@@ -92,7 +84,7 @@ fn kvs_for_test_with_index(
fn kvs_for_test(
sequence: SequenceNumber,
op_type: OpType,
keys: &[(TimestampMillisecond, u64)],
keys: &[TimestampMillisecond],
values: &[(Option<u64>, Option<u64>)],
) -> KeyValues {
kvs_for_test_with_index(sequence, op_type, 0, keys, values)
@@ -102,11 +94,10 @@ pub fn write_kvs(
memtable: &dyn Memtable,
sequence: SequenceNumber,
op_type: OpType,
keys: &[(i64, u64)],
keys: &[i64],
values: &[(Option<u64>, Option<u64>)],
) {
let keys: Vec<(TimestampMillisecond, u64)> =
keys.iter().map(|(l, r)| ((*l).into(), *r)).collect();
let keys: Vec<TimestampMillisecond> = keys.iter().map(|l| ((*l).into())).collect();
let kvs = kvs_for_test(sequence, op_type, &keys, values);
@@ -114,22 +105,21 @@ pub fn write_kvs(
}
fn check_batch_valid(batch: &Batch) {
assert_eq!(6, batch.num_columns());
assert_eq!(5, batch.num_columns());
let row_num = batch.column(0).len();
for i in 1..6 {
for i in 1..5 {
assert_eq!(row_num, batch.column(i).len());
}
}
fn check_iter_content(
iter: &mut dyn BatchIterator,
keys: &[(i64, u64)],
keys: &[i64],
sequences: &[u64],
op_types: &[OpType],
values: &[(Option<u64>, Option<u64>)],
) {
let keys: Vec<(TimestampMillisecond, u64)> =
keys.iter().map(|(l, r)| ((*l).into(), *r)).collect();
let keys: Vec<TimestampMillisecond> = keys.iter().map(|l| (*l).into()).collect();
let mut index = 0;
for batch in iter {
@@ -138,13 +128,12 @@ fn check_iter_content(
let row_num = batch.column(0).len();
for i in 0..row_num {
let (k0, k1) = (batch.column(0).get(i), batch.column(1).get(i));
let (v0, v1) = (batch.column(2).get(i), batch.column(3).get(i));
let sequence = batch.column(4).get(i);
let op_type = batch.column(5).get(i);
let k0 = batch.column(0).get(i);
let (v0, v1) = (batch.column(1).get(i), batch.column(2).get(i));
let sequence = batch.column(3).get(i);
let op_type = batch.column(4).get(i);
assert_eq!(Value::from(keys[index].0), k0);
assert_eq!(Value::from(keys[index].1), k1);
assert_eq!(Value::from(keys[index]), k0);
assert_eq!(Value::from(values[index].0), v0);
assert_eq!(Value::from(values[index].1), v1);
assert_eq!(Value::from(sequences[index]), sequence);
@@ -216,14 +205,7 @@ fn write_iter_memtable_case(ctx: &TestContext) {
&*ctx.memtable,
10, // sequence
OpType::Put,
&[
(1000, 1),
(1000, 2),
(2002, 1),
(2003, 1),
(2003, 5),
(1001, 1),
], // keys
&[1000, 1000, 2002, 2003, 2003, 1001], // keys
&[
(Some(1), None),
(Some(2), None),
@@ -237,12 +219,12 @@ fn write_iter_memtable_case(ctx: &TestContext) {
&*ctx.memtable,
11, // sequence
OpType::Put,
&[(1002, 1), (1003, 1), (1004, 1)], // keys
&[1002, 1003, 1004], // keys
&[(None, None), (Some(5), None), (None, None)], // values
);
// 9 key value pairs (6 + 3).
assert_eq!(704, ctx.memtable.bytes_allocated());
assert_eq!(576, ctx.memtable.bytes_allocated());
let batch_sizes = [1, 4, 8, consts::READ_BATCH_SIZE];
for batch_size in batch_sizes {
@@ -259,21 +241,9 @@ fn write_iter_memtable_case(ctx: &TestContext) {
check_iter_content(
&mut *iter,
&[1000, 1001, 1002, 1003, 1004, 2002, 2003], // keys
&[10, 10, 11, 11, 11, 10, 10], // sequences
&[
(1000, 1),
(1000, 2),
(1001, 1),
(1002, 1),
(1003, 1),
(1004, 1),
(2002, 1),
(2003, 1),
(2003, 5),
], // keys
&[10, 10, 10, 11, 11, 11, 10, 10, 10], // sequences
&[
OpType::Put,
OpType::Put,
OpType::Put,
OpType::Put,
OpType::Put,
@@ -283,14 +253,12 @@ fn write_iter_memtable_case(ctx: &TestContext) {
OpType::Put,
], // op_types
&[
(Some(1), None),
(Some(2), None),
(Some(3), None),
(None, None),
(Some(5), None),
(None, None),
(Some(7), None),
(Some(8), None),
(Some(9), None),
], // values
);
@@ -332,14 +300,7 @@ fn test_iter_batch_size() {
&*ctx.memtable,
10, // sequence
OpType::Put,
&[
(1000, 1),
(1000, 2),
(1001, 1),
(2002, 1),
(2003, 1),
(2003, 5),
], // keys
&[1000, 1000, 1001, 2002, 2003, 2003], // keys
&[
(Some(1), None),
(Some(2), None),
@@ -350,7 +311,7 @@ fn test_iter_batch_size() {
], // values
);
let total = 6;
let total = 4;
// Batch size [less than, equal to, greater than] total
let batch_sizes = [1, 6, 8];
for batch_size in batch_sizes {
@@ -373,7 +334,7 @@ fn test_duplicate_key_across_batch() {
&*ctx.memtable,
10, // sequence
OpType::Put,
&[(1000, 1), (1000, 2), (2000, 1), (2001, 2)], // keys
&[1000, 1001, 2000, 2001], // keys
&[(Some(1), None), (None, None), (None, None), (None, None)], // values
);
@@ -381,7 +342,7 @@ fn test_duplicate_key_across_batch() {
&*ctx.memtable,
11, // sequence
OpType::Put,
&[(1000, 1), (2001, 2)], // keys
&[1000, 2001], // keys
&[(Some(1231), None), (Some(1232), None)], // values
);
@@ -395,8 +356,8 @@ fn test_duplicate_key_across_batch() {
let mut iter = ctx.memtable.iter(&iter_ctx).unwrap();
check_iter_content(
&mut *iter,
&[(1000, 1), (1000, 2), (2000, 1), (2001, 2)], // keys
&[11, 10, 10, 11], // sequences
&[1000, 1001, 2000, 2001], // keys
&[11, 10, 10, 11], // sequences
&[OpType::Put, OpType::Put, OpType::Put, OpType::Put], // op_types
&[
(Some(1231), None),
@@ -417,7 +378,7 @@ fn test_duplicate_key_in_batch() {
&*ctx.memtable,
10, // sequence
OpType::Put,
&[(1000, 1), (1000, 2), (1000, 1), (2001, 2)], // keys
&[1000, 1000, 1001, 2001], // keys
&[(None, None), (None, None), (Some(1234), None), (None, None)], // values
);
@@ -431,10 +392,10 @@ fn test_duplicate_key_in_batch() {
let mut iter = ctx.memtable.iter(&iter_ctx).unwrap();
check_iter_content(
&mut *iter,
&[(1000, 1), (1000, 2), (2001, 2)], // keys
&[10, 10, 10], // sequences
&[OpType::Put, OpType::Put, OpType::Put], // op_types
&[(Some(1234), None), (None, None), (None, None), (None, None)], // values
&[1000, 1001, 2001], // keys
&[10, 10, 10], // sequences
&[OpType::Put, OpType::Put, OpType::Put], // op_types
&[(None, None), (Some(1234), None), (None, None)], // values
);
}
});
@@ -448,7 +409,7 @@ fn test_sequence_visibility() {
&*ctx.memtable,
10, // sequence
OpType::Put,
&[(1000, 1), (1000, 2)], // keys
&[1000, 1000], // keys
&[(Some(1), None), (Some(2), None)], // values
);
@@ -456,7 +417,7 @@ fn test_sequence_visibility() {
&*ctx.memtable,
11, // sequence
OpType::Put,
&[(1000, 1), (1000, 2)], // keys
&[1000, 1000], // keys
&[(Some(11), None), (Some(12), None)], // values
);
@@ -464,7 +425,7 @@ fn test_sequence_visibility() {
&*ctx.memtable,
12, // sequence
OpType::Put,
&[(1000, 1), (1000, 2)], // keys
&[1000, 1000], // keys
&[(Some(21), None), (Some(22), None)], // values
);
@@ -497,10 +458,10 @@ fn test_sequence_visibility() {
let mut iter = ctx.memtable.iter(&iter_ctx).unwrap();
check_iter_content(
&mut *iter,
&[(1000, 1), (1000, 2)], // keys
&[10, 10], // sequences
&[OpType::Put, OpType::Put], // op_types
&[(Some(1), None), (Some(2), None)], // values
&[1000], // keys
&[10], // sequences
&[OpType::Put, OpType::Put], // op_types
&[(Some(2), None)], // values
);
}
@@ -515,10 +476,10 @@ fn test_sequence_visibility() {
let mut iter = ctx.memtable.iter(&iter_ctx).unwrap();
check_iter_content(
&mut *iter,
&[(1000, 1), (1000, 2)], // keys
&[11, 11], // sequences
&[OpType::Put, OpType::Put], // op_types
&[(Some(11), None), (Some(12), None)], // values
&[1000], // keys
&[11], // sequences
&[OpType::Put, OpType::Put], // op_types
&[(Some(12), None)], // values
);
}
});
@@ -532,7 +493,7 @@ fn test_iter_after_none() {
&*ctx.memtable,
10, // sequence
OpType::Put,
&[(1000, 0), (1001, 1), (1002, 2)], // keys
&[1000, 1001, 1002], // keys
&[(Some(0), None), (Some(1), None), (Some(2), None)], // values
);
@@ -560,7 +521,7 @@ fn test_memtable_projection() {
&*ctx.memtable,
9, // sequence
OpType::Put,
&[(1000, 0), (1001, 1), (1002, 2)], // keys
&[1000, 1001, 1002], // keys
&[
(Some(10), Some(20)),
(Some(11), Some(21)),
@@ -578,17 +539,15 @@ fn test_memtable_projection() {
let batch = iter.next().unwrap().unwrap();
assert!(iter.next().is_none());
assert_eq!(5, batch.num_columns());
assert_eq!(4, batch.num_columns());
let k0 = Arc::new(TimestampMillisecondVector::from_slice([1000, 1001, 1002])) as VectorRef;
let k1 = Arc::new(UInt64Vector::from_slice([0, 1, 2])) as VectorRef;
let v0 = Arc::new(UInt64Vector::from_slice([10, 11, 12])) as VectorRef;
let v0 = Arc::new(UInt64Vector::from_slice([20, 21, 22])) as VectorRef;
let sequences = Arc::new(UInt64Vector::from_slice([9, 9, 9])) as VectorRef;
let op_types = Arc::new(UInt8Vector::from_slice([1, 1, 1])) as VectorRef;
assert_eq!(k0, *batch.column(0));
assert_eq!(k1, *batch.column(1));
assert_eq!(v0, *batch.column(2));
assert_eq!(sequences, *batch.column(3));
assert_eq!(op_types, *batch.column(4));
assert_eq!(v0, *batch.column(1));
assert_eq!(sequences, *batch.column(2));
assert_eq!(op_types, *batch.column(3));
});
}

View File

@@ -477,9 +477,6 @@ pub struct ColumnsMetadata {
row_key_end: usize,
/// Index of timestamp key column.
timestamp_key_index: usize,
/// If version column is enabled, then the last column of key columns is a
/// version column.
enable_version_column: bool,
/// Exclusive end index of user columns.
///
/// Columns in `[user_column_end..)` are internal columns.
@@ -540,8 +537,7 @@ impl ColumnsMetadata {
}
fn to_row_key_descriptor(&self) -> RowKeyDescriptor {
let mut builder =
RowKeyDescriptorBuilder::default().enable_version_column(self.enable_version_column);
let mut builder = RowKeyDescriptorBuilder::default();
for (idx, column) in self.iter_row_key_columns().enumerate() {
// Not a timestamp column.
if idx != self.timestamp_key_index {
@@ -562,7 +558,6 @@ impl From<&ColumnsMetadata> for RawColumnsMetadata {
columns: data.columns.clone(),
row_key_end: data.row_key_end,
timestamp_key_index: data.timestamp_key_index,
enable_version_column: data.enable_version_column,
user_column_end: data.user_column_end,
}
}
@@ -582,7 +577,6 @@ impl From<RawColumnsMetadata> for ColumnsMetadata {
name_to_col_index,
row_key_end: raw.row_key_end,
timestamp_key_index: raw.timestamp_key_index,
enable_version_column: raw.enable_version_column,
user_column_end: raw.user_column_end,
}
}
@@ -669,7 +663,6 @@ struct ColumnsMetadataBuilder {
// Row key metadata:
row_key_end: usize,
timestamp_key_index: Option<usize>,
enable_version_column: bool,
}
impl ColumnsMetadataBuilder {
@@ -681,15 +674,7 @@ impl ColumnsMetadataBuilder {
// TODO(yingwen): Validate this is a timestamp column.
self.timestamp_key_index = Some(self.columns.len());
self.push_row_key_column(key.timestamp)?;
if key.enable_version_column {
// TODO(yingwen): Validate that version column must be uint64 column.
let version_col = version_column_desc();
self.push_row_key_column(version_col)?;
}
self.row_key_end = self.columns.len();
self.enable_version_column = key.enable_version_column;
Ok(self)
}
@@ -751,7 +736,6 @@ impl ColumnsMetadataBuilder {
name_to_col_index: self.name_to_col_index,
row_key_end: self.row_key_end,
timestamp_key_index,
enable_version_column: self.enable_version_column,
user_column_end,
})
}
@@ -876,17 +860,6 @@ impl RegionMetadataBuilder {
}
}
fn version_column_desc() -> ColumnDescriptor {
ColumnDescriptorBuilder::new(
ReservedColumnId::version(),
consts::VERSION_COLUMN_NAME.to_string(),
ConcreteDataType::uint64_datatype(),
)
.is_nullable(false)
.build()
.unwrap()
}
fn internal_column_descs() -> [ColumnDescriptor; 2] {
[
ColumnDescriptorBuilder::new(
@@ -938,7 +911,6 @@ mod tests {
let region_name = "region-0";
let desc = RegionDescBuilder::new(region_name)
.timestamp(("ts", LogicalTypeId::TimestampMillisecond, false))
.enable_version_column(false)
.push_key_column(("k1", LogicalTypeId::Int32, false))
.push_field_column(("v1", LogicalTypeId::Float32, true))
.build();
@@ -1046,7 +1018,7 @@ mod tests {
assert!(matches!(err, Error::ColIdExists { .. }));
}
fn new_metadata(enable_version_column: bool) -> RegionMetadata {
fn new_metadata() -> RegionMetadata {
let timestamp = ColumnDescriptorBuilder::new(
2,
"ts",
@@ -1063,7 +1035,6 @@ mod tests {
.build()
.unwrap(),
)
.enable_version_column(enable_version_column)
.build()
.unwrap();
let cf = ColumnFamilyDescriptorBuilder::default()
@@ -1087,7 +1058,7 @@ mod tests {
#[test]
fn test_build_metedata_disable_version() {
let metadata = new_metadata(false);
let metadata = new_metadata();
assert_eq!(TEST_REGION, metadata.name);
let expect_schema = schema_util::new_schema_ref(
@@ -1121,8 +1092,6 @@ mod tests {
assert_eq!(["v1"], &value_names[..]);
// Check timestamp index.
assert_eq!(1, metadata.columns.timestamp_key_index);
// Check version column.
assert!(!metadata.columns.enable_version_column);
assert!(metadata
.column_families
@@ -1132,53 +1101,9 @@ mod tests {
assert_eq!(0, metadata.version);
}
#[test]
fn test_build_metedata_enable_version() {
let metadata = new_metadata(true);
assert_eq!(TEST_REGION, metadata.name);
let expect_schema = schema_util::new_schema_ref(
&[
("k1", LogicalTypeId::Int64, false),
("ts", LogicalTypeId::TimestampMillisecond, false),
(consts::VERSION_COLUMN_NAME, LogicalTypeId::UInt64, false),
("v1", LogicalTypeId::Int64, true),
],
Some(1),
);
assert_eq!(expect_schema, *metadata.user_schema());
// 4 user columns and 2 internal columns.
assert_eq!(6, metadata.columns.columns.len());
// 3 row key columns
assert_eq!(3, metadata.columns.num_row_key_columns());
let row_key_names: Vec<_> = metadata
.columns
.iter_row_key_columns()
.map(|column| &column.desc.name)
.collect();
assert_eq!(
["k1", "ts", consts::VERSION_COLUMN_NAME],
&row_key_names[..]
);
// 1 value column
assert_eq!(1, metadata.columns.num_field_columns());
let value_names: Vec<_> = metadata
.columns
.iter_field_columns()
.map(|column| &column.desc.name)
.collect();
assert_eq!(["v1"], &value_names[..]);
// Check timestamp index.
assert_eq!(1, metadata.columns.timestamp_key_index);
// Check version column.
assert!(metadata.columns.enable_version_column);
}
#[test]
fn test_convert_between_raw() {
let metadata = new_metadata(true);
let metadata = new_metadata();
let raw = RawRegionMetadata::from(&metadata);
let converted = RegionMetadata::try_from(raw).unwrap();
@@ -1189,7 +1114,6 @@ mod tests {
fn test_alter_metadata_add_columns() {
let region_name = "region-0";
let builder = RegionDescBuilder::new(region_name)
.enable_version_column(false)
.push_key_column(("k1", LogicalTypeId::Int32, false))
.push_field_column(("v1", LogicalTypeId::Float32, true));
let last_column_id = builder.last_column_id();
@@ -1226,7 +1150,6 @@ mod tests {
let metadata = metadata.alter(&req).unwrap();
let builder: RegionMetadataBuilder = RegionDescBuilder::new(region_name)
.enable_version_column(false)
.push_key_column(("k1", LogicalTypeId::Int32, false))
.push_field_column(("v1", LogicalTypeId::Float32, true))
.push_key_column(("k2", LogicalTypeId::Int32, true))
@@ -1242,7 +1165,6 @@ mod tests {
fn test_alter_metadata_drop_columns() {
let region_name = "region-0";
let metadata: RegionMetadata = RegionDescBuilder::new(region_name)
.enable_version_column(false)
.push_key_column(("k1", LogicalTypeId::Int32, false))
.push_key_column(("k2", LogicalTypeId::Int32, false))
.push_field_column(("v1", LogicalTypeId::Float32, true))
@@ -1263,7 +1185,6 @@ mod tests {
let metadata = metadata.alter(&req).unwrap();
let builder = RegionDescBuilder::new(region_name)
.enable_version_column(false)
.push_key_column(("k1", LogicalTypeId::Int32, false))
.push_key_column(("k2", LogicalTypeId::Int32, false));
let last_column_id = builder.last_column_id() + 1;
@@ -1280,7 +1201,6 @@ mod tests {
#[test]
fn test_validate_alter_request() {
let builder = RegionDescBuilder::new("region-alter")
.enable_version_column(false)
.timestamp(("ts", LogicalTypeId::TimestampMillisecond, false))
.push_key_column(("k0", LogicalTypeId::Int32, false))
.push_field_column(("v0", LogicalTypeId::Float32, true))

View File

@@ -35,7 +35,7 @@ use object_store::services::Fs;
use object_store::ObjectStore;
use store_api::manifest::MAX_VERSION;
use store_api::storage::{
consts, Chunk, ChunkReader, RegionMeta, ScanRequest, SequenceNumber, Snapshot, WriteRequest,
Chunk, ChunkReader, RegionMeta, ScanRequest, SequenceNumber, Snapshot, WriteRequest,
};
use super::*;
@@ -50,9 +50,8 @@ use crate::test_util::descriptor_util::RegionDescBuilder;
use crate::test_util::{self, config_util, schema_util, write_batch_util};
/// Create metadata of a region with schema: (timestamp, v0).
pub fn new_metadata(region_name: &str, enable_version_column: bool) -> RegionMetadata {
pub fn new_metadata(region_name: &str) -> RegionMetadata {
let desc = RegionDescBuilder::new(region_name)
.enable_version_column(enable_version_column)
.push_field_column(("v0", LogicalTypeId::Int64, true))
.build();
desc.try_into().unwrap()
@@ -195,7 +194,6 @@ fn new_write_batch_for_test(enable_version_column: bool) -> WriteBatch {
LogicalTypeId::TimestampMillisecond,
false,
),
(consts::VERSION_COLUMN_NAME, LogicalTypeId::UInt64, false),
("v0", LogicalTypeId::Int64, true),
],
Some(0),
@@ -267,7 +265,6 @@ fn append_chunk_to(chunk: &Chunk, dst: &mut Vec<(i64, Option<i64>)>) {
async fn test_new_region() {
let region_name = "region-0";
let desc = RegionDescBuilder::new(region_name)
.enable_version_column(true)
.push_key_column(("k1", LogicalTypeId::Int32, false))
.push_field_column(("v0", LogicalTypeId::Float32, true))
.build();
@@ -294,7 +291,6 @@ async fn test_new_region() {
LogicalTypeId::TimestampMillisecond,
false,
),
(consts::VERSION_COLUMN_NAME, LogicalTypeId::UInt64, false),
("v0", LogicalTypeId::Float32, true),
],
Some(1),

View File

@@ -36,7 +36,7 @@ const REGION_NAME: &str = "region-alter-0";
async fn create_region_for_alter(store_dir: &str) -> RegionImpl<RaftEngineLogStore> {
// Always disable version column in this test.
let metadata = tests::new_metadata(REGION_NAME, false);
let metadata = tests::new_metadata(REGION_NAME);
let store_config = config_util::new_store_config(REGION_NAME, store_dir).await;

View File

@@ -30,12 +30,9 @@ const REGION_NAME: &str = "region-basic-0";
async fn create_region_for_basic(
region_name: &str,
store_dir: &str,
enable_version_column: bool,
) -> RegionImpl<RaftEngineLogStore> {
let metadata = tests::new_metadata(region_name, enable_version_column);
let metadata = tests::new_metadata(region_name);
let store_config = config_util::new_store_config(region_name, store_dir).await;
RegionImpl::create(metadata, store_config).await.unwrap()
}
@@ -48,7 +45,7 @@ struct Tester {
impl Tester {
async fn new(region_name: &str, store_dir: &str) -> Tester {
let region = create_region_for_basic(region_name, store_dir, false).await;
let region = create_region_for_basic(region_name, store_dir).await;
Tester {
region_name: region_name.to_string(),

View File

@@ -38,10 +38,9 @@ struct CloseTester {
/// Create a new region for flush test
async fn create_region_for_close(
store_dir: &str,
enable_version_column: bool,
flush_strategy: FlushStrategyRef,
) -> RegionImpl<RaftEngineLogStore> {
let metadata = tests::new_metadata(REGION_NAME, enable_version_column);
let metadata = tests::new_metadata(REGION_NAME);
let mut store_config = config_util::new_store_config(REGION_NAME, store_dir).await;
store_config.flush_strategy = flush_strategy;
@@ -51,7 +50,7 @@ async fn create_region_for_close(
impl CloseTester {
async fn new(store_dir: &str, flush_strategy: FlushStrategyRef) -> CloseTester {
let region = create_region_for_close(store_dir, false, flush_strategy.clone()).await;
let region = create_region_for_close(store_dir, flush_strategy.clone()).await;
CloseTester {
base: Some(FileTesterBase::with_region(region)),

View File

@@ -70,13 +70,12 @@ async fn create_region_for_compaction<
H: Handler<Request = FilePurgeRequest> + Send + Sync + 'static,
>(
store_dir: &str,
enable_version_column: bool,
engine_config: EngineConfig,
purge_handler: H,
flush_strategy: FlushStrategyRef,
s3_bucket: Option<String>,
) -> (RegionImpl<RaftEngineLogStore>, ObjectStore) {
let metadata = tests::new_metadata(REGION_NAME, enable_version_column);
let metadata = tests::new_metadata(REGION_NAME);
let object_store = new_object_store(store_dir, s3_bucket);
@@ -163,7 +162,6 @@ impl CompactionTester {
let purge_handler = MockFilePurgeHandler::default();
let (region, object_store) = create_region_for_compaction(
store_dir,
false,
engine_config.clone(),
purge_handler.clone(),
flush_strategy,

View File

@@ -32,10 +32,9 @@ const REGION_NAME: &str = "region-flush-0";
/// Create a new region for flush test
async fn create_region_for_flush(
store_dir: &str,
enable_version_column: bool,
flush_strategy: FlushStrategyRef,
) -> RegionImpl<RaftEngineLogStore> {
let metadata = tests::new_metadata(REGION_NAME, enable_version_column);
let metadata = tests::new_metadata(REGION_NAME);
let mut store_config = config_util::new_store_config(REGION_NAME, store_dir).await;
store_config.flush_strategy = flush_strategy;
@@ -52,7 +51,7 @@ struct FlushTester {
impl FlushTester {
async fn new(store_dir: &str, flush_strategy: FlushStrategyRef) -> FlushTester {
let region = create_region_for_flush(store_dir, false, flush_strategy.clone()).await;
let region = create_region_for_flush(store_dir, flush_strategy.clone()).await;
FlushTester {
base: Some(FileTesterBase::with_region(region)),

View File

@@ -534,7 +534,7 @@ mod tests {
use std::sync::Arc;
use common_test_util::temp_dir::create_temp_dir;
use datatypes::arrow::array::{Array, ArrayRef, UInt64Array, UInt8Array};
use datatypes::arrow::array::{Array, UInt64Array, UInt8Array};
use datatypes::prelude::{ScalarVector, Vector};
use datatypes::types::{TimestampMillisecondType, TimestampType};
use datatypes::vectors::TimestampMillisecondVector;
@@ -565,14 +565,7 @@ mod tests {
&*memtable,
10, // sequence
OpType::Put,
&[
(1000, 1),
(1000, 2),
(2002, 1),
(2003, 1),
(2003, 5),
(1001, 1),
], // keys
&[1000, 1002, 2002, 2003, 2003, 1001], // keys
&[
(Some(1), Some(1234)),
(Some(2), Some(1234)),
@@ -602,52 +595,45 @@ mod tests {
let builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap();
let mut stream = builder.build().unwrap();
// chunk schema: timestamp, __version, v1, __sequence, __op_type
// chunk schema: timestamp, v1, __sequence, __op_type
let chunk = stream.next().await.unwrap().unwrap();
assert_eq!(6, chunk.columns().len());
assert_eq!(5, chunk.columns().len());
// timestamp
assert_eq!(
&TimestampMillisecondVector::from_slice([
1000.into(),
1000.into(),
1001.into(),
1002.into(),
2002.into(),
2003.into(),
2003.into()
])
.to_arrow_array(),
chunk.column(0)
);
// version
assert_eq!(
&(Arc::new(UInt64Array::from(vec![1, 2, 1, 1, 1, 5])) as ArrayRef),
chunk.column(1)
);
// v0
assert_eq!(
&(Arc::new(UInt64Array::from(vec![1, 2, 3, 7, 8, 9])) as Arc<dyn Array>),
chunk.column(2)
&(Arc::new(UInt64Array::from(vec![1, 3, 2, 7, 9])) as Arc<dyn Array>),
chunk.column(1)
);
// v1
assert_eq!(
&(Arc::new(UInt64Array::from(vec![1234; 6])) as Arc<dyn Array>),
chunk.column(3)
&(Arc::new(UInt64Array::from(vec![1234; 5])) as Arc<dyn Array>),
chunk.column(2)
);
// sequence
assert_eq!(
&(Arc::new(UInt64Array::from(vec![10; 6])) as Arc<dyn Array>),
chunk.column(4)
&(Arc::new(UInt64Array::from(vec![10; 5])) as Arc<dyn Array>),
chunk.column(3)
);
// op_type
assert_eq!(
&(Arc::new(UInt8Array::from(vec![1; 6])) as Arc<dyn Array>),
chunk.column(5)
&(Arc::new(UInt8Array::from(vec![1; 5])) as Arc<dyn Array>),
chunk.column(4)
);
}
@@ -662,7 +648,7 @@ mod tests {
let mut values_vec = Vec::with_capacity(rows_total);
for i in 0..rows_total {
keys_vec.push((i as i64, i as u64));
keys_vec.push(i as i64);
values_vec.push((Some(i as u64), Some(i as u64)));
}
@@ -748,14 +734,7 @@ mod tests {
&*memtable,
10, // sequence
OpType::Put,
&[
(1000, 1),
(1000, 2),
(2002, 1),
(2003, 1),
(2003, 5),
(1001, 1),
], // keys
&[1000, 1002, 2002, 2003, 2003, 1001], // keys
&[
(Some(1), Some(1234)),
(Some(2), Some(1234)),
@@ -806,7 +785,7 @@ mod tests {
let mut stream = reader.chunk_stream().await.unwrap();
assert_eq!(
6,
5,
stream
.next_batch()
.await
@@ -861,15 +840,7 @@ mod tests {
&*memtable,
10, // sequence
OpType::Put,
&[
(1000, 1),
(1000, 2),
(2002, 1),
(2003, 1),
(2003, 5),
(1001, 1),
(3001, 1),
], // keys
&[1000, 1002, 2002, 2003, 2003, 1001, 3001], // keys
&[
(Some(1), Some(1234)),
(Some(2), Some(1234)),
@@ -908,15 +879,14 @@ mod tests {
);
assert_ne!(file_size, 0);
let projected_schema =
Arc::new(ProjectedSchema::new(schema, Some(vec![1, 0, 3, 2])).unwrap());
let projected_schema = Arc::new(ProjectedSchema::new(schema, Some(vec![1, 0, 2])).unwrap());
check_range_read(
sst_file_handle.clone(),
object_store.clone(),
projected_schema.clone(),
TimestampRange::with_unit(1000, 2003, TimeUnit::Millisecond).unwrap(),
vec![1000, 1000, 1001, 2002],
vec![1000, 1001, 1002, 2002],
)
.await;
@@ -925,7 +895,7 @@ mod tests {
object_store.clone(),
projected_schema.clone(),
TimestampRange::with_unit(2002, 3001, TimeUnit::Millisecond).unwrap(),
vec![2002, 2003, 2003],
vec![2002, 2003],
)
.await;
@@ -945,7 +915,7 @@ mod tests {
object_store.clone(),
projected_schema.clone(),
TimestampRange::with_unit(1000, 3000, TimeUnit::Millisecond).unwrap(),
vec![1000, 1000, 1001, 2002, 2003, 2003],
vec![1000, 1001, 1002, 2002, 2003],
)
.await;
@@ -955,7 +925,7 @@ mod tests {
object_store,
projected_schema,
TimestampRange::min_to_max(),
vec![1000, 1000, 1001, 2002, 2003, 2003, 3001],
vec![1000, 1001, 1002, 2002, 2003, 3001],
)
.await;
}

View File

@@ -65,11 +65,6 @@ impl RegionDescBuilder {
self
}
pub fn enable_version_column(mut self, enable: bool) -> Self {
self.key_builder = self.key_builder.enable_version_column(enable);
self
}
pub fn push_key_column(mut self, column_def: ColumnDef) -> Self {
let column = self.new_column(column_def);
self.key_builder = self.key_builder.push_column(column);

View File

@@ -30,7 +30,6 @@ use crate::test_util::descriptor_util::RegionDescBuilder;
/// Create a new region schema (timestamp, v0).
fn new_region_schema() -> RegionSchemaRef {
let desc = RegionDescBuilder::new("read-util")
.enable_version_column(false)
.push_field_column(("v0", LogicalTypeId::Int64, true))
.build();
let metadata: RegionMetadata = desc.try_into().unwrap();

View File

@@ -313,9 +313,7 @@ mod tests {
use crate::test_util::descriptor_util::RegionDescBuilder;
fn new_version_control() -> VersionControl {
let desc = RegionDescBuilder::new("version-test")
.enable_version_column(false)
.build();
let desc = RegionDescBuilder::new("version-test").build();
let metadata: RegionMetadataRef = Arc::new(desc.try_into().unwrap());
let memtable = DefaultMemtableBuilder::default().build(metadata.schema().clone());

View File

@@ -330,19 +330,17 @@ impl NameToVector {
#[cfg(test)]
pub(crate) fn new_test_batch() -> WriteBatch {
use datatypes::type_id::LogicalTypeId;
use store_api::storage::consts;
use crate::test_util::write_batch_util;
write_batch_util::new_write_batch(
&[
("k1", LogicalTypeId::UInt64, false),
(consts::VERSION_COLUMN_NAME, LogicalTypeId::UInt64, false),
("ts", LogicalTypeId::TimestampMillisecond, false),
("v1", LogicalTypeId::Boolean, true),
],
Some(2),
3,
Some(1),
2,
)
}
@@ -357,7 +355,6 @@ mod tests {
use datatypes::vectors::{
BooleanVector, Int32Vector, Int64Vector, TimestampMillisecondVector, UInt64Vector,
};
use store_api::storage::consts;
use super::*;
use crate::test_util::write_batch_util;
@@ -368,11 +365,9 @@ mod tests {
assert!(columns.is_empty());
let vector1 = Arc::new(Int32Vector::from_slice([1, 2, 3, 4, 5])) as VectorRef;
let vector2 = Arc::new(UInt64Vector::from_slice([0, 2, 4, 6, 8])) as VectorRef;
let mut put_data = HashMap::with_capacity(3);
put_data.insert("k1".to_string(), vector1.clone());
put_data.insert(consts::VERSION_COLUMN_NAME.to_string(), vector2);
put_data.insert("v1".to_string(), vector1);
let columns = NameToVector::new(put_data).unwrap();
@@ -399,7 +394,6 @@ mod tests {
let mut put_data = HashMap::with_capacity(4);
put_data.insert("k1".to_string(), intv.clone());
put_data.insert(consts::VERSION_COLUMN_NAME.to_string(), intv);
put_data.insert("v1".to_string(), boolv);
put_data.insert("ts".to_string(), tsv);
@@ -442,7 +436,6 @@ mod tests {
let mut put_data = HashMap::new();
put_data.insert("k1".to_string(), intv.clone());
put_data.insert(consts::VERSION_COLUMN_NAME.to_string(), intv);
put_data.insert("v1".to_string(), boolv.clone());
put_data.insert("ts".to_string(), tsv);
@@ -482,7 +475,7 @@ mod tests {
#[test]
fn test_put_missing_column() {
let boolv = Arc::new(BooleanVector::from(vec![true, false, true])) as VectorRef;
let tsv = Arc::new(Int64Vector::from_slice([0, 0, 0])) as VectorRef;
let tsv = Arc::new(TimestampMillisecondVector::from_slice([0, 0, 0])) as VectorRef;
let mut put_data = HashMap::new();
put_data.insert("v1".to_string(), boolv);
@@ -501,7 +494,6 @@ mod tests {
let mut put_data = HashMap::new();
put_data.insert("k1".to_string(), intv.clone());
put_data.insert(consts::VERSION_COLUMN_NAME.to_string(), intv);
put_data.insert("v1".to_string(), boolv.clone());
put_data.insert("ts".to_string(), tsv);
put_data.insert("v2".to_string(), boolv);
@@ -532,7 +524,6 @@ mod tests {
let mut keys = HashMap::with_capacity(3);
keys.insert("k1".to_string(), intv.clone());
keys.insert(consts::VERSION_COLUMN_NAME.to_string(), intv);
keys.insert("ts".to_string(), tsv);
let mut batch = new_test_batch();
@@ -540,7 +531,7 @@ mod tests {
let record_batch = &batch.payload().mutations[0].record_batch;
assert_eq!(3, record_batch.num_rows());
assert_eq!(4, record_batch.num_columns());
assert_eq!(3, record_batch.num_columns());
let v1 = record_batch.column_by_name("v1").unwrap();
assert!(v1.only_null());
}
@@ -551,7 +542,6 @@ mod tests {
let mut keys = HashMap::with_capacity(3);
keys.insert("k1".to_string(), intv.clone());
keys.insert(consts::VERSION_COLUMN_NAME.to_string(), intv);
let mut batch = new_test_batch();
let err = batch.delete(keys).unwrap_err();
@@ -565,7 +555,6 @@ mod tests {
let mut keys = HashMap::with_capacity(3);
keys.insert("k1".to_string(), intv.clone());
keys.insert(consts::VERSION_COLUMN_NAME.to_string(), intv.clone());
keys.insert("ts".to_string(), tsv);
keys.insert("v2".to_string(), intv);
@@ -581,7 +570,6 @@ mod tests {
let mut keys = HashMap::with_capacity(3);
keys.insert("k1".to_string(), intv.clone());
keys.insert(consts::VERSION_COLUMN_NAME.to_string(), intv);
keys.insert("ts".to_string(), boolv);
let mut batch = new_test_batch();

View File

@@ -133,7 +133,7 @@ mod tests {
use std::sync::Arc;
use datatypes::vectors::{BooleanVector, TimestampMillisecondVector, UInt64Vector, VectorRef};
use store_api::storage::{consts, WriteRequest};
use store_api::storage::WriteRequest;
use super::*;
use crate::write_batch::WriteBatch;
@@ -149,7 +149,6 @@ mod tests {
let mut put_data = HashMap::new();
put_data.insert("k1".to_string(), intv.clone());
put_data.insert(consts::VERSION_COLUMN_NAME.to_string(), intv);
put_data.insert("v1".to_string(), boolv);
put_data.insert("ts".to_string(), tsv);
@@ -186,7 +185,6 @@ mod tests {
let mut put_data = HashMap::with_capacity(3);
put_data.insert("k1".to_string(), intv.clone());
put_data.insert(consts::VERSION_COLUMN_NAME.to_string(), intv);
put_data.insert("ts".to_string(), tsv);
batch.put(put_data).unwrap();

View File

@@ -72,9 +72,6 @@ impl ReservedColumnId {
// ---------- Names reserved for internal columns and engine -------------------
/// Name of version column.
pub const VERSION_COLUMN_NAME: &str = "__version";
/// Names for default column family.
pub const DEFAULT_CF_NAME: &str = "default";

View File

@@ -114,11 +114,6 @@ pub struct RowKeyDescriptor {
pub columns: Vec<ColumnDescriptor>,
/// Timestamp key column.
pub timestamp: ColumnDescriptor,
/// Enable version column in row key if this field is true.
///
/// The default value is false.
#[builder(default = "false")]
pub enable_version_column: bool,
}
/// A [ColumnFamilyDescriptor] contains information to create a column family.
@@ -263,7 +258,6 @@ mod tests {
.build()
.unwrap();
assert!(desc.columns.is_empty());
assert!(!desc.enable_version_column);
let desc = RowKeyDescriptorBuilder::new(timestamp.clone())
.columns_capacity(1)
@@ -280,14 +274,9 @@ mod tests {
.build()
.unwrap();
assert_eq!(2, desc.columns.len());
assert!(!desc.enable_version_column);
let desc = RowKeyDescriptorBuilder::new(timestamp)
.enable_version_column(false)
.build()
.unwrap();
let desc = RowKeyDescriptorBuilder::new(timestamp).build().unwrap();
assert!(desc.columns.is_empty());
assert!(!desc.enable_version_column);
}
#[test]