Merge branch 'main' into bump-datafusion

This commit is contained in:
Ruihang Xia
2026-05-13 16:54:21 +08:00
30 changed files with 13021 additions and 9288 deletions

2
Cargo.lock generated
View File

@@ -11071,6 +11071,7 @@ dependencies = [
"datafusion",
"datafusion-common",
"datafusion-expr",
"datafusion-expr-common",
"datafusion-functions",
"datafusion-optimizer",
"datafusion-physical-expr",
@@ -14081,6 +14082,7 @@ dependencies = [
"common-grpc",
"common-memory-manager",
"common-meta",
"common-options",
"common-procedure",
"common-procedure-test",
"common-query",

View File

@@ -131,6 +131,7 @@ datafusion = "=53.1.0"
datafusion-common = "=53.1.0"
datafusion-datasource = "=53.1.0"
datafusion-expr = "=53.1.0"
datafusion-expr-common = "=53.1.0"
datafusion-functions = "=53.1.0"
datafusion-functions-aggregate-common = "=53.1.0"
datafusion-functions-window-common = "=53.1.0"
@@ -337,35 +338,17 @@ rev = "5618e779cf2bb4755b499c630fba4c35e91898cb"
[patch.crates-io]
datafusion = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-catalog = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-catalog-listing = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-common-runtime = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-datasource = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-datasource-arrow = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-datasource-csv = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-datasource-json = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-datasource-parquet = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-doc = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-execution = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-expr-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-functions = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-functions-aggregate = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-functions-aggregate-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-functions-nested = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-functions-table = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-functions-window = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-functions-window-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-macros = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-optimizer = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-physical-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-physical-expr-adapter = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-physical-expr-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-physical-optimizer = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-physical-plan = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-pruning = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-session = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-sql = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-substrait = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "2aefa08a8d69c96eec2d6d6703598a009bba6e4c" } # on branch v0.61.x

File diff suppressed because it is too large Load Diff

View File

@@ -142,3 +142,87 @@
rate(greptime_trigger_save_alert_record_elapsed_bucket[$__rate_interval])
)` | `timeseries` | Elapsed time to persist trigger alert records. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{storage_type}}]-p99` |
| Save Alert Failure Rate | `rate(greptime_trigger_save_alert_record_failure_count[$__rate_interval])` | `timeseries` | Rate of failures when persisting trigger alert records. | `prometheus` | `none` | `__auto` |
# Hotspot
| Title | Query | Type | Description | Datasource | Unit | Legend Format |
| --- | --- | --- | --- | --- | --- | --- |
| Hotspot Regions | `WITH table_stats AS (
SELECT
table_id,
COUNT(*) AS region_count,
SUM(disk_size) AS total_disk_size,
SUM(region_rows) as total_region_rows
FROM information_schema.region_statistics
WHERE region_role = 'Leader'
GROUP BY table_id
HAVING COUNT(*) > 1
)
SELECT
t.table_schema,
t.table_name,
r.region_id,
t.table_id,
r.region_number,
p.partition_description,
ROUND(
r.disk_size * 100.0
/ NULLIF(ts.total_disk_size, 0),
2
) AS disk_size_share_percent,
r.disk_size,
ROUND(
r.region_rows * 100.0
/ NULLIF(ts.total_region_rows, 0),
2
) AS region_rows_share_percent,
r.region_rows
FROM information_schema.region_statistics r
JOIN table_stats ts
ON r.table_id = ts.table_id
JOIN information_schema.tables t
ON r.table_id = t.table_id
LEFT JOIN information_schema.partitions p
ON p.table_schema = t.table_schema
AND p.table_name = t.table_name
AND p.greptime_partition_id = r.region_id
WHERE r.region_role = 'Leader'
ORDER BY region_rows_share_percent DESC
LIMIT 100;` | `table` | | `mysql` | -- | -- |
| Datanode Load(Write) | `greptime_datanode_history_load` | `timeseries` | Write load of each datanode over time. | `prometheus` | `binBps` | `datanode-{{datanode_id}}({{instance}})` |
| Datanode Load(Write) Distribution | `greptime_datanode_history_load` | `piechart` | Distribution of write load across datanodes. | `prometheus` | `binBps` | `datanode-{{datanode_id}}({{instance}})` |
| Datanode Data Distribution | `WITH leader_regions AS (
SELECT
CONCAT(
'datanode-',
p.peer_id,
' (',
p.peer_addr,
')'
) AS datanode,
r.disk_size
FROM information_schema.region_statistics r
JOIN information_schema.region_peers p
ON r.region_id = p.region_id
WHERE r.region_role = 'Leader'
AND p.is_leader = 'Yes'
)
SELECT
datanode,
COUNT(*) AS leader_region_count,
SUM(disk_size) AS data_size
FROM leader_regions
GROUP BY datanode
ORDER BY data_size DESC;` | `piechart` | Distribution of leader regions and data size across datanodes. | `mysql` | `bytes` | -- |

View File

@@ -1153,3 +1153,65 @@ groups:
type: prometheus
uid: ${metrics}
legendFormat: __auto
- title: Hotspot
panels:
- title: Hotspot Regions
type: table
queries:
- expr: "WITH table_stats AS (\n SELECT\n table_id,\n COUNT(*) AS region_count,\n SUM(disk_size) AS total_disk_size,\n SUM(region_rows) as total_region_rows\n FROM information_schema.region_statistics\n WHERE region_role = 'Leader'\n GROUP BY table_id\n HAVING COUNT(*) > 1\n)\n\nSELECT\n t.table_schema,\n t.table_name,\n\n r.region_id,\n t.table_id,\n r.region_number,\n\n p.partition_description,\n\n\n ROUND(\n r.disk_size * 100.0\n / NULLIF(ts.total_disk_size, 0),\n 2\n ) AS disk_size_share_percent,\n\n r.disk_size,\n \n ROUND(\n r.region_rows * 100.0\n / NULLIF(ts.total_region_rows, 0),\n 2\n ) AS region_rows_share_percent,\n r.region_rows\n\nFROM information_schema.region_statistics r\n\nJOIN table_stats ts\n ON r.table_id = ts.table_id\n\nJOIN information_schema.tables t\n ON r.table_id = t.table_id\n\nLEFT JOIN information_schema.partitions p\n ON p.table_schema = t.table_schema\n AND p.table_name = t.table_name\n AND p.greptime_partition_id = r.region_id\n\nWHERE r.region_role = 'Leader'\n\nORDER BY region_rows_share_percent DESC\nLIMIT 100;"
datasource:
type: mysql
uid: ${information_schema}
- title: Datanode Load(Write)
type: timeseries
description: Write load of each datanode over time.
unit: binBps
queries:
- expr: greptime_datanode_history_load
datasource:
type: prometheus
uid: ${metrics}
legendFormat: datanode-{{datanode_id}}({{instance}})
- title: Datanode Load(Write) Distribution
type: piechart
description: Distribution of write load across datanodes.
unit: binBps
queries:
- expr: greptime_datanode_history_load
datasource:
type: prometheus
uid: ${metrics}
legendFormat: datanode-{{datanode_id}}({{instance}})
- title: Datanode Data Distribution
type: piechart
description: Distribution of leader regions and data size across datanodes.
unit: bytes
queries:
- expr: |-
WITH leader_regions AS (
SELECT
CONCAT(
'datanode-',
p.peer_id,
' (',
p.peer_addr,
')'
) AS datanode,
r.disk_size
FROM information_schema.region_statistics r
JOIN information_schema.region_peers p
ON r.region_id = p.region_id
WHERE r.region_role = 'Leader'
AND p.is_leader = 'Yes'
)
SELECT
datanode,
COUNT(*) AS leader_region_count,
SUM(disk_size) AS data_size
FROM leader_regions
GROUP BY datanode
ORDER BY data_size DESC;
datasource:
type: mysql
uid: ${information_schema}

File diff suppressed because it is too large Load Diff

View File

@@ -142,3 +142,87 @@
rate(greptime_trigger_save_alert_record_elapsed_bucket[$__rate_interval])
)` | `timeseries` | Elapsed time to persist trigger alert records. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{storage_type}}]-p99` |
| Save Alert Failure Rate | `rate(greptime_trigger_save_alert_record_failure_count[$__rate_interval])` | `timeseries` | Rate of failures when persisting trigger alert records. | `prometheus` | `none` | `__auto` |
# Hotspot
| Title | Query | Type | Description | Datasource | Unit | Legend Format |
| --- | --- | --- | --- | --- | --- | --- |
| Hotspot Regions | `WITH table_stats AS (
SELECT
table_id,
COUNT(*) AS region_count,
SUM(disk_size) AS total_disk_size,
SUM(region_rows) as total_region_rows
FROM information_schema.region_statistics
WHERE region_role = 'Leader'
GROUP BY table_id
HAVING COUNT(*) > 1
)
SELECT
t.table_schema,
t.table_name,
r.region_id,
t.table_id,
r.region_number,
p.partition_description,
ROUND(
r.disk_size * 100.0
/ NULLIF(ts.total_disk_size, 0),
2
) AS disk_size_share_percent,
r.disk_size,
ROUND(
r.region_rows * 100.0
/ NULLIF(ts.total_region_rows, 0),
2
) AS region_rows_share_percent,
r.region_rows
FROM information_schema.region_statistics r
JOIN table_stats ts
ON r.table_id = ts.table_id
JOIN information_schema.tables t
ON r.table_id = t.table_id
LEFT JOIN information_schema.partitions p
ON p.table_schema = t.table_schema
AND p.table_name = t.table_name
AND p.greptime_partition_id = r.region_id
WHERE r.region_role = 'Leader'
ORDER BY region_rows_share_percent DESC
LIMIT 100;` | `table` | | `mysql` | -- | -- |
| Datanode Load(Write) | `greptime_datanode_history_load` | `timeseries` | Write load of each datanode over time. | `prometheus` | `binBps` | `datanode-{{datanode_id}}({{instance}})` |
| Datanode Load(Write) Distribution | `greptime_datanode_history_load` | `piechart` | Distribution of write load across datanodes. | `prometheus` | `binBps` | `datanode-{{datanode_id}}({{instance}})` |
| Datanode Data Distribution | `WITH leader_regions AS (
SELECT
CONCAT(
'datanode-',
p.peer_id,
' (',
p.peer_addr,
')'
) AS datanode,
r.disk_size
FROM information_schema.region_statistics r
JOIN information_schema.region_peers p
ON r.region_id = p.region_id
WHERE r.region_role = 'Leader'
AND p.is_leader = 'Yes'
)
SELECT
datanode,
COUNT(*) AS leader_region_count,
SUM(disk_size) AS data_size
FROM leader_regions
GROUP BY datanode
ORDER BY data_size DESC;` | `piechart` | Distribution of leader regions and data size across datanodes. | `mysql` | `bytes` | -- |

View File

@@ -1153,3 +1153,65 @@ groups:
type: prometheus
uid: ${metrics}
legendFormat: __auto
- title: Hotspot
panels:
- title: Hotspot Regions
type: table
queries:
- expr: "WITH table_stats AS (\n SELECT\n table_id,\n COUNT(*) AS region_count,\n SUM(disk_size) AS total_disk_size,\n SUM(region_rows) as total_region_rows\n FROM information_schema.region_statistics\n WHERE region_role = 'Leader'\n GROUP BY table_id\n HAVING COUNT(*) > 1\n)\n\nSELECT\n t.table_schema,\n t.table_name,\n\n r.region_id,\n t.table_id,\n r.region_number,\n\n p.partition_description,\n\n\n ROUND(\n r.disk_size * 100.0\n / NULLIF(ts.total_disk_size, 0),\n 2\n ) AS disk_size_share_percent,\n\n r.disk_size,\n \n ROUND(\n r.region_rows * 100.0\n / NULLIF(ts.total_region_rows, 0),\n 2\n ) AS region_rows_share_percent,\n r.region_rows\n\nFROM information_schema.region_statistics r\n\nJOIN table_stats ts\n ON r.table_id = ts.table_id\n\nJOIN information_schema.tables t\n ON r.table_id = t.table_id\n\nLEFT JOIN information_schema.partitions p\n ON p.table_schema = t.table_schema\n AND p.table_name = t.table_name\n AND p.greptime_partition_id = r.region_id\n\nWHERE r.region_role = 'Leader'\n\nORDER BY region_rows_share_percent DESC\nLIMIT 100;"
datasource:
type: mysql
uid: ${information_schema}
- title: Datanode Load(Write)
type: timeseries
description: Write load of each datanode over time.
unit: binBps
queries:
- expr: greptime_datanode_history_load
datasource:
type: prometheus
uid: ${metrics}
legendFormat: datanode-{{datanode_id}}({{instance}})
- title: Datanode Load(Write) Distribution
type: piechart
description: Distribution of write load across datanodes.
unit: binBps
queries:
- expr: greptime_datanode_history_load
datasource:
type: prometheus
uid: ${metrics}
legendFormat: datanode-{{datanode_id}}({{instance}})
- title: Datanode Data Distribution
type: piechart
description: Distribution of leader regions and data size across datanodes.
unit: bytes
queries:
- expr: |-
WITH leader_regions AS (
SELECT
CONCAT(
'datanode-',
p.peer_id,
' (',
p.peer_addr,
')'
) AS datanode,
r.disk_size
FROM information_schema.region_statistics r
JOIN information_schema.region_peers p
ON r.region_id = p.region_id
WHERE r.region_role = 'Leader'
AND p.is_leader = 'Yes'
)
SELECT
datanode,
COUNT(*) AS leader_region_count,
SUM(disk_size) AS data_size
FROM leader_regions
GROUP BY datanode
ORDER BY data_size DESC;
datasource:
type: mysql
uid: ${information_schema}

View File

@@ -34,7 +34,7 @@ use crate::data::export_v2::error::{
};
use crate::data::export_v2::extractor::SchemaExtractor;
use crate::data::export_v2::manifest::{
ChunkMeta, DataFormat, MANIFEST_VERSION, Manifest, TimeRange,
ChunkMeta, DataFormat, MANIFEST_FILE, MANIFEST_VERSION, Manifest, TimeRange,
};
use crate::data::path::ddl_path_for_schema;
use crate::data::snapshot_storage::{OpenDalStorage, SnapshotStorage, validate_uri};
@@ -46,16 +46,73 @@ use crate::database::{DatabaseClient, parse_proxy_opts};
pub enum ExportV2Command {
/// Create a new snapshot.
Create(ExportCreateCommand),
/// List snapshots under a parent location.
List(ExportListCommand),
}
impl ExportV2Command {
pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
match self {
ExportV2Command::Create(cmd) => cmd.build().await,
ExportV2Command::List(cmd) => cmd.build().await,
}
}
}
/// List snapshots under a parent location.
#[derive(Debug, Parser)]
pub struct ExportListCommand {
/// Parent storage location whose direct subdirectories are snapshots.
#[clap(long)]
location: String,
/// Object store configuration for remote storage backends.
#[clap(flatten)]
storage: ObjectStoreConfig,
}
impl ExportListCommand {
pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
validate_uri(&self.location).map_err(BoxedError::new)?;
let storage = OpenDalStorage::from_parent_uri(&self.location, &self.storage)
.map_err(BoxedError::new)?;
Ok(Box::new(ExportList {
location: self.location.clone(),
storage,
}))
}
}
/// Export list tool implementation.
pub struct ExportList {
location: String,
storage: OpenDalStorage,
}
#[async_trait]
impl Tool for ExportList {
async fn do_work(&self) -> std::result::Result<(), BoxedError> {
self.run().await.map_err(BoxedError::new)
}
}
impl ExportList {
async fn run(&self) -> Result<()> {
let result = scan_snapshots(&self.storage).await?;
println!("Scanning: {}", self.location);
if result.snapshots.is_empty() {
println!("No snapshots found.");
} else {
print_snapshot_list(&result.snapshots, result.unreadable.len());
}
print_unreadable_warnings(&result.unreadable);
Ok(())
}
}
/// Create a new snapshot.
#[derive(Debug, Parser)]
pub struct ExportCreateCommand {
@@ -628,10 +685,138 @@ fn format_chunk_plan(chunks: &[ChunkMeta]) -> String {
format!("[{}]", items.join(", "))
}
#[derive(Debug)]
struct SnapshotListEntry {
path: String,
manifest: Manifest,
}
#[derive(Debug, Default)]
struct SnapshotScanResult {
snapshots: Vec<SnapshotListEntry>,
unreadable: Vec<String>,
}
async fn scan_snapshots(storage: &OpenDalStorage) -> Result<SnapshotScanResult> {
let mut result = SnapshotScanResult::default();
for dir in storage.list_direct_child_dirs().await? {
let manifest_path = format!("{}/{}", dir.trim_matches('/'), MANIFEST_FILE);
let Some(data) = storage.read_file_if_exists(&manifest_path).await? else {
continue;
};
match serde_json::from_slice::<Manifest>(&data) {
Ok(manifest) => result.snapshots.push(SnapshotListEntry {
path: format!("{}/", dir.trim_matches('/')),
manifest,
}),
Err(_) => result
.unreadable
.push(format!("{}/", dir.trim_matches('/'))),
}
}
result
.snapshots
.sort_by_key(|entry| std::cmp::Reverse(entry.manifest.created_at));
result.unreadable.sort();
Ok(result)
}
fn print_snapshot_list(snapshots: &[SnapshotListEntry], unreadable_count: usize) {
if unreadable_count == 0 {
println!("Found {} snapshots:", snapshots.len());
} else {
println!(
"Found {} snapshots ({} {} skipped: unreadable manifest):",
snapshots.len(),
unreadable_count,
directory_word(unreadable_count)
);
}
println!();
println!(
" {:<24} {:<36} {:<19} {:<9} {:<7} {:<6} Status",
"Path", "ID", "Created", "Catalog", "Schemas", "Chunks"
);
println!(
" {:<24} {:<36} {:<19} {:<9} {:<7} {:<6} {:<10}",
"-".repeat(24),
"-".repeat(36),
"-".repeat(19),
"-".repeat(9),
"-".repeat(7),
"-".repeat(6),
"-".repeat(10)
);
for entry in snapshots {
let manifest = &entry.manifest;
println!(
" {:<24} {:<36} {:<19} {:<9} {:<7} {:<6} {}",
entry.path,
manifest.snapshot_id,
manifest.created_at.format("%Y-%m-%d %H:%M:%S"),
manifest.catalog,
manifest.schemas.len(),
format_list_chunks(manifest),
snapshot_status(manifest)
);
}
}
fn print_unreadable_warnings(unreadable: &[String]) {
if unreadable.is_empty() {
return;
}
println!();
println!(
"Warning: {} {} had corrupt/unreadable manifest.json:",
unreadable.len(),
directory_word(unreadable.len())
);
for path in unreadable {
println!(" - {}", path);
}
}
fn directory_word(count: usize) -> &'static str {
if count == 1 {
"directory"
} else {
"directories"
}
}
fn snapshot_status(manifest: &Manifest) -> &'static str {
if manifest.schema_only {
"schema-only"
} else if manifest.is_complete() {
"complete"
} else {
"incomplete"
}
}
fn format_list_chunks(manifest: &Manifest) -> String {
let total = manifest.chunks.len();
if total == 0 {
return "0".to_string();
}
format!(
"{}/{}",
manifest.completed_count() + manifest.skipped_count(),
total
)
}
#[cfg(test)]
mod tests {
use chrono::TimeZone;
use clap::Parser;
use tempfile::tempdir;
use url::Url;
use super::*;
use crate::data::path::ddl_path_for_schema;
@@ -886,4 +1071,124 @@ mod tests {
.to_string();
assert!(error.contains("time_range"));
}
#[tokio::test]
async fn test_scan_snapshots_sorts_and_tracks_unreadable_manifests() {
let dir = tempdir().unwrap();
write_test_manifest(
dir.path(),
"older",
test_manifest(
chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(),
false,
true,
),
);
write_test_manifest(
dir.path(),
"newer",
test_manifest(
chrono::Utc.with_ymd_and_hms(2026, 2, 1, 0, 0, 0).unwrap(),
false,
true,
),
);
std::fs::create_dir_all(dir.path().join("empty-dir")).unwrap();
std::fs::create_dir_all(dir.path().join("not-snapshot")).unwrap();
std::fs::write(dir.path().join("not-snapshot").join("data.txt"), "x").unwrap();
std::fs::create_dir_all(dir.path().join("broken")).unwrap();
std::fs::write(dir.path().join("broken").join(MANIFEST_FILE), "{not-json").unwrap();
let uri = Url::from_directory_path(dir.path()).unwrap().to_string();
let storage = OpenDalStorage::from_file_uri(&uri).unwrap();
let result = scan_snapshots(&storage).await.unwrap();
assert_eq!(result.snapshots.len(), 2);
assert_eq!(
result.snapshots[0].manifest.created_at,
chrono::Utc.with_ymd_and_hms(2026, 2, 1, 0, 0, 0).unwrap()
);
assert_eq!(
result.snapshots[1].manifest.created_at,
chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap()
);
assert_eq!(result.unreadable, vec!["broken/".to_string()]);
assert_eq!(result.snapshots[0].path, "newer/");
assert_eq!(result.snapshots[1].path, "older/");
}
#[test]
fn test_snapshot_list_status_and_chunk_summary() {
let schema_only = test_manifest(
chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(),
true,
true,
);
assert_eq!(snapshot_status(&schema_only), "schema-only");
assert_eq!(format_list_chunks(&schema_only), "0");
let complete = test_manifest(
chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(),
false,
true,
);
assert_eq!(snapshot_status(&complete), "complete");
assert_eq!(format_list_chunks(&complete), "2/2");
let incomplete = test_manifest(
chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(),
false,
false,
);
assert_eq!(snapshot_status(&incomplete), "incomplete");
assert_eq!(format_list_chunks(&incomplete), "1/2");
}
fn write_test_manifest(root: &std::path::Path, dir: &str, manifest: Manifest) {
let snapshot_dir = root.join(dir);
std::fs::create_dir_all(&snapshot_dir).unwrap();
std::fs::write(
snapshot_dir.join(MANIFEST_FILE),
serde_json::to_vec_pretty(&manifest).unwrap(),
)
.unwrap();
}
fn test_manifest(
created_at: chrono::DateTime<chrono::Utc>,
schema_only: bool,
complete: bool,
) -> Manifest {
let mut manifest = Manifest::new_for_export(
"greptime".to_string(),
vec!["public".to_string(), "analytics".to_string()],
schema_only,
TimeRange::unbounded(),
DataFormat::Parquet,
None,
)
.unwrap();
manifest.created_at = created_at;
manifest.updated_at = created_at;
if !schema_only {
manifest.chunks.clear();
let mut first = ChunkMeta::new(1, TimeRange::unbounded());
first.mark_completed(vec!["data/public/chunk_1/file.parquet".to_string()], None);
manifest.chunks.push(first);
if complete {
manifest
.chunks
.push(ChunkMeta::skipped(2, TimeRange::unbounded()));
} else {
manifest
.chunks
.push(ChunkMeta::new(2, TimeRange::unbounded()));
}
}
manifest
}
}

View File

@@ -17,6 +17,8 @@
//! This module provides a unified interface for reading and writing snapshot data
//! to various storage backends (S3, OSS, GCS, Azure Blob, local filesystem).
use std::collections::BTreeSet;
use async_trait::async_trait;
use futures::TryStreamExt;
use object_store::services::{Azblob, Fs, Gcs, Oss, S3};
@@ -75,7 +77,10 @@ impl StorageScheme {
}
/// Extracts bucket/container and root path from a URI.
fn extract_remote_location(uri: &str) -> Result<RemoteLocation> {
fn extract_remote_location_with_root_policy(
uri: &str,
allow_empty_root: bool,
) -> Result<RemoteLocation> {
let url = Url::parse(uri).context(UrlParseSnafu)?;
let bucket_or_container = url.host_str().unwrap_or("").to_string();
if bucket_or_container.is_empty() {
@@ -87,7 +92,7 @@ fn extract_remote_location(uri: &str) -> Result<RemoteLocation> {
}
let root = url.path().trim_start_matches('/').to_string();
if root.is_empty() {
if root.is_empty() && !allow_empty_root {
return InvalidUriSnafu {
uri,
reason: "snapshot URI must include a non-empty path after the bucket/container",
@@ -268,13 +273,21 @@ impl OpenDalStorage {
}
fn from_s3_uri(uri: &str, storage: &ObjectStoreConfig) -> Result<Self> {
Self::from_s3_uri_with_root_policy(uri, storage, false)
}
fn from_s3_uri_with_root_policy(
uri: &str,
storage: &ObjectStoreConfig,
allow_empty_root: bool,
) -> Result<Self> {
Self::ensure_backend_enabled(
uri,
storage.enable_s3,
"s3:// requires --s3 and related options",
)?;
let location = extract_remote_location(uri)?;
let location = extract_remote_location_with_root_policy(uri, allow_empty_root)?;
let mut config = storage.s3.clone();
config.s3_bucket = location.bucket_or_container;
config.s3_root = location.root;
@@ -291,13 +304,21 @@ impl OpenDalStorage {
}
fn from_oss_uri(uri: &str, storage: &ObjectStoreConfig) -> Result<Self> {
Self::from_oss_uri_with_root_policy(uri, storage, false)
}
fn from_oss_uri_with_root_policy(
uri: &str,
storage: &ObjectStoreConfig,
allow_empty_root: bool,
) -> Result<Self> {
Self::ensure_backend_enabled(
uri,
storage.enable_oss,
"oss:// requires --oss and related options",
)?;
let location = extract_remote_location(uri)?;
let location = extract_remote_location_with_root_policy(uri, allow_empty_root)?;
let mut config = storage.oss.clone();
config.oss_bucket = location.bucket_or_container;
config.oss_root = location.root;
@@ -314,17 +335,30 @@ impl OpenDalStorage {
}
fn from_gcs_uri(uri: &str, storage: &ObjectStoreConfig) -> Result<Self> {
Self::from_gcs_uri_with_root_policy(uri, storage, false)
}
fn from_gcs_uri_with_root_policy(
uri: &str,
storage: &ObjectStoreConfig,
allow_empty_root: bool,
) -> Result<Self> {
Self::ensure_backend_enabled(
uri,
storage.enable_gcs,
"gs:// or gcs:// requires --gcs and related options",
)?;
let location = extract_remote_location(uri)?;
let location = extract_remote_location_with_root_policy(uri, allow_empty_root)?;
let mut config = storage.gcs.clone();
config.gcs_bucket = location.bucket_or_container;
config.gcs_root = location.root;
Self::validate_remote_config(uri, "gcs", config.validate())?;
// GCS validate() rejects empty root, unlike S3/OSS/Azblob.
if allow_empty_root && config.gcs_root.is_empty() {
Self::validate_gcs_parent_config(uri, &config)?;
} else {
Self::validate_remote_config(uri, "gcs", config.validate())?;
}
let conn: GcsConnection = config.into();
let object_store = ObjectStore::new(Gcs::from(&conn))
@@ -336,14 +370,43 @@ impl OpenDalStorage {
))
}
fn validate_gcs_parent_config(
uri: &str,
config: &crate::common::PrefixedGcsConnection,
) -> Result<()> {
if config.gcs_bucket.is_empty() {
return InvalidUriSnafu {
uri,
reason: "invalid gcs config: GCS bucket must be set when --gcs is enabled.",
}
.fail();
}
if config.gcs_scope.is_empty() {
return InvalidUriSnafu {
uri,
reason: "invalid gcs config: GCS scope must be set when --gcs is enabled.",
}
.fail();
}
Ok(())
}
fn from_azblob_uri(uri: &str, storage: &ObjectStoreConfig) -> Result<Self> {
Self::from_azblob_uri_with_root_policy(uri, storage, false)
}
fn from_azblob_uri_with_root_policy(
uri: &str,
storage: &ObjectStoreConfig,
allow_empty_root: bool,
) -> Result<Self> {
Self::ensure_backend_enabled(
uri,
storage.enable_azblob,
"azblob:// requires --azblob and related options",
)?;
let location = extract_remote_location(uri)?;
let location = extract_remote_location_with_root_policy(uri, allow_empty_root)?;
let mut config = storage.azblob.clone();
config.azblob_container = location.bucket_or_container;
config.azblob_root = location.root;
@@ -370,6 +433,21 @@ impl OpenDalStorage {
}
}
/// Creates storage rooted at a snapshot parent URI.
///
/// Parent-oriented commands such as `export-v2 list` may scan bucket/container
/// roots. Snapshot-oriented commands must keep using `from_uri`, which rejects
/// empty remote roots to avoid unsafe snapshot operations at bucket scope.
pub fn from_parent_uri(uri: &str, storage: &ObjectStoreConfig) -> Result<Self> {
match StorageScheme::from_uri(uri)? {
StorageScheme::File => Self::from_file_uri_with_config(uri, storage),
StorageScheme::S3 => Self::from_s3_uri_with_root_policy(uri, storage, true),
StorageScheme::Oss => Self::from_oss_uri_with_root_policy(uri, storage, true),
StorageScheme::Gcs => Self::from_gcs_uri_with_root_policy(uri, storage, true),
StorageScheme::Azblob => Self::from_azblob_uri_with_root_policy(uri, storage, true),
}
}
/// Reads a file as bytes.
async fn read_file(&self, path: &str) -> Result<Vec<u8>> {
let data = self
@@ -382,6 +460,17 @@ impl OpenDalStorage {
Ok(data.to_vec())
}
/// Reads a file as bytes if it exists.
pub(crate) async fn read_file_if_exists(&self, path: &str) -> Result<Option<Vec<u8>>> {
match self.object_store.read(path).await {
Ok(data) => Ok(Some(data.to_vec())),
Err(error) if error.kind() == ErrorKind::NotFound => Ok(None),
Err(error) => Err(error).context(StorageOperationSnafu {
operation: format!("read {}", path),
}),
}
}
/// Writes bytes to a file.
async fn write_file(&self, path: &str, data: Vec<u8>) -> Result<()> {
self.object_store
@@ -404,6 +493,37 @@ impl OpenDalStorage {
}
}
/// Lists direct child directory names under the storage root.
pub(crate) async fn list_direct_child_dirs(&self) -> Result<Vec<String>> {
let mut lister = match self.object_store.lister_with("/").recursive(false).await {
Ok(lister) => lister,
Err(error) if error.kind() == ErrorKind::NotFound => return Ok(Vec::new()),
Err(error) => {
return Err(error).context(StorageOperationSnafu {
operation: "list /",
});
}
};
let mut dirs = BTreeSet::new();
while let Some(entry) = lister.try_next().await.context(StorageOperationSnafu {
operation: "list /",
})? {
let path = entry.path().trim_matches('/');
if path.is_empty() {
continue;
}
if entry.metadata().is_dir()
&& let Some(name) = path.split('/').next()
{
dirs.insert(name.to_string());
}
}
Ok(dirs.into_iter().collect())
}
#[cfg(test)]
pub async fn read_schema(&self) -> Result<SchemaSnapshot> {
let schemas_path = schema_index_path();
@@ -557,11 +677,35 @@ mod tests {
#[test]
fn test_extract_remote_location_requires_non_empty_root() {
assert!(extract_remote_location("s3://bucket").is_err());
assert!(extract_remote_location("s3://bucket/").is_err());
assert!(extract_remote_location("oss://bucket").is_err());
assert!(extract_remote_location("gs://bucket").is_err());
assert!(extract_remote_location("azblob://container").is_err());
assert!(extract_remote_location_with_root_policy("s3://bucket", false).is_err());
assert!(extract_remote_location_with_root_policy("s3://bucket/", false).is_err());
assert!(extract_remote_location_with_root_policy("oss://bucket", false).is_err());
assert!(extract_remote_location_with_root_policy("gs://bucket", false).is_err());
assert!(extract_remote_location_with_root_policy("azblob://container", false).is_err());
}
#[test]
fn test_extract_remote_location_allows_empty_root_when_permitted() {
let location = extract_remote_location_with_root_policy("s3://bucket", true).unwrap();
assert_eq!(location.bucket_or_container, "bucket");
assert_eq!(location.root, "");
let location =
extract_remote_location_with_root_policy("azblob://container/", true).unwrap();
assert_eq!(location.bucket_or_container, "container");
assert_eq!(location.root, "");
}
#[test]
fn test_parent_storage_allows_s3_bucket_root() {
let mut storage = ObjectStoreConfig {
enable_s3: true,
..Default::default()
};
storage.s3.s3_region = Some("us-east-1".to_string());
assert!(OpenDalStorage::from_uri("s3://bucket", &storage).is_err());
assert!(OpenDalStorage::from_parent_uri("s3://bucket", &storage).is_ok());
}
#[cfg(not(windows))]

View File

@@ -38,6 +38,8 @@ use crate::memtable::partition_tree::{DEFAULT_FREEZE_THRESHOLD, DEFAULT_MAX_KEYS
use crate::sst::FormatType;
const DEFAULT_INDEX_SEGMENT_ROW_COUNT: usize = 1024;
const COMPACTION_TWCS_PREFIX: &str = "compaction.twcs.";
const MEMTABLE_PARTITION_TREE_PREFIX: &str = "memtable.partition_tree.";
pub(crate) fn parse_wal_options(
options_map: &HashMap<String, String>,
@@ -138,7 +140,8 @@ impl TryFrom<&HashMap<String, String>> for RegionOptions {
// See https://github.com/serde-rs/serde/issues/1626
let options: RegionOptionsWithoutEnum =
serde_json::from_str(&json).context(JsonOptionsSnafu)?;
let has_compaction_type = validate_enum_options(options_map, "compaction.type")?;
let has_compaction_type =
validate_enum_options(options_map, "compaction.type", &[COMPACTION_TWCS_PREFIX])?;
let compaction = if has_compaction_type {
serde_json::from_str(&json).context(JsonOptionsSnafu)?
} else {
@@ -148,7 +151,11 @@ impl TryFrom<&HashMap<String, String>> for RegionOptions {
let wal_options = parse_wal_options(options_map).context(JsonOptionsSnafu)?;
let index_options: IndexOptions = serde_json::from_str(&json).context(JsonOptionsSnafu)?;
let memtable = if validate_enum_options(options_map, "memtable.type")? {
let memtable = if validate_enum_options(
options_map,
"memtable.type",
&[MEMTABLE_PARTITION_TREE_PREFIX],
)? {
Some(serde_json::from_str(&json).context(JsonOptionsSnafu)?)
} else {
None
@@ -437,25 +444,36 @@ fn options_map_to_value(options: &HashMap<String, String>) -> Value {
// `#[serde(default)]` doesn't support enum (https://github.com/serde-rs/serde/issues/1799) so we
// check the type key first.
/// Validates whether the `options_map` has valid options for specific `enum_tag_key`
/// and returns `true` if the map contains enum options.
/// and returns `true` if the map contains the enum tag.
///
/// Variant options must start with one of `enum_option_prefixes`. If variant options
/// are provided, the tagged enum type key must also be provided.
fn validate_enum_options(
options_map: &HashMap<String, String>,
enum_tag_key: &str,
enum_option_prefixes: &[&str],
) -> Result<bool> {
let enum_type = enum_tag_key.split('.').next().unwrap();
let mut has_other_options = false;
let mut has_enum_options = false;
let mut has_tag = false;
for key in options_map.keys() {
if key == enum_tag_key {
has_tag = true;
} else if key.starts_with(enum_type) {
has_other_options = true;
} else if !has_enum_options
&& enum_option_prefixes
.iter()
.any(|prefix| key.starts_with(prefix))
{
has_enum_options = true;
}
if has_tag && has_enum_options {
break;
}
}
// If tag is not provided, then other options for the enum should not exist.
ensure!(
has_tag || !has_other_options,
has_tag || !has_enum_options,
InvalidRegionOptionsSnafu {
reason: format!("missing key {} in options", enum_tag_key),
}
@@ -538,6 +556,34 @@ mod tests {
assert_eq!(expect, options);
}
#[test]
fn test_with_compaction_override_true_without_compaction_type() {
let map = make_map(&[(COMPACTION_OVERRIDE, "true")]);
let options = RegionOptions::try_from(&map).unwrap();
let expect = RegionOptions {
compaction_override: true,
..Default::default()
};
assert_eq!(expect, options);
}
#[test]
fn test_with_compaction_override_false_without_compaction_type() {
let map = make_map(&[(COMPACTION_OVERRIDE, "false")]);
let options = RegionOptions::try_from(&map).unwrap();
assert_eq!(RegionOptions::default(), options);
}
#[test]
fn test_compaction_twcs_options_still_require_compaction_type_with_override() {
let map = make_map(&[
(COMPACTION_OVERRIDE, "true"),
("compaction.twcs.time_window", "2h"),
]);
let err = RegionOptions::try_from(&map).unwrap_err();
assert_eq!(StatusCode::InvalidArguments, err.status_code());
}
fn test_with_wal_options(wal_options: &WalOptions) -> bool {
let encoded_wal_options = serde_json::to_string(&wal_options).unwrap();
let map = make_map(&[(WAL_OPTIONS_KEY, &encoded_wal_options)]);
@@ -608,6 +654,13 @@ mod tests {
assert_eq!(StatusCode::InvalidArguments, err.status_code());
}
#[test]
fn test_without_memtable_type() {
let map = make_map(&[("memtable.partition_tree.index_max_keys_per_shard", "2048")]);
let err = RegionOptions::try_from(&map).unwrap_err();
assert_eq!(StatusCode::InvalidArguments, err.status_code());
}
#[test]
fn test_with_merge_mode() {
let map = make_map(&[("merge_mode", "last_row")]);

View File

@@ -20,15 +20,18 @@ use std::time::Instant;
use common_base::range_read::RangeReader;
use common_telemetry::warn;
use datatypes::data_type::ConcreteDataType;
use index::inverted_index::format::reader::{InvertedIndexBlobReader, InvertedIndexReadMetrics};
use index::inverted_index::search::index_apply::{
ApplyOutput, IndexApplier, IndexNotFoundStrategy, SearchContext,
ApplyOutput, IndexApplier, IndexNotFoundStrategy, PredicatesIndexApplier, SearchContext,
};
use index::inverted_index::search::predicate::Predicate;
use index::target::IndexTarget;
use object_store::ObjectStore;
use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
use puffin::puffin_manager::{PuffinManager, PuffinReader};
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::region_request::PathType;
use store_api::storage::ColumnId;
@@ -37,7 +40,8 @@ use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
use crate::cache::index::inverted_index::{CachedInvertedIndexBlobReader, InvertedIndexCacheRef};
use crate::cache::index::result_cache::PredicateKey;
use crate::error::{
ApplyInvertedIndexSnafu, MetadataSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu, Result,
ApplyInvertedIndexSnafu, BuildIndexApplierSnafu, MetadataSnafu, PuffinBuildReaderSnafu,
PuffinReadBlobSnafu, Result,
};
use crate::metrics::{INDEX_APPLY_ELAPSED, INDEX_APPLY_MEMORY_USAGE};
use crate::sst::file::RegionIndexId;
@@ -121,10 +125,6 @@ pub(crate) struct InvertedIndexApplier {
/// The cache of index files.
file_cache: Option<FileCacheRef>,
/// Predefined index applier used to apply predicates to index files
/// and return the relevant row group ids for further scan.
index_applier: Box<dyn IndexApplier>,
/// The puffin manager factory.
puffin_manager_factory: PuffinManagerFactory,
@@ -134,35 +134,49 @@ pub(crate) struct InvertedIndexApplier {
/// Puffin metadata cache.
puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
/// Predicate key. Used to identify the predicate and fetch result from cache.
predicate_key: PredicateKey,
/// All collected predicates.
predicates: BTreeMap<ColumnId, Vec<Predicate>>,
/// Default apply plan built from all collected predicates.
default_plan: SstApplyPlan,
/// Expected predicate column types from the latest region metadata.
expected_predicate_col_types: BTreeMap<ColumnId, ConcreteDataType>,
}
pub(crate) type InvertedIndexApplierRef = Arc<InvertedIndexApplier>;
#[derive(Clone)]
pub(crate) struct SstApplyPlan {
pub predicate_key: PredicateKey,
pub index_applier: Arc<PredicatesIndexApplier>,
}
impl InvertedIndexApplier {
/// Creates a new `InvertedIndexApplier`.
pub fn new(
table_dir: String,
path_type: PathType,
store: ObjectStore,
index_applier: Box<dyn IndexApplier>,
puffin_manager_factory: PuffinManagerFactory,
predicates: BTreeMap<ColumnId, Vec<Predicate>>,
) -> Self {
INDEX_APPLY_MEMORY_USAGE.add(index_applier.memory_usage() as i64);
expected_predicate_col_types: BTreeMap<ColumnId, ConcreteDataType>,
) -> Result<Self> {
let default_plan = Self::build_apply_plan(&predicates)?;
INDEX_APPLY_MEMORY_USAGE.add(default_plan.index_applier.memory_usage() as i64);
Self {
Ok(Self {
table_dir,
path_type,
store,
file_cache: None,
index_applier,
puffin_manager_factory,
inverted_index_cache: None,
puffin_metadata_cache: None,
predicate_key: PredicateKey::new_inverted(Arc::new(predicates)),
}
predicates,
default_plan,
expected_predicate_col_types,
})
}
/// Sets the file cache.
@@ -186,11 +200,12 @@ impl InvertedIndexApplier {
self
}
/// Applies predicates to the provided SST file id and returns the relevant row group ids.
/// Applies predicates to one SST file with the provided index applier.
///
/// # Arguments
/// * `file_id` - The region file ID to apply predicates to
/// * `file_size_hint` - Optional hint for file size to avoid extra metadata reads
/// * `index_applier` - Inverted index applier produced by `plan_for_sst`.
/// * `metrics` - Optional mutable reference to collect metrics on demand
#[tracing::instrument(
skip_all,
@@ -200,6 +215,7 @@ impl InvertedIndexApplier {
&self,
file_id: RegionIndexId,
file_size_hint: Option<u64>,
index_applier: &PredicatesIndexApplier,
mut metrics: Option<&mut InvertedIndexApplyMetrics>,
) -> Result<ApplyOutput> {
let start = Instant::now();
@@ -231,7 +247,7 @@ impl InvertedIndexApplier {
InvertedIndexBlobReader::new(blob),
index_cache.clone(),
);
self.index_applier
index_applier
.apply(
context,
&mut index_reader,
@@ -243,7 +259,7 @@ impl InvertedIndexApplier {
.context(ApplyInvertedIndexSnafu)
} else {
let mut index_reader = InvertedIndexBlobReader::new(blob);
self.index_applier
index_applier
.apply(
context,
&mut index_reader,
@@ -344,82 +360,141 @@ impl InvertedIndexApplier {
.context(PuffinBuildReaderSnafu)
}
/// Returns the predicate key.
pub fn predicate_key(&self) -> &PredicateKey {
&self.predicate_key
/// Builds a per-SST apply plan.
///
/// Returns `None` when no compatible predicate remains for this SST.
pub fn plan_for_sst(&self, sst_metadata: &RegionMetadataRef) -> Result<Option<SstApplyPlan>> {
let mut compatible_predicates = BTreeMap::new();
let mut has_type_mismatch = false;
for (col_id, expected) in &self.expected_predicate_col_types {
if let Some(sst_col) = sst_metadata.column_by_id(*col_id)
&& sst_col.column_schema.data_type != *expected
{
has_type_mismatch = true;
continue;
}
if let Some(predicates) = self.predicates.get(col_id) {
compatible_predicates.insert(*col_id, predicates.clone());
}
}
if compatible_predicates.is_empty() {
return Ok(None);
}
if !has_type_mismatch {
return Ok(Some(self.default_plan.clone()));
}
let plan = Self::build_apply_plan(&compatible_predicates)?;
Ok(Some(plan))
}
fn build_apply_plan(
predicates_by_col: &BTreeMap<ColumnId, Vec<Predicate>>,
) -> Result<SstApplyPlan> {
let predicates = predicates_by_col
.iter()
.map(|(col_id, preds)| (format!("{}", IndexTarget::ColumnId(*col_id)), preds.clone()))
.collect();
let index_applier =
PredicatesIndexApplier::try_from(predicates).context(BuildIndexApplierSnafu)?;
let predicate_key = PredicateKey::new_inverted(Arc::new(predicates_by_col.clone()));
Ok(SstApplyPlan {
predicate_key,
index_applier: Arc::new(index_applier),
})
}
}
impl Drop for InvertedIndexApplier {
fn drop(&mut self) {
INDEX_APPLY_MEMORY_USAGE.sub(self.index_applier.memory_usage() as i64);
INDEX_APPLY_MEMORY_USAGE.sub(self.default_plan.index_applier.memory_usage() as i64);
}
}
#[cfg(test)]
mod tests {
use api::v1::SemanticType;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use futures::io::Cursor;
use index::bitmap::Bitmap;
use index::inverted_index::search::index_apply::MockIndexApplier;
use index::inverted_index::search::predicate::RegexMatchPredicate;
use object_store::services::Memory;
use puffin::puffin_manager::PuffinWriter;
use store_api::storage::FileId;
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
use store_api::storage::{FileId, RegionId};
use super::*;
use crate::sst::index::RegionFileId;
#[tokio::test]
async fn test_index_applier_apply_basic() {
async fn test_plan_for_sst() {
let (_d, puffin_manager_factory) =
PuffinManagerFactory::new_for_test_async("test_index_applier_apply_basic_").await;
PuffinManagerFactory::new_for_test_async("test_plan_for_sst_basic_").await;
let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
let file_id = RegionFileId::new(0.into(), FileId::random());
let index_id = RegionIndexId::new(file_id, 0);
let table_dir = "table_dir".to_string();
let puffin_manager = puffin_manager_factory.build(
object_store.clone(),
RegionFilePathFactory::new(table_dir.clone(), PathType::Bare),
let mut predicates = BTreeMap::new();
predicates.insert(
1,
vec![Predicate::RegexMatch(RegexMatchPredicate {
pattern: "foo".to_string(),
})],
);
let mut writer = puffin_manager.writer(&index_id).await.unwrap();
writer
.put_blob(
INDEX_BLOB_TYPE,
Cursor::new(vec![]),
Default::default(),
Default::default(),
)
.await
.unwrap();
writer.finish().await.unwrap();
let mut mock_index_applier = MockIndexApplier::new();
mock_index_applier.expect_memory_usage().returning(|| 100);
mock_index_applier.expect_apply().returning(|_, _, _| {
Ok(ApplyOutput {
matched_segment_ids: Bitmap::new_bitvec(),
total_row_count: 100,
segment_row_count: 10,
})
});
let expected_predicate_col_types =
BTreeMap::from_iter([(1, ConcreteDataType::string_datatype())]);
let sst_index_applier = InvertedIndexApplier::new(
table_dir.clone(),
table_dir,
PathType::Bare,
object_store,
Box::new(mock_index_applier),
puffin_manager_factory,
Default::default(),
);
let output = sst_index_applier.apply(index_id, None, None).await.unwrap();
assert_eq!(
output,
ApplyOutput {
matched_segment_ids: Bitmap::new_bitvec(),
total_row_count: 100,
segment_row_count: 10,
}
predicates,
expected_predicate_col_types,
)
.unwrap();
let plan = sst_index_applier
.plan_for_sst(&mock_region_metadata())
.unwrap();
assert!(plan.is_some());
}
#[tokio::test]
async fn test_plan_for_sst_type_mismatch() {
let (_d, puffin_manager_factory) =
PuffinManagerFactory::new_for_test_async("test_plan_for_sst_type_mismatch_").await;
let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
let table_dir = "table_dir".to_string();
let mut predicates = BTreeMap::new();
predicates.insert(
1,
vec![Predicate::RegexMatch(RegexMatchPredicate {
pattern: "foo".to_string(),
})],
);
// Column id 1 is String in `mock_region_metadata`, set expected type to Int64.
let expected_predicate_col_types =
BTreeMap::from_iter([(1, ConcreteDataType::int64_datatype())]);
let sst_index_applier = InvertedIndexApplier::new(
table_dir,
PathType::Bare,
object_store,
puffin_manager_factory,
predicates,
expected_predicate_col_types,
)
.unwrap();
let plan = sst_index_applier
.plan_for_sst(&mock_region_metadata())
.unwrap();
assert!(plan.is_none());
}
#[tokio::test]
@@ -448,19 +523,52 @@ mod tests {
.unwrap();
writer.finish().await.unwrap();
let mut mock_index_applier = MockIndexApplier::new();
mock_index_applier.expect_memory_usage().returning(|| 100);
mock_index_applier.expect_apply().never();
let mut predicates = BTreeMap::new();
predicates.insert(
1,
vec![Predicate::RegexMatch(RegexMatchPredicate {
pattern: "foo".to_string(),
})],
);
let expected_predicate_col_types =
BTreeMap::from_iter([(1, ConcreteDataType::string_datatype())]);
let sst_index_applier = InvertedIndexApplier::new(
table_dir.clone(),
PathType::Bare,
object_store,
Box::new(mock_index_applier),
puffin_manager_factory,
Default::default(),
);
let res = sst_index_applier.apply(index_id, None, None).await;
predicates,
expected_predicate_col_types,
)
.unwrap();
let plan = sst_index_applier
.plan_for_sst(&mock_region_metadata())
.unwrap()
.unwrap();
let res = sst_index_applier
.apply(index_id, None, &plan.index_applier, None)
.await;
assert!(format!("{:?}", res.unwrap_err()).contains("Blob not found"));
}
fn mock_region_metadata() -> RegionMetadataRef {
let mut builder = RegionMetadataBuilder::new(RegionId::new(1024, 1));
builder
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("tag", ConcreteDataType::string_datatype(), false),
semantic_type: SemanticType::Tag,
column_id: 1,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id: 2,
})
.primary_key(vec![1]);
Arc::new(builder.build().unwrap())
}
}

View File

@@ -25,9 +25,7 @@ use datafusion_common::ScalarValue;
use datafusion_expr::{BinaryExpr, Expr, Operator};
use datatypes::data_type::ConcreteDataType;
use datatypes::value::Value;
use index::inverted_index::search::index_apply::PredicatesIndexApplier;
use index::inverted_index::search::predicate::Predicate;
use index::target::IndexTarget;
use mito_codec::index::IndexValueCodec;
use mito_codec::row_converter::SortField;
use object_store::ObjectStore;
@@ -39,9 +37,7 @@ use store_api::storage::ColumnId;
use crate::cache::file_cache::FileCacheRef;
use crate::cache::index::inverted_index::InvertedIndexCacheRef;
use crate::error::{
BuildIndexApplierSnafu, ColumnNotFoundSnafu, ConvertValueSnafu, EncodeSnafu, Result,
};
use crate::error::{ColumnNotFoundSnafu, ConvertValueSnafu, EncodeSnafu, Result};
use crate::sst::index::inverted_index::applier::InvertedIndexApplier;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
@@ -137,33 +133,38 @@ impl<'a> InvertedIndexApplierBuilder<'a> {
return Ok(None);
}
let predicates = self
.output
.iter()
.map(|(column_id, predicates)| {
(
format!("{}", IndexTarget::ColumnId(*column_id)),
predicates.clone(),
)
})
.collect::<Vec<_>>();
let applier = PredicatesIndexApplier::try_from(predicates);
let expected_predicate_column_types = self.expected_predicate_column_types();
Ok(Some(
InvertedIndexApplier::new(
self.table_dir,
self.path_type,
self.object_store,
Box::new(applier.context(BuildIndexApplierSnafu)?),
self.puffin_manager_factory,
self.output,
)
expected_predicate_column_types,
)?
.with_file_cache(self.file_cache)
.with_puffin_metadata_cache(self.puffin_metadata_cache)
.with_index_cache(self.inverted_index_cache),
))
}
/// Returns `(column_id, data_type)` pairs for predicate columns
/// collected in `self.output`.
///
/// The data types are resolved from the latest region manifest. Columns
/// that no longer exist in the latest metadata are skipped.
fn expected_predicate_column_types(&self) -> BTreeMap<ColumnId, ConcreteDataType> {
self.output
.keys()
.filter_map(|col_id| {
let col = self.metadata.column_by_id(*col_id)?;
Some((*col_id, col.column_schema.data_type.clone()))
})
.collect()
}
/// Recursively traverses expressions to collect predicates.
/// Results are stored in `self.output`.
fn traverse_and_collect(&mut self, expr: &Expr) {

View File

@@ -614,9 +614,11 @@ mod tests {
.build(&[expr])
.unwrap()
.unwrap();
let sst_metadata = Arc::new(region_metadata.clone());
let plan = applier.plan_for_sst(&sst_metadata).unwrap().unwrap();
Box::pin(async move {
applier
.apply(index_id, None, None)
.apply(index_id, None, &plan.index_applier, None)
.await
.unwrap()
.matched_segment_ids

View File

@@ -929,11 +929,14 @@ mod tests {
assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 3);
assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 0);
assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 30);
let plan = inverted_index_applier
.as_ref()
.unwrap()
.plan_for_sst(&metadata)
.unwrap()
.unwrap();
let cached = index_result_cache
.get(
inverted_index_applier.unwrap().predicate_key(),
handle.file_id().file_id(),
)
.get(&plan.predicate_key, handle.file_id().file_id())
.unwrap();
// inverted index will search all row groups
assert!(cached.contains_row_group(0));

View File

@@ -681,6 +681,7 @@ impl ParquetReaderBuilder {
}
self.prune_row_groups_by_inverted_index(
read_format.metadata(),
row_group_size,
num_row_groups,
&mut output,
@@ -807,6 +808,7 @@ impl ParquetReaderBuilder {
/// the correctness of the index.
async fn prune_row_groups_by_inverted_index(
&self,
sst_metadata: &RegionMetadataRef,
row_group_size: usize,
num_row_groups: usize,
output: &mut RowGroupSelection,
@@ -825,12 +827,19 @@ impl ParquetReaderBuilder {
&self.inverted_index_appliers[..]
};
for index_applier in appliers.iter().flatten() {
let predicate_key = index_applier.predicate_key();
let Ok(Some(plan)) = index_applier
.plan_for_sst(sst_metadata)
.inspect_err(|e| warn!(e; "failed to build compatible plan for sst"))
else {
continue;
};
// Fast path: return early if the result is in the cache.
let cached = self
.cache_strategy
.index_result_cache()
.and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
let cached = self.cache_strategy.index_result_cache().and_then(|cache| {
let file_id = self.file_handle.file_id().file_id();
cache.get(&plan.predicate_key, file_id)
});
if let Some(result) = cached.as_ref()
&& all_required_row_groups_searched(output, result)
{
@@ -847,9 +856,11 @@ impl ParquetReaderBuilder {
.apply(
self.file_handle.index_id(),
Some(file_size_hint),
&plan.index_applier,
metrics.inverted_index_apply_metrics.as_mut(),
)
.await;
let selection = match apply_res {
Ok(apply_output) => RowGroupSelection::from_inverted_index_apply_output(
row_group_size,
@@ -863,7 +874,7 @@ impl ParquetReaderBuilder {
};
self.apply_index_result_and_update_cache(
predicate_key,
&plan.predicate_key,
self.file_handle.file_id().file_id(),
selection,
output,
@@ -1832,8 +1843,21 @@ impl SimpleFilterContext {
match sst_meta.column_by_id(column.column_id) {
Some(sst_column) => {
debug_assert_eq!(column.semantic_type, sst_column.semantic_type);
(column, MaybeFilter::Filter(filter))
// Schema evolution can make field columns with the same id have
// different concrete data types across SSTs. In that case,
// evaluating this simple filter against current SST column may
// raise an invalid cross-type comparison error (e.g. Float64 == Utf8).
let maybe_filter = if sst_column.column_schema.data_type
== column.column_schema.data_type
{
MaybeFilter::Filter(filter)
} else {
// Altering tag or timestamp column types is not allowed,
// so only field columns can reach this branch.
debug_assert_eq!(column.semantic_type, SemanticType::Field);
return None;
};
(column, maybe_filter)
}
None => {
// If the column is not present in the SST metadata, we evaluate the filter
@@ -2162,6 +2186,7 @@ mod tests {
use parquet::arrow::ArrowWriter;
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
use store_api::region_request::PathType;
use store_api::storage::RegionId;
use table::predicate::Predicate;
use super::*;
@@ -2326,6 +2351,64 @@ mod tests {
));
}
#[test]
fn test_simple_filter_context_drops_mismatched_field_filter() {
let (sst_metadata, latest_metadata) = mock_metadata();
let ctx = SimpleFilterContext::new_opt(
&sst_metadata,
Some(latest_metadata.as_ref()),
&col("field_0").eq(lit(1_i64)),
);
assert!(ctx.is_none());
}
fn mock_metadata() -> (RegionMetadataRef, RegionMetadataRef) {
let region_id = RegionId::new(1, 1);
let make_tag_0 = || ColumnMetadata {
column_schema: ColumnSchema::new(
"tag_0".to_string(),
ConcreteDataType::string_datatype(),
true,
),
semantic_type: SemanticType::Tag,
column_id: 0,
};
let make_ts = || ColumnMetadata {
column_schema: ColumnSchema::new(
"ts".to_string(),
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id: 2,
};
let make_field_0 = |data_type| ColumnMetadata {
column_schema: ColumnSchema::new("field_0".to_string(), data_type, true),
semantic_type: SemanticType::Field,
column_id: 1,
};
let mut sst_builder = RegionMetadataBuilder::new(region_id);
sst_builder
.push_column_metadata(make_tag_0())
.push_column_metadata(make_field_0(ConcreteDataType::uint64_datatype()))
.push_column_metadata(make_ts())
.primary_key(vec![0]);
let sst_metadata = Arc::new(sst_builder.build().unwrap());
let mut expected_builder = RegionMetadataBuilder::new(region_id);
expected_builder
.push_column_metadata(make_tag_0())
.push_column_metadata(make_field_0(ConcreteDataType::int64_datatype()))
.push_column_metadata(make_ts())
.primary_key(vec![0]);
let expected_metadata = Arc::new(expected_builder.build().unwrap());
(sst_metadata, expected_metadata)
}
#[test]
fn test_physical_filter_context_skips_renamed_column() {
let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());

View File

@@ -40,6 +40,7 @@ common-time.workspace = true
datafusion.workspace = true
datafusion-common.workspace = true
datafusion-expr.workspace = true
datafusion-expr-common.workspace = true
datafusion-functions.workspace = true
datafusion-optimizer.workspace = true
datafusion-physical-expr.workspace = true

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod const_normalization;
pub mod constant_term;
pub mod count_nest_aggr;
pub mod count_wildcard;

File diff suppressed because it is too large Load Diff

View File

@@ -59,6 +59,7 @@ use crate::dist_plan::{
};
use crate::metrics::{QUERY_MEMORY_POOL_REJECTED_TOTAL, QUERY_MEMORY_POOL_USAGE_BYTES};
use crate::optimizer::ExtensionAnalyzerRule;
use crate::optimizer::const_normalization::ConstNormalizationRule;
use crate::optimizer::constant_term::MatchesConstantTermOptimizer;
use crate::optimizer::count_nest_aggr::CountNestAggrRule;
use crate::optimizer::count_wildcard::CountWildcardToTimeIndexRule;
@@ -157,6 +158,7 @@ impl QueryEngineState {
analyzer
.rules
.insert(0, Arc::new(CountWildcardToTimeIndexRule));
analyzer.rules.push(Arc::new(ConstNormalizationRule));
// Add ApplyFunctionRewrites rule,
// Note we cannot use `analyzer.add_function_rewrite`

View File

@@ -39,6 +39,7 @@ common-frontend.workspace = true
common-grpc.workspace = true
common-memory-manager.workspace = true
common-meta = { workspace = true, features = ["testing"] }
common-options.workspace = true
common-procedure.workspace = true
common-query.workspace = true
common-recordbatch.workspace = true

View File

@@ -37,6 +37,7 @@ use common_frontend::slow_query_event::{
SLOW_QUERY_TABLE_NAME, SLOW_QUERY_TABLE_QUERY_COLUMN_NAME,
};
use common_memory_manager::OnExhaustedPolicy;
use common_options::plugin_options::StandaloneFlag;
use flate2::Compression;
use flate2::write::GzEncoder;
use log_query::{Context, Limit, LogQuery, TimeFilter};
@@ -283,6 +284,7 @@ pub async fn test_http_auth_from_standalone_user_provider_config() {
let fe_opts = options.component.frontend_options();
let mut plugins = Plugins::new();
plugins.insert(StandaloneFlag);
plugins::setup_frontend_plugins(&mut plugins, &[], &fe_opts)
.await
.unwrap();

View File

@@ -0,0 +1,52 @@
-- Regression test for issue #8074
-- https://github.com/GreptimeTeam/greptimedb/issues/8074
CREATE TABLE monitoring_data (
host STRING INVERTED INDEX,
`region` STRING,
cpu_usage DOUBLE INVERTED INDEX,
`timestamp` TIMESTAMP TIME INDEX
) WITH ('append_mode'='true');
Affected Rows: 0
INSERT INTO monitoring_data (host, region, cpu_usage, `timestamp`) VALUES
('web-01', 'us-east', 12.5, '2026-05-06 10:00:00'),
('web-02', 'us-east', 18.3, '2026-05-06 10:00:00'),
('web-03', 'us-east', 91.2, '2026-05-06 10:00:00'),
('web-04', 'us-west', 73.8, '2026-05-06 10:00:00');
Affected Rows: 4
INSERT INTO monitoring_data (host, region, cpu_usage, `timestamp`) VALUES
('web-01', 'us-east', 15.2, '2026-05-06 10:01:00'),
('web-02', 'us-east', 23.7, '2026-05-06 10:01:00'),
('web-03', 'us-east', 94.5, '2026-05-06 10:01:00'),
('web-04', 'us-west', 78.1, '2026-05-06 10:01:00');
Affected Rows: 4
ADMIN FLUSH_TABLE('monitoring_data');
+--------------------------------------+
| ADMIN FLUSH_TABLE('monitoring_data') |
+--------------------------------------+
| 0 |
+--------------------------------------+
ALTER TABLE monitoring_data
MODIFY COLUMN cpu_usage STRING;
Affected Rows: 0
SELECT host FROM monitoring_data WHERE cpu_usage = '23.7' ORDER BY host;
+--------+
| host |
+--------+
| web-02 |
+--------+
DROP TABLE monitoring_data;
Affected Rows: 0

View File

@@ -0,0 +1,29 @@
-- Regression test for issue #8074
-- https://github.com/GreptimeTeam/greptimedb/issues/8074
CREATE TABLE monitoring_data (
host STRING INVERTED INDEX,
`region` STRING,
cpu_usage DOUBLE INVERTED INDEX,
`timestamp` TIMESTAMP TIME INDEX
) WITH ('append_mode'='true');
INSERT INTO monitoring_data (host, region, cpu_usage, `timestamp`) VALUES
('web-01', 'us-east', 12.5, '2026-05-06 10:00:00'),
('web-02', 'us-east', 18.3, '2026-05-06 10:00:00'),
('web-03', 'us-east', 91.2, '2026-05-06 10:00:00'),
('web-04', 'us-west', 73.8, '2026-05-06 10:00:00');
INSERT INTO monitoring_data (host, region, cpu_usage, `timestamp`) VALUES
('web-01', 'us-east', 15.2, '2026-05-06 10:01:00'),
('web-02', 'us-east', 23.7, '2026-05-06 10:01:00'),
('web-03', 'us-east', 94.5, '2026-05-06 10:01:00'),
('web-04', 'us-west', 78.1, '2026-05-06 10:01:00');
ADMIN FLUSH_TABLE('monitoring_data');
ALTER TABLE monitoring_data
MODIFY COLUMN cpu_usage STRING;
SELECT host FROM monitoring_data WHERE cpu_usage = '23.7' ORDER BY host;
DROP TABLE monitoring_data;

View File

@@ -81,6 +81,22 @@ drop table test_mito_options;
Affected Rows: 0
create table if not exists test_compaction_override_without_type(
host string,
ts timestamp,
memory double,
TIME INDEX (ts),
PRIMARY KEY(host)
)
engine=mito
with('compaction.override'='true');
Affected Rows: 0
drop table test_compaction_override_without_type;
Affected Rows: 0
create table if not exists invalid_compaction(
host string,
ts timestamp,

View File

@@ -67,6 +67,18 @@ with(
drop table test_mito_options;
create table if not exists test_compaction_override_without_type(
host string,
ts timestamp,
memory double,
TIME INDEX (ts),
PRIMARY KEY(host)
)
engine=mito
with('compaction.override'='true');
drop table test_compaction_override_without_type;
create table if not exists invalid_compaction(
host string,
ts timestamp,

View File

@@ -74,6 +74,7 @@ TQL EXPLAIN ('1970-01-01T00:00:00'::timestamp, '1970-01-01T00:00:00'::timestamp
-- SQLNESS REPLACE (elapsed_compute.*) REDACTED
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
TQL EXPLAIN VERBOSE (0, 10, '5s') test;
+-+-+
@@ -90,6 +91,7 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test;
| logical_plan after TranscribeAtatRule_| SAME TEXT AS ABOVE_|
| logical_plan after resolve_grouping_function_| SAME TEXT AS ABOVE_|
| logical_plan after type_coercion_| SAME TEXT AS ABOVE_|
| logical_plan after ConstNormalizationRule_| SAME TEXT AS ABOVE_|
| logical_plan after DistPlannerAnalyzer_| Projection: test.i, test.j, test.k_|
|_|_MergeScan [is_placeholder=false, remote_input=[_|
|_| PromInstantManipulate: range=[0..10000], lookback=[300000], interval=[5000], time index=[j]_|
@@ -218,6 +220,7 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test;
-- SQLNESS REPLACE (elapsed_compute.*) REDACTED
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
TQL EXPLAIN VERBOSE (0, 10, '5s') test AS series;
+-+-+
@@ -235,6 +238,7 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test AS series;
| logical_plan after TranscribeAtatRule_| SAME TEXT AS ABOVE_|
| logical_plan after resolve_grouping_function_| SAME TEXT AS ABOVE_|
| logical_plan after type_coercion_| SAME TEXT AS ABOVE_|
| logical_plan after ConstNormalizationRule_| SAME TEXT AS ABOVE_|
| logical_plan after DistPlannerAnalyzer_| Projection: series, test.k, test.j_|
|_|_MergeScan [is_placeholder=false, remote_input=[_|
|_| Projection: test.i AS series, test.k, test.j_|
@@ -360,6 +364,394 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test AS series;
|_|_|
+-+-+
CREATE TABLE test_nano(i DOUBLE, j TIMESTAMP(9) TIME INDEX, k STRING PRIMARY KEY);
Affected Rows: 0
INSERT INTO test_nano VALUES (1, 1000000, "a"), (1, 1000000, "b"), (2, 2000000, "a");
Affected Rows: 3
-- explain at 0s, 5s and 10s for a nanosecond time index.
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
TQL EXPLAIN (0, 10, '5s') test_nano;
+---------------+-----------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | PromInstantManipulate: range=[0..10000], lookback=[300000], interval=[5000], time index=[j] |
| | PromSeriesDivide: tags=["k"] |
| | Sort: test_nano.k ASC NULLS FIRST, test_nano.j ASC NULLS FIRST |
| | Projection: test_nano.i, test_nano.k, CAST(test_nano.j AS Timestamp(ms)) AS j |
| | Projection: test_nano.i, test_nano.j, test_nano.k |
| | Filter: __common_expr_3 >= TimestampMillisecond(-299999, None) AND __common_expr_3 <= TimestampMillisecond(10000, None) |
| | Projection: CAST(test_nano.j AS Timestamp(ms)) AS __common_expr_3, test_nano.i, test_nano.j, test_nano.k |
| | MergeScan [is_placeholder=false, remote_input=[ |
| | TableScan: test_nano |
| | ]] |
| physical_plan | PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j] |
| | PromSeriesDivideExec: tags=["k"] |
| | SortExec: expr=[k@1 ASC, j@2 ASC], preserve_partitioning=[true] |
| | RepartitionExec: REDACTED
| | ProjectionExec: expr=[i@0 as i, k@2 as k, CAST(j@1 AS Timestamp(ms)) as j] |
| | FilterExec: __common_expr_3@0 >= -299999 AND __common_expr_3@0 <= 10000, projection=[i@1, j@2, k@3] |
| | ProjectionExec: expr=[CAST(j@1 AS Timestamp(ms)) as __common_expr_3, i@0 as i, j@1 as j, k@2 as k] |
| | MergeScanExec: REDACTED
| | |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------+
-- explain verbose at 0s, 5s and 10s for a nanosecond time index.
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (elapsed_compute.*) REDACTED
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
TQL EXPLAIN VERBOSE (0, 10, '5s') test_nano;
+-+-+
| plan_type_| plan_|
+-+-+
| initial_logical_plan_| PromInstantManipulate: range=[0..10000], lookback=[300000], interval=[5000], time index=[j]_|
|_|_PromSeriesDivide: tags=["k"]_|
|_|_Sort: test_nano.k ASC NULLS FIRST, test_nano.j ASC NULLS FIRST_|
|_|_Filter: test_nano.j >= TimestampMillisecond(-299999, None) AND test_nano.j <= TimestampMillisecond(10000, None)_|
|_|_Projection: test_nano.i, test_nano.k, CAST(test_nano.j AS Timestamp(ms)) AS j_|
|_|_TableScan: test_nano_|
| logical_plan after apply_function_rewrites_| SAME TEXT AS ABOVE_|
| logical_plan after count_wildcard_to_time_index_rule_| SAME TEXT AS ABOVE_|
| logical_plan after StringNormalizationRule_| SAME TEXT AS ABOVE_|
| logical_plan after TranscribeAtatRule_| SAME TEXT AS ABOVE_|
| logical_plan after resolve_grouping_function_| SAME TEXT AS ABOVE_|
| logical_plan after type_coercion_| SAME TEXT AS ABOVE_|
| logical_plan after ConstNormalizationRule_| SAME TEXT AS ABOVE_|
| logical_plan after DistPlannerAnalyzer_| PromInstantManipulate: range=[0..10000], lookback=[300000], interval=[5000], time index=[j]_|
|_|_PromSeriesDivide: tags=["k"]_|
|_|_Sort: test_nano.k ASC NULLS FIRST, test_nano.j ASC NULLS FIRST_|
|_|_Filter: test_nano.j >= TimestampMillisecond(-299999, None) AND test_nano.j <= TimestampMillisecond(10000, None)_|
|_|_Projection: test_nano.i, test_nano.k, CAST(test_nano.j AS Timestamp(ms)) AS j_|
|_|_Projection: test_nano.i, test_nano.j, test_nano.k_|
|_|_MergeScan [is_placeholder=false, remote_input=[_|
|_| TableScan: test_nano_|
|_| ]]_|
| logical_plan after FixStateUdafOrderingAnalyzer_| SAME TEXT AS ABOVE_|
| analyzed_logical_plan_| SAME TEXT AS ABOVE_|
| logical_plan after rewrite_set_comparison_| SAME TEXT AS ABOVE_|
| logical_plan after optimize_unions_| SAME TEXT AS ABOVE_|
| logical_plan after simplify_expressions_| SAME TEXT AS ABOVE_|
| logical_plan after replace_distinct_aggregate_| SAME TEXT AS ABOVE_|
| logical_plan after eliminate_join_| SAME TEXT AS ABOVE_|
| logical_plan after decorrelate_predicate_subquery_| SAME TEXT AS ABOVE_|
| logical_plan after scalar_subquery_to_join_| SAME TEXT AS ABOVE_|
| logical_plan after decorrelate_lateral_join_| SAME TEXT AS ABOVE_|
| logical_plan after extract_equijoin_predicate_| SAME TEXT AS ABOVE_|
| logical_plan after eliminate_duplicated_expr_| SAME TEXT AS ABOVE_|
| logical_plan after eliminate_filter_| SAME TEXT AS ABOVE_|
| logical_plan after eliminate_cross_join_| SAME TEXT AS ABOVE_|
| logical_plan after eliminate_limit_| SAME TEXT AS ABOVE_|
| logical_plan after propagate_empty_relation_| SAME TEXT AS ABOVE_|
| logical_plan after filter_null_join_keys_| SAME TEXT AS ABOVE_|
| logical_plan after eliminate_outer_join_| SAME TEXT AS ABOVE_|
| logical_plan after push_down_limit_| SAME TEXT AS ABOVE_|
| logical_plan after push_down_filter_| PromInstantManipulate: range=[0..10000], lookback=[300000], interval=[5000], time index=[j]_|
|_|_PromSeriesDivide: tags=["k"]_|
|_|_Sort: test_nano.k ASC NULLS FIRST, test_nano.j ASC NULLS FIRST_|
|_|_Projection: test_nano.i, test_nano.k, CAST(test_nano.j AS Timestamp(ms)) AS j_|
|_|_Projection: test_nano.i, test_nano.j, test_nano.k_|
|_|_Filter: CAST(test_nano.j AS Timestamp(ms)) >= TimestampMillisecond(-299999, None) AND CAST(test_nano.j AS Timestamp(ms)) <= TimestampMillisecond(10000, None)_|
|_|_MergeScan [is_placeholder=false, remote_input=[_|
|_| TableScan: test_nano_|
|_| ]]_|
| logical_plan after single_distinct_aggregation_to_group_by | SAME TEXT AS ABOVE_|
| logical_plan after eliminate_group_by_constant_| SAME TEXT AS ABOVE_|
| logical_plan after common_sub_expression_eliminate_| PromInstantManipulate: range=[0..10000], lookback=[300000], interval=[5000], time index=[j]_|
|_|_PromSeriesDivide: tags=["k"]_|
|_|_Sort: test_nano.k ASC NULLS FIRST, test_nano.j ASC NULLS FIRST_|
|_|_Projection: test_nano.i, test_nano.k, CAST(test_nano.j AS Timestamp(ms)) AS j_|
|_|_Projection: test_nano.i, test_nano.j, test_nano.k_|
|_|_Projection: test_nano.i, test_nano.j, test_nano.k_|
|_|_Filter: __common_expr_1 >= TimestampMillisecond(-299999, None) AND __common_expr_1 <= TimestampMillisecond(10000, None)_|
|_|_Projection: CAST(test_nano.j AS Timestamp(ms)) AS __common_expr_1, test_nano.i, test_nano.j, test_nano.k_|
|_|_MergeScan [is_placeholder=false, remote_input=[_|
|_| TableScan: test_nano_|
|_| ]]_|
| logical_plan after extract_leaf_expressions_| SAME TEXT AS ABOVE_|
| logical_plan after push_down_leaf_projections_| SAME TEXT AS ABOVE_|
| logical_plan after optimize_projections_| PromInstantManipulate: range=[0..10000], lookback=[300000], interval=[5000], time index=[j]_|
|_|_PromSeriesDivide: tags=["k"]_|
|_|_Sort: test_nano.k ASC NULLS FIRST, test_nano.j ASC NULLS FIRST_|
|_|_Projection: test_nano.i, test_nano.k, CAST(test_nano.j AS Timestamp(ms)) AS j_|
|_|_Projection: test_nano.i, test_nano.j, test_nano.k_|
|_|_Filter: __common_expr_1 >= TimestampMillisecond(-299999, None) AND __common_expr_1 <= TimestampMillisecond(10000, None)_|
|_|_Projection: CAST(test_nano.j AS Timestamp(ms)) AS __common_expr_1, test_nano.i, test_nano.j, test_nano.k_|
|_|_MergeScan [is_placeholder=false, remote_input=[_|
|_| TableScan: test_nano_|
|_| ]]_|
| logical_plan after ScanHintRule_| SAME TEXT AS ABOVE_|
| logical_plan after JsonTypeConcretizeRule_| SAME TEXT AS ABOVE_|
| logical_plan after rewrite_set_comparison_| SAME TEXT AS ABOVE_|
| logical_plan after optimize_unions_| SAME TEXT AS ABOVE_|
| logical_plan after simplify_expressions_| SAME TEXT AS ABOVE_|
| logical_plan after replace_distinct_aggregate_| SAME TEXT AS ABOVE_|
| logical_plan after eliminate_join_| SAME TEXT AS ABOVE_|
| logical_plan after decorrelate_predicate_subquery_| SAME TEXT AS ABOVE_|
| logical_plan after scalar_subquery_to_join_| SAME TEXT AS ABOVE_|
| logical_plan after decorrelate_lateral_join_| SAME TEXT AS ABOVE_|
| logical_plan after extract_equijoin_predicate_| SAME TEXT AS ABOVE_|
| logical_plan after eliminate_duplicated_expr_| SAME TEXT AS ABOVE_|
| logical_plan after eliminate_filter_| SAME TEXT AS ABOVE_|
| logical_plan after eliminate_cross_join_| SAME TEXT AS ABOVE_|
| logical_plan after eliminate_limit_| SAME TEXT AS ABOVE_|
| logical_plan after propagate_empty_relation_| SAME TEXT AS ABOVE_|
| logical_plan after filter_null_join_keys_| SAME TEXT AS ABOVE_|
| logical_plan after eliminate_outer_join_| SAME TEXT AS ABOVE_|
| logical_plan after push_down_limit_| SAME TEXT AS ABOVE_|
| logical_plan after push_down_filter_| PromInstantManipulate: range=[0..10000], lookback=[300000], interval=[5000], time index=[j]_|
|_|_PromSeriesDivide: tags=["k"]_|
|_|_Sort: test_nano.k ASC NULLS FIRST, test_nano.j ASC NULLS FIRST_|
|_|_Projection: test_nano.i, test_nano.k, CAST(test_nano.j AS Timestamp(ms)) AS j_|
|_|_Projection: test_nano.i, test_nano.j, test_nano.k_|
|_|_Projection: CAST(test_nano.j AS Timestamp(ms)) AS __common_expr_1, test_nano.i, test_nano.j, test_nano.k_|
|_|_Filter: CAST(test_nano.j AS Timestamp(ms)) >= TimestampMillisecond(-299999, None) AND CAST(test_nano.j AS Timestamp(ms)) <= TimestampMillisecond(10000, None)_|
|_|_MergeScan [is_placeholder=false, remote_input=[_|
|_| TableScan: test_nano_|
|_| ]]_|
| logical_plan after single_distinct_aggregation_to_group_by | SAME TEXT AS ABOVE_|
| logical_plan after eliminate_group_by_constant_| SAME TEXT AS ABOVE_|
| logical_plan after common_sub_expression_eliminate_| PromInstantManipulate: range=[0..10000], lookback=[300000], interval=[5000], time index=[j]_|
|_|_PromSeriesDivide: tags=["k"]_|
|_|_Sort: test_nano.k ASC NULLS FIRST, test_nano.j ASC NULLS FIRST_|
|_|_Projection: test_nano.i, test_nano.k, CAST(test_nano.j AS Timestamp(ms)) AS j_|
|_|_Projection: test_nano.i, test_nano.j, test_nano.k_|
|_|_Projection: CAST(test_nano.j AS Timestamp(ms)) AS __common_expr_1, test_nano.i, test_nano.j, test_nano.k_|
|_|_Projection: test_nano.i, test_nano.j, test_nano.k_|
|_|_Filter: __common_expr_2 >= TimestampMillisecond(-299999, None) AND __common_expr_2 <= TimestampMillisecond(10000, None)_|
|_|_Projection: CAST(test_nano.j AS Timestamp(ms)) AS __common_expr_2, test_nano.i, test_nano.j, test_nano.k_|
|_|_MergeScan [is_placeholder=false, remote_input=[_|
|_| TableScan: test_nano_|
|_| ]]_|
| logical_plan after extract_leaf_expressions_| SAME TEXT AS ABOVE_|
| logical_plan after push_down_leaf_projections_| SAME TEXT AS ABOVE_|
| logical_plan after optimize_projections_| PromInstantManipulate: range=[0..10000], lookback=[300000], interval=[5000], time index=[j]_|
|_|_PromSeriesDivide: tags=["k"]_|
|_|_Sort: test_nano.k ASC NULLS FIRST, test_nano.j ASC NULLS FIRST_|
|_|_Projection: test_nano.i, test_nano.k, CAST(test_nano.j AS Timestamp(ms)) AS j_|
|_|_Projection: test_nano.i, test_nano.j, test_nano.k_|
|_|_Filter: __common_expr_2 >= TimestampMillisecond(-299999, None) AND __common_expr_2 <= TimestampMillisecond(10000, None)_|
|_|_Projection: CAST(test_nano.j AS Timestamp(ms)) AS __common_expr_2, test_nano.i, test_nano.j, test_nano.k_|
|_|_MergeScan [is_placeholder=false, remote_input=[_|
|_| TableScan: test_nano_|
|_| ]]_|
| logical_plan after ScanHintRule_| SAME TEXT AS ABOVE_|
| logical_plan after JsonTypeConcretizeRule_| SAME TEXT AS ABOVE_|
| logical_plan after rewrite_set_comparison_| SAME TEXT AS ABOVE_|
| logical_plan after optimize_unions_| SAME TEXT AS ABOVE_|
| logical_plan after simplify_expressions_| SAME TEXT AS ABOVE_|
| logical_plan after replace_distinct_aggregate_| SAME TEXT AS ABOVE_|
| logical_plan after eliminate_join_| SAME TEXT AS ABOVE_|
| logical_plan after decorrelate_predicate_subquery_| SAME TEXT AS ABOVE_|
| logical_plan after scalar_subquery_to_join_| SAME TEXT AS ABOVE_|
| logical_plan after decorrelate_lateral_join_| SAME TEXT AS ABOVE_|
| logical_plan after extract_equijoin_predicate_| SAME TEXT AS ABOVE_|
| logical_plan after eliminate_duplicated_expr_| SAME TEXT AS ABOVE_|
| logical_plan after eliminate_filter_| SAME TEXT AS ABOVE_|
| logical_plan after eliminate_cross_join_| SAME TEXT AS ABOVE_|
| logical_plan after eliminate_limit_| SAME TEXT AS ABOVE_|
| logical_plan after propagate_empty_relation_| SAME TEXT AS ABOVE_|
| logical_plan after filter_null_join_keys_| SAME TEXT AS ABOVE_|
| logical_plan after eliminate_outer_join_| SAME TEXT AS ABOVE_|
| logical_plan after push_down_limit_| SAME TEXT AS ABOVE_|
| logical_plan after push_down_filter_| PromInstantManipulate: range=[0..10000], lookback=[300000], interval=[5000], time index=[j]_|
|_|_PromSeriesDivide: tags=["k"]_|
|_|_Sort: test_nano.k ASC NULLS FIRST, test_nano.j ASC NULLS FIRST_|
|_|_Projection: test_nano.i, test_nano.k, CAST(test_nano.j AS Timestamp(ms)) AS j_|
|_|_Projection: test_nano.i, test_nano.j, test_nano.k_|
|_|_Projection: CAST(test_nano.j AS Timestamp(ms)) AS __common_expr_2, test_nano.i, test_nano.j, test_nano.k_|
|_|_Filter: CAST(test_nano.j AS Timestamp(ms)) >= TimestampMillisecond(-299999, None) AND CAST(test_nano.j AS Timestamp(ms)) <= TimestampMillisecond(10000, None)_|
|_|_MergeScan [is_placeholder=false, remote_input=[_|
|_| TableScan: test_nano_|
|_| ]]_|
| logical_plan after single_distinct_aggregation_to_group_by | SAME TEXT AS ABOVE_|
| logical_plan after eliminate_group_by_constant_| SAME TEXT AS ABOVE_|
| logical_plan after common_sub_expression_eliminate_| PromInstantManipulate: range=[0..10000], lookback=[300000], interval=[5000], time index=[j]_|
|_|_PromSeriesDivide: tags=["k"]_|
|_|_Sort: test_nano.k ASC NULLS FIRST, test_nano.j ASC NULLS FIRST_|
|_|_Projection: test_nano.i, test_nano.k, CAST(test_nano.j AS Timestamp(ms)) AS j_|
|_|_Projection: test_nano.i, test_nano.j, test_nano.k_|
|_|_Projection: CAST(test_nano.j AS Timestamp(ms)) AS __common_expr_2, test_nano.i, test_nano.j, test_nano.k_|
|_|_Projection: test_nano.i, test_nano.j, test_nano.k_|
|_|_Filter: __common_expr_3 >= TimestampMillisecond(-299999, None) AND __common_expr_3 <= TimestampMillisecond(10000, None)_|
|_|_Projection: CAST(test_nano.j AS Timestamp(ms)) AS __common_expr_3, test_nano.i, test_nano.j, test_nano.k_|
|_|_MergeScan [is_placeholder=false, remote_input=[_|
|_| TableScan: test_nano_|
|_| ]]_|
| logical_plan after extract_leaf_expressions_| SAME TEXT AS ABOVE_|
| logical_plan after push_down_leaf_projections_| SAME TEXT AS ABOVE_|
| logical_plan after optimize_projections_| PromInstantManipulate: range=[0..10000], lookback=[300000], interval=[5000], time index=[j]_|
|_|_PromSeriesDivide: tags=["k"]_|
|_|_Sort: test_nano.k ASC NULLS FIRST, test_nano.j ASC NULLS FIRST_|
|_|_Projection: test_nano.i, test_nano.k, CAST(test_nano.j AS Timestamp(ms)) AS j_|
|_|_Projection: test_nano.i, test_nano.j, test_nano.k_|
|_|_Filter: __common_expr_3 >= TimestampMillisecond(-299999, None) AND __common_expr_3 <= TimestampMillisecond(10000, None)_|
|_|_Projection: CAST(test_nano.j AS Timestamp(ms)) AS __common_expr_3, test_nano.i, test_nano.j, test_nano.k_|
|_|_MergeScan [is_placeholder=false, remote_input=[_|
|_| TableScan: test_nano_|
|_| ]]_|
| logical_plan after ScanHintRule_| SAME TEXT AS ABOVE_|
| logical_plan after JsonTypeConcretizeRule_| SAME TEXT AS ABOVE_|
| logical_plan_| PromInstantManipulate: range=[0..10000], lookback=[300000], interval=[5000], time index=[j]_|
|_|_PromSeriesDivide: tags=["k"]_|
|_|_Sort: test_nano.k ASC NULLS FIRST, test_nano.j ASC NULLS FIRST_|
|_|_Projection: test_nano.i, test_nano.k, CAST(test_nano.j AS Timestamp(ms)) AS j_|
|_|_Projection: test_nano.i, test_nano.j, test_nano.k_|
|_|_Filter: __common_expr_3 >= TimestampMillisecond(-299999, None) AND __common_expr_3 <= TimestampMillisecond(10000, None)_|
|_|_Projection: CAST(test_nano.j AS Timestamp(ms)) AS __common_expr_3, test_nano.i, test_nano.j, test_nano.k_|
|_|_MergeScan [is_placeholder=false, remote_input=[_|
|_| TableScan: test_nano_|
|_| ]]_|
| initial_physical_plan_| PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j]_|
|_|_PromSeriesDivideExec: tags=["k"]_|
|_|_SortExec: expr=[k@1 ASC, j@2 ASC], preserve_partitioning=[false]_|
|_|_ProjectionExec: expr=[i@0 as i, k@2 as k, CAST(j@1 AS Timestamp(ms)) as j]_|
|_|_ProjectionExec: expr=[i@1 as i, j@2 as j, k@3 as k]_|
|_|_FilterExec: __common_expr_3@0 >= -299999 AND __common_expr_3@0 <= 10000_|
|_|_ProjectionExec: expr=[CAST(j@1 AS Timestamp(ms)) as __common_expr_3, i@0 as i, j@1 as j, k@2 as k]_|
|_|_MergeScanExec: REDACTED
|_|_|
| initial_physical_plan_with_stats_| PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j], statistics=[Rows=Inexact(2), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]_|
|_|_PromSeriesDivideExec: tags=["k"], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]_|
|_|_SortExec: expr=[k@1 ASC, j@2 ASC], preserve_partitioning=[false], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]_|
|_|_ProjectionExec: expr=[i@0 as i, k@2 as k, CAST(j@1 AS Timestamp(ms)) as j], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]_|
|_|_ProjectionExec: expr=[i@1 as i, j@2 as j, k@3 as k], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]_|
|_|_FilterExec: __common_expr_3@0 >= -299999 AND __common_expr_3@0 <= 10000, statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:)]]_|
|_|_ProjectionExec: expr=[CAST(j@1 AS Timestamp(ms)) as __common_expr_3, i@0 as i, j@1 as j, k@2 as k], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:)]] |
|_|_MergeScanExec: REDACTED
|_|_|
| initial_physical_plan_with_schema_| PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j], schema=[i:Float64;N, k:Utf8;N, j:Timestamp(ms)]_|
|_|_PromSeriesDivideExec: tags=["k"], schema=[i:Float64;N, k:Utf8;N, j:Timestamp(ms)]_|
|_|_SortExec: expr=[k@1 ASC, j@2 ASC], preserve_partitioning=[false], schema=[i:Float64;N, k:Utf8;N, j:Timestamp(ms)]_|
|_|_ProjectionExec: expr=[i@0 as i, k@2 as k, CAST(j@1 AS Timestamp(ms)) as j], schema=[i:Float64;N, k:Utf8;N, j:Timestamp(ms)]_|
|_|_ProjectionExec: expr=[i@1 as i, j@2 as j, k@3 as k], schema=[i:Float64;N, j:Timestamp(ns), k:Utf8;N]_|
|_|_FilterExec: __common_expr_3@0 >= -299999 AND __common_expr_3@0 <= 10000, schema=[__common_expr_3:Timestamp(ms), i:Float64;N, j:Timestamp(ns), k:Utf8;N]_|
|_|_ProjectionExec: expr=[CAST(j@1 AS Timestamp(ms)) as __common_expr_3, i@0 as i, j@1 as j, k@2 as k], schema=[__common_expr_3:Timestamp(ms), i:Float64;N, j:Timestamp(ns), k:Utf8;N]_|
|_|_MergeScanExec: REDACTED
|_|_|
| physical_plan after OutputRequirements_| OutputRequirementExec: order_by=[], dist_by=Unspecified_|
|_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j]_|
|_|_PromSeriesDivideExec: tags=["k"]_|
|_|_SortExec: expr=[k@1 ASC, j@2 ASC], preserve_partitioning=[false]_|
|_|_ProjectionExec: expr=[i@0 as i, k@2 as k, CAST(j@1 AS Timestamp(ms)) as j]_|
|_|_ProjectionExec: expr=[i@1 as i, j@2 as j, k@3 as k]_|
|_|_FilterExec: __common_expr_3@0 >= -299999 AND __common_expr_3@0 <= 10000_|
|_|_ProjectionExec: expr=[CAST(j@1 AS Timestamp(ms)) as __common_expr_3, i@0 as i, j@1 as j, k@2 as k]_|
|_|_MergeScanExec: REDACTED
|_|_|
| physical_plan after aggregate_statistics_| SAME TEXT AS ABOVE_|
| physical_plan after join_selection_| SAME TEXT AS ABOVE_|
| physical_plan after LimitedDistinctAggregation_| SAME TEXT AS ABOVE_|
| physical_plan after FilterPushdown_| SAME TEXT AS ABOVE_|
| physical_plan after parallelize_scan_| SAME TEXT AS ABOVE_|
| physical_plan after PassDistributionRule_| SAME TEXT AS ABOVE_|
| physical_plan after EnforceSorting_| OutputRequirementExec: order_by=[], dist_by=Unspecified_|
|_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j]_|
|_|_PromSeriesDivideExec: tags=["k"]_|
|_|_SortExec: expr=[k@1 ASC, j@2 ASC], preserve_partitioning=[true]_|
|_|_ProjectionExec: expr=[i@0 as i, k@2 as k, CAST(j@1 AS Timestamp(ms)) as j]_|
|_|_ProjectionExec: expr=[i@1 as i, j@2 as j, k@3 as k]_|
|_|_FilterExec: __common_expr_3@0 >= -299999 AND __common_expr_3@0 <= 10000_|
|_|_ProjectionExec: expr=[CAST(j@1 AS Timestamp(ms)) as __common_expr_3, i@0 as i, j@1 as j, k@2 as k]_|
|_|_MergeScanExec: REDACTED
|_|_|
| physical_plan after EnforceDistribution_| OutputRequirementExec: order_by=[], dist_by=Unspecified_|
|_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j]_|
|_|_PromSeriesDivideExec: tags=["k"]_|
|_|_SortExec: expr=[k@1 ASC, j@2 ASC], preserve_partitioning=[true]_|
|_|_RepartitionExec: REDACTED
|_|_SortExec: expr=[k@1 ASC, j@2 ASC], preserve_partitioning=[true]_|
|_|_ProjectionExec: expr=[i@0 as i, k@2 as k, CAST(j@1 AS Timestamp(ms)) as j]_|
|_|_ProjectionExec: expr=[i@1 as i, j@2 as j, k@3 as k]_|
|_|_FilterExec: __common_expr_3@0 >= -299999 AND __common_expr_3@0 <= 10000_|
|_|_ProjectionExec: expr=[CAST(j@1 AS Timestamp(ms)) as __common_expr_3, i@0 as i, j@1 as j, k@2 as k]_|
|_|_MergeScanExec: REDACTED
|_|_|
| physical_plan after CombinePartialFinalAggregate_| SAME TEXT AS ABOVE_|
| physical_plan after EnforceSorting_| OutputRequirementExec: order_by=[], dist_by=Unspecified_|
|_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j]_|
|_|_PromSeriesDivideExec: tags=["k"]_|
|_|_SortExec: expr=[k@1 ASC, j@2 ASC], preserve_partitioning=[true]_|
|_|_RepartitionExec: REDACTED
|_|_ProjectionExec: expr=[i@0 as i, k@2 as k, CAST(j@1 AS Timestamp(ms)) as j]_|
|_|_ProjectionExec: expr=[i@1 as i, j@2 as j, k@3 as k]_|
|_|_FilterExec: __common_expr_3@0 >= -299999 AND __common_expr_3@0 <= 10000_|
|_|_ProjectionExec: expr=[CAST(j@1 AS Timestamp(ms)) as __common_expr_3, i@0 as i, j@1 as j, k@2 as k]_|
|_|_MergeScanExec: REDACTED
|_|_|
| physical_plan after OptimizeAggregateOrder_| SAME TEXT AS ABOVE_|
| physical_plan after ProjectionPushdown_| OutputRequirementExec: order_by=[], dist_by=Unspecified_|
|_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j]_|
|_|_PromSeriesDivideExec: tags=["k"]_|
|_|_SortExec: expr=[k@1 ASC, j@2 ASC], preserve_partitioning=[true]_|
|_|_RepartitionExec: REDACTED
|_|_ProjectionExec: expr=[i@0 as i, k@2 as k, CAST(j@1 AS Timestamp(ms)) as j]_|
|_|_FilterExec: __common_expr_3@0 >= -299999 AND __common_expr_3@0 <= 10000, projection=[i@1, j@2, k@3]_|
|_|_ProjectionExec: expr=[CAST(j@1 AS Timestamp(ms)) as __common_expr_3, i@0 as i, j@1 as j, k@2 as k]_|
|_|_MergeScanExec: REDACTED
|_|_|
| physical_plan after OutputRequirements_| PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j]_|
|_|_PromSeriesDivideExec: tags=["k"]_|
|_|_SortExec: expr=[k@1 ASC, j@2 ASC], preserve_partitioning=[true]_|
|_|_RepartitionExec: REDACTED
|_|_ProjectionExec: expr=[i@0 as i, k@2 as k, CAST(j@1 AS Timestamp(ms)) as j]_|
|_|_FilterExec: __common_expr_3@0 >= -299999 AND __common_expr_3@0 <= 10000, projection=[i@1, j@2, k@3]_|
|_|_ProjectionExec: expr=[CAST(j@1 AS Timestamp(ms)) as __common_expr_3, i@0 as i, j@1 as j, k@2 as k]_|
|_|_MergeScanExec: REDACTED
|_|_|
| physical_plan after LimitAggregation_| SAME TEXT AS ABOVE_|
| physical_plan after LimitPushPastWindows_| SAME TEXT AS ABOVE_|
| physical_plan after LimitPushdown_| SAME TEXT AS ABOVE_|
| physical_plan after ProjectionPushdown_| SAME TEXT AS ABOVE_|
| physical_plan after PushdownSort_| SAME TEXT AS ABOVE_|
| physical_plan after EnsureCooperative_| SAME TEXT AS ABOVE_|
| physical_plan after FilterPushdown(Post)_| SAME TEXT AS ABOVE_|
| physical_plan after WindowedSortRule_| SAME TEXT AS ABOVE_|
| physical_plan after MatchesConstantTerm_| SAME TEXT AS ABOVE_|
| physical_plan after RemoveDuplicateRule_| SAME TEXT AS ABOVE_|
| physical_plan after SanityCheckPlan_| SAME TEXT AS ABOVE_|
| physical_plan_| PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j]_|
|_|_PromSeriesDivideExec: tags=["k"]_|
|_|_SortExec: expr=[k@1 ASC, j@2 ASC], preserve_partitioning=[true]_|
|_|_RepartitionExec: REDACTED
|_|_ProjectionExec: expr=[i@0 as i, k@2 as k, CAST(j@1 AS Timestamp(ms)) as j]_|
|_|_FilterExec: __common_expr_3@0 >= -299999 AND __common_expr_3@0 <= 10000, projection=[i@1, j@2, k@3]_|
|_|_ProjectionExec: expr=[CAST(j@1 AS Timestamp(ms)) as __common_expr_3, i@0 as i, j@1 as j, k@2 as k]_|
|_|_MergeScanExec: REDACTED
|_|_|
| physical_plan_with_stats_| PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j], statistics=[Rows=Inexact(2), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]_|
|_|_PromSeriesDivideExec: tags=["k"], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]_|
|_|_SortExec: expr=[k@1 ASC, j@2 ASC], preserve_partitioning=[true], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]_|
|_|_RepartitionExec: REDACTED
|_|_ProjectionExec: expr=[i@0 as i, k@2 as k, CAST(j@1 AS Timestamp(ms)) as j], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]_|
|_|_FilterExec: __common_expr_3@0 >= -299999 AND __common_expr_3@0 <= 10000, projection=[i@1, j@2, k@3], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]_|
|_|_ProjectionExec: expr=[CAST(j@1 AS Timestamp(ms)) as __common_expr_3, i@0 as i, j@1 as j, k@2 as k], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:)]] |
|_|_MergeScanExec: REDACTED
|_|_|
| physical_plan_with_schema_| PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j], schema=[i:Float64;N, k:Utf8;N, j:Timestamp(ms)]_|
|_|_PromSeriesDivideExec: tags=["k"], schema=[i:Float64;N, k:Utf8;N, j:Timestamp(ms)]_|
|_|_SortExec: expr=[k@1 ASC, j@2 ASC], preserve_partitioning=[true], schema=[i:Float64;N, k:Utf8;N, j:Timestamp(ms)]_|
|_|_RepartitionExec: REDACTED
|_|_ProjectionExec: expr=[i@0 as i, k@2 as k, CAST(j@1 AS Timestamp(ms)) as j], schema=[i:Float64;N, k:Utf8;N, j:Timestamp(ms)]_|
|_|_FilterExec: __common_expr_3@0 >= -299999 AND __common_expr_3@0 <= 10000, projection=[i@1, j@2, k@3], schema=[i:Float64;N, j:Timestamp(ns), k:Utf8;N]_|
|_|_ProjectionExec: expr=[CAST(j@1 AS Timestamp(ms)) as __common_expr_3, i@0 as i, j@1 as j, k@2 as k], schema=[__common_expr_3:Timestamp(ms), i:Float64;N, j:Timestamp(ns), k:Utf8;N]_|
|_|_MergeScanExec: REDACTED
|_|_|
+-+-+
DROP TABLE test_nano;
Affected Rows: 0
DROP TABLE test;
Affected Rows: 0

View File

@@ -25,6 +25,7 @@ TQL EXPLAIN ('1970-01-01T00:00:00'::timestamp, '1970-01-01T00:00:00'::timestamp
-- SQLNESS REPLACE (elapsed_compute.*) REDACTED
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
TQL EXPLAIN VERBOSE (0, 10, '5s') test;
-- explain verbose at 0s, 5s and 10s. No point at 0s.
@@ -33,6 +34,28 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test;
-- SQLNESS REPLACE (elapsed_compute.*) REDACTED
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
TQL EXPLAIN VERBOSE (0, 10, '5s') test AS series;
CREATE TABLE test_nano(i DOUBLE, j TIMESTAMP(9) TIME INDEX, k STRING PRIMARY KEY);
INSERT INTO test_nano VALUES (1, 1000000, "a"), (1, 1000000, "b"), (2, 2000000, "a");
-- explain at 0s, 5s and 10s for a nanosecond time index.
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
TQL EXPLAIN (0, 10, '5s') test_nano;
-- explain verbose at 0s, 5s and 10s for a nanosecond time index.
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (elapsed_compute.*) REDACTED
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
TQL EXPLAIN VERBOSE (0, 10, '5s') test_nano;
DROP TABLE test_nano;
DROP TABLE test;

View File

@@ -4,6 +4,7 @@ CREATE TABLE
IF NOT EXISTS `cpu` (
`rack` STRING NULL,
`os` STRING NULL,
`usage_small` SMALLINT NULL,
`usage_user` BIGINT NULL,
`greptime_timestamp` TIMESTAMP(9) NOT NULL,
TIME INDEX (`greptime_timestamp`),
@@ -25,11 +26,11 @@ Affected Rows: 0
INSERT INTO
cpu
VALUES
("1", "linux", 10, "2023-06-12 01:04:49"),
("1", "linux", 15, "2023-06-12 01:04:50"),
("3", "windows", 25, "2023-06-12 01:05:00"),
("5", "mac", 30, "2023-06-12 01:03:00"),
("7", "linux", 45, "2023-06-12 02:00:00");
("1", "linux", 10, 10, "2023-06-12 01:04:49"),
("1", "linux", 15, 15, "2023-06-12 01:04:50"),
("3", "windows", 25, 25, "2023-06-12 01:05:00"),
("5", "mac", 30, 30, "2023-06-12 01:03:00"),
("7", "linux", 45, 45, "2023-06-12 02:00:00");
Affected Rows: 5
@@ -44,14 +45,108 @@ ADMIN FLUSH_TABLE ('cpu');
INSERT INTO
cpu
VALUES
("2", "linux", 20, "2023-06-12 01:04:51"),
("2", "windows", 22, "2023-06-12 01:06:00"),
("4", "mac", 12, "2023-06-12 00:59:00"),
("6", "linux", 35, "2023-06-12 01:04:55"),
("8", "windows", 50, "2023-06-12 02:10:00");
("2", "linux", 20, 20, "2023-06-12 01:04:51"),
("2", "windows", 22, 22, "2023-06-12 01:06:00"),
("4", "mac", 12, 12, "2023-06-12 00:59:00"),
("6", "linux", 35, 35, "2023-06-12 01:04:55"),
("8", "windows", 50, 50, "2023-06-12 02:10:00");
Affected Rows: 5
-- SQLNESS REPLACE (peers.*) REDACTED
EXPLAIN SELECT
rack
FROM
cpu
WHERE
usage_small IN (10, 20);
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false, remote_input=[ |
| | Projection: cpu.rack |
| | Filter: cpu.usage_small = Int16(10) OR cpu.usage_small = Int16(20) |
| | TableScan: cpu |
| | ]] |
| physical_plan | CooperativeExec |
| | MergeScanExec: REDACTED
| | |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------+
-- SQLNESS REPLACE (peers.*) REDACTED
EXPLAIN SELECT
rack
FROM
cpu
WHERE
usage_small BETWEEN 10 AND 20;
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false, remote_input=[ |
| | Projection: cpu.rack |
| | Filter: cpu.usage_small >= Int16(10) AND cpu.usage_small <= Int16(20) |
| | TableScan: cpu |
| | ]] |
| physical_plan | CooperativeExec |
| | MergeScanExec: REDACTED
| | |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------+
CREATE TABLE
IF NOT EXISTS `cpu_single` (
`rack` STRING NULL,
`usage_small` SMALLINT NULL,
`greptime_timestamp` TIMESTAMP(9) NOT NULL,
TIME INDEX (`greptime_timestamp`),
) ENGINE = mito
WITH
(append_mode = 'true', sst_format = 'flat');
Affected Rows: 0
INSERT INTO
cpu_single
VALUES
("1", 10, "2023-06-12 01:04:49"),
("2", 20, "2023-06-12 01:04:50"),
("3", 25, "2023-06-12 01:05:00");
Affected Rows: 3
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE "partition_count":\{(.*?)\} "partition_count":REDACTED
-- SQLNESS REPLACE "flat_format":\s\w+, "flat_format": REDACTED,
EXPLAIN ANALYZE VERBOSE SELECT
rack
FROM
cpu_single
WHERE
10 <= usage_small;
+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_CooperativeExec REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_FilterExec: usage_small@1 >= 10, projection=[rack@0] REDACTED
|_|_|_CooperativeExec REDACTED
|_|_|_UnorderedScan: region=REDACTED, {"partition_count":REDACTED, "projection": ["rack", "usage_small"], "filters": ["usage_small >= Int16(10)"], "flat_format": REDACTED, "REDACTED
|_|_|_|
|_|_| Total rows: 3_|
+-+-+-+
drop table cpu_single;
Affected Rows: 0
-- SQLNESS SORT_RESULT 3 1
select
count(*)

View File

@@ -4,6 +4,7 @@ CREATE TABLE
IF NOT EXISTS `cpu` (
`rack` STRING NULL,
`os` STRING NULL,
`usage_small` SMALLINT NULL,
`usage_user` BIGINT NULL,
`greptime_timestamp` TIMESTAMP(9) NOT NULL,
TIME INDEX (`greptime_timestamp`),
@@ -23,22 +24,71 @@ WITH
INSERT INTO
cpu
VALUES
("1", "linux", 10, "2023-06-12 01:04:49"),
("1", "linux", 15, "2023-06-12 01:04:50"),
("3", "windows", 25, "2023-06-12 01:05:00"),
("5", "mac", 30, "2023-06-12 01:03:00"),
("7", "linux", 45, "2023-06-12 02:00:00");
("1", "linux", 10, 10, "2023-06-12 01:04:49"),
("1", "linux", 15, 15, "2023-06-12 01:04:50"),
("3", "windows", 25, 25, "2023-06-12 01:05:00"),
("5", "mac", 30, 30, "2023-06-12 01:03:00"),
("7", "linux", 45, 45, "2023-06-12 02:00:00");
ADMIN FLUSH_TABLE ('cpu');
INSERT INTO
cpu
VALUES
("2", "linux", 20, "2023-06-12 01:04:51"),
("2", "windows", 22, "2023-06-12 01:06:00"),
("4", "mac", 12, "2023-06-12 00:59:00"),
("6", "linux", 35, "2023-06-12 01:04:55"),
("8", "windows", 50, "2023-06-12 02:10:00");
("2", "linux", 20, 20, "2023-06-12 01:04:51"),
("2", "windows", 22, 22, "2023-06-12 01:06:00"),
("4", "mac", 12, 12, "2023-06-12 00:59:00"),
("6", "linux", 35, 35, "2023-06-12 01:04:55"),
("8", "windows", 50, 50, "2023-06-12 02:10:00");
-- SQLNESS REPLACE (peers.*) REDACTED
EXPLAIN SELECT
rack
FROM
cpu
WHERE
usage_small IN (10, 20);
-- SQLNESS REPLACE (peers.*) REDACTED
EXPLAIN SELECT
rack
FROM
cpu
WHERE
usage_small BETWEEN 10 AND 20;
CREATE TABLE
IF NOT EXISTS `cpu_single` (
`rack` STRING NULL,
`usage_small` SMALLINT NULL,
`greptime_timestamp` TIMESTAMP(9) NOT NULL,
TIME INDEX (`greptime_timestamp`),
) ENGINE = mito
WITH
(append_mode = 'true', sst_format = 'flat');
INSERT INTO
cpu_single
VALUES
("1", 10, "2023-06-12 01:04:49"),
("2", 20, "2023-06-12 01:04:50"),
("3", 25, "2023-06-12 01:05:00");
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE "partition_count":\{(.*?)\} "partition_count":REDACTED
-- SQLNESS REPLACE "flat_format":\s\w+, "flat_format": REDACTED,
EXPLAIN ANALYZE VERBOSE SELECT
rack
FROM
cpu_single
WHERE
10 <= usage_small;
drop table cpu_single;
-- SQLNESS SORT_RESULT 3 1
select
@@ -70,4 +120,4 @@ where
group by
os;
drop table cpu;
drop table cpu;