From 72b6bd11f73f78ab60b1098f2f91aedcf6582570 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Wed, 31 May 2023 11:36:17 +0800 Subject: [PATCH] feat: adapt window reader to order rules (#1671) * feat: adapt window reader to order rules * fix: add asc sort test case --- src/mito/src/table.rs | 1 + src/storage/src/chunk.rs | 39 ++++++-- src/storage/src/lib.rs | 2 + src/storage/src/read/windowed.rs | 41 +++++--- src/storage/src/region/tests.rs | 163 +++++++++++++++++++++++++------ src/storage/src/snapshot.rs | 69 +++++-------- src/storage/src/window_infer.rs | 64 ++++++++++-- 7 files changed, 272 insertions(+), 107 deletions(-) diff --git a/src/mito/src/table.rs b/src/mito/src/table.rs index 8be42c4c86..abf2cec451 100644 --- a/src/mito/src/table.rs +++ b/src/mito/src/table.rs @@ -236,6 +236,7 @@ impl Table for MitoTable { let scan_request = ScanRequest { projection, filters, + output_ordering: request.output_ordering.clone(), ..Default::default() }; diff --git a/src/storage/src/chunk.rs b/src/storage/src/chunk.rs index 6d21d6d271..ef60b8cd3a 100644 --- a/src/storage/src/chunk.rs +++ b/src/storage/src/chunk.rs @@ -16,6 +16,7 @@ use std::sync::Arc; use async_trait::async_trait; use common_query::logical_plan::Expr; +use common_recordbatch::OrderOption; use common_telemetry::debug; use common_time::range::TimestampRange; use snafu::ResultExt; @@ -28,6 +29,7 @@ use crate::read::windowed::WindowedReader; use crate::read::{Batch, BoxedBatchReader, DedupReader, MergeReaderBuilder}; use crate::schema::{ProjectedSchema, ProjectedSchemaRef, RegionSchemaRef}; use crate::sst::{AccessLayerRef, FileHandle, LevelMetas, ReadOptions}; +use crate::window_infer::{PlainWindowInference, WindowInfer}; /// Chunk reader implementation. 
// Now we use async-trait to implement the chunk reader, which is easier to implement than @@ -85,7 +87,7 @@ pub struct ChunkReaderBuilder { iter_ctx: IterContext, memtables: Vec, files_to_read: Vec, - time_windows: Option>, + output_ordering: Option>, } impl ChunkReaderBuilder { @@ -98,7 +100,7 @@ impl ChunkReaderBuilder { iter_ctx: IterContext::default(), memtables: Vec::new(), files_to_read: Vec::new(), - time_windows: None, + output_ordering: None, } } @@ -118,6 +120,11 @@ impl ChunkReaderBuilder { self } + pub fn output_ordering(mut self, ordering: Option>) -> Self { + self.output_ordering = ordering; + self + } + pub fn batch_size(mut self, batch_size: usize) -> Self { self.iter_ctx.batch_size = batch_size; self @@ -154,10 +161,22 @@ impl ChunkReaderBuilder { self } - /// Set time windows for scan. - pub fn time_windows(mut self, windows: Vec) -> Self { - self.time_windows = Some(windows); - self + fn infer_time_windows(&self, output_ordering: &[OrderOption]) -> Option> { + if output_ordering.is_empty() { + return None; + } + let OrderOption { index, options } = &output_ordering[0]; + if *index != self.schema.timestamp_index() { + return None; + } + let memtable_stats = self.memtables.iter().map(|m| m.stats()).collect::>(); + let files = self + .files_to_read + .iter() + .map(FileHandle::meta) + .collect::>(); + + Some(PlainWindowInference {}.infer_window(&files, &memtable_stats, options.descending)) } async fn build_windowed( @@ -165,6 +184,7 @@ impl ChunkReaderBuilder { schema: &ProjectedSchemaRef, time_range_predicate: &TimestampRange, windows: Vec, + order_options: Vec, ) -> Result { let mut readers = Vec::with_capacity(windows.len()); for window in windows { @@ -172,7 +192,7 @@ impl ChunkReaderBuilder { let reader = self.build_reader(schema, &time_range_predicate).await?; readers.push(reader); } - let windowed_reader = WindowedReader::new(schema.clone(), readers); + let windowed_reader = WindowedReader::new(schema.clone(), readers, order_options); 
Ok(Box::new(windowed_reader) as Box<_>) } @@ -218,8 +238,9 @@ impl ChunkReaderBuilder { ); self.iter_ctx.projected_schema = Some(schema.clone()); - let reader = if let Some(windows) = self.time_windows.take() { - self.build_windowed(&schema, &time_range_predicate, windows) + let reader = if let Some(ordering) = self.output_ordering.take() && + let Some(windows) = self.infer_time_windows(&ordering) { + self.build_windowed(&schema, &time_range_predicate, windows, ordering) .await? } else { self.build_reader(&schema, &time_range_predicate).await? diff --git a/src/storage/src/lib.rs b/src/storage/src/lib.rs index ae7e955c60..743ff02622 100644 --- a/src/storage/src/lib.rs +++ b/src/storage/src/lib.rs @@ -14,6 +14,8 @@ //! Storage engine implementation. +#![feature(let_chains)] + mod chunk; pub mod codec; pub mod compaction; diff --git a/src/storage/src/read/windowed.rs b/src/storage/src/read/windowed.rs index 33619f3eb1..a7a0025f42 100644 --- a/src/storage/src/read/windowed.rs +++ b/src/storage/src/read/windowed.rs @@ -15,6 +15,7 @@ use arrow::compute::SortOptions; use arrow::row::{RowConverter, SortField}; use arrow_array::{Array, ArrayRef}; +use common_recordbatch::OrderOption; use datatypes::data_type::DataType; use datatypes::vectors::Helper; use snafu::ResultExt; @@ -30,6 +31,8 @@ pub struct WindowedReader { pub schema: ProjectedSchemaRef, /// Each reader reads a slice of time window pub readers: Vec, + /// `order_options` defines how records within windows are sorted. + pub order_options: Vec, } impl WindowedReader { @@ -38,8 +41,16 @@ impl WindowedReader { /// ### Note /// [WindowedReader] always reads the readers in a reverse order. The last reader in `readers` /// gets polled first. 
- pub fn new(schema: ProjectedSchemaRef, readers: Vec) -> Self { - Self { schema, readers } + pub fn new( + schema: ProjectedSchemaRef, + readers: Vec, + order_options: Vec, + ) -> Self { + Self { + schema, + readers, + order_options, + } } } @@ -74,7 +85,7 @@ where vectors_in_batch .push(arrow::compute::concat(&columns).context(error::ConvertColumnsToRowsSnafu)?); } - let sorted = sort_by_rows(&self.schema, vectors_in_batch)?; + let sorted = sort_by_rows(&self.schema, vectors_in_batch, &self.order_options)?; let vectors = sorted .iter() .zip(store_schema.columns().iter().map(|c| &c.desc.name)) @@ -86,8 +97,12 @@ where } } -fn sort_by_rows(schema: &ProjectedSchemaRef, arrays: Vec) -> Result> { - let sort_columns = build_sort_columns(schema); +fn sort_by_rows( + schema: &ProjectedSchemaRef, + arrays: Vec, + order_options: &[OrderOption], +) -> Result> { + let sort_columns = build_sorted_columns(order_options); let store_schema = schema.schema_to_read(); // Convert columns to rows to speed lexicographic sort // TODO(hl): maybe optimize to lexsort_to_index when only timestamp column is involved. @@ -133,13 +148,11 @@ fn sort_by_rows(schema: &ProjectedSchemaRef, arrays: Vec) -> Result, , TS] to [TS, , ]. -/// Returns a vector of sort column indices and sort orders (true means descending order). -fn build_sort_columns(schema: &ProjectedSchemaRef) -> Vec<(usize, bool)> { - let ts_col_index = schema.schema_to_read().timestamp_index(); - let mut res = (0..(ts_col_index)) - .map(|idx| (idx, false)) - .collect::>(); - res.insert(0, (ts_col_index, true)); - res +/// Builds the sort columns from `order_options`. +/// Returns a vector of column indices to sort by, each paired with its sort order (true means descending order). 
+fn build_sorted_columns(order_options: &[OrderOption]) -> Vec<(usize, bool)> { + order_options + .iter() + .map(|o| (o.index, o.options.descending)) + .collect() } diff --git a/src/storage/src/region/tests.rs b/src/storage/src/region/tests.rs index ae24775f29..69da635e6c 100644 --- a/src/storage/src/region/tests.rs +++ b/src/storage/src/region/tests.rs @@ -16,12 +16,12 @@ use std::collections::{HashMap, HashSet}; +use arrow::compute::SortOptions; use common_base::readable_size::ReadableSize; use common_datasource::compression::CompressionType; +use common_recordbatch::OrderOption; use common_telemetry::logging; use common_test_util::temp_dir::{create_temp_dir, TempDir}; -use common_time::range::TimestampRange; -use common_time::timestamp::TimeUnit; use datatypes::prelude::{LogicalTypeId, ScalarVector, WrapperType}; use datatypes::timestamp::TimestampMillisecond; use datatypes::vectors::{ @@ -543,8 +543,7 @@ async fn create_store_config(region_name: &str, root: &str) -> StoreConfig>, - time_windows: Vec<(i64, i64)>, - expected_ts: Vec, + expected: Vec<(i64, i64, String, bool)>, region: RegionImpl, _temp_dir: TempDir, } @@ -553,8 +552,7 @@ impl WindowedReaderTester { async fn new( region_name: &'static str, data_written: Vec>, - time_windows: Vec<(i64, i64)>, - expected_ts: Vec, + expected: Vec<(i64, i64, String, bool)>, ) -> Self { let temp_dir = create_temp_dir(&format!("write_and_read_windowed_{}", region_name)); let root = temp_dir.path().to_str().unwrap(); @@ -564,8 +562,7 @@ impl WindowedReaderTester { let tester = Self { data_written, - time_windows, - expected_ts, + expected, region, _temp_dir: temp_dir, }; @@ -611,33 +608,27 @@ impl WindowedReaderTester { } } - async fn check(&self) { - let windows = self - .time_windows - .iter() - .map(|(start, end)| { - TimestampRange::with_unit(*start, *end, TimeUnit::Millisecond).unwrap() - }) - .collect::>(); - + async fn check(&self, order_options: Vec) { let read_context = ReadContext::default(); let snapshot = 
self.region.snapshot(&read_context).unwrap(); let response = snapshot - .windowed_scan( + .scan( &read_context, ScanRequest { sequence: None, projection: None, filters: vec![], limit: None, - output_ordering: None, + output_ordering: Some(order_options), }, - windows, ) .await .unwrap(); - let mut timestamps = Vec::with_capacity(self.expected_ts.len()); + let mut timestamps = Vec::with_capacity(self.expected.len()); + let mut col1 = Vec::with_capacity(self.expected.len()); + let mut col2 = Vec::with_capacity(self.expected.len()); + let mut col3 = Vec::with_capacity(self.expected.len()); let mut reader = response.reader; let ts_index = reader.user_schema().timestamp_index().unwrap(); @@ -647,11 +638,61 @@ impl WindowedReaderTester { .as_any() .downcast_ref::() .unwrap(); + let v1_col = chunk.columns[1] + .as_any() + .downcast_ref::() + .unwrap(); + let v2_col = chunk.columns[2] + .as_any() + .downcast_ref::() + .unwrap(); + let v3_col = chunk.columns[3] + .as_any() + .downcast_ref::() + .unwrap(); + for ts in ts_col.iter_data() { timestamps.push(ts.unwrap().0.value()); } + for v in v1_col.iter_data() { + col1.push(v.unwrap()); + } + for v in v2_col.iter_data() { + col2.push(v.unwrap().to_string()); + } + for v in v3_col.iter_data() { + col3.push(v.unwrap()); + } } - assert_eq!(timestamps, self.expected_ts); + + assert_eq!( + timestamps, + self.expected + .iter() + .map(|(v, _, _, _)| *v) + .collect::>() + ); + assert_eq!( + col1, + self.expected + .iter() + .map(|(_, v, _, _)| *v) + .collect::>() + ); + assert_eq!( + col2, + self.expected + .iter() + .map(|(_, _, v, _)| v.clone()) + .collect::>() + ); + assert_eq!( + col3, + self.expected + .iter() + .map(|(_, _, _, v)| *v) + .collect::>() + ); } } @@ -662,11 +703,16 @@ async fn test_read_by_chunk_reader() { WindowedReaderTester::new( "test_region", vec![vec![(1, 1, "1".to_string(), false)]], - vec![(1, 2)], - vec![1], + vec![(1, 1, "1".to_string(), false)], ) .await - .check() + .check(vec![OrderOption { + index: 
0, + options: SortOptions { + descending: true, + nulls_first: true, + }, + }]) .await; WindowedReaderTester::new( @@ -681,11 +727,21 @@ async fn test_read_by_chunk_reader() { (4, 4, "4".to_string(), false), ], ], - vec![(1, 2), (2, 3), (3, 4), (4, 5)], - vec![4, 3, 2, 1], + vec![ + (4, 4, "4".to_string(), false), + (3, 3, "3".to_string(), false), + (2, 2, "2".to_string(), false), + (1, 1, "1".to_string(), false), + ], ) .await - .check() + .check(vec![OrderOption { + index: 0, + options: SortOptions { + descending: true, + nulls_first: true, + }, + }]) .await; WindowedReaderTester::new( @@ -694,16 +750,59 @@ async fn test_read_by_chunk_reader() { vec![ (1, 1, "1".to_string(), false), (2, 2, "2".to_string(), false), + (60000, 60000, "60".to_string(), false), ], vec![ (3, 3, "3".to_string(), false), - (4, 4, "4".to_string(), false), + (61000, 61000, "61".to_string(), false), ], ], - vec![(1, 2), (2, 3), (4, 5), (3, 4)], - vec![3, 4, 2, 1], + vec![ + (61000, 61000, "61".to_string(), false), + (60000, 60000, "60".to_string(), false), + (3, 3, "3".to_string(), false), + (2, 2, "2".to_string(), false), + (1, 1, "1".to_string(), false), + ], ) .await - .check() + .check(vec![OrderOption { + index: 0, + options: SortOptions { + descending: true, + nulls_first: true, + }, + }]) + .await; + + WindowedReaderTester::new( + "test_region", + vec![ + vec![ + (1, 1, "1".to_string(), false), + (2, 2, "2".to_string(), false), + (60000, 60000, "60".to_string(), false), + ], + vec![ + (3, 3, "3".to_string(), false), + (61000, 61000, "61".to_string(), false), + ], + ], + vec![ + (1, 1, "1".to_string(), false), + (2, 2, "2".to_string(), false), + (3, 3, "3".to_string(), false), + (60000, 60000, "60".to_string(), false), + (61000, 61000, "61".to_string(), false), + ], + ) + .await + .check(vec![OrderOption { + index: 0, + options: SortOptions { + descending: false, + nulls_first: true, + }, + }]) .await; } diff --git a/src/storage/src/snapshot.rs b/src/storage/src/snapshot.rs index 
35cf328239..0399463275 100644 --- a/src/storage/src/snapshot.rs +++ b/src/storage/src/snapshot.rs @@ -15,7 +15,6 @@ use std::cmp; use async_trait::async_trait; -use common_time::range::TimestampRange; use store_api::storage::{ GetRequest, GetResponse, ReadContext, ScanRequest, ScanResponse, SchemaRef, SequenceNumber, Snapshot, @@ -48,7 +47,29 @@ impl Snapshot for SnapshotImpl { ctx: &ReadContext, request: ScanRequest, ) -> Result> { - self.scan_raw(ctx, request, None).await + let visible_sequence = self.sequence_to_read(request.sequence); + let memtable_version = self.version.memtables(); + + let mutables = memtable_version.mutable_memtable(); + let immutables = memtable_version.immutable_memtables(); + + let mut builder = + ChunkReaderBuilder::new(self.version.schema().clone(), self.sst_layer.clone()) + .reserve_num_memtables(memtable_version.num_memtables()) + .projection(request.projection) + .filters(request.filters) + .batch_size(ctx.batch_size) + .output_ordering(request.output_ordering) + .visible_sequence(visible_sequence) + .pick_memtables(mutables.clone()); + + for memtable in immutables { + builder = builder.pick_memtables(memtable.clone()); + } + + let reader = builder.pick_all_ssts(self.version.ssts())?.build().await?; + + Ok(ScanResponse { reader }) } async fn get(&self, _ctx: &ReadContext, _request: GetRequest) -> Result { @@ -75,48 +96,4 @@ impl SnapshotImpl { .map(|s| cmp::min(s, self.visible_sequence)) .unwrap_or(self.visible_sequence) } - - #[allow(unused)] - pub(crate) async fn windowed_scan( - &self, - ctx: &ReadContext, - request: ScanRequest, - windows: Vec, - ) -> Result> { - self.scan_raw(ctx, request, Some(windows)).await - } - - async fn scan_raw( - &self, - ctx: &ReadContext, - request: ScanRequest, - windows: Option>, - ) -> Result> { - let visible_sequence = self.sequence_to_read(request.sequence); - let memtable_version = self.version.memtables(); - - let mutables = memtable_version.mutable_memtable(); - let immutables = 
memtable_version.immutable_memtables(); - - let mut builder = - ChunkReaderBuilder::new(self.version.schema().clone(), self.sst_layer.clone()) - .reserve_num_memtables(memtable_version.num_memtables()) - .projection(request.projection) - .filters(request.filters) - .batch_size(ctx.batch_size) - .visible_sequence(visible_sequence) - .pick_memtables(mutables.clone()); - - if let Some(windows) = windows { - builder = builder.time_windows(windows); - } - - for memtable in immutables { - builder = builder.pick_memtables(memtable.clone()); - } - - let reader = builder.pick_all_ssts(self.version.ssts())?.build().await?; - - Ok(ScanResponse { reader }) - } } diff --git a/src/storage/src/window_infer.rs b/src/storage/src/window_infer.rs index 334794ddf4..e3393af287 100644 --- a/src/storage/src/window_infer.rs +++ b/src/storage/src/window_infer.rs @@ -43,8 +43,12 @@ pub(crate) trait WindowInfer { /// /// ### Note /// The order of returned vector defines how records are yielded. - fn infer_window(&self, files: &[FileMeta], mem_tables: &[MemtableStats]) - -> Vec; + fn infer_window( + &self, + files: &[FileMeta], + mem_tables: &[MemtableStats], + ts_desc: bool, + ) -> Vec; } /// [PlainWindowInference] simply finds the minimum time span within all SST files in level 0 and @@ -56,6 +60,7 @@ impl WindowInfer for PlainWindowInference { &self, files: &[FileMeta], mem_tables: &[MemtableStats], + ts_desc: bool, ) -> Vec { let mut min_duration_sec = i64::MAX; let mut durations = Vec::with_capacity(files.len() + mem_tables.len()); @@ -94,7 +99,13 @@ impl WindowInfer for PlainWindowInference { let window_size = min_duration_to_window_size(min_duration_sec); align_time_spans_to_windows(&durations, window_size) .into_iter() - .sorted_by(|(l_start, _), (r_start, _)| r_start.cmp(l_start)) // sort time windows in descending order + .sorted_by(|(l_start, _), (r_start, _)| { + if ts_desc { + l_start.cmp(r_start) + } else { + r_start.cmp(l_start) + } + }) // the windowed reader polls the last window first, so sort windows in the reverse of the requested timestamp order (ascending when `ts_desc` is true, descending otherwise) 
// unwrap safety: we ensure that end>=start so that TimestampRange::with_unit won't return None .map(|(start, end)| TimestampRange::with_unit(start, end, TimeUnit::Second).unwrap()) .collect() @@ -230,6 +241,7 @@ mod tests { min_timestamp: Timestamp::new(2001, TimeUnit::Millisecond), ..Default::default() }], + true, ); assert_eq!( vec![TimestampRange::with_unit(0, 60, TimeUnit::Second).unwrap()], @@ -249,11 +261,12 @@ mod tests { min_timestamp: Timestamp::new(2001, TimeUnit::Millisecond), ..Default::default() }], + true, ); assert_eq!( vec![ - TimestampRange::with_unit(60, 120, TimeUnit::Second).unwrap(), TimestampRange::with_unit(0, 60, TimeUnit::Second).unwrap(), + TimestampRange::with_unit(60, 120, TimeUnit::Second).unwrap(), ], res ); @@ -280,12 +293,13 @@ mod tests { min_timestamp: Timestamp::new(2001, TimeUnit::Millisecond), ..Default::default() }], + true, ); assert_eq!( vec![ - TimestampRange::with_unit(60 * 60, 61 * 60, TimeUnit::Second).unwrap(), - TimestampRange::with_unit(60, 120, TimeUnit::Second).unwrap(), TimestampRange::with_unit(0, 60, TimeUnit::Second).unwrap(), + TimestampRange::with_unit(60, 120, TimeUnit::Second).unwrap(), + TimestampRange::with_unit(60 * 60, 61 * 60, TimeUnit::Second).unwrap(), ], res ); @@ -313,9 +327,47 @@ mod tests { min_timestamp: Timestamp::new(0, TimeUnit::Millisecond), ..Default::default() }], + true, ); // inferred window size should be 600 sec + assert_eq!( + vec![ + TimestampRange::with_unit(0, 600, TimeUnit::Second).unwrap(), + TimestampRange::with_unit(600, 1200, TimeUnit::Second).unwrap(), + TimestampRange::with_unit(1200, 1800, TimeUnit::Second).unwrap(), + TimestampRange::with_unit(1800, 2400, TimeUnit::Second).unwrap(), + ], + res + ); + + let res = window_inference.infer_window( + &[ + FileMeta { + time_range: Some(( + Timestamp::new(0, TimeUnit::Millisecond), + Timestamp::new(60 * 1000, TimeUnit::Millisecond), + )), + level: 1, // this SST will be ignored + ..Default::default() + }, + FileMeta { + time_range: 
Some(( + Timestamp::new(0, TimeUnit::Millisecond), + Timestamp::new(10 * 60 * 1000, TimeUnit::Millisecond), + )), + ..Default::default() + }, + ], + &[MemtableStats { + max_timestamp: Timestamp::new(60 * 30 * 1000 + 1, TimeUnit::Millisecond), + min_timestamp: Timestamp::new(0, TimeUnit::Millisecond), + ..Default::default() + }], + false, + ); + + // timestamp asc order assert_eq!( vec![ TimestampRange::with_unit(1800, 2400, TimeUnit::Second).unwrap(),