diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index bbb9cfe36d..8cd3da32f7 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -49,6 +49,8 @@ mod projection_test; #[cfg(test)] mod prune_test; #[cfg(test)] +mod row_selector_test; +#[cfg(test)] mod set_readonly_test; #[cfg(test)] mod truncate_test; diff --git a/src/mito2/src/engine/row_selector_test.rs b/src/mito2/src/engine/row_selector_test.rs new file mode 100644 index 0000000000..001d0f2f6a --- /dev/null +++ b/src/mito2/src/engine/row_selector_test.rs @@ -0,0 +1,104 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use api::v1::Rows; +use common_recordbatch::RecordBatches; +use store_api::region_engine::RegionEngine; +use store_api::region_request::RegionRequest; +use store_api::storage::{RegionId, ScanRequest, TimeSeriesRowSelector}; + +use crate::config::MitoConfig; +use crate::test_util::batch_util::sort_batches_and_print; +use crate::test_util::{ + build_rows_for_key, flush_region, put_rows, rows_schema, CreateRequestBuilder, TestEnv, +}; + +async fn test_last_row(append_mode: bool) { + let mut env = TestEnv::new(); + let engine = env.create_engine(MitoConfig::default()).await; + let region_id = RegionId::new(1, 1); + + let request = CreateRequestBuilder::new() + .insert_option("append_mode", &append_mode.to_string()) + .build(); + let column_schemas = rows_schema(&request); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + // Flush 3 SSTs. + // a, field 1, 2 + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows_for_key("a", 1, 3, 1), + }; + put_rows(&engine, region_id, rows).await; + flush_region(&engine, region_id, None).await; + // a, field 0, 1 + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows_for_key("a", 0, 2, 0), + }; + put_rows(&engine, region_id, rows).await; + flush_region(&engine, region_id, None).await; + // b, field 0, 1 + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows_for_key("b", 0, 2, 0), + }; + put_rows(&engine, region_id, rows).await; + flush_region(&engine, region_id, None).await; + + // Memtable. + // a, field 2, 3 + let rows = Rows { + schema: column_schemas, + rows: build_rows_for_key("a", 2, 4, 2), + }; + put_rows(&engine, region_id, rows).await; + + let expected = "\ ++-------+---------+---------------------+ +| tag_0 | field_0 | ts | ++-------+---------+---------------------+ +| a | 3.0 | 1970-01-01T00:00:03 | +| b | 1.0 | 1970-01-01T00:00:01 | ++-------+---------+---------------------+"; + // Scans in parallel. 
+ let scanner = engine + .scanner( + region_id, + ScanRequest { + series_row_selector: Some(TimeSeriesRowSelector::LastRow), + ..Default::default() + }, + ) + .unwrap(); + assert_eq!(3, scanner.num_files()); + assert_eq!(1, scanner.num_memtables()); + let stream = scanner.scan().await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + assert_eq!(expected, sort_batches_and_print(&batches, &["tag_0", "ts"])); +} + +#[tokio::test] +async fn test_last_row_append_mode_disabled() { + test_last_row(false).await; +} + +#[tokio::test] +async fn test_last_row_append_mode_enabled() { + test_last_row(true).await; +} diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs index 8b9c549207..5c3d17119a 100644 --- a/src/mito2/src/read.rs +++ b/src/mito2/src/read.rs @@ -16,6 +16,7 @@ pub mod compat; pub mod dedup; +pub(crate) mod last_row; pub mod merge; pub mod projection; pub(crate) mod scan_region; diff --git a/src/mito2/src/read/last_row.rs b/src/mito2/src/read/last_row.rs new file mode 100644 index 0000000000..85f8276061 --- /dev/null +++ b/src/mito2/src/read/last_row.rs @@ -0,0 +1,153 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Utilities to read the last row of each time series. +use async_trait::async_trait; + +use crate::error::Result; +use crate::read::{Batch, BatchReader, BoxedBatchReader}; + +/// Reader to keep the last row for each time series. 
+/// It assumes that batches from the input reader are +/// - sorted +/// - all deleted rows have been filtered. +/// - not empty +/// +/// This reader is different from the [MergeMode](crate::region::options::MergeMode) as +/// it focuses on time series (the same key). +pub(crate) struct LastRowReader { + /// Inner reader. + reader: BoxedBatchReader, + /// The latest batch of the current key; its last row is pending to return. + last_batch: Option<Batch>, +} + +impl LastRowReader { + /// Creates a new `LastRowReader`. + pub(crate) fn new(reader: BoxedBatchReader) -> Self { + Self { + reader, + last_batch: None, + } + } + + /// Returns the last row of the next key, or `None` once the input is exhausted. + pub(crate) async fn next_last_row(&mut self) -> Result<Option<Batch>> { + while let Some(batch) = self.reader.next_batch().await? { + if let Some(last) = &self.last_batch { + if last.primary_key() == batch.primary_key() { + // Same key, update last batch. + self.last_batch = Some(batch); + } else { + // Different key, return the last row in `last` and update `last_batch` + // with the current batch. + debug_assert!(!last.is_empty()); + let last_row = last.slice(last.num_rows() - 1, 1); + self.last_batch = Some(batch); + return Ok(Some(last_row)); + } + } else { + self.last_batch = Some(batch); + } + } + + if let Some(last) = self.last_batch.take() { + // This is the last key.
+ let last_row = last.slice(last.num_rows() - 1, 1); + return Ok(Some(last_row)); + } + + Ok(None) + } +} + +#[async_trait] +impl BatchReader for LastRowReader { + async fn next_batch(&mut self) -> Result<Option<Batch>> { + self.next_last_row().await + } +} + +#[cfg(test)] +mod tests { + use api::v1::OpType; + + use super::*; + use crate::test_util::{check_reader_result, new_batch, VecBatchReader}; + + #[tokio::test] + async fn test_last_row_one_batch() { + let input = [new_batch( + b"k1", + &[1, 2], + &[11, 11], + &[OpType::Put, OpType::Put], + &[21, 22], + )]; + let reader = VecBatchReader::new(&input); + let mut reader = LastRowReader::new(Box::new(reader)); + check_reader_result( + &mut reader, + &[new_batch(b"k1", &[2], &[11], &[OpType::Put], &[22])], + ) + .await; + + // Only one row. + let input = [new_batch(b"k1", &[1], &[11], &[OpType::Put], &[21])]; + let reader = VecBatchReader::new(&input); + let mut reader = LastRowReader::new(Box::new(reader)); + check_reader_result( + &mut reader, + &[new_batch(b"k1", &[1], &[11], &[OpType::Put], &[21])], + ) + .await; + } + + #[tokio::test] + async fn test_last_row_multi_batch() { + let input = [ + new_batch( + b"k1", + &[1, 2], + &[11, 11], + &[OpType::Put, OpType::Put], + &[21, 22], + ), + new_batch( + b"k1", + &[3, 4], + &[11, 11], + &[OpType::Put, OpType::Put], + &[23, 24], + ), + new_batch( + b"k2", + &[1, 2], + &[11, 11], + &[OpType::Put, OpType::Put], + &[31, 32], + ), + ]; + let reader = VecBatchReader::new(&input); + let mut reader = LastRowReader::new(Box::new(reader)); + check_reader_result( + &mut reader, + &[ + new_batch(b"k1", &[4], &[11], &[OpType::Put], &[24]), + new_batch(b"k2", &[2], &[11], &[OpType::Put], &[32]), + ], + ) + .await; + } +} diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 3ba32250bf..0fe4c7efa2 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -209,8 +209,8 @@ impl ScanRegion { /// Returns a [Scanner] to scan the
region. pub(crate) fn scanner(self) -> Result { - if self.version.options.append_mode { - // If table uses append mode, we use unordered scan in query. + if self.version.options.append_mode && self.request.series_row_selector.is_none() { + // If table is append only and there is no series row selector, we use unordered scan in query. // We still use seq scan in compaction. self.unordered_scan().map(Scanner::Unordered) } else { diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 7204bf87e7..ca5de75058 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -29,13 +29,14 @@ use datatypes::schema::SchemaRef; use smallvec::smallvec; use snafu::ResultExt; use store_api::region_engine::{PartitionRange, RegionScanner, ScannerProperties}; -use store_api::storage::ColumnId; +use store_api::storage::{ColumnId, TimeSeriesRowSelector}; use table::predicate::Predicate; use tokio::sync::Semaphore; use crate::error::{PartitionOutOfRangeSnafu, Result}; use crate::memtable::MemtableRef; use crate::read::dedup::{DedupReader, LastNonNull, LastRow}; +use crate::read::last_row::LastRowReader; use crate::read::merge::MergeReaderBuilder; use crate::read::scan_region::{ FileRangeCollector, ScanInput, ScanPart, ScanPartList, StreamContext, @@ -210,8 +211,8 @@ impl SeqScan { let reader = builder.build().await?; let dedup = !stream_ctx.input.append_mode; - if dedup { - let reader = match stream_ctx.input.merge_mode { + let reader = if dedup { + match stream_ctx.input.merge_mode { MergeMode::LastRow => Box::new(DedupReader::new( reader, LastRow::new(stream_ctx.input.filter_deleted), @@ -220,12 +221,17 @@ impl SeqScan { reader, LastNonNull::new(stream_ctx.input.filter_deleted), )) as _, - }; - Ok(Some(reader)) + } } else { - let reader = Box::new(reader); - Ok(Some(reader)) - } + Box::new(reader) as _ + }; + + let reader = match &stream_ctx.input.series_row_selector { + Some(TimeSeriesRowSelector::LastRow) => 
Box::new(LastRowReader::new(reader)) as _, + None => reader, + }; + + Ok(Some(reader)) } /// Scans the given partition when the part list is set properly.