feat(storage): Implement snapshot scan for region (#46)

* feat: Maintain last sequence in VersionControl

* refactor(recordbatch): Replace `Arc<Schema>` by SchemaRef

* feat: Memtable support filter rows with invisible sequence

* feat: snapshot wip

* feat: Implement scan for SnapshotImpl

* test: Add a test that simply puts and scans a region

* chore: Fix clippy

* fix(memtable): Fix memtable returning duplicate keys

* test(memtable): Add sequence visibility test

* test: Add ValueType test

* chore: Address cr comments

* fix: Fix value not being stored, only added to the committed sequence
This commit is contained in:
evenyag
2022-06-20 14:09:31 +08:00
committed by GitHub
parent e78c015fc0
commit 056185eb24
24 changed files with 760 additions and 89 deletions

View File

@@ -1,14 +1,12 @@
use std::sync::Arc;
use datafusion_common::record_batch::RecordBatch as DfRecordBatch;
use datatypes::schema::Schema;
use datatypes::schema::SchemaRef;
use datatypes::vectors::Helper;
use serde::ser::{Error, SerializeStruct};
use serde::{Serialize, Serializer};
#[derive(Clone, Debug, PartialEq)]
pub struct RecordBatch {
pub schema: Arc<Schema>,
pub schema: SchemaRef,
pub df_recordbatch: DfRecordBatch,
}
@@ -35,10 +33,13 @@ impl Serialize for RecordBatch {
#[cfg(test)]
mod tests {
use std::sync::Arc;
use arrow::array::UInt32Array;
use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
use datafusion_common::field_util::SchemaExt;
use datafusion_common::record_batch::RecordBatch as DfRecordBatch;
use datatypes::schema::Schema;
use super::*;

View File

@@ -5,8 +5,8 @@ use crate::scalars::ScalarVectorBuilder;
use crate::value::Value;
use crate::vectors::{
BinaryVectorBuilder, BooleanVectorBuilder, Float32VectorBuilder, Float64VectorBuilder,
Int16VectorBuilder, Int32VectorBuilder, Int64VectorBuilder, Int8VectorBuilder, NullVector,
StringVectorBuilder, UInt16VectorBuilder, UInt32VectorBuilder, UInt64VectorBuilder,
Int16VectorBuilder, Int32VectorBuilder, Int64VectorBuilder, Int8VectorBuilder, MutableVector,
NullVector, StringVectorBuilder, UInt16VectorBuilder, UInt32VectorBuilder, UInt64VectorBuilder,
UInt8VectorBuilder, VectorRef,
};
@@ -81,13 +81,32 @@ impl VectorBuilder {
}
}
pub fn data_type(&self) -> ConcreteDataType {
match self {
VectorBuilder::Null(_) => ConcreteDataType::null_datatype(),
VectorBuilder::Boolean(b) => b.data_type(),
VectorBuilder::UInt8(b) => b.data_type(),
VectorBuilder::UInt16(b) => b.data_type(),
VectorBuilder::UInt32(b) => b.data_type(),
VectorBuilder::UInt64(b) => b.data_type(),
VectorBuilder::Int8(b) => b.data_type(),
VectorBuilder::Int16(b) => b.data_type(),
VectorBuilder::Int32(b) => b.data_type(),
VectorBuilder::Int64(b) => b.data_type(),
VectorBuilder::Float32(b) => b.data_type(),
VectorBuilder::Float64(b) => b.data_type(),
VectorBuilder::String(b) => b.data_type(),
VectorBuilder::Binary(b) => b.data_type(),
}
}
pub fn push(&mut self, value: &Value) {
if value.is_null() {
self.push_null();
return;
}
match (self, value) {
match (&mut *self, value) {
(VectorBuilder::Boolean(b), Value::Boolean(v)) => b.push(Some(*v)),
(VectorBuilder::UInt8(b), Value::UInt8(v)) => b.push(Some(*v)),
(VectorBuilder::UInt16(b), Value::UInt16(v)) => b.push(Some(*v)),
@@ -101,7 +120,11 @@ impl VectorBuilder {
(VectorBuilder::Float64(b), Value::Float64(v)) => b.push(Some(v.into_inner())),
(VectorBuilder::String(b), Value::String(v)) => b.push(Some(v.as_utf8())),
(VectorBuilder::Binary(b), Value::Binary(v)) => b.push(Some(v)),
_ => panic!("Value {:?} does not match builder type", value),
_ => panic!(
"Value {:?} does not match builder type {:?}",
value,
self.data_type()
),
}
}
@@ -152,7 +175,10 @@ mod tests {
macro_rules! impl_integer_builder_test {
($Type: ident, $datatype: ident) => {
let mut builder = VectorBuilder::with_capacity(ConcreteDataType::$datatype(), 10);
let data_type = ConcreteDataType::$datatype();
let mut builder = VectorBuilder::with_capacity(data_type.clone(), 10);
assert_eq!(data_type, builder.data_type());
for i in 0..10 {
builder.push(&Value::$Type(i));
}
@@ -175,6 +201,7 @@ mod tests {
#[test]
fn test_null_vector_builder() {
let mut builder = VectorBuilder::new(ConcreteDataType::null_datatype());
assert_eq!(ConcreteDataType::null_datatype(), builder.data_type());
builder.push(&Value::Null);
let vector = builder.finish();
assert!(vector.is_null(0));
@@ -194,7 +221,10 @@ mod tests {
#[test]
fn test_float_vector_builder() {
let mut builder = VectorBuilder::new(ConcreteDataType::float32_datatype());
let data_type = ConcreteDataType::float32_datatype();
let mut builder = VectorBuilder::new(data_type.clone());
assert_eq!(data_type, builder.data_type());
builder.push(&Value::Float32(OrderedFloat(1.0)));
let vector = builder.finish();
assert_eq!(Value::Float32(OrderedFloat(1.0)), vector.get(0));
@@ -207,8 +237,10 @@ mod tests {
#[test]
fn test_binary_vector_builder() {
let data_type = ConcreteDataType::binary_datatype();
let hello: &[u8] = b"hello";
let mut builder = VectorBuilder::new(ConcreteDataType::binary_datatype());
let mut builder = VectorBuilder::new(data_type.clone());
assert_eq!(data_type, builder.data_type());
builder.push(&Value::Binary(hello.into()));
let vector = builder.finish();
assert_eq!(Value::Binary(hello.into()), vector.get(0));
@@ -216,8 +248,10 @@ mod tests {
#[test]
fn test_string_vector_builder() {
let data_type = ConcreteDataType::string_datatype();
let hello = "hello";
let mut builder = VectorBuilder::new(ConcreteDataType::string_datatype());
let mut builder = VectorBuilder::new(data_type.clone());
assert_eq!(data_type, builder.data_type());
builder.push(&Value::String(hello.into()));
let vector = builder.finish();
assert_eq!(Value::String(hello.into()), vector.get(0));

41
src/storage/src/chunk.rs Normal file
View File

@@ -0,0 +1,41 @@
use async_trait::async_trait;
use store_api::storage::{Chunk, ChunkReader, SchemaRef};
use crate::error::{Error, Result};
use crate::memtable::BatchIteratorPtr;
/// [ChunkReader] implementation that reads chunks from a single memtable iterator.
pub struct ChunkReaderImpl {
/// Schema of the chunks returned by this reader.
schema: SchemaRef,
// Now we only read data from one memtable, so we just hold the memtable iterator here.
iter: BatchIteratorPtr,
}
#[async_trait]
impl ChunkReader for ChunkReaderImpl {
type Error = Error;
/// Returns the schema of the chunks this reader produces.
fn schema(&self) -> &SchemaRef {
&self.schema
}
/// Fetches the next chunk from the underlying memtable iterator, or `None` when exhausted.
async fn next_chunk(&mut self) -> Result<Option<Chunk>> {
let mut batch = match self.iter.next()? {
Some(b) => b,
None => return Ok(None),
};
// TODO(yingwen): Check schema; for now we assume the schema is the same as key columns
// combined with value columns.
// Concatenate key columns followed by value columns into a single column list.
let mut columns = Vec::with_capacity(batch.keys.len() + batch.values.len());
columns.append(&mut batch.keys);
columns.append(&mut batch.values);
Ok(Some(Chunk::new(columns)))
}
}
/// Constructors.
impl ChunkReaderImpl {
/// Creates a reader over `iter` that reports `schema` to callers.
pub fn new(schema: SchemaRef, iter: BatchIteratorPtr) -> ChunkReaderImpl {
ChunkReaderImpl { schema, iter }
}
}

View File

@@ -1,11 +1,11 @@
//! Storage engine implementation.
mod chunk;
mod engine;
mod error;
pub mod memtable;
pub mod metadata;
mod region;
mod region_writer;
mod snapshot;
pub mod sync;
mod version;

View File

@@ -9,7 +9,7 @@ use std::sync::Arc;
use datatypes::vectors::{UInt64Vector, UInt8Vector, VectorRef};
use snafu::Snafu;
use store_api::storage::{SequenceNumber, ValueType};
use store_api::storage::{consts, SequenceNumber, ValueType};
use crate::error::Result;
use crate::memtable::btree::BTreeMemtable;
@@ -41,11 +41,17 @@ pub type MemtableRef = Arc<dyn Memtable>;
pub struct IterContext {
/// The suggested batch size of the iterator.
pub batch_size: usize,
/// Max visible sequence (inclusive).
pub visible_sequence: SequenceNumber,
}
impl Default for IterContext {
fn default() -> Self {
Self { batch_size: 256 }
Self {
batch_size: consts::READ_BATCH_SIZE,
// All data in memory is visible by default.
visible_sequence: SequenceNumber::MAX,
}
}
}

View File

@@ -100,7 +100,7 @@ impl BTreeIterator {
} else {
map.range(..)
};
let iter = MapIterWrapper::new(iter);
let iter = MapIterWrapper::new(iter, self.ctx.visible_sequence);
let mut keys = Vec::with_capacity(self.ctx.batch_size);
let mut sequences = UInt64VectorBuilder::with_capacity(self.ctx.batch_size);
@@ -116,13 +116,26 @@ impl BTreeIterator {
if keys.is_empty() {
return None;
}
self.last_key = keys.last().map(|k| (*k).clone());
self.last_key = keys.last().map(|k| {
let mut last_key = (*k).clone();
last_key.reset_for_seek();
last_key
});
let key_data_types = self
.schema
.row_key_columns()
.map(|column_meta| column_meta.desc.data_type.clone());
let value_data_types = self
.schema
.value_columns()
.map(|column_meta| column_meta.desc.data_type.clone());
Some(Batch {
keys: rows_to_vectors(keys.as_slice()),
keys: rows_to_vectors(key_data_types, keys.as_slice()),
sequences: sequences.finish(),
value_types: value_types.finish(),
values: rows_to_vectors(values.as_slice()),
values: rows_to_vectors(value_data_types, values.as_slice()),
})
}
}
@@ -131,24 +144,37 @@ impl BTreeIterator {
struct MapIterWrapper<'a, InnerKey, RowValue> {
iter: btree_map::Range<'a, InnerKey, RowValue>,
prev_key: Option<InnerKey>,
visible_sequence: SequenceNumber,
}
impl<'a> MapIterWrapper<'a, InnerKey, RowValue> {
fn new(
iter: btree_map::Range<'a, InnerKey, RowValue>,
visible_sequence: SequenceNumber,
) -> MapIterWrapper<'a, InnerKey, RowValue> {
MapIterWrapper {
iter,
prev_key: None,
visible_sequence,
}
}
fn next_visible_entry(&mut self) -> Option<(&'a InnerKey, &'a RowValue)> {
for (k, v) in self.iter.by_ref() {
if k.is_visible(self.visible_sequence) {
return Some((k, v));
}
}
None
}
}
impl<'a> Iterator for MapIterWrapper<'a, InnerKey, RowValue> {
type Item = (&'a InnerKey, &'a RowValue);
fn next(&mut self) -> Option<(&'a InnerKey, &'a RowValue)> {
let (mut current_key, mut current_value) = self.iter.next()?;
let (mut current_key, mut current_value) = self.next_visible_entry()?;
if self.prev_key.is_none() {
self.prev_key = Some(current_key.clone());
return Some((current_key, current_value));
@@ -156,7 +182,7 @@ impl<'a> Iterator for MapIterWrapper<'a, InnerKey, RowValue> {
let prev_key = self.prev_key.take().unwrap();
while prev_key.is_row_key_equal(current_key) {
if let Some((next_key, next_value)) = self.iter.next() {
if let Some((next_key, next_value)) = self.next_visible_entry() {
(current_key, current_value) = (next_key, next_value);
} else {
return None;
@@ -256,9 +282,26 @@ impl PartialOrd for InnerKey {
}
impl InnerKey {
#[inline]
fn is_row_key_equal(&self, other: &InnerKey) -> bool {
self.row_key == other.row_key
}
#[inline]
fn is_visible(&self, sequence: SequenceNumber) -> bool {
self.sequence <= sequence
}
/// Reset the `InnerKey` so that we can use it to seek next key that
/// has different row key.
fn reset_for_seek(&mut self) {
// sequence, index_in_batch, value_type are ordered in desc order, so
// we can represent the last inner key with same row key by setting them
// to zero (Minimum value).
self.sequence = 0;
self.index_in_batch = 0;
self.value_type = ValueType::min_type();
}
}
#[derive(Clone, Debug)]
@@ -300,7 +343,10 @@ impl<'a> RowsProvider for &'a [&RowValue] {
}
}
fn rows_to_vectors<T: RowsProvider>(provider: T) -> Vec<VectorRef> {
fn rows_to_vectors<I: Iterator<Item = ConcreteDataType>, T: RowsProvider>(
data_types: I,
provider: T,
) -> Vec<VectorRef> {
if provider.is_empty() {
return Vec::new();
}
@@ -308,8 +354,8 @@ fn rows_to_vectors<T: RowsProvider>(provider: T) -> Vec<VectorRef> {
let column_num = provider.column_num();
let row_num = provider.row_num();
let mut builders = Vec::with_capacity(column_num);
for v in provider.row_by_index(0) {
builders.push(VectorBuilder::with_capacity(v.data_type(), row_num));
for data_type in data_types {
builders.push(VectorBuilder::with_capacity(data_type, row_num));
}
let mut vectors = Vec::with_capacity(column_num);

View File

@@ -199,9 +199,12 @@ fn write_iter_memtable_case(ctx: &TestContext) {
&[None, Some(5), None], // values
);
let batch_sizes = [1, 4, 8, 256];
let batch_sizes = [1, 4, 8, consts::READ_BATCH_SIZE];
for batch_size in batch_sizes {
let iter_ctx = IterContext { batch_size };
let iter_ctx = IterContext {
batch_size,
..Default::default()
};
let mut iter = ctx.memtable.iter(iter_ctx).unwrap();
assert_eq!(ctx.schema, *iter.schema());
assert_eq!(RowOrdering::Key, iter.ordering());
@@ -295,10 +298,168 @@ fn test_iter_batch_size() {
// Batch size [less than, equal to, greater than] total
let batch_sizes = [1, 6, 8];
for batch_size in batch_sizes {
let iter_ctx = IterContext { batch_size };
let iter_ctx = IterContext {
batch_size,
..Default::default()
};
let mut iter = ctx.memtable.iter(iter_ctx).unwrap();
check_iter_batch_size(&mut *iter, total, batch_size);
}
});
}
#[test]
fn test_duplicate_key_across_batch() {
let tester = MemtableTester::default();
tester.run_testcase(|ctx| {
write_kvs(
&*ctx.memtable,
10, // sequence
ValueType::Put,
&[(1000, 1), (1000, 2), (2000, 1), (2001, 2)], // keys
&[Some(1), None, None, None], // values
);
write_kvs(
&*ctx.memtable,
11, // sequence
ValueType::Put,
&[(1000, 1), (2001, 2)], // keys
&[Some(1231), Some(1232)], // values
);
let batch_sizes = [1, 2, 3, 4, 5];
for batch_size in batch_sizes {
let iter_ctx = IterContext {
batch_size,
..Default::default()
};
let mut iter = ctx.memtable.iter(iter_ctx).unwrap();
check_iter_content(
&mut *iter,
&[(1000, 1), (1000, 2), (2000, 1), (2001, 2)], // keys
&[11, 10, 10, 11], // sequences
&[
ValueType::Put,
ValueType::Put,
ValueType::Put,
ValueType::Put,
], // value types
&[Some(1231), None, None, Some(1232)], // values
);
}
});
}
#[test]
fn test_duplicate_key_in_batch() {
let tester = MemtableTester::default();
tester.run_testcase(|ctx| {
write_kvs(
&*ctx.memtable,
10, // sequence
ValueType::Put,
&[(1000, 1), (1000, 2), (1000, 1), (2001, 2)], // keys
&[None, None, Some(1234), None], // values
);
let batch_sizes = [1, 2, 3, 4, 5];
for batch_size in batch_sizes {
let iter_ctx = IterContext {
batch_size,
..Default::default()
};
let mut iter = ctx.memtable.iter(iter_ctx).unwrap();
check_iter_content(
&mut *iter,
&[(1000, 1), (1000, 2), (2001, 2)], // keys
&[10, 10, 10], // sequences
&[ValueType::Put, ValueType::Put, ValueType::Put], // value types
&[Some(1234), None, None, None], // values
);
}
});
}
#[test]
fn test_sequence_visibility() {
let tester = MemtableTester::default();
tester.run_testcase(|ctx| {
write_kvs(
&*ctx.memtable,
10, // sequence
ValueType::Put,
&[(1000, 1), (1000, 2)], // keys
&[Some(1), Some(2)], // values
);
write_kvs(
&*ctx.memtable,
11, // sequence
ValueType::Put,
&[(1000, 1), (1000, 2)], // keys
&[Some(11), Some(12)], // values
);
write_kvs(
&*ctx.memtable,
12, // sequence
ValueType::Put,
&[(1000, 1), (1000, 2)], // keys
&[Some(21), Some(22)], // values
);
{
let iter_ctx = IterContext {
batch_size: 1,
visible_sequence: 9,
};
let mut iter = ctx.memtable.iter(iter_ctx).unwrap();
check_iter_content(
&mut *iter,
&[], // keys
&[], // sequences
&[], // value types
&[], // values
);
}
{
let iter_ctx = IterContext {
batch_size: 1,
visible_sequence: 10,
};
let mut iter = ctx.memtable.iter(iter_ctx).unwrap();
check_iter_content(
&mut *iter,
&[(1000, 1), (1000, 2)], // keys
&[10, 10], // sequences
&[ValueType::Put, ValueType::Put], // value types
&[Some(1), Some(2)], // values
);
}
{
let iter_ctx = IterContext {
batch_size: 1,
visible_sequence: 11,
};
let mut iter = ctx.memtable.iter(iter_ctx).unwrap();
check_iter_content(
&mut *iter,
&[(1000, 1), (1000, 2)], // keys
&[11, 11], // sequences
&[ValueType::Put, ValueType::Put], // value types
&[Some(11), Some(12)], // values
);
}
});
}
// TODO(yingwen): Test key overwrite in same batch.

View File

@@ -1,3 +1,7 @@
#[cfg(test)]
mod tests;
mod writer;
use std::sync::Arc;
use async_trait::async_trait;
@@ -8,7 +12,7 @@ use tokio::sync::Mutex;
use crate::error::{self, Error, Result};
use crate::memtable::{DefaultMemtableBuilder, MemtableBuilder, MemtableSchema, MemtableSet};
use crate::metadata::{RegionMetaImpl, RegionMetadata};
use crate::region_writer::RegionWriter;
use crate::region::writer::RegionWriter;
use crate::snapshot::SnapshotImpl;
use crate::version::{VersionControl, VersionControlRef};
use crate::write_batch::WriteBatch;
@@ -39,7 +43,7 @@ impl Region for RegionImpl {
}
fn snapshot(&self, _ctx: &ReadContext) -> Result<SnapshotImpl> {
unimplemented!()
Ok(self.inner.create_snapshot())
}
}
@@ -59,6 +63,12 @@ impl RegionImpl {
RegionImpl { inner }
}
#[cfg(test)]
#[inline]
fn committed_sequence(&self) -> store_api::storage::SequenceNumber {
self.inner.version.committed_sequence()
}
}
struct RegionInner {
@@ -87,36 +97,11 @@ impl RegionInner {
let mut writer = self.writer.lock().await;
writer.write(ctx, &self.version, request).await
}
}
#[cfg(test)]
mod tests {
use datatypes::type_id::LogicalTypeId;
use store_api::storage::consts;
fn create_snapshot(&self) -> SnapshotImpl {
let version = self.version.current();
let sequence = self.version.committed_sequence();
use super::*;
use crate::test_util::descriptor_util::RegionDescBuilder;
use crate::test_util::schema_util;
#[test]
fn test_new_region() {
let region_name = "region-0";
let desc = RegionDescBuilder::new(region_name)
.push_key_column(("k1", LogicalTypeId::Int32, false))
.push_value_column(("v1", LogicalTypeId::Float32, true))
.build();
let metadata = desc.try_into().unwrap();
let region = RegionImpl::new(region_name.to_string(), metadata);
let expect_schema = schema_util::new_schema_ref(&[
("k1", LogicalTypeId::Int32, false),
("timestamp", LogicalTypeId::UInt64, false),
(consts::VERSION_COLUMN_NAME, LogicalTypeId::UInt64, false),
("v1", LogicalTypeId::Float32, true),
]);
assert_eq!(region_name, region.name());
assert_eq!(expect_schema, *region.in_memory_metadata().schema());
SnapshotImpl::new(version, sequence)
}
}

View File

@@ -0,0 +1,31 @@
//! Region tests.
mod read_write;
use datatypes::type_id::LogicalTypeId;
use store_api::storage::consts;
use super::*;
use crate::test_util::{self, descriptor_util::RegionDescBuilder, schema_util};
#[test]
fn test_new_region() {
let region_name = "region-0";
let desc = RegionDescBuilder::new(region_name)
.push_key_column(("k1", LogicalTypeId::Int32, false))
.push_value_column(("v1", LogicalTypeId::Float32, true))
.build();
let metadata = desc.try_into().unwrap();
let region = RegionImpl::new(region_name.to_string(), metadata);
let expect_schema = schema_util::new_schema_ref(&[
("k1", LogicalTypeId::Int32, false),
(test_util::TIMESTAMP_NAME, LogicalTypeId::Int64, false),
(consts::VERSION_COLUMN_NAME, LogicalTypeId::UInt64, false),
("v1", LogicalTypeId::Float32, true),
]);
assert_eq!(region_name, region.name());
assert_eq!(expect_schema, *region.in_memory_metadata().schema());
}

View File

@@ -0,0 +1,164 @@
//! Region read/write tests.
use std::sync::Arc;
use datatypes::prelude::*;
use datatypes::type_id::LogicalTypeId;
use datatypes::vectors::Int64Vector;
use store_api::storage::{
consts, Chunk, ChunkReader, PutOperation, ReadContext, Region, RegionMeta, ScanRequest,
SequenceNumber, Snapshot, WriteContext, WriteRequest, WriteResponse,
};
use crate::region::RegionImpl;
use crate::test_util::{self, descriptor_util::RegionDescBuilder, write_batch_util};
use crate::write_batch::{PutData, WriteBatch};
/// Create a new region for read/write tests.
///
/// The region has one value column `v1` (nullable Int64); `enable_version_column`
/// controls whether the builder adds the internal version column to the row key.
fn new_region_for_rw(enable_version_column: bool) -> RegionImpl {
let region_name = "region-rw-0";
let desc = RegionDescBuilder::new(region_name)
.enable_version_column(enable_version_column)
.push_value_column(("v1", LogicalTypeId::Int64, true))
.build();
let metadata = desc.try_into().unwrap();
RegionImpl::new(region_name.to_string(), metadata)
}
/// Builds a [WriteBatch] whose schema matches the test region: timestamp key,
/// optional version column, and nullable Int64 value column `v1`.
fn new_write_batch_for_test(enable_version_column: bool) -> WriteBatch {
if enable_version_column {
write_batch_util::new_write_batch(&[
(test_util::TIMESTAMP_NAME, LogicalTypeId::Int64, false),
(consts::VERSION_COLUMN_NAME, LogicalTypeId::UInt64, false),
("v1", LogicalTypeId::Int64, true),
])
} else {
write_batch_util::new_write_batch(&[
(test_util::TIMESTAMP_NAME, LogicalTypeId::Int64, false),
("v1", LogicalTypeId::Int64, true),
])
}
}
/// Converts `(timestamp, v1)` pairs into a [PutData] with one key column
/// (timestamps) and one value column (`v1`).
fn new_put_data(data: &[(i64, Option<i64>)]) -> PutData {
let mut put_data = PutData::with_num_columns(2);
let timestamps = Int64Vector::from_values(data.iter().map(|kv| kv.0));
let values = Int64Vector::from_iter(data.iter().map(|kv| kv.1));
put_data
.add_key_column(test_util::TIMESTAMP_NAME, Arc::new(timestamps))
.unwrap();
put_data.add_value_column("v1", Arc::new(values)).unwrap();
put_data
}
/// Appends the rows of `chunk` to `dst` as `(timestamp, v1)` pairs.
///
/// Expects exactly two columns: column 0 is the Int64 timestamp key and
/// column 1 is the nullable Int64 value; panics (via unwrap) otherwise.
fn append_chunk_to(chunk: &Chunk, dst: &mut Vec<(i64, Option<i64>)>) {
assert_eq!(2, chunk.columns.len());
let timestamps = chunk.columns[0]
.as_any()
.downcast_ref::<Int64Vector>()
.unwrap();
let values = chunk.columns[1]
.as_any()
.downcast_ref::<Int64Vector>()
.unwrap();
// Timestamps are non-null keys, so unwrap is safe here; values may be None.
for (ts, value) in timestamps.iter_data().zip(values.iter_data()) {
dst.push((ts.unwrap(), value));
}
}
/// Test region without considering version column.
struct Tester {
region: RegionImpl,
write_ctx: WriteContext,
read_ctx: ReadContext,
}
impl Default for Tester {
fn default() -> Tester {
Tester::new()
}
}
impl Tester {
/// Creates a tester over a fresh region with the version column disabled
/// and default write/read contexts.
fn new() -> Tester {
let region = new_region_for_rw(false);
Tester {
region,
write_ctx: WriteContext::default(),
read_ctx: ReadContext::default(),
}
}
/// Put without version specified.
///
/// Format of data: (timestamp, v1), timestamp is key, v1 is value.
async fn put(&self, data: &[(i64, Option<i64>)]) -> WriteResponse {
// Build a batch without version.
let mut batch = new_write_batch_for_test(false);
let put_data = new_put_data(data);
batch.put(put_data).unwrap();
self.region.write(&self.write_ctx, batch).await.unwrap()
}
/// Takes a snapshot, scans all rows, and collects them as `(timestamp, v1)` pairs.
async fn full_scan(&self) -> Vec<(i64, Option<i64>)> {
let snapshot = self.region.snapshot(&self.read_ctx).unwrap();
let resp = snapshot
.scan(&self.read_ctx, ScanRequest::default())
.await
.unwrap();
let mut reader = resp.reader;
// Reader schema should match the region's in-memory schema.
let metadata = self.region.in_memory_metadata();
assert_eq!(metadata.schema(), reader.schema());
let mut dst = Vec::new();
while let Some(chunk) = reader.next_chunk().await.unwrap() {
append_chunk_to(&chunk, &mut dst);
}
dst
}
/// Returns the region's current committed sequence number.
fn committed_sequence(&self) -> SequenceNumber {
self.region.committed_sequence()
}
}
#[tokio::test]
async fn test_simple_put_scan() {
let tester = Tester::default();
let data = vec![
(1000, Some(100)),
(1001, Some(101)),
(1002, None),
(1003, Some(103)),
(1004, Some(104)),
];
tester.put(&data).await;
let output = tester.full_scan().await;
assert_eq!(data, output);
}
#[tokio::test]
async fn test_sequence_increase() {
let tester = Tester::default();
let mut committed_sequence = tester.committed_sequence();
for i in 0..100 {
tester.put(&[(i, Some(1234))]).await;
committed_sequence += 1;
assert_eq!(committed_sequence, tester.committed_sequence());
}
}

View File

@@ -1,4 +1,4 @@
use store_api::storage::{SequenceNumber, WriteContext, WriteResponse};
use store_api::storage::{WriteContext, WriteResponse};
use crate::error::Result;
use crate::memtable::{Inserter, MemtableBuilderRef};
@@ -7,15 +7,11 @@ use crate::write_batch::WriteBatch;
pub struct RegionWriter {
_memtable_builder: MemtableBuilderRef,
last_sequence: SequenceNumber,
}
impl RegionWriter {
pub fn new(_memtable_builder: MemtableBuilderRef) -> RegionWriter {
RegionWriter {
_memtable_builder,
last_sequence: 0,
}
RegionWriter { _memtable_builder }
}
// TODO(yingwen): Support group commit so we can avoid taking mutable reference.
@@ -31,13 +27,20 @@ impl RegionWriter {
// TODO(yingwen): Write wal and get sequence.
let version = version_control.current();
let memtables = &version.memtables;
let mem = version.mutable_memtable();
let mem = memtables.mutable_memtable();
self.last_sequence += 1;
let mut inserter = Inserter::new(self.last_sequence);
let committed_sequence = version_control.committed_sequence();
// Sequence for current write batch.
let next_sequence = committed_sequence + 1;
// Insert batch into memtable.
let mut inserter = Inserter::new(next_sequence);
inserter.insert_memtable(&request, &**mem)?;
// Update committed_sequence to make current batch visible. The `&mut self` of RegionWriter
// guarantees the writer is exclusive.
version_control.set_committed_sequence(next_sequence);
Ok(WriteResponse {})
}
}

View File

@@ -1,26 +1,68 @@
use std::cmp;
use async_trait::async_trait;
use store_api::storage::{
GetRequest, GetResponse, ReadContext, ScanRequest, ScanResponse, SchemaRef, Snapshot,
GetRequest, GetResponse, ReadContext, ScanRequest, ScanResponse, SchemaRef, SequenceNumber,
Snapshot,
};
use crate::chunk::ChunkReaderImpl;
use crate::error::{Error, Result};
use crate::memtable::IterContext;
use crate::version::VersionRef;
/// [Snapshot] implementation.
pub struct SnapshotImpl {}
pub struct SnapshotImpl {
/// In-memory version (metadata + memtables) this snapshot reads from.
version: VersionRef,
/// Max sequence number (inclusive) visible to user.
visible_sequence: SequenceNumber,
}
#[async_trait]
impl Snapshot for SnapshotImpl {
type Error = Error;
type Reader = ChunkReaderImpl;
fn schema(&self) -> &SchemaRef {
unimplemented!()
self.version.schema()
}
async fn scan(&self, _ctx: &ReadContext, _request: ScanRequest) -> Result<ScanResponse> {
unimplemented!()
async fn scan(
&self,
ctx: &ReadContext,
request: ScanRequest,
) -> Result<ScanResponse<ChunkReaderImpl>> {
let visible_sequence = self.sequence_to_read(request.sequence);
let mem = self.version.mutable_memtable();
let iter_ctx = IterContext {
batch_size: ctx.batch_size,
visible_sequence,
};
let iter = mem.iter(iter_ctx)?;
let reader = ChunkReaderImpl::new(self.version.schema().clone(), iter);
Ok(ScanResponse { reader })
}
async fn get(&self, _ctx: &ReadContext, _request: GetRequest) -> Result<GetResponse> {
unimplemented!()
}
}
impl SnapshotImpl {
/// Creates a snapshot over `version` where only data with sequence number
/// less than or equal to `visible_sequence` is visible.
pub fn new(version: VersionRef, visible_sequence: SequenceNumber) -> SnapshotImpl {
SnapshotImpl {
version,
visible_sequence,
}
}
/// Resolves the sequence to read for a scan request: the caller-provided
/// sequence capped by this snapshot's visible sequence, or the snapshot's
/// visible sequence when the request does not specify one.
#[inline]
fn sequence_to_read(&self, request_sequence: Option<SequenceNumber>) -> SequenceNumber {
request_sequence
.map(|s| cmp::min(s, self.visible_sequence))
.unwrap_or(self.visible_sequence)
}
}

View File

@@ -50,7 +50,7 @@ impl<T: Clone> CowCell<T> {
/// A RAII implementation of a write transaction of the [CowCell].
///
/// When this txn is dropped (falls out of scope or commited), the lock will be
/// When this txn is dropped (falls out of scope or committed), the lock will be
/// unlocked, but updates to the content won't be visible unless the txn is committed.
#[must_use = "if unused the CowCell will immediately unlock"]
pub struct TxnGuard<'a, T: Clone> {

View File

@@ -1,3 +1,5 @@
pub mod descriptor_util;
pub mod schema_util;
pub mod write_batch_util;
pub const TIMESTAMP_NAME: &str = "timestamp";

View File

@@ -4,7 +4,7 @@ use store_api::storage::{
RegionDescriptor, RowKeyDescriptorBuilder,
};
use crate::test_util::schema_util::ColumnDef;
use crate::test_util::{self, schema_util::ColumnDef};
/// A RegionDescriptor builder for test.
pub struct RegionDescBuilder {
@@ -17,9 +17,13 @@ pub struct RegionDescBuilder {
impl RegionDescBuilder {
pub fn new<T: Into<String>>(name: T) -> Self {
let key_builder = RowKeyDescriptorBuilder::new(
ColumnDescriptorBuilder::new(2, "timestamp", ConcreteDataType::uint64_datatype())
.is_nullable(false)
.build(),
ColumnDescriptorBuilder::new(
2,
test_util::TIMESTAMP_NAME,
ConcreteDataType::int64_datatype(),
)
.is_nullable(false)
.build(),
);
Self {

View File

@@ -1,12 +1,26 @@
//! Version control of storage.
//!
//! To read latest data from `VersionControl`, we need to
//! 1. Acquire `Version` from `VersionControl`.
//! 2. Then acquire last sequence.
//!
//! Reason: data may be flushed/compacted and some data with old sequence may be removed
//! and became invisible between step 1 and 2, so need to acquire version at first.
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use crate::memtable::MemtableSet;
use store_api::storage::{SchemaRef, SequenceNumber};
use crate::memtable::{MemtableRef, MemtableSet};
use crate::metadata::{RegionMetadata, RegionMetadataRef};
use crate::sync::CowCell;
/// Controls version of in memory state for a region.
pub struct VersionControl {
version: CowCell<Version>,
/// Latest sequence that is committed and visible to user.
committed_sequence: AtomicU64,
}
impl VersionControl {
@@ -14,10 +28,12 @@ impl VersionControl {
pub fn new(metadata: RegionMetadata, memtables: MemtableSet) -> VersionControl {
VersionControl {
version: CowCell::new(Version::new(metadata, memtables)),
committed_sequence: AtomicU64::new(0),
}
}
/// Returns current version.
#[inline]
pub fn current(&self) -> VersionRef {
self.version.get()
}
@@ -27,6 +43,21 @@ impl VersionControl {
let version = self.current();
version.metadata.clone()
}
#[inline]
pub fn committed_sequence(&self) -> SequenceNumber {
self.committed_sequence.load(Ordering::Acquire)
}
/// Set committed sequence to `value`.
///
/// External synchronization is required to ensure only one thread can update the
/// last sequence.
#[inline]
pub fn set_committed_sequence(&self, value: SequenceNumber) {
// Release ordering should be enough to guarantee sequence is updated at last.
self.committed_sequence.store(value, Ordering::Release);
}
}
pub type VersionControlRef = Arc<VersionControl>;
@@ -45,7 +76,7 @@ pub struct Version {
/// in Arc to allow sharing metadata and reuse metadata when creating a new
/// `Version`.
metadata: RegionMetadataRef,
pub memtables: MemtableSet,
memtables: MemtableSet,
// TODO(yingwen): Also need to store last sequence to this version when switching
// version, so we can know the newest data can read from this version.
}
@@ -57,4 +88,43 @@ impl Version {
memtables,
}
}
#[inline]
pub fn schema(&self) -> &SchemaRef {
&self.metadata.schema
}
#[inline]
pub fn mutable_memtable(&self) -> &MemtableRef {
self.memtables.mutable_memtable()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::memtable::{DefaultMemtableBuilder, MemtableBuilder, MemtableSchema};
use crate::test_util::descriptor_util::RegionDescBuilder;
fn new_version_control() -> VersionControl {
let desc = RegionDescBuilder::new("version-test")
.enable_version_column(false)
.build();
let metadata: RegionMetadata = desc.try_into().unwrap();
let schema = MemtableSchema::new(metadata.columns_row_key.clone());
let memtable = DefaultMemtableBuilder {}.build(schema);
let memtable_set = MemtableSet::new(memtable);
VersionControl::new(metadata, memtable_set)
}
#[test]
fn test_version_control() {
let version_control = new_version_control();
assert_eq!(0, version_control.committed_sequence());
version_control.set_committed_sequence(12345);
assert_eq!(12345, version_control.committed_sequence());
}
}

View File

@@ -1,5 +1,6 @@
//! Storage APIs.
mod chunk;
pub mod consts;
mod descriptors;
mod engine;
@@ -13,6 +14,7 @@ mod types;
pub use datatypes::data_type::ConcreteDataType;
pub use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
pub use self::chunk::{Chunk, ChunkReader};
pub use self::descriptors::{
gen_region_name, ColumnDescriptor, ColumnDescriptorBuilder, ColumnFamilyDescriptor,
ColumnFamilyDescriptorBuilder, ColumnFamilyId, ColumnId, RegionDescriptor, RegionId,

View File

@@ -0,0 +1,26 @@
use async_trait::async_trait;
use common_error::ext::ErrorExt;
use datatypes::vectors::VectorRef;
use crate::storage::SchemaRef;
/// A bunch of rows in columnar format.
pub struct Chunk {
/// Column vectors of the chunk; all columns have the same number of rows.
pub columns: Vec<VectorRef>,
// TODO(yingwen): Sequences.
}
impl Chunk {
/// Creates a chunk from the given column vectors.
pub fn new(columns: Vec<VectorRef>) -> Chunk {
Chunk { columns }
}
}
/// Async reader that yields [Chunk]s until the source is exhausted.
#[async_trait]
pub trait ChunkReader: Send {
type Error: ErrorExt + Send + Sync;
/// Schema of the chunks returned by this reader.
fn schema(&self) -> &SchemaRef;
/// Returns the next chunk, or `Ok(None)` when there is no more data.
async fn next_chunk(&mut self) -> Result<Option<Chunk>, Self::Error>;
}

View File

@@ -2,7 +2,7 @@
use crate::storage::descriptors::{ColumnFamilyId, ColumnId};
// Ids reserved for internal column families:
// ---------- Ids reserved for internal column families ------------------------
/// Column family Id for row key columns.
///
@@ -12,16 +12,27 @@ pub const KEY_CF_ID: ColumnFamilyId = 0;
/// Id for default column family.
pub const DEFAULT_CF_ID: ColumnFamilyId = 1;
// Ids reserved for internal columns:
// -----------------------------------------------------------------------------
// ---------- Ids reserved for internal columns --------------------------------
// TODO(yingwen): Reserve one bit for internal columns.
/// Column id for version column.
pub const VERSION_COLUMN_ID: ColumnId = 1;
// Names reserved for internal columns:
// -----------------------------------------------------------------------------
// ---------- Names reserved for internal columns and engine -------------------
/// Name of version column.
pub const VERSION_COLUMN_NAME: &str = "__version";
// Names for default column family.
pub const DEFAULT_CF_NAME: &str = "default";
// -----------------------------------------------------------------------------
// ---------- Default options --------------------------------------------------
pub const READ_BATCH_SIZE: usize = 256;
// -----------------------------------------------------------------------------

View File

@@ -45,6 +45,7 @@ pub struct RowKeyDescriptor {
/// Enable version column in row key if this field is true.
///
/// The default value is true.
// FIXME(yingwen): Change default value to true (Disable version column by default).
pub enable_version_column: bool,
}

View File

@@ -2,6 +2,8 @@ use common_error::ext::ErrorExt;
use datatypes::schema::SchemaRef;
use datatypes::vectors::VectorRef;
use crate::storage::SequenceNumber;
/// Write request holds a collection of updates to apply to a region.
pub trait WriteRequest: Send {
type Error: ErrorExt + Send + Sync;
@@ -27,8 +29,14 @@ pub trait PutOperation: Send {
fn add_value_column(&mut self, name: &str, vector: VectorRef) -> Result<(), Self::Error>;
}
#[derive(Debug)]
pub struct ScanRequest {}
#[derive(Debug, Default)]
pub struct ScanRequest {
/// Max sequence number to read, None for latest sequence.
///
/// Default is None. Only returns data whose sequence number is less than or
/// equal to the `sequence`.
pub sequence: Option<SequenceNumber>,
}
#[derive(Debug)]
pub struct GetRequest {}

View File

@@ -2,7 +2,10 @@
pub struct WriteResponse {}
#[derive(Debug)]
pub struct ScanResponse {}
pub struct ScanResponse<R> {
/// Reader to read result chunks.
pub reader: R,
}
#[derive(Debug)]
pub struct GetResponse {}

View File

@@ -2,6 +2,8 @@ use async_trait::async_trait;
use common_error::ext::ErrorExt;
use datatypes::schema::SchemaRef;
use crate::storage::chunk::ChunkReader;
use crate::storage::consts;
use crate::storage::requests::{GetRequest, ScanRequest};
use crate::storage::responses::{GetResponse, ScanResponse};
@@ -9,6 +11,7 @@ use crate::storage::responses::{GetResponse, ScanResponse};
#[async_trait]
pub trait Snapshot: Send + Sync {
type Error: ErrorExt + Send + Sync;
type Reader: ChunkReader;
fn schema(&self) -> &SchemaRef;
@@ -16,7 +19,7 @@ pub trait Snapshot: Send + Sync {
&self,
ctx: &ReadContext,
request: ScanRequest,
) -> Result<ScanResponse, Self::Error>;
) -> Result<ScanResponse<Self::Reader>, Self::Error>;
async fn get(&self, ctx: &ReadContext, request: GetRequest)
-> Result<GetResponse, Self::Error>;
@@ -24,4 +27,15 @@ pub trait Snapshot: Send + Sync {
/// Context for read.
#[derive(Debug, Clone)]
pub struct ReadContext {}
pub struct ReadContext {
/// Suggested batch size of chunk.
pub batch_size: usize,
}
impl Default for ReadContext {
fn default() -> ReadContext {
ReadContext {
batch_size: consts::READ_BATCH_SIZE,
}
}
}

View File

@@ -15,4 +15,20 @@ impl ValueType {
pub fn as_u8(&self) -> u8 {
*self as u8
}
/// Minimum value type after casting to u8.
pub const fn min_type() -> ValueType {
ValueType::Put
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_value_type() {
assert_eq!(0, ValueType::Put.as_u8());
assert_eq!(0, ValueType::min_type().as_u8());
}
}