feat: Implement a new scan mode using a chain reader (#1857)

* feat: add log

* feat: print more info

* feat: use chain reader

* fix: panic on getting first range

* fix: prev not updated

* fix: reverse readers and iter backward

* chore: don't print windows in log

* feat: consider memtable range

Also fix the issue that using incorrect comparision method to sort time
ranges.

* fix: merge memtable window with sst's

* feat: add use_chain_reader option

* feat: skip empty memtables

* chore: change log level

* fix: memtable range not ordered

* style: fix clippy

* chore: address review comments

* chore: print region id in log
This commit is contained in:
Yingwen
2023-07-04 17:01:34 +09:00
committed by GitHub
parent 746fe8b4fe
commit b8e92292d2
10 changed files with 390 additions and 39 deletions

View File

@@ -17,16 +17,17 @@ use std::sync::Arc;
use async_trait::async_trait;
use common_query::logical_plan::Expr;
use common_recordbatch::OrderOption;
use common_telemetry::debug;
use common_telemetry::logging;
use common_time::range::TimestampRange;
use snafu::ResultExt;
use store_api::storage::{Chunk, ChunkReader, SchemaRef, SequenceNumber};
use store_api::storage::{Chunk, ChunkReader, RegionId, SchemaRef, SequenceNumber};
use table::predicate::{Predicate, TimeRangePredicateBuilder};
use crate::error::{self, Error, Result};
use crate::memtable::{IterContext, MemtableRef};
use crate::read::windowed::WindowedReader;
use crate::read::{Batch, BoxedBatchReader, DedupReader, MergeReaderBuilder};
use crate::read::{
Batch, BoxedBatchReader, ChainReader, DedupReader, MergeReaderBuilder, WindowedReader,
};
use crate::schema::{ProjectedSchema, ProjectedSchemaRef, RegionSchemaRef};
use crate::sst::{AccessLayerRef, FileHandle, LevelMetas, ReadOptions};
use crate::window_infer::{PlainWindowInference, WindowInfer};
@@ -90,6 +91,7 @@ impl ChunkReaderImpl {
/// Builder to create a new [ChunkReaderImpl] from scan request.
pub struct ChunkReaderBuilder {
region_id: RegionId,
schema: RegionSchemaRef,
projection: Option<Vec<usize>>,
filters: Vec<Expr>,
@@ -98,11 +100,13 @@ pub struct ChunkReaderBuilder {
memtables: Vec<MemtableRef>,
files_to_read: Vec<FileHandle>,
output_ordering: Option<Vec<OrderOption>>,
use_chain_reader: bool,
}
impl ChunkReaderBuilder {
pub fn new(schema: RegionSchemaRef, sst_layer: AccessLayerRef) -> Self {
pub fn new(region_id: RegionId, schema: RegionSchemaRef, sst_layer: AccessLayerRef) -> Self {
ChunkReaderBuilder {
region_id,
schema,
projection: None,
filters: vec![],
@@ -111,6 +115,7 @@ impl ChunkReaderBuilder {
memtables: Vec::new(),
files_to_read: Vec::new(),
output_ordering: None,
use_chain_reader: false,
}
}
@@ -150,6 +155,15 @@ impl ChunkReaderBuilder {
self
}
/// Partition files and memtables according to their time windows and scan time windows
/// one by one.
///
/// Note that compaction should not enable this.
pub fn use_chain_reader(mut self, use_chain_reader: bool) -> Self {
self.use_chain_reader = use_chain_reader;
self
}
/// Picks all SSTs in all levels
pub fn pick_all_ssts(mut self, ssts: &LevelMetas) -> Result<Self> {
let files = ssts.levels().iter().flat_map(|level| level.files());
@@ -183,7 +197,12 @@ impl ChunkReaderBuilder {
if name != self.schema.timestamp_column_name() {
return None;
}
let memtable_stats = self.memtables.iter().map(|m| m.stats()).collect::<Vec<_>>();
let memtable_stats = self
.memtables
.iter()
.filter(|m| m.num_rows() > 0) // Skip empty memtables.
.map(|m| m.stats())
.collect::<Vec<_>>();
let files = self
.files_to_read
.iter()
@@ -238,15 +257,32 @@ impl ChunkReaderBuilder {
predicate,
time_range: *time_range,
};
let mut num_read_files = 0;
for file in &self.files_to_read {
if !Self::file_in_range(file, time_range) {
debug!("Skip file {:?}, predicate: {:?}", file, time_range);
logging::debug!(
"Skip region {} file {:?}, predicate: {:?}",
self.region_id,
file,
time_range
);
continue;
}
let reader = self.sst_layer.read_sst(file.clone(), &read_opts).await?;
reader_builder = reader_builder.push_batch_reader(reader);
num_read_files += 1;
}
logging::debug!(
"build reader done, region_id: {}, time_range: {:?}, total_files: {}, num_read_files: {}",
self.region_id,
time_range,
self.files_to_read.len(),
num_read_files,
);
let reader = reader_builder.build();
let reader = DedupReader::new(schema.clone(), reader);
Ok(Box::new(reader) as Box<_>)
@@ -266,6 +302,8 @@ impl ChunkReaderBuilder {
output_ordering = Some(ordering.clone());
self.build_windowed(&schema, &time_range_predicate, windows, ordering)
.await?
} else if self.use_chain_reader {
self.build_chained(&schema, &time_range_predicate).await?
} else {
self.build_reader(&schema, &time_range_predicate).await?
};
@@ -273,8 +311,41 @@ impl ChunkReaderBuilder {
Ok(ChunkReaderImpl::new(schema, reader, output_ordering))
}
async fn build_chained(
&self,
schema: &ProjectedSchemaRef,
time_range: &TimestampRange,
) -> Result<BoxedBatchReader> {
let windows = self.infer_window_for_chain_reader(time_range);
logging::debug!(
"Infer window for chain reader, region_id: {}, memtables: {}, files: {}, num_windows: {}",
self.region_id,
self.memtables.len(),
self.files_to_read.len(),
windows.len(),
);
let mut readers = Vec::with_capacity(windows.len());
for window in &windows {
let time_range = time_range.and(window);
let reader = self.build_reader(schema, &time_range).await?;
readers.push(reader);
}
logging::debug!(
"Build chain reader, region_id: {}, time_range: {:?}, num_readers: {}",
self.region_id,
time_range,
readers.len(),
);
let chain_reader = ChainReader::new(schema.clone(), readers);
Ok(Box::new(chain_reader) as Box<_>)
}
/// Build time range predicate from schema and filters.
pub fn build_time_range_predicate(&self) -> TimestampRange {
fn build_time_range_predicate(&self) -> TimestampRange {
let Some(ts_col) = self.schema.user_schema().timestamp_column() else { return TimestampRange::min_to_max() };
let unit = ts_col
.data_type
@@ -294,4 +365,87 @@ impl ChunkReaderBuilder {
let file_ts_range = TimestampRange::new_inclusive(Some(start), Some(end));
file_ts_range.intersects(predicate)
}
/// Returns the time range of memtables to read.
fn compute_memtable_range(&self) -> Option<TimestampRange> {
let (min_timestamp, max_timestamp) = self
.memtables
.iter()
.filter(|m| m.num_rows() > 0) // Skip empty memtables.
.map(|m| {
let stats = m.stats();
(stats.min_timestamp, stats.max_timestamp)
})
.reduce(|acc, e| (acc.0.min(e.0), acc.1.max(e.1)))?;
logging::debug!(
"Compute memtable range, region_id: {}, min: {:?}, max: {:?}",
self.region_id,
min_timestamp,
max_timestamp,
);
Some(TimestampRange::new_inclusive(
Some(min_timestamp),
Some(max_timestamp),
))
}
/// Infer time window for chain reader according to the time range of memtables and files.
fn infer_window_for_chain_reader(&self, time_range: &TimestampRange) -> Vec<TimestampRange> {
let mut memtable_range = self.compute_memtable_range();
// file ranges: (start, end)
let mut file_ranges = Vec::with_capacity(self.files_to_read.len());
for file in &self.files_to_read {
if !Self::file_in_range(file, time_range) || file.time_range().is_none() {
continue;
}
// Safety: we have skip files whose range is `None`.
let range = file.time_range().unwrap();
// Filter by memtable's time range.
if let Some(mem_range) = &mut memtable_range {
let file_range = TimestampRange::new_inclusive(Some(range.0), Some(range.1));
if mem_range.intersects(&file_range) {
// If the range of the SST intersects with the range of the
// memtable, we merge it into the memtable's range.
*mem_range = mem_range.or(&file_range);
continue;
}
}
file_ranges.push((range.0, range.1));
}
if file_ranges.is_empty() {
return memtable_range.map(|range| vec![range]).unwrap_or_default();
}
// Sort by start times.
file_ranges.sort_unstable_by(|left, right| left.0.cmp(&right.0));
// Compute ranges for all SSTs.
let mut time_ranges = Vec::with_capacity(file_ranges.len() + 1);
// Safety: file_ranges is not empty.
let mut prev =
TimestampRange::new_inclusive(Some(file_ranges[0].0), Some(file_ranges[0].1));
for file_range in &file_ranges[1..] {
let current = TimestampRange::new_inclusive(Some(file_range.0), Some(file_range.1));
if prev.intersects(&current) {
prev = prev.or(&current);
} else {
time_ranges.push(prev);
prev = current;
}
}
time_ranges.push(prev);
if let Some(mem_range) = memtable_range {
time_ranges.push(mem_range);
// We have pushed the memtable range, resort the array.
time_ranges.sort_unstable_by(|left, right| left.start().cmp(right.start()));
}
time_ranges
}
}

View File

@@ -187,6 +187,7 @@ impl CompactionOutput {
sst_write_buffer_size: ReadableSize,
) -> Result<Option<FileMeta>> {
let reader = build_sst_reader(
region_id,
schema,
sst_layer.clone(),
&self.inputs,

View File

@@ -16,6 +16,7 @@ use common_query::logical_plan::{DfExpr, Expr};
use common_time::timestamp::TimeUnit;
use datafusion_expr::Operator;
use datatypes::value::timestamp_to_scalar_value;
use store_api::storage::RegionId;
use crate::chunk::{ChunkReaderBuilder, ChunkReaderImpl};
use crate::error;
@@ -24,6 +25,7 @@ use crate::sst::{AccessLayerRef, FileHandle};
/// Builds an SST reader that only reads rows within given time range.
pub(crate) async fn build_sst_reader(
region_id: RegionId,
schema: RegionSchemaRef,
sst_layer: AccessLayerRef,
files: &[FileHandle],
@@ -38,7 +40,7 @@ pub(crate) async fn build_sst_reader(
let ts_col_unit = ts_col.data_type.as_timestamp().unwrap().unit();
let ts_col_name = ts_col.name.clone();
ChunkReaderBuilder::new(schema, sst_layer)
ChunkReaderBuilder::new(region_id, schema, sst_layer)
.pick_ssts(files)
.filters(
build_time_range_filter(
@@ -139,6 +141,8 @@ mod tests {
use crate::sst::{self, FileId, FileMeta, FsAccessLayer, Source, SstInfo, WriteOptions};
use crate::test_util::descriptor_util::RegionDescBuilder;
const REGION_ID: RegionId = 1;
fn schema_for_test() -> RegionSchemaRef {
// Just build a region desc and use its columns metadata.
let desc = RegionDescBuilder::new("test")
@@ -277,7 +281,9 @@ mod tests {
handle
}
// The region id is only used to build the reader, we don't check its content.
async fn check_reads(
region_id: RegionId,
schema: RegionSchemaRef,
sst_layer: AccessLayerRef,
files: &[FileHandle],
@@ -286,6 +292,7 @@ mod tests {
expect: &[i64],
) {
let mut reader = build_sst_reader(
region_id,
schema,
sst_layer,
files,
@@ -352,6 +359,7 @@ mod tests {
let files = vec![file1, file2];
// read from two sst files with time range filter,
check_reads(
REGION_ID,
schema.clone(),
sst_layer.clone(),
&files,
@@ -361,7 +369,7 @@ mod tests {
)
.await;
check_reads(schema, sst_layer, &files, 1, 2, &[1000]).await;
check_reads(REGION_ID, schema, sst_layer, &files, 1, 2, &[1000]).await;
}
async fn read_file(
@@ -370,7 +378,7 @@ mod tests {
sst_layer: AccessLayerRef,
) -> Vec<i64> {
let mut timestamps = vec![];
let mut reader = build_sst_reader(schema, sst_layer, files, i64::MIN, i64::MAX)
let mut reader = build_sst_reader(REGION_ID, schema, sst_layer, files, i64::MIN, i64::MAX)
.await
.unwrap();
while let Some(chunk) = reader.next_chunk().await.unwrap() {
@@ -434,15 +442,36 @@ mod tests {
let sst_layer = Arc::new(FsAccessLayer::new("./", object_store.clone()));
let input_files = vec![file2, file1];
let reader1 = build_sst_reader(schema.clone(), sst_layer.clone(), &input_files, 0, 3)
.await
.unwrap();
let reader2 = build_sst_reader(schema.clone(), sst_layer.clone(), &input_files, 3, 6)
.await
.unwrap();
let reader3 = build_sst_reader(schema.clone(), sst_layer.clone(), &input_files, 6, 10)
.await
.unwrap();
let reader1 = build_sst_reader(
REGION_ID,
schema.clone(),
sst_layer.clone(),
&input_files,
0,
3,
)
.await
.unwrap();
let reader2 = build_sst_reader(
REGION_ID,
schema.clone(),
sst_layer.clone(),
&input_files,
3,
6,
)
.await
.unwrap();
let reader3 = build_sst_reader(
REGION_ID,
schema.clone(),
sst_layer.clone(),
&input_files,
6,
10,
)
.await
.unwrap();
let opts = WriteOptions {
sst_write_buffer_size: ReadableSize::mb(8),

View File

@@ -75,9 +75,7 @@ pub trait Memtable: Send + Sync + fmt::Debug {
/// Iterates the memtable.
fn iter(&self, ctx: IterContext) -> Result<BoxedBatchIterator>;
/// Returns the estimated bytes allocated by this memtable from heap. Result
/// of this method may be larger than the estimated based on [`num_rows`] because
/// of the implementor's pre-alloc behavior.
/// Returns the number of rows in the memtable.
fn num_rows(&self) -> usize;
/// Returns stats of this memtable.

View File

@@ -14,9 +14,10 @@
//! Common structs and utilities for read.
mod chain;
mod dedup;
mod merge;
pub(crate) mod windowed;
mod windowed;
use std::cmp::Ordering;
@@ -25,11 +26,13 @@ use common_base::BitVec;
use datatypes::data_type::DataType;
use datatypes::prelude::ConcreteDataType;
use datatypes::vectors::{BooleanVector, MutableVector, VectorRef};
pub use dedup::DedupReader;
pub use merge::{MergeReader, MergeReaderBuilder};
use snafu::{ensure, ResultExt};
use crate::error::{self, Result};
pub use crate::read::chain::ChainReader;
pub use crate::read::dedup::DedupReader;
pub use crate::read::merge::{MergeReader, MergeReaderBuilder};
pub use crate::read::windowed::WindowedReader;
/// Storage internal representation of a batch of rows.
// Now the structure of `Batch` is still unstable, all pub fields may be changed.

View File

@@ -0,0 +1,124 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::error::Result;
use crate::read::{Batch, BatchReader};
use crate::schema::ProjectedSchemaRef;
/// A reader that simply chain the outputs of input readers.
pub struct ChainReader<R> {
/// Schema to read
pub schema: ProjectedSchemaRef,
/// Each reader reads a slice of time window
pub readers: Vec<R>,
}
impl<R> ChainReader<R> {
/// Returns a new [ChainReader] with specific input `readers`.
pub fn new(schema: ProjectedSchemaRef, mut readers: Vec<R>) -> Self {
// Reverse readers since we iter them backward.
readers.reverse();
Self { schema, readers }
}
}
#[async_trait::async_trait]
impl<R> BatchReader for ChainReader<R>
where
R: BatchReader,
{
async fn next_batch(&mut self) -> Result<Option<Batch>> {
while let Some(reader) = self.readers.last_mut() {
if let Some(batch) = reader.next_batch().await? {
return Ok(Some(batch));
} else {
// Remove the exhausted reader.
self.readers.pop();
}
}
Ok(None)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::test_util::read_util::{self, Batches, VecBatchReader};
fn build_chain_reader(sources: &[Batches]) -> ChainReader<VecBatchReader> {
let schema = read_util::new_projected_schema();
let readers = sources
.iter()
.map(|source| read_util::build_vec_reader(source))
.collect();
ChainReader::new(schema, readers)
}
async fn check_chain_reader_result(
mut reader: ChainReader<VecBatchReader>,
input: &[Batches<'_>],
) {
let expect: Vec<_> = input
.iter()
.flat_map(|v| v.iter())
.flat_map(|v| v.iter().copied())
.collect();
let result = read_util::collect_kv_batch(&mut reader).await;
assert_eq!(expect, result);
// Call next_batch() again is allowed.
assert!(reader.next_batch().await.unwrap().is_none());
}
#[tokio::test]
async fn test_chain_empty() {
let mut reader = build_chain_reader(&[]);
assert!(reader.next_batch().await.unwrap().is_none());
// Call next_batch() again is allowed.
assert!(reader.next_batch().await.unwrap().is_none());
}
#[tokio::test]
async fn test_chain_one() {
let input: &[Batches] = &[&[
&[(1, Some(1)), (2, Some(2))],
&[(3, Some(3)), (4, Some(4))],
&[(5, Some(5))],
]];
let reader = build_chain_reader(input);
check_chain_reader_result(reader, input).await;
}
#[tokio::test]
async fn test_chain_multi() {
let input: &[Batches] = &[
&[
&[(1, Some(1)), (2, Some(2))],
&[(3, Some(3)), (4, Some(4))],
&[(5, Some(5))],
],
&[&[(6, Some(3)), (7, Some(4)), (8, Some(8))], &[(9, Some(9))]],
&[&[(10, Some(10)), (11, Some(11))], &[(12, Some(12))]],
];
let reader = build_chain_reader(input);
check_chain_reader_result(reader, input).await;
}
}

View File

@@ -608,7 +608,7 @@ mod tests {
use datatypes::vectors::{Int64Vector, TimestampMillisecondVector};
use super::*;
use crate::test_util::read_util;
use crate::test_util::read_util::{self, Batches};
#[tokio::test]
async fn test_merge_reader_empty() {
@@ -653,8 +653,6 @@ mod tests {
assert!(output.contains("pos: 1"));
}
type Batches<'a> = &'a [&'a [(i64, Option<i64>)]];
fn build_merge_reader(sources: &[Batches], num_iter: usize, batch_size: usize) -> MergeReader {
let schema = read_util::new_projected_schema();
let mut builder =

View File

@@ -252,7 +252,7 @@ async fn test_flush_empty() {
}
#[tokio::test]
async fn test_read_after_flush() {
async fn test_read_after_flush_across_window() {
common_telemetry::init_default_ut_logging();
let dir = create_temp_dir("read-flush");
@@ -289,6 +289,44 @@ async fn test_read_after_flush() {
assert_eq!(expect, output);
}
#[tokio::test]
async fn test_read_after_flush_same_window() {
common_telemetry::init_default_ut_logging();
let dir = create_temp_dir("read-flush");
let store_dir = dir.path().to_str().unwrap();
let flush_switch = Arc::new(FlushSwitch::default());
let tester = FlushTester::new(store_dir, flush_switch.clone()).await;
// Put elements so we have content to flush.
tester.put(&[(1000, Some(100))]).await;
tester.put(&[(2000, Some(200))]).await;
// Flush.
tester.flush(None).await;
// Put element again.
tester.put(&[(1003, Some(300))]).await;
let expect = vec![
(1000, Some(100.to_string())),
(1003, Some(300.to_string())),
(2000, Some(200.to_string())),
];
let output = tester.full_scan().await;
assert_eq!(expect, output);
// Reopen
let mut tester = tester;
tester.reopen().await;
// Scan after reopen.
let output = tester.full_scan().await;
assert_eq!(expect, output);
}
#[tokio::test]
async fn test_merge_read_after_flush() {
let dir = create_temp_dir("merge-read-flush");

View File

@@ -53,15 +53,19 @@ impl Snapshot for SnapshotImpl {
let mutables = memtable_version.mutable_memtable();
let immutables = memtable_version.immutable_memtables();
let mut builder =
ChunkReaderBuilder::new(self.version.schema().clone(), self.sst_layer.clone())
.reserve_num_memtables(memtable_version.num_memtables())
.projection(request.projection)
.filters(request.filters)
.batch_size(ctx.batch_size)
.output_ordering(request.output_ordering)
.visible_sequence(visible_sequence)
.pick_memtables(mutables.clone());
let mut builder = ChunkReaderBuilder::new(
self.version.metadata().id(),
self.version.schema().clone(),
self.sst_layer.clone(),
)
.reserve_num_memtables(memtable_version.num_memtables())
.projection(request.projection)
.filters(request.filters)
.batch_size(ctx.batch_size)
.output_ordering(request.output_ordering)
.visible_sequence(visible_sequence)
.pick_memtables(mutables.clone())
.use_chain_reader(true);
for memtable in immutables {
builder = builder.pick_memtables(memtable.clone());

View File

@@ -92,6 +92,8 @@ pub async fn collect_kv_batch(reader: &mut dyn BatchReader) -> Vec<(i64, Option<
result
}
pub type Batches<'a> = &'a [&'a [(i64, Option<i64>)]];
/// A reader for test that pop batch from Vec.
pub struct VecBatchReader {
schema: ProjectedSchemaRef,