feat: add WindowedReader (#1532)

* feat: add WindowedReader

* fix: some cr comments

* feat: filter memtable by timestamp range

* fix: add source in error variants

* fix: some CR comments

* refactor: filter memtable in MapIterWrapper

* fix: clippy
This commit is contained in:
Lei, HUANG
2023-05-17 17:34:29 +08:00
committed by GitHub
parent eb95a9e78b
commit e8c2222a76
11 changed files with 347 additions and 64 deletions

View File

@@ -180,6 +180,15 @@ impl Value {
}
}
/// Cast Value to timestamp. Return None if value is not a valid timestamp data type.
///
/// NOTE: an `Int64` value is interpreted as a raw *millisecond* timestamp;
/// every other non-timestamp variant yields `None`.
pub fn as_timestamp(&self) -> Option<Timestamp> {
    match self {
        // Int64 is accepted as milliseconds since epoch.
        Value::Int64(v) => Some(Timestamp::new_millisecond(*v)),
        Value::Timestamp(t) => Some(*t),
        _ => None,
    }
}
/// Returns the logical type of the value.
pub fn logical_type_id(&self) -> LogicalTypeId {
match self {
@@ -250,15 +259,6 @@ impl Value {
Ok(scalar_value)
}
/// Casts Value to [Timestamp]. Returns None if it's not a valid timestamp value.
pub fn as_timestamp(&self) -> Option<Timestamp> {
match self {
Value::Int64(v) => Some(Timestamp::new_millisecond(*v)),
Value::Timestamp(t) => Some(*t),
_ => None,
}
}
}
fn to_null_value(output_type: &ConcreteDataType) -> ScalarValue {

View File

@@ -75,7 +75,9 @@ impl LogStore for NoopLogStore {
_id: Id,
) -> Result<store_api::logstore::entry_stream::SendableEntryStream<'_, Self::Entry, Self::Error>>
{
todo!()
Ok(Box::pin(futures::stream::once(futures::future::ready(Ok(
vec![],
)))))
}
async fn create_namespace(&mut self, _ns: &Self::Namespace) -> Result<()> {

View File

@@ -24,6 +24,7 @@ use table::predicate::{Predicate, TimeRangePredicateBuilder};
use crate::error::{self, Error, Result};
use crate::memtable::{IterContext, MemtableRef};
use crate::read::windowed::WindowedReader;
use crate::read::{Batch, BoxedBatchReader, DedupReader, MergeReaderBuilder};
use crate::schema::{ProjectedSchema, ProjectedSchemaRef, RegionSchemaRef};
use crate::sst::{AccessLayerRef, FileHandle, LevelMetas, ReadOptions};
@@ -84,6 +85,7 @@ pub struct ChunkReaderBuilder {
iter_ctx: IterContext,
memtables: Vec<MemtableRef>,
files_to_read: Vec<FileHandle>,
time_windows: Option<Vec<TimestampRange>>,
}
impl ChunkReaderBuilder {
@@ -96,6 +98,7 @@ impl ChunkReaderBuilder {
iter_ctx: IterContext::default(),
memtables: Vec::new(),
files_to_read: Vec::new(),
time_windows: None,
}
}
@@ -151,24 +154,38 @@ impl ChunkReaderBuilder {
self
}
pub async fn build(mut self) -> Result<ChunkReaderImpl> {
let time_range_predicate = self.build_time_range_predicate();
debug!(
"Time range predicate for chunk reader: {:?}",
time_range_predicate
);
/// Set time windows for scan.
///
/// When windows are set, the built reader scans each window separately
/// instead of performing a single merged scan over the full time range.
pub fn time_windows(mut self, windows: Vec<TimestampRange>) -> Self {
    self.time_windows = Some(windows);
    self
}
let schema = Arc::new(
ProjectedSchema::new(self.schema, self.projection)
.context(error::InvalidProjectionSnafu)?,
);
/// Builds a [WindowedReader] that scans the given time `windows` one at a time.
///
/// Each window is intersected (`and`) with the overall `time_range_predicate`
/// and gets its own inner reader; the per-window readers are then wrapped in
/// a single [WindowedReader].
async fn build_windowed(
    self,
    schema: &ProjectedSchemaRef,
    time_range_predicate: &TimestampRange,
    windows: Vec<TimestampRange>,
) -> Result<BoxedBatchReader> {
    let mut readers = Vec::with_capacity(windows.len());
    for window in windows {
        // Narrow the global predicate to this window so the reader only
        // touches rows (and skips SST files) inside its own time slice.
        let time_range_predicate = time_range_predicate.and(&window);
        let reader = self.build_reader(schema, &time_range_predicate).await?;
        readers.push(reader);
    }
    let windowed_reader = WindowedReader::new(schema.clone(), readers);
    Ok(Box::new(windowed_reader) as Box<_>)
}
async fn build_reader(
&self,
schema: &ProjectedSchemaRef,
time_range: &TimestampRange,
) -> Result<BoxedBatchReader> {
let num_sources = self.memtables.len() + self.files_to_read.len();
let mut reader_builder = MergeReaderBuilder::with_capacity(schema.clone(), num_sources)
.batch_size(self.iter_ctx.batch_size);
self.iter_ctx.projected_schema = Some(schema.clone());
for mem in self.memtables {
for mem in &self.memtables {
let iter = mem.iter(&self.iter_ctx)?;
reader_builder = reader_builder.push_batch_iter(iter);
}
@@ -176,26 +193,39 @@ impl ChunkReaderBuilder {
let read_opts = ReadOptions {
batch_size: self.iter_ctx.batch_size,
projected_schema: schema.clone(),
predicate: Predicate::new(self.filters),
time_range: time_range_predicate,
predicate: Predicate::new(self.filters.clone()),
time_range: *time_range,
};
for file in &self.files_to_read {
if !Self::file_in_range(file, time_range_predicate) {
debug!(
"Skip file {:?}, predicate: {:?}",
file, time_range_predicate
);
if !Self::file_in_range(file, time_range) {
debug!("Skip file {:?}, predicate: {:?}", file, time_range);
continue;
}
let reader = self.sst_layer.read_sst(file.clone(), &read_opts).await?;
reader_builder = reader_builder.push_batch_reader(reader);
}
let reader = reader_builder.build();
let reader = DedupReader::new(schema.clone(), reader);
Ok(Box::new(reader) as Box<_>)
}
Ok(ChunkReaderImpl::new(schema, Box::new(reader)))
/// Builds the final [ChunkReaderImpl].
///
/// Derives the time range predicate from the configured filters, projects
/// the schema, then dispatches to either a windowed scan (when
/// `time_windows` was set) or a plain single-range scan.
pub async fn build(mut self) -> Result<ChunkReaderImpl> {
    let time_range_predicate = self.build_time_range_predicate();
    let schema = Arc::new(
        ProjectedSchema::new(self.schema.clone(), self.projection.clone())
            .context(error::InvalidProjectionSnafu)?,
    );
    // Memtable iterators also need the projected schema.
    self.iter_ctx.projected_schema = Some(schema.clone());
    // `take()` moves the windows out so `self` can still be consumed below.
    let reader = if let Some(windows) = self.time_windows.take() {
        self.build_windowed(&schema, &time_range_predicate, windows)
            .await?
    } else {
        self.build_reader(&schema, &time_range_predicate).await?
    };
    Ok(ChunkReaderImpl::new(schema, reader))
}
/// Build time range predicate from schema and filters.
@@ -205,14 +235,13 @@ impl ChunkReaderBuilder {
}
/// Check if SST file's time range matches predicate.
#[inline]
fn file_in_range(file: &FileHandle, predicate: TimestampRange) -> bool {
if predicate == TimestampRange::min_to_max() {
fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
if predicate == &TimestampRange::min_to_max() {
return true;
}
// end_timestamp of sst file is inclusive.
let Some((start, end)) = *file.time_range() else { return true; };
let file_ts_range = TimestampRange::new_inclusive(Some(start), Some(end));
file_ts_range.intersects(&predicate)
file_ts_range.intersects(predicate)
}
}

View File

@@ -499,6 +499,24 @@ pub enum Error {
#[snafu(backtrace)]
source: RuntimeError,
},
#[snafu(display("Failed to convert columns to rows, source: {}", source))]
ConvertColumnsToRows {
source: ArrowError,
location: Location,
},
#[snafu(display("Failed to sort arrays, source: {}", source))]
SortArrays {
source: ArrowError,
location: Location,
},
#[snafu(display("Failed to select rows, source: {}", source))]
SelectRows {
source: ArrowError,
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -596,6 +614,8 @@ impl ErrorExt for Error {
| StopPickTask { .. } => StatusCode::Unexpected,
TtlCalculation { source, .. } => source.status_code(),
ConvertColumnsToRows { .. } | SortArrays { .. } => StatusCode::Unexpected,
SelectRows { .. } => StatusCode::Unexpected,
}
}

View File

@@ -21,6 +21,7 @@ mod version;
use std::sync::atomic::{AtomicI64, AtomicU32, AtomicUsize, Ordering};
use std::sync::Arc;
use common_time::range::TimestampRange;
use datatypes::vectors::VectorRef;
use store_api::storage::{consts, OpType, SequenceNumber};
@@ -110,6 +111,9 @@ pub struct IterContext {
///
/// Set to `None` to read all columns.
pub projected_schema: Option<ProjectedSchemaRef>,
/// Timestamp range
pub time_range: Option<TimestampRange>,
}
impl Default for IterContext {
@@ -120,6 +124,7 @@ impl Default for IterContext {
visible_sequence: SequenceNumber::MAX,
for_flush: false,
projected_schema: None,
time_range: None,
}
}
}

View File

@@ -19,6 +19,7 @@ use std::ops::Bound;
use std::sync::atomic::Ordering as AtomicOrdering;
use std::sync::{Arc, RwLock};
use common_time::range::TimestampRange;
use datatypes::data_type::DataType;
use datatypes::prelude::*;
use datatypes::value::Value;
@@ -217,7 +218,7 @@ impl BTreeIterator {
let (keys, sequences, op_types, values) = if self.ctx.for_flush {
collect_iter(iter, self.ctx.batch_size)
} else {
let iter = MapIterWrapper::new(iter, self.ctx.visible_sequence);
let iter = MapIterWrapper::new(iter, self.ctx.visible_sequence, self.ctx.time_range);
collect_iter(iter, self.ctx.batch_size)
};
@@ -289,23 +290,26 @@ struct MapIterWrapper<'a, InnerKey, RowValue> {
iter: btree_map::Range<'a, InnerKey, RowValue>,
prev_key: Option<InnerKey>,
visible_sequence: SequenceNumber,
time_range: Option<TimestampRange>,
}
impl<'a> MapIterWrapper<'a, InnerKey, RowValue> {
/// Creates a wrapper over a BTree range iterator that filters entries by
/// `visible_sequence` and, when set, by `time_range`.
fn new(
    iter: btree_map::Range<'a, InnerKey, RowValue>,
    visible_sequence: SequenceNumber,
    time_range: Option<TimestampRange>,
) -> MapIterWrapper<'a, InnerKey, RowValue> {
    MapIterWrapper {
        iter,
        // No key seen yet; used for dedup/seek bookkeeping elsewhere.
        prev_key: None,
        visible_sequence,
        time_range,
    }
}
fn next_visible_entry(&mut self) -> Option<(&'a InnerKey, &'a RowValue)> {
for (k, v) in self.iter.by_ref() {
if k.is_visible(self.visible_sequence) {
if k.is_visible(self.visible_sequence) && k.is_in_time_range(&self.time_range) {
return Some((k, v));
}
}
@@ -447,6 +451,17 @@ impl InnerKey {
self.sequence <= sequence
}
/// Returns true when this key's timestamp lies inside `range`, or when no
/// range filter is configured at all.
#[inline]
fn is_in_time_range(&self, range: &Option<TimestampRange>) -> bool {
    match range {
        // No filter configured: every entry passes.
        None => true,
        Some(range) => range.contains(
            &self
                .timestamp()
                .as_timestamp()
                .expect("Timestamp field must be a valid timestamp value"),
        ),
    }
}
/// Reset the `InnerKey` so that we can use it to seek next key that
/// has different row key.
fn reset_for_seek(&mut self) {

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_time::Timestamp;
use datatypes::prelude::*;
use datatypes::timestamp::TimestampMillisecond;
use datatypes::type_id::LogicalTypeId;
@@ -436,6 +437,7 @@ fn test_sequence_visibility() {
visible_sequence: 9,
for_flush: false,
projected_schema: None,
time_range: None,
};
let mut iter = ctx.memtable.iter(&iter_ctx).unwrap();
@@ -454,6 +456,7 @@ fn test_sequence_visibility() {
visible_sequence: 10,
for_flush: false,
projected_schema: None,
time_range: None,
};
let mut iter = ctx.memtable.iter(&iter_ctx).unwrap();
@@ -472,6 +475,7 @@ fn test_sequence_visibility() {
visible_sequence: 11,
for_flush: false,
projected_schema: None,
time_range: None,
};
let mut iter = ctx.memtable.iter(&iter_ctx).unwrap();
@@ -510,6 +514,40 @@ fn test_iter_after_none() {
});
}
#[test]
fn test_filter_memtable() {
    let tester = MemtableTester::default();
    tester.run_testcase(|ctx| {
        // Insert three rows with timestamps 1000, 1001 and 1002.
        write_kvs(
            &*ctx.memtable,
            10, // sequence
            OpType::Put,
            &[1000, 1001, 1002], // keys
            &[(Some(0), None), (Some(1), None), (Some(2), None)], // values
        );
        // Iterate with range [0, 1001); the assertion below shows the upper
        // bound is exclusive, so only timestamp 1000 should survive.
        let iter_ctx = IterContext {
            batch_size: 4,
            time_range: Some(
                TimestampRange::new(
                    Timestamp::new_millisecond(0),
                    Timestamp::new_millisecond(1001),
                )
                .unwrap(),
            ),
            ..Default::default()
        };
        let mut iter = ctx.memtable.iter(&iter_ctx).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(5, batch.columns.len());
        // Only the row with timestamp 1000 passes the filter.
        assert_eq!(
            Arc::new(TimestampMillisecondVector::from_slice([1000])) as Arc<_>,
            batch.columns[0]
        );
    });
}
#[test]
fn test_memtable_projection() {
let tester = MemtableTester::default();

View File

@@ -16,6 +16,7 @@
mod dedup;
mod merge;
pub(crate) mod windowed;
use std::cmp::Ordering;
@@ -258,3 +259,10 @@ pub trait BatchReader: Send {
/// Pointer to [BatchReader].
pub type BoxedBatchReader = Box<dyn BatchReader>;
#[async_trait::async_trait]
impl<T: BatchReader + ?Sized> BatchReader for Box<T> {
    /// Forwards to the boxed reader so that `BoxedBatchReader` itself
    /// satisfies [BatchReader].
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        (**self).next_batch().await
    }
}

View File

@@ -0,0 +1,140 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use arrow::compute::SortOptions;
use arrow::row::{RowConverter, SortField};
use arrow_array::{Array, ArrayRef};
use datatypes::data_type::DataType;
use datatypes::vectors::Helper;
use snafu::ResultExt;
use crate::error::{self, Result};
use crate::read::{Batch, BatchReader};
use crate::schema::ProjectedSchemaRef;
/// [WindowedReader] provides a windowed record batch reader that scans all rows within a window
/// at a time and sort these rows ordered in `[<timestamp>, <PK>]` order.
pub struct WindowedReader<R> {
    /// Schema to read
    pub schema: ProjectedSchemaRef,
    /// Each reader reads a slice of time window
    pub readers: Vec<R>,
}

impl<R> WindowedReader<R> {
    /// Creates a new [WindowedReader] from given schema and a set of boxed
    /// readers, one per time window. Note that `next_batch` consumes
    /// `readers` from the back of the vector.
    pub fn new(schema: ProjectedSchemaRef, readers: Vec<R>) -> Self {
        Self { schema, readers }
    }
}
#[async_trait::async_trait]
impl<R> BatchReader for WindowedReader<R>
where
    R: BatchReader,
{
    /// Drains one inner reader (i.e. one time window) per call, concatenates
    /// all of its batches, sorts the rows and returns them as a single batch.
    ///
    /// Returns `Ok(None)` only once every window reader has been consumed.
    /// Readers are popped from the back of `self.readers`, so windows are
    /// scanned in reverse of the order they were pushed.
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        let Some(mut reader) = self.readers.pop() else { return Ok(None); };
        let store_schema = self.schema.schema_to_read();

        // Collect every batch from this window as raw arrow arrays
        // (one Vec<ArrayRef> per batch, column-major).
        let mut batches = vec![];
        while let Some(batch) = reader.next_batch().await? {
            batches.push(
                batch
                    .columns
                    .into_iter()
                    .map(|v| v.to_arrow_array())
                    .collect::<Vec<_>>(),
            );
        }
        // Empty window: return an empty batch rather than `None`, so the
        // caller keeps calling and the remaining windows still get scanned.
        let Some(num_columns) = batches.get(0).map(|b| b.len()) else {
            return Ok(Some(Batch::new(vec![])));
        };

        // Concatenate the per-batch arrays column by column.
        let mut vectors_in_batch = Vec::with_capacity(num_columns);
        for idx in 0..num_columns {
            let columns: Vec<&dyn Array> =
                batches.iter().map(|b| b[idx].as_ref()).collect::<Vec<_>>();
            vectors_in_batch
                .push(arrow::compute::concat(&columns).context(error::ConvertColumnsToRowsSnafu)?);
        }
        // Sort rows with the timestamp column as the primary sort key, then
        // convert the arrow arrays back into store-schema vectors.
        let sorted = sort_by_rows(&self.schema, vectors_in_batch)?;
        let vectors = sorted
            .iter()
            .zip(store_schema.columns().iter().map(|c| &c.desc.name))
            .map(|(arr, name)| {
                Helper::try_into_vector(arr).context(error::ConvertChunkSnafu { name })
            })
            .collect::<Result<_>>()?;
        Ok(Some(Batch::new(vectors)))
    }
}
/// Sorts `arrays` lexicographically with the timestamp column as the primary
/// key (marked descending by [build_sort_columns]) followed by the primary
/// key columns, using arrow's row format for fast byte-wise comparison.
fn sort_by_rows(schema: &ProjectedSchemaRef, arrays: Vec<ArrayRef>) -> Result<Vec<ArrayRef>> {
    let sort_columns = build_sort_columns(schema);
    let store_schema = schema.schema_to_read();
    // Convert columns to rows to speed lexicographic sort
    // TODO(hl): maybe optimize to lexsort_to_index when only timestamp column is involved.
    let mut row_converter = RowConverter::new(
        sort_columns
            .iter()
            .map(|(idx, descending)| {
                SortField::new_with_options(
                    store_schema.columns()[*idx].desc.data_type.as_arrow_type(),
                    SortOptions {
                        descending: *descending,
                        nulls_first: true,
                    },
                )
            })
            .collect(),
    )
    .context(error::ConvertColumnsToRowsSnafu)?;

    // Gather only the sort-key columns, in sort priority order.
    let columns_to_sort = sort_columns
        .into_iter()
        .map(|(idx, _)| arrays[idx].clone())
        .collect::<Vec<_>>();
    let rows_to_sort = row_converter
        .convert_columns(&columns_to_sort)
        .context(error::ConvertColumnsToRowsSnafu)?;

    // Sort (original index, row) pairs; arrow rows compare byte-wise in the
    // configured sort order.
    let mut sort_pairs = rows_to_sort.iter().enumerate().collect::<Vec<_>>();
    sort_pairs.sort_unstable_by(|(_, a), (_, b)| a.cmp(b));

    // Apply the resulting permutation to *all* columns via `take`.
    let idx =
        arrow::array::UInt32Array::from_iter_values(sort_pairs.iter().map(|(i, _)| *i as u32));
    let sorted = arrays
        .iter()
        .map(|arr| arrow::compute::take(arr, &idx, None))
        .collect::<arrow::error::Result<Vec<_>>>()
        .context(error::SortArraysSnafu)?;

    debug_assert_eq!(sorted.len(), store_schema.num_columns());
    Ok(sorted)
}
/// Computes the sort-key layout: `[<PK_1>, <PK_2>, TS]` becomes
/// `[TS, <PK_1>, <PK_2>]`.
/// Returns `(column index, descending)` pairs; the timestamp column is marked
/// descending (`true`) while the primary key columns sort ascending.
fn build_sort_columns(schema: &ProjectedSchemaRef) -> Vec<(usize, bool)> {
    let ts_col_index = schema.schema_to_read().timestamp_index();
    // Timestamp first, then every row-key column that precedes it,
    // preserving their original relative order.
    let mut columns = Vec::with_capacity(ts_col_index + 1);
    columns.push((ts_col_index, true));
    columns.extend((0..ts_col_index).map(|i| (i, false)));
    columns
}

View File

@@ -79,6 +79,13 @@ impl StoreSchema {
self.row_key_end
}
/// Returns the index of timestamp column.
/// We always assume that timestamp is the last column in [StoreSchema].
#[inline]
pub fn timestamp_index(&self) -> usize {
self.row_key_end - 1
}
pub(crate) fn contains_column(&self, name: &str) -> bool {
self.schema.column_schema_by_name(name).is_some()
}
@@ -155,11 +162,6 @@ impl StoreSchema {
self.user_column_end + 1
}
#[inline]
pub(crate) fn timestamp_index(&self) -> usize {
self.row_key_end - 1
}
#[inline]
pub(crate) fn row_key_indices(&self) -> impl Iterator<Item = usize> {
0..self.row_key_end

View File

@@ -15,6 +15,7 @@
use std::cmp;
use async_trait::async_trait;
use common_time::range::TimestampRange;
use store_api::storage::{
GetRequest, GetResponse, ReadContext, ScanRequest, ScanResponse, SchemaRef, SequenceNumber,
Snapshot,
@@ -47,28 +48,7 @@ impl Snapshot for SnapshotImpl {
ctx: &ReadContext,
request: ScanRequest,
) -> Result<ScanResponse<ChunkReaderImpl>> {
let visible_sequence = self.sequence_to_read(request.sequence);
let memtable_version = self.version.memtables();
let mutables = memtable_version.mutable_memtable();
let immutables = memtable_version.immutable_memtables();
let mut builder =
ChunkReaderBuilder::new(self.version.schema().clone(), self.sst_layer.clone())
.reserve_num_memtables(memtable_version.num_memtables())
.projection(request.projection)
.filters(request.filters)
.batch_size(ctx.batch_size)
.visible_sequence(visible_sequence)
.pick_memtables(mutables.clone());
for memtable in immutables {
builder = builder.pick_memtables(memtable.clone());
}
let reader = builder.pick_all_ssts(self.version.ssts())?.build().await?;
Ok(ScanResponse { reader })
self.scan_raw(ctx, request, None).await
}
async fn get(&self, _ctx: &ReadContext, _request: GetRequest) -> Result<GetResponse> {
@@ -95,4 +75,48 @@ impl SnapshotImpl {
.map(|s| cmp::min(s, self.visible_sequence))
.unwrap_or(self.visible_sequence)
}
/// Scans the snapshot window by window: each [TimestampRange] in `windows`
/// is read by its own reader (see `ChunkReaderBuilder::time_windows`).
#[allow(unused)] // not yet reachable from the public scan API
pub(crate) async fn windowed_scan(
    &self,
    ctx: &ReadContext,
    request: ScanRequest,
    windows: Vec<TimestampRange>,
) -> Result<ScanResponse<ChunkReaderImpl>> {
    self.scan_raw(ctx, request, Some(windows)).await
}
/// Shared implementation behind `scan` and `windowed_scan`.
///
/// Builds a `ChunkReaderBuilder` over the mutable memtable, all immutable
/// memtables and every SST, optionally restricted to the given time
/// `windows`.
async fn scan_raw(
    &self,
    ctx: &ReadContext,
    request: ScanRequest,
    windows: Option<Vec<TimestampRange>>,
) -> Result<ScanResponse<ChunkReaderImpl>> {
    // Clamp the requested sequence to what this snapshot may expose.
    let visible_sequence = self.sequence_to_read(request.sequence);
    let memtable_version = self.version.memtables();

    let mutables = memtable_version.mutable_memtable();
    let immutables = memtable_version.immutable_memtables();

    let mut builder =
        ChunkReaderBuilder::new(self.version.schema().clone(), self.sst_layer.clone())
            .reserve_num_memtables(memtable_version.num_memtables())
            .projection(request.projection)
            .filters(request.filters)
            .batch_size(ctx.batch_size)
            .visible_sequence(visible_sequence)
            .pick_memtables(mutables.clone());

    if let Some(windows) = windows {
        builder = builder.time_windows(windows);
    }

    for memtable in immutables {
        builder = builder.pick_memtables(memtable.clone());
    }
    let reader = builder.pick_all_ssts(self.version.ssts())?.build().await?;
    Ok(ScanResponse { reader })
}
}