From e8c2222a76dbe93f16d1af57933b026e6450eb5f Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Wed, 17 May 2023 17:34:29 +0800 Subject: [PATCH] feat: add WindowedReader (#1532) * feat: add WindowedReader * fix: some cr comments * feat: filter memtable by timestamp range * fix: add source in error variants * fix: some CR comments * refactor: filter memtable in MapIterWrapper * fix: clippy --- src/datatypes/src/value.rs | 18 ++-- src/log-store/src/noop.rs | 4 +- src/storage/src/chunk.rs | 79 +++++++++++------ src/storage/src/error.rs | 20 +++++ src/storage/src/memtable.rs | 5 ++ src/storage/src/memtable/btree.rs | 19 +++- src/storage/src/memtable/tests.rs | 38 ++++++++ src/storage/src/read.rs | 8 ++ src/storage/src/read/windowed.rs | 140 ++++++++++++++++++++++++++++++ src/storage/src/schema/store.rs | 12 +-- src/storage/src/snapshot.rs | 68 ++++++++++----- 11 files changed, 347 insertions(+), 64 deletions(-) create mode 100644 src/storage/src/read/windowed.rs diff --git a/src/datatypes/src/value.rs b/src/datatypes/src/value.rs index 51044c96f0..733b853808 100644 --- a/src/datatypes/src/value.rs +++ b/src/datatypes/src/value.rs @@ -180,6 +180,15 @@ impl Value { } } + /// Cast Value to timestamp. Return None if value is not a valid timestamp data type. + pub fn as_timestamp(&self) -> Option { + match self { + Value::Int64(v) => Some(Timestamp::new_millisecond(*v)), + Value::Timestamp(t) => Some(*t), + _ => None, + } + } + /// Returns the logical type of the value. pub fn logical_type_id(&self) -> LogicalTypeId { match self { @@ -250,15 +259,6 @@ impl Value { Ok(scalar_value) } - - /// Casts Value to [Timestamp]. Returns None if it's not a valid timestamp value. - pub fn as_timestamp(&self) -> Option { - match self { - Value::Int64(v) => Some(Timestamp::new_millisecond(*v)), - Value::Timestamp(t) => Some(*t), - _ => None, - } - } } fn to_null_value(output_type: &ConcreteDataType) -> ScalarValue { diff --git a/src/log-store/src/noop.rs b/src/log-store/src/noop.rs index 842156fe4f..1b7ecda840 100644 --- a/src/log-store/src/noop.rs +++ b/src/log-store/src/noop.rs @@ -75,7 +75,9 @@ impl LogStore for NoopLogStore { _id: Id, ) -> Result> { - todo!() + Ok(Box::pin(futures::stream::once(futures::future::ready(Ok( + vec![], + ))))) } async fn create_namespace(&mut self, _ns: &Self::Namespace) -> Result<()> { diff --git a/src/storage/src/chunk.rs b/src/storage/src/chunk.rs index 80416a8106..6d21d6d271 100644 --- a/src/storage/src/chunk.rs +++ b/src/storage/src/chunk.rs @@ -24,6 +24,7 @@ use table::predicate::{Predicate, TimeRangePredicateBuilder}; use crate::error::{self, Error, Result}; use crate::memtable::{IterContext, MemtableRef}; +use crate::read::windowed::WindowedReader; use crate::read::{Batch, BoxedBatchReader, DedupReader, MergeReaderBuilder}; use crate::schema::{ProjectedSchema, ProjectedSchemaRef, RegionSchemaRef}; use crate::sst::{AccessLayerRef, FileHandle, LevelMetas, ReadOptions}; @@ -84,6 +85,7 @@ pub struct ChunkReaderBuilder { iter_ctx: IterContext, memtables: Vec, files_to_read: Vec, + time_windows: Option>, } impl ChunkReaderBuilder { @@ -96,6 +98,7 @@ impl ChunkReaderBuilder { iter_ctx: IterContext::default(), memtables: Vec::new(), files_to_read: Vec::new(), + time_windows: None, } } @@ -151,24 +154,38 @@ impl ChunkReaderBuilder { self } - pub async fn build(mut self) -> Result { - let time_range_predicate = self.build_time_range_predicate(); - debug!( - "Time range predicate for chunk reader: {:?}", - time_range_predicate - ); + /// Set time windows for scan. + pub fn time_windows(mut self, windows: Vec) -> Self { + self.time_windows = Some(windows); + self + } - let schema = Arc::new( - ProjectedSchema::new(self.schema, self.projection) - .context(error::InvalidProjectionSnafu)?, - ); + async fn build_windowed( + self, + schema: &ProjectedSchemaRef, + time_range_predicate: &TimestampRange, + windows: Vec, + ) -> Result { + let mut readers = Vec::with_capacity(windows.len()); + for window in windows { + let time_range_predicate = time_range_predicate.and(&window); + let reader = self.build_reader(schema, &time_range_predicate).await?; + readers.push(reader); + } + let windowed_reader = WindowedReader::new(schema.clone(), readers); + Ok(Box::new(windowed_reader) as Box<_>) + } + async fn build_reader( + &self, + schema: &ProjectedSchemaRef, + time_range: &TimestampRange, + ) -> Result { let num_sources = self.memtables.len() + self.files_to_read.len(); let mut reader_builder = MergeReaderBuilder::with_capacity(schema.clone(), num_sources) .batch_size(self.iter_ctx.batch_size); - self.iter_ctx.projected_schema = Some(schema.clone()); - for mem in self.memtables { + for mem in &self.memtables { let iter = mem.iter(&self.iter_ctx)?; reader_builder = reader_builder.push_batch_iter(iter); } @@ -176,26 +193,39 @@ impl ChunkReaderBuilder { let read_opts = ReadOptions { batch_size: self.iter_ctx.batch_size, projected_schema: schema.clone(), - predicate: Predicate::new(self.filters), - time_range: time_range_predicate, + predicate: Predicate::new(self.filters.clone()), + time_range: *time_range, }; for file in &self.files_to_read { - if !Self::file_in_range(file, time_range_predicate) { - debug!( - "Skip file {:?}, predicate: {:?}", - file, time_range_predicate - ); + if !Self::file_in_range(file, time_range) { + debug!("Skip file {:?}, predicate: {:?}", file, time_range); continue; } let reader = self.sst_layer.read_sst(file.clone(), &read_opts).await?; - reader_builder = reader_builder.push_batch_reader(reader); } let reader = reader_builder.build(); let reader = DedupReader::new(schema.clone(), reader); + Ok(Box::new(reader) as Box<_>) + } - Ok(ChunkReaderImpl::new(schema, Box::new(reader))) + pub async fn build(mut self) -> Result { + let time_range_predicate = self.build_time_range_predicate(); + let schema = Arc::new( + ProjectedSchema::new(self.schema.clone(), self.projection.clone()) + .context(error::InvalidProjectionSnafu)?, + ); + self.iter_ctx.projected_schema = Some(schema.clone()); + + let reader = if let Some(windows) = self.time_windows.take() { + self.build_windowed(&schema, &time_range_predicate, windows) + .await? + } else { + self.build_reader(&schema, &time_range_predicate).await? + }; + + Ok(ChunkReaderImpl::new(schema, reader)) } /// Build time range predicate from schema and filters. @@ -205,14 +235,13 @@ impl ChunkReaderBuilder { } /// Check if SST file's time range matches predicate. - #[inline] - fn file_in_range(file: &FileHandle, predicate: TimestampRange) -> bool { - if predicate == TimestampRange::min_to_max() { + fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool { + if predicate == &TimestampRange::min_to_max() { return true; } // end_timestamp of sst file is inclusive. let Some((start, end)) = *file.time_range() else { return true; }; let file_ts_range = TimestampRange::new_inclusive(Some(start), Some(end)); - file_ts_range.intersects(&predicate) + file_ts_range.intersects(predicate) } } diff --git a/src/storage/src/error.rs b/src/storage/src/error.rs index 786de859bb..1a62cc872f 100644 --- a/src/storage/src/error.rs +++ b/src/storage/src/error.rs @@ -499,6 +499,24 @@ pub enum Error { #[snafu(backtrace)] source: RuntimeError, }, + + #[snafu(display("Failed to convert columns to rows, source: {}", source))] + ConvertColumnsToRows { + source: ArrowError, + location: Location, + }, + + #[snafu(display("Failed to sort arrays, source: {}", source))] + SortArrays { + source: ArrowError, + location: Location, + }, + + #[snafu(display("Failed to sort arrays, source: {}", source))] + SelectRows { + source: ArrowError, + location: Location, + }, } pub type Result = std::result::Result; @@ -596,6 +614,8 @@ impl ErrorExt for Error { | StopPickTask { .. } => StatusCode::Unexpected, TtlCalculation { source, .. } => source.status_code(), + ConvertColumnsToRows { .. } | SortArrays { .. } => StatusCode::Unexpected, + SelectRows { .. } => StatusCode::Unexpected, } } diff --git a/src/storage/src/memtable.rs b/src/storage/src/memtable.rs index 4ff3d638ef..8f333aa6f7 100644 --- a/src/storage/src/memtable.rs +++ b/src/storage/src/memtable.rs @@ -21,6 +21,7 @@ mod version; use std::sync::atomic::{AtomicI64, AtomicU32, AtomicUsize, Ordering}; use std::sync::Arc; +use common_time::range::TimestampRange; use datatypes::vectors::VectorRef; use store_api::storage::{consts, OpType, SequenceNumber}; @@ -110,6 +111,9 @@ pub struct IterContext { /// /// Set to `None` to read all columns. pub projected_schema: Option, + + /// Timestamp range + pub time_range: Option, } impl Default for IterContext { @@ -120,6 +124,7 @@ impl Default for IterContext { visible_sequence: SequenceNumber::MAX, for_flush: false, projected_schema: None, + time_range: None, } } } diff --git a/src/storage/src/memtable/btree.rs b/src/storage/src/memtable/btree.rs index 98f4d1c3d1..560fd8d748 100644 --- a/src/storage/src/memtable/btree.rs +++ b/src/storage/src/memtable/btree.rs @@ -19,6 +19,7 @@ use std::ops::Bound; use std::sync::atomic::Ordering as AtomicOrdering; use std::sync::{Arc, RwLock}; +use common_time::range::TimestampRange; use datatypes::data_type::DataType; use datatypes::prelude::*; use datatypes::value::Value; @@ -217,7 +218,7 @@ impl BTreeIterator { let (keys, sequences, op_types, values) = if self.ctx.for_flush { collect_iter(iter, self.ctx.batch_size) } else { - let iter = MapIterWrapper::new(iter, self.ctx.visible_sequence); + let iter = MapIterWrapper::new(iter, self.ctx.visible_sequence, self.ctx.time_range); collect_iter(iter, self.ctx.batch_size) }; @@ -289,23 +290,26 @@ struct MapIterWrapper<'a, InnerKey, RowValue> { iter: btree_map::Range<'a, InnerKey, RowValue>, prev_key: Option, visible_sequence: SequenceNumber, + time_range: Option, } impl<'a> MapIterWrapper<'a, InnerKey, RowValue> { fn new( iter: btree_map::Range<'a, InnerKey, RowValue>, visible_sequence: SequenceNumber, + time_range: Option, ) -> MapIterWrapper<'a, InnerKey, RowValue> { MapIterWrapper { iter, prev_key: None, visible_sequence, + time_range, } } fn next_visible_entry(&mut self) -> Option<(&'a InnerKey, &'a RowValue)> { for (k, v) in self.iter.by_ref() { - if k.is_visible(self.visible_sequence) { + if k.is_visible(self.visible_sequence) && k.is_in_time_range(&self.time_range) { return Some((k, v)); } } @@ -447,6 +451,17 @@ impl InnerKey { self.sequence <= sequence } + #[inline] + fn is_in_time_range(&self, range: &Option) -> bool { + let Some(range) = range else { return true; }; + range.contains( + &self + .timestamp() + .as_timestamp() + .expect("Timestamp field must be a valid timestamp value"), + ) + } + /// Reset the `InnerKey` so that we can use it to seek next key that /// has different row key. fn reset_for_seek(&mut self) { diff --git a/src/storage/src/memtable/tests.rs b/src/storage/src/memtable/tests.rs index f20e1fe8e6..557a700ff5 100644 --- a/src/storage/src/memtable/tests.rs +++ b/src/storage/src/memtable/tests.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use common_time::Timestamp; use datatypes::prelude::*; use datatypes::timestamp::TimestampMillisecond; use datatypes::type_id::LogicalTypeId; @@ -436,6 +437,7 @@ fn test_sequence_visibility() { visible_sequence: 9, for_flush: false, projected_schema: None, + time_range: None, }; let mut iter = ctx.memtable.iter(&iter_ctx).unwrap(); @@ -454,6 +456,7 @@ fn test_sequence_visibility() { visible_sequence: 10, for_flush: false, projected_schema: None, + time_range: None, }; let mut iter = ctx.memtable.iter(&iter_ctx).unwrap(); @@ -472,6 +475,7 @@ fn test_sequence_visibility() { visible_sequence: 11, for_flush: false, projected_schema: None, + time_range: None, }; let mut iter = ctx.memtable.iter(&iter_ctx).unwrap(); @@ -510,6 +514,40 @@ fn test_iter_after_none() { }); } +#[test] +fn test_filter_memtable() { + let tester = MemtableTester::default(); + tester.run_testcase(|ctx| { + write_kvs( + &*ctx.memtable, + 10, // sequence + OpType::Put, + &[1000, 1001, 1002], // keys + &[(Some(0), None), (Some(1), None), (Some(2), None)], // values + ); + + let iter_ctx = IterContext { + batch_size: 4, + time_range: Some( + TimestampRange::new( + Timestamp::new_millisecond(0), + Timestamp::new_millisecond(1001), + ) + .unwrap(), + ), + ..Default::default() + }; + + let mut iter = ctx.memtable.iter(&iter_ctx).unwrap(); + let batch = iter.next().unwrap().unwrap(); + assert_eq!(5, batch.columns.len()); + assert_eq!( + Arc::new(TimestampMillisecondVector::from_slice([1000])) as Arc<_>, + batch.columns[0] + ); + }); +} + #[test] fn test_memtable_projection() { let tester = MemtableTester::default(); diff --git a/src/storage/src/read.rs b/src/storage/src/read.rs index 7a84223a32..03d6d64cf8 100644 --- a/src/storage/src/read.rs +++ b/src/storage/src/read.rs @@ -16,6 +16,7 @@ mod dedup; mod merge; +pub(crate) mod windowed; use std::cmp::Ordering; @@ -258,3 +259,10 @@ pub trait BatchReader: Send { /// Pointer to [BatchReader]. pub type BoxedBatchReader = Box; + +#[async_trait::async_trait] +impl BatchReader for Box { + async fn next_batch(&mut self) -> Result> { + (**self).next_batch().await + } +} diff --git a/src/storage/src/read/windowed.rs b/src/storage/src/read/windowed.rs new file mode 100644 index 0000000000..197318b7ab --- /dev/null +++ b/src/storage/src/read/windowed.rs @@ -0,0 +1,140 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use arrow::compute::SortOptions; +use arrow::row::{RowConverter, SortField}; +use arrow_array::{Array, ArrayRef}; +use datatypes::data_type::DataType; +use datatypes::vectors::Helper; +use snafu::ResultExt; + +use crate::error::{self, Result}; +use crate::read::{Batch, BatchReader}; +use crate::schema::ProjectedSchemaRef; + +/// [WindowedReader] provides a windowed record batch reader that scans all rows within a window +/// at a time and sort these rows ordered in `[, ]` order. +pub struct WindowedReader { + /// Schema to read + pub schema: ProjectedSchemaRef, + /// Each reader reads a slice of time window + pub readers: Vec, +} + +impl WindowedReader { + pub fn new(schema: ProjectedSchemaRef, readers: Vec) -> Self { + Self { schema, readers } + } +} + +#[async_trait::async_trait] +impl BatchReader for WindowedReader +where + R: BatchReader, +{ + async fn next_batch(&mut self) -> Result> { + let Some(mut reader) = self.readers.pop() else { return Ok(None); }; + + let store_schema = self.schema.schema_to_read(); + let mut batches = vec![]; + while let Some(batch) = reader.next_batch().await? { + batches.push( + batch + .columns + .into_iter() + .map(|v| v.to_arrow_array()) + .collect::>(), + ); + } + + let Some(num_columns) = batches.get(0).map(|b| b.len()) else { + return Ok(Some(Batch::new(vec![]))); + }; + let mut vectors_in_batch = Vec::with_capacity(num_columns); + + for idx in 0..num_columns { + let columns: Vec<&dyn Array> = + batches.iter().map(|b| b[idx].as_ref()).collect::>(); + vectors_in_batch + .push(arrow::compute::concat(&columns).context(error::ConvertColumnsToRowsSnafu)?); + } + let sorted = sort_by_rows(&self.schema, vectors_in_batch)?; + let vectors = sorted + .iter() + .zip(store_schema.columns().iter().map(|c| &c.desc.name)) + .map(|(arr, name)| { + Helper::try_into_vector(arr).context(error::ConvertChunkSnafu { name }) + }) + .collect::>()?; + Ok(Some(Batch::new(vectors))) + } +} + +fn sort_by_rows(schema: &ProjectedSchemaRef, arrays: Vec) -> Result> { + let sort_columns = build_sort_columns(schema); + let store_schema = schema.schema_to_read(); + // Convert columns to rows to speed lexicographic sort + // TODO(hl): maybe optimize to lexsort_to_index when only timestamp column is involved. + let mut row_converter = RowConverter::new( + sort_columns + .iter() + .map(|(idx, descending)| { + SortField::new_with_options( + store_schema.columns()[*idx].desc.data_type.as_arrow_type(), + SortOptions { + descending: *descending, + nulls_first: true, + }, + ) + }) + .collect(), + ) + .context(error::ConvertColumnsToRowsSnafu)?; + + let columns_to_sort = sort_columns + .into_iter() + .map(|(idx, _)| arrays[idx].clone()) + .collect::>(); + + let rows_to_sort = row_converter + .convert_columns(&columns_to_sort) + .context(error::ConvertColumnsToRowsSnafu)?; + + let mut sort_pairs = rows_to_sort.iter().enumerate().collect::>(); + sort_pairs.sort_unstable_by(|(_, a), (_, b)| a.cmp(b)); + + let idx = + arrow::array::UInt32Array::from_iter_values(sort_pairs.iter().map(|(i, _)| *i as u32)); + + let sorted = arrays + .iter() + .map(|arr| arrow::compute::take(arr, &idx, None)) + .collect::>>() + .context(error::SortArraysSnafu)?; + + debug_assert_eq!(sorted.len(), store_schema.num_columns()); + + Ok(sorted) +} + +/// [, , TS] to [TS, , ]. +/// Returns a vector of sort column indices and sort orders (true means descending order). +fn build_sort_columns(schema: &ProjectedSchemaRef) -> Vec<(usize, bool)> { + let ts_col_index = schema.schema_to_read().timestamp_index(); + let mut res = (0..(ts_col_index)) + .map(|idx| (idx, false)) + .collect::>(); + res.insert(0, (ts_col_index, true)); + res +} diff --git a/src/storage/src/schema/store.rs b/src/storage/src/schema/store.rs index 8b2575b5cd..4fc99f4ee9 100644 --- a/src/storage/src/schema/store.rs +++ b/src/storage/src/schema/store.rs @@ -79,6 +79,13 @@ impl StoreSchema { self.row_key_end } + /// Returns the index of timestamp column. + /// We always assume that timestamp is the last column in [StoreSchema]. + #[inline] + pub fn timestamp_index(&self) -> usize { + self.row_key_end - 1 + } + pub(crate) fn contains_column(&self, name: &str) -> bool { self.schema.column_schema_by_name(name).is_some() } @@ -155,11 +162,6 @@ impl StoreSchema { self.user_column_end + 1 } - #[inline] - pub(crate) fn timestamp_index(&self) -> usize { - self.row_key_end - 1 - } - #[inline] pub(crate) fn row_key_indices(&self) -> impl Iterator { 0..self.row_key_end diff --git a/src/storage/src/snapshot.rs b/src/storage/src/snapshot.rs index 097bdfbf1c..35cf328239 100644 --- a/src/storage/src/snapshot.rs +++ b/src/storage/src/snapshot.rs @@ -15,6 +15,7 @@ use std::cmp; use async_trait::async_trait; +use common_time::range::TimestampRange; use store_api::storage::{ GetRequest, GetResponse, ReadContext, ScanRequest, ScanResponse, SchemaRef, SequenceNumber, Snapshot, @@ -47,28 +48,7 @@ impl Snapshot for SnapshotImpl { ctx: &ReadContext, request: ScanRequest, ) -> Result> { - let visible_sequence = self.sequence_to_read(request.sequence); - let memtable_version = self.version.memtables(); - - let mutables = memtable_version.mutable_memtable(); - let immutables = memtable_version.immutable_memtables(); - - let mut builder = - ChunkReaderBuilder::new(self.version.schema().clone(), self.sst_layer.clone()) - .reserve_num_memtables(memtable_version.num_memtables()) - .projection(request.projection) - .filters(request.filters) - .batch_size(ctx.batch_size) - .visible_sequence(visible_sequence) - .pick_memtables(mutables.clone()); - - for memtable in immutables { - builder = builder.pick_memtables(memtable.clone()); - } - - let reader = builder.pick_all_ssts(self.version.ssts())?.build().await?; - - Ok(ScanResponse { reader }) + self.scan_raw(ctx, request, None).await } async fn get(&self, _ctx: &ReadContext, _request: GetRequest) -> Result { @@ -95,4 +75,48 @@ impl SnapshotImpl { .map(|s| cmp::min(s, self.visible_sequence)) .unwrap_or(self.visible_sequence) } + + #[allow(unused)] + pub(crate) async fn windowed_scan( + &self, + ctx: &ReadContext, + request: ScanRequest, + windows: Vec, + ) -> Result> { + self.scan_raw(ctx, request, Some(windows)).await + } + + async fn scan_raw( + &self, + ctx: &ReadContext, + request: ScanRequest, + windows: Option>, + ) -> Result> { + let visible_sequence = self.sequence_to_read(request.sequence); + let memtable_version = self.version.memtables(); + + let mutables = memtable_version.mutable_memtable(); + let immutables = memtable_version.immutable_memtables(); + + let mut builder = + ChunkReaderBuilder::new(self.version.schema().clone(), self.sst_layer.clone()) + .reserve_num_memtables(memtable_version.num_memtables()) + .projection(request.projection) + .filters(request.filters) + .batch_size(ctx.batch_size) + .visible_sequence(visible_sequence) + .pick_memtables(mutables.clone()); + + if let Some(windows) = windows { + builder = builder.time_windows(windows); + } + + for memtable in immutables { + builder = builder.pick_memtables(memtable.clone()); + } + + let reader = builder.pick_all_ssts(self.version.ssts())?.build().await?; + + Ok(ScanResponse { reader }) + } }