feat: adapt window reader to order rules (#1671)

* feat: adapt window reader to order rules

* fix: add asc sort test case
Author: Lei, HUANG
Date: 2023-05-31 11:36:17 +08:00
Committed by: GitHub
Parent: 6b08a5f94e
Commit: 72b6bd11f7
7 changed files with 272 additions and 107 deletions


@@ -236,6 +236,7 @@ impl<R: Region> Table for MitoTable<R> {
let scan_request = ScanRequest {
projection,
filters,
output_ordering: request.output_ordering.clone(),
..Default::default()
};
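For orientation (not part of the diff): the `output_ordering` forwarded here comes from the caller's `ScanRequest`. A minimal sketch of such a request, assuming the `ScanRequest` and `OrderOption` layout used in the tests further below and a timestamp column at projected index 0:

use arrow::compute::SortOptions;
use common_recordbatch::OrderOption;
use store_api::storage::ScanRequest;

// Ask for rows sorted by the timestamp column, newest first.
fn descending_ts_scan() -> ScanRequest {
    ScanRequest {
        sequence: None,
        projection: None,
        filters: vec![],
        limit: None,
        output_ordering: Some(vec![OrderOption {
            index: 0, // projected index of the timestamp column (assumed here)
            options: SortOptions {
                descending: true,
                nulls_first: true,
            },
        }]),
    }
}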


@@ -16,6 +16,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use common_query::logical_plan::Expr;
use common_recordbatch::OrderOption;
use common_telemetry::debug;
use common_time::range::TimestampRange;
use snafu::ResultExt;
@@ -28,6 +29,7 @@ use crate::read::windowed::WindowedReader;
use crate::read::{Batch, BoxedBatchReader, DedupReader, MergeReaderBuilder};
use crate::schema::{ProjectedSchema, ProjectedSchemaRef, RegionSchemaRef};
use crate::sst::{AccessLayerRef, FileHandle, LevelMetas, ReadOptions};
use crate::window_infer::{PlainWindowInference, WindowInfer};
/// Chunk reader implementation.
// Now we use async-trait to implement the chunk reader, which is easier to implement than
@@ -85,7 +87,7 @@ pub struct ChunkReaderBuilder {
iter_ctx: IterContext,
memtables: Vec<MemtableRef>,
files_to_read: Vec<FileHandle>,
time_windows: Option<Vec<TimestampRange>>,
output_ordering: Option<Vec<OrderOption>>,
}
impl ChunkReaderBuilder {
@@ -98,7 +100,7 @@ impl ChunkReaderBuilder {
iter_ctx: IterContext::default(),
memtables: Vec::new(),
files_to_read: Vec::new(),
time_windows: None,
output_ordering: None,
}
}
@@ -118,6 +120,11 @@ impl ChunkReaderBuilder {
self
}
pub fn output_ordering(mut self, ordering: Option<Vec<OrderOption>>) -> Self {
self.output_ordering = ordering;
self
}
pub fn batch_size(mut self, batch_size: usize) -> Self {
self.iter_ctx.batch_size = batch_size;
self
@@ -154,10 +161,22 @@ impl ChunkReaderBuilder {
self
}
/// Set time windows for scan.
pub fn time_windows(mut self, windows: Vec<TimestampRange>) -> Self {
self.time_windows = Some(windows);
self
fn infer_time_windows(&self, output_ordering: &[OrderOption]) -> Option<Vec<TimestampRange>> {
if output_ordering.is_empty() {
return None;
}
let OrderOption { index, options } = &output_ordering[0];
if *index != self.schema.timestamp_index() {
return None;
}
let memtable_stats = self.memtables.iter().map(|m| m.stats()).collect::<Vec<_>>();
let files = self
.files_to_read
.iter()
.map(FileHandle::meta)
.collect::<Vec<_>>();
Some(PlainWindowInference {}.infer_window(&files, &memtable_stats, options.descending))
}
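A note on the gate above: windows are only inferred when the leading sort column of the request is the projected timestamp column; any other leading column makes `build` fall back to the plain merge reader. A small sketch of both cases, with hypothetical column indices and the timestamp assumed at index 0:

use arrow::compute::SortOptions;
use common_recordbatch::OrderOption;

fn example_orderings() -> (Vec<OrderOption>, Vec<OrderOption>) {
    // Leading sort column is the timestamp: infer_time_windows returns
    // Some(windows), so build() takes the windowed branch below.
    let by_time = vec![OrderOption {
        index: 0,
        options: SortOptions { descending: true, nulls_first: true },
    }];
    // Leading sort column is some other column: infer_time_windows returns
    // None and build() uses the ordinary merge reader instead.
    let by_tag = vec![OrderOption {
        index: 1,
        options: SortOptions { descending: false, nulls_first: true },
    }];
    (by_time, by_tag)
}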
async fn build_windowed(
@@ -165,6 +184,7 @@ impl ChunkReaderBuilder {
schema: &ProjectedSchemaRef,
time_range_predicate: &TimestampRange,
windows: Vec<TimestampRange>,
order_options: Vec<OrderOption>,
) -> Result<BoxedBatchReader> {
let mut readers = Vec::with_capacity(windows.len());
for window in windows {
@@ -172,7 +192,7 @@ impl ChunkReaderBuilder {
let reader = self.build_reader(schema, &time_range_predicate).await?;
readers.push(reader);
}
let windowed_reader = WindowedReader::new(schema.clone(), readers);
let windowed_reader = WindowedReader::new(schema.clone(), readers, order_options);
Ok(Box::new(windowed_reader) as Box<_>)
}
@@ -218,8 +238,9 @@ impl ChunkReaderBuilder {
);
self.iter_ctx.projected_schema = Some(schema.clone());
let reader = if let Some(windows) = self.time_windows.take() {
self.build_windowed(&schema, &time_range_predicate, windows)
let reader = if let Some(ordering) = self.output_ordering.take() &&
let Some(windows) = self.infer_time_windows(&ordering) {
self.build_windowed(&schema, &time_range_predicate, windows, ordering)
.await?
} else {
self.build_reader(&schema, &time_range_predicate).await?


@@ -14,6 +14,8 @@
//! Storage engine implementation.
#![feature(let_chains)]
mod chunk;
pub mod codec;
pub mod compaction;
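The `let_chains` feature gate added here is what allows the chained `if let ... && let ...` in `ChunkReaderBuilder::build` shown earlier; it compiles only on nightly Rust. A minimal standalone sketch of the pattern, with hypothetical names:

#![feature(let_chains)]

fn first_index(ordering: Option<Vec<u32>>) -> Option<u32> {
    // Both patterns must match for the branch to run, and the second `let`
    // may use bindings from the first, as infer_time_windows(&ordering) does.
    if let Some(ordering) = ordering && let Some(first) = ordering.first() {
        Some(*first)
    } else {
        None
    }
}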


@@ -15,6 +15,7 @@
use arrow::compute::SortOptions;
use arrow::row::{RowConverter, SortField};
use arrow_array::{Array, ArrayRef};
use common_recordbatch::OrderOption;
use datatypes::data_type::DataType;
use datatypes::vectors::Helper;
use snafu::ResultExt;
@@ -30,6 +31,8 @@ pub struct WindowedReader<R> {
pub schema: ProjectedSchemaRef,
/// Each reader reads a slice of time window
pub readers: Vec<R>,
/// `order_options` defines how records within windows are sorted.
pub order_options: Vec<OrderOption>,
}
impl<R> WindowedReader<R> {
@@ -38,8 +41,16 @@ impl<R> WindowedReader<R> {
/// ### Note
/// [WindowedReader] always reads the readers in a reverse order. The last reader in `readers`
/// gets polled first.
pub fn new(schema: ProjectedSchemaRef, readers: Vec<R>) -> Self {
Self { schema, readers }
pub fn new(
schema: ProjectedSchemaRef,
readers: Vec<R>,
order_options: Vec<OrderOption>,
) -> Self {
Self {
schema,
readers,
order_options,
}
}
}
@@ -74,7 +85,7 @@ where
vectors_in_batch
.push(arrow::compute::concat(&columns).context(error::ConvertColumnsToRowsSnafu)?);
}
let sorted = sort_by_rows(&self.schema, vectors_in_batch)?;
let sorted = sort_by_rows(&self.schema, vectors_in_batch, &self.order_options)?;
let vectors = sorted
.iter()
.zip(store_schema.columns().iter().map(|c| &c.desc.name))
@@ -86,8 +97,12 @@ where
}
}
fn sort_by_rows(schema: &ProjectedSchemaRef, arrays: Vec<ArrayRef>) -> Result<Vec<ArrayRef>> {
let sort_columns = build_sort_columns(schema);
fn sort_by_rows(
schema: &ProjectedSchemaRef,
arrays: Vec<ArrayRef>,
order_options: &[OrderOption],
) -> Result<Vec<ArrayRef>> {
let sort_columns = build_sorted_columns(order_options);
let store_schema = schema.schema_to_read();
// Convert columns to rows to speed lexicographic sort
// TODO(hl): maybe optimize to lexsort_to_index when only timestamp column is involved.
@@ -133,13 +148,11 @@ fn sort_by_rows(schema: &ProjectedSchemaRef, arrays: Vec<ArrayRef>) -> Result<Ve
Ok(sorted)
}
/// [<PK_1>, <PK_2>, TS] to [TS, <PK_1>, <PK_2>].
/// Returns a vector of sort column indices and sort orders (true means descending order).
fn build_sort_columns(schema: &ProjectedSchemaRef) -> Vec<(usize, bool)> {
let ts_col_index = schema.schema_to_read().timestamp_index();
let mut res = (0..(ts_col_index))
.map(|idx| (idx, false))
.collect::<Vec<_>>();
res.insert(0, (ts_col_index, true));
res
/// Builds the sort columns from `order_options`.
/// Returns a vector of column indices to sort by and their sort orders (true means descending).
fn build_sorted_columns(order_options: &[OrderOption]) -> Vec<(usize, bool)> {
order_options
.iter()
.map(|o| (o.index, o.options.descending))
.collect()
}
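For reference, `sort_by_rows` (its body is mostly elided by the hunk above) feeds these `(index, descending)` pairs into arrow's row format: each sort column becomes a `SortField` carrying its `SortOptions`, the columns are encoded into rows, rows are compared byte-wise, and the sorted rows are decoded back into arrays. A standalone sketch of that technique against plain arrow types, not the crate's exact code:

use arrow::array::ArrayRef;
use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, TimeUnit};
use arrow::error::ArrowError;
use arrow::row::{RowConverter, SortField};

// Sort a (timestamp, value) column pair by timestamp descending and return
// both columns re-materialized in the sorted order.
fn sort_desc_by_ts(ts: ArrayRef, value: ArrayRef) -> Result<Vec<ArrayRef>, ArrowError> {
    let mut converter = RowConverter::new(vec![
        SortField::new_with_options(
            DataType::Timestamp(TimeUnit::Millisecond, None),
            SortOptions { descending: true, nulls_first: true },
        ),
        SortField::new(DataType::Int64),
    ])?;

    // The row encoding is built so that byte-wise comparison of rows matches
    // the lexicographic order of the underlying columns.
    let rows = converter.convert_columns(&[ts, value])?;
    let mut sorted: Vec<_> = rows.iter().collect();
    sorted.sort_unstable();

    // Decode the sorted rows back into columnar arrays.
    converter.convert_rows(sorted)
}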


@@ -16,12 +16,12 @@
use std::collections::{HashMap, HashSet};
use arrow::compute::SortOptions;
use common_base::readable_size::ReadableSize;
use common_datasource::compression::CompressionType;
use common_recordbatch::OrderOption;
use common_telemetry::logging;
use common_test_util::temp_dir::{create_temp_dir, TempDir};
use common_time::range::TimestampRange;
use common_time::timestamp::TimeUnit;
use datatypes::prelude::{LogicalTypeId, ScalarVector, WrapperType};
use datatypes::timestamp::TimestampMillisecond;
use datatypes::vectors::{
@@ -543,8 +543,7 @@ async fn create_store_config(region_name: &str, root: &str) -> StoreConfig<NoopL
struct WindowedReaderTester {
data_written: Vec<Vec<(i64, i64, String, bool)>>,
time_windows: Vec<(i64, i64)>,
expected_ts: Vec<i64>,
expected: Vec<(i64, i64, String, bool)>,
region: RegionImpl<NoopLogStore>,
_temp_dir: TempDir,
}
@@ -553,8 +552,7 @@ impl WindowedReaderTester {
async fn new(
region_name: &'static str,
data_written: Vec<Vec<(i64, i64, String, bool)>>,
time_windows: Vec<(i64, i64)>,
expected_ts: Vec<i64>,
expected: Vec<(i64, i64, String, bool)>,
) -> Self {
let temp_dir = create_temp_dir(&format!("write_and_read_windowed_{}", region_name));
let root = temp_dir.path().to_str().unwrap();
@@ -564,8 +562,7 @@ impl WindowedReaderTester {
let tester = Self {
data_written,
time_windows,
expected_ts,
expected,
region,
_temp_dir: temp_dir,
};
@@ -611,33 +608,27 @@ impl WindowedReaderTester {
}
}
async fn check(&self) {
let windows = self
.time_windows
.iter()
.map(|(start, end)| {
TimestampRange::with_unit(*start, *end, TimeUnit::Millisecond).unwrap()
})
.collect::<Vec<_>>();
async fn check(&self, order_options: Vec<OrderOption>) {
let read_context = ReadContext::default();
let snapshot = self.region.snapshot(&read_context).unwrap();
let response = snapshot
.windowed_scan(
.scan(
&read_context,
ScanRequest {
sequence: None,
projection: None,
filters: vec![],
limit: None,
output_ordering: None,
output_ordering: Some(order_options),
},
windows,
)
.await
.unwrap();
let mut timestamps = Vec::with_capacity(self.expected_ts.len());
let mut timestamps = Vec::with_capacity(self.expected.len());
let mut col1 = Vec::with_capacity(self.expected.len());
let mut col2 = Vec::with_capacity(self.expected.len());
let mut col3 = Vec::with_capacity(self.expected.len());
let mut reader = response.reader;
let ts_index = reader.user_schema().timestamp_index().unwrap();
@@ -647,11 +638,61 @@ impl WindowedReaderTester {
.as_any()
.downcast_ref::<TimestampMillisecondVector>()
.unwrap();
let v1_col = chunk.columns[1]
.as_any()
.downcast_ref::<Int64Vector>()
.unwrap();
let v2_col = chunk.columns[2]
.as_any()
.downcast_ref::<StringVector>()
.unwrap();
let v3_col = chunk.columns[3]
.as_any()
.downcast_ref::<BooleanVector>()
.unwrap();
for ts in ts_col.iter_data() {
timestamps.push(ts.unwrap().0.value());
}
for v in v1_col.iter_data() {
col1.push(v.unwrap());
}
for v in v2_col.iter_data() {
col2.push(v.unwrap().to_string());
}
for v in v3_col.iter_data() {
col3.push(v.unwrap());
}
}
assert_eq!(timestamps, self.expected_ts);
assert_eq!(
timestamps,
self.expected
.iter()
.map(|(v, _, _, _)| *v)
.collect::<Vec<_>>()
);
assert_eq!(
col1,
self.expected
.iter()
.map(|(_, v, _, _)| *v)
.collect::<Vec<_>>()
);
assert_eq!(
col2,
self.expected
.iter()
.map(|(_, _, v, _)| v.clone())
.collect::<Vec<_>>()
);
assert_eq!(
col3,
self.expected
.iter()
.map(|(_, _, _, v)| *v)
.collect::<Vec<_>>()
);
}
}
@@ -662,11 +703,16 @@ async fn test_read_by_chunk_reader() {
WindowedReaderTester::new(
"test_region",
vec![vec![(1, 1, "1".to_string(), false)]],
vec![(1, 2)],
vec![1],
vec![(1, 1, "1".to_string(), false)],
)
.await
.check()
.check(vec![OrderOption {
index: 0,
options: SortOptions {
descending: true,
nulls_first: true,
},
}])
.await;
WindowedReaderTester::new(
@@ -681,11 +727,21 @@ async fn test_read_by_chunk_reader() {
(4, 4, "4".to_string(), false),
],
],
vec![(1, 2), (2, 3), (3, 4), (4, 5)],
vec![4, 3, 2, 1],
vec![
(4, 4, "4".to_string(), false),
(3, 3, "3".to_string(), false),
(2, 2, "2".to_string(), false),
(1, 1, "1".to_string(), false),
],
)
.await
.check()
.check(vec![OrderOption {
index: 0,
options: SortOptions {
descending: true,
nulls_first: true,
},
}])
.await;
WindowedReaderTester::new(
@@ -694,16 +750,59 @@ async fn test_read_by_chunk_reader() {
vec![
(1, 1, "1".to_string(), false),
(2, 2, "2".to_string(), false),
(60000, 60000, "60".to_string(), false),
],
vec![
(3, 3, "3".to_string(), false),
(4, 4, "4".to_string(), false),
(61000, 61000, "61".to_string(), false),
],
],
vec![(1, 2), (2, 3), (4, 5), (3, 4)],
vec![3, 4, 2, 1],
vec![
(61000, 61000, "61".to_string(), false),
(60000, 60000, "60".to_string(), false),
(3, 3, "3".to_string(), false),
(2, 2, "2".to_string(), false),
(1, 1, "1".to_string(), false),
],
)
.await
.check()
.check(vec![OrderOption {
index: 0,
options: SortOptions {
descending: true,
nulls_first: true,
},
}])
.await;
WindowedReaderTester::new(
"test_region",
vec![
vec![
(1, 1, "1".to_string(), false),
(2, 2, "2".to_string(), false),
(60000, 60000, "60".to_string(), false),
],
vec![
(3, 3, "3".to_string(), false),
(61000, 61000, "61".to_string(), false),
],
],
vec![
(1, 1, "1".to_string(), false),
(2, 2, "2".to_string(), false),
(3, 3, "3".to_string(), false),
(60000, 60000, "60".to_string(), false),
(61000, 61000, "61".to_string(), false),
],
)
.await
.check(vec![OrderOption {
index: 0,
options: SortOptions {
descending: false,
nulls_first: true,
},
}])
.await;
}


@@ -15,7 +15,6 @@
use std::cmp;
use async_trait::async_trait;
use common_time::range::TimestampRange;
use store_api::storage::{
GetRequest, GetResponse, ReadContext, ScanRequest, ScanResponse, SchemaRef, SequenceNumber,
Snapshot,
@@ -48,7 +47,29 @@ impl Snapshot for SnapshotImpl {
ctx: &ReadContext,
request: ScanRequest,
) -> Result<ScanResponse<ChunkReaderImpl>> {
self.scan_raw(ctx, request, None).await
let visible_sequence = self.sequence_to_read(request.sequence);
let memtable_version = self.version.memtables();
let mutables = memtable_version.mutable_memtable();
let immutables = memtable_version.immutable_memtables();
let mut builder =
ChunkReaderBuilder::new(self.version.schema().clone(), self.sst_layer.clone())
.reserve_num_memtables(memtable_version.num_memtables())
.projection(request.projection)
.filters(request.filters)
.batch_size(ctx.batch_size)
.output_ordering(request.output_ordering)
.visible_sequence(visible_sequence)
.pick_memtables(mutables.clone());
for memtable in immutables {
builder = builder.pick_memtables(memtable.clone());
}
let reader = builder.pick_all_ssts(self.version.ssts())?.build().await?;
Ok(ScanResponse { reader })
}
async fn get(&self, _ctx: &ReadContext, _request: GetRequest) -> Result<GetResponse> {
@@ -75,48 +96,4 @@ impl SnapshotImpl {
.map(|s| cmp::min(s, self.visible_sequence))
.unwrap_or(self.visible_sequence)
}
#[allow(unused)]
pub(crate) async fn windowed_scan(
&self,
ctx: &ReadContext,
request: ScanRequest,
windows: Vec<TimestampRange>,
) -> Result<ScanResponse<ChunkReaderImpl>> {
self.scan_raw(ctx, request, Some(windows)).await
}
async fn scan_raw(
&self,
ctx: &ReadContext,
request: ScanRequest,
windows: Option<Vec<TimestampRange>>,
) -> Result<ScanResponse<ChunkReaderImpl>> {
let visible_sequence = self.sequence_to_read(request.sequence);
let memtable_version = self.version.memtables();
let mutables = memtable_version.mutable_memtable();
let immutables = memtable_version.immutable_memtables();
let mut builder =
ChunkReaderBuilder::new(self.version.schema().clone(), self.sst_layer.clone())
.reserve_num_memtables(memtable_version.num_memtables())
.projection(request.projection)
.filters(request.filters)
.batch_size(ctx.batch_size)
.visible_sequence(visible_sequence)
.pick_memtables(mutables.clone());
if let Some(windows) = windows {
builder = builder.time_windows(windows);
}
for memtable in immutables {
builder = builder.pick_memtables(memtable.clone());
}
let reader = builder.pick_all_ssts(self.version.ssts())?.build().await?;
Ok(ScanResponse { reader })
}
}


@@ -43,8 +43,12 @@ pub(crate) trait WindowInfer {
///
/// ### Note
/// The order of returned vector defines how records are yielded.
fn infer_window(&self, files: &[FileMeta], mem_tables: &[MemtableStats])
-> Vec<TimestampRange>;
fn infer_window(
&self,
files: &[FileMeta],
mem_tables: &[MemtableStats],
ts_desc: bool,
) -> Vec<TimestampRange>;
}
/// [PlainWindowInference] simply finds the minimum time span within all SST files in level 0 and
@@ -56,6 +60,7 @@ impl WindowInfer for PlainWindowInference {
&self,
files: &[FileMeta],
mem_tables: &[MemtableStats],
ts_desc: bool,
) -> Vec<TimestampRange> {
let mut min_duration_sec = i64::MAX;
let mut durations = Vec::with_capacity(files.len() + mem_tables.len());
@@ -94,7 +99,13 @@ impl WindowInfer for PlainWindowInference {
let window_size = min_duration_to_window_size(min_duration_sec);
align_time_spans_to_windows(&durations, window_size)
.into_iter()
.sorted_by(|(l_start, _), (r_start, _)| r_start.cmp(l_start)) // sort time windows in descending order
.sorted_by(|(l_start, _), (r_start, _)| {
if ts_desc {
l_start.cmp(r_start)
} else {
r_start.cmp(l_start)
}
}) // readers are polled in reverse, so sort windows ascending by start for descending output and vice versa
// unwrap safety: we ensure that end>=start so that TimestampRange::with_unit won't return None
.map(|(start, end)| TimestampRange::with_unit(start, end, TimeUnit::Second).unwrap())
.collect()
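Combined with the reverse polling in `WindowedReader`, the comparator above yields the requested global order. A worked example with hypothetical inputs (`files` and `mem_stats` standing in for the values built in the tests below), assuming the inferred windows are [0, 60) and [60, 120) seconds:

// Requested order: timestamp descending.
// PlainWindowInference {}.infer_window(&files, &mem_stats, true)
//   -> [ [0, 60), [60, 120) ]  ascending by start; the last window is polled
//      first, so rows surface from [60, 120) before [0, 60).
//
// Requested order: timestamp ascending.
// PlainWindowInference {}.infer_window(&files, &mem_stats, false)
//   -> [ [60, 120), [0, 60) ]  descending by start; polled in reverse, rows
//      surface from [0, 60) before [60, 120).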
@@ -230,6 +241,7 @@ mod tests {
min_timestamp: Timestamp::new(2001, TimeUnit::Millisecond),
..Default::default()
}],
true,
);
assert_eq!(
vec![TimestampRange::with_unit(0, 60, TimeUnit::Second).unwrap()],
@@ -249,11 +261,12 @@ mod tests {
min_timestamp: Timestamp::new(2001, TimeUnit::Millisecond),
..Default::default()
}],
true,
);
assert_eq!(
vec![
TimestampRange::with_unit(60, 120, TimeUnit::Second).unwrap(),
TimestampRange::with_unit(0, 60, TimeUnit::Second).unwrap(),
TimestampRange::with_unit(60, 120, TimeUnit::Second).unwrap(),
],
res
);
@@ -280,12 +293,13 @@ mod tests {
min_timestamp: Timestamp::new(2001, TimeUnit::Millisecond),
..Default::default()
}],
true,
);
assert_eq!(
vec![
TimestampRange::with_unit(60 * 60, 61 * 60, TimeUnit::Second).unwrap(),
TimestampRange::with_unit(60, 120, TimeUnit::Second).unwrap(),
TimestampRange::with_unit(0, 60, TimeUnit::Second).unwrap(),
TimestampRange::with_unit(60, 120, TimeUnit::Second).unwrap(),
TimestampRange::with_unit(60 * 60, 61 * 60, TimeUnit::Second).unwrap(),
],
res
);
@@ -313,9 +327,47 @@ mod tests {
min_timestamp: Timestamp::new(0, TimeUnit::Millisecond),
..Default::default()
}],
true,
);
// inferred window size should be 600 sec
assert_eq!(
vec![
TimestampRange::with_unit(0, 600, TimeUnit::Second).unwrap(),
TimestampRange::with_unit(600, 1200, TimeUnit::Second).unwrap(),
TimestampRange::with_unit(1200, 1800, TimeUnit::Second).unwrap(),
TimestampRange::with_unit(1800, 2400, TimeUnit::Second).unwrap(),
],
res
);
let res = window_inference.infer_window(
&[
FileMeta {
time_range: Some((
Timestamp::new(0, TimeUnit::Millisecond),
Timestamp::new(60 * 1000, TimeUnit::Millisecond),
)),
level: 1, // this SST will be ignored
..Default::default()
},
FileMeta {
time_range: Some((
Timestamp::new(0, TimeUnit::Millisecond),
Timestamp::new(10 * 60 * 1000, TimeUnit::Millisecond),
)),
..Default::default()
},
],
&[MemtableStats {
max_timestamp: Timestamp::new(60 * 30 * 1000 + 1, TimeUnit::Millisecond),
min_timestamp: Timestamp::new(0, TimeUnit::Millisecond),
..Default::default()
}],
false,
);
// timestamp asc order
assert_eq!(
vec![
TimestampRange::with_unit(1800, 2400, TimeUnit::Second).unwrap(),