diff --git a/Cargo.lock b/Cargo.lock
index 410172c750..742c37f090 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4820,7 +4820,7 @@ dependencies = [
 [[package]]
 name = "greptime-proto"
 version = "0.1.0"
-source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=ce65659d95a4a11c5d668d27edb4f1c1eed36824#ce65659d95a4a11c5d668d27edb4f1c1eed36824"
+source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=159e92d30b4c0116a7ef376b535d880c6d580fb9#159e92d30b4c0116a7ef376b535d880c6d580fb9"
 dependencies = [
  "prost 0.13.5",
  "serde",
diff --git a/Cargo.toml b/Cargo.toml
index f427f5e699..16548c30fd 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -130,7 +130,7 @@ etcd-client = "0.14"
 fst = "0.4.7"
 futures = "0.3"
 futures-util = "0.3"
-greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "ce65659d95a4a11c5d668d27edb4f1c1eed36824" }
+greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "159e92d30b4c0116a7ef376b535d880c6d580fb9" }
 hex = "0.4"
 http = "1"
 humantime = "2.1"
diff --git a/src/common/query/src/request.rs b/src/common/query/src/request.rs
index a91f72eba0..260a43e79d 100644
--- a/src/common/query/src/request.rs
+++ b/src/common/query/src/request.rs
@@ -17,6 +17,7 @@ use datafusion_expr::LogicalPlan;
 use store_api::storage::RegionId;
 
 /// The query request to be handled by the RegionServer (Datanode).
+#[derive(Clone, Debug)]
 pub struct QueryRequest {
     /// The header of this request. Often to store some context of the query. None means all to defaults.
     pub header: Option<RegionRequestHeader>,
diff --git a/src/metric-engine/src/metadata_region.rs b/src/metric-engine/src/metadata_region.rs
index 7e7bae095f..5e6532517a 100644
--- a/src/metric-engine/src/metadata_region.rs
+++ b/src/metric-engine/src/metadata_region.rs
@@ -347,11 +347,7 @@ impl MetadataRegion {
         ScanRequest {
             projection: Some(projection),
             filters: vec![filter_expr],
-            output_ordering: None,
-            limit: None,
-            series_row_selector: None,
-            sequence: None,
-            distribution: None,
+            ..Default::default()
         }
     }
 
@@ -537,11 +533,7 @@ impl MetadataRegion {
         let scan_req = ScanRequest {
             projection: Some(vec![METADATA_SCHEMA_VALUE_COLUMN_INDEX]),
             filters: vec![filter_expr],
-            output_ordering: None,
-            limit: None,
-            series_row_selector: None,
-            sequence: None,
-            distribution: None,
+            ..Default::default()
         };
         let record_batch_stream = self
             .mito
diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs
index fbedef3c35..6a09780ea0 100644
--- a/src/mito2/src/engine.rs
+++ b/src/mito2/src/engine.rs
@@ -53,6 +53,8 @@ mod prune_test;
 #[cfg(test)]
 mod row_selector_test;
 #[cfg(test)]
+mod scan_test;
+#[cfg(test)]
 mod set_role_state_test;
 #[cfg(test)]
 mod sync_test;
diff --git a/src/mito2/src/engine/projection_test.rs b/src/mito2/src/engine/projection_test.rs
index c7880d298a..ef1f180e9e 100644
--- a/src/mito2/src/engine/projection_test.rs
+++ b/src/mito2/src/engine/projection_test.rs
@@ -76,11 +76,7 @@ async fn test_scan_projection() {
     let request = ScanRequest {
         projection: Some(vec![1, 3, 4]),
         filters: Vec::new(),
-        output_ordering: None,
-        limit: None,
-        series_row_selector: None,
-        sequence: None,
-        distribution: None,
+        ..Default::default()
     };
     let stream = engine.scan_to_stream(region_id, request).await.unwrap();
     let batches = RecordBatches::try_collect(stream).await.unwrap();
diff --git a/src/mito2/src/engine/scan_test.rs b/src/mito2/src/engine/scan_test.rs
new file mode 100644
index 0000000000..088759ffa8
--- /dev/null
+++ b/src/mito2/src/engine/scan_test.rs
@@ -0,0 +1,149 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use api::v1::Rows;
+use common_recordbatch::RecordBatches;
+use store_api::region_engine::RegionEngine;
+use store_api::region_request::RegionRequest;
+use store_api::storage::{RegionId, ScanRequest};
+
+use crate::config::MitoConfig;
+use crate::test_util;
+use crate::test_util::{CreateRequestBuilder, TestEnv};
+
+#[tokio::test]
+async fn test_scan_with_min_sst_sequence() {
+    let mut env = TestEnv::with_prefix("test_scan_with_min_sst_sequence");
+    let engine = env.create_engine(MitoConfig::default()).await;
+
+    let region_id = RegionId::new(1, 1);
+    let request = CreateRequestBuilder::new().build();
+    let column_schemas = test_util::rows_schema(&request);
+
+    engine
+        .handle_request(region_id, RegionRequest::Create(request))
+        .await
+        .unwrap();
+
+    let put_rows = async |start, end| {
+        let rows = Rows {
+            schema: column_schemas.clone(),
+            rows: test_util::build_rows(start, end),
+        };
+        test_util::put_rows(&engine, region_id, rows).await;
+        test_util::flush_region(&engine, region_id, None).await;
+    };
+    // generates 3 SST files
+    put_rows(0, 3).await;
+    put_rows(3, 6).await;
+    put_rows(6, 9).await;
+
+    let scan_engine = async |file_min_sequence, expected_files, expected_data| {
+        let request = ScanRequest {
+            sst_min_sequence: file_min_sequence,
+            ..Default::default()
+        };
+        let scanner = engine.scanner(region_id, request).unwrap();
+        assert_eq!(scanner.num_files(), expected_files);
+
+        let stream = scanner.scan().await.unwrap();
+        let batches = RecordBatches::try_collect(stream).await.unwrap();
+        assert_eq!(batches.pretty_print().unwrap(), expected_data);
+    };
+
+    // scans with no sst minimal sequence limit
+    scan_engine(
+        None,
+        3,
+        "\
++-------+---------+---------------------+
+| tag_0 | field_0 | ts                  |
++-------+---------+---------------------+
+| 0     | 0.0     | 1970-01-01T00:00:00 |
+| 1     | 1.0     | 1970-01-01T00:00:01 |
+| 2     | 2.0     | 1970-01-01T00:00:02 |
+| 3     | 3.0     | 1970-01-01T00:00:03 |
+| 4     | 4.0     | 1970-01-01T00:00:04 |
+| 5     | 5.0     | 1970-01-01T00:00:05 |
+| 6     | 6.0     | 1970-01-01T00:00:06 |
+| 7     | 7.0     | 1970-01-01T00:00:07 |
+| 8     | 8.0     | 1970-01-01T00:00:08 |
++-------+---------+---------------------+",
+    )
+    .await;
+
+    // scans with sst minimal sequence > 2
+    scan_engine(
+        Some(2),
+        3,
+        "\
++-------+---------+---------------------+
+| tag_0 | field_0 | ts                  |
++-------+---------+---------------------+
+| 0     | 0.0     | 1970-01-01T00:00:00 |
+| 1     | 1.0     | 1970-01-01T00:00:01 |
+| 2     | 2.0     | 1970-01-01T00:00:02 |
+| 3     | 3.0     | 1970-01-01T00:00:03 |
+| 4     | 4.0     | 1970-01-01T00:00:04 |
+| 5     | 5.0     | 1970-01-01T00:00:05 |
+| 6     | 6.0     | 1970-01-01T00:00:06 |
+| 7     | 7.0     | 1970-01-01T00:00:07 |
+| 8     | 8.0     | 1970-01-01T00:00:08 |
++-------+---------+---------------------+",
+    )
+    .await;
+
+    // scans with sst minimal sequence > 3
+    scan_engine(
+        Some(3),
+        2,
+        "\
++-------+---------+---------------------+
+| tag_0 | field_0 | ts                  |
++-------+---------+---------------------+
+| 3     | 3.0     | 1970-01-01T00:00:03 |
+| 4     | 4.0     | 1970-01-01T00:00:04 |
+| 5     | 5.0     | 1970-01-01T00:00:05 |
+| 6     | 6.0     | 1970-01-01T00:00:06 |
+| 7     | 7.0     | 1970-01-01T00:00:07 |
+| 8     | 8.0     | 1970-01-01T00:00:08 |
++-------+---------+---------------------+",
+    )
+    .await;
+
+    // scans with sst minimal sequence > 7
+    scan_engine(
+        Some(7),
+        1,
+        "\
++-------+---------+---------------------+
+| tag_0 | field_0 | ts                  |
++-------+---------+---------------------+
+| 6     | 6.0     | 1970-01-01T00:00:06 |
+| 7     | 7.0     | 1970-01-01T00:00:07 |
+| 8     | 8.0     | 1970-01-01T00:00:08 |
++-------+---------+---------------------+",
+    )
+    .await;
+
+    // scans with sst minimal sequence > 9 (no sst files will be selected to scan)
+    scan_engine(
+        Some(9),
+        0,
+        "\
+++
+++",
+    )
+    .await;
+}
diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs
index 311b8ba4bf..dd40cfc622 100644
--- a/src/mito2/src/read/scan_region.rs
+++ b/src/mito2/src/read/scan_region.rs
@@ -16,6 +16,7 @@
 
 use std::collections::HashSet;
 use std::fmt;
+use std::num::NonZeroU64;
 use std::sync::Arc;
 use std::time::Instant;
 
@@ -301,14 +302,26 @@ impl ScanRegion {
 
     /// Creates a scan input.
     fn scan_input(mut self, filter_deleted: bool) -> Result<ScanInput> {
+        let sst_min_sequence = self.request.sst_min_sequence.and_then(NonZeroU64::new);
         let time_range = self.build_time_range_predicate();
 
         let ssts = &self.version.ssts;
        let mut files = Vec::new();
         for level in ssts.levels() {
             for file in level.files.values() {
+                let exceed_min_sequence = match (sst_min_sequence, file.meta_ref().sequence) {
+                    (Some(min_sequence), Some(file_sequence)) => file_sequence > min_sequence,
+                    // If the file's sequence is None (or actually is zero), it could mean the file
+                    // is generated and added to the region "directly". In this case, its data should
+                    // be considered as fresh as the memtable. So its sequence is treated as greater than
+                    // the min_sequence, whatever the value of min_sequence is. Hence the default
+                    // "true" in this arm.
+                    (Some(_), None) => true,
+                    (None, _) => true,
+                };
+
                 // Finds SST files in range.
-                if file_in_range(file, &time_range) {
+                if exceed_min_sequence && file_in_range(file, &time_range) {
                     files.push(file.clone());
                 }
                 // There is no need to check and prune for file's sequence here as the sequence number is usually very new,
diff --git a/src/query/src/dummy_catalog.rs b/src/query/src/dummy_catalog.rs
index 0e3b3616a9..337722ced3 100644
--- a/src/query/src/dummy_catalog.rs
+++ b/src/query/src/dummy_catalog.rs
@@ -256,14 +256,13 @@ impl DummyTableProvider {
 
 pub struct DummyTableProviderFactory;
 
-#[async_trait]
-impl TableProviderFactory for DummyTableProviderFactory {
-    async fn create(
+impl DummyTableProviderFactory {
+    pub async fn create_table_provider(
         &self,
         region_id: RegionId,
         engine: RegionEngineRef,
         ctx: Option<&session::context::QueryContext>,
-    ) -> Result<Arc<dyn TableProvider>> {
+    ) -> Result<DummyTableProvider> {
         let metadata = engine
             .get_metadata(region_id)
@@ -274,19 +273,32 @@ impl TableProviderFactory for DummyTableProviderFactory {
             })?;
 
         let scan_request = ctx
-            .and_then(|c| c.get_snapshot(region_id.as_u64()))
-            .map(|seq| ScanRequest {
-                sequence: Some(seq),
+            .map(|ctx| ScanRequest {
+                sequence: ctx.get_snapshot(region_id.as_u64()),
+                sst_min_sequence: ctx.sst_min_sequence(region_id.as_u64()),
                 ..Default::default()
             })
             .unwrap_or_default();
 
-        Ok(Arc::new(DummyTableProvider {
+        Ok(DummyTableProvider {
             region_id,
             engine,
             metadata,
             scan_request: Arc::new(Mutex::new(scan_request)),
-        }))
+        })
+    }
+}
+
+#[async_trait]
+impl TableProviderFactory for DummyTableProviderFactory {
+    async fn create(
+        &self,
+        region_id: RegionId,
+        engine: RegionEngineRef,
+        ctx: Option<&session::context::QueryContext>,
+    ) -> Result<Arc<dyn TableProvider>> {
+        let provider = self.create_table_provider(region_id, engine, ctx).await?;
+        Ok(Arc::new(provider))
     }
 }
diff --git a/src/session/src/context.rs b/src/session/src/context.rs
index 2366ee1c8b..3866e77912 100644
--- a/src/session/src/context.rs
+++ b/src/session/src/context.rs
@@ -48,6 +48,8 @@ pub struct QueryContext {
     /// container data that was committed before(and include) the given sequence number
     /// this field will only be filled if extensions contains a pair of "snapshot_read" and "true"
     snapshot_seqs: Arc<RwLock<HashMap<u64, u64>>>,
+    /// Mappings from the RegionId to the minimal sequence of SST files to scan.
+    sst_min_sequences: Arc<RwLock<HashMap<u64, u64>>>,
     // we use Arc<RwLock<...>> for modifiable fields
     #[builder(default)]
     mutable_session_data: Arc<RwLock<MutableInner>>,
@@ -127,25 +129,17 @@ impl QueryContextBuilder {
 
 impl From<&RegionRequestHeader> for QueryContext {
     fn from(value: &RegionRequestHeader) -> Self {
-        let mut builder = QueryContextBuilder::default();
         if let Some(ctx) = &value.query_context {
-            builder = builder
-                .current_catalog(ctx.current_catalog.clone())
-                .current_schema(ctx.current_schema.clone())
-                .timezone(parse_timezone(Some(&ctx.timezone)))
-                .extensions(ctx.extensions.clone())
-                .channel(ctx.channel.into())
-                .snapshot_seqs(Arc::new(RwLock::new(
-                    ctx.snapshot_seqs.clone().unwrap_or_default().snapshot_seqs,
-                )))
-                .explain_options(ctx.explain);
+            ctx.clone().into()
+        } else {
+            QueryContextBuilder::default().build()
         }
-        builder.build()
     }
 }
 
 impl From<api::v1::QueryContext> for QueryContext {
     fn from(ctx: api::v1::QueryContext) -> Self {
+        let sequences = ctx.snapshot_seqs.as_ref();
         QueryContextBuilder::default()
             .current_catalog(ctx.current_catalog)
             .current_schema(ctx.current_schema)
@@ -153,7 +147,14 @@ impl From<api::v1::QueryContext> for QueryContext {
             .extensions(ctx.extensions)
             .channel(ctx.channel.into())
             .snapshot_seqs(Arc::new(RwLock::new(
-                ctx.snapshot_seqs.clone().unwrap_or_default().snapshot_seqs,
+                sequences
+                    .map(|x| x.snapshot_seqs.clone())
+                    .unwrap_or_default(),
+            )))
+            .sst_min_sequences(Arc::new(RwLock::new(
+                sequences
+                    .map(|x| x.sst_min_sequences.clone())
+                    .unwrap_or_default(),
             )))
             .explain_options(ctx.explain)
             .build()
@@ -168,6 +169,7 @@ impl From<QueryContext> for api::v1::QueryContext {
             extensions,
             channel,
             snapshot_seqs,
+            sst_min_sequences,
             mutable_query_context_data,
             ..
         }: QueryContext,
@@ -182,6 +184,7 @@ impl From<QueryContext> for api::v1::QueryContext {
             channel: channel as u32,
             snapshot_seqs: Some(api::v1::SnapshotSequences {
                 snapshot_seqs: snapshot_seqs.read().unwrap().clone(),
+                sst_min_sequences: sst_min_sequences.read().unwrap().clone(),
             }),
             explain,
         }
@@ -411,6 +414,15 @@ impl QueryContext {
     pub fn auto_string_to_numeric(&self) -> bool {
         matches!(self.channel, Channel::Mysql)
     }
+
+    /// Finds the minimal sequence of SST files to scan for a region.
+    pub fn sst_min_sequence(&self, region_id: u64) -> Option<u64> {
+        self.sst_min_sequences
+            .read()
+            .unwrap()
+            .get(&region_id)
+            .copied()
+    }
 }
 
 impl QueryContextBuilder {
@@ -421,6 +433,7 @@ impl QueryContextBuilder {
                 .current_catalog
                 .unwrap_or_else(|| DEFAULT_CATALOG_NAME.to_string()),
             snapshot_seqs: self.snapshot_seqs.unwrap_or_default(),
+            sst_min_sequences: self.sst_min_sequences.unwrap_or_default(),
             mutable_session_data: self.mutable_session_data.unwrap_or_default(),
             mutable_query_context_data: self.mutable_query_context_data.unwrap_or_default(),
             sql_dialect: self
diff --git a/src/store-api/src/storage/requests.rs b/src/store-api/src/storage/requests.rs
index c9a440eaea..3adff232a8 100644
--- a/src/store-api/src/storage/requests.rs
+++ b/src/store-api/src/storage/requests.rs
@@ -56,6 +56,9 @@ pub struct ScanRequest {
     /// If set, only rows with a sequence number lesser or equal to this value
     /// will be returned.
     pub sequence: Option<SequenceNumber>,
+    /// Optional constraint on the minimal sequence number in the SST files.
+    /// If set, only the SST files that contain sequences greater than this value will be scanned.
+    pub sst_min_sequence: Option<SequenceNumber>,
     /// Optional hint for the distribution of time-series data.
     pub distribution: Option<TimeSeriesDistribution>,
 }