feat: Support projection (#192)

* feat: Add projected schema

* feat: Use projected schema to read sst

* feat: Use vector of column to implement Batch

* feat: Use projected schema to convert batch to chunk

* feat: Add no_projection() to build ProjectedSchema

* feat: Memtable supports projection

The btree memtable use `is_needed()` to filter unneeded value columns,
then use `ProjectedSchema::batch_from_parts()` to construct
batch, so it don't need to known the layout of internal columns.

* test: Add tests for ProjectedSchema

* test: Add tests for ProjectedSchema

Also returns error if the `projected_columns` used to build the
`ProjectedSchema` is empty.

* test: Add test for memtable projection

* feat: Table pass projection to storage engine

* fix: Use timestamp column name as schema metadata

This fix the issue that the metadata refer to the wrong timestamp column
if datafusion reorder the fields of the arrow schema.

* fix: Fix projected schema not passed to memtable

* feat: Add tests for region projection

* chore: fix clippy

* test: Add test for unordered projection

* chore: Move projected_schema to ReadOptions

Also fix some typo
This commit is contained in:
evenyag
2022-08-25 15:27:47 +08:00
committed by GitHub
parent 465dcca65e
commit 53637c90fd
24 changed files with 1200 additions and 356 deletions

View File

@@ -31,16 +31,8 @@ pub enum Error {
backtrace: Backtrace,
},
#[snafu(display(
"Failed to parse index in schema meta, value: {}, source: {}",
value,
source
))]
ParseSchemaIndex {
value: String,
source: std::num::ParseIntError,
backtrace: Backtrace,
},
#[snafu(display("Timestamp column {} not found", name,))]
TimestampNotFound { name: String, backtrace: Backtrace },
#[snafu(display(
"Failed to parse version in schema meta, value: {}, source: {}",

View File

@@ -4,12 +4,19 @@ use std::sync::Arc;
pub use arrow::datatypes::Metadata;
use arrow::datatypes::{Field, Schema as ArrowSchema};
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
use snafu::{ensure, OptionExt, ResultExt};
use crate::data_type::{ConcreteDataType, DataType};
use crate::error::{self, Error, Result};
const TIMESTAMP_INDEX_KEY: &str = "greptime:timestamp_index";
/// Key used to store column name of the timestamp column in metadata.
///
/// Instead of storing the column index, we store the column name as the
/// query engine may modify the column order of the arrow schema, then
/// we would fail to recover the correct timestamp column when converting
/// the arrow schema back to our schema.
const TIMESTAMP_COLUMN_KEY: &str = "greptime:timestamp_column";
/// Key used to store version number of the schema in metadata.
const VERSION_KEY: &str = "greptime:version";
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
@@ -80,6 +87,11 @@ impl Schema {
self.column_schemas.len()
}
#[inline]
pub fn is_empty(&self) -> bool {
self.column_schemas.is_empty()
}
/// Returns index of the timestamp key column.
#[inline]
pub fn timestamp_index(&self) -> Option<usize> {
@@ -154,8 +166,9 @@ impl SchemaBuilder {
pub fn build(mut self) -> Result<Schema> {
if let Some(timestamp_index) = self.timestamp_index {
validate_timestamp_index(&self.column_schemas, timestamp_index)?;
let timestamp_name = self.column_schemas[timestamp_index].name.clone();
self.metadata
.insert(TIMESTAMP_INDEX_KEY.to_string(), timestamp_index.to_string());
.insert(TIMESTAMP_COLUMN_KEY.to_string(), timestamp_name);
}
self.metadata
.insert(VERSION_KEY.to_string(), self.version.to_string());
@@ -241,9 +254,14 @@ impl TryFrom<Arc<ArrowSchema>> for Schema {
column_schemas.push(column_schema);
}
let timestamp_index = try_parse_index(&arrow_schema.metadata, TIMESTAMP_INDEX_KEY)?;
if let Some(index) = timestamp_index {
validate_timestamp_index(&column_schemas, index)?;
let timestamp_name = arrow_schema.metadata.get(TIMESTAMP_COLUMN_KEY);
let mut timestamp_index = None;
if let Some(name) = timestamp_name {
let index = name_to_index
.get(name)
.context(error::TimestampNotFoundSnafu { name })?;
validate_timestamp_index(&column_schemas, *index)?;
timestamp_index = Some(*index);
}
let version = try_parse_version(&arrow_schema.metadata, VERSION_KEY)?;
@@ -267,18 +285,6 @@ impl TryFrom<ArrowSchema> for Schema {
}
}
fn try_parse_index(metadata: &Metadata, key: &str) -> Result<Option<usize>> {
if let Some(value) = metadata.get(key) {
let index = value
.parse()
.context(error::ParseSchemaIndexSnafu { value })?;
Ok(Some(index))
} else {
Ok(None)
}
}
fn try_parse_version(metadata: &Metadata, key: &str) -> Result<u32> {
if let Some(value) = metadata.get(key) {
let version = value
@@ -313,6 +319,7 @@ mod tests {
fn test_build_empty_schema() {
let schema = SchemaBuilder::default().build().unwrap();
assert_eq!(0, schema.num_columns());
assert!(schema.is_empty());
assert!(SchemaBuilder::default().timestamp_index(0).build().is_err());
}
@@ -326,6 +333,7 @@ mod tests {
let schema = Schema::new(column_schemas.clone());
assert_eq!(2, schema.num_columns());
assert!(!schema.is_empty());
assert!(schema.timestamp_index().is_none());
assert!(schema.timestamp_column().is_none());
assert_eq!(Schema::INITIAL_VERSION, schema.version());

View File

@@ -1,5 +1,4 @@
use storage::memtable::{IterContext, KeyValues, MemtableRef};
use store_api::storage::SequenceNumber;
use crate::memtable::util::new_memtable;
@@ -26,10 +25,9 @@ impl BenchContext {
let mut read_count = 0;
let iter_ctx = IterContext {
batch_size,
visible_sequence: SequenceNumber::MAX,
for_flush: false,
..Default::default()
};
let iter = self.memtable.iter(iter_ctx).unwrap();
let iter = self.memtable.iter(&iter_ctx).unwrap();
for batch in iter {
batch.unwrap();
read_count += batch_size;

View File

@@ -1,9 +1,13 @@
use async_trait::async_trait;
use store_api::storage::{Chunk, ChunkReader, SchemaRef};
use std::sync::Arc;
use crate::error::{Error, Result};
use crate::memtable::{BoxedBatchIterator, IterContext, MemtableSet};
use async_trait::async_trait;
use snafu::ResultExt;
use store_api::storage::{Chunk, ChunkReader, SchemaRef, SequenceNumber};
use crate::error::{self, Error, Result};
use crate::memtable::{IterContext, MemtableRef, MemtableSet};
use crate::read::{Batch, BatchReader, ConcatReader};
use crate::schema::{ProjectedSchema, ProjectedSchemaRef, RegionSchemaRef};
use crate::sst::{AccessLayerRef, FileHandle, LevelMetas, ReadOptions, Visitor};
type BoxedIterator = Box<dyn Iterator<Item = Result<Batch>> + Send>;
@@ -13,7 +17,7 @@ type BoxedIterator = Box<dyn Iterator<Item = Result<Batch>> + Send>;
// using `Stream`, maybe change to `Stream` if we find out it is more efficient and have
// necessary to do so.
pub struct ChunkReaderImpl {
schema: SchemaRef,
schema: ProjectedSchemaRef,
iter: Option<BoxedIterator>,
sst_reader: ConcatReader,
}
@@ -23,7 +27,7 @@ impl ChunkReader for ChunkReaderImpl {
type Error = Error;
fn schema(&self) -> &SchemaRef {
&self.schema
self.schema.projected_user_schema()
}
async fn next_chunk(&mut self) -> Result<Option<Chunk>> {
@@ -32,8 +36,7 @@ impl ChunkReader for ChunkReaderImpl {
None => return Ok(None),
};
// TODO(yingwen): Check schema.
let chunk = batch_to_chunk(batch);
let chunk = self.schema.batch_to_chunk(&batch);
Ok(Some(chunk))
}
@@ -41,7 +44,7 @@ impl ChunkReader for ChunkReaderImpl {
impl ChunkReaderImpl {
pub fn new(
schema: SchemaRef,
schema: ProjectedSchemaRef,
iter: BoxedIterator,
sst_reader: ConcatReader,
) -> ChunkReaderImpl {
@@ -64,54 +67,55 @@ impl ChunkReaderImpl {
}
}
// Assumes the schema is the same as key columns combine with value columns.
fn batch_to_chunk(mut batch: Batch) -> Chunk {
let mut columns = Vec::with_capacity(batch.keys.len() + batch.values.len());
columns.append(&mut batch.keys);
columns.append(&mut batch.values);
Chunk::new(columns)
}
/// Builder to create a new [ChunkReaderImpl] from scan request.
pub struct ChunkReaderBuilder {
schema: SchemaRef,
schema: RegionSchemaRef,
projection: Option<Vec<usize>>,
sst_layer: AccessLayerRef,
iter_ctx: IterContext,
iters: Vec<BoxedBatchIterator>,
memtables: Vec<MemtableRef>,
files_to_read: Vec<FileHandle>,
}
impl ChunkReaderBuilder {
pub fn new(schema: SchemaRef, sst_layer: AccessLayerRef) -> Self {
pub fn new(schema: RegionSchemaRef, sst_layer: AccessLayerRef) -> Self {
ChunkReaderBuilder {
schema,
iter_ctx: IterContext::default(),
iters: Vec::new(),
projection: None,
sst_layer,
iter_ctx: IterContext::default(),
memtables: Vec::new(),
files_to_read: Vec::new(),
}
}
/// Reserve space for iterating `num` memtables.
pub fn reserve_num_memtables(mut self, num: usize) -> Self {
self.iters.reserve(num);
self.memtables.reserve(num);
self
}
pub fn iter_ctx(mut self, iter_ctx: IterContext) -> Self {
self.iter_ctx = iter_ctx;
pub fn projection(mut self, projection: Option<Vec<usize>>) -> Self {
self.projection = projection;
self
}
pub fn pick_memtables(mut self, memtables: &MemtableSet) -> Result<Self> {
pub fn batch_size(mut self, batch_size: usize) -> Self {
self.iter_ctx.batch_size = batch_size;
self
}
pub fn visible_sequence(mut self, sequence: SequenceNumber) -> Self {
self.iter_ctx.visible_sequence = sequence;
self
}
pub fn pick_memtables(mut self, memtables: &MemtableSet) -> Self {
for (_range, mem) in memtables.iter() {
let iter = mem.iter(self.iter_ctx.clone())?;
self.iters.push(iter);
self.memtables.push(mem.clone());
}
Ok(self)
self
}
pub fn pick_ssts(mut self, ssts: &LevelMetas) -> Result<Self> {
@@ -120,12 +124,24 @@ impl ChunkReaderBuilder {
Ok(self)
}
pub async fn build(self) -> Result<ChunkReaderImpl> {
pub async fn build(mut self) -> Result<ChunkReaderImpl> {
let schema = Arc::new(
ProjectedSchema::new(self.schema, self.projection)
.context(error::InvalidProjectionSnafu)?,
);
self.iter_ctx.projected_schema = Some(schema.clone());
let mut iters = Vec::with_capacity(self.memtables.len());
for mem in self.memtables {
let iter = mem.iter(&self.iter_ctx)?;
iters.push(iter);
}
// Now we just simply chain all iterators together, ignore duplications/ordering.
let iter = Box::new(self.iters.into_iter().flatten());
let iter = Box::new(iters.into_iter().flatten());
let read_opts = ReadOptions {
batch_size: self.iter_ctx.batch_size,
projected_schema: schema.clone(),
};
let mut sst_readers = Vec::with_capacity(self.files_to_read.len());
for file in &self.files_to_read {
@@ -138,7 +154,7 @@ impl ChunkReaderBuilder {
}
let reader = ConcatReader::new(sst_readers);
Ok(ChunkReaderImpl::new(self.schema, iter, reader))
Ok(ChunkReaderImpl::new(schema, iter, reader))
}
}

View File

@@ -231,6 +231,12 @@ pub enum Error {
#[snafu(backtrace)]
source: MetadataError,
},
#[snafu(display("Invalid projection, source: {}", source))]
InvalidProjection {
#[snafu(backtrace)]
source: crate::schema::Error,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -245,7 +251,8 @@ impl ErrorExt for Error {
| InvalidInputSchema { .. }
| BatchMissingColumn { .. }
| BatchMissingTimestamp { .. }
| InvalidTimestamp { .. } => StatusCode::InvalidArguments,
| InvalidTimestamp { .. }
| InvalidProjection { .. } => StatusCode::InvalidArguments,
Utf8 { .. }
| EncodeJson { .. }

View File

@@ -164,16 +164,14 @@ impl<S: LogStore> FlushJob<S> {
}
let mut futures = Vec::with_capacity(self.memtables.len());
let iter_ctx = IterContext {
for_flush: true,
..Default::default()
};
for m in &self.memtables {
let file_name = Self::generate_sst_file_name();
// TODO(hl): Check if random file name already exists in meta.
let iter_ctx = IterContext {
for_flush: true,
..Default::default()
};
let iter = m.memtable.iter(iter_ctx)?;
let iter = m.memtable.iter(&iter_ctx)?;
futures.push(async move {
self.sst_layer
.write_sst(&file_name, iter, &WriteOptions::default())

View File

@@ -14,15 +14,17 @@ use crate::memtable::btree::BTreeMemtable;
pub use crate::memtable::inserter::Inserter;
pub use crate::memtable::version::{MemtableSet, MemtableVersion};
use crate::read::Batch;
use crate::schema::RegionSchemaRef;
use crate::schema::{ProjectedSchemaRef, RegionSchemaRef};
/// Unique id for memtables under same region.
pub type MemtableId = u32;
/// In memory storage.
pub trait Memtable: Send + Sync + std::fmt::Debug {
/// Returns id of this memtable.
fn id(&self) -> MemtableId;
/// Returns schema of the memtable.
fn schema(&self) -> RegionSchemaRef;
/// Write key/values to the memtable.
@@ -32,8 +34,7 @@ pub trait Memtable: Send + Sync + std::fmt::Debug {
fn write(&self, kvs: &KeyValues) -> Result<()>;
/// Iterates the memtable.
// TODO(yingwen): 1. Use reference of IterContext? 2. Consider passing a projector (does column projection).
fn iter(&self, ctx: IterContext) -> Result<BoxedBatchIterator>;
fn iter(&self, ctx: &IterContext) -> Result<BoxedBatchIterator>;
/// Returns the estimated bytes allocated by this memtable from heap.
fn bytes_allocated(&self) -> usize;
@@ -42,6 +43,8 @@ pub trait Memtable: Send + Sync + std::fmt::Debug {
pub type MemtableRef = Arc<dyn Memtable>;
/// Context for iterating memtable.
///
/// Should be cheap to clone.
#[derive(Debug, Clone)]
pub struct IterContext {
/// The suggested batch size of the iterator.
@@ -53,6 +56,11 @@ pub struct IterContext {
// in memtable.
/// Returns all rows, ignores sequence visibility and key duplication.
pub for_flush: bool,
/// Schema the reader expect to read.
///
/// Set to `None` to read all columns.
pub projected_schema: Option<ProjectedSchemaRef>,
}
impl Default for IterContext {
@@ -62,6 +70,7 @@ impl Default for IterContext {
// All data in memory is visible by default.
visible_sequence: SequenceNumber::MAX,
for_flush: false,
projected_schema: None,
}
}
}
@@ -82,7 +91,7 @@ pub enum RowOrdering {
/// as an async trait.
pub trait BatchIterator: Iterator<Item = Result<Batch>> + Send + Sync {
/// Returns the schema of this iterator.
fn schema(&self) -> RegionSchemaRef;
fn schema(&self) -> ProjectedSchemaRef;
/// Returns the ordering of the output rows from this iterator.
fn ordering(&self) -> RowOrdering;

View File

@@ -18,7 +18,7 @@ use crate::memtable::{
BatchIterator, BoxedBatchIterator, IterContext, KeyValues, Memtable, MemtableId, RowOrdering,
};
use crate::read::Batch;
use crate::schema::RegionSchemaRef;
use crate::schema::{ProjectedSchema, ProjectedSchemaRef, RegionSchemaRef};
type RwLockMap = RwLock<BTreeMap<InnerKey, RowValue>>;
@@ -66,10 +66,10 @@ impl Memtable for BTreeMemtable {
Ok(())
}
fn iter(&self, ctx: IterContext) -> Result<BoxedBatchIterator> {
fn iter(&self, ctx: &IterContext) -> Result<BoxedBatchIterator> {
assert!(ctx.batch_size > 0);
let iter = BTreeIterator::new(ctx, self.schema.clone(), self.map.clone());
let iter = BTreeIterator::new(ctx.clone(), self.schema.clone(), self.map.clone());
Ok(Box::new(iter))
}
@@ -81,14 +81,17 @@ impl Memtable for BTreeMemtable {
struct BTreeIterator {
ctx: IterContext,
/// Schema of this memtable.
schema: RegionSchemaRef,
/// Projected schema that user expect to read.
projected_schema: ProjectedSchemaRef,
map: Arc<RwLockMap>,
last_key: Option<InnerKey>,
}
impl BatchIterator for BTreeIterator {
fn schema(&self) -> RegionSchemaRef {
self.schema.clone()
fn schema(&self) -> ProjectedSchemaRef {
self.projected_schema.clone()
}
fn ordering(&self) -> RowOrdering {
@@ -106,9 +109,15 @@ impl Iterator for BTreeIterator {
impl BTreeIterator {
fn new(ctx: IterContext, schema: RegionSchemaRef, map: Arc<RwLockMap>) -> BTreeIterator {
let projected_schema = ctx
.projected_schema
.clone()
.unwrap_or_else(|| Arc::new(ProjectedSchema::no_projection(schema.clone())));
BTreeIterator {
ctx,
schema,
projected_schema,
map,
last_key: None,
}
@@ -142,17 +151,27 @@ impl BTreeIterator {
.schema
.row_key_columns()
.map(|column_meta| column_meta.desc.data_type.clone());
let key_needed = vec![true; self.schema.num_row_key_columns()];
let value_data_types = self
.schema
.value_columns()
.map(|column_meta| column_meta.desc.data_type.clone());
let value_needed: Vec<_> = self
.schema
.value_columns()
.map(|column_meta| self.projected_schema.is_needed(column_meta.id()))
.collect();
Some(Batch {
keys: rows_to_vectors(key_data_types, keys.as_slice()),
sequences,
op_types,
values: rows_to_vectors(value_data_types, values.as_slice()),
})
let key_columns = rows_to_vectors(key_data_types, &key_needed, keys.as_slice());
let value_columns = rows_to_vectors(value_data_types, &value_needed, values.as_slice());
let batch = self.projected_schema.batch_from_parts(
key_columns,
value_columns,
Arc::new(sequences),
Arc::new(op_types),
);
Some(batch)
}
}
@@ -384,6 +403,7 @@ impl<'a> RowsProvider for &'a [&RowValue] {
fn rows_to_vectors<I: Iterator<Item = ConcreteDataType>, T: RowsProvider>(
data_types: I,
column_needed: &[bool],
provider: T,
) -> Vec<VectorRef> {
if provider.is_empty() {
@@ -399,6 +419,10 @@ fn rows_to_vectors<I: Iterator<Item = ConcreteDataType>, T: RowsProvider>(
let mut vectors = Vec::with_capacity(column_num);
for (col_idx, builder) in builders.iter_mut().enumerate() {
if !column_needed[col_idx] {
continue;
}
for row_idx in 0..row_num {
let row = provider.row_by_index(row_idx);
let value = &row[col_idx];

View File

@@ -593,18 +593,18 @@ mod tests {
sequence: SequenceNumber,
data: &[(i64, Option<i64>)],
) {
let iter = mem.iter(IterContext::default()).unwrap();
let iter = mem.iter(&IterContext::default()).unwrap();
let mut index = 0;
for batch in iter {
let batch = batch.unwrap();
let row_num = batch.keys[0].len();
let row_num = batch.column(0).len();
for i in 0..row_num {
let ts = batch.keys[0].get(i);
let v = batch.values[0].get(i);
let ts = batch.column(0).get(i);
let v = batch.column(1).get(i);
assert_eq!(Value::from(data[index].0), ts);
assert_eq!(Value::from(data[index].1), v);
assert_eq!(sequence, batch.sequences.get_data(i).unwrap());
assert_eq!(Value::from(sequence), batch.column(2).get(i));
index += 1;
}

View File

@@ -1,10 +1,11 @@
use datatypes::arrow::array::{Int64Array, UInt64Array, UInt8Array};
use datatypes::prelude::*;
use datatypes::type_id::LogicalTypeId;
use datatypes::vectors::{Int64VectorBuilder, UInt64VectorBuilder};
use super::*;
use crate::metadata::RegionMetadata;
use crate::schema::RegionSchemaRef;
use crate::schema::{ProjectedSchema, RegionSchemaRef};
use crate::test_util::descriptor_util::RegionDescBuilder;
// For simplicity, all memtables in test share same memtable id.
@@ -12,11 +13,12 @@ const MEMTABLE_ID: MemtableId = 1;
// Schema for testing memtable:
// - key: Int64(timestamp), UInt64(version),
// - value: UInt64
// - value: UInt64, UInt64
pub fn schema_for_test() -> RegionSchemaRef {
// Just build a region desc and use its columns metadata.
let desc = RegionDescBuilder::new("test")
.enable_version_column(true)
.push_value_column(("v0", LogicalTypeId::UInt64, true))
.push_value_column(("v1", LogicalTypeId::UInt64, true))
.build();
let metadata: RegionMetadata = desc.try_into().unwrap();
@@ -29,7 +31,7 @@ fn kvs_for_test_with_index(
op_type: OpType,
start_index_in_batch: usize,
keys: &[(i64, u64)],
values: &[Option<u64>],
values: &[(Option<u64>, Option<u64>)],
) -> KeyValues {
assert_eq!(keys.len(), values.len());
@@ -46,11 +48,18 @@ fn kvs_for_test_with_index(
Arc::new(key_builders.1.finish()) as _,
];
let mut value_builder = UInt64VectorBuilder::with_capacity(values.len());
let mut value_builders = (
UInt64VectorBuilder::with_capacity(values.len()),
UInt64VectorBuilder::with_capacity(values.len()),
);
for value in values {
value_builder.push(*value);
value_builders.0.push(value.0);
value_builders.1.push(value.1);
}
let row_values = vec![Arc::new(value_builder.finish()) as _];
let row_values = vec![
Arc::new(value_builders.0.finish()) as _,
Arc::new(value_builders.1.finish()) as _,
];
let kvs = KeyValues {
sequence,
@@ -70,7 +79,7 @@ fn kvs_for_test(
sequence: SequenceNumber,
op_type: OpType,
keys: &[(i64, u64)],
values: &[Option<u64>],
values: &[(Option<u64>, Option<u64>)],
) -> KeyValues {
kvs_for_test_with_index(sequence, op_type, 0, keys, values)
}
@@ -80,7 +89,7 @@ pub fn write_kvs(
sequence: SequenceNumber,
op_type: OpType,
keys: &[(i64, u64)],
values: &[Option<u64>],
values: &[(Option<u64>, Option<u64>)],
) {
let kvs = kvs_for_test(sequence, op_type, keys, values);
@@ -88,13 +97,11 @@ pub fn write_kvs(
}
fn check_batch_valid(batch: &Batch) {
assert_eq!(2, batch.keys.len());
assert_eq!(1, batch.values.len());
let row_num = batch.keys[0].len();
assert_eq!(row_num, batch.keys[1].len());
assert_eq!(row_num, batch.sequences.len());
assert_eq!(row_num, batch.op_types.len());
assert_eq!(row_num, batch.values[0].len());
assert_eq!(6, batch.num_columns());
let row_num = batch.column(0).len();
for i in 1..6 {
assert_eq!(row_num, batch.column(i).len());
}
}
fn check_iter_content(
@@ -102,25 +109,26 @@ fn check_iter_content(
keys: &[(i64, u64)],
sequences: &[u64],
op_types: &[OpType],
values: &[Option<u64>],
values: &[(Option<u64>, Option<u64>)],
) {
let mut index = 0;
for batch in iter {
let batch = batch.unwrap();
check_batch_valid(&batch);
let row_num = batch.keys[0].len();
let row_num = batch.column(0).len();
for i in 0..row_num {
let (k0, k1) = (batch.keys[0].get(i), batch.keys[1].get(i));
let sequence = batch.sequences.get_data(i).unwrap();
let op_type = batch.op_types.get_data(i).unwrap();
let v = batch.values[0].get(i);
let (k0, k1) = (batch.column(0).get(i), batch.column(1).get(i));
let (v0, v1) = (batch.column(2).get(i), batch.column(3).get(i));
let sequence = batch.column(4).get(i);
let op_type = batch.column(5).get(i);
assert_eq!(Value::from(keys[index].0), k0);
assert_eq!(Value::from(keys[index].1), k1);
assert_eq!(sequences[index], sequence);
assert_eq!(op_types[index].as_u8(), op_type);
assert_eq!(Value::from(values[index]), v);
assert_eq!(Value::from(values[index].0), v0);
assert_eq!(Value::from(values[index].1), v1);
assert_eq!(Value::from(sequences[index]), sequence);
assert_eq!(Value::from(op_types[index].as_u8()), op_type);
index += 1;
}
@@ -177,7 +185,7 @@ struct TestContext {
fn write_iter_memtable_case(ctx: &TestContext) {
// Test iterating an empty memtable.
let mut iter = ctx.memtable.iter(IterContext::default()).unwrap();
let mut iter = ctx.memtable.iter(&IterContext::default()).unwrap();
assert!(iter.next().is_none());
// Poll the empty iterator again.
assert!(iter.next().is_none());
@@ -196,17 +204,25 @@ fn write_iter_memtable_case(ctx: &TestContext) {
(2003, 5),
(1001, 1),
], // keys
&[Some(1), Some(2), Some(7), Some(8), Some(9), Some(3)], // values
&[
(Some(1), None),
(Some(2), None),
(Some(7), None),
(Some(8), None),
(Some(9), None),
(Some(3), None),
], // values
);
write_kvs(
&*ctx.memtable,
11, // sequence
OpType::Put,
&[(1002, 1), (1003, 1), (1004, 1)], // keys
&[None, Some(5), None], // values
&[(1002, 1), (1003, 1), (1004, 1)], // keys
&[(None, None), (Some(5), None), (None, None)], // values
);
assert_eq!(216, ctx.memtable.bytes_allocated());
// 9 key value pairs (6 + 3).
assert_eq!(288, ctx.memtable.bytes_allocated());
let batch_sizes = [1, 4, 8, consts::READ_BATCH_SIZE];
for batch_size in batch_sizes {
@@ -214,8 +230,11 @@ fn write_iter_memtable_case(ctx: &TestContext) {
batch_size,
..Default::default()
};
let mut iter = ctx.memtable.iter(iter_ctx).unwrap();
assert_eq!(ctx.schema, iter.schema());
let mut iter = ctx.memtable.iter(&iter_ctx).unwrap();
assert_eq!(
ctx.schema.user_schema(),
iter.schema().projected_user_schema()
);
assert_eq!(RowOrdering::Key, iter.ordering());
check_iter_content(
@@ -244,15 +263,15 @@ fn write_iter_memtable_case(ctx: &TestContext) {
OpType::Put,
], // op_types
&[
Some(1),
Some(2),
Some(3),
None,
Some(5),
None,
Some(7),
Some(8),
Some(9),
(Some(1), None),
(Some(2), None),
(Some(3), None),
(None, None),
(Some(5), None),
(None, None),
(Some(7), None),
(Some(8), None),
(Some(9), None),
], // values
);
}
@@ -272,7 +291,7 @@ fn check_iter_batch_size(iter: &mut dyn BatchIterator, total: usize, batch_size:
let batch = batch.unwrap();
check_batch_valid(&batch);
let row_num = batch.keys[0].len();
let row_num = batch.column(0).len();
if remains >= batch_size {
assert_eq!(batch_size, row_num);
remains -= batch_size;
@@ -301,7 +320,14 @@ fn test_iter_batch_size() {
(2003, 1),
(2003, 5),
], // keys
&[Some(1), Some(2), Some(3), Some(4), None, None], // values
&[
(Some(1), None),
(Some(2), None),
(Some(3), None),
(Some(4), None),
(None, None),
(None, None),
], // values
);
let total = 6;
@@ -313,7 +339,7 @@ fn test_iter_batch_size() {
..Default::default()
};
let mut iter = ctx.memtable.iter(iter_ctx).unwrap();
let mut iter = ctx.memtable.iter(&iter_ctx).unwrap();
check_iter_batch_size(&mut *iter, total, batch_size);
}
});
@@ -328,15 +354,15 @@ fn test_duplicate_key_across_batch() {
10, // sequence
OpType::Put,
&[(1000, 1), (1000, 2), (2000, 1), (2001, 2)], // keys
&[Some(1), None, None, None], // values
&[(Some(1), None), (None, None), (None, None), (None, None)], // values
);
write_kvs(
&*ctx.memtable,
11, // sequence
OpType::Put,
&[(1000, 1), (2001, 2)], // keys
&[Some(1231), Some(1232)], // values
&[(1000, 1), (2001, 2)], // keys
&[(Some(1231), None), (Some(1232), None)], // values
);
let batch_sizes = [1, 2, 3, 4, 5];
@@ -346,13 +372,18 @@ fn test_duplicate_key_across_batch() {
..Default::default()
};
let mut iter = ctx.memtable.iter(iter_ctx).unwrap();
let mut iter = ctx.memtable.iter(&iter_ctx).unwrap();
check_iter_content(
&mut *iter,
&[(1000, 1), (1000, 2), (2000, 1), (2001, 2)], // keys
&[11, 10, 10, 11], // sequences
&[OpType::Put, OpType::Put, OpType::Put, OpType::Put], // op_types
&[Some(1231), None, None, Some(1232)], // values
&[
(Some(1231), None),
(None, None),
(None, None),
(Some(1232), None),
], // values
);
}
});
@@ -367,7 +398,7 @@ fn test_duplicate_key_in_batch() {
10, // sequence
OpType::Put,
&[(1000, 1), (1000, 2), (1000, 1), (2001, 2)], // keys
&[None, None, Some(1234), None], // values
&[(None, None), (None, None), (Some(1234), None), (None, None)], // values
);
let batch_sizes = [1, 2, 3, 4, 5];
@@ -377,13 +408,13 @@ fn test_duplicate_key_in_batch() {
..Default::default()
};
let mut iter = ctx.memtable.iter(iter_ctx).unwrap();
let mut iter = ctx.memtable.iter(&iter_ctx).unwrap();
check_iter_content(
&mut *iter,
&[(1000, 1), (1000, 2), (2001, 2)], // keys
&[10, 10, 10], // sequences
&[OpType::Put, OpType::Put, OpType::Put], // op_types
&[Some(1234), None, None, None], // values
&[(Some(1234), None), (None, None), (None, None), (None, None)], // values
);
}
});
@@ -397,24 +428,24 @@ fn test_sequence_visibility() {
&*ctx.memtable,
10, // sequence
OpType::Put,
&[(1000, 1), (1000, 2)], // keys
&[Some(1), Some(2)], // values
&[(1000, 1), (1000, 2)], // keys
&[(Some(1), None), (Some(2), None)], // values
);
write_kvs(
&*ctx.memtable,
11, // sequence
OpType::Put,
&[(1000, 1), (1000, 2)], // keys
&[Some(11), Some(12)], // values
&[(1000, 1), (1000, 2)], // keys
&[(Some(11), None), (Some(12), None)], // values
);
write_kvs(
&*ctx.memtable,
12, // sequence
OpType::Put,
&[(1000, 1), (1000, 2)], // keys
&[Some(21), Some(22)], // values
&[(1000, 1), (1000, 2)], // keys
&[(Some(21), None), (Some(22), None)], // values
);
{
@@ -422,9 +453,10 @@ fn test_sequence_visibility() {
batch_size: 1,
visible_sequence: 9,
for_flush: false,
projected_schema: None,
};
let mut iter = ctx.memtable.iter(iter_ctx).unwrap();
let mut iter = ctx.memtable.iter(&iter_ctx).unwrap();
check_iter_content(
&mut *iter,
&[], // keys
@@ -439,15 +471,16 @@ fn test_sequence_visibility() {
batch_size: 1,
visible_sequence: 10,
for_flush: false,
projected_schema: None,
};
let mut iter = ctx.memtable.iter(iter_ctx).unwrap();
let mut iter = ctx.memtable.iter(&iter_ctx).unwrap();
check_iter_content(
&mut *iter,
&[(1000, 1), (1000, 2)], // keys
&[10, 10], // sequences
&[OpType::Put, OpType::Put], // op_types
&[Some(1), Some(2)], // values
&[(1000, 1), (1000, 2)], // keys
&[10, 10], // sequences
&[OpType::Put, OpType::Put], // op_types
&[(Some(1), None), (Some(2), None)], // values
);
}
@@ -456,15 +489,16 @@ fn test_sequence_visibility() {
batch_size: 1,
visible_sequence: 11,
for_flush: false,
projected_schema: None,
};
let mut iter = ctx.memtable.iter(iter_ctx).unwrap();
let mut iter = ctx.memtable.iter(&iter_ctx).unwrap();
check_iter_content(
&mut *iter,
&[(1000, 1), (1000, 2)], // keys
&[11, 11], // sequences
&[OpType::Put, OpType::Put], // op_types
&[Some(11), Some(12)], // values
&[(1000, 1), (1000, 2)], // keys
&[11, 11], // sequences
&[OpType::Put, OpType::Put], // op_types
&[(Some(11), None), (Some(12), None)], // values
);
}
});
@@ -479,7 +513,7 @@ fn test_iter_after_none() {
10, // sequence
OpType::Put,
&[(1000, 0), (1001, 1), (1002, 2)], // keys
&[Some(0), Some(1), Some(2)], // values
&[(Some(0), None), (Some(1), None), (Some(2), None)], // values
);
let iter_ctx = IterContext {
@@ -487,9 +521,54 @@ fn test_iter_after_none() {
..Default::default()
};
let mut iter = ctx.memtable.iter(iter_ctx).unwrap();
let mut iter = ctx.memtable.iter(&iter_ctx).unwrap();
assert!(iter.next().is_some());
assert!(iter.next().is_none());
assert!(iter.next().is_none());
});
}
#[test]
fn test_memtable_projection() {
let tester = MemtableTester::default();
// Only need v0, but row key columns and internal columns would also be read.
let projected_schema =
Arc::new(ProjectedSchema::new(tester.schema.clone(), Some(vec![2])).unwrap());
tester.run_testcase(|ctx| {
write_kvs(
&*ctx.memtable,
9, // sequence
OpType::Put,
&[(1000, 0), (1001, 1), (1002, 2)], // keys
&[
(Some(10), Some(20)),
(Some(11), Some(21)),
(Some(12), Some(22)),
], // values
);
let iter_ctx = IterContext {
batch_size: 4,
projected_schema: Some(projected_schema.clone()),
..Default::default()
};
let mut iter = ctx.memtable.iter(&iter_ctx).unwrap();
let batch = iter.next().unwrap().unwrap();
assert!(iter.next().is_none());
assert_eq!(5, batch.num_columns());
let k0 = Int64Array::from_slice(&[1000, 1001, 1002]);
let k1 = UInt64Array::from_slice(&[0, 1, 2]);
let v0 = UInt64Array::from_slice(&[10, 11, 12]);
let sequences = UInt64Array::from_slice(&[9, 9, 9]);
let op_types = UInt8Array::from_slice(&[0, 0, 0]);
assert_eq!(k0, &*batch.column(0).to_arrow_array());
assert_eq!(k1, &*batch.column(1).to_arrow_array());
assert_eq!(v0, &*batch.column(2).to_arrow_array());
assert_eq!(sequences, &*batch.column(3).to_arrow_array());
assert_eq!(op_types, &*batch.column(4).to_arrow_array());
});
}

View File

@@ -267,7 +267,14 @@ mod tests {
(2003, 5),
(1001, 1),
], // keys
&[Some(1), Some(2), Some(7), Some(8), Some(9), Some(3)], // values
&[
(Some(1), None),
(Some(2), None),
(Some(7), None),
(Some(8), None),
(Some(9), None),
(Some(3), None),
], // values
);
set.insert(RangeMillis::new(20, 30).unwrap(), memtable.clone());

View File

@@ -149,6 +149,18 @@ pub struct ColumnMetadata {
pub desc: ColumnDescriptor,
}
impl ColumnMetadata {
#[inline]
pub fn id(&self) -> ColumnId {
self.desc.id
}
#[inline]
pub fn name(&self) -> &str {
&self.desc.name
}
}
#[derive(Clone, Debug, PartialEq)]
pub struct ColumnsMetadata {
/// All columns.
@@ -221,6 +233,11 @@ impl ColumnsMetadata {
pub fn user_column_end(&self) -> usize {
self.user_column_end
}
#[inline]
pub fn column_metadata(&self, idx: usize) -> &ColumnMetadata {
&self.columns[idx]
}
}
pub type ColumnsMetadataRef = Arc<ColumnsMetadata>;

View File

@@ -1,18 +1,40 @@
//! Common structs and utilities for read.
use async_trait::async_trait;
use datatypes::vectors::{UInt64Vector, UInt8Vector, VectorRef};
use datatypes::vectors::VectorRef;
use crate::error::Result;
// TODO(yingwen): Maybe pack op_type with sequence (reserve 8bits in u64 for op_type) like RocksDB.
/// Storage internal representation of a batch of rows.
// Now the structure of `Batch` is still unstable, all pub fields may be changed.
#[derive(Debug, Default)]
pub struct Batch {
// Now the structure of `Batch` is still unstable, all pub fields may be changed.
pub keys: Vec<VectorRef>,
pub sequences: UInt64Vector,
pub op_types: UInt8Vector,
pub values: Vec<VectorRef>,
/// Rows organized in columnar format.
///
/// Columns follow the same order convention of region schema:
/// key, value, internal columns.
columns: Vec<VectorRef>,
}
impl Batch {
pub fn new(columns: Vec<VectorRef>) -> Batch {
Batch { columns }
}
#[inline]
pub fn num_columns(&self) -> usize {
self.columns.len()
}
#[inline]
pub fn columns(&self) -> &[VectorRef] {
&self.columns
}
#[inline]
pub fn column(&self, idx: usize) -> &VectorRef {
&self.columns[idx]
}
}
/// Async batch reader.

View File

@@ -2,6 +2,7 @@
mod basic;
mod flush;
mod projection;
use datatypes::prelude::ScalarVector;
use datatypes::type_id::LogicalTypeId;

View File

@@ -0,0 +1,161 @@
use std::sync::Arc;
use datatypes::prelude::ScalarVector;
use datatypes::type_id::LogicalTypeId;
use datatypes::vectors::Int64Vector;
use log_store::fs::log::LocalFileLogStore;
use store_api::logstore::LogStore;
use store_api::storage::{
Chunk, ChunkReader, PutOperation, ReadContext, Region, ScanRequest, Snapshot, WriteContext,
WriteRequest,
};
use tempdir::TempDir;
use crate::region::RegionImpl;
use crate::region::RegionMetadata;
use crate::test_util::{self, config_util, descriptor_util, write_batch_util};
use crate::write_batch::{PutData, WriteBatch};
/// Create metadata with schema (k0, timestamp, v0, v1)
fn new_metadata(region_name: &str) -> RegionMetadata {
let desc = descriptor_util::desc_with_value_columns(region_name, 2);
desc.try_into().unwrap()
}
fn new_write_batch_for_test() -> WriteBatch {
write_batch_util::new_write_batch(
&[
("k0", LogicalTypeId::Int64, false),
(test_util::TIMESTAMP_NAME, LogicalTypeId::Int64, false),
("v0", LogicalTypeId::Int64, true),
("v1", LogicalTypeId::Int64, true),
],
Some(1),
)
}
/// Build put data
///
/// ```text
/// k0: [key_start, key_start + 1, ... key_start + len - 1]
/// timestamp: [ts_start, ts_start + 1, ... ts_start + len - 1]
/// v0: [initial_value, ...., initial_value]
/// v1: [initial_value, ..., initial_value + len - 1]
/// ```
fn new_put_data(len: usize, key_start: i64, ts_start: i64, initial_value: i64) -> PutData {
let mut put_data = PutData::with_num_columns(4);
let k0 = Int64Vector::from_values((0..len).map(|v| key_start + v as i64));
let ts = Int64Vector::from_values((0..len).map(|v| ts_start + v as i64));
let v0 = Int64Vector::from_values(std::iter::repeat(initial_value).take(len));
let v1 = Int64Vector::from_values((0..len).map(|v| initial_value + v as i64));
put_data.add_key_column("k0", Arc::new(k0)).unwrap();
put_data
.add_key_column(test_util::TIMESTAMP_NAME, Arc::new(ts))
.unwrap();
put_data.add_value_column("v0", Arc::new(v0)).unwrap();
put_data.add_value_column("v1", Arc::new(v1)).unwrap();
put_data
}
fn append_chunk_to(chunk: &Chunk, dst: &mut Vec<Vec<i64>>) {
if chunk.columns.is_empty() {
return;
}
let num_rows = chunk.columns[0].len();
dst.resize(num_rows, Vec::new());
for (i, row) in dst.iter_mut().enumerate() {
for col in &chunk.columns {
let val = col
.as_any()
.downcast_ref::<Int64Vector>()
.unwrap()
.get_data(i)
.unwrap();
row.push(val);
}
}
}
struct ProjectionTester<S: LogStore> {
region: RegionImpl<S>,
write_ctx: WriteContext,
read_ctx: ReadContext,
}
impl<S: LogStore> ProjectionTester<S> {
fn with_region(region: RegionImpl<S>) -> ProjectionTester<S> {
ProjectionTester {
region,
write_ctx: WriteContext::default(),
read_ctx: ReadContext::default(),
}
}
async fn put(&self, len: usize, key_start: i64, ts_start: i64, initial_value: i64) {
let mut batch = new_write_batch_for_test();
let put_data = new_put_data(len, key_start, ts_start, initial_value);
batch.put(put_data).unwrap();
self.region.write(&self.write_ctx, batch).await.unwrap();
}
async fn scan(&self, projection: Option<Vec<usize>>) -> Vec<Vec<i64>> {
let snapshot = self.region.snapshot(&self.read_ctx).unwrap();
let request = ScanRequest {
projection,
..Default::default()
};
let resp = snapshot.scan(&self.read_ctx, request).await.unwrap();
let mut reader = resp.reader;
let mut dst = Vec::new();
while let Some(chunk) = reader.next_chunk().await.unwrap() {
append_chunk_to(&chunk, &mut dst);
}
dst
}
}
const REGION_NAME: &str = "region-projection-0";
async fn new_tester(store_dir: &str) -> ProjectionTester<LocalFileLogStore> {
let metadata = new_metadata(REGION_NAME);
let store_config = config_util::new_store_config(REGION_NAME, store_dir).await;
let region = RegionImpl::create(metadata, store_config).await.unwrap();
ProjectionTester::with_region(region)
}
#[tokio::test]
async fn test_projection_ordered() {
let dir = TempDir::new("projection-ordered").unwrap();
let store_dir = dir.path().to_str().unwrap();
let tester = new_tester(store_dir).await;
tester.put(4, 1, 10, 100).await;
// timestamp, v1
let output = tester.scan(Some(vec![1, 3])).await;
let expect = vec![vec![10, 100], vec![11, 101], vec![12, 102], vec![13, 103]];
assert_eq!(expect, output);
}
#[tokio::test]
async fn test_projection_unordered() {
let dir = TempDir::new("projection-unordered").unwrap();
let store_dir = dir.path().to_str().unwrap();
let tester = new_tester(store_dir).await;
tester.put(4, 1, 10, 100).await;
// v1, k0
let output = tester.scan(Some(vec![3, 0])).await;
let expect = vec![vec![100, 1], vec![101, 2], vec![102, 3], vec![103, 4]];
assert_eq!(expect, output);
}

View File

@@ -1,15 +1,15 @@
use std::collections::{BTreeSet, HashMap};
use std::sync::Arc;
use common_error::prelude::*;
use datatypes::arrow::array::Array;
use datatypes::arrow::chunk::Chunk;
use datatypes::arrow::chunk::Chunk as ArrowChunk;
use datatypes::arrow::datatypes::Schema as ArrowSchema;
use datatypes::prelude::Vector;
use datatypes::schema::Metadata;
use datatypes::vectors::{Helper, UInt64Vector, UInt8Vector};
use datatypes::vectors::{Helper, VectorRef};
use serde::{Deserialize, Serialize};
use snafu::ensure;
use store_api::storage::{consts, ColumnSchema, Schema, SchemaBuilder, SchemaRef};
use store_api::storage::{consts, Chunk, ColumnId, ColumnSchema, Schema, SchemaBuilder, SchemaRef};
use crate::metadata::{ColumnMetadata, ColumnsMetadata, ColumnsMetadataRef};
use crate::read::Batch;
@@ -64,6 +64,9 @@ pub enum Error {
#[snafu(backtrace)]
source: datatypes::error::Error,
},
#[snafu(display("Invalid projection, {}", msg))]
InvalidProjection { msg: String, backtrace: Backtrace },
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -99,7 +102,7 @@ pub struct RegionSchema {
impl RegionSchema {
pub fn new(columns: ColumnsMetadataRef, version: u32) -> Result<RegionSchema> {
let user_schema = Arc::new(build_user_schema(&columns, version)?);
let sst_schema = SstSchema::new(&columns, version)?;
let sst_schema = SstSchema::from_columns_metadata(&columns, version)?;
debug_assert_eq!(user_schema.version(), sst_schema.version());
debug_assert_eq!(version, user_schema.version());
@@ -148,10 +151,37 @@ impl RegionSchema {
pub fn version(&self) -> u32 {
self.user_schema.version()
}
#[inline]
fn sequence_index(&self) -> usize {
self.sst_schema.sequence_index()
}
#[inline]
fn op_type_index(&self) -> usize {
self.sst_schema.op_type_index()
}
#[inline]
fn row_key_indices(&self) -> impl Iterator<Item = usize> {
self.sst_schema.row_key_indices()
}
#[inline]
fn column_metadata(&self, idx: usize) -> &ColumnMetadata {
self.columns.column_metadata(idx)
}
#[inline]
fn timestamp_key_index(&self) -> usize {
self.columns.timestamp_key_index()
}
}
pub type RegionSchemaRef = Arc<RegionSchema>;
// TODO(yingwen): Now this schema in not only used by SST, maybe rename it to InternalSchema
// or something else.
/// Schema of SST.
///
/// Only contains a reference to schema and some indices, so it should be cheap to clone.
@@ -163,37 +193,6 @@ pub struct SstSchema {
}
impl SstSchema {
fn new(columns: &ColumnsMetadata, version: u32) -> Result<SstSchema> {
let column_schemas: Vec<_> = columns
.iter_all_columns()
.map(|col| ColumnSchema::from(&col.desc))
.collect();
let schema = SchemaBuilder::from(column_schemas)
.timestamp_index(columns.timestamp_key_index())
.version(version)
.add_metadata(ROW_KEY_END_KEY, columns.row_key_end().to_string())
.add_metadata(USER_COLUMN_END_KEY, columns.user_column_end().to_string())
.build()
.context(BuildSchemaSnafu)?;
let user_column_end = columns.user_column_end();
assert_eq!(
consts::SEQUENCE_COLUMN_NAME,
schema.column_schemas()[user_column_end].name
);
assert_eq!(
consts::OP_TYPE_COLUMN_NAME,
schema.column_schemas()[user_column_end + 1].name
);
Ok(SstSchema {
schema: Arc::new(schema),
row_key_end: columns.row_key_end(),
user_column_end,
})
}
#[inline]
pub fn version(&self) -> u32 {
self.schema.version()
@@ -209,56 +208,71 @@ impl SstSchema {
self.schema.arrow_schema()
}
pub fn batch_to_arrow_chunk(&self, batch: &Batch) -> Chunk<Arc<dyn Array>> {
assert_eq!(
self.schema.num_columns(),
// key columns + value columns + sequence + op_type
batch.keys.len() + batch.values.len() + 2
);
pub fn batch_to_arrow_chunk(&self, batch: &Batch) -> ArrowChunk<Arc<dyn Array>> {
assert_eq!(self.schema.num_columns(), batch.num_columns());
Chunk::new(
batch
.keys
.iter()
.map(|v| v.to_arrow_array())
.chain(batch.values.iter().map(|v| v.to_arrow_array()))
.chain(std::iter::once(batch.sequences.to_arrow_array()))
.chain(std::iter::once(batch.op_types.to_arrow_array()))
.collect(),
ArrowChunk::new(batch.columns().iter().map(|v| v.to_arrow_array()).collect())
}
pub fn arrow_chunk_to_batch(&self, chunk: &ArrowChunk<Arc<dyn Array>>) -> Result<Batch> {
assert_eq!(self.schema.num_columns(), chunk.columns().len());
let columns = chunk
.iter()
.enumerate()
.map(|(i, column)| {
Helper::try_into_vector(column.clone()).context(ConvertChunkSnafu {
name: self.column_name(i),
})
})
.collect::<Result<_>>()?;
Ok(Batch::new(columns))
}
fn from_columns_metadata(columns: &ColumnsMetadata, version: u32) -> Result<SstSchema> {
let column_schemas: Vec<_> = columns
.iter_all_columns()
.map(|col| ColumnSchema::from(&col.desc))
.collect();
SstSchema::new(
column_schemas,
version,
columns.timestamp_key_index(),
columns.row_key_end(),
columns.user_column_end(),
)
}
pub fn arrow_chunk_to_batch(&self, chunk: &Chunk<Arc<dyn Array>>) -> Result<Batch> {
let keys = self
.row_key_indices()
.map(|i| {
Helper::try_into_vector(&chunk[i].clone()).context(ConvertChunkSnafu {
name: self.column_name(i),
})
})
.collect::<Result<_>>()?;
let sequences = UInt64Vector::try_from_arrow_array(&chunk[self.sequence_index()].clone())
.context(ConvertChunkSnafu {
name: consts::SEQUENCE_COLUMN_NAME,
})?;
let op_types = UInt8Vector::try_from_arrow_array(&chunk[self.op_type_index()].clone())
.context(ConvertChunkSnafu {
name: consts::OP_TYPE_COLUMN_NAME,
})?;
let values = self
.value_indices()
.map(|i| {
Helper::try_into_vector(&chunk[i].clone()).context(ConvertChunkSnafu {
name: self.column_name(i),
})
})
.collect::<Result<_>>()?;
fn new(
column_schemas: Vec<ColumnSchema>,
version: u32,
timestamp_key_index: usize,
row_key_end: usize,
user_column_end: usize,
) -> Result<SstSchema> {
let schema = SchemaBuilder::from(column_schemas)
.timestamp_index(timestamp_key_index)
.version(version)
.add_metadata(ROW_KEY_END_KEY, row_key_end.to_string())
.add_metadata(USER_COLUMN_END_KEY, user_column_end.to_string())
.build()
.context(BuildSchemaSnafu)?;
Ok(Batch {
keys,
sequences,
op_types,
values,
assert_eq!(
consts::SEQUENCE_COLUMN_NAME,
schema.column_schemas()[user_column_end].name
);
assert_eq!(
consts::OP_TYPE_COLUMN_NAME,
schema.column_schemas()[user_column_end + 1].name
);
Ok(SstSchema {
schema: Arc::new(schema),
row_key_end,
user_column_end,
})
}
@@ -278,13 +292,13 @@ impl SstSchema {
}
#[inline]
fn value_indices(&self) -> impl Iterator<Item = usize> {
self.row_key_end..self.user_column_end
fn column_name(&self, idx: usize) -> &str {
&self.schema.column_schemas()[idx].name
}
#[inline]
fn column_name(&self, idx: usize) -> &str {
&self.schema.column_schemas()[idx].name
fn num_columns(&self) -> usize {
self.schema.num_columns()
}
}
@@ -315,6 +329,288 @@ impl TryFrom<ArrowSchema> for SstSchema {
}
}
/// Metadata about projection.
#[derive(Debug, Default)]
struct Projection {
/// Column indices of projection.
projected_columns: Vec<usize>,
/// Sorted and deduplicated indices of columns to read, includes all row key columns
/// and internal columns.
///
/// We use these indices to read from data sources.
columns_to_read: Vec<usize>,
/// Maps column id to its index in `columns_to_read`.
///
/// Used to ask whether the column with given column id is needed in projection.
id_to_read_idx: HashMap<ColumnId, usize>,
/// Maps index of `projected_columns` to index of the column in `columns_to_read`.
///
/// Invariant:
/// - `projected_idx_to_read_idx.len() == projected_columns.len()`
projected_idx_to_read_idx: Vec<usize>,
/// Number of user columns to read.
num_user_columns: usize,
}
impl Projection {
fn new(region_schema: &RegionSchema, projected_columns: Vec<usize>) -> Projection {
// Get a sorted list of column indices to read.
let mut column_indices: BTreeSet<_> = projected_columns.iter().cloned().collect();
column_indices.extend(region_schema.row_key_indices());
let num_user_columns = column_indices.len();
// Now insert internal columns.
column_indices.extend([
region_schema.sequence_index(),
region_schema.op_type_index(),
]);
let columns_to_read: Vec<_> = column_indices.into_iter().collect();
// The region schema ensure that last two column must be internal columns.
assert_eq!(
region_schema.sequence_index(),
columns_to_read[num_user_columns]
);
assert_eq!(
region_schema.op_type_index(),
columns_to_read[num_user_columns + 1]
);
// Mapping: <column id> => <index in `columns_to_read`>
let id_to_read_idx: HashMap<_, _> = columns_to_read
.iter()
.enumerate()
.map(|(idx, col_idx)| (region_schema.column_metadata(*col_idx).id(), idx))
.collect();
// Use column id to find index in `columns_to_read` of a column in `projected_columns`.
let projected_idx_to_read_idx = projected_columns
.iter()
.map(|col_idx| {
let column_id = region_schema.column_metadata(*col_idx).id();
// This unwrap() should be safe since `columns_to_read` must contains all columns in `projected_columns`.
let read_idx = id_to_read_idx.get(&column_id).unwrap();
*read_idx
})
.collect();
Projection {
projected_columns,
columns_to_read,
id_to_read_idx,
projected_idx_to_read_idx,
num_user_columns,
}
}
}
/// Schema with projection info.
#[derive(Debug)]
pub struct ProjectedSchema {
/// Projection info, `None` means don't need to do projection.
projection: Option<Projection>,
/// Schema used to read from data sources.
schema_to_read: SstSchema,
/// User schema after projection.
projected_user_schema: SchemaRef,
}
pub type ProjectedSchemaRef = Arc<ProjectedSchema>;
impl ProjectedSchema {
/// Create a new `ProjectedSchema` with given `projected_columns`.
///
/// If `projected_columns` is None, then all columns would be read. If `projected_columns` is
/// `Some`, then the `Vec` in it contains the indices of columns need to be read.
///
/// If the `Vec` is empty or contains invalid index, `Err` would be returned.
pub fn new(
region_schema: RegionSchemaRef,
projected_columns: Option<Vec<usize>>,
) -> Result<ProjectedSchema> {
match projected_columns {
Some(indices) => {
Self::validate_projection(&region_schema, &indices)?;
let projection = Projection::new(&region_schema, indices);
let schema_to_read = Self::build_schema_to_read(&region_schema, &projection)?;
let projected_user_schema =
Self::build_projected_user_schema(&region_schema, &projection)?;
Ok(ProjectedSchema {
projection: Some(projection),
schema_to_read,
projected_user_schema,
})
}
None => Ok(ProjectedSchema::no_projection(region_schema)),
}
}
/// Create a `ProjectedSchema` that read all columns.
pub fn no_projection(region_schema: RegionSchemaRef) -> ProjectedSchema {
// We could just reuse the SstSchema and user schema.
ProjectedSchema {
projection: None,
schema_to_read: region_schema.sst_schema().clone(),
projected_user_schema: region_schema.user_schema().clone(),
}
}
#[inline]
pub fn projected_user_schema(&self) -> &SchemaRef {
&self.projected_user_schema
}
#[inline]
pub fn schema_to_read(&self) -> &SstSchema {
&self.schema_to_read
}
/// Convert [Batch] into [Chunk].
///
/// This will remove all internal columns. The input `batch` should has the
/// same schema as `self.schema_to_read()`.
pub fn batch_to_chunk(&self, batch: &Batch) -> Chunk {
let columns = match &self.projection {
Some(projection) => projection
.projected_idx_to_read_idx
.iter()
.map(|col_idx| batch.column(*col_idx))
.cloned()
.collect(),
None => {
let num_user_columns = self.projected_user_schema.num_columns();
batch
.columns()
.iter()
.take(num_user_columns)
.cloned()
.collect()
}
};
Chunk::new(columns)
}
/// Returns true if column with given `column_id` is needed (in projection).
pub fn is_needed(&self, column_id: ColumnId) -> bool {
self.projection
.as_ref()
.map(|p| p.id_to_read_idx.contains_key(&column_id))
.unwrap_or(true)
}
/// Construct a new [Batch] from row key, value, sequence and op_type.
///
/// # Panics
/// Panics if number of columns are not the same as this schema.
pub fn batch_from_parts(
&self,
row_key_columns: Vec<VectorRef>,
mut value_columns: Vec<VectorRef>,
sequences: VectorRef,
op_types: VectorRef,
) -> Batch {
// sequence and op_type
let num_internal_columns = 2;
assert_eq!(
self.schema_to_read.num_columns(),
row_key_columns.len() + value_columns.len() + num_internal_columns
);
let mut columns = row_key_columns;
// Reserve space for value, sequence and op_type
columns.reserve(value_columns.len() + num_internal_columns);
columns.append(&mut value_columns);
// Internal columns are push in sequence, op_type order.
columns.push(sequences);
columns.push(op_types);
Batch::new(columns)
}
fn build_schema_to_read(
region_schema: &RegionSchema,
projection: &Projection,
) -> Result<SstSchema> {
let column_schemas: Vec<_> = projection
.columns_to_read
.iter()
.map(|col_idx| ColumnSchema::from(&region_schema.column_metadata(*col_idx).desc))
.collect();
// All row key columns are reserved in this schema, so we can use the row_key_end
// and timestamp_key_index from region schema.
SstSchema::new(
column_schemas,
region_schema.version(),
region_schema.timestamp_key_index(),
region_schema.columns.row_key_end(),
projection.num_user_columns,
)
}
fn build_projected_user_schema(
region_schema: &RegionSchema,
projection: &Projection,
) -> Result<SchemaRef> {
let timestamp_index =
projection
.projected_columns
.iter()
.enumerate()
.find_map(|(idx, col_idx)| {
if *col_idx == region_schema.timestamp_key_index() {
Some(idx)
} else {
None
}
});
let column_schemas: Vec<_> = projection
.projected_columns
.iter()
.map(|col_idx| ColumnSchema::from(&region_schema.column_metadata(*col_idx).desc))
.collect();
let mut builder = SchemaBuilder::from(column_schemas).version(region_schema.version());
if let Some(timestamp_index) = timestamp_index {
builder = builder.timestamp_index(timestamp_index);
}
let schema = builder.build().context(BuildSchemaSnafu)?;
Ok(Arc::new(schema))
}
fn validate_projection(region_schema: &RegionSchema, indices: &[usize]) -> Result<()> {
// The projection indices should not be empty, at least the timestamp column
// should be always read, and the `SstSchema` also requires the timestamp column.
ensure!(
!indices.is_empty(),
InvalidProjectionSnafu {
msg: "at least one column should be read",
}
);
// Now only allowed to read user columns.
let user_schema = region_schema.user_schema();
for i in indices {
ensure!(
*i < user_schema.num_columns(),
InvalidProjectionSnafu {
msg: format!(
"index {} out of bound, only contains {} columns",
i,
user_schema.num_columns()
),
}
);
}
Ok(())
}
}
fn parse_index_from_metadata(metadata: &Metadata, key: &str) -> Result<usize> {
let value = metadata.get(key).context(MissingMetaSnafu { key })?;
value.parse().context(ParseIndexSnafu { value })
@@ -341,88 +637,93 @@ mod tests {
use super::*;
use crate::metadata::RegionMetadata;
use crate::test_util::{descriptor_util::RegionDescBuilder, schema_util};
use crate::test_util::{descriptor_util, schema_util};
fn new_batch() -> Batch {
let k1 = Int64Vector::from_slice(&[1, 2, 3]);
let k0 = Int64Vector::from_slice(&[1, 2, 3]);
let timestamp = Int64Vector::from_slice(&[4, 5, 6]);
let v1 = Int64Vector::from_slice(&[7, 8, 9]);
let v0 = Int64Vector::from_slice(&[7, 8, 9]);
let sequences = UInt64Vector::from_slice(&[100, 100, 100]);
let op_types = UInt8Vector::from_slice(&[0, 0, 0]);
Batch {
keys: vec![Arc::new(k1), Arc::new(timestamp)],
values: vec![Arc::new(v1)],
sequences: UInt64Vector::from_slice(&[100, 100, 100]),
op_types: UInt8Vector::from_slice(&[0, 0, 0]),
}
Batch::new(vec![
Arc::new(k0),
Arc::new(timestamp),
Arc::new(v0),
Arc::new(sequences),
Arc::new(op_types),
])
}
fn check_chunk_batch(chunk: &Chunk<Arc<dyn Array>>, batch: &Batch) {
fn check_chunk_batch(chunk: &ArrowChunk<Arc<dyn Array>>, batch: &Batch) {
assert_eq!(5, chunk.columns().len());
assert_eq!(3, chunk.len());
for i in 0..2 {
assert_eq!(chunk[i], batch.keys[i].to_arrow_array());
for i in 0..5 {
assert_eq!(chunk[i], batch.column(i).to_arrow_array());
}
assert_eq!(chunk[2], batch.values[0].to_arrow_array());
assert_eq!(chunk[3], batch.sequences.to_arrow_array());
assert_eq!(chunk[4], batch.op_types.to_arrow_array());
}
fn new_region_schema(version: u32, num_value_columns: usize) -> RegionSchema {
let metadata: RegionMetadata =
descriptor_util::desc_with_value_columns("test", num_value_columns)
.try_into()
.unwrap();
let columns = metadata.columns;
RegionSchema::new(columns, version).unwrap()
}
#[test]
fn test_region_schema() {
let desc = RegionDescBuilder::new("test")
.push_key_column(("k1", LogicalTypeId::Int64, false))
.push_value_column(("v1", LogicalTypeId::Int64, true))
.build();
let metadata: RegionMetadata = desc.try_into().unwrap();
let region_schema = Arc::new(new_region_schema(123, 1));
let columns = metadata.columns;
let region_schema = RegionSchema::new(columns.clone(), 0).unwrap();
let expect_schema = schema_util::new_schema(
let expect_schema = schema_util::new_schema_with_version(
&[
("k1", LogicalTypeId::Int64, false),
("k0", LogicalTypeId::Int64, false),
("timestamp", LogicalTypeId::Int64, false),
("v1", LogicalTypeId::Int64, true),
("v0", LogicalTypeId::Int64, true),
],
Some(1),
123,
);
assert_eq!(expect_schema, **region_schema.user_schema());
// Checks row key column.
let mut row_keys = region_schema.row_key_columns();
assert_eq!("k1", row_keys.next().unwrap().desc.name);
assert_eq!("k0", row_keys.next().unwrap().desc.name);
assert_eq!("timestamp", row_keys.next().unwrap().desc.name);
assert_eq!(None, row_keys.next());
assert_eq!(2, region_schema.num_row_key_columns());
// Checks value column.
let mut values = region_schema.value_columns();
assert_eq!("v1", values.next().unwrap().desc.name);
assert_eq!("v0", values.next().unwrap().desc.name);
assert_eq!(None, values.next());
assert_eq!(1, region_schema.num_value_columns());
assert_eq!(0, region_schema.version());
{
let region_schema = RegionSchema::new(columns, 1234).unwrap();
assert_eq!(1234, region_schema.version());
assert_eq!(1234, region_schema.sst_schema().version());
}
// Checks version.
assert_eq!(123, region_schema.version());
assert_eq!(123, region_schema.sst_schema().version());
// Checks SstSchema.
let sst_schema = region_schema.sst_schema();
let sst_arrow_schema = sst_schema.arrow_schema();
let converted_sst_schema = SstSchema::try_from((**sst_arrow_schema).clone()).unwrap();
assert_eq!(*sst_schema, converted_sst_schema);
let expect_schema = schema_util::new_schema(
let expect_schema = schema_util::new_schema_with_version(
&[
("k1", LogicalTypeId::Int64, false),
("k0", LogicalTypeId::Int64, false),
("timestamp", LogicalTypeId::Int64, false),
("v1", LogicalTypeId::Int64, true),
("v0", LogicalTypeId::Int64, true),
(consts::SEQUENCE_COLUMN_NAME, LogicalTypeId::UInt64, false),
(consts::OP_TYPE_COLUMN_NAME, LogicalTypeId::UInt8, false),
],
Some(1),
123,
);
assert_eq!(
expect_schema.column_schemas(),
@@ -432,8 +733,6 @@ mod tests {
assert_eq!(4, sst_schema.op_type_index());
let row_key_indices: Vec<_> = sst_schema.row_key_indices().collect();
assert_eq!([0, 1], &row_key_indices[..]);
let value_indices: Vec<_> = sst_schema.value_indices().collect();
assert_eq!([2], &value_indices[..]);
// Test batch and chunk conversion.
let batch = new_batch();
@@ -445,4 +744,165 @@ mod tests {
let converted_batch = sst_schema.arrow_chunk_to_batch(&chunk).unwrap();
check_chunk_batch(&chunk, &converted_batch);
}
#[test]
fn test_projection() {
// Build a region schema with 2 value columns. So the final user schema is
// (k0, timestamp, v0, v1)
let region_schema = new_region_schema(0, 2);
// Projection, but still keep column order.
// After projection: (timestamp, v0)
let projected_columns = vec![1, 2];
let projection = Projection::new(&region_schema, projected_columns.clone());
assert_eq!(projected_columns, projection.projected_columns);
// Need to read (k0, timestamp, v0, sequence, op_type)
assert_eq!(&[0, 1, 2, 4, 5], &projection.columns_to_read[..]);
assert_eq!(5, projection.id_to_read_idx.len());
// Index of timestamp, v0 in `columns_to_read`
assert_eq!(&[1, 2], &projection.projected_idx_to_read_idx[..]);
// 3 columns: k0, timestamp, v0
assert_eq!(3, projection.num_user_columns);
// Projection, unordered.
// After projection: (timestamp, v1, k0)
let projected_columns = vec![1, 3, 0];
let projection = Projection::new(&region_schema, projected_columns.clone());
assert_eq!(projected_columns, projection.projected_columns);
// Need to read (k0, timestamp, v1, sequence, op_type)
assert_eq!(&[0, 1, 3, 4, 5], &projection.columns_to_read[..]);
assert_eq!(5, projection.id_to_read_idx.len());
// Index of timestamp, v1, k0 in `columns_to_read`
assert_eq!(&[1, 2, 0], &projection.projected_idx_to_read_idx[..]);
// 3 columns: k0, timestamp, v1
assert_eq!(3, projection.num_user_columns);
// Empty projection.
let projection = Projection::new(&region_schema, Vec::new());
assert!(projection.projected_columns.is_empty());
// Still need to read row keys.
assert_eq!(&[0, 1, 4, 5], &projection.columns_to_read[..]);
assert_eq!(4, projection.id_to_read_idx.len());
assert!(projection.projected_idx_to_read_idx.is_empty());
assert_eq!(2, projection.num_user_columns);
}
#[test]
fn test_projected_schema_with_projection() {
// (k0, timestamp, v0, v1, v2)
let region_schema = Arc::new(new_region_schema(123, 3));
// After projection: (v1, timestamp)
let projected_schema =
ProjectedSchema::new(region_schema.clone(), Some(vec![3, 1])).unwrap();
let expect_user = schema_util::new_schema_with_version(
&[
("v1", LogicalTypeId::Int64, true),
("timestamp", LogicalTypeId::Int64, false),
],
Some(1),
123,
);
assert_eq!(expect_user, **projected_schema.projected_user_schema());
// Test is_needed
let needed: Vec<_> = region_schema
.columns
.iter_all_columns()
.enumerate()
.filter_map(|(idx, column_meta)| {
if projected_schema.is_needed(column_meta.id()) {
Some(idx)
} else {
None
}
})
.collect();
// (k0, timestamp, v1, sequence, op_type)
assert_eq!(&[0, 1, 3, 5, 6], &needed[..]);
// Use another projection.
// After projection: (v0, timestamp)
let projected_schema = ProjectedSchema::new(region_schema, Some(vec![2, 1])).unwrap();
// The schema to read should be same as region schema with (k0, timestamp, v0).
// We can't use `new_schema_with_version()` because the SstSchema also store other
// metadata that `new_schema_with_version()` can't store.
let expect_schema = new_region_schema(123, 1);
assert_eq!(
expect_schema.sst_schema(),
projected_schema.schema_to_read()
);
// (k0, timestamp, v0, sequence, op_type)
let batch = new_batch();
// Test Batch to our Chunk.
// (v0, timestamp)
let chunk = projected_schema.batch_to_chunk(&batch);
assert_eq!(2, chunk.columns.len());
assert_eq!(
chunk.columns[0].to_arrow_array(),
batch.column(2).to_arrow_array()
);
assert_eq!(
chunk.columns[1].to_arrow_array(),
batch.column(1).to_arrow_array()
);
// Test batch_from_parts
let keys = batch.columns()[0..2].to_vec();
let values = batch.columns()[2..3].to_vec();
let created = projected_schema.batch_from_parts(
keys,
values,
batch.column(3).clone(),
batch.column(4).clone(),
);
assert_eq!(5, created.num_columns());
for i in 0..5 {
assert_eq!(
batch.column(i).to_arrow_array(),
created.column(i).to_arrow_array()
);
}
}
#[test]
fn test_projected_schema_no_projection() {
// (k0, timestamp, v0)
let region_schema = Arc::new(new_region_schema(123, 1));
let projected_schema = ProjectedSchema::no_projection(region_schema.clone());
assert_eq!(
region_schema.user_schema(),
projected_schema.projected_user_schema()
);
assert_eq!(
region_schema.sst_schema(),
projected_schema.schema_to_read()
);
for column in region_schema.columns.iter_all_columns() {
assert!(projected_schema.is_needed(column.id()));
}
// (k0, timestamp, v0, sequence, op_type)
let batch = new_batch();
// Test Batch to our Chunk.
// (k0, timestamp, v0)
let chunk = projected_schema.batch_to_chunk(&batch);
assert_eq!(3, chunk.columns.len());
}
#[test]
fn test_projected_schema_empty_projection() {
// (k0, timestamp, v0)
let region_schema = Arc::new(new_region_schema(123, 1));
let err = ProjectedSchema::new(region_schema, Some(Vec::new()))
.err()
.unwrap();
assert!(matches!(err, Error::InvalidProjection { .. }));
}
}

View File

@@ -8,7 +8,6 @@ use store_api::storage::{
use crate::chunk::{ChunkReaderBuilder, ChunkReaderImpl};
use crate::error::{Error, Result};
use crate::memtable::IterContext;
use crate::sst::AccessLayerRef;
use crate::version::VersionRef;
@@ -41,17 +40,15 @@ impl Snapshot for SnapshotImpl {
let immutables = memtable_version.immutable_memtables();
let mut builder =
ChunkReaderBuilder::new(self.version.user_schema().clone(), self.sst_layer.clone())
ChunkReaderBuilder::new(self.version.schema().clone(), self.sst_layer.clone())
.reserve_num_memtables(memtable_version.num_memtables())
.iter_ctx(IterContext {
batch_size: ctx.batch_size,
visible_sequence,
..Default::default()
})
.pick_memtables(mutables)?;
.projection(request.projection)
.batch_size(ctx.batch_size)
.visible_sequence(visible_sequence)
.pick_memtables(mutables);
for mem_set in immutables {
builder = builder.pick_memtables(mem_set)?;
builder = builder.pick_memtables(mem_set);
}
let reader = builder.pick_ssts(&**self.version.ssts())?.build().await?;

View File

@@ -9,6 +9,7 @@ use serde::{Deserialize, Serialize};
use crate::error::Result;
use crate::memtable::BoxedBatchIterator;
use crate::read::BoxedBatchReader;
use crate::schema::ProjectedSchemaRef;
use crate::sst::parquet::{ParquetReader, ParquetWriter};
/// Maximum level of SSTs.
@@ -173,6 +174,9 @@ pub struct WriteOptions {
pub struct ReadOptions {
/// Suggested size of each batch.
pub batch_size: usize,
/// The schema that user expected to read, might not the same as the
/// schema of the SST file.
pub projected_schema: ProjectedSchemaRef,
}
/// SST access layer.
@@ -186,8 +190,7 @@ pub trait AccessLayer: Send + Sync + std::fmt::Debug {
opts: &WriteOptions,
) -> Result<()>;
/// Read SST file with given `file_name`.
// TODO(yingwen): Read SST according to scan request and returns a chunk stream.
/// Read SST file with given `file_name` and schema.
async fn read_sst(&self, file_name: &str, opts: &ReadOptions) -> Result<BoxedBatchReader>;
}
@@ -233,9 +236,13 @@ impl AccessLayer for FsAccessLayer {
async fn read_sst(&self, file_name: &str, opts: &ReadOptions) -> Result<BoxedBatchReader> {
let file_path = self.sst_file_path(file_name);
let reader = ParquetReader::new(&file_path, self.object_store.clone());
let reader = ParquetReader::new(
&file_path,
self.object_store.clone(),
opts.projected_schema.clone(),
);
let stream = reader.chunk_stream(None, opts.batch_size).await?;
let stream = reader.chunk_stream(opts.batch_size).await?;
Ok(Box::new(stream))
}
}

View File

@@ -23,7 +23,7 @@ use snafu::ResultExt;
use crate::error::{self, Result};
use crate::memtable::BoxedBatchIterator;
use crate::read::{Batch, BatchReader};
use crate::schema::SstSchema;
use crate::schema::{ProjectedSchemaRef, SstSchema};
use crate::sst;
/// Parquet sst writer.
@@ -54,8 +54,8 @@ impl<'a> ParquetWriter<'a> {
/// A chunk of records yielded from each iteration with a size given
/// in config will be written to a single row group.
async fn write_rows(self, extra_meta: Option<HashMap<String, String>>) -> Result<()> {
let region_schema = self.iter.schema();
let sst_schema = region_schema.sst_schema();
let projected_schema = self.iter.schema();
let sst_schema = projected_schema.schema_to_read();
let schema = sst_schema.arrow_schema();
let object = self.object_store.object(self.file_path);
@@ -156,28 +156,26 @@ fn transverse_recursive<T, F: Fn(&DataType) -> T + Clone>(
pub struct ParquetReader<'a> {
file_path: &'a str,
object_store: ObjectStore,
projected_schema: ProjectedSchemaRef,
}
type ReaderFactoryFuture<'a, R> =
Pin<Box<dyn futures_util::Future<Output = std::io::Result<R>> + Send + 'a>>;
pub type FieldProjection = Box<dyn Fn(&Schema) -> Vec<Field> + Send + Sync>;
impl<'a> ParquetReader<'a> {
pub fn new(file_path: &str, object_store: ObjectStore) -> ParquetReader {
pub fn new(
file_path: &str,
object_store: ObjectStore,
projected_schema: ProjectedSchemaRef,
) -> ParquetReader {
ParquetReader {
file_path,
object_store,
projected_schema,
}
}
// TODO(yingwen): Projection is not supported now, since field index would change after projection.
// To support projection, we may need to implement some helper methods in schema.
pub async fn chunk_stream(
&self,
_projection: Option<FieldProjection>,
chunk_size: usize,
) -> Result<ChunkStream> {
pub async fn chunk_stream(&self, chunk_size: usize) -> Result<ChunkStream> {
let file_path = self.file_path.to_string();
let operator = self.object_store.clone();
let reader_factory = move || -> ReaderFactoryFuture<SeekableReader> {
@@ -195,12 +193,12 @@ impl<'a> ParquetReader<'a> {
.context(error::ReadParquetSnafu { file: &file_path })?;
let arrow_schema =
infer_schema(&metadata).context(error::ReadParquetSnafu { file: &file_path })?;
// Just read all fields.
let projected_fields = arrow_schema.fields.clone();
let sst_schema = SstSchema::try_from(arrow_schema)
// Now the SstSchema is only used to validate metadata of the parquet file, but this schema
// would be useful once we support altering schema, as this is the actual schema of the SST.
let _sst_schema = SstSchema::try_from(arrow_schema)
.context(error::ConvertSstSchemaSnafu { file: &file_path })?;
let projected_fields = self.projected_fields().to_vec();
let chunk_stream = try_stream!({
for rg in metadata.row_groups {
let column_chunks = read_columns_many_async(
@@ -221,20 +219,27 @@ impl<'a> ParquetReader<'a> {
}
});
ChunkStream::new(sst_schema, Box::pin(chunk_stream))
ChunkStream::new(self.projected_schema.clone(), Box::pin(chunk_stream))
}
fn projected_fields(&self) -> &[Field] {
&self.projected_schema.schema_to_read().arrow_schema().fields
}
}
pub type SendableChunkStream = Pin<Box<dyn Stream<Item = Result<Chunk<Arc<dyn Array>>>> + Send>>;
pub struct ChunkStream {
schema: SstSchema,
projected_schema: ProjectedSchemaRef,
stream: SendableChunkStream,
}
impl ChunkStream {
pub fn new(schema: SstSchema, stream: SendableChunkStream) -> Result<Self> {
Ok(Self { schema, stream })
pub fn new(projected_schema: ProjectedSchemaRef, stream: SendableChunkStream) -> Result<Self> {
Ok(Self {
projected_schema,
stream,
})
}
}
@@ -245,7 +250,8 @@ impl BatchReader for ChunkStream {
.try_next()
.await?
.map(|chunk| {
self.schema
self.projected_schema
.schema_to_read()
.arrow_chunk_to_batch(&chunk)
.context(error::InvalidParquetSchemaSnafu)
})
@@ -284,7 +290,14 @@ mod tests {
(2003, 5),
(1001, 1),
], // keys
&[Some(1), Some(2), Some(7), Some(8), Some(9), Some(3)], // values
&[
(Some(1), Some(1234)),
(Some(2), Some(1234)),
(Some(7), Some(1234)),
(Some(8), Some(1234)),
(Some(9), Some(1234)),
(Some(3), Some(1234)),
], // values
);
let dir = TempDir::new("write_parquet").unwrap();
@@ -292,7 +305,7 @@ mod tests {
let backend = Backend::build().root(path).finish().await.unwrap();
let object_store = ObjectStore::new(backend);
let sst_file_name = "test-flush.parquet";
let iter = memtable.iter(IterContext::default()).unwrap();
let iter = memtable.iter(&IterContext::default()).unwrap();
let writer = ParquetWriter::new(sst_file_name, iter, object_store);
writer
@@ -307,7 +320,7 @@ mod tests {
// chunk schema: timestamp, __version, v1, __sequence, __op_type
let chunk = file_reader.next().unwrap().unwrap();
assert_eq!(5, chunk.arrays().len());
assert_eq!(6, chunk.arrays().len());
// timestamp
assert_eq!(
@@ -323,22 +336,30 @@ mod tests {
chunk.arrays()[1]
);
// v1
// v0
assert_eq!(
Arc::new(UInt64Array::from_slice(&[1, 2, 3, 7, 8, 9])) as Arc<dyn Array>,
chunk.arrays()[2]
);
// v1
assert_eq!(
Arc::new(UInt64Array::from_slice(&[
1234, 1234, 1234, 1234, 1234, 1234
])) as Arc<dyn Array>,
chunk.arrays()[3]
);
// sequence
assert_eq!(
Arc::new(UInt64Array::from_slice(&[10, 10, 10, 10, 10, 10])) as Arc<dyn Array>,
chunk.arrays()[3]
chunk.arrays()[4]
);
// op_type
assert_eq!(
Arc::new(UInt8Array::from_slice(&[0, 0, 0, 0, 0, 0])) as Arc<dyn Array>,
chunk.arrays()[4]
chunk.arrays()[5]
);
}
}

View File

@@ -1,4 +1,5 @@
use datatypes::prelude::ConcreteDataType;
use datatypes::type_id::LogicalTypeId;
use store_api::storage::{
ColumnDescriptor, ColumnDescriptorBuilder, ColumnFamilyDescriptorBuilder, ColumnId,
RegionDescriptor, RegionId, RowKeyDescriptorBuilder,
@@ -99,3 +100,14 @@ impl RegionDescBuilder {
.unwrap()
}
}
/// Create desc with schema (k0, timestamp, v0, ... vn-1)
pub fn desc_with_value_columns(region_name: &str, num_value_columns: usize) -> RegionDescriptor {
let mut builder =
RegionDescBuilder::new(region_name).push_key_column(("k0", LogicalTypeId::Int64, false));
for i in 0..num_value_columns {
let name = format!("v{}", i);
builder = builder.push_value_column((&name, LogicalTypeId::Int64, true));
}
builder.build()
}

View File

@@ -11,24 +11,21 @@ use crate::read::{Batch, BatchReader, BoxedBatchReader};
fn new_kv_batch(key_values: &[(i64, Option<i64>)]) -> Batch {
let key = Arc::new(Int64Vector::from_values(key_values.iter().map(|v| v.0)));
let value = Arc::new(Int64Vector::from_iter(key_values.iter().map(|v| v.1)));
let sequences = UInt64Vector::from_vec(vec![0; key_values.len()]);
let op_types = UInt8Vector::from_vec(vec![0; key_values.len()]);
let sequences = Arc::new(UInt64Vector::from_vec(vec![0; key_values.len()]));
let op_types = Arc::new(UInt8Vector::from_vec(vec![0; key_values.len()]));
Batch {
keys: vec![key],
sequences,
op_types,
values: vec![value],
}
Batch::new(vec![key, value, sequences, op_types])
}
fn check_kv_batch(batches: &[Batch], expect: &[&[(i64, Option<i64>)]]) {
for (batch, key_values) in batches.iter().zip(expect.iter()) {
let key = batch.keys[0]
let key = batch
.column(0)
.as_any()
.downcast_ref::<Int64Vector>()
.unwrap();
let value = batch.values[0]
let value = batch
.column(1)
.as_any()
.downcast_ref::<Int64Vector>()
.unwrap();

View File

@@ -7,7 +7,15 @@ use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder, SchemaRef};
pub type ColumnDef<'a> = (&'a str, LogicalTypeId, bool);
pub fn new_schema(column_defs: &[ColumnDef], timestamp_index: Option<usize>) -> Schema {
let column_schemas = column_defs
new_schema_with_version(column_defs, timestamp_index, 0)
}
pub fn new_schema_with_version(
column_defs: &[ColumnDef],
timestamp_index: Option<usize>,
version: u32,
) -> Schema {
let column_schemas: Vec<_> = column_defs
.iter()
.map(|column_def| {
let datatype = column_def.1.data_type();
@@ -15,14 +23,11 @@ pub fn new_schema(column_defs: &[ColumnDef], timestamp_index: Option<usize>) ->
})
.collect();
let mut builder = SchemaBuilder::from(column_schemas).version(version);
if let Some(index) = timestamp_index {
SchemaBuilder::from(column_schemas)
.timestamp_index(index)
.build()
.unwrap()
} else {
Schema::new(column_schemas)
builder = builder.timestamp_index(index);
}
builder.build().unwrap()
}
pub fn new_schema_ref(column_defs: &[ColumnDef], timestamp_index: Option<usize>) -> SchemaRef {

View File

@@ -41,6 +41,8 @@ pub struct ScanRequest {
/// Default is None. Only returns data whose sequence number is less than or
/// equal to the `sequence`.
pub sequence: Option<SequenceNumber>,
/// Indices of columns to read, `None` to read all columns.
pub projection: Option<Vec<usize>>,
}
#[derive(Debug)]

View File

@@ -105,15 +105,19 @@ impl<R: Region> Table for MitoTable<R> {
async fn scan(
&self,
_projection: &Option<Vec<usize>>,
projection: &Option<Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
) -> TableResult<SendableRecordBatchStream> {
let read_ctx = ReadContext::default();
let snapshot = self.region.snapshot(&read_ctx).map_err(TableError::new)?;
let scan_request = ScanRequest {
projection: projection.clone(),
..Default::default()
};
let mut reader = snapshot
.scan(&read_ctx, ScanRequest::default())
.scan(&read_ctx, scan_request)
.await
.map_err(TableError::new)?
.reader;