feat: scan with sst minimal sequence (#6051)

* feat: scan with sst minimal sequence

* Update src/store-api/src/storage/requests.rs

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* update proto

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
LFC
2025-05-08 09:34:51 +08:00
committed by GitHub
parent 60acf28f3c
commit e787007eb5
11 changed files with 221 additions and 40 deletions

2
Cargo.lock generated
View File

@@ -4820,7 +4820,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=ce65659d95a4a11c5d668d27edb4f1c1eed36824#ce65659d95a4a11c5d668d27edb4f1c1eed36824"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=159e92d30b4c0116a7ef376b535d880c6d580fb9#159e92d30b4c0116a7ef376b535d880c6d580fb9"
dependencies = [
"prost 0.13.5",
"serde",

View File

@@ -130,7 +130,7 @@ etcd-client = "0.14"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "ce65659d95a4a11c5d668d27edb4f1c1eed36824" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "159e92d30b4c0116a7ef376b535d880c6d580fb9" }
hex = "0.4"
http = "1"
humantime = "2.1"

View File

@@ -17,6 +17,7 @@ use datafusion_expr::LogicalPlan;
use store_api::storage::RegionId;
/// The query request to be handled by the RegionServer (Datanode).
#[derive(Clone, Debug)]
pub struct QueryRequest {
/// The header of this request. Often to store some context of the query. None means all to defaults.
pub header: Option<RegionRequestHeader>,

View File

@@ -347,11 +347,7 @@ impl MetadataRegion {
ScanRequest {
projection: Some(projection),
filters: vec![filter_expr],
output_ordering: None,
limit: None,
series_row_selector: None,
sequence: None,
distribution: None,
..Default::default()
}
}
@@ -537,11 +533,7 @@ impl MetadataRegion {
let scan_req = ScanRequest {
projection: Some(vec![METADATA_SCHEMA_VALUE_COLUMN_INDEX]),
filters: vec![filter_expr],
output_ordering: None,
limit: None,
series_row_selector: None,
sequence: None,
distribution: None,
..Default::default()
};
let record_batch_stream = self
.mito

View File

@@ -53,6 +53,8 @@ mod prune_test;
#[cfg(test)]
mod row_selector_test;
#[cfg(test)]
mod scan_test;
#[cfg(test)]
mod set_role_state_test;
#[cfg(test)]
mod sync_test;

View File

@@ -76,11 +76,7 @@ async fn test_scan_projection() {
let request = ScanRequest {
projection: Some(vec![1, 3, 4]),
filters: Vec::new(),
output_ordering: None,
limit: None,
series_row_selector: None,
sequence: None,
distribution: None,
..Default::default()
};
let stream = engine.scan_to_stream(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();

View File

@@ -0,0 +1,149 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::Rows;
use common_recordbatch::RecordBatches;
use store_api::region_engine::RegionEngine;
use store_api::region_request::RegionRequest;
use store_api::storage::{RegionId, ScanRequest};
use crate::config::MitoConfig;
use crate::test_util;
use crate::test_util::{CreateRequestBuilder, TestEnv};
#[tokio::test]
async fn test_scan_with_min_sst_sequence() {
let mut env = TestEnv::with_prefix("test_scan_with_min_sst_sequence");
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
let column_schemas = test_util::rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
let put_rows = async |start, end| {
let rows = Rows {
schema: column_schemas.clone(),
rows: test_util::build_rows(start, end),
};
test_util::put_rows(&engine, region_id, rows).await;
test_util::flush_region(&engine, region_id, None).await;
};
// generates 3 SST files
put_rows(0, 3).await;
put_rows(3, 6).await;
put_rows(6, 9).await;
let scan_engine = async |file_min_sequence, expected_files, expected_data| {
let request = ScanRequest {
sst_min_sequence: file_min_sequence,
..Default::default()
};
let scanner = engine.scanner(region_id, request).unwrap();
assert_eq!(scanner.num_files(), expected_files);
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(batches.pretty_print().unwrap(), expected_data);
};
// scans with no sst minimal sequence limit
scan_engine(
None,
3,
"\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 0 | 0.0 | 1970-01-01T00:00:00 |
| 1 | 1.0 | 1970-01-01T00:00:01 |
| 2 | 2.0 | 1970-01-01T00:00:02 |
| 3 | 3.0 | 1970-01-01T00:00:03 |
| 4 | 4.0 | 1970-01-01T00:00:04 |
| 5 | 5.0 | 1970-01-01T00:00:05 |
| 6 | 6.0 | 1970-01-01T00:00:06 |
| 7 | 7.0 | 1970-01-01T00:00:07 |
| 8 | 8.0 | 1970-01-01T00:00:08 |
+-------+---------+---------------------+",
)
.await;
// scans with sst minimal sequence > 2
scan_engine(
Some(2),
3,
"\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 0 | 0.0 | 1970-01-01T00:00:00 |
| 1 | 1.0 | 1970-01-01T00:00:01 |
| 2 | 2.0 | 1970-01-01T00:00:02 |
| 3 | 3.0 | 1970-01-01T00:00:03 |
| 4 | 4.0 | 1970-01-01T00:00:04 |
| 5 | 5.0 | 1970-01-01T00:00:05 |
| 6 | 6.0 | 1970-01-01T00:00:06 |
| 7 | 7.0 | 1970-01-01T00:00:07 |
| 8 | 8.0 | 1970-01-01T00:00:08 |
+-------+---------+---------------------+",
)
.await;
// scans with sst minimal sequence > 3
scan_engine(
Some(3),
2,
"\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 3 | 3.0 | 1970-01-01T00:00:03 |
| 4 | 4.0 | 1970-01-01T00:00:04 |
| 5 | 5.0 | 1970-01-01T00:00:05 |
| 6 | 6.0 | 1970-01-01T00:00:06 |
| 7 | 7.0 | 1970-01-01T00:00:07 |
| 8 | 8.0 | 1970-01-01T00:00:08 |
+-------+---------+---------------------+",
)
.await;
// scans with sst minimal sequence > 7
scan_engine(
Some(7),
1,
"\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 6 | 6.0 | 1970-01-01T00:00:06 |
| 7 | 7.0 | 1970-01-01T00:00:07 |
| 8 | 8.0 | 1970-01-01T00:00:08 |
+-------+---------+---------------------+",
)
.await;
// scans with sst minimal sequence > 9 (no sst files will be selected to scan)
scan_engine(
Some(9),
0,
"\
++
++",
)
.await;
}

View File

@@ -16,6 +16,7 @@
use std::collections::HashSet;
use std::fmt;
use std::num::NonZeroU64;
use std::sync::Arc;
use std::time::Instant;
@@ -301,14 +302,26 @@ impl ScanRegion {
/// Creates a scan input.
fn scan_input(mut self, filter_deleted: bool) -> Result<ScanInput> {
let sst_min_sequence = self.request.sst_min_sequence.and_then(NonZeroU64::new);
let time_range = self.build_time_range_predicate();
let ssts = &self.version.ssts;
let mut files = Vec::new();
for level in ssts.levels() {
for file in level.files.values() {
let exceed_min_sequence = match (sst_min_sequence, file.meta_ref().sequence) {
(Some(min_sequence), Some(file_sequence)) => file_sequence > min_sequence,
// If the file's sequence is None (or actually is zero), it could mean the file
// is generated and added to the region "directly". In this case, its data should
// be considered as fresh as the memtable. So its sequence is treated greater than
// the min_sequence, whatever the value of min_sequence is. Hence the default
// "true" in this arm.
(Some(_), None) => true,
(None, _) => true,
};
// Finds SST files in range.
if file_in_range(file, &time_range) {
if exceed_min_sequence && file_in_range(file, &time_range) {
files.push(file.clone());
}
// There is no need to check and prune for file's sequence here as the sequence number is usually very new,

View File

@@ -256,14 +256,13 @@ impl DummyTableProvider {
pub struct DummyTableProviderFactory;
#[async_trait]
impl TableProviderFactory for DummyTableProviderFactory {
async fn create(
impl DummyTableProviderFactory {
pub async fn create_table_provider(
&self,
region_id: RegionId,
engine: RegionEngineRef,
ctx: Option<&session::context::QueryContext>,
) -> Result<Arc<dyn TableProvider>> {
) -> Result<DummyTableProvider> {
let metadata =
engine
.get_metadata(region_id)
@@ -274,19 +273,32 @@ impl TableProviderFactory for DummyTableProviderFactory {
})?;
let scan_request = ctx
.and_then(|c| c.get_snapshot(region_id.as_u64()))
.map(|seq| ScanRequest {
sequence: Some(seq),
.map(|ctx| ScanRequest {
sequence: ctx.get_snapshot(region_id.as_u64()),
sst_min_sequence: ctx.sst_min_sequence(region_id.as_u64()),
..Default::default()
})
.unwrap_or_default();
Ok(Arc::new(DummyTableProvider {
Ok(DummyTableProvider {
region_id,
engine,
metadata,
scan_request: Arc::new(Mutex::new(scan_request)),
}))
})
}
}
#[async_trait]
impl TableProviderFactory for DummyTableProviderFactory {
async fn create(
&self,
region_id: RegionId,
engine: RegionEngineRef,
ctx: Option<&session::context::QueryContext>,
) -> Result<Arc<dyn TableProvider>> {
let provider = self.create_table_provider(region_id, engine, ctx).await?;
Ok(Arc::new(provider))
}
}

View File

@@ -48,6 +48,8 @@ pub struct QueryContext {
/// container data that was committed before(and include) the given sequence number
/// this field will only be filled if extensions contains a pair of "snapshot_read" and "true"
snapshot_seqs: Arc<RwLock<HashMap<u64, u64>>>,
/// Mappings of the RegionId to the minimal sequence of SST file to scan.
sst_min_sequences: Arc<RwLock<HashMap<u64, u64>>>,
// we use Arc<RwLock>> for modifiable fields
#[builder(default)]
mutable_session_data: Arc<RwLock<MutableInner>>,
@@ -127,25 +129,17 @@ impl QueryContextBuilder {
impl From<&RegionRequestHeader> for QueryContext {
fn from(value: &RegionRequestHeader) -> Self {
let mut builder = QueryContextBuilder::default();
if let Some(ctx) = &value.query_context {
builder = builder
.current_catalog(ctx.current_catalog.clone())
.current_schema(ctx.current_schema.clone())
.timezone(parse_timezone(Some(&ctx.timezone)))
.extensions(ctx.extensions.clone())
.channel(ctx.channel.into())
.snapshot_seqs(Arc::new(RwLock::new(
ctx.snapshot_seqs.clone().unwrap_or_default().snapshot_seqs,
)))
.explain_options(ctx.explain);
ctx.clone().into()
} else {
QueryContextBuilder::default().build()
}
builder.build()
}
}
impl From<api::v1::QueryContext> for QueryContext {
fn from(ctx: api::v1::QueryContext) -> Self {
let sequences = ctx.snapshot_seqs.as_ref();
QueryContextBuilder::default()
.current_catalog(ctx.current_catalog)
.current_schema(ctx.current_schema)
@@ -153,7 +147,14 @@ impl From<api::v1::QueryContext> for QueryContext {
.extensions(ctx.extensions)
.channel(ctx.channel.into())
.snapshot_seqs(Arc::new(RwLock::new(
ctx.snapshot_seqs.clone().unwrap_or_default().snapshot_seqs,
sequences
.map(|x| x.snapshot_seqs.clone())
.unwrap_or_default(),
)))
.sst_min_sequences(Arc::new(RwLock::new(
sequences
.map(|x| x.sst_min_sequences.clone())
.unwrap_or_default(),
)))
.explain_options(ctx.explain)
.build()
@@ -168,6 +169,7 @@ impl From<QueryContext> for api::v1::QueryContext {
extensions,
channel,
snapshot_seqs,
sst_min_sequences,
mutable_query_context_data,
..
}: QueryContext,
@@ -182,6 +184,7 @@ impl From<QueryContext> for api::v1::QueryContext {
channel: channel as u32,
snapshot_seqs: Some(api::v1::SnapshotSequences {
snapshot_seqs: snapshot_seqs.read().unwrap().clone(),
sst_min_sequences: sst_min_sequences.read().unwrap().clone(),
}),
explain,
}
@@ -411,6 +414,15 @@ impl QueryContext {
pub fn auto_string_to_numeric(&self) -> bool {
matches!(self.channel, Channel::Mysql)
}
/// Finds the minimal sequence of SST files to scan of a Region.
pub fn sst_min_sequence(&self, region_id: u64) -> Option<u64> {
self.sst_min_sequences
.read()
.unwrap()
.get(&region_id)
.copied()
}
}
impl QueryContextBuilder {
@@ -421,6 +433,7 @@ impl QueryContextBuilder {
.current_catalog
.unwrap_or_else(|| DEFAULT_CATALOG_NAME.to_string()),
snapshot_seqs: self.snapshot_seqs.unwrap_or_default(),
sst_min_sequences: self.sst_min_sequences.unwrap_or_default(),
mutable_session_data: self.mutable_session_data.unwrap_or_default(),
mutable_query_context_data: self.mutable_query_context_data.unwrap_or_default(),
sql_dialect: self

View File

@@ -56,6 +56,9 @@ pub struct ScanRequest {
/// If set, only rows with a sequence number lesser or equal to this value
/// will be returned.
pub sequence: Option<SequenceNumber>,
/// Optional constraint on the minimal sequence number in the SST files.
/// If set, only the SST files that contain sequences greater than this value will be scanned.
pub sst_min_sequence: Option<SequenceNumber>,
/// Optional hint for the distribution of time-series data.
pub distribution: Option<TimeSeriesDistribution>,
}