feat: implement read framework (#108)

* feat: implement read framework

feat: chunk reader builder

refactor: rename BatchIteratorPtr to BoxedBatchIterator

feat: BatchReader to read batch from ssts

feat: Add a ConcatReader to concat sst readers

test: Add tests for concat reader

chore: Fix clippy

* feat: implement SST parquet reader (#109)

* feat: implement parquet sst reader

* chores: fix some CR comments

* gst

* fix sst writer flush issue

* feat: Implement FsAccessLayer::read_sst

* fix: remove lifetime from ChunkStream

* refactor: Store file name in FileMeta

- Store file name instead of path (`region-name/file-name`) in FileMeta.
- `AccessLayer::read()` takes file name instead of path, so the read/write api are consistent

Co-authored-by: Lei, Huang <6406592+v0y4g3r@users.noreply.github.com>
Co-authored-by: Lei, HUANG <mrsatangel@gmail.com>
This commit is contained in:
evenyag
2022-07-28 11:46:51 +08:00
committed by GitHub
parent 3b2716ed70
commit 03e965954a
22 changed files with 751 additions and 100 deletions

1
Cargo.lock generated
View File

@@ -3483,6 +3483,7 @@ version = "0.1.0"
dependencies = [
"arc-swap",
"arrow-format",
"async-stream",
"async-trait",
"atomic_float",
"bit-vec",

View File

@@ -1,6 +1,6 @@
pub use opendal::{
Accessor, DirEntry, DirStreamer, Layer, Metadata, Object, ObjectMetadata, ObjectMode,
Operator as ObjectStore,
io_util::SeekableReader, Accessor, DirEntry, DirStreamer, Layer, Metadata, Object,
ObjectMetadata, ObjectMode, Operator as ObjectStore,
};
pub mod backend;
pub mod util;

View File

@@ -9,6 +9,7 @@ edition = "2021"
arc-swap = "1.0"
arrow-format = { version = "0.4", features = ["ipc"] }
async-trait = "0.1"
async-stream = "0.3"
bit-vec = "0.6"
bytes = "1.1"
common-error = { path = "../common/error" }

View File

@@ -2,14 +2,20 @@ use async_trait::async_trait;
use store_api::storage::{Chunk, ChunkReader, SchemaRef};
use crate::error::{Error, Result};
use crate::memtable::Batch;
use crate::memtable::{BoxedBatchIterator, IterContext, MemtableSet};
use crate::read::{Batch, BatchReader, ConcatReader};
use crate::sst::{AccessLayerRef, FileHandle, LevelMetas, ReadOptions, Visitor};
type IteratorPtr = Box<dyn Iterator<Item = Result<Batch>> + Send>;
type BoxedIterator = Box<dyn Iterator<Item = Result<Batch>> + Send>;
/// Chunk reader implementation.
// Now we use async-trait to implement the chunk reader, which is easier to implement than
// using `Stream`, maybe change to `Stream` if we find out it is more efficient and have
// necessary to do so.
pub struct ChunkReaderImpl {
schema: SchemaRef,
// Now we only read data from memtables, so we just holds the iterator here.
iter: IteratorPtr,
iter: Option<BoxedIterator>,
sst_reader: ConcatReader,
}
#[async_trait]
@@ -21,23 +27,133 @@ impl ChunkReader for ChunkReaderImpl {
}
async fn next_chunk(&mut self) -> Result<Option<Chunk>> {
let mut batch = match self.iter.next() {
Some(b) => b?,
let batch = match self.fetch_batch().await? {
Some(b) => b,
None => return Ok(None),
};
// TODO(yingwen): Check schema, now we assumes the schema is the same as key columns
// combine with value columns.
let mut columns = Vec::with_capacity(batch.keys.len() + batch.values.len());
columns.append(&mut batch.keys);
columns.append(&mut batch.values);
// TODO(yingwen): Check schema.
let chunk = batch_to_chunk(batch);
Ok(Some(Chunk::new(columns)))
Ok(Some(chunk))
}
}
impl ChunkReaderImpl {
pub fn new(schema: SchemaRef, iter: IteratorPtr) -> ChunkReaderImpl {
ChunkReaderImpl { schema, iter }
pub fn new(
schema: SchemaRef,
iter: BoxedIterator,
sst_reader: ConcatReader,
) -> ChunkReaderImpl {
ChunkReaderImpl {
schema,
iter: Some(iter),
sst_reader,
}
}
async fn fetch_batch(&mut self) -> Result<Option<Batch>> {
if let Some(iter) = &mut self.iter {
match iter.next() {
Some(b) => return Ok(Some(b?)),
None => self.iter = None,
}
}
self.sst_reader.next_batch().await
}
}
// Assumes the schema is the same as key columns combine with value columns.
fn batch_to_chunk(mut batch: Batch) -> Chunk {
let mut columns = Vec::with_capacity(batch.keys.len() + batch.values.len());
columns.append(&mut batch.keys);
columns.append(&mut batch.values);
Chunk::new(columns)
}
/// Builder to create a new [ChunkReaderImpl] from scan request.
pub struct ChunkReaderBuilder {
schema: SchemaRef,
sst_layer: AccessLayerRef,
iter_ctx: IterContext,
iters: Vec<BoxedBatchIterator>,
files_to_read: Vec<FileHandle>,
}
impl ChunkReaderBuilder {
pub fn new(schema: SchemaRef, sst_layer: AccessLayerRef) -> Self {
ChunkReaderBuilder {
schema,
iter_ctx: IterContext::default(),
iters: Vec::new(),
sst_layer,
files_to_read: Vec::new(),
}
}
/// Reserve space for iterating `num` memtables.
pub fn reserve_num_memtables(mut self, num: usize) -> Self {
self.iters.reserve(num);
self
}
pub fn iter_ctx(mut self, iter_ctx: IterContext) -> Self {
self.iter_ctx = iter_ctx;
self
}
pub fn pick_memtables(mut self, memtables: &MemtableSet) -> Result<Self> {
for (_range, mem) in memtables.iter() {
let iter = mem.iter(self.iter_ctx.clone())?;
self.iters.push(iter);
}
Ok(self)
}
pub fn pick_ssts(mut self, ssts: &LevelMetas) -> Result<Self> {
ssts.visit_levels(&mut self)?;
Ok(self)
}
pub async fn build(self) -> Result<ChunkReaderImpl> {
// Now we just simply chain all iterators together, ignore duplications/ordering.
let iter = Box::new(self.iters.into_iter().flatten());
let read_opts = ReadOptions {
batch_size: self.iter_ctx.batch_size,
};
let mut sst_readers = Vec::with_capacity(self.files_to_read.len());
for file in &self.files_to_read {
let reader = self
.sst_layer
.read_sst(file.file_name(), &read_opts)
.await?;
sst_readers.push(reader);
}
let reader = ConcatReader::new(sst_readers);
Ok(ChunkReaderImpl::new(self.schema, iter, reader))
}
}
impl Visitor for ChunkReaderBuilder {
fn visit(&mut self, _level: usize, files: &[FileHandle]) -> Result<()> {
// TODO(yingwen): Filter files by time range.
// Now we read all files, so just reserve enough space to hold all files.
self.files_to_read.reserve(files.len());
for file in files {
// We can't invoke async functions here, so we collects all files first, and
// create the batch reader later in `ChunkReaderBuilder`.
self.files_to_read.push(file.clone());
}
Ok(())
}
}

View File

@@ -4,6 +4,7 @@ use std::str::Utf8Error;
use common_error::prelude::*;
use datatypes::arrow;
use datatypes::arrow::error::ArrowError;
use serde_json::error::Error as JsonError;
use store_api::manifest::action::ProtocolVersion;
use store_api::manifest::ManifestVersion;
@@ -171,6 +172,30 @@ pub enum Error {
#[snafu(display("Failed to read line, err: {}", source))]
Readline { source: IoError },
#[snafu(display("Failed to read Parquet file: {}, source: {}", file, source))]
ReadParquet {
file: String,
source: ArrowError,
backtrace: Backtrace,
},
#[snafu(display("IO failed while reading Parquet file: {}, source: {}", file, source))]
ReadParquetIo {
file: String,
source: std::io::Error,
backtrace: Backtrace,
},
#[snafu(display("Parquet file schema is not valid: {}", msg))]
SequenceColumnNotFound { msg: String, backtrace: Backtrace },
#[snafu(display("Parquet file schema is not valid, msg: {}, source: {}", msg, source))]
InvalidParquetSchema {
msg: String,
#[snafu(backtrace)]
source: datatypes::error::Error,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -193,7 +218,9 @@ impl ErrorExt for Error {
| JoinTask { .. }
| Cancelled { .. }
| DecodeRegionMetaActionList { .. }
| Readline { .. } => StatusCode::Unexpected,
| Readline { .. }
| InvalidParquetSchema { .. }
| SequenceColumnNotFound { .. } => StatusCode::Unexpected,
FlushIo { .. }
| InitBackend { .. }
@@ -206,7 +233,9 @@ impl ErrorExt for Error {
| DecodeWalHeader { .. }
| EncodeWalHeader { .. }
| ManifestProtocolForbidRead { .. }
| ManifestProtocolForbidWrite { .. } => StatusCode::StorageUnavailable,
| ManifestProtocolForbidWrite { .. }
| ReadParquet { .. }
| ReadParquetIo { .. } => StatusCode::StorageUnavailable,
}
}

View File

@@ -177,8 +177,13 @@ impl<S: LogStore> FlushJob<S> {
let iter = m.memtable.iter(iter_ctx)?;
futures.push(async move {
self.sst_layer
.write_sst(&file_name, iter, WriteOptions::default())
.await
.write_sst(&file_name, iter, &WriteOptions::default())
.await?;
Ok(FileMeta {
file_name,
level: 0,
})
});
}
@@ -187,10 +192,6 @@ impl<S: LogStore> FlushJob<S> {
.into_iter()
.collect::<Result<Vec<_>>>()?
.into_iter()
.map(|f| FileMeta {
file_path: f,
level: 0,
})
.collect();
logging::info!("Successfully flush memtables to files: {:?}", metas);

View File

@@ -11,6 +11,7 @@ mod manifest;
pub mod memtable;
pub mod metadata;
mod proto;
mod read;
mod region;
mod snapshot;
mod sst;

View File

@@ -183,16 +183,16 @@ mod tests {
flush_sequence: 99,
files_to_add: vec![
FileMeta {
file_path: "test1".to_string(),
file_name: "test1".to_string(),
level: 1,
},
FileMeta {
file_path: "test2".to_string(),
file_name: "test2".to_string(),
level: 2,
},
],
files_to_remove: vec![FileMeta {
file_path: "test0".to_string(),
file_name: "test0".to_string(),
level: 0,
}],
}),
@@ -202,7 +202,7 @@ mod tests {
let bs = action_list.encode().unwrap();
// {"prev_version":3}
// {"Protocol":{"min_reader_version":1,"min_writer_version":0}}
// {"Edit":{"region_id":1,"region_version":10,"flush_sequence":99,"files_to_add":[{"file_path":"test1","level":1},{"file_path":"test2","level":2}],"files_to_remove":[{"file_path":"test0","level":0}]}}
// {"Edit":{"region_id":1,"region_version":10,"flush_sequence":99,"files_to_add":[{"file_name":"test1","level":1},{"file_name":"test2","level":2}],"files_to_remove":[{"file_name":"test0","level":0}]}}
logging::debug!(
"Encoded action list: \r\n{}",

View File

@@ -7,7 +7,7 @@ mod version;
use std::sync::Arc;
use datatypes::vectors::{UInt64Vector, UInt8Vector, VectorRef};
use datatypes::vectors::VectorRef;
use store_api::storage::{consts, SequenceNumber, ValueType};
use crate::error::Result;
@@ -15,6 +15,7 @@ use crate::memtable::btree::BTreeMemtable;
pub use crate::memtable::inserter::Inserter;
pub use crate::memtable::schema::MemtableSchema;
pub use crate::memtable::version::{MemtableSet, MemtableVersion};
use crate::read::Batch;
/// Unique id for memtables under same region.
pub type MemtableId = u32;
@@ -33,7 +34,7 @@ pub trait Memtable: Send + Sync + std::fmt::Debug {
/// Iterates the memtable.
// TODO(yingwen): 1. Use reference of IterContext? 2. Consider passing a projector (does column projection).
fn iter(&self, ctx: IterContext) -> Result<BatchIteratorPtr>;
fn iter(&self, ctx: IterContext) -> Result<BoxedBatchIterator>;
/// Returns the estimated bytes allocated by this memtable from heap.
fn bytes_allocated(&self) -> usize;
@@ -76,15 +77,10 @@ pub enum RowOrdering {
Key,
}
// TODO(yingwen): Maybe pack value_type with sequence (reserve 8bits in u64 for value type) like RocksDB.
pub struct Batch {
pub keys: Vec<VectorRef>,
pub sequences: UInt64Vector,
pub value_types: UInt8Vector,
pub values: Vec<VectorRef>,
}
/// Iterator of memtable.
///
/// Since data of memtable are stored in memory, so avoid defining this trait
/// as an async trait.
pub trait BatchIterator: Iterator<Item = Result<Batch>> + Send + Sync {
/// Returns the schema of this iterator.
fn schema(&self) -> &MemtableSchema;
@@ -93,7 +89,7 @@ pub trait BatchIterator: Iterator<Item = Result<Batch>> + Send + Sync {
fn ordering(&self) -> RowOrdering;
}
pub type BatchIteratorPtr = Box<dyn BatchIterator>;
pub type BoxedBatchIterator = Box<dyn BatchIterator>;
pub trait MemtableBuilder: Send + Sync + std::fmt::Debug {
fn build(&self, id: MemtableId, schema: MemtableSchema) -> MemtableRef;

View File

@@ -15,9 +15,10 @@ use store_api::storage::{SequenceNumber, ValueType};
use crate::error::Result;
use crate::memtable::{
Batch, BatchIterator, BatchIteratorPtr, IterContext, KeyValues, Memtable, MemtableId,
BatchIterator, BoxedBatchIterator, IterContext, KeyValues, Memtable, MemtableId,
MemtableSchema, RowOrdering,
};
use crate::read::Batch;
type RwLockMap = RwLock<BTreeMap<InnerKey, RowValue>>;
@@ -65,7 +66,7 @@ impl Memtable for BTreeMemtable {
Ok(())
}
fn iter(&self, ctx: IterContext) -> Result<BatchIteratorPtr> {
fn iter(&self, ctx: IterContext) -> Result<BoxedBatchIterator> {
assert!(ctx.batch_size > 0);
let iter = BTreeIterator::new(ctx, self.schema.clone(), self.map.clone());

126
src/storage/src/read.rs Normal file
View File

@@ -0,0 +1,126 @@
//! Common structs and utilities for read.
use async_trait::async_trait;
use datatypes::vectors::{UInt64Vector, UInt8Vector, VectorRef};
use crate::error::Result;
// TODO(yingwen): Maybe pack value_type with sequence (reserve 8bits in u64 for value type) like RocksDB.
/// Storage internal representation of a batch of rows.
pub struct Batch {
// Now the structure of `Batch` is still unstable, all pub fields may be changed.
pub keys: Vec<VectorRef>,
pub sequences: UInt64Vector,
pub value_types: UInt8Vector,
pub values: Vec<VectorRef>,
}
/// Async batch reader.
#[async_trait]
pub trait BatchReader: Send {
// TODO(yingwen): Schema of batch.
/// Fetch next [Batch].
///
/// Returns `Ok(None)` when the reader has reached its end and calling `next_batch()`
/// again won't return batch again.
///
/// If `Err` is returned, caller should not call this method again, the implementor
/// may or may not panic in such case.
async fn next_batch(&mut self) -> Result<Option<Batch>>;
}
/// Pointer to [BatchReader].
pub type BoxedBatchReader = Box<dyn BatchReader>;
/// Concat reader inputs.
pub struct ConcatReader {
readers: Vec<BoxedBatchReader>,
curr_idx: usize,
}
impl ConcatReader {
pub fn new(readers: Vec<BoxedBatchReader>) -> ConcatReader {
ConcatReader {
readers,
curr_idx: 0,
}
}
}
#[async_trait]
impl BatchReader for ConcatReader {
async fn next_batch(&mut self) -> Result<Option<Batch>> {
loop {
if self.curr_idx >= self.readers.len() {
return Ok(None);
}
let reader = &mut self.readers[self.curr_idx];
match reader.next_batch().await? {
Some(batch) => return Ok(Some(batch)),
None => self.curr_idx += 1,
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::read::BatchReader;
use crate::test_util::read_util;
#[tokio::test]
async fn test_concat_reader_empty() {
let mut reader = ConcatReader::new(Vec::new());
assert!(reader.next_batch().await.unwrap().is_none());
// Call next_batch() again is allowed.
assert!(reader.next_batch().await.unwrap().is_none());
}
#[tokio::test]
async fn test_concat_multiple_readers() {
let readers = vec![
read_util::build_boxed_vec_reader(&[&[(1, Some(1)), (2, Some(2))], &[(3, None)]]),
read_util::build_boxed_vec_reader(&[&[(4, None)]]),
read_util::build_boxed_vec_reader(&[&[(5, Some(5)), (6, Some(6))]]),
];
let mut reader = ConcatReader::new(readers);
read_util::check_reader_with_kv_batch(
&mut reader,
&[
&[(1, Some(1)), (2, Some(2))],
&[(3, None)],
&[(4, None)],
&[(5, Some(5)), (6, Some(6))],
],
)
.await;
}
#[tokio::test]
async fn test_concat_reader_with_empty_reader() {
let readers = vec![
read_util::build_boxed_vec_reader(&[&[(1, Some(1)), (2, Some(2))], &[(3, None)]]),
// Empty reader.
read_util::build_boxed_vec_reader(&[&[]]),
read_util::build_boxed_vec_reader(&[&[(5, Some(5)), (6, Some(6))]]),
];
let mut reader = ConcatReader::new(readers);
read_util::check_reader_with_kv_batch(
&mut reader,
&[
&[(1, Some(1)), (2, Some(2))],
&[(3, None)],
&[(5, Some(5)), (6, Some(6))],
],
)
.await;
}
}

View File

@@ -108,6 +108,10 @@ impl<S: LogStore> RegionImpl<S> {
fn committed_sequence(&self) -> store_api::storage::SequenceNumber {
self.inner.version_control().committed_sequence()
}
async fn wait_flush_done(&self) -> Result<()> {
self.inner.writer.wait_flush_done().await
}
}
/// Shared data of region.
@@ -148,7 +152,7 @@ impl<S: LogStore> RegionInner<S> {
let version = self.version_control().current();
let sequence = self.version_control().committed_sequence();
SnapshotImpl::new(version, sequence)
SnapshotImpl::new(version, sequence, self.sst_layer.clone())
}
async fn write(&self, ctx: &WriteContext, request: WriteBatch) -> Result<WriteResponse> {

View File

@@ -45,6 +45,14 @@ impl FlushTester {
async fn put(&self, data: &[(i64, Option<i64>)]) -> WriteResponse {
self.tester.put(data).await
}
async fn full_scan(&self) -> Vec<(i64, Option<i64>)> {
self.tester.full_scan().await
}
async fn wait_flush_done(&self) {
self.tester.region.wait_flush_done().await.unwrap();
}
}
#[derive(Debug, Default)]
@@ -70,10 +78,10 @@ impl FlushStrategy for FlushSwitch {
}
#[tokio::test]
async fn test_flush() {
async fn test_flush_and_stall() {
common_telemetry::init_default_ut_logging();
let dir = TempDir::new("flush").unwrap();
let dir = TempDir::new("flush-stall").unwrap();
let store_dir = dir.path().to_str().unwrap();
let flush_switch = Arc::new(FlushSwitch::default());
@@ -106,3 +114,31 @@ async fn test_flush() {
}
assert!(has_parquet_file);
}
#[tokio::test]
async fn test_read_after_flush() {
common_telemetry::init_default_ut_logging();
let dir = TempDir::new("read-flush").unwrap();
let store_dir = dir.path().to_str().unwrap();
let flush_switch = Arc::new(FlushSwitch::default());
// Always trigger flush before write.
let tester = FlushTester::new(store_dir, flush_switch.clone()).await;
// Put elements so we have content to flush.
tester.put(&[(1000, Some(100))]).await;
tester.put(&[(2000, Some(200))]).await;
// Now set should flush to true to trigger flush.
flush_switch.set_should_flush(true);
// Put element to trigger flush.
tester.put(&[(3000, Some(300))]).await;
tester.wait_flush_done().await;
let expect = vec![(3000, Some(300)), (1000, Some(100)), (2000, Some(200))];
let output = tester.full_scan().await;
assert_eq!(expect, output);
}

View File

@@ -92,7 +92,7 @@ fn append_chunk_to(chunk: &Chunk, dst: &mut Vec<(i64, Option<i64>)>) {
/// Test region without considering version column.
pub struct Tester {
region: RegionImpl<NoopLogStore>,
pub region: RegionImpl<NoopLogStore>,
write_ctx: WriteContext,
read_ctx: ReadContext,
}
@@ -124,7 +124,8 @@ impl Tester {
self.region.write(&self.write_ctx, batch).await.unwrap()
}
async fn full_scan(&self) -> Vec<(i64, Option<i64>)> {
/// Scan all data.
pub async fn full_scan(&self) -> Vec<(i64, Option<i64>)> {
let snapshot = self.region.snapshot(&self.read_ctx).unwrap();
let resp = snapshot

View File

@@ -79,6 +79,9 @@ impl RegionWriter {
version_control.set_committed_sequence(next_sequence);
// TODO(yingwen): We should set the flush handle to `None`, but we can't acquire
// write lock here.
Ok(())
}
@@ -96,6 +99,19 @@ impl RegionWriter {
}
}
// Private methods for tests.
#[cfg(test)]
impl RegionWriter {
pub async fn wait_flush_done(&self) -> Result<()> {
let mut inner = self.inner.lock().await;
if let Some(handle) = inner.flush_handle.take() {
handle.join().await?;
}
Ok(())
}
}
pub struct WriterContext<'a, S: LogStore> {
pub shared: &'a SharedDataRef,
pub flush_strategy: &'a FlushStrategyRef,

View File

@@ -6,9 +6,10 @@ use store_api::storage::{
Snapshot,
};
use crate::chunk::ChunkReaderImpl;
use crate::chunk::{ChunkReaderBuilder, ChunkReaderImpl};
use crate::error::{Error, Result};
use crate::memtable::IterContext;
use crate::sst::AccessLayerRef;
use crate::version::VersionRef;
/// [Snapshot] implementation.
@@ -16,6 +17,7 @@ pub struct SnapshotImpl {
version: VersionRef,
/// Max sequence number (inclusive) visible to user.
visible_sequence: SequenceNumber,
sst_layer: AccessLayerRef,
}
#[async_trait]
@@ -37,32 +39,22 @@ impl Snapshot for SnapshotImpl {
let mutables = memtable_version.mutable_memtables();
let immutables = memtable_version.immutable_memtables();
let mut batch_iters = Vec::with_capacity(memtable_version.num_memtables());
let iter_ctx = IterContext {
batch_size: ctx.batch_size,
visible_sequence,
..Default::default()
};
for (_range, mem) in mutables.iter() {
let iter = mem.iter(iter_ctx.clone())?;
batch_iters.push(iter);
}
let mut builder =
ChunkReaderBuilder::new(self.version.schema().clone(), self.sst_layer.clone())
.reserve_num_memtables(memtable_version.num_memtables())
.iter_ctx(IterContext {
batch_size: ctx.batch_size,
visible_sequence,
..Default::default()
})
.pick_memtables(mutables)?;
for mem_set in immutables {
for (_range, mem) in mem_set.iter() {
let iter = mem.iter(iter_ctx.clone())?;
batch_iters.push(iter);
}
builder = builder.pick_memtables(mem_set)?;
}
// Now we just simply chain all iterators together, ignore duplications/ordering.
let iter = Box::new(batch_iters.into_iter().flatten());
let reader = ChunkReaderImpl::new(self.version.schema().clone(), iter);
let reader = builder.pick_ssts(&**self.version.ssts())?.build().await?;
Ok(ScanResponse { reader })
}
@@ -73,10 +65,15 @@ impl Snapshot for SnapshotImpl {
}
impl SnapshotImpl {
pub fn new(version: VersionRef, visible_sequence: SequenceNumber) -> SnapshotImpl {
pub fn new(
version: VersionRef,
visible_sequence: SequenceNumber,
sst_layer: AccessLayerRef,
) -> SnapshotImpl {
SnapshotImpl {
version,
visible_sequence,
sst_layer,
}
}

View File

@@ -7,17 +7,26 @@ use object_store::{util, ObjectStore};
use serde::{Deserialize, Serialize};
use crate::error::Result;
use crate::memtable::BatchIteratorPtr;
use crate::sst::parquet::ParquetWriter;
use crate::memtable::BoxedBatchIterator;
use crate::read::BoxedBatchReader;
use crate::sst::parquet::{ParquetReader, ParquetWriter};
/// Maximum level of ssts.
/// Maximum level of SSTs.
pub const MAX_LEVEL: usize = 1;
// We only has fixed number of level, so we array to hold elements. This implement
// detail of LevelMetaVec should not be exposed to the user of [LevelMetas].
type LevelMetaVec = [LevelMeta; MAX_LEVEL];
/// Metadata of all ssts under a region.
/// Visitor to access file in each level.
pub trait Visitor {
/// Visit all `files` in `level`.
///
/// Now the input `files` are unordered.
fn visit(&mut self, level: usize, files: &[FileHandle]) -> Result<()>;
}
/// Metadata of all SSTs under a region.
///
/// Files are organized into multiple level, though there may be only one level.
#[derive(Debug, Clone)]
@@ -29,7 +38,7 @@ impl LevelMetas {
/// Create a new LevelMetas and initialized each level.
pub fn new() -> LevelMetas {
LevelMetas {
levels: [LevelMeta::default(); MAX_LEVEL],
levels: new_level_meta_vec(),
}
}
@@ -49,6 +58,18 @@ impl LevelMetas {
merged
}
/// Visit all SST files.
///
/// Stop visiting remaining files if the visitor returns `Err`, and the `Err`
/// will be returned to caller.
pub fn visit_levels<V: Visitor>(&self, visitor: &mut V) -> Result<()> {
for level in &self.levels {
level.visit_level(visitor)?;
}
Ok(())
}
}
impl Default for LevelMetas {
@@ -57,9 +78,10 @@ impl Default for LevelMetas {
}
}
/// Metadata of files in same sst level.
/// Metadata of files in same SST level.
#[derive(Debug, Default, Clone)]
pub struct LevelMeta {
level: u8,
/// Handles to the files in this level.
// TODO(yingwen): Now for simplicity, files are unordered, maybe sort the files by time range
// or use another structure to hold them.
@@ -70,6 +92,19 @@ impl LevelMeta {
fn add_file(&mut self, file: FileHandle) {
self.files.push(file);
}
fn visit_level<V: Visitor>(&self, visitor: &mut V) -> Result<()> {
visitor.visit(self.level.into(), &self.files)
}
}
fn new_level_meta_vec() -> LevelMetaVec {
let mut levels = [LevelMeta::default(); MAX_LEVEL];
for (i, level) in levels.iter_mut().enumerate() {
level.level = i as u8;
}
levels
}
/// In-memory handle to a file.
@@ -90,6 +125,11 @@ impl FileHandle {
pub fn level_index(&self) -> usize {
self.inner.meta.level.into()
}
#[inline]
pub fn file_name(&self) -> &str {
&self.inner.meta.file_name
}
}
/// Actually data of [FileHandle].
@@ -107,9 +147,9 @@ impl FileHandleInner {
}
/// Immutable metadata of a sst file.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct FileMeta {
pub file_path: String,
pub file_name: String,
/// SST level of the file.
pub level: u8,
}
@@ -119,16 +159,26 @@ pub struct WriteOptions {
// TODO(yingwen): [flush] row group size.
}
/// Sst access layer.
#[derive(Debug)]
pub struct ReadOptions {
/// Suggested size of each batch.
pub batch_size: usize,
}
/// SST access layer.
#[async_trait]
pub trait AccessLayer: Send + Sync + std::fmt::Debug {
// Writes SST file with given name and returns the full path.
/// Writes SST file with given `file_name`.
async fn write_sst(
&self,
file_name: &str,
iter: BatchIteratorPtr,
opts: WriteOptions,
) -> Result<String>;
iter: BoxedBatchIterator,
opts: &WriteOptions,
) -> Result<()>;
/// Read SST file with given `file_name`.
// TODO(yingwen): Read SST according to scan request and returns a chunk stream.
async fn read_sst(&self, file_name: &str, opts: &ReadOptions) -> Result<BoxedBatchReader>;
}
pub type AccessLayerRef = Arc<dyn AccessLayer>;
@@ -159,15 +209,23 @@ impl AccessLayer for FsAccessLayer {
async fn write_sst(
&self,
file_name: &str,
iter: BatchIteratorPtr,
opts: WriteOptions,
) -> Result<String> {
// Now we only supports parquet format. We may allow caller to specific sst format in
iter: BoxedBatchIterator,
opts: &WriteOptions,
) -> Result<()> {
// Now we only supports parquet format. We may allow caller to specific SST format in
// WriteOptions in the future.
let file_path = self.sst_file_path(file_name);
let writer = ParquetWriter::new(&file_path, iter, self.object_store.clone());
writer.write_sst(opts).await?;
Ok(file_path)
Ok(())
}
async fn read_sst(&self, file_name: &str, opts: &ReadOptions) -> Result<BoxedBatchReader> {
let file_path = self.sst_file_path(file_name);
let reader = ParquetReader::new(&file_path, self.object_store.clone());
let stream = reader.chunk_stream(None, opts.batch_size).await?;
Ok(Box::new(stream))
}
}

View File

@@ -1,45 +1,59 @@
//! Parquet sst format.
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use async_stream::try_stream;
use async_trait::async_trait;
use datatypes::arrow::array::Array;
use datatypes::arrow::chunk::Chunk;
use datatypes::arrow::datatypes::{DataType, Field, Schema};
use datatypes::arrow::io::parquet::read::{
infer_schema, read_columns_many_async, read_metadata_async, RowGroupDeserializer,
};
use datatypes::arrow::io::parquet::write::{
Compression, Encoding, FileSink, Version, WriteOptions,
};
use datatypes::prelude::{ConcreteDataType, Vector};
use datatypes::schema::ColumnSchema;
use datatypes::vectors::{Helper, UInt64Vector, UInt8Vector};
use futures_util::sink::SinkExt;
use object_store::ObjectStore;
use snafu::ResultExt;
use futures_util::{Stream, TryStreamExt};
use object_store::{ObjectStore, SeekableReader};
use snafu::{OptionExt, ResultExt};
use store_api::storage::consts;
use crate::error::{FlushIoSnafu, Result, WriteParquetSnafu};
use crate::memtable::{BatchIteratorPtr, MemtableSchema};
use crate::error::{
FlushIoSnafu, InvalidParquetSchemaSnafu, ReadParquetIoSnafu, ReadParquetSnafu, Result,
SequenceColumnNotFoundSnafu, WriteParquetSnafu,
};
use crate::memtable::{BoxedBatchIterator, MemtableSchema};
use crate::metadata::ColumnMetadata;
use crate::read::{Batch, BatchReader};
use crate::sst;
/// Parquet sst writer.
pub struct ParquetWriter<'a> {
file_name: &'a str,
iter: BatchIteratorPtr,
file_path: &'a str,
iter: BoxedBatchIterator,
object_store: ObjectStore,
}
impl<'a> ParquetWriter<'a> {
pub fn new(
file_name: &'a str,
iter: BatchIteratorPtr,
file_path: &'a str,
iter: BoxedBatchIterator,
object_store: ObjectStore,
) -> ParquetWriter {
ParquetWriter {
file_name,
file_path,
iter,
object_store,
}
}
pub async fn write_sst(self, _opts: sst::WriteOptions) -> Result<()> {
pub async fn write_sst(self, _opts: &sst::WriteOptions) -> Result<()> {
self.write_rows(None).await
}
@@ -48,7 +62,7 @@ impl<'a> ParquetWriter<'a> {
/// in config will be written to a single row group.
async fn write_rows(self, extra_meta: Option<HashMap<String, String>>) -> Result<()> {
let schema = memtable_schema_to_arrow_schema(self.iter.schema());
let object = self.object_store.object(self.file_name);
let object = self.object_store.object(self.file_path);
// FIXME(hl): writer size is not used in fs backend so just leave it to 0,
// but in s3/azblob backend the Content-Length field of HTTP request is set
@@ -91,11 +105,15 @@ impl<'a> ParquetWriter<'a> {
sink.metadata.insert(k, Some(v));
}
}
sink.close().await.context(WriteParquetSnafu)
sink.close().await.context(WriteParquetSnafu)?;
// FIXME(yingwen): Hack to workaround an [arrow2 BUG](https://github.com/jorgecarleitao/parquet2/issues/162),
// upgrading to latest arrow2 can fixed this, but now datafusion is still using an old arrow2 version.
sink.flush().await.context(WriteParquetSnafu)
}
}
/// Assembles arrow schema from memtable schema info.
/// TODO(hl): implement `From<Schema>` for `MemtableSchema`/`Physical schema`
fn memtable_schema_to_arrow_schema(schema: &MemtableSchema) -> Schema {
let col_meta_to_field: fn(&ColumnMetadata) -> Field = |col_meta| {
Field::from(&ColumnSchema::new(
@@ -177,6 +195,157 @@ fn transverse_recursive<T, F: Fn(&DataType) -> T + Clone>(
}
}
pub struct ParquetReader<'a> {
file_path: &'a str,
object_store: ObjectStore,
}
type ReaderFactoryFuture<'a, R> =
Pin<Box<dyn futures_util::Future<Output = std::io::Result<R>> + Send + 'a>>;
pub type FieldProjection = Box<dyn Fn(&Schema) -> Vec<Field> + Send + Sync>;
impl<'a> ParquetReader<'a> {
pub fn new(file_path: &str, object_store: ObjectStore) -> ParquetReader {
ParquetReader {
file_path,
object_store,
}
}
pub async fn chunk_stream(
&self,
projection: Option<FieldProjection>,
chunk_size: usize,
) -> Result<ChunkStream> {
let file_path = self.file_path.to_string();
let operator = self.object_store.clone();
let reader_factory = move || -> ReaderFactoryFuture<SeekableReader> {
let file_path = file_path.clone();
let operator = operator.clone();
Box::pin(async move { Ok(operator.object(&file_path).seekable_reader(..)) })
};
let file_path = self.file_path.to_string();
let mut reader = reader_factory()
.await
.context(ReadParquetIoSnafu { file: &file_path })?;
let metadata = read_metadata_async(&mut reader)
.await
.context(ReadParquetSnafu { file: &file_path })?;
let schema = infer_schema(&metadata).context(ReadParquetSnafu { file: &file_path })?;
let projected_fields = projection.map_or_else(|| schema.fields.clone(), |p| p(&schema));
let projected_schema = Schema {
fields: projected_fields.clone(),
metadata: schema.metadata,
};
let chunk_stream = try_stream!({
for rg in metadata.row_groups {
let column_chunks = read_columns_many_async(
&reader_factory,
&rg,
projected_fields.clone(),
Some(chunk_size),
)
.await
.context(ReadParquetSnafu { file: &file_path })?;
let chunks = RowGroupDeserializer::new(column_chunks, rg.num_rows() as usize, None);
for maybe_chunk in chunks {
let columns_in_chunk =
maybe_chunk.context(ReadParquetSnafu { file: &file_path })?;
yield columns_in_chunk;
}
}
});
ChunkStream::new(projected_schema, Box::pin(chunk_stream))
}
}
type ChunkConverter = Box<dyn Fn(Chunk<Arc<dyn Array>>) -> Result<Batch> + Send + Sync>;
pub type SendableChunkStream = Pin<Box<dyn Stream<Item = Result<Chunk<Arc<dyn Array>>>> + Send>>;
pub struct ChunkStream {
stream: SendableChunkStream,
converter: ChunkConverter,
}
impl ChunkStream {
pub fn new(schema: Schema, stream: SendableChunkStream) -> Result<Self> {
Ok(Self {
converter: batch_converter_factory(schema)?,
stream,
})
}
}
#[async_trait]
impl BatchReader for ChunkStream {
async fn next_batch(&mut self) -> Result<Option<Batch>> {
self.stream
.try_next()
.await?
.map(&self.converter)
.transpose()
}
}
// TODO(hl): Currently rowkey/values/reserved columns are identified by their position in field:
// all fields before `__sequence` column are rowkeys and fields after `__value_type` are values.
// But it would be better to persist rowkey/value columns' positions to Parquet metadata and
// parse `MemtableSchema` from metadata while building BatchReader.
fn batch_converter_factory(schema: Schema) -> Result<ChunkConverter> {
let ts_idx = schema
.fields
.iter()
.position(|f| f.name == consts::SEQUENCE_COLUMN_NAME)
.with_context(|| SequenceColumnNotFoundSnafu {
msg: format!("Schema: {:?}", schema),
})?;
let field_len = schema.fields.len();
macro_rules! handle_err {
($stmt: expr, $schema: ident) => {
$stmt.with_context(|_| InvalidParquetSchemaSnafu {
msg: format!("Schema type error: {:?}", $schema),
})?
};
}
let converter = move |c: Chunk<Arc<dyn Array>>| {
Ok(Batch {
sequences: handle_err!(
UInt64Vector::try_from_arrow_array(&c.arrays()[ts_idx].clone()),
schema
),
value_types: handle_err!(
UInt8Vector::try_from_arrow_array(&c.arrays()[ts_idx + 1].clone()),
schema
),
keys: handle_err!(
(0..ts_idx)
.into_iter()
.map(|i| Helper::try_into_vector(&c.arrays()[i].clone()))
.collect::<std::result::Result<_, _>>(),
schema
),
values: handle_err!(
(ts_idx + 2..field_len)
.into_iter()
.map(|i| Helper::try_into_vector(&c.arrays()[i].clone()))
.collect::<std::result::Result<_, _>>(),
schema
),
})
};
Ok(Box::new(converter))
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
@@ -220,7 +389,7 @@ mod tests {
let writer = ParquetWriter::new(sst_file_name, iter, object_store);
writer
.write_sst(sst::WriteOptions::default())
.write_sst(&sst::WriteOptions::default())
.await
.unwrap();

View File

@@ -1,5 +1,6 @@
pub mod config_util;
pub mod descriptor_util;
pub mod read_util;
pub mod schema_util;
pub mod write_batch_util;

View File

@@ -0,0 +1,88 @@
use std::sync::Arc;
use async_trait::async_trait;
use datatypes::prelude::ScalarVector;
use datatypes::vectors::{Int64Vector, UInt64Vector, UInt8Vector};
use crate::error::Result;
use crate::read::{Batch, BatchReader, BoxedBatchReader};
/// Build a new batch, with 0 sequence and value type.
fn new_kv_batch(key_values: &[(i64, Option<i64>)]) -> Batch {
let key = Arc::new(Int64Vector::from_values(key_values.iter().map(|v| v.0)));
let value = Arc::new(Int64Vector::from_iter(key_values.iter().map(|v| v.1)));
let sequences = UInt64Vector::from_vec(vec![0; key_values.len()]);
let value_types = UInt8Vector::from_vec(vec![0; key_values.len()]);
Batch {
keys: vec![key],
sequences,
value_types,
values: vec![value],
}
}
fn check_kv_batch(batches: &[Batch], expect: &[&[(i64, Option<i64>)]]) {
for (batch, key_values) in batches.iter().zip(expect.iter()) {
let key = batch.keys[0]
.as_any()
.downcast_ref::<Int64Vector>()
.unwrap();
let value = batch.values[0]
.as_any()
.downcast_ref::<Int64Vector>()
.unwrap();
for (i, (k, v)) in key_values.iter().enumerate() {
assert_eq!(key.get_data(i).unwrap(), *k,);
assert_eq!(value.get_data(i), *v,);
}
}
assert_eq!(batches.len(), expect.len());
}
pub async fn check_reader_with_kv_batch(
reader: &mut dyn BatchReader,
expect: &[&[(i64, Option<i64>)]],
) {
let mut result = Vec::new();
while let Some(batch) = reader.next_batch().await.unwrap() {
result.push(batch);
}
check_kv_batch(&result, expect);
}
/// A reader for test that takes batch from Vec.
pub struct VecBatchReader {
batches: Vec<Batch>,
}
impl VecBatchReader {
fn new(mut batches: Vec<Batch>) -> VecBatchReader {
batches.reverse();
VecBatchReader { batches }
}
}
#[async_trait]
impl BatchReader for VecBatchReader {
async fn next_batch(&mut self) -> Result<Option<Batch>> {
Ok(self.batches.pop())
}
}
pub fn build_vec_reader(batches: &[&[(i64, Option<i64>)]]) -> VecBatchReader {
let batches: Vec<_> = batches
.iter()
.filter(|key_values| !key_values.is_empty())
.map(|key_values| new_kv_batch(key_values))
.collect();
VecBatchReader::new(batches)
}
pub fn build_boxed_vec_reader(batches: &[&[(i64, Option<i64>)]]) -> BoxedBatchReader {
Box::new(build_vec_reader(batches))
}

View File

@@ -159,10 +159,16 @@ impl Version {
self.memtables.mutable_memtables()
}
#[inline]
pub fn memtables(&self) -> &MemtableVersionRef {
&self.memtables
}
#[inline]
pub fn ssts(&self) -> &LevelMetasRef {
&self.ssts
}
/// Returns duration used to partition the memtables and ssts by time.
pub fn bucket_duration(&self) -> Duration {
DEFAULT_BUCKET_DURATION

View File

@@ -16,11 +16,14 @@ impl Chunk {
}
}
/// `ChunkReader` is similar to async iterator of [Chunk].
#[async_trait]
pub trait ChunkReader: Send {
type Error: ErrorExt + Send + Sync;
/// Schema of the chunks returned by this reader.
fn schema(&self) -> &SchemaRef;
/// Fetch next chunk from the reader.
async fn next_chunk(&mut self) -> Result<Option<Chunk>, Self::Error>;
}