From 03e965954a6fbae6480a198bb60bb08242152b63 Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 28 Jul 2022 11:46:51 +0800 Subject: [PATCH] feat: implement read framework (#108) * feat: implement read framework feat: chunk reader builder refactor: rename BatchIteratorPtr to BoxedBatchIterator feat: BatchReader to read batch from ssts feat: Add a ConcatReader to concat sst readers test: Add tests for concat reader chore: Fix clippy * feat: implement SST parquet reader (#109) * feat: implement parquet sst reader * chores: fix some CR comments * gst * fix sst writer flush issue * feat: Implement FsAccessLayer::read_sst * fix: remove lifetime from ChunkStream * refactor: Store file name in FileMeta - Store file name instead of path (`region-name/file-name`) in FileMeta. - `AccessLayer::read()` takes file name instead of path, so the read/write api are consistent Co-authored-by: Lei, Huang <6406592+v0y4g3r@users.noreply.github.com> Co-authored-by: Lei, HUANG --- Cargo.lock | 1 + src/object-store/src/lib.rs | 4 +- src/storage/Cargo.toml | 1 + src/storage/src/chunk.rs | 144 +++++++++++++-- src/storage/src/error.rs | 33 +++- src/storage/src/flush.rs | 13 +- src/storage/src/lib.rs | 1 + src/storage/src/manifest/action.rs | 8 +- src/storage/src/memtable.rs | 18 +- src/storage/src/memtable/btree.rs | 5 +- src/storage/src/read.rs | 126 +++++++++++++ src/storage/src/region.rs | 6 +- src/storage/src/region/tests/flush.rs | 40 ++++- src/storage/src/region/tests/read_write.rs | 5 +- src/storage/src/region/writer.rs | 16 ++ src/storage/src/snapshot.rs | 43 +++-- src/storage/src/sst.rs | 94 ++++++++-- src/storage/src/sst/parquet.rs | 195 +++++++++++++++++++-- src/storage/src/test_util.rs | 1 + src/storage/src/test_util/read_util.rs | 88 ++++++++++ src/storage/src/version.rs | 6 + src/store-api/src/storage/chunk.rs | 3 + 22 files changed, 751 insertions(+), 100 deletions(-) create mode 100644 src/storage/src/read.rs create mode 100644 src/storage/src/test_util/read_util.rs diff --git a/Cargo.lock b/Cargo.lock index 610c342ce4..5bf9194052 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3483,6 +3483,7 @@ version = "0.1.0" dependencies = [ "arc-swap", "arrow-format", + "async-stream", "async-trait", "atomic_float", "bit-vec", diff --git a/src/object-store/src/lib.rs b/src/object-store/src/lib.rs index c9e8aff58a..d648ba2697 100644 --- a/src/object-store/src/lib.rs +++ b/src/object-store/src/lib.rs @@ -1,6 +1,6 @@ pub use opendal::{ - Accessor, DirEntry, DirStreamer, Layer, Metadata, Object, ObjectMetadata, ObjectMode, - Operator as ObjectStore, + io_util::SeekableReader, Accessor, DirEntry, DirStreamer, Layer, Metadata, Object, + ObjectMetadata, ObjectMode, Operator as ObjectStore, }; pub mod backend; pub mod util; diff --git a/src/storage/Cargo.toml b/src/storage/Cargo.toml index 479b7f4261..0ceaff3232 100644 --- a/src/storage/Cargo.toml +++ b/src/storage/Cargo.toml @@ -9,6 +9,7 @@ edition = "2021" arc-swap = "1.0" arrow-format = { version = "0.4", features = ["ipc"] } async-trait = "0.1" +async-stream = "0.3" bit-vec = "0.6" bytes = "1.1" common-error = { path = "../common/error" } diff --git a/src/storage/src/chunk.rs b/src/storage/src/chunk.rs index 99728285a0..ef5bba399d 100644 --- a/src/storage/src/chunk.rs +++ b/src/storage/src/chunk.rs @@ -2,14 +2,20 @@ use async_trait::async_trait; use store_api::storage::{Chunk, ChunkReader, SchemaRef}; use crate::error::{Error, Result}; -use crate::memtable::Batch; +use crate::memtable::{BoxedBatchIterator, IterContext, MemtableSet}; +use crate::read::{Batch, BatchReader, ConcatReader}; +use crate::sst::{AccessLayerRef, FileHandle, LevelMetas, ReadOptions, Visitor}; -type IteratorPtr = Box> + Send>; +type BoxedIterator = Box> + Send>; +/// Chunk reader implementation. +// Now we use async-trait to implement the chunk reader, which is easier to implement than +// using `Stream`, maybe change to `Stream` if we find out it is more efficient and have +// necessary to do so. pub struct ChunkReaderImpl { schema: SchemaRef, - // Now we only read data from memtables, so we just holds the iterator here. - iter: IteratorPtr, + iter: Option, + sst_reader: ConcatReader, } #[async_trait] @@ -21,23 +27,133 @@ impl ChunkReader for ChunkReaderImpl { } async fn next_chunk(&mut self) -> Result> { - let mut batch = match self.iter.next() { - Some(b) => b?, + let batch = match self.fetch_batch().await? { + Some(b) => b, None => return Ok(None), }; - // TODO(yingwen): Check schema, now we assumes the schema is the same as key columns - // combine with value columns. - let mut columns = Vec::with_capacity(batch.keys.len() + batch.values.len()); - columns.append(&mut batch.keys); - columns.append(&mut batch.values); + // TODO(yingwen): Check schema. + let chunk = batch_to_chunk(batch); - Ok(Some(Chunk::new(columns))) + Ok(Some(chunk)) } } impl ChunkReaderImpl { - pub fn new(schema: SchemaRef, iter: IteratorPtr) -> ChunkReaderImpl { - ChunkReaderImpl { schema, iter } + pub fn new( + schema: SchemaRef, + iter: BoxedIterator, + sst_reader: ConcatReader, + ) -> ChunkReaderImpl { + ChunkReaderImpl { + schema, + iter: Some(iter), + sst_reader, + } + } + + async fn fetch_batch(&mut self) -> Result> { + if let Some(iter) = &mut self.iter { + match iter.next() { + Some(b) => return Ok(Some(b?)), + None => self.iter = None, + } + } + + self.sst_reader.next_batch().await + } +} + +// Assumes the schema is the same as key columns combine with value columns. +fn batch_to_chunk(mut batch: Batch) -> Chunk { + let mut columns = Vec::with_capacity(batch.keys.len() + batch.values.len()); + columns.append(&mut batch.keys); + columns.append(&mut batch.values); + + Chunk::new(columns) +} + +/// Builder to create a new [ChunkReaderImpl] from scan request. +pub struct ChunkReaderBuilder { + schema: SchemaRef, + sst_layer: AccessLayerRef, + iter_ctx: IterContext, + iters: Vec, + files_to_read: Vec, +} + +impl ChunkReaderBuilder { + pub fn new(schema: SchemaRef, sst_layer: AccessLayerRef) -> Self { + ChunkReaderBuilder { + schema, + iter_ctx: IterContext::default(), + iters: Vec::new(), + sst_layer, + files_to_read: Vec::new(), + } + } + + /// Reserve space for iterating `num` memtables. + pub fn reserve_num_memtables(mut self, num: usize) -> Self { + self.iters.reserve(num); + self + } + + pub fn iter_ctx(mut self, iter_ctx: IterContext) -> Self { + self.iter_ctx = iter_ctx; + self + } + + pub fn pick_memtables(mut self, memtables: &MemtableSet) -> Result { + for (_range, mem) in memtables.iter() { + let iter = mem.iter(self.iter_ctx.clone())?; + + self.iters.push(iter); + } + + Ok(self) + } + + pub fn pick_ssts(mut self, ssts: &LevelMetas) -> Result { + ssts.visit_levels(&mut self)?; + + Ok(self) + } + + pub async fn build(self) -> Result { + // Now we just simply chain all iterators together, ignore duplications/ordering. + let iter = Box::new(self.iters.into_iter().flatten()); + + let read_opts = ReadOptions { + batch_size: self.iter_ctx.batch_size, + }; + let mut sst_readers = Vec::with_capacity(self.files_to_read.len()); + for file in &self.files_to_read { + let reader = self + .sst_layer + .read_sst(file.file_name(), &read_opts) + .await?; + + sst_readers.push(reader); + } + let reader = ConcatReader::new(sst_readers); + + Ok(ChunkReaderImpl::new(self.schema, iter, reader)) + } +} + +impl Visitor for ChunkReaderBuilder { + fn visit(&mut self, _level: usize, files: &[FileHandle]) -> Result<()> { + // TODO(yingwen): Filter files by time range. + + // Now we read all files, so just reserve enough space to hold all files. + self.files_to_read.reserve(files.len()); + for file in files { + // We can't invoke async functions here, so we collects all files first, and + // create the batch reader later in `ChunkReaderBuilder`. + self.files_to_read.push(file.clone()); + } + + Ok(()) } } diff --git a/src/storage/src/error.rs b/src/storage/src/error.rs index 207a70a767..8ea8373ad8 100644 --- a/src/storage/src/error.rs +++ b/src/storage/src/error.rs @@ -4,6 +4,7 @@ use std::str::Utf8Error; use common_error::prelude::*; use datatypes::arrow; +use datatypes::arrow::error::ArrowError; use serde_json::error::Error as JsonError; use store_api::manifest::action::ProtocolVersion; use store_api::manifest::ManifestVersion; @@ -171,6 +172,30 @@ pub enum Error { #[snafu(display("Failed to read line, err: {}", source))] Readline { source: IoError }, + + #[snafu(display("Failed to read Parquet file: {}, source: {}", file, source))] + ReadParquet { + file: String, + source: ArrowError, + backtrace: Backtrace, + }, + + #[snafu(display("IO failed while reading Parquet file: {}, source: {}", file, source))] + ReadParquetIo { + file: String, + source: std::io::Error, + backtrace: Backtrace, + }, + + #[snafu(display("Parquet file schema is not valid: {}", msg))] + SequenceColumnNotFound { msg: String, backtrace: Backtrace }, + + #[snafu(display("Parquet file schema is not valid, msg: {}, source: {}", msg, source))] + InvalidParquetSchema { + msg: String, + #[snafu(backtrace)] + source: datatypes::error::Error, + }, } pub type Result = std::result::Result; @@ -193,7 +218,9 @@ impl ErrorExt for Error { | JoinTask { .. } | Cancelled { .. } | DecodeRegionMetaActionList { .. } - | Readline { .. } => StatusCode::Unexpected, + | Readline { .. } + | InvalidParquetSchema { .. } + | SequenceColumnNotFound { .. } => StatusCode::Unexpected, FlushIo { .. } | InitBackend { .. } @@ -206,7 +233,9 @@ impl ErrorExt for Error { | DecodeWalHeader { .. } | EncodeWalHeader { .. } | ManifestProtocolForbidRead { .. } - | ManifestProtocolForbidWrite { .. } => StatusCode::StorageUnavailable, + | ManifestProtocolForbidWrite { .. } + | ReadParquet { .. } + | ReadParquetIo { .. } => StatusCode::StorageUnavailable, } } diff --git a/src/storage/src/flush.rs b/src/storage/src/flush.rs index 73b7aa70be..86c20f0871 100644 --- a/src/storage/src/flush.rs +++ b/src/storage/src/flush.rs @@ -177,8 +177,13 @@ impl FlushJob { let iter = m.memtable.iter(iter_ctx)?; futures.push(async move { self.sst_layer - .write_sst(&file_name, iter, WriteOptions::default()) - .await + .write_sst(&file_name, iter, &WriteOptions::default()) + .await?; + + Ok(FileMeta { + file_name, + level: 0, + }) }); } @@ -187,10 +192,6 @@ impl FlushJob { .into_iter() .collect::>>()? .into_iter() - .map(|f| FileMeta { - file_path: f, - level: 0, - }) .collect(); logging::info!("Successfully flush memtables to files: {:?}", metas); diff --git a/src/storage/src/lib.rs b/src/storage/src/lib.rs index 27ba91a6c1..6d500c0af5 100644 --- a/src/storage/src/lib.rs +++ b/src/storage/src/lib.rs @@ -11,6 +11,7 @@ mod manifest; pub mod memtable; pub mod metadata; mod proto; +mod read; mod region; mod snapshot; mod sst; diff --git a/src/storage/src/manifest/action.rs b/src/storage/src/manifest/action.rs index 9a237002a2..0d45e72fb0 100644 --- a/src/storage/src/manifest/action.rs +++ b/src/storage/src/manifest/action.rs @@ -183,16 +183,16 @@ mod tests { flush_sequence: 99, files_to_add: vec![ FileMeta { - file_path: "test1".to_string(), + file_name: "test1".to_string(), level: 1, }, FileMeta { - file_path: "test2".to_string(), + file_name: "test2".to_string(), level: 2, }, ], files_to_remove: vec![FileMeta { - file_path: "test0".to_string(), + file_name: "test0".to_string(), level: 0, }], }), @@ -202,7 +202,7 @@ mod tests { let bs = action_list.encode().unwrap(); // {"prev_version":3} // {"Protocol":{"min_reader_version":1,"min_writer_version":0}} - // {"Edit":{"region_id":1,"region_version":10,"flush_sequence":99,"files_to_add":[{"file_path":"test1","level":1},{"file_path":"test2","level":2}],"files_to_remove":[{"file_path":"test0","level":0}]}} + // {"Edit":{"region_id":1,"region_version":10,"flush_sequence":99,"files_to_add":[{"file_name":"test1","level":1},{"file_name":"test2","level":2}],"files_to_remove":[{"file_name":"test0","level":0}]}} logging::debug!( "Encoded action list: \r\n{}", diff --git a/src/storage/src/memtable.rs b/src/storage/src/memtable.rs index 7d0efd9af6..d8aac2523e 100644 --- a/src/storage/src/memtable.rs +++ b/src/storage/src/memtable.rs @@ -7,7 +7,7 @@ mod version; use std::sync::Arc; -use datatypes::vectors::{UInt64Vector, UInt8Vector, VectorRef}; +use datatypes::vectors::VectorRef; use store_api::storage::{consts, SequenceNumber, ValueType}; use crate::error::Result; @@ -15,6 +15,7 @@ use crate::memtable::btree::BTreeMemtable; pub use crate::memtable::inserter::Inserter; pub use crate::memtable::schema::MemtableSchema; pub use crate::memtable::version::{MemtableSet, MemtableVersion}; +use crate::read::Batch; /// Unique id for memtables under same region. pub type MemtableId = u32; @@ -33,7 +34,7 @@ pub trait Memtable: Send + Sync + std::fmt::Debug { /// Iterates the memtable. // TODO(yingwen): 1. Use reference of IterContext? 2. Consider passing a projector (does column projection). - fn iter(&self, ctx: IterContext) -> Result; + fn iter(&self, ctx: IterContext) -> Result; /// Returns the estimated bytes allocated by this memtable from heap. fn bytes_allocated(&self) -> usize; @@ -76,15 +77,10 @@ pub enum RowOrdering { Key, } -// TODO(yingwen): Maybe pack value_type with sequence (reserve 8bits in u64 for value type) like RocksDB. -pub struct Batch { - pub keys: Vec, - pub sequences: UInt64Vector, - pub value_types: UInt8Vector, - pub values: Vec, -} - /// Iterator of memtable. +/// +/// Since data of memtable are stored in memory, so avoid defining this trait +/// as an async trait. pub trait BatchIterator: Iterator> + Send + Sync { /// Returns the schema of this iterator. fn schema(&self) -> &MemtableSchema; @@ -93,7 +89,7 @@ pub trait BatchIterator: Iterator> + Send + Sync { fn ordering(&self) -> RowOrdering; } -pub type BatchIteratorPtr = Box; +pub type BoxedBatchIterator = Box; pub trait MemtableBuilder: Send + Sync + std::fmt::Debug { fn build(&self, id: MemtableId, schema: MemtableSchema) -> MemtableRef; diff --git a/src/storage/src/memtable/btree.rs b/src/storage/src/memtable/btree.rs index 4523a47308..24911045c6 100644 --- a/src/storage/src/memtable/btree.rs +++ b/src/storage/src/memtable/btree.rs @@ -15,9 +15,10 @@ use store_api::storage::{SequenceNumber, ValueType}; use crate::error::Result; use crate::memtable::{ - Batch, BatchIterator, BatchIteratorPtr, IterContext, KeyValues, Memtable, MemtableId, + BatchIterator, BoxedBatchIterator, IterContext, KeyValues, Memtable, MemtableId, MemtableSchema, RowOrdering, }; +use crate::read::Batch; type RwLockMap = RwLock>; @@ -65,7 +66,7 @@ impl Memtable for BTreeMemtable { Ok(()) } - fn iter(&self, ctx: IterContext) -> Result { + fn iter(&self, ctx: IterContext) -> Result { assert!(ctx.batch_size > 0); let iter = BTreeIterator::new(ctx, self.schema.clone(), self.map.clone()); diff --git a/src/storage/src/read.rs b/src/storage/src/read.rs new file mode 100644 index 0000000000..4e39bc7bfa --- /dev/null +++ b/src/storage/src/read.rs @@ -0,0 +1,126 @@ +//! Common structs and utilities for read. + +use async_trait::async_trait; +use datatypes::vectors::{UInt64Vector, UInt8Vector, VectorRef}; + +use crate::error::Result; + +// TODO(yingwen): Maybe pack value_type with sequence (reserve 8bits in u64 for value type) like RocksDB. +/// Storage internal representation of a batch of rows. +pub struct Batch { + // Now the structure of `Batch` is still unstable, all pub fields may be changed. + pub keys: Vec, + pub sequences: UInt64Vector, + pub value_types: UInt8Vector, + pub values: Vec, +} + +/// Async batch reader. +#[async_trait] +pub trait BatchReader: Send { + // TODO(yingwen): Schema of batch. + + /// Fetch next [Batch]. + /// + /// Returns `Ok(None)` when the reader has reached its end and calling `next_batch()` + /// again won't return batch again. + /// + /// If `Err` is returned, caller should not call this method again, the implementor + /// may or may not panic in such case. + async fn next_batch(&mut self) -> Result>; +} + +/// Pointer to [BatchReader]. +pub type BoxedBatchReader = Box; + +/// Concat reader inputs. +pub struct ConcatReader { + readers: Vec, + curr_idx: usize, +} + +impl ConcatReader { + pub fn new(readers: Vec) -> ConcatReader { + ConcatReader { + readers, + curr_idx: 0, + } + } +} + +#[async_trait] +impl BatchReader for ConcatReader { + async fn next_batch(&mut self) -> Result> { + loop { + if self.curr_idx >= self.readers.len() { + return Ok(None); + } + + let reader = &mut self.readers[self.curr_idx]; + match reader.next_batch().await? { + Some(batch) => return Ok(Some(batch)), + None => self.curr_idx += 1, + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::read::BatchReader; + use crate::test_util::read_util; + + #[tokio::test] + async fn test_concat_reader_empty() { + let mut reader = ConcatReader::new(Vec::new()); + + assert!(reader.next_batch().await.unwrap().is_none()); + // Call next_batch() again is allowed. + assert!(reader.next_batch().await.unwrap().is_none()); + } + + #[tokio::test] + async fn test_concat_multiple_readers() { + let readers = vec![ + read_util::build_boxed_vec_reader(&[&[(1, Some(1)), (2, Some(2))], &[(3, None)]]), + read_util::build_boxed_vec_reader(&[&[(4, None)]]), + read_util::build_boxed_vec_reader(&[&[(5, Some(5)), (6, Some(6))]]), + ]; + + let mut reader = ConcatReader::new(readers); + + read_util::check_reader_with_kv_batch( + &mut reader, + &[ + &[(1, Some(1)), (2, Some(2))], + &[(3, None)], + &[(4, None)], + &[(5, Some(5)), (6, Some(6))], + ], + ) + .await; + } + + #[tokio::test] + async fn test_concat_reader_with_empty_reader() { + let readers = vec![ + read_util::build_boxed_vec_reader(&[&[(1, Some(1)), (2, Some(2))], &[(3, None)]]), + // Empty reader. + read_util::build_boxed_vec_reader(&[&[]]), + read_util::build_boxed_vec_reader(&[&[(5, Some(5)), (6, Some(6))]]), + ]; + + let mut reader = ConcatReader::new(readers); + + read_util::check_reader_with_kv_batch( + &mut reader, + &[ + &[(1, Some(1)), (2, Some(2))], + &[(3, None)], + &[(5, Some(5)), (6, Some(6))], + ], + ) + .await; + } +} diff --git a/src/storage/src/region.rs b/src/storage/src/region.rs index cf85676312..7ebf02e881 100644 --- a/src/storage/src/region.rs +++ b/src/storage/src/region.rs @@ -108,6 +108,10 @@ impl RegionImpl { fn committed_sequence(&self) -> store_api::storage::SequenceNumber { self.inner.version_control().committed_sequence() } + + async fn wait_flush_done(&self) -> Result<()> { + self.inner.writer.wait_flush_done().await + } } /// Shared data of region. @@ -148,7 +152,7 @@ impl RegionInner { let version = self.version_control().current(); let sequence = self.version_control().committed_sequence(); - SnapshotImpl::new(version, sequence) + SnapshotImpl::new(version, sequence, self.sst_layer.clone()) } async fn write(&self, ctx: &WriteContext, request: WriteBatch) -> Result { diff --git a/src/storage/src/region/tests/flush.rs b/src/storage/src/region/tests/flush.rs index 5f7a2a7c51..7c0e76ee04 100644 --- a/src/storage/src/region/tests/flush.rs +++ b/src/storage/src/region/tests/flush.rs @@ -45,6 +45,14 @@ impl FlushTester { async fn put(&self, data: &[(i64, Option)]) -> WriteResponse { self.tester.put(data).await } + + async fn full_scan(&self) -> Vec<(i64, Option)> { + self.tester.full_scan().await + } + + async fn wait_flush_done(&self) { + self.tester.region.wait_flush_done().await.unwrap(); + } } #[derive(Debug, Default)] @@ -70,10 +78,10 @@ impl FlushStrategy for FlushSwitch { } #[tokio::test] -async fn test_flush() { +async fn test_flush_and_stall() { common_telemetry::init_default_ut_logging(); - let dir = TempDir::new("flush").unwrap(); + let dir = TempDir::new("flush-stall").unwrap(); let store_dir = dir.path().to_str().unwrap(); let flush_switch = Arc::new(FlushSwitch::default()); @@ -106,3 +114,31 @@ async fn test_flush() { } assert!(has_parquet_file); } + +#[tokio::test] +async fn test_read_after_flush() { + common_telemetry::init_default_ut_logging(); + + let dir = TempDir::new("read-flush").unwrap(); + let store_dir = dir.path().to_str().unwrap(); + + let flush_switch = Arc::new(FlushSwitch::default()); + // Always trigger flush before write. + let tester = FlushTester::new(store_dir, flush_switch.clone()).await; + + // Put elements so we have content to flush. + tester.put(&[(1000, Some(100))]).await; + tester.put(&[(2000, Some(200))]).await; + + // Now set should flush to true to trigger flush. + flush_switch.set_should_flush(true); + + // Put element to trigger flush. + tester.put(&[(3000, Some(300))]).await; + tester.wait_flush_done().await; + + let expect = vec![(3000, Some(300)), (1000, Some(100)), (2000, Some(200))]; + + let output = tester.full_scan().await; + assert_eq!(expect, output); +} diff --git a/src/storage/src/region/tests/read_write.rs b/src/storage/src/region/tests/read_write.rs index 7628b2ee4f..6af5c26b55 100644 --- a/src/storage/src/region/tests/read_write.rs +++ b/src/storage/src/region/tests/read_write.rs @@ -92,7 +92,7 @@ fn append_chunk_to(chunk: &Chunk, dst: &mut Vec<(i64, Option)>) { /// Test region without considering version column. pub struct Tester { - region: RegionImpl, + pub region: RegionImpl, write_ctx: WriteContext, read_ctx: ReadContext, } @@ -124,7 +124,8 @@ impl Tester { self.region.write(&self.write_ctx, batch).await.unwrap() } - async fn full_scan(&self) -> Vec<(i64, Option)> { + /// Scan all data. + pub async fn full_scan(&self) -> Vec<(i64, Option)> { let snapshot = self.region.snapshot(&self.read_ctx).unwrap(); let resp = snapshot diff --git a/src/storage/src/region/writer.rs b/src/storage/src/region/writer.rs index f8e6ff0041..f8e59f26f1 100644 --- a/src/storage/src/region/writer.rs +++ b/src/storage/src/region/writer.rs @@ -79,6 +79,9 @@ impl RegionWriter { version_control.set_committed_sequence(next_sequence); + // TODO(yingwen): We should set the flush handle to `None`, but we can't acquire + // write lock here. + Ok(()) } @@ -96,6 +99,19 @@ impl RegionWriter { } } +// Private methods for tests. +#[cfg(test)] +impl RegionWriter { + pub async fn wait_flush_done(&self) -> Result<()> { + let mut inner = self.inner.lock().await; + if let Some(handle) = inner.flush_handle.take() { + handle.join().await?; + } + + Ok(()) + } +} + pub struct WriterContext<'a, S: LogStore> { pub shared: &'a SharedDataRef, pub flush_strategy: &'a FlushStrategyRef, diff --git a/src/storage/src/snapshot.rs b/src/storage/src/snapshot.rs index 3603dc18f1..ca861a9f96 100644 --- a/src/storage/src/snapshot.rs +++ b/src/storage/src/snapshot.rs @@ -6,9 +6,10 @@ use store_api::storage::{ Snapshot, }; -use crate::chunk::ChunkReaderImpl; +use crate::chunk::{ChunkReaderBuilder, ChunkReaderImpl}; use crate::error::{Error, Result}; use crate::memtable::IterContext; +use crate::sst::AccessLayerRef; use crate::version::VersionRef; /// [Snapshot] implementation. @@ -16,6 +17,7 @@ pub struct SnapshotImpl { version: VersionRef, /// Max sequence number (inclusive) visible to user. visible_sequence: SequenceNumber, + sst_layer: AccessLayerRef, } #[async_trait] @@ -37,32 +39,22 @@ impl Snapshot for SnapshotImpl { let mutables = memtable_version.mutable_memtables(); let immutables = memtable_version.immutable_memtables(); - let mut batch_iters = Vec::with_capacity(memtable_version.num_memtables()); - let iter_ctx = IterContext { - batch_size: ctx.batch_size, - visible_sequence, - ..Default::default() - }; - - for (_range, mem) in mutables.iter() { - let iter = mem.iter(iter_ctx.clone())?; - - batch_iters.push(iter); - } + let mut builder = + ChunkReaderBuilder::new(self.version.schema().clone(), self.sst_layer.clone()) + .reserve_num_memtables(memtable_version.num_memtables()) + .iter_ctx(IterContext { + batch_size: ctx.batch_size, + visible_sequence, + ..Default::default() + }) + .pick_memtables(mutables)?; for mem_set in immutables { - for (_range, mem) in mem_set.iter() { - let iter = mem.iter(iter_ctx.clone())?; - - batch_iters.push(iter); - } + builder = builder.pick_memtables(mem_set)?; } - // Now we just simply chain all iterators together, ignore duplications/ordering. - let iter = Box::new(batch_iters.into_iter().flatten()); - - let reader = ChunkReaderImpl::new(self.version.schema().clone(), iter); + let reader = builder.pick_ssts(&**self.version.ssts())?.build().await?; Ok(ScanResponse { reader }) } @@ -73,10 +65,15 @@ impl Snapshot for SnapshotImpl { } impl SnapshotImpl { - pub fn new(version: VersionRef, visible_sequence: SequenceNumber) -> SnapshotImpl { + pub fn new( + version: VersionRef, + visible_sequence: SequenceNumber, + sst_layer: AccessLayerRef, + ) -> SnapshotImpl { SnapshotImpl { version, visible_sequence, + sst_layer, } } diff --git a/src/storage/src/sst.rs b/src/storage/src/sst.rs index 0c85a69072..673a81da23 100644 --- a/src/storage/src/sst.rs +++ b/src/storage/src/sst.rs @@ -7,17 +7,26 @@ use object_store::{util, ObjectStore}; use serde::{Deserialize, Serialize}; use crate::error::Result; -use crate::memtable::BatchIteratorPtr; -use crate::sst::parquet::ParquetWriter; +use crate::memtable::BoxedBatchIterator; +use crate::read::BoxedBatchReader; +use crate::sst::parquet::{ParquetReader, ParquetWriter}; -/// Maximum level of ssts. +/// Maximum level of SSTs. pub const MAX_LEVEL: usize = 1; // We only has fixed number of level, so we array to hold elements. This implement // detail of LevelMetaVec should not be exposed to the user of [LevelMetas]. type LevelMetaVec = [LevelMeta; MAX_LEVEL]; -/// Metadata of all ssts under a region. +/// Visitor to access file in each level. +pub trait Visitor { + /// Visit all `files` in `level`. + /// + /// Now the input `files` are unordered. + fn visit(&mut self, level: usize, files: &[FileHandle]) -> Result<()>; +} + +/// Metadata of all SSTs under a region. /// /// Files are organized into multiple level, though there may be only one level. #[derive(Debug, Clone)] @@ -29,7 +38,7 @@ impl LevelMetas { /// Create a new LevelMetas and initialized each level. pub fn new() -> LevelMetas { LevelMetas { - levels: [LevelMeta::default(); MAX_LEVEL], + levels: new_level_meta_vec(), } } @@ -49,6 +58,18 @@ impl LevelMetas { merged } + + /// Visit all SST files. + /// + /// Stop visiting remaining files if the visitor returns `Err`, and the `Err` + /// will be returned to caller. + pub fn visit_levels(&self, visitor: &mut V) -> Result<()> { + for level in &self.levels { + level.visit_level(visitor)?; + } + + Ok(()) + } } impl Default for LevelMetas { @@ -57,9 +78,10 @@ impl Default for LevelMetas { } } -/// Metadata of files in same sst level. +/// Metadata of files in same SST level. #[derive(Debug, Default, Clone)] pub struct LevelMeta { + level: u8, /// Handles to the files in this level. // TODO(yingwen): Now for simplicity, files are unordered, maybe sort the files by time range // or use another structure to hold them. @@ -70,6 +92,19 @@ impl LevelMeta { fn add_file(&mut self, file: FileHandle) { self.files.push(file); } + + fn visit_level(&self, visitor: &mut V) -> Result<()> { + visitor.visit(self.level.into(), &self.files) + } +} + +fn new_level_meta_vec() -> LevelMetaVec { + let mut levels = [LevelMeta::default(); MAX_LEVEL]; + for (i, level) in levels.iter_mut().enumerate() { + level.level = i as u8; + } + + levels } /// In-memory handle to a file. @@ -90,6 +125,11 @@ impl FileHandle { pub fn level_index(&self) -> usize { self.inner.meta.level.into() } + + #[inline] + pub fn file_name(&self) -> &str { + &self.inner.meta.file_name + } } /// Actually data of [FileHandle]. @@ -107,9 +147,9 @@ impl FileHandleInner { } /// Immutable metadata of a sst file. -#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct FileMeta { - pub file_path: String, + pub file_name: String, /// SST level of the file. pub level: u8, } @@ -119,16 +159,26 @@ pub struct WriteOptions { // TODO(yingwen): [flush] row group size. } -/// Sst access layer. +#[derive(Debug)] +pub struct ReadOptions { + /// Suggested size of each batch. + pub batch_size: usize, +} + +/// SST access layer. #[async_trait] pub trait AccessLayer: Send + Sync + std::fmt::Debug { - // Writes SST file with given name and returns the full path. + /// Writes SST file with given `file_name`. async fn write_sst( &self, file_name: &str, - iter: BatchIteratorPtr, - opts: WriteOptions, - ) -> Result; + iter: BoxedBatchIterator, + opts: &WriteOptions, + ) -> Result<()>; + + /// Read SST file with given `file_name`. + // TODO(yingwen): Read SST according to scan request and returns a chunk stream. + async fn read_sst(&self, file_name: &str, opts: &ReadOptions) -> Result; } pub type AccessLayerRef = Arc; @@ -159,15 +209,23 @@ impl AccessLayer for FsAccessLayer { async fn write_sst( &self, file_name: &str, - iter: BatchIteratorPtr, - opts: WriteOptions, - ) -> Result { - // Now we only supports parquet format. We may allow caller to specific sst format in + iter: BoxedBatchIterator, + opts: &WriteOptions, + ) -> Result<()> { + // Now we only supports parquet format. We may allow caller to specific SST format in // WriteOptions in the future. let file_path = self.sst_file_path(file_name); let writer = ParquetWriter::new(&file_path, iter, self.object_store.clone()); writer.write_sst(opts).await?; - Ok(file_path) + Ok(()) + } + + async fn read_sst(&self, file_name: &str, opts: &ReadOptions) -> Result { + let file_path = self.sst_file_path(file_name); + let reader = ParquetReader::new(&file_path, self.object_store.clone()); + + let stream = reader.chunk_stream(None, opts.batch_size).await?; + Ok(Box::new(stream)) } } diff --git a/src/storage/src/sst/parquet.rs b/src/storage/src/sst/parquet.rs index 8448050a04..8db579f36d 100644 --- a/src/storage/src/sst/parquet.rs +++ b/src/storage/src/sst/parquet.rs @@ -1,45 +1,59 @@ //! Parquet sst format. use std::collections::HashMap; +use std::pin::Pin; +use std::sync::Arc; +use async_stream::try_stream; +use async_trait::async_trait; +use datatypes::arrow::array::Array; use datatypes::arrow::chunk::Chunk; use datatypes::arrow::datatypes::{DataType, Field, Schema}; +use datatypes::arrow::io::parquet::read::{ + infer_schema, read_columns_many_async, read_metadata_async, RowGroupDeserializer, +}; use datatypes::arrow::io::parquet::write::{ Compression, Encoding, FileSink, Version, WriteOptions, }; use datatypes::prelude::{ConcreteDataType, Vector}; use datatypes::schema::ColumnSchema; +use datatypes::vectors::{Helper, UInt64Vector, UInt8Vector}; use futures_util::sink::SinkExt; -use object_store::ObjectStore; -use snafu::ResultExt; +use futures_util::{Stream, TryStreamExt}; +use object_store::{ObjectStore, SeekableReader}; +use snafu::{OptionExt, ResultExt}; use store_api::storage::consts; -use crate::error::{FlushIoSnafu, Result, WriteParquetSnafu}; -use crate::memtable::{BatchIteratorPtr, MemtableSchema}; +use crate::error::{ + FlushIoSnafu, InvalidParquetSchemaSnafu, ReadParquetIoSnafu, ReadParquetSnafu, Result, + SequenceColumnNotFoundSnafu, WriteParquetSnafu, +}; +use crate::memtable::{BoxedBatchIterator, MemtableSchema}; use crate::metadata::ColumnMetadata; +use crate::read::{Batch, BatchReader}; use crate::sst; /// Parquet sst writer. pub struct ParquetWriter<'a> { - file_name: &'a str, - iter: BatchIteratorPtr, + file_path: &'a str, + iter: BoxedBatchIterator, object_store: ObjectStore, } impl<'a> ParquetWriter<'a> { pub fn new( - file_name: &'a str, - iter: BatchIteratorPtr, + file_path: &'a str, + iter: BoxedBatchIterator, object_store: ObjectStore, ) -> ParquetWriter { ParquetWriter { - file_name, + file_path, iter, object_store, } } - pub async fn write_sst(self, _opts: sst::WriteOptions) -> Result<()> { + pub async fn write_sst(self, _opts: &sst::WriteOptions) -> Result<()> { self.write_rows(None).await } @@ -48,7 +62,7 @@ impl<'a> ParquetWriter<'a> { /// in config will be written to a single row group. async fn write_rows(self, extra_meta: Option>) -> Result<()> { let schema = memtable_schema_to_arrow_schema(self.iter.schema()); - let object = self.object_store.object(self.file_name); + let object = self.object_store.object(self.file_path); // FIXME(hl): writer size is not used in fs backend so just leave it to 0, // but in s3/azblob backend the Content-Length field of HTTP request is set @@ -91,11 +105,15 @@ impl<'a> ParquetWriter<'a> { sink.metadata.insert(k, Some(v)); } } - sink.close().await.context(WriteParquetSnafu) + sink.close().await.context(WriteParquetSnafu)?; + // FIXME(yingwen): Hack to workaround an [arrow2 BUG](https://github.com/jorgecarleitao/parquet2/issues/162), + // upgrading to latest arrow2 can fixed this, but now datafusion is still using an old arrow2 version. + sink.flush().await.context(WriteParquetSnafu) } } /// Assembles arrow schema from memtable schema info. +/// TODO(hl): implement `From` for `MemtableSchema`/`Physical schema` fn memtable_schema_to_arrow_schema(schema: &MemtableSchema) -> Schema { let col_meta_to_field: fn(&ColumnMetadata) -> Field = |col_meta| { Field::from(&ColumnSchema::new( @@ -177,6 +195,157 @@ fn transverse_recursive T + Clone>( } } +pub struct ParquetReader<'a> { + file_path: &'a str, + object_store: ObjectStore, +} + +type ReaderFactoryFuture<'a, R> = + Pin> + Send + 'a>>; + +pub type FieldProjection = Box Vec + Send + Sync>; + +impl<'a> ParquetReader<'a> { + pub fn new(file_path: &str, object_store: ObjectStore) -> ParquetReader { + ParquetReader { + file_path, + object_store, + } + } + + pub async fn chunk_stream( + &self, + projection: Option, + chunk_size: usize, + ) -> Result { + let file_path = self.file_path.to_string(); + let operator = self.object_store.clone(); + let reader_factory = move || -> ReaderFactoryFuture { + let file_path = file_path.clone(); + let operator = operator.clone(); + Box::pin(async move { Ok(operator.object(&file_path).seekable_reader(..)) }) + }; + + let file_path = self.file_path.to_string(); + let mut reader = reader_factory() + .await + .context(ReadParquetIoSnafu { file: &file_path })?; + let metadata = read_metadata_async(&mut reader) + .await + .context(ReadParquetSnafu { file: &file_path })?; + let schema = infer_schema(&metadata).context(ReadParquetSnafu { file: &file_path })?; + + let projected_fields = projection.map_or_else(|| schema.fields.clone(), |p| p(&schema)); + let projected_schema = Schema { + fields: projected_fields.clone(), + metadata: schema.metadata, + }; + + let chunk_stream = try_stream!({ + for rg in metadata.row_groups { + let column_chunks = read_columns_many_async( + &reader_factory, + &rg, + projected_fields.clone(), + Some(chunk_size), + ) + .await + .context(ReadParquetSnafu { file: &file_path })?; + + let chunks = RowGroupDeserializer::new(column_chunks, rg.num_rows() as usize, None); + for maybe_chunk in chunks { + let columns_in_chunk = + maybe_chunk.context(ReadParquetSnafu { file: &file_path })?; + yield columns_in_chunk; + } + } + }); + + ChunkStream::new(projected_schema, Box::pin(chunk_stream)) + } +} + +type ChunkConverter = Box>) -> Result + Send + Sync>; + +pub type SendableChunkStream = Pin>>> + Send>>; + +pub struct ChunkStream { + stream: SendableChunkStream, + converter: ChunkConverter, +} + +impl ChunkStream { + pub fn new(schema: Schema, stream: SendableChunkStream) -> Result { + Ok(Self { + converter: batch_converter_factory(schema)?, + stream, + }) + } +} + +#[async_trait] +impl BatchReader for ChunkStream { + async fn next_batch(&mut self) -> Result> { + self.stream + .try_next() + .await? + .map(&self.converter) + .transpose() + } +} + +// TODO(hl): Currently rowkey/values/reserved columns are identified by their position in field: +// all fields before `__sequence` column are rowkeys and fields after `__value_type` are values. +// But it would be better to persist rowkey/value columns' positions to Parquet metadata and +// parse `MemtableSchema` from metadata while building BatchReader. +fn batch_converter_factory(schema: Schema) -> Result { + let ts_idx = schema + .fields + .iter() + .position(|f| f.name == consts::SEQUENCE_COLUMN_NAME) + .with_context(|| SequenceColumnNotFoundSnafu { + msg: format!("Schema: {:?}", schema), + })?; + + let field_len = schema.fields.len(); + + macro_rules! handle_err { + ($stmt: expr, $schema: ident) => { + $stmt.with_context(|_| InvalidParquetSchemaSnafu { + msg: format!("Schema type error: {:?}", $schema), + })? + }; + } + + let converter = move |c: Chunk>| { + Ok(Batch { + sequences: handle_err!( + UInt64Vector::try_from_arrow_array(&c.arrays()[ts_idx].clone()), + schema + ), + value_types: handle_err!( + UInt8Vector::try_from_arrow_array(&c.arrays()[ts_idx + 1].clone()), + schema + ), + keys: handle_err!( + (0..ts_idx) + .into_iter() + .map(|i| Helper::try_into_vector(&c.arrays()[i].clone())) + .collect::>(), + schema + ), + values: handle_err!( + (ts_idx + 2..field_len) + .into_iter() + .map(|i| Helper::try_into_vector(&c.arrays()[i].clone())) + .collect::>(), + schema + ), + }) + }; + Ok(Box::new(converter)) +} + #[cfg(test)] mod tests { use std::sync::Arc; @@ -220,7 +389,7 @@ mod tests { let writer = ParquetWriter::new(sst_file_name, iter, object_store); writer - .write_sst(sst::WriteOptions::default()) + .write_sst(&sst::WriteOptions::default()) .await .unwrap(); diff --git a/src/storage/src/test_util.rs b/src/storage/src/test_util.rs index c18e61b19e..7b67ac4886 100644 --- a/src/storage/src/test_util.rs +++ b/src/storage/src/test_util.rs @@ -1,5 +1,6 @@ pub mod config_util; pub mod descriptor_util; +pub mod read_util; pub mod schema_util; pub mod write_batch_util; diff --git a/src/storage/src/test_util/read_util.rs b/src/storage/src/test_util/read_util.rs new file mode 100644 index 0000000000..0b473e08d4 --- /dev/null +++ b/src/storage/src/test_util/read_util.rs @@ -0,0 +1,88 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use datatypes::prelude::ScalarVector; +use datatypes::vectors::{Int64Vector, UInt64Vector, UInt8Vector}; + +use crate::error::Result; +use crate::read::{Batch, BatchReader, BoxedBatchReader}; + +/// Build a new batch, with 0 sequence and value type. +fn new_kv_batch(key_values: &[(i64, Option)]) -> Batch { + let key = Arc::new(Int64Vector::from_values(key_values.iter().map(|v| v.0))); + let value = Arc::new(Int64Vector::from_iter(key_values.iter().map(|v| v.1))); + let sequences = UInt64Vector::from_vec(vec![0; key_values.len()]); + let value_types = UInt8Vector::from_vec(vec![0; key_values.len()]); + + Batch { + keys: vec![key], + sequences, + value_types, + values: vec![value], + } +} + +fn check_kv_batch(batches: &[Batch], expect: &[&[(i64, Option)]]) { + for (batch, key_values) in batches.iter().zip(expect.iter()) { + let key = batch.keys[0] + .as_any() + .downcast_ref::() + .unwrap(); + let value = batch.values[0] + .as_any() + .downcast_ref::() + .unwrap(); + + for (i, (k, v)) in key_values.iter().enumerate() { + assert_eq!(key.get_data(i).unwrap(), *k,); + assert_eq!(value.get_data(i), *v,); + } + } + assert_eq!(batches.len(), expect.len()); +} + +pub async fn check_reader_with_kv_batch( + reader: &mut dyn BatchReader, + expect: &[&[(i64, Option)]], +) { + let mut result = Vec::new(); + while let Some(batch) = reader.next_batch().await.unwrap() { + result.push(batch); + } + + check_kv_batch(&result, expect); +} + +/// A reader for test that takes batch from Vec. +pub struct VecBatchReader { + batches: Vec, +} + +impl VecBatchReader { + fn new(mut batches: Vec) -> VecBatchReader { + batches.reverse(); + + VecBatchReader { batches } + } +} + +#[async_trait] +impl BatchReader for VecBatchReader { + async fn next_batch(&mut self) -> Result> { + Ok(self.batches.pop()) + } +} + +pub fn build_vec_reader(batches: &[&[(i64, Option)]]) -> VecBatchReader { + let batches: Vec<_> = batches + .iter() + .filter(|key_values| !key_values.is_empty()) + .map(|key_values| new_kv_batch(key_values)) + .collect(); + + VecBatchReader::new(batches) +} + +pub fn build_boxed_vec_reader(batches: &[&[(i64, Option)]]) -> BoxedBatchReader { + Box::new(build_vec_reader(batches)) +} diff --git a/src/storage/src/version.rs b/src/storage/src/version.rs index 763730ffe8..2e8462664d 100644 --- a/src/storage/src/version.rs +++ b/src/storage/src/version.rs @@ -159,10 +159,16 @@ impl Version { self.memtables.mutable_memtables() } + #[inline] pub fn memtables(&self) -> &MemtableVersionRef { &self.memtables } + #[inline] + pub fn ssts(&self) -> &LevelMetasRef { + &self.ssts + } + /// Returns duration used to partition the memtables and ssts by time. pub fn bucket_duration(&self) -> Duration { DEFAULT_BUCKET_DURATION diff --git a/src/store-api/src/storage/chunk.rs b/src/store-api/src/storage/chunk.rs index f2eb031f65..1c43825330 100644 --- a/src/store-api/src/storage/chunk.rs +++ b/src/store-api/src/storage/chunk.rs @@ -16,11 +16,14 @@ impl Chunk { } } +/// `ChunkReader` is similar to async iterator of [Chunk]. #[async_trait] pub trait ChunkReader: Send { type Error: ErrorExt + Send + Sync; + /// Schema of the chunks returned by this reader. fn schema(&self) -> &SchemaRef; + /// Fetch next chunk from the reader. async fn next_chunk(&mut self) -> Result, Self::Error>; }