feat: Implement reader that returns the last row of each series (#4354)

* feat: last row reader

* feat: scan use last row reader

* test: test last row selector

* chore: update comment
This commit is contained in:
Yingwen
2024-07-12 22:40:06 +08:00
committed by GitHub
parent 16075ada67
commit 5a1732279b
6 changed files with 276 additions and 10 deletions

View File

@@ -49,6 +49,8 @@ mod projection_test;
#[cfg(test)]
mod prune_test;
#[cfg(test)]
mod row_selector_test;
#[cfg(test)]
mod set_readonly_test;
#[cfg(test)]
mod truncate_test;

View File

@@ -0,0 +1,104 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::Rows;
use common_recordbatch::RecordBatches;
use store_api::region_engine::RegionEngine;
use store_api::region_request::RegionRequest;
use store_api::storage::{RegionId, ScanRequest, TimeSeriesRowSelector};
use crate::config::MitoConfig;
use crate::test_util::batch_util::sort_batches_and_print;
use crate::test_util::{
build_rows_for_key, flush_region, put_rows, rows_schema, CreateRequestBuilder, TestEnv,
};
/// Shared scenario for the last-row selector tests.
///
/// Creates a region with the `append_mode` option set to the given value, writes rows
/// for the keys `a` and `b` into three flushed SSTs plus the memtable, and then checks
/// that a scan using [TimeSeriesRowSelector::LastRow] returns one row per key.
async fn test_last_row(append_mode: bool) {
    let mut env = TestEnv::new();
    let engine = env.create_engine(MitoConfig::default()).await;

    let region_id = RegionId::new(1, 1);
    let create_request = CreateRequestBuilder::new()
        .insert_option("append_mode", &append_mode.to_string())
        .build();
    let column_schemas = rows_schema(&create_request);
    engine
        .handle_request(region_id, RegionRequest::Create(create_request))
        .await
        .unwrap();

    // Every entry is written and flushed on its own, so we end up with 3 SSTs:
    // - a, field 1, 2
    // - a, field 0, 1
    // - b, field 0, 1
    let flushed_writes = [("a", 1, 3, 1), ("a", 0, 2, 0), ("b", 0, 2, 0)];
    for (key, start, end, value_start) in flushed_writes {
        let rows = Rows {
            schema: column_schemas.clone(),
            rows: build_rows_for_key(key, start, end, value_start),
        };
        put_rows(&engine, region_id, rows).await;
        flush_region(&engine, region_id, None).await;
    }

    // The final write is not flushed and stays in the memtable.
    // a, field 2, 3
    let memtable_rows = Rows {
        schema: column_schemas,
        rows: build_rows_for_key("a", 2, 4, 2),
    };
    put_rows(&engine, region_id, memtable_rows).await;

    // NOTE(review): the column padding inside this table looks collapsed compared with
    // its border widths -- confirm against the actual output of `sort_batches_and_print`.
    let expected = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| a | 3.0 | 1970-01-01T00:00:03 |
| b | 1.0 | 1970-01-01T00:00:01 |
+-------+---------+---------------------+";

    // Scan with the last-row selector enabled.
    let scan_request = ScanRequest {
        series_row_selector: Some(TimeSeriesRowSelector::LastRow),
        ..Default::default()
    };
    let scanner = engine.scanner(region_id, scan_request).unwrap();
    // All three SSTs and the memtable take part in the scan.
    assert_eq!(3, scanner.num_files());
    assert_eq!(1, scanner.num_memtables());

    let stream = scanner.scan().await.unwrap();
    let batches = RecordBatches::try_collect(stream).await.unwrap();
    let actual = sort_batches_and_print(&batches, &["tag_0", "ts"]);
    assert_eq!(expected, actual);
}
/// Runs the last-row scenario on a region created with `append_mode` set to `false`.
#[tokio::test]
async fn test_last_row_append_mode_disabled() {
    let append_mode = false;
    test_last_row(append_mode).await;
}
/// Runs the last-row scenario on a region created with `append_mode` set to `true`.
#[tokio::test]
async fn test_last_row_append_mode_enabled() {
    let append_mode = true;
    test_last_row(append_mode).await;
}

View File

@@ -16,6 +16,7 @@
pub mod compat;
pub mod dedup;
pub(crate) mod last_row;
pub mod merge;
pub mod projection;
pub(crate) mod scan_region;

View File

@@ -0,0 +1,153 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Utilities to read the last row of each time series.
use async_trait::async_trait;
use crate::error::Result;
use crate::read::{Batch, BatchReader, BoxedBatchReader};
/// Reader that keeps only the last row of each time series.
///
/// It assumes that batches from the input reader are
/// - sorted (the reader only compares the primary keys of consecutive batches)
/// - free of deleted rows (they have already been filtered out)
/// - not empty
///
/// This reader is different from the [MergeMode](crate::region::options::MergeMode) as
/// it focuses on time series (rows that share the same key).
pub(crate) struct LastRowReader {
/// Inner reader that supplies the input batches.
reader: BoxedBatchReader,
/// The most recent batch read from `reader` whose last row has not been returned yet.
last_batch: Option<Batch>,
}
impl LastRowReader {
    /// Creates a new `LastRowReader` that reads its input from `reader`.
    pub(crate) fn new(reader: BoxedBatchReader) -> Self {
        Self {
            reader,
            last_batch: None,
        }
    }

    /// Returns the last row of the next key.
    ///
    /// Reads batches from the inner reader until the primary key changes or the input
    /// is exhausted, then returns a one-row batch sliced from the end of the most recent
    /// batch of the finished key. Returns `Ok(None)` once every key has been returned.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the inner reader.
    pub(crate) async fn next_last_row(&mut self) -> Result<Option<Batch>> {
        while let Some(batch) = self.reader.next_batch().await? {
            if batch.is_empty() {
                // An empty batch has no last row. Skipping it keeps the pending batch
                // intact and guarantees `num_rows() - 1` below never underflows.
                continue;
            }

            // Compare keys before `batch` is moved into `last_batch`.
            let key_changed = self
                .last_batch
                .as_ref()
                .map_or(false, |last| last.primary_key() != batch.primary_key());
            // The current batch always becomes the pending one: it either extends the
            // same key or starts a new key.
            let previous = self.last_batch.replace(batch);
            if key_changed {
                // Different key: the previous key is complete, return its last row.
                return Ok(previous.as_ref().map(Self::last_row_of));
            }
        }

        // Input exhausted: whatever is still pending belongs to the last key.
        Ok(self.last_batch.take().as_ref().map(Self::last_row_of))
    }

    /// Slices the final row out of a non-empty `batch`.
    fn last_row_of(batch: &Batch) -> Batch {
        debug_assert!(!batch.is_empty());
        batch.slice(batch.num_rows() - 1, 1)
    }
}
#[async_trait]
impl BatchReader for LastRowReader {
async fn next_batch(&mut self) -> Result<Option<Batch>> {
self.next_last_row().await
}
}
#[cfg(test)]
mod tests {
    use api::v1::OpType;

    use super::*;
    use crate::test_util::{check_reader_result, new_batch, VecBatchReader};

    /// A single input batch yields exactly its final row.
    #[tokio::test]
    async fn test_last_row_one_batch() {
        // Two-row batch: only its final row should come out.
        let source = [new_batch(
            b"k1",
            &[1, 2],
            &[11, 11],
            &[OpType::Put, OpType::Put],
            &[21, 22],
        )];
        let expected = [new_batch(b"k1", &[2], &[11], &[OpType::Put], &[22])];
        let mut last_rows = LastRowReader::new(Box::new(VecBatchReader::new(&source)));
        check_reader_result(&mut last_rows, &expected).await;

        // Only one row: the batch is returned unchanged.
        let source = [new_batch(b"k1", &[1], &[11], &[OpType::Put], &[21])];
        let expected = [new_batch(b"k1", &[1], &[11], &[OpType::Put], &[21])];
        let mut last_rows = LastRowReader::new(Box::new(VecBatchReader::new(&source)));
        check_reader_result(&mut last_rows, &expected).await;
    }

    /// Consecutive batches of one key collapse into a single row, and a following key
    /// gets its own row.
    #[tokio::test]
    async fn test_last_row_multi_batch() {
        let two_puts = [OpType::Put, OpType::Put];
        let source = [
            new_batch(b"k1", &[1, 2], &[11, 11], &two_puts, &[21, 22]),
            new_batch(b"k1", &[3, 4], &[11, 11], &two_puts, &[23, 24]),
            new_batch(b"k2", &[1, 2], &[11, 11], &two_puts, &[31, 32]),
        ];
        let expected = [
            new_batch(b"k1", &[4], &[11], &[OpType::Put], &[24]),
            new_batch(b"k2", &[2], &[11], &[OpType::Put], &[32]),
        ];
        let mut last_rows = LastRowReader::new(Box::new(VecBatchReader::new(&source)));
        check_reader_result(&mut last_rows, &expected).await;
    }
}

View File

@@ -209,8 +209,8 @@ impl ScanRegion {
/// Returns a [Scanner] to scan the region.
pub(crate) fn scanner(self) -> Result<Scanner> {
if self.version.options.append_mode {
// If table uses append mode, we use unordered scan in query.
if self.version.options.append_mode && self.request.series_row_selector.is_none() {
// If table is append only and there is no series row selector, we use unordered scan in query.
// We still use seq scan in compaction.
self.unordered_scan().map(Scanner::Unordered)
} else {

View File

@@ -29,13 +29,14 @@ use datatypes::schema::SchemaRef;
use smallvec::smallvec;
use snafu::ResultExt;
use store_api::region_engine::{PartitionRange, RegionScanner, ScannerProperties};
use store_api::storage::ColumnId;
use store_api::storage::{ColumnId, TimeSeriesRowSelector};
use table::predicate::Predicate;
use tokio::sync::Semaphore;
use crate::error::{PartitionOutOfRangeSnafu, Result};
use crate::memtable::MemtableRef;
use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
use crate::read::last_row::LastRowReader;
use crate::read::merge::MergeReaderBuilder;
use crate::read::scan_region::{
FileRangeCollector, ScanInput, ScanPart, ScanPartList, StreamContext,
@@ -210,8 +211,8 @@ impl SeqScan {
let reader = builder.build().await?;
let dedup = !stream_ctx.input.append_mode;
if dedup {
let reader = match stream_ctx.input.merge_mode {
let reader = if dedup {
match stream_ctx.input.merge_mode {
MergeMode::LastRow => Box::new(DedupReader::new(
reader,
LastRow::new(stream_ctx.input.filter_deleted),
@@ -220,12 +221,17 @@ impl SeqScan {
reader,
LastNonNull::new(stream_ctx.input.filter_deleted),
)) as _,
};
Ok(Some(reader))
}
} else {
let reader = Box::new(reader);
Ok(Some(reader))
}
Box::new(reader) as _
};
let reader = match &stream_ctx.input.series_row_selector {
Some(TimeSeriesRowSelector::LastRow) => Box::new(LastRowReader::new(reader)) as _,
None => reader,
};
Ok(Some(reader))
}
/// Scans the given partition when the part list is set properly.