diff --git a/src/common/recordbatch/src/recordbatch.rs b/src/common/recordbatch/src/recordbatch.rs index 8fcf58fdef..8b6bb96425 100644 --- a/src/common/recordbatch/src/recordbatch.rs +++ b/src/common/recordbatch/src/recordbatch.rs @@ -1,14 +1,12 @@ -use std::sync::Arc; - use datafusion_common::record_batch::RecordBatch as DfRecordBatch; -use datatypes::schema::Schema; +use datatypes::schema::SchemaRef; use datatypes::vectors::Helper; use serde::ser::{Error, SerializeStruct}; use serde::{Serialize, Serializer}; #[derive(Clone, Debug, PartialEq)] pub struct RecordBatch { - pub schema: Arc, + pub schema: SchemaRef, pub df_recordbatch: DfRecordBatch, } @@ -35,10 +33,13 @@ impl Serialize for RecordBatch { #[cfg(test)] mod tests { + use std::sync::Arc; + use arrow::array::UInt32Array; use arrow::datatypes::{DataType, Field, Schema as ArrowSchema}; use datafusion_common::field_util::SchemaExt; use datafusion_common::record_batch::RecordBatch as DfRecordBatch; + use datatypes::schema::Schema; use super::*; diff --git a/src/datatypes/src/vectors/builder.rs b/src/datatypes/src/vectors/builder.rs index fc2c2bf87f..ce8318e6b1 100644 --- a/src/datatypes/src/vectors/builder.rs +++ b/src/datatypes/src/vectors/builder.rs @@ -5,8 +5,8 @@ use crate::scalars::ScalarVectorBuilder; use crate::value::Value; use crate::vectors::{ BinaryVectorBuilder, BooleanVectorBuilder, Float32VectorBuilder, Float64VectorBuilder, - Int16VectorBuilder, Int32VectorBuilder, Int64VectorBuilder, Int8VectorBuilder, NullVector, - StringVectorBuilder, UInt16VectorBuilder, UInt32VectorBuilder, UInt64VectorBuilder, + Int16VectorBuilder, Int32VectorBuilder, Int64VectorBuilder, Int8VectorBuilder, MutableVector, + NullVector, StringVectorBuilder, UInt16VectorBuilder, UInt32VectorBuilder, UInt64VectorBuilder, UInt8VectorBuilder, VectorRef, }; @@ -81,13 +81,32 @@ impl VectorBuilder { } } + pub fn data_type(&self) -> ConcreteDataType { + match self { + VectorBuilder::Null(_) => ConcreteDataType::null_datatype(), + 
VectorBuilder::Boolean(b) => b.data_type(), + VectorBuilder::UInt8(b) => b.data_type(), + VectorBuilder::UInt16(b) => b.data_type(), + VectorBuilder::UInt32(b) => b.data_type(), + VectorBuilder::UInt64(b) => b.data_type(), + VectorBuilder::Int8(b) => b.data_type(), + VectorBuilder::Int16(b) => b.data_type(), + VectorBuilder::Int32(b) => b.data_type(), + VectorBuilder::Int64(b) => b.data_type(), + VectorBuilder::Float32(b) => b.data_type(), + VectorBuilder::Float64(b) => b.data_type(), + VectorBuilder::String(b) => b.data_type(), + VectorBuilder::Binary(b) => b.data_type(), + } + } + pub fn push(&mut self, value: &Value) { if value.is_null() { self.push_null(); return; } - match (self, value) { + match (&mut *self, value) { (VectorBuilder::Boolean(b), Value::Boolean(v)) => b.push(Some(*v)), (VectorBuilder::UInt8(b), Value::UInt8(v)) => b.push(Some(*v)), (VectorBuilder::UInt16(b), Value::UInt16(v)) => b.push(Some(*v)), @@ -101,7 +120,11 @@ impl VectorBuilder { (VectorBuilder::Float64(b), Value::Float64(v)) => b.push(Some(v.into_inner())), (VectorBuilder::String(b), Value::String(v)) => b.push(Some(v.as_utf8())), (VectorBuilder::Binary(b), Value::Binary(v)) => b.push(Some(v)), - _ => panic!("Value {:?} does not match builder type", value), + _ => panic!( + "Value {:?} does not match builder type {:?}", + value, + self.data_type() + ), } } @@ -152,7 +175,10 @@ mod tests { macro_rules! 
impl_integer_builder_test { ($Type: ident, $datatype: ident) => { - let mut builder = VectorBuilder::with_capacity(ConcreteDataType::$datatype(), 10); + let data_type = ConcreteDataType::$datatype(); + let mut builder = VectorBuilder::with_capacity(data_type.clone(), 10); + assert_eq!(data_type, builder.data_type()); + for i in 0..10 { builder.push(&Value::$Type(i)); } @@ -175,6 +201,7 @@ mod tests { #[test] fn test_null_vector_builder() { let mut builder = VectorBuilder::new(ConcreteDataType::null_datatype()); + assert_eq!(ConcreteDataType::null_datatype(), builder.data_type()); builder.push(&Value::Null); let vector = builder.finish(); assert!(vector.is_null(0)); @@ -194,7 +221,10 @@ mod tests { #[test] fn test_float_vector_builder() { - let mut builder = VectorBuilder::new(ConcreteDataType::float32_datatype()); + let data_type = ConcreteDataType::float32_datatype(); + let mut builder = VectorBuilder::new(data_type.clone()); + assert_eq!(data_type, builder.data_type()); + builder.push(&Value::Float32(OrderedFloat(1.0))); let vector = builder.finish(); assert_eq!(Value::Float32(OrderedFloat(1.0)), vector.get(0)); @@ -207,8 +237,10 @@ mod tests { #[test] fn test_binary_vector_builder() { + let data_type = ConcreteDataType::binary_datatype(); let hello: &[u8] = b"hello"; - let mut builder = VectorBuilder::new(ConcreteDataType::binary_datatype()); + let mut builder = VectorBuilder::new(data_type.clone()); + assert_eq!(data_type, builder.data_type()); builder.push(&Value::Binary(hello.into())); let vector = builder.finish(); assert_eq!(Value::Binary(hello.into()), vector.get(0)); @@ -216,8 +248,10 @@ mod tests { #[test] fn test_string_vector_builder() { + let data_type = ConcreteDataType::string_datatype(); let hello = "hello"; - let mut builder = VectorBuilder::new(ConcreteDataType::string_datatype()); + let mut builder = VectorBuilder::new(data_type.clone()); + assert_eq!(data_type, builder.data_type()); builder.push(&Value::String(hello.into())); let vector = 
builder.finish(); assert_eq!(Value::String(hello.into()), vector.get(0)); diff --git a/src/storage/src/chunk.rs b/src/storage/src/chunk.rs new file mode 100644 index 0000000000..74ad5c390c --- /dev/null +++ b/src/storage/src/chunk.rs @@ -0,0 +1,41 @@ +use async_trait::async_trait; +use store_api::storage::{Chunk, ChunkReader, SchemaRef}; + +use crate::error::{Error, Result}; +use crate::memtable::BatchIteratorPtr; + +pub struct ChunkReaderImpl { + schema: SchemaRef, + // Now we only read data from one memtable, so we just holds the memtable iterator here. + iter: BatchIteratorPtr, +} + +#[async_trait] +impl ChunkReader for ChunkReaderImpl { + type Error = Error; + + fn schema(&self) -> &SchemaRef { + &self.schema + } + + async fn next_chunk(&mut self) -> Result> { + let mut batch = match self.iter.next()? { + Some(b) => b, + None => return Ok(None), + }; + + // TODO(yingwen): Check schema, now we assumes the schema is the same as key columns + // combine with value columns. + let mut columns = Vec::with_capacity(batch.keys.len() + batch.values.len()); + columns.append(&mut batch.keys); + columns.append(&mut batch.values); + + Ok(Some(Chunk::new(columns))) + } +} + +impl ChunkReaderImpl { + pub fn new(schema: SchemaRef, iter: BatchIteratorPtr) -> ChunkReaderImpl { + ChunkReaderImpl { schema, iter } + } +} diff --git a/src/storage/src/lib.rs b/src/storage/src/lib.rs index e0e88d67a7..9fd1437c30 100644 --- a/src/storage/src/lib.rs +++ b/src/storage/src/lib.rs @@ -1,11 +1,11 @@ //! Storage engine implementation. 
+mod chunk; mod engine; mod error; pub mod memtable; pub mod metadata; mod region; -mod region_writer; mod snapshot; pub mod sync; mod version; diff --git a/src/storage/src/memtable.rs b/src/storage/src/memtable.rs index d828398adf..6e77c221e2 100644 --- a/src/storage/src/memtable.rs +++ b/src/storage/src/memtable.rs @@ -9,7 +9,7 @@ use std::sync::Arc; use datatypes::vectors::{UInt64Vector, UInt8Vector, VectorRef}; use snafu::Snafu; -use store_api::storage::{SequenceNumber, ValueType}; +use store_api::storage::{consts, SequenceNumber, ValueType}; use crate::error::Result; use crate::memtable::btree::BTreeMemtable; @@ -41,11 +41,17 @@ pub type MemtableRef = Arc; pub struct IterContext { /// The suggested batch size of the iterator. pub batch_size: usize, + /// Max visible sequence (inclusive). + pub visible_sequence: SequenceNumber, } impl Default for IterContext { fn default() -> Self { - Self { batch_size: 256 } + Self { + batch_size: consts::READ_BATCH_SIZE, + // All data in memory is visible by default. + visible_sequence: SequenceNumber::MAX, + } } } diff --git a/src/storage/src/memtable/btree.rs b/src/storage/src/memtable/btree.rs index 4d3464a4d8..7a2bcb76bd 100644 --- a/src/storage/src/memtable/btree.rs +++ b/src/storage/src/memtable/btree.rs @@ -100,7 +100,7 @@ impl BTreeIterator { } else { map.range(..) 
}; - let iter = MapIterWrapper::new(iter); + let iter = MapIterWrapper::new(iter, self.ctx.visible_sequence); let mut keys = Vec::with_capacity(self.ctx.batch_size); let mut sequences = UInt64VectorBuilder::with_capacity(self.ctx.batch_size); @@ -116,13 +116,26 @@ impl BTreeIterator { if keys.is_empty() { return None; } - self.last_key = keys.last().map(|k| (*k).clone()); + self.last_key = keys.last().map(|k| { + let mut last_key = (*k).clone(); + last_key.reset_for_seek(); + last_key + }); + + let key_data_types = self + .schema + .row_key_columns() + .map(|column_meta| column_meta.desc.data_type.clone()); + let value_data_types = self + .schema + .value_columns() + .map(|column_meta| column_meta.desc.data_type.clone()); Some(Batch { - keys: rows_to_vectors(keys.as_slice()), + keys: rows_to_vectors(key_data_types, keys.as_slice()), sequences: sequences.finish(), value_types: value_types.finish(), - values: rows_to_vectors(values.as_slice()), + values: rows_to_vectors(value_data_types, values.as_slice()), }) } } @@ -131,24 +144,37 @@ impl BTreeIterator { struct MapIterWrapper<'a, InnerKey, RowValue> { iter: btree_map::Range<'a, InnerKey, RowValue>, prev_key: Option, + visible_sequence: SequenceNumber, } impl<'a> MapIterWrapper<'a, InnerKey, RowValue> { fn new( iter: btree_map::Range<'a, InnerKey, RowValue>, + visible_sequence: SequenceNumber, ) -> MapIterWrapper<'a, InnerKey, RowValue> { MapIterWrapper { iter, prev_key: None, + visible_sequence, } } + + fn next_visible_entry(&mut self) -> Option<(&'a InnerKey, &'a RowValue)> { + for (k, v) in self.iter.by_ref() { + if k.is_visible(self.visible_sequence) { + return Some((k, v)); + } + } + + None + } } impl<'a> Iterator for MapIterWrapper<'a, InnerKey, RowValue> { type Item = (&'a InnerKey, &'a RowValue); fn next(&mut self) -> Option<(&'a InnerKey, &'a RowValue)> { - let (mut current_key, mut current_value) = self.iter.next()?; + let (mut current_key, mut current_value) = self.next_visible_entry()?; if 
self.prev_key.is_none() { self.prev_key = Some(current_key.clone()); return Some((current_key, current_value)); @@ -156,7 +182,7 @@ impl<'a> Iterator for MapIterWrapper<'a, InnerKey, RowValue> { let prev_key = self.prev_key.take().unwrap(); while prev_key.is_row_key_equal(current_key) { - if let Some((next_key, next_value)) = self.iter.next() { + if let Some((next_key, next_value)) = self.next_visible_entry() { (current_key, current_value) = (next_key, next_value); } else { return None; @@ -256,9 +282,26 @@ impl PartialOrd for InnerKey { } impl InnerKey { + #[inline] fn is_row_key_equal(&self, other: &InnerKey) -> bool { self.row_key == other.row_key } + + #[inline] + fn is_visible(&self, sequence: SequenceNumber) -> bool { + self.sequence <= sequence + } + + /// Reset the `InnerKey` so that we can use it to seek next key that + /// has different row key. + fn reset_for_seek(&mut self) { + // sequence, index_in_batch, value_type are ordered in desc order, so + // we can represent the last inner key with same row key by setting them + // to zero (Minimum value). 
+ self.sequence = 0; + self.index_in_batch = 0; + self.value_type = ValueType::min_type(); + } } #[derive(Clone, Debug)] @@ -300,7 +343,10 @@ impl<'a> RowsProvider for &'a [&RowValue] { } } -fn rows_to_vectors(provider: T) -> Vec { +fn rows_to_vectors, T: RowsProvider>( + data_types: I, + provider: T, +) -> Vec { if provider.is_empty() { return Vec::new(); } @@ -308,8 +354,8 @@ fn rows_to_vectors(provider: T) -> Vec { let column_num = provider.column_num(); let row_num = provider.row_num(); let mut builders = Vec::with_capacity(column_num); - for v in provider.row_by_index(0) { - builders.push(VectorBuilder::with_capacity(v.data_type(), row_num)); + for data_type in data_types { + builders.push(VectorBuilder::with_capacity(data_type, row_num)); } let mut vectors = Vec::with_capacity(column_num); diff --git a/src/storage/src/memtable/tests.rs b/src/storage/src/memtable/tests.rs index 13349a9449..81a38947dc 100644 --- a/src/storage/src/memtable/tests.rs +++ b/src/storage/src/memtable/tests.rs @@ -199,9 +199,12 @@ fn write_iter_memtable_case(ctx: &TestContext) { &[None, Some(5), None], // values ); - let batch_sizes = [1, 4, 8, 256]; + let batch_sizes = [1, 4, 8, consts::READ_BATCH_SIZE]; for batch_size in batch_sizes { - let iter_ctx = IterContext { batch_size }; + let iter_ctx = IterContext { + batch_size, + ..Default::default() + }; let mut iter = ctx.memtable.iter(iter_ctx).unwrap(); assert_eq!(ctx.schema, *iter.schema()); assert_eq!(RowOrdering::Key, iter.ordering()); @@ -295,10 +298,168 @@ fn test_iter_batch_size() { // Batch size [less than, equal to, greater than] total let batch_sizes = [1, 6, 8]; for batch_size in batch_sizes { - let iter_ctx = IterContext { batch_size }; + let iter_ctx = IterContext { + batch_size, + ..Default::default() + }; let mut iter = ctx.memtable.iter(iter_ctx).unwrap(); check_iter_batch_size(&mut *iter, total, batch_size); } }); } + +#[test] +fn test_duplicate_key_across_batch() { + let tester = MemtableTester::default(); + 
tester.run_testcase(|ctx| { + write_kvs( + &*ctx.memtable, + 10, // sequence + ValueType::Put, + &[(1000, 1), (1000, 2), (2000, 1), (2001, 2)], // keys + &[Some(1), None, None, None], // values + ); + + write_kvs( + &*ctx.memtable, + 11, // sequence + ValueType::Put, + &[(1000, 1), (2001, 2)], // keys + &[Some(1231), Some(1232)], // values + ); + + let batch_sizes = [1, 2, 3, 4, 5]; + for batch_size in batch_sizes { + let iter_ctx = IterContext { + batch_size, + ..Default::default() + }; + + let mut iter = ctx.memtable.iter(iter_ctx).unwrap(); + check_iter_content( + &mut *iter, + &[(1000, 1), (1000, 2), (2000, 1), (2001, 2)], // keys + &[11, 10, 10, 11], // sequences + &[ + ValueType::Put, + ValueType::Put, + ValueType::Put, + ValueType::Put, + ], // value types + &[Some(1231), None, None, Some(1232)], // values + ); + } + }); +} + +#[test] +fn test_duplicate_key_in_batch() { + let tester = MemtableTester::default(); + tester.run_testcase(|ctx| { + write_kvs( + &*ctx.memtable, + 10, // sequence + ValueType::Put, + &[(1000, 1), (1000, 2), (1000, 1), (2001, 2)], // keys + &[None, None, Some(1234), None], // values + ); + + let batch_sizes = [1, 2, 3, 4, 5]; + for batch_size in batch_sizes { + let iter_ctx = IterContext { + batch_size, + ..Default::default() + }; + + let mut iter = ctx.memtable.iter(iter_ctx).unwrap(); + check_iter_content( + &mut *iter, + &[(1000, 1), (1000, 2), (2001, 2)], // keys + &[10, 10, 10], // sequences + &[ValueType::Put, ValueType::Put, ValueType::Put], // value types + &[Some(1234), None, None, None], // values + ); + } + }); +} + +#[test] +fn test_sequence_visibility() { + let tester = MemtableTester::default(); + tester.run_testcase(|ctx| { + write_kvs( + &*ctx.memtable, + 10, // sequence + ValueType::Put, + &[(1000, 1), (1000, 2)], // keys + &[Some(1), Some(2)], // values + ); + + write_kvs( + &*ctx.memtable, + 11, // sequence + ValueType::Put, + &[(1000, 1), (1000, 2)], // keys + &[Some(11), Some(12)], // values + ); + + write_kvs( + 
&*ctx.memtable, + 12, // sequence + ValueType::Put, + &[(1000, 1), (1000, 2)], // keys + &[Some(21), Some(22)], // values + ); + + { + let iter_ctx = IterContext { + batch_size: 1, + visible_sequence: 9, + }; + + let mut iter = ctx.memtable.iter(iter_ctx).unwrap(); + check_iter_content( + &mut *iter, + &[], // keys + &[], // sequences + &[], // value types + &[], // values + ); + } + + { + let iter_ctx = IterContext { + batch_size: 1, + visible_sequence: 10, + }; + + let mut iter = ctx.memtable.iter(iter_ctx).unwrap(); + check_iter_content( + &mut *iter, + &[(1000, 1), (1000, 2)], // keys + &[10, 10], // sequences + &[ValueType::Put, ValueType::Put], // value types + &[Some(1), Some(2)], // values + ); + } + + { + let iter_ctx = IterContext { + batch_size: 1, + visible_sequence: 11, + }; + + let mut iter = ctx.memtable.iter(iter_ctx).unwrap(); + check_iter_content( + &mut *iter, + &[(1000, 1), (1000, 2)], // keys + &[11, 11], // sequences + &[ValueType::Put, ValueType::Put], // value types + &[Some(11), Some(12)], // values + ); + } + }); +} + +// TODO(yingwen): Test key overwrite in same batch. 
diff --git a/src/storage/src/region.rs b/src/storage/src/region.rs index b3bfdfff7b..84d86130b3 100644 --- a/src/storage/src/region.rs +++ b/src/storage/src/region.rs @@ -1,3 +1,7 @@ +#[cfg(test)] +mod tests; +mod writer; + use std::sync::Arc; use async_trait::async_trait; @@ -8,7 +12,7 @@ use tokio::sync::Mutex; use crate::error::{self, Error, Result}; use crate::memtable::{DefaultMemtableBuilder, MemtableBuilder, MemtableSchema, MemtableSet}; use crate::metadata::{RegionMetaImpl, RegionMetadata}; -use crate::region_writer::RegionWriter; +use crate::region::writer::RegionWriter; use crate::snapshot::SnapshotImpl; use crate::version::{VersionControl, VersionControlRef}; use crate::write_batch::WriteBatch; @@ -39,7 +43,7 @@ impl Region for RegionImpl { } fn snapshot(&self, _ctx: &ReadContext) -> Result { - unimplemented!() + Ok(self.inner.create_snapshot()) } } @@ -59,6 +63,12 @@ impl RegionImpl { RegionImpl { inner } } + + #[cfg(test)] + #[inline] + fn committed_sequence(&self) -> store_api::storage::SequenceNumber { + self.inner.version.committed_sequence() + } } struct RegionInner { @@ -87,36 +97,11 @@ impl RegionInner { let mut writer = self.writer.lock().await; writer.write(ctx, &self.version, request).await } -} -#[cfg(test)] -mod tests { - use datatypes::type_id::LogicalTypeId; - use store_api::storage::consts; + fn create_snapshot(&self) -> SnapshotImpl { + let version = self.version.current(); + let sequence = self.version.committed_sequence(); - use super::*; - use crate::test_util::descriptor_util::RegionDescBuilder; - use crate::test_util::schema_util; - - #[test] - fn test_new_region() { - let region_name = "region-0"; - let desc = RegionDescBuilder::new(region_name) - .push_key_column(("k1", LogicalTypeId::Int32, false)) - .push_value_column(("v1", LogicalTypeId::Float32, true)) - .build(); - let metadata = desc.try_into().unwrap(); - - let region = RegionImpl::new(region_name.to_string(), metadata); - - let expect_schema = 
schema_util::new_schema_ref(&[ - ("k1", LogicalTypeId::Int32, false), - ("timestamp", LogicalTypeId::UInt64, false), - (consts::VERSION_COLUMN_NAME, LogicalTypeId::UInt64, false), - ("v1", LogicalTypeId::Float32, true), - ]); - - assert_eq!(region_name, region.name()); - assert_eq!(expect_schema, *region.in_memory_metadata().schema()); + SnapshotImpl::new(version, sequence) } } diff --git a/src/storage/src/region/tests.rs b/src/storage/src/region/tests.rs new file mode 100644 index 0000000000..ce1f22c407 --- /dev/null +++ b/src/storage/src/region/tests.rs @@ -0,0 +1,31 @@ +//! Region tests. + +mod read_write; + +use datatypes::type_id::LogicalTypeId; +use store_api::storage::consts; + +use super::*; +use crate::test_util::{self, descriptor_util::RegionDescBuilder, schema_util}; + +#[test] +fn test_new_region() { + let region_name = "region-0"; + let desc = RegionDescBuilder::new(region_name) + .push_key_column(("k1", LogicalTypeId::Int32, false)) + .push_value_column(("v1", LogicalTypeId::Float32, true)) + .build(); + let metadata = desc.try_into().unwrap(); + + let region = RegionImpl::new(region_name.to_string(), metadata); + + let expect_schema = schema_util::new_schema_ref(&[ + ("k1", LogicalTypeId::Int32, false), + (test_util::TIMESTAMP_NAME, LogicalTypeId::Int64, false), + (consts::VERSION_COLUMN_NAME, LogicalTypeId::UInt64, false), + ("v1", LogicalTypeId::Float32, true), + ]); + + assert_eq!(region_name, region.name()); + assert_eq!(expect_schema, *region.in_memory_metadata().schema()); +} diff --git a/src/storage/src/region/tests/read_write.rs b/src/storage/src/region/tests/read_write.rs new file mode 100644 index 0000000000..2e3cc16ee7 --- /dev/null +++ b/src/storage/src/region/tests/read_write.rs @@ -0,0 +1,164 @@ +//! Region read/write tests. 
+ +use std::sync::Arc; + +use datatypes::prelude::*; +use datatypes::type_id::LogicalTypeId; +use datatypes::vectors::Int64Vector; +use store_api::storage::{ + consts, Chunk, ChunkReader, PutOperation, ReadContext, Region, RegionMeta, ScanRequest, + SequenceNumber, Snapshot, WriteContext, WriteRequest, WriteResponse, +}; + +use crate::region::RegionImpl; +use crate::test_util::{self, descriptor_util::RegionDescBuilder, write_batch_util}; +use crate::write_batch::{PutData, WriteBatch}; + +/// Create a new region for read/write test +fn new_region_for_rw(enable_version_column: bool) -> RegionImpl { + let region_name = "region-rw-0"; + let desc = RegionDescBuilder::new(region_name) + .enable_version_column(enable_version_column) + .push_value_column(("v1", LogicalTypeId::Int64, true)) + .build(); + let metadata = desc.try_into().unwrap(); + + RegionImpl::new(region_name.to_string(), metadata) +} + +fn new_write_batch_for_test(enable_version_column: bool) -> WriteBatch { + if enable_version_column { + write_batch_util::new_write_batch(&[ + (test_util::TIMESTAMP_NAME, LogicalTypeId::Int64, false), + (consts::VERSION_COLUMN_NAME, LogicalTypeId::UInt64, false), + ("v1", LogicalTypeId::Int64, true), + ]) + } else { + write_batch_util::new_write_batch(&[ + (test_util::TIMESTAMP_NAME, LogicalTypeId::Int64, false), + ("v1", LogicalTypeId::Int64, true), + ]) + } +} + +fn new_put_data(data: &[(i64, Option)]) -> PutData { + let mut put_data = PutData::with_num_columns(2); + + let timestamps = Int64Vector::from_values(data.iter().map(|kv| kv.0)); + let values = Int64Vector::from_iter(data.iter().map(|kv| kv.1)); + + put_data + .add_key_column(test_util::TIMESTAMP_NAME, Arc::new(timestamps)) + .unwrap(); + put_data.add_value_column("v1", Arc::new(values)).unwrap(); + + put_data +} + +fn append_chunk_to(chunk: &Chunk, dst: &mut Vec<(i64, Option)>) { + assert_eq!(2, chunk.columns.len()); + + let timestamps = chunk.columns[0] + .as_any() + .downcast_ref::() + .unwrap(); + let values 
= chunk.columns[1] + .as_any() + .downcast_ref::() + .unwrap(); + for (ts, value) in timestamps.iter_data().zip(values.iter_data()) { + dst.push((ts.unwrap(), value)); + } +} + +/// Test region without considering version column. +struct Tester { + region: RegionImpl, + write_ctx: WriteContext, + read_ctx: ReadContext, +} + +impl Default for Tester { + fn default() -> Tester { + Tester::new() + } +} + +impl Tester { + fn new() -> Tester { + let region = new_region_for_rw(false); + + Tester { + region, + write_ctx: WriteContext::default(), + read_ctx: ReadContext::default(), + } + } + + /// Put without version specified. + /// + /// Format of data: (timestamp, v1), timestamp is key, v1 is value. + async fn put(&self, data: &[(i64, Option)]) -> WriteResponse { + // Build a batch without version. + let mut batch = new_write_batch_for_test(false); + let put_data = new_put_data(data); + batch.put(put_data).unwrap(); + + self.region.write(&self.write_ctx, batch).await.unwrap() + } + + async fn full_scan(&self) -> Vec<(i64, Option)> { + let snapshot = self.region.snapshot(&self.read_ctx).unwrap(); + + let resp = snapshot + .scan(&self.read_ctx, ScanRequest::default()) + .await + .unwrap(); + let mut reader = resp.reader; + + let metadata = self.region.in_memory_metadata(); + assert_eq!(metadata.schema(), reader.schema()); + + let mut dst = Vec::new(); + while let Some(chunk) = reader.next_chunk().await.unwrap() { + append_chunk_to(&chunk, &mut dst); + } + + dst + } + + fn committed_sequence(&self) -> SequenceNumber { + self.region.committed_sequence() + } +} + +#[tokio::test] +async fn test_simple_put_scan() { + let tester = Tester::default(); + + let data = vec![ + (1000, Some(100)), + (1001, Some(101)), + (1002, None), + (1003, Some(103)), + (1004, Some(104)), + ]; + + tester.put(&data).await; + + let output = tester.full_scan().await; + assert_eq!(data, output); +} + +#[tokio::test] +async fn test_sequence_increase() { + let tester = Tester::default(); + + let mut 
committed_sequence = tester.committed_sequence(); + for i in 0..100 { + tester.put(&[(i, Some(1234))]).await; + committed_sequence += 1; + + assert_eq!(committed_sequence, tester.committed_sequence()); + } +} diff --git a/src/storage/src/region_writer.rs b/src/storage/src/region/writer.rs similarity index 62% rename from src/storage/src/region_writer.rs rename to src/storage/src/region/writer.rs index 3d4e7eb2c4..899da728a1 100644 --- a/src/storage/src/region_writer.rs +++ b/src/storage/src/region/writer.rs @@ -1,4 +1,4 @@ -use store_api::storage::{SequenceNumber, WriteContext, WriteResponse}; +use store_api::storage::{WriteContext, WriteResponse}; use crate::error::Result; use crate::memtable::{Inserter, MemtableBuilderRef}; @@ -7,15 +7,11 @@ use crate::write_batch::WriteBatch; pub struct RegionWriter { _memtable_builder: MemtableBuilderRef, - last_sequence: SequenceNumber, } impl RegionWriter { pub fn new(_memtable_builder: MemtableBuilderRef) -> RegionWriter { - RegionWriter { - _memtable_builder, - last_sequence: 0, - } + RegionWriter { _memtable_builder } } // TODO(yingwen): Support group commit so we can avoid taking mutable reference. @@ -31,13 +27,20 @@ impl RegionWriter { // TODO(yingwen): Write wal and get sequence. let version = version_control.current(); - let memtables = &version.memtables; + let mem = version.mutable_memtable(); - let mem = memtables.mutable_memtable(); - self.last_sequence += 1; - let mut inserter = Inserter::new(self.last_sequence); + let committed_sequence = version_control.committed_sequence(); + // Sequence for current write batch. + let next_sequence = committed_sequence + 1; + + // Insert batch into memtable. + let mut inserter = Inserter::new(next_sequence); inserter.insert_memtable(&request, &**mem)?; + // Update committed_sequence to make current batch visible. The `&mut self` of RegionWriter + // guarantees the writer is exclusive. 
+ version_control.set_committed_sequence(next_sequence); + Ok(WriteResponse {}) } } diff --git a/src/storage/src/snapshot.rs b/src/storage/src/snapshot.rs index ff76c4947e..7b89a19a8e 100644 --- a/src/storage/src/snapshot.rs +++ b/src/storage/src/snapshot.rs @@ -1,26 +1,68 @@ +use std::cmp; + use async_trait::async_trait; use store_api::storage::{ - GetRequest, GetResponse, ReadContext, ScanRequest, ScanResponse, SchemaRef, Snapshot, + GetRequest, GetResponse, ReadContext, ScanRequest, ScanResponse, SchemaRef, SequenceNumber, + Snapshot, }; +use crate::chunk::ChunkReaderImpl; use crate::error::{Error, Result}; +use crate::memtable::IterContext; +use crate::version::VersionRef; /// [Snapshot] implementation. -pub struct SnapshotImpl {} +pub struct SnapshotImpl { + version: VersionRef, + /// Max sequence number (inclusive) visible to user. + visible_sequence: SequenceNumber, +} #[async_trait] impl Snapshot for SnapshotImpl { type Error = Error; + type Reader = ChunkReaderImpl; fn schema(&self) -> &SchemaRef { - unimplemented!() + self.version.schema() } - async fn scan(&self, _ctx: &ReadContext, _request: ScanRequest) -> Result { - unimplemented!() + async fn scan( + &self, + ctx: &ReadContext, + request: ScanRequest, + ) -> Result> { + let visible_sequence = self.sequence_to_read(request.sequence); + + let mem = self.version.mutable_memtable(); + let iter_ctx = IterContext { + batch_size: ctx.batch_size, + visible_sequence, + }; + let iter = mem.iter(iter_ctx)?; + + let reader = ChunkReaderImpl::new(self.version.schema().clone(), iter); + + Ok(ScanResponse { reader }) } async fn get(&self, _ctx: &ReadContext, _request: GetRequest) -> Result { unimplemented!() } } + +impl SnapshotImpl { + pub fn new(version: VersionRef, visible_sequence: SequenceNumber) -> SnapshotImpl { + SnapshotImpl { + version, + visible_sequence, + } + } + + #[inline] + fn sequence_to_read(&self, request_sequence: Option) -> SequenceNumber { + request_sequence + .map(|s| cmp::min(s, 
self.visible_sequence)) + .unwrap_or(self.visible_sequence) + } +} diff --git a/src/storage/src/sync.rs b/src/storage/src/sync.rs index 3455fe2ade..7408db7d94 100644 --- a/src/storage/src/sync.rs +++ b/src/storage/src/sync.rs @@ -50,7 +50,7 @@ impl CowCell { /// A RAII implementation of a write transaction of the [CowCell]. /// -/// When this txn is dropped (falls out of scope or commited), the lock will be +/// When this txn is dropped (falls out of scope or committed), the lock will be /// unlocked, but updates to the content won't be visible unless the txn is committed. #[must_use = "if unused the CowCell will immediately unlock"] pub struct TxnGuard<'a, T: Clone> { diff --git a/src/storage/src/test_util.rs b/src/storage/src/test_util.rs index dcd7bb18f2..92828d8d59 100644 --- a/src/storage/src/test_util.rs +++ b/src/storage/src/test_util.rs @@ -1,3 +1,5 @@ pub mod descriptor_util; pub mod schema_util; pub mod write_batch_util; + +pub const TIMESTAMP_NAME: &str = "timestamp"; diff --git a/src/storage/src/test_util/descriptor_util.rs b/src/storage/src/test_util/descriptor_util.rs index 81da5cb52d..b16b8aaf13 100644 --- a/src/storage/src/test_util/descriptor_util.rs +++ b/src/storage/src/test_util/descriptor_util.rs @@ -4,7 +4,7 @@ use store_api::storage::{ RegionDescriptor, RowKeyDescriptorBuilder, }; -use crate::test_util::schema_util::ColumnDef; +use crate::test_util::{self, schema_util::ColumnDef}; /// A RegionDescriptor builder for test. 
pub struct RegionDescBuilder { @@ -17,9 +17,13 @@ pub struct RegionDescBuilder { impl RegionDescBuilder { pub fn new>(name: T) -> Self { let key_builder = RowKeyDescriptorBuilder::new( - ColumnDescriptorBuilder::new(2, "timestamp", ConcreteDataType::uint64_datatype()) - .is_nullable(false) - .build(), + ColumnDescriptorBuilder::new( + 2, + test_util::TIMESTAMP_NAME, + ConcreteDataType::int64_datatype(), + ) + .is_nullable(false) + .build(), ); Self { diff --git a/src/storage/src/version.rs b/src/storage/src/version.rs index 6ca8d6ce37..6267db5fd4 100644 --- a/src/storage/src/version.rs +++ b/src/storage/src/version.rs @@ -1,12 +1,26 @@ +//! Version control of storage. +//! +//! To read latest data from `VersionControl`, we need to +//! 1. Acquire `Version` from `VersionControl`. +//! 2. Then acquire last sequence. +//! +//! Reason: data may be flushed/compacted and some data with old sequence may be removed +//! and became invisible between step 1 and 2, so need to acquire version at first. + +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; -use crate::memtable::MemtableSet; +use store_api::storage::{SchemaRef, SequenceNumber}; + +use crate::memtable::{MemtableRef, MemtableSet}; use crate::metadata::{RegionMetadata, RegionMetadataRef}; use crate::sync::CowCell; /// Controls version of in memory state for a region. pub struct VersionControl { version: CowCell, + /// Latest sequence that is committed and visible to user. + committed_sequence: AtomicU64, } impl VersionControl { @@ -14,10 +28,12 @@ impl VersionControl { pub fn new(metadata: RegionMetadata, memtables: MemtableSet) -> VersionControl { VersionControl { version: CowCell::new(Version::new(metadata, memtables)), + committed_sequence: AtomicU64::new(0), } } /// Returns current version. 
+ #[inline] pub fn current(&self) -> VersionRef { self.version.get() } @@ -27,6 +43,21 @@ impl VersionControl { let version = self.current(); version.metadata.clone() } + + #[inline] + pub fn committed_sequence(&self) -> SequenceNumber { + self.committed_sequence.load(Ordering::Acquire) + } + + /// Set committed sequence to `value`. + /// + /// External synchronization is required to ensure only one thread can update the + /// last sequence. + #[inline] + pub fn set_committed_sequence(&self, value: SequenceNumber) { + // Release ordering should be enough to guarantee sequence is updated at last. + self.committed_sequence.store(value, Ordering::Release); + } } pub type VersionControlRef = Arc; @@ -45,7 +76,7 @@ pub struct Version { /// in Arc to allow sharing metadata and reuse metadata when creating a new /// `Version`. metadata: RegionMetadataRef, - pub memtables: MemtableSet, + memtables: MemtableSet, // TODO(yingwen): Also need to store last sequence to this version when switching // version, so we can know the newest data can read from this version. 
} @@ -57,4 +88,43 @@ impl Version { memtables, } } + + #[inline] + pub fn schema(&self) -> &SchemaRef { + &self.metadata.schema + } + + #[inline] + pub fn mutable_memtable(&self) -> &MemtableRef { + self.memtables.mutable_memtable() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::memtable::{DefaultMemtableBuilder, MemtableBuilder, MemtableSchema}; + use crate::test_util::descriptor_util::RegionDescBuilder; + + fn new_version_control() -> VersionControl { + let desc = RegionDescBuilder::new("version-test") + .enable_version_column(false) + .build(); + let metadata: RegionMetadata = desc.try_into().unwrap(); + + let schema = MemtableSchema::new(metadata.columns_row_key.clone()); + let memtable = DefaultMemtableBuilder {}.build(schema); + let memtable_set = MemtableSet::new(memtable); + + VersionControl::new(metadata, memtable_set) + } + + #[test] + fn test_version_control() { + let version_control = new_version_control(); + + assert_eq!(0, version_control.committed_sequence()); + version_control.set_committed_sequence(12345); + assert_eq!(12345, version_control.committed_sequence()); + } } diff --git a/src/store-api/src/storage.rs b/src/store-api/src/storage.rs index 29b4d47925..3f39a8e56a 100644 --- a/src/store-api/src/storage.rs +++ b/src/store-api/src/storage.rs @@ -1,5 +1,6 @@ //! Storage APIs. 
+mod chunk; pub mod consts; mod descriptors; mod engine; @@ -13,6 +14,7 @@ mod types; pub use datatypes::data_type::ConcreteDataType; pub use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; +pub use self::chunk::{Chunk, ChunkReader}; pub use self::descriptors::{ gen_region_name, ColumnDescriptor, ColumnDescriptorBuilder, ColumnFamilyDescriptor, ColumnFamilyDescriptorBuilder, ColumnFamilyId, ColumnId, RegionDescriptor, RegionId, diff --git a/src/store-api/src/storage/chunk.rs b/src/store-api/src/storage/chunk.rs new file mode 100644 index 0000000000..f2eb031f65 --- /dev/null +++ b/src/store-api/src/storage/chunk.rs @@ -0,0 +1,26 @@ +use async_trait::async_trait; +use common_error::ext::ErrorExt; +use datatypes::vectors::VectorRef; + +use crate::storage::SchemaRef; + +/// A bunch of rows in columnar format. +pub struct Chunk { + pub columns: Vec<VectorRef>, + // TODO(yingwen): Sequences. +} + +impl Chunk { + pub fn new(columns: Vec<VectorRef>) -> Chunk { + Chunk { columns } + } +} + +#[async_trait] +pub trait ChunkReader: Send { + type Error: ErrorExt + Send + Sync; + + fn schema(&self) -> &SchemaRef; + + async fn next_chunk(&mut self) -> Result<Option<Chunk>, Self::Error>; +} diff --git a/src/store-api/src/storage/consts.rs b/src/store-api/src/storage/consts.rs index 2effce4718..5863d3cb8f 100644 --- a/src/store-api/src/storage/consts.rs +++ b/src/store-api/src/storage/consts.rs @@ -2,7 +2,7 @@ use crate::storage::descriptors::{ColumnFamilyId, ColumnId}; -// Ids reserved for internal column families: +// ---------- Ids reserved for internal column families ------------------------ /// Column family Id for row key columns. /// @@ -12,16 +12,27 @@ pub const KEY_CF_ID: ColumnFamilyId = 0; /// Id for default column family.
pub const DEFAULT_CF_ID: ColumnFamilyId = 1; -// Ids reserved for internal columns: +// ----------------------------------------------------------------------------- + +// ---------- Ids reserved for internal columns -------------------------------- // TODO(yingwen): Reserve one bit for internal columns. /// Column id for version column. pub const VERSION_COLUMN_ID: ColumnId = 1; -// Names reserved for internal columns: +// ----------------------------------------------------------------------------- + +// ---------- Names reserved for internal columns and engine ------------------- /// Name of version column. pub const VERSION_COLUMN_NAME: &str = "__version"; - // Names for default column family. pub const DEFAULT_CF_NAME: &str = "default"; + +// ----------------------------------------------------------------------------- + +// ---------- Default options -------------------------------------------------- + +pub const READ_BATCH_SIZE: usize = 256; + +// ----------------------------------------------------------------------------- diff --git a/src/store-api/src/storage/descriptors.rs b/src/store-api/src/storage/descriptors.rs index 5295a457ca..c10e8b81b6 100644 --- a/src/store-api/src/storage/descriptors.rs +++ b/src/store-api/src/storage/descriptors.rs @@ -45,6 +45,7 @@ pub struct RowKeyDescriptor { /// Enable version column in row key if this field is true. /// /// The default value is true. + // FIXME(yingwen): Change default value to false (disable version column by default). pub enable_version_column: bool, } diff --git a/src/store-api/src/storage/requests.rs b/src/store-api/src/storage/requests.rs index bb41344664..8d50f8aede 100644 --- a/src/store-api/src/storage/requests.rs +++ b/src/store-api/src/storage/requests.rs @@ -2,6 +2,8 @@ use common_error::ext::ErrorExt; use datatypes::schema::SchemaRef; use datatypes::vectors::VectorRef; +use crate::storage::SequenceNumber; + /// Write request holds a collection of updates to apply to a region.
pub trait WriteRequest: Send { type Error: ErrorExt + Send + Sync; @@ -27,8 +29,14 @@ pub trait PutOperation: Send { fn add_value_column(&mut self, name: &str, vector: VectorRef) -> Result<(), Self::Error>; } -#[derive(Debug)] -pub struct ScanRequest {} +#[derive(Debug, Default)] +pub struct ScanRequest { + /// Max sequence number to read, None for latest sequence. + /// + /// Default is None. Only returns data whose sequence number is less than or + /// equal to the `sequence`. + pub sequence: Option<SequenceNumber>, +} #[derive(Debug)] pub struct GetRequest {} diff --git a/src/store-api/src/storage/responses.rs b/src/store-api/src/storage/responses.rs index 823eb060d9..7cd094abff 100644 --- a/src/store-api/src/storage/responses.rs +++ b/src/store-api/src/storage/responses.rs @@ -2,7 +2,10 @@ pub struct WriteResponse {} #[derive(Debug)] -pub struct ScanResponse {} +pub struct ScanResponse<R> { + /// Reader to read result chunks. + pub reader: R, +} #[derive(Debug)] pub struct GetResponse {} diff --git a/src/store-api/src/storage/snapshot.rs b/src/store-api/src/storage/snapshot.rs index 9527ac8635..7761c69c4d 100644 --- a/src/store-api/src/storage/snapshot.rs +++ b/src/store-api/src/storage/snapshot.rs @@ -2,6 +2,8 @@ use async_trait::async_trait; use common_error::ext::ErrorExt; use datatypes::schema::SchemaRef; +use crate::storage::chunk::ChunkReader; +use crate::storage::consts; use crate::storage::requests::{GetRequest, ScanRequest}; use crate::storage::responses::{GetResponse, ScanResponse}; @@ -9,6 +11,7 @@ use crate::storage::responses::{GetResponse, ScanResponse}; #[async_trait] pub trait Snapshot: Send + Sync { type Error: ErrorExt + Send + Sync; + type Reader: ChunkReader; fn schema(&self) -> &SchemaRef; @@ -16,7 +19,7 @@ &self, ctx: &ReadContext, request: ScanRequest, - ) -> Result<ScanResponse, Self::Error>; + ) -> Result<ScanResponse<Self::Reader>, Self::Error>; async fn get(&self, ctx: &ReadContext, request: GetRequest) -> Result<GetResponse, Self::Error>; @@ -24,4 +27,15 @@ /// 
Context for read. #[derive(Debug, Clone)] -pub struct ReadContext {} +pub struct ReadContext { + /// Suggested batch size of chunk. + pub batch_size: usize, +} + +impl Default for ReadContext { + fn default() -> ReadContext { + ReadContext { + batch_size: consts::READ_BATCH_SIZE, + } + } +} diff --git a/src/store-api/src/storage/types.rs b/src/store-api/src/storage/types.rs index 361ba7caca..91e6bec061 100644 --- a/src/store-api/src/storage/types.rs +++ b/src/store-api/src/storage/types.rs @@ -15,4 +15,20 @@ impl ValueType { pub fn as_u8(&self) -> u8 { *self as u8 } + + /// Minimum value type after casting to u8. + pub const fn min_type() -> ValueType { + ValueType::Put + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_value_type() { + assert_eq!(0, ValueType::Put.as_u8()); + assert_eq!(0, ValueType::min_type().as_u8()); + } }