From b8e92292d261e68d37ba9e0371e51a39ffd22d9d Mon Sep 17 00:00:00 2001
From: Yingwen
Date: Tue, 4 Jul 2023 17:01:34 +0900
Subject: [PATCH] feat: Implement a new scan mode using a chain reader (#1857)

* feat: add log

* feat: print more info

* feat: use chain reader

* fix: panic on getting first range

* fix: prev not updated

* fix: reverse readers and iter backward

* chore: don't print windows in log

* feat: consider memtable range

Also fix the issue of using an incorrect comparison method to sort time
ranges.

* fix: merge memtable window with sst's

* feat: add use_chain_reader option

* feat: skip empty memtables

* chore: change log level

* fix: memtable range not ordered

* style: fix clippy

* chore: address review comments

* chore: print region id in log
---
 src/storage/src/chunk.rs               | 170 +++++++++++++++++++++++--
 src/storage/src/compaction/task.rs     |   1 +
 src/storage/src/compaction/writer.rs   |  53 ++++++--
 src/storage/src/memtable.rs            |   4 +-
 src/storage/src/read.rs                |   9 +-
 src/storage/src/read/chain.rs          | 124 ++++++++++++++++++
 src/storage/src/read/merge.rs          |   4 +-
 src/storage/src/region/tests/flush.rs  |  40 +++++++-
 src/storage/src/snapshot.rs            |  22 ++--
 src/storage/src/test_util/read_util.rs |   2 +
 10 files changed, 390 insertions(+), 39 deletions(-)
 create mode 100644 src/storage/src/read/chain.rs

diff --git a/src/storage/src/chunk.rs b/src/storage/src/chunk.rs
index f6318bd5ac..1f0dccada7 100644
--- a/src/storage/src/chunk.rs
+++ b/src/storage/src/chunk.rs
@@ -17,16 +17,17 @@ use std::sync::Arc;
 use async_trait::async_trait;
 use common_query::logical_plan::Expr;
 use common_recordbatch::OrderOption;
-use common_telemetry::debug;
+use common_telemetry::logging;
 use common_time::range::TimestampRange;
 use snafu::ResultExt;
-use store_api::storage::{Chunk, ChunkReader, SchemaRef, SequenceNumber};
+use store_api::storage::{Chunk, ChunkReader, RegionId, SchemaRef, SequenceNumber};
 use table::predicate::{Predicate, TimeRangePredicateBuilder};
 
 use crate::error::{self, Error, Result};
 use crate::memtable::{IterContext, MemtableRef};
-use crate::read::windowed::WindowedReader;
-use crate::read::{Batch, BoxedBatchReader, DedupReader, MergeReaderBuilder};
+use crate::read::{
+    Batch, BoxedBatchReader, ChainReader, DedupReader, MergeReaderBuilder, WindowedReader,
+};
 use crate::schema::{ProjectedSchema, ProjectedSchemaRef, RegionSchemaRef};
 use crate::sst::{AccessLayerRef, FileHandle, LevelMetas, ReadOptions};
 use crate::window_infer::{PlainWindowInference, WindowInfer};
@@ -90,6 +91,7 @@ impl ChunkReaderImpl {
 
 /// Builder to create a new [ChunkReaderImpl] from scan request.
 pub struct ChunkReaderBuilder {
+    region_id: RegionId,
     schema: RegionSchemaRef,
     projection: Option<Vec<usize>>,
     filters: Vec<Expr>,
@@ -98,11 +100,13 @@ pub struct ChunkReaderBuilder {
     memtables: Vec<MemtableRef>,
     files_to_read: Vec<FileHandle>,
     output_ordering: Option<Vec<OrderOption>>,
+    use_chain_reader: bool,
 }
 
 impl ChunkReaderBuilder {
-    pub fn new(schema: RegionSchemaRef, sst_layer: AccessLayerRef) -> Self {
+    pub fn new(region_id: RegionId, schema: RegionSchemaRef, sst_layer: AccessLayerRef) -> Self {
         ChunkReaderBuilder {
+            region_id,
             schema,
             projection: None,
             filters: vec![],
@@ -111,6 +115,7 @@ impl ChunkReaderBuilder {
             memtables: Vec::new(),
             files_to_read: Vec::new(),
             output_ordering: None,
+            use_chain_reader: false,
         }
     }
 
@@ -150,6 +155,15 @@ impl ChunkReaderBuilder {
         self
     }
 
+    /// Partitions files and memtables according to their time windows and scans the
+    /// time windows one by one.
+    ///
+    /// Note that compaction should not enable this.
+    pub fn use_chain_reader(mut self, use_chain_reader: bool) -> Self {
+        self.use_chain_reader = use_chain_reader;
+        self
+    }
+
     /// Picks all SSTs in all levels
     pub fn pick_all_ssts(mut self, ssts: &LevelMetas) -> Result<Self> {
         let files = ssts.levels().iter().flat_map(|level| level.files());
@@ -183,7 +197,12 @@ impl ChunkReaderBuilder {
         if name != self.schema.timestamp_column_name() {
             return None;
         }
-        let memtable_stats = self.memtables.iter().map(|m| m.stats()).collect::<Vec<_>>();
+        let memtable_stats = self
+            .memtables
+            .iter()
+            .filter(|m| m.num_rows() > 0) // Skip empty memtables.
+            .map(|m| m.stats())
+            .collect::<Vec<_>>();
         let files = self
             .files_to_read
             .iter()
@@ -238,15 +257,32 @@ impl ChunkReaderBuilder {
             predicate,
             time_range: *time_range,
         };
+
+        let mut num_read_files = 0;
         for file in &self.files_to_read {
             if !Self::file_in_range(file, time_range) {
-                debug!("Skip file {:?}, predicate: {:?}", file, time_range);
+                logging::debug!(
+                    "Skip region {} file {:?}, predicate: {:?}",
+                    self.region_id,
+                    file,
+                    time_range
+                );
                 continue;
             }
+
             let reader = self.sst_layer.read_sst(file.clone(), &read_opts).await?;
             reader_builder = reader_builder.push_batch_reader(reader);
+            num_read_files += 1;
         }
+
+        logging::debug!(
+            "build reader done, region_id: {}, time_range: {:?}, total_files: {}, num_read_files: {}",
+            self.region_id,
+            time_range,
+            self.files_to_read.len(),
+            num_read_files,
+        );
+
         let reader = reader_builder.build();
         let reader = DedupReader::new(schema.clone(), reader);
         Ok(Box::new(reader) as Box<_>)
@@ -266,6 +302,8 @@ impl ChunkReaderBuilder {
             output_ordering = Some(ordering.clone());
             self.build_windowed(&schema, &time_range_predicate, windows, ordering)
                 .await?
+        } else if self.use_chain_reader {
+            self.build_chained(&schema, &time_range_predicate).await?
         } else {
             self.build_reader(&schema, &time_range_predicate).await?
         };
@@ -273,8 +311,41 @@ impl ChunkReaderBuilder {
         Ok(ChunkReaderImpl::new(schema, reader, output_ordering))
     }
 
+    async fn build_chained(
+        &self,
+        schema: &ProjectedSchemaRef,
+        time_range: &TimestampRange,
+    ) -> Result<BoxedBatchReader> {
+        let windows = self.infer_window_for_chain_reader(time_range);
+
+        logging::debug!(
+            "Infer window for chain reader, region_id: {}, memtables: {}, files: {}, num_windows: {}",
+            self.region_id,
+            self.memtables.len(),
+            self.files_to_read.len(),
+            windows.len(),
+        );
+
+        let mut readers = Vec::with_capacity(windows.len());
+        for window in &windows {
+            let time_range = time_range.and(window);
+            let reader = self.build_reader(schema, &time_range).await?;
+            readers.push(reader);
+        }
+
+        logging::debug!(
+            "Build chain reader, region_id: {}, time_range: {:?}, num_readers: {}",
+            self.region_id,
+            time_range,
+            readers.len(),
+        );
+
+        let chain_reader = ChainReader::new(schema.clone(), readers);
+        Ok(Box::new(chain_reader) as Box<_>)
+    }
+
     /// Build time range predicate from schema and filters.
-    pub fn build_time_range_predicate(&self) -> TimestampRange {
+    fn build_time_range_predicate(&self) -> TimestampRange {
         let Some(ts_col) = self.schema.user_schema().timestamp_column() else { return TimestampRange::min_to_max() };
         let unit = ts_col
             .data_type
@@ -294,4 +365,87 @@ impl ChunkReaderBuilder {
         let file_ts_range = TimestampRange::new_inclusive(Some(start), Some(end));
         file_ts_range.intersects(predicate)
     }
+
+    /// Returns the time range of memtables to read.
+    fn compute_memtable_range(&self) -> Option<TimestampRange> {
+        let (min_timestamp, max_timestamp) = self
+            .memtables
+            .iter()
+            .filter(|m| m.num_rows() > 0) // Skip empty memtables.
+            .map(|m| {
+                let stats = m.stats();
+                (stats.min_timestamp, stats.max_timestamp)
+            })
+            .reduce(|acc, e| (acc.0.min(e.0), acc.1.max(e.1)))?;
+
+        logging::debug!(
+            "Compute memtable range, region_id: {}, min: {:?}, max: {:?}",
+            self.region_id,
+            min_timestamp,
+            max_timestamp,
+        );
+
+        Some(TimestampRange::new_inclusive(
+            Some(min_timestamp),
+            Some(max_timestamp),
+        ))
+    }
+
+    /// Infers time windows for the chain reader according to the time ranges of memtables and files.
+    fn infer_window_for_chain_reader(&self, time_range: &TimestampRange) -> Vec<TimestampRange> {
+        let mut memtable_range = self.compute_memtable_range();
+        // file ranges: (start, end)
+        let mut file_ranges = Vec::with_capacity(self.files_to_read.len());
+        for file in &self.files_to_read {
+            if !Self::file_in_range(file, time_range) || file.time_range().is_none() {
+                continue;
+            }
+            // Safety: we have skipped files whose range is `None`.
+            let range = file.time_range().unwrap();
+
+            // Filter by memtable's time range.
+            if let Some(mem_range) = &mut memtable_range {
+                let file_range = TimestampRange::new_inclusive(Some(range.0), Some(range.1));
+                if mem_range.intersects(&file_range) {
+                    // If the range of the SST intersects with the range of the
+                    // memtable, we merge it into the memtable's range.
+                    *mem_range = mem_range.or(&file_range);
+                    continue;
+                }
+            }
+
+            file_ranges.push((range.0, range.1));
+        }
+
+        if file_ranges.is_empty() {
+            return memtable_range.map(|range| vec![range]).unwrap_or_default();
+        }
+
+        // Sort by start times.
+        file_ranges.sort_unstable_by(|left, right| left.0.cmp(&right.0));
+
+        // Compute ranges for all SSTs.
+        let mut time_ranges = Vec::with_capacity(file_ranges.len() + 1);
+        // Safety: file_ranges is not empty.
+        let mut prev =
+            TimestampRange::new_inclusive(Some(file_ranges[0].0), Some(file_ranges[0].1));
+        for file_range in &file_ranges[1..] {
+            let current = TimestampRange::new_inclusive(Some(file_range.0), Some(file_range.1));
+            if prev.intersects(&current) {
+                prev = prev.or(&current);
+            } else {
+                time_ranges.push(prev);
+                prev = current;
+            }
+        }
+        time_ranges.push(prev);
+
+        if let Some(mem_range) = memtable_range {
+            time_ranges.push(mem_range);
+            // We have pushed the memtable range, so re-sort the array.
+            time_ranges.sort_unstable_by(|left, right| left.start().cmp(right.start()));
+        }
+
+        time_ranges
+    }
 }
diff --git a/src/storage/src/compaction/task.rs b/src/storage/src/compaction/task.rs
index 18670327f4..392d703886 100644
--- a/src/storage/src/compaction/task.rs
+++ b/src/storage/src/compaction/task.rs
@@ -187,6 +187,7 @@ impl CompactionOutput {
         sst_write_buffer_size: ReadableSize,
     ) -> Result<Option<FileMeta>> {
         let reader = build_sst_reader(
+            region_id,
             schema,
             sst_layer.clone(),
             &self.inputs,
diff --git a/src/storage/src/compaction/writer.rs b/src/storage/src/compaction/writer.rs
index fcf6f3bf42..0d62cc6212 100644
--- a/src/storage/src/compaction/writer.rs
+++ b/src/storage/src/compaction/writer.rs
@@ -16,6 +16,7 @@ use common_query::logical_plan::{DfExpr, Expr};
 use common_time::timestamp::TimeUnit;
 use datafusion_expr::Operator;
 use datatypes::value::timestamp_to_scalar_value;
+use store_api::storage::RegionId;
 
 use crate::chunk::{ChunkReaderBuilder, ChunkReaderImpl};
 use crate::error;
@@ -24,6 +25,7 @@ use crate::sst::{AccessLayerRef, FileHandle};
 
 /// Builds an SST reader that only reads rows within given time range.
 pub(crate) async fn build_sst_reader(
+    region_id: RegionId,
     schema: RegionSchemaRef,
     sst_layer: AccessLayerRef,
     files: &[FileHandle],
@@ -38,7 +40,7 @@ pub(crate) async fn build_sst_reader(
     let ts_col_unit = ts_col.data_type.as_timestamp().unwrap().unit();
     let ts_col_name = ts_col.name.clone();
 
-    ChunkReaderBuilder::new(schema, sst_layer)
+    ChunkReaderBuilder::new(region_id, schema, sst_layer)
         .pick_ssts(files)
         .filters(
             build_time_range_filter(
@@ -139,6 +141,8 @@ mod tests {
     use crate::sst::{self, FileId, FileMeta, FsAccessLayer, Source, SstInfo, WriteOptions};
     use crate::test_util::descriptor_util::RegionDescBuilder;
 
+    const REGION_ID: RegionId = 1;
+
     fn schema_for_test() -> RegionSchemaRef {
         // Just build a region desc and use its columns metadata.
         let desc = RegionDescBuilder::new("test")
@@ -277,7 +281,9 @@ mod tests {
         handle
     }
 
+    // The region id is only used to build the reader; we don't check its content.
     async fn check_reads(
+        region_id: RegionId,
         schema: RegionSchemaRef,
         sst_layer: AccessLayerRef,
         files: &[FileHandle],
@@ -286,6 +292,7 @@ mod tests {
         expect: &[i64],
     ) {
         let mut reader = build_sst_reader(
+            region_id,
            schema,
             sst_layer,
             files,
@@ -352,6 +359,7 @@ mod tests {
         let files = vec![file1, file2];
         // read from two sst files with time range filter,
         check_reads(
+            REGION_ID,
             schema.clone(),
             sst_layer.clone(),
             &files,
@@ -361,7 +369,7 @@ mod tests {
         )
         .await;
 
-        check_reads(schema, sst_layer, &files, 1, 2, &[1000]).await;
+        check_reads(REGION_ID, schema, sst_layer, &files, 1, 2, &[1000]).await;
     }
 
     async fn read_file(
@@ -370,7 +378,7 @@ mod tests {
         sst_layer: AccessLayerRef,
     ) -> Vec<i64> {
         let mut timestamps = vec![];
-        let mut reader = build_sst_reader(schema, sst_layer, files, i64::MIN, i64::MAX)
+        let mut reader = build_sst_reader(REGION_ID, schema, sst_layer, files, i64::MIN, i64::MAX)
             .await
             .unwrap();
         while let Some(chunk) = reader.next_chunk().await.unwrap() {
@@ -434,15 +442,36 @@ mod tests {
         let sst_layer = Arc::new(FsAccessLayer::new("./", object_store.clone()));
         let input_files = vec![file2, file1];
 
-        let reader1 = build_sst_reader(schema.clone(), sst_layer.clone(), &input_files, 0, 3)
-            .await
-            .unwrap();
-        let reader2 = build_sst_reader(schema.clone(), sst_layer.clone(), &input_files, 3, 6)
-            .await
-            .unwrap();
-        let reader3 = build_sst_reader(schema.clone(), sst_layer.clone(), &input_files, 6, 10)
-            .await
-            .unwrap();
+        let reader1 = build_sst_reader(
+            REGION_ID,
+            schema.clone(),
+            sst_layer.clone(),
+            &input_files,
+            0,
+            3,
+        )
+        .await
+        .unwrap();
+        let reader2 = build_sst_reader(
+            REGION_ID,
+            schema.clone(),
+            sst_layer.clone(),
+            &input_files,
+            3,
+            6,
+        )
+        .await
+        .unwrap();
+        let reader3 = build_sst_reader(
+            REGION_ID,
+            schema.clone(),
+            sst_layer.clone(),
+            &input_files,
+            6,
+            10,
+        )
+        .await
+        .unwrap();
 
         let opts = WriteOptions {
             sst_write_buffer_size: ReadableSize::mb(8),
diff --git a/src/storage/src/memtable.rs b/src/storage/src/memtable.rs
index 8057069c66..d89e06dcb0 100644
--- a/src/storage/src/memtable.rs
+++ b/src/storage/src/memtable.rs
@@ -75,9 +75,7 @@ pub trait Memtable: Send + Sync + fmt::Debug {
     /// Iterates the memtable.
     fn iter(&self, ctx: IterContext) -> Result<BoxedBatchIterator>;
 
-    /// Returns the estimated bytes allocated by this memtable from heap. Result
-    /// of this method may be larger than the estimated based on [`num_rows`] because
-    /// of the implementor's pre-alloc behavior.
+    /// Returns the number of rows in the memtable.
     fn num_rows(&self) -> usize;
 
     /// Returns stats of this memtable.
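
An aside on the window inference above: once the SSTs that overlap the memtable's range have been folded into it, `infer_window_for_chain_reader` is essentially classic interval merging — sort the remaining SST ranges by start timestamp, then coalesce any that touch or overlap. Below is a minimal standalone sketch of that step, assuming plain `(i64, i64)` inclusive ranges in place of `TimestampRange`; `merge_time_windows` is a hypothetical helper, not part of this patch.

// Sketch of the interval-merging step used by `infer_window_for_chain_reader`.
// Plain (i64, i64) inclusive ranges stand in for `TimestampRange`.
fn merge_time_windows(mut ranges: Vec<(i64, i64)>) -> Vec<(i64, i64)> {
    if ranges.is_empty() {
        return ranges;
    }
    // Sort by start timestamp so overlapping ranges become adjacent.
    ranges.sort_unstable_by(|left, right| left.0.cmp(&right.0));

    let mut merged = Vec::with_capacity(ranges.len());
    let mut prev = ranges[0];
    for current in &ranges[1..] {
        // Inclusive ranges overlap when the next start is not past the previous end.
        if current.0 <= prev.1 {
            prev.1 = prev.1.max(current.1);
        } else {
            merged.push(prev);
            prev = *current;
        }
    }
    merged.push(prev);
    merged
}

fn main() {
    // Two overlapping SST windows plus a disjoint one collapse into two scan windows.
    let windows = merge_time_windows(vec![(0, 1000), (500, 2000), (3000, 4000)]);
    assert_eq!(windows, vec![(0, 2000), (3000, 4000)]);
    println!("scan windows: {windows:?}");
}

Because the ranges are inclusive on both ends, adjacent windows such as `(0, 1000)` and `(1000, 2000)` also coalesce, matching how inclusive `TimestampRange`s intersect; the resulting disjoint windows are what the chain reader scans one by one.
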
diff --git a/src/storage/src/read.rs b/src/storage/src/read.rs
index 03d6d64cf8..f3a0e182ba 100644
--- a/src/storage/src/read.rs
+++ b/src/storage/src/read.rs
@@ -14,9 +14,10 @@
 
 //! Common structs and utilities for read.
 
+mod chain;
 mod dedup;
 mod merge;
-pub(crate) mod windowed;
+mod windowed;
 
 use std::cmp::Ordering;
 
@@ -25,11 +26,13 @@ use common_base::BitVec;
 use datatypes::data_type::DataType;
 use datatypes::prelude::ConcreteDataType;
 use datatypes::vectors::{BooleanVector, MutableVector, VectorRef};
-pub use dedup::DedupReader;
-pub use merge::{MergeReader, MergeReaderBuilder};
 use snafu::{ensure, ResultExt};
 
 use crate::error::{self, Result};
+pub use crate::read::chain::ChainReader;
+pub use crate::read::dedup::DedupReader;
+pub use crate::read::merge::{MergeReader, MergeReaderBuilder};
+pub use crate::read::windowed::WindowedReader;
 
 /// Storage internal representation of a batch of rows.
 // Now the structure of `Batch` is still unstable, all pub fields may be changed.
diff --git a/src/storage/src/read/chain.rs b/src/storage/src/read/chain.rs
new file mode 100644
index 0000000000..5701682325
--- /dev/null
+++ b/src/storage/src/read/chain.rs
@@ -0,0 +1,124 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use crate::error::Result;
+use crate::read::{Batch, BatchReader};
+use crate::schema::ProjectedSchemaRef;
+
+/// A reader that simply chains the outputs of its input readers.
+pub struct ChainReader<R> {
+    /// Schema to read
+    pub schema: ProjectedSchemaRef,
+    /// Each reader reads a slice of the time window
+    pub readers: Vec<R>,
+}
+
+impl<R> ChainReader<R> {
+    /// Returns a new [ChainReader] with the given input `readers`.
+    pub fn new(schema: ProjectedSchemaRef, mut readers: Vec<R>) -> Self {
+        // Reverse readers since we iterate them backward.
+        readers.reverse();
+        Self { schema, readers }
+    }
+}
+
+#[async_trait::async_trait]
+impl<R> BatchReader for ChainReader<R>
+where
+    R: BatchReader,
+{
+    async fn next_batch(&mut self) -> Result<Option<Batch>> {
+        while let Some(reader) = self.readers.last_mut() {
+            if let Some(batch) = reader.next_batch().await? {
+                return Ok(Some(batch));
+            } else {
+                // Remove the exhausted reader.
+                self.readers.pop();
+            }
+        }
+        Ok(None)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::test_util::read_util::{self, Batches, VecBatchReader};
+
+    fn build_chain_reader(sources: &[Batches]) -> ChainReader<VecBatchReader> {
+        let schema = read_util::new_projected_schema();
+        let readers = sources
+            .iter()
+            .map(|source| read_util::build_vec_reader(source))
+            .collect();
+
+        ChainReader::new(schema, readers)
+    }
+
+    async fn check_chain_reader_result(
+        mut reader: ChainReader<VecBatchReader>,
+        input: &[Batches<'_>],
+    ) {
+        let expect: Vec<_> = input
+            .iter()
+            .flat_map(|v| v.iter())
+            .flat_map(|v| v.iter().copied())
+            .collect();
+
+        let result = read_util::collect_kv_batch(&mut reader).await;
+        assert_eq!(expect, result);
+
+        // Calling next_batch() again is allowed.
+        assert!(reader.next_batch().await.unwrap().is_none());
+    }
+
+    #[tokio::test]
+    async fn test_chain_empty() {
+        let mut reader = build_chain_reader(&[]);
+
+        assert!(reader.next_batch().await.unwrap().is_none());
+        // Calling next_batch() again is allowed.
+        assert!(reader.next_batch().await.unwrap().is_none());
+    }
+
+    #[tokio::test]
+    async fn test_chain_one() {
+        let input: &[Batches] = &[&[
+            &[(1, Some(1)), (2, Some(2))],
+            &[(3, Some(3)), (4, Some(4))],
+            &[(5, Some(5))],
+        ]];
+
+        let reader = build_chain_reader(input);
+
+        check_chain_reader_result(reader, input).await;
+    }
+
+    #[tokio::test]
+    async fn test_chain_multi() {
+        let input: &[Batches] = &[
+            &[
+                &[(1, Some(1)), (2, Some(2))],
+                &[(3, Some(3)), (4, Some(4))],
+                &[(5, Some(5))],
+            ],
+            &[&[(6, Some(3)), (7, Some(4)), (8, Some(8))], &[(9, Some(9))]],
+            &[&[(10, Some(10)), (11, Some(11))], &[(12, Some(12))]],
+        ];
+
+        let reader = build_chain_reader(input);
+
+        check_chain_reader_result(reader, input).await;
+    }
+}
diff --git a/src/storage/src/read/merge.rs b/src/storage/src/read/merge.rs
index 205e71cef1..713cb038d8 100644
--- a/src/storage/src/read/merge.rs
+++ b/src/storage/src/read/merge.rs
@@ -608,7 +608,7 @@ mod tests {
     use datatypes::vectors::{Int64Vector, TimestampMillisecondVector};
 
     use super::*;
-    use crate::test_util::read_util;
+    use crate::test_util::read_util::{self, Batches};
 
     #[tokio::test]
     async fn test_merge_reader_empty() {
@@ -653,8 +653,6 @@ mod tests {
         assert!(output.contains("pos: 1"));
     }
 
-    type Batches<'a> = &'a [&'a [(i64, Option<i64>)]];
-
     fn build_merge_reader(sources: &[Batches], num_iter: usize, batch_size: usize) -> MergeReader {
         let schema = read_util::new_projected_schema();
         let mut builder =
diff --git a/src/storage/src/region/tests/flush.rs b/src/storage/src/region/tests/flush.rs
index 606ab8bc76..7095e12688 100644
--- a/src/storage/src/region/tests/flush.rs
+++ b/src/storage/src/region/tests/flush.rs
@@ -252,7 +252,7 @@ async fn test_flush_empty() {
 }
 
 #[tokio::test]
-async fn test_read_after_flush() {
+async fn test_read_after_flush_across_window() {
     common_telemetry::init_default_ut_logging();
 
     let dir = create_temp_dir("read-flush");
@@ -289,6 +289,44 @@ async fn test_read_after_flush() {
     assert_eq!(expect, output);
 }
 
+#[tokio::test]
+async fn test_read_after_flush_same_window() {
+    common_telemetry::init_default_ut_logging();
+
+    let dir = create_temp_dir("read-flush");
+    let store_dir = dir.path().to_str().unwrap();
+
+    let flush_switch = Arc::new(FlushSwitch::default());
+    let tester = FlushTester::new(store_dir, flush_switch.clone()).await;
+
+    // Put elements so we have content to flush.
+    tester.put(&[(1000, Some(100))]).await;
+    tester.put(&[(2000, Some(200))]).await;
+
+    // Flush.
+    tester.flush(None).await;
+
+    // Put element again.
+    tester.put(&[(1003, Some(300))]).await;
+
+    let expect = vec![
+        (1000, Some(100.to_string())),
+        (1003, Some(300.to_string())),
+        (2000, Some(200.to_string())),
+    ];
+
+    let output = tester.full_scan().await;
+    assert_eq!(expect, output);
+
+    // Reopen
+    let mut tester = tester;
+    tester.reopen().await;
+
+    // Scan after reopen.
+    let output = tester.full_scan().await;
+    assert_eq!(expect, output);
+}
+
 #[tokio::test]
 async fn test_merge_read_after_flush() {
     let dir = create_temp_dir("merge-read-flush");
diff --git a/src/storage/src/snapshot.rs b/src/storage/src/snapshot.rs
index 0399463275..4c3daac86a 100644
--- a/src/storage/src/snapshot.rs
+++ b/src/storage/src/snapshot.rs
@@ -53,15 +53,19 @@ impl Snapshot for SnapshotImpl {
         let mutables = memtable_version.mutable_memtable();
         let immutables = memtable_version.immutable_memtables();
 
-        let mut builder =
-            ChunkReaderBuilder::new(self.version.schema().clone(), self.sst_layer.clone())
-                .reserve_num_memtables(memtable_version.num_memtables())
-                .projection(request.projection)
-                .filters(request.filters)
-                .batch_size(ctx.batch_size)
-                .output_ordering(request.output_ordering)
-                .visible_sequence(visible_sequence)
-                .pick_memtables(mutables.clone());
+        let mut builder = ChunkReaderBuilder::new(
+            self.version.metadata().id(),
+            self.version.schema().clone(),
+            self.sst_layer.clone(),
+        )
+        .reserve_num_memtables(memtable_version.num_memtables())
+        .projection(request.projection)
+        .filters(request.filters)
+        .batch_size(ctx.batch_size)
+        .output_ordering(request.output_ordering)
+        .visible_sequence(visible_sequence)
+        .pick_memtables(mutables.clone())
+        .use_chain_reader(true);
 
         for memtable in immutables {
             builder = builder.pick_memtables(memtable.clone());
diff --git a/src/storage/src/test_util/read_util.rs b/src/storage/src/test_util/read_util.rs
index d777b34f11..2d5476243c 100644
--- a/src/storage/src/test_util/read_util.rs
+++ b/src/storage/src/test_util/read_util.rs
@@ -92,6 +92,8 @@ pub async fn collect_kv_batch(reader: &mut dyn BatchReader) -> Vec<(i64, Option<
     result
 }
 
+pub type Batches<'a> = &'a [&'a [(i64, Option<i64>)]];
+
 /// A reader for tests that pops batches from a `Vec`.
 pub struct VecBatchReader {
     schema: ProjectedSchemaRef,
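
A closing aside on `ChainReader` itself: the `new`/`next_batch` pair above amounts to draining a list of readers in order, with the list reversed once so the active reader is always at the tail. Here is a minimal synchronous sketch of the same strategy, assuming plain iterators in place of the async `BatchReader` trait (the `Chain` type below is illustrative only, not part of this patch):

// Sketch of the `ChainReader` polling strategy with plain iterators.
struct Chain<I> {
    readers: Vec<I>,
}

impl<I: Iterator> Chain<I> {
    fn new(mut readers: Vec<I>) -> Self {
        // Reverse once so the first reader ends up at the back of the vec.
        readers.reverse();
        Self { readers }
    }

    fn next_batch(&mut self) -> Option<I::Item> {
        // Always poll the last reader; pop it once it is exhausted.
        while let Some(reader) = self.readers.last_mut() {
            if let Some(batch) = reader.next() {
                return Some(batch);
            }
            // Drop the exhausted reader and fall through to the next one.
            self.readers.pop();
        }
        None
    }
}

fn main() {
    // Each "reader" yields rows from one time window, in window order.
    let mut chain = Chain::new(vec![
        vec![1, 2, 3].into_iter(),
        vec![].into_iter(),
        vec![4, 5].into_iter(),
    ]);
    let mut out = Vec::new();
    while let Some(v) = chain.next_batch() {
        out.push(v);
    }
    assert_eq!(out, vec![1, 2, 3, 4, 5]);
    // Like the real reader, calling next_batch() after exhaustion is fine.
    assert!(chain.next_batch().is_none());
}

Reversing up front keeps removal of an exhausted reader an O(1) `Vec::pop` rather than an O(n) remove-from-front, while batches still come out in the original window order.
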