diff --git a/src/mito2/src/sst/parquet/format.rs b/src/mito2/src/sst/parquet/format.rs
index 005e276bbd..8ed6c241bf 100644
--- a/src/mito2/src/sst/parquet/format.rs
+++ b/src/mito2/src/sst/parquet/format.rs
@@ -134,6 +134,7 @@ impl WriteFormat {
 /// Helper for reading the SST format.
 pub struct ReadFormat {
+    /// The metadata stored in the SST.
     metadata: RegionMetadataRef,
     /// SST file schema.
     arrow_schema: SchemaRef,
@@ -305,17 +306,23 @@ impl ReadFormat {
         &self,
         row_groups: &[impl Borrow<RowGroupMetaData>],
         column_id: ColumnId,
-    ) -> Option<ArrayRef> {
-        let column = self.metadata.column_by_id(column_id)?;
+    ) -> StatValues {
+        let Some(column) = self.metadata.column_by_id(column_id) else {
+            // No such column in the SST.
+            return StatValues::NoColumn;
+        };
         match column.semantic_type {
             SemanticType::Tag => self.tag_values(row_groups, column, true),
             SemanticType::Field => {
-                let index = self.field_id_to_index.get(&column_id)?;
-                Self::column_values(row_groups, column, *index, true)
+                // Safety: `field_id_to_index` is initialized by the semantic type.
+                let index = self.field_id_to_index.get(&column_id).unwrap();
+                let stats = Self::column_values(row_groups, column, *index, true);
+                StatValues::from_stats_opt(stats)
             }
             SemanticType::Timestamp => {
                 let index = self.time_index_position();
-                Self::column_values(row_groups, column, index, true)
+                let stats = Self::column_values(row_groups, column, index, true);
+                StatValues::from_stats_opt(stats)
             }
         }
     }
@@ -325,17 +332,23 @@ impl ReadFormat {
         &self,
         row_groups: &[impl Borrow<RowGroupMetaData>],
         column_id: ColumnId,
-    ) -> Option<ArrayRef> {
-        let column = self.metadata.column_by_id(column_id)?;
+    ) -> StatValues {
+        let Some(column) = self.metadata.column_by_id(column_id) else {
+            // No such column in the SST.
+            return StatValues::NoColumn;
+        };
         match column.semantic_type {
             SemanticType::Tag => self.tag_values(row_groups, column, false),
             SemanticType::Field => {
-                let index = self.field_id_to_index.get(&column_id)?;
-                Self::column_values(row_groups, column, *index, false)
+                // Safety: `field_id_to_index` is initialized by the semantic type.
+                let index = self.field_id_to_index.get(&column_id).unwrap();
+                let stats = Self::column_values(row_groups, column, *index, false);
+                StatValues::from_stats_opt(stats)
             }
             SemanticType::Timestamp => {
                 let index = self.time_index_position();
-                Self::column_values(row_groups, column, index, false)
+                let stats = Self::column_values(row_groups, column, index, false);
+                StatValues::from_stats_opt(stats)
             }
         }
     }
@@ -345,17 +358,23 @@ impl ReadFormat {
         &self,
         row_groups: &[impl Borrow<RowGroupMetaData>],
         column_id: ColumnId,
-    ) -> Option<ArrayRef> {
-        let column = self.metadata.column_by_id(column_id)?;
+    ) -> StatValues {
+        let Some(column) = self.metadata.column_by_id(column_id) else {
+            // No such column in the SST.
+            return StatValues::NoColumn;
+        };
         match column.semantic_type {
-            SemanticType::Tag => None,
+            SemanticType::Tag => StatValues::NoStats,
             SemanticType::Field => {
-                let index = self.field_id_to_index.get(&column_id)?;
-                Self::column_null_counts(row_groups, *index)
+                // Safety: `field_id_to_index` is initialized by the semantic type.
+                let index = self.field_id_to_index.get(&column_id).unwrap();
+                let stats = Self::column_null_counts(row_groups, *index);
+                StatValues::from_stats_opt(stats)
             }
             SemanticType::Timestamp => {
                 let index = self.time_index_position();
-                Self::column_null_counts(row_groups, index)
+                let stats = Self::column_null_counts(row_groups, index);
+                StatValues::from_stats_opt(stats)
             }
         }
     }
@@ -390,8 +409,7 @@ impl ReadFormat {
         row_groups: &[impl Borrow<RowGroupMetaData>],
         column: &ColumnMetadata,
         is_min: bool,
-    ) -> Option<ArrayRef> {
-        let primary_key_encoding = self.metadata.primary_key_encoding;
+    ) -> StatValues {
         let is_first_tag = self
             .metadata
             .primary_key
@@ -400,9 +418,28 @@ impl ReadFormat {
             .unwrap_or(false);
         if !is_first_tag {
             // Only the min-max of the first tag is available in the primary key.
-            return None;
+            return StatValues::NoStats;
         }
+        StatValues::from_stats_opt(self.first_tag_values(row_groups, column, is_min))
+    }
+
+    /// Returns min/max values of the first tag.
+    /// Returns None if the tag does not have statistics.
+    fn first_tag_values(
+        &self,
+        row_groups: &[impl Borrow<RowGroupMetaData>],
+        column: &ColumnMetadata,
+        is_min: bool,
+    ) -> Option<ArrayRef> {
+        debug_assert!(self
+            .metadata
+            .primary_key
+            .first()
+            .map(|id| *id == column.column_id)
+            .unwrap_or(false));
+
+        let primary_key_encoding = self.metadata.primary_key_encoding;
         let converter = build_primary_key_codec_with_fields(
             primary_key_encoding,
             [(
@@ -452,6 +489,7 @@ impl ReadFormat {
     }

     /// Returns min/max values of specific non-tag columns.
+    /// Returns None if the column does not have statistics.
     fn column_values(
         row_groups: &[impl Borrow<RowGroupMetaData>],
         column: &ColumnMetadata,
@@ -544,6 +582,29 @@ impl ReadFormat {
     }
 }

+/// Values of column statistics of the SST.
+///
+/// It also distinguishes the case that a column is not found and
+/// the column exists but has no statistics.
+pub enum StatValues {
+    /// Values of each row group.
+    Values(ArrayRef),
+    /// No such column.
+    NoColumn,
+    /// Column exists but has no statistics.
+    NoStats,
+}
+
+impl StatValues {
+    /// Creates a new `StatValues` instance from optional statistics.
+    pub fn from_stats_opt(stats: Option<ArrayRef>) -> Self {
+        match stats {
+            Some(stats) => StatValues::Values(stats),
+            None => StatValues::NoStats,
+        }
+    }
+}
+
 #[cfg(test)]
 impl ReadFormat {
     /// Creates a helper with existing `metadata` and all columns.
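The heart of the patch is the three-way `StatValues` contract above: it lets the pruning layer tell "this SST has no such column" apart from "the column exists but carries no statistics". The following is a minimal, self-contained sketch of how the three cases are meant to map onto DataFusion's `PruningStatistics`, which only speaks `Option<ArrayRef>`. It is not part of the patch: the enum is mirrored locally, only the `arrow` crate is assumed, and the hypothetical `compat_default` parameter stands in for the `compat_default_value` fallback used in `stats.rs` below.

    use std::sync::Arc;

    use arrow::array::{ArrayRef, Int64Array};

    /// Local mirror of the `StatValues` type added in `format.rs`.
    enum StatValues {
        /// Per-row-group statistics.
        Values(ArrayRef),
        /// The SST has no such column (e.g. one added by a later ALTER TABLE).
        NoColumn,
        /// The column exists but has no statistics
        /// (e.g. a non-first tag encoded inside the primary key).
        NoStats,
    }

    /// Maps the three cases to the `Option<ArrayRef>` that
    /// `PruningStatistics::min_values`/`max_values` must return.
    fn to_pruning_stats(v: StatValues, compat_default: Option<ArrayRef>) -> Option<ArrayRef> {
        match v {
            StatValues::Values(values) => Some(values),
            // Missing column: fall back to its default value so SSTs written
            // before the column existed remain prunable.
            StatValues::NoColumn => compat_default,
            // Existing column without stats: disable pruning for it rather
            // than prune against a made-up default, which could skip rows
            // that actually match the predicate.
            StatValues::NoStats => None,
        }
    }

    fn main() {
        let stats: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), None]));
        assert!(to_pruning_stats(StatValues::Values(stats), None).is_some());
        assert!(to_pruning_stats(StatValues::NoStats, None).is_none());
    }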
diff --git a/src/mito2/src/sst/parquet/stats.rs b/src/mito2/src/sst/parquet/stats.rs
index ead3679397..bf0ad4a46a 100644
--- a/src/mito2/src/sst/parquet/stats.rs
+++ b/src/mito2/src/sst/parquet/stats.rs
@@ -25,7 +25,7 @@ use parquet::file::metadata::RowGroupMetaData;
 use store_api::metadata::RegionMetadataRef;
 use store_api::storage::ColumnId;

-use crate::sst::parquet::format::ReadFormat;
+use crate::sst::parquet::format::{ReadFormat, StatValues};

 /// Statistics for pruning row groups.
 pub(crate) struct RowGroupPruningStats<'a, T> {
@@ -100,16 +100,18 @@ impl<T: Borrow<RowGroupMetaData>> PruningStatistics for RowGroupPruningStats<'_, T> {
     fn min_values(&self, column: &Column) -> Option<ArrayRef> {
         let column_id = self.column_id_to_prune(&column.name)?;
         match self.read_format.min_values(self.row_groups, column_id) {
-            Some(values) => Some(values),
-            None => self.compat_default_value(&column.name),
+            StatValues::Values(values) => Some(values),
+            StatValues::NoColumn => self.compat_default_value(&column.name),
+            StatValues::NoStats => None,
         }
     }

     fn max_values(&self, column: &Column) -> Option<ArrayRef> {
         let column_id = self.column_id_to_prune(&column.name)?;
         match self.read_format.max_values(self.row_groups, column_id) {
-            Some(values) => Some(values),
-            None => self.compat_default_value(&column.name),
+            StatValues::Values(values) => Some(values),
+            StatValues::NoColumn => self.compat_default_value(&column.name),
+            StatValues::NoStats => None,
         }
     }

@@ -118,10 +120,12 @@ impl<T: Borrow<RowGroupMetaData>> PruningStatistics for RowGroupPruningStats<'_, T> {
     }

     fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
-        let Some(column_id) = self.column_id_to_prune(&column.name) else {
-            return self.compat_null_count(&column.name);
-        };
-        self.read_format.null_counts(self.row_groups, column_id)
+        let column_id = self.column_id_to_prune(&column.name)?;
+        match self.read_format.null_counts(self.row_groups, column_id) {
+            StatValues::Values(values) => Some(values),
+            StatValues::NoColumn => self.compat_null_count(&column.name),
+            StatValues::NoStats => None,
+        }
     }

     fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
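Why tags other than the first one yield `StatValues::NoStats`: mito2 stores all tags in a single lexicographically ordered, encoded primary-key column, so a row group's min/max of that column only bound the leading tag. The toy program below illustrates the idea; it is not the real codec (that is the memcomparable codec built by `build_primary_key_codec_with_fields`), just a NUL-joined stand-in with the same ordering property.

    // Toy order-preserving encoding of (namespace, env) into one key.
    fn encode(ns: &str, env: &str) -> String {
        format!("{ns}\0{env}")
    }

    fn main() {
        // One row group's keys, sorted as the SST stores them.
        let keys = [
            encode("service_a", "production"),
            encode("service_b", "dev"),
        ];
        let min = keys.iter().min().unwrap();
        let max = keys.iter().max().unwrap();

        // The encoded min/max do bound the FIRST tag...
        assert!(min.starts_with("service_a") && max.starts_with("service_b"));

        // ...but their suffixes ("production" and "dev") are not a valid
        // range for `env`, whose actual values here include both. Pruning a
        // non-first tag against such bounds (or against a default value)
        // could drop row groups that contain matching rows, which is what
        // the new prune_pk.sql test below guards against.
    }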
diff --git a/tests/cases/standalone/common/flow/flow_step_aggr.result b/tests/cases/standalone/common/flow/flow_step_aggr.result
index 74113ccccb..5a20fb1e4c 100644
--- a/tests/cases/standalone/common/flow/flow_step_aggr.result
+++ b/tests/cases/standalone/common/flow/flow_step_aggr.result
@@ -50,6 +50,7 @@ ADMIN FLUSH_FLOW('calc_access_log_10s');
 +-----------------------------------------+

 -- query should return 3 rows
+-- SQLNESS SORT_RESULT 3 1
 SELECT "url", time_window
 FROM access_log_10s
 ORDER BY time_window;
@@ -63,6 +64,7 @@
 +------------+---------------------+

 -- use hll_count to query the approximate data in access_log_10s
+-- SQLNESS SORT_RESULT 3 1
 SELECT "url", time_window, hll_count(state)
 FROM access_log_10s
 ORDER BY time_window;
@@ -76,6 +78,7 @@
 +------------+---------------------+---------------------------------+

 -- further, we can aggregate 10 seconds of data to every minute, by using hll_merge to merge 10 seconds of hyperloglog state
+-- SQLNESS SORT_RESULT 3 1
 SELECT
     "url",
     date_bin('1 minute'::INTERVAL, time_window) AS time_window_1m,
@@ -91,8 +94,8 @@ uv_per_min FROM access_log_10s GROUP BY "url", time_window_1m ORDER BY time_window_1m;
 +------------+---------------------+------------+
 | url        | time_window_1m      | uv_per_min |
 +------------+---------------------+------------+
-| /not_found | 2025-03-04T00:00:00 | 1          |
 | /dashboard | 2025-03-04T00:00:00 | 3          |
+| /not_found | 2025-03-04T00:00:00 | 1          |
 +------------+---------------------+------------+

 DROP FLOW calc_access_log_10s;
diff --git a/tests/cases/standalone/common/flow/flow_step_aggr.sql b/tests/cases/standalone/common/flow/flow_step_aggr.sql
index 92698d8de6..2849965bc9 100644
--- a/tests/cases/standalone/common/flow/flow_step_aggr.sql
+++ b/tests/cases/standalone/common/flow/flow_step_aggr.sql
@@ -36,16 +36,19 @@ INSERT INTO access_log VALUES
 ADMIN FLUSH_FLOW('calc_access_log_10s');

 -- query should return 3 rows
+-- SQLNESS SORT_RESULT 3 1
 SELECT "url", time_window
 FROM access_log_10s
 ORDER BY time_window;

 -- use hll_count to query the approximate data in access_log_10s
+-- SQLNESS SORT_RESULT 3 1
 SELECT "url", time_window, hll_count(state)
 FROM access_log_10s
 ORDER BY time_window;

 -- further, we can aggregate 10 seconds of data to every minute, by using hll_merge to merge 10 seconds of hyperloglog state
+-- SQLNESS SORT_RESULT 3 1
 SELECT
     "url",
     date_bin('1 minute'::INTERVAL, time_window) AS time_window_1m,
diff --git a/tests/cases/standalone/common/select/prune_pk.result b/tests/cases/standalone/common/select/prune_pk.result
new file mode 100644
index 0000000000..afa7aba986
--- /dev/null
+++ b/tests/cases/standalone/common/select/prune_pk.result
@@ -0,0 +1,158 @@
+CREATE TABLE IF NOT EXISTS `test_multi_pk_filter` ( `namespace` STRING NULL, `env` STRING NULL DEFAULT 'NULL', `flag` INT NULL, `total` BIGINT NULL, `greptime_timestamp` TIMESTAMP(9) NOT NULL, TIME INDEX (`greptime_timestamp`), PRIMARY KEY (`namespace`, `env`, `flag`) ) ENGINE=mito;
+
+Affected Rows: 0
+
+INSERT INTO test_multi_pk_filter
+    (namespace, env, flag, total, greptime_timestamp)
+    VALUES ('thermostat_v2', 'production', 1, 5289, '2023-05-15 10:00:00');
+
+Affected Rows: 1
+
+INSERT INTO test_multi_pk_filter
+    (namespace, env, flag, total, greptime_timestamp)
+    VALUES ('thermostat_v2', 'production', 0, 421, '2023-05-15 10:05:00');
+
+Affected Rows: 1
+
+INSERT INTO test_multi_pk_filter
+    (namespace, env, flag, total, greptime_timestamp)
+    VALUES ('thermostat_v2', 'dev', 1, 356, '2023-05-15 10:10:00');
+
+Affected Rows: 1
+
+ADMIN FLUSH_TABLE('test_multi_pk_filter');
+
++-------------------------------------------+
+| ADMIN FLUSH_TABLE('test_multi_pk_filter') |
++-------------------------------------------+
+| 0                                         |
++-------------------------------------------+
+
+INSERT INTO test_multi_pk_filter
+    (namespace, env, flag, total, greptime_timestamp)
+    VALUES ('thermostat_v2', 'dev', 1, 412, '2023-05-15 10:15:00');
+
+Affected Rows: 1
+
+INSERT INTO test_multi_pk_filter
+    (namespace, env, flag, total, greptime_timestamp)
+    VALUES ('thermostat_v2', 'dev', 1, 298, '2023-05-15 10:20:00');
+
+Affected Rows: 1
+
+INSERT INTO test_multi_pk_filter
+    (namespace, env, flag, total, greptime_timestamp)
+    VALUES ('thermostat_v2', 'production', 1, 5289, '2023-05-15 10:25:00');
+
+Affected Rows: 1
+
+INSERT INTO test_multi_pk_filter
+    (namespace, env, flag, total, greptime_timestamp)
+    VALUES ('thermostat_v2', 'production', 1, 5874, '2023-05-15 10:30:00');
+
+Affected Rows: 1
+
+ADMIN FLUSH_TABLE('test_multi_pk_filter');
+
++-------------------------------------------+
+| ADMIN FLUSH_TABLE('test_multi_pk_filter') |
++-------------------------------------------+
+| 0                                         |
++-------------------------------------------+
+
+INSERT INTO test_multi_pk_filter
+    (namespace, env, flag, total, greptime_timestamp)
+    VALUES ('thermostat_v2', 'production', 1, 6132, '2023-05-15 10:35:00');
+
+Affected Rows: 1
+
+INSERT INTO test_multi_pk_filter
+    (namespace, env, flag, total, greptime_timestamp)
+    VALUES ('thermostat_v2', 'testing', 1, 1287, '2023-05-15 10:40:00');
+
+Affected Rows: 1
+
+INSERT INTO test_multi_pk_filter
+    (namespace, env, flag, total, greptime_timestamp)
+    VALUES ('thermostat_v2', 'testing', 1, 1432, '2023-05-15 10:45:00');
+
+Affected Rows: 1
+
+INSERT INTO test_multi_pk_filter
+    (namespace, env, flag, total, greptime_timestamp)
+    VALUES ('thermostat_v2', 'testing', 1, 1056, '2023-05-15 10:50:00');
+
+Affected Rows: 1
+
+SELECT greptime_timestamp, namespace, env, total FROM test_multi_pk_filter WHERE
+    greptime_timestamp BETWEEN '2023-05-15 10:00:00' AND '2023-05-15 11:00:00' AND flag = 1 AND namespace = 'thermostat_v2'
+    ORDER BY greptime_timestamp;
+
++---------------------+---------------+------------+-------+
+| greptime_timestamp  | namespace     | env        | total |
++---------------------+---------------+------------+-------+
+| 2023-05-15T10:00:00 | thermostat_v2 | production | 5289  |
+| 2023-05-15T10:10:00 | thermostat_v2 | dev        | 356   |
+| 2023-05-15T10:15:00 | thermostat_v2 | dev        | 412   |
+| 2023-05-15T10:20:00 | thermostat_v2 | dev        | 298   |
+| 2023-05-15T10:25:00 | thermostat_v2 | production | 5289  |
+| 2023-05-15T10:30:00 | thermostat_v2 | production | 5874  |
+| 2023-05-15T10:35:00 | thermostat_v2 | production | 6132  |
+| 2023-05-15T10:40:00 | thermostat_v2 | testing    | 1287  |
+| 2023-05-15T10:45:00 | thermostat_v2 | testing    | 1432  |
+| 2023-05-15T10:50:00 | thermostat_v2 | testing    | 1056  |
++---------------------+---------------+------------+-------+
+
+SELECT greptime_timestamp, namespace, env, total FROM test_multi_pk_filter WHERE
+    greptime_timestamp BETWEEN '2023-05-15 10:00:00' AND '2023-05-15 11:00:00' AND flag = 1 AND namespace = 'thermostat_v2' AND env='dev'
+    ORDER BY greptime_timestamp;
+
++---------------------+---------------+-----+-------+
+| greptime_timestamp  | namespace     | env | total |
++---------------------+---------------+-----+-------+
+| 2023-05-15T10:10:00 | thermostat_v2 | dev | 356   |
+| 2023-05-15T10:15:00 | thermostat_v2 | dev | 412   |
+| 2023-05-15T10:20:00 | thermostat_v2 | dev | 298   |
++---------------------+---------------+-----+-------+
+
+DROP TABLE test_multi_pk_filter;
+
+Affected Rows: 0
+
+CREATE TABLE IF NOT EXISTS `test_multi_pk_null` ( `namespace` STRING NULL, `env` STRING NULL DEFAULT 'NULL', `total` BIGINT NULL, `greptime_timestamp` TIMESTAMP(9) NOT NULL, TIME INDEX (`greptime_timestamp`), PRIMARY KEY (`namespace`, `env`) ) ENGINE=mito;
+
+Affected Rows: 0
+
+INSERT INTO test_multi_pk_null
+    (namespace, env, total, greptime_timestamp)
+    VALUES ('thermostat_v2', 'production', 5289, '2023-05-15 10:00:00');
+
+Affected Rows: 1
+
+INSERT INTO test_multi_pk_null
+    (namespace, env, total, greptime_timestamp)
+    VALUES ('thermostat_v2', 'production', 421, '2023-05-15 10:05:00');
+
+Affected Rows: 1
+
+ADMIN FLUSH_TABLE('test_multi_pk_null');
+
++-----------------------------------------+
+| ADMIN FLUSH_TABLE('test_multi_pk_null') |
++-----------------------------------------+
+| 0                                       |
++-----------------------------------------+
+
+SELECT * FROM test_multi_pk_null WHERE env IS NOT NULL;
+
++---------------+------------+-------+---------------------+
+| namespace     | env        | total | greptime_timestamp  |
++---------------+------------+-------+---------------------+
+| thermostat_v2 | production | 5289  | 2023-05-15T10:00:00 |
+| thermostat_v2 | production | 421   | 2023-05-15T10:05:00 |
++---------------+------------+-------+---------------------+
+
+DROP TABLE test_multi_pk_null;
+
+Affected Rows: 0
+
diff --git a/tests/cases/standalone/common/select/prune_pk.sql b/tests/cases/standalone/common/select/prune_pk.sql
new file mode 100644
index 0000000000..bb06b4ec7f
--- /dev/null
+++ b/tests/cases/standalone/common/select/prune_pk.sql
@@ -0,0 +1,66 @@
+CREATE TABLE IF NOT EXISTS `test_multi_pk_filter` ( `namespace` STRING NULL, `env` STRING NULL DEFAULT 'NULL', `flag` INT NULL, `total` BIGINT NULL, `greptime_timestamp` TIMESTAMP(9) NOT NULL, TIME INDEX (`greptime_timestamp`), PRIMARY KEY (`namespace`, `env`, `flag`) ) ENGINE=mito;
+
+INSERT INTO test_multi_pk_filter
+    (namespace, env, flag, total, greptime_timestamp)
+    VALUES ('thermostat_v2', 'production', 1, 5289, '2023-05-15 10:00:00');
+INSERT INTO test_multi_pk_filter
+    (namespace, env, flag, total, greptime_timestamp)
+    VALUES ('thermostat_v2', 'production', 0, 421, '2023-05-15 10:05:00');
+INSERT INTO test_multi_pk_filter
+    (namespace, env, flag, total, greptime_timestamp)
+    VALUES ('thermostat_v2', 'dev', 1, 356, '2023-05-15 10:10:00');
+
+ADMIN FLUSH_TABLE('test_multi_pk_filter');
+
+INSERT INTO test_multi_pk_filter
+    (namespace, env, flag, total, greptime_timestamp)
+    VALUES ('thermostat_v2', 'dev', 1, 412, '2023-05-15 10:15:00');
+INSERT INTO test_multi_pk_filter
+    (namespace, env, flag, total, greptime_timestamp)
+    VALUES ('thermostat_v2', 'dev', 1, 298, '2023-05-15 10:20:00');
+INSERT INTO test_multi_pk_filter
+    (namespace, env, flag, total, greptime_timestamp)
+    VALUES ('thermostat_v2', 'production', 1, 5289, '2023-05-15 10:25:00');
+INSERT INTO test_multi_pk_filter
+    (namespace, env, flag, total, greptime_timestamp)
+    VALUES ('thermostat_v2', 'production', 1, 5874, '2023-05-15 10:30:00');
+
+ADMIN FLUSH_TABLE('test_multi_pk_filter');
+
+INSERT INTO test_multi_pk_filter
+    (namespace, env, flag, total, greptime_timestamp)
+    VALUES ('thermostat_v2', 'production', 1, 6132, '2023-05-15 10:35:00');
+INSERT INTO test_multi_pk_filter
+    (namespace, env, flag, total, greptime_timestamp)
+    VALUES ('thermostat_v2', 'testing', 1, 1287, '2023-05-15 10:40:00');
+INSERT INTO test_multi_pk_filter
+    (namespace, env, flag, total, greptime_timestamp)
+    VALUES ('thermostat_v2', 'testing', 1, 1432, '2023-05-15 10:45:00');
+INSERT INTO test_multi_pk_filter
+    (namespace, env, flag, total, greptime_timestamp)
+    VALUES ('thermostat_v2', 'testing', 1, 1056, '2023-05-15 10:50:00');
+
+SELECT greptime_timestamp, namespace, env, total FROM test_multi_pk_filter WHERE
+    greptime_timestamp BETWEEN '2023-05-15 10:00:00' AND '2023-05-15 11:00:00' AND flag = 1 AND namespace = 'thermostat_v2'
+    ORDER BY greptime_timestamp;
+
+SELECT greptime_timestamp, namespace, env, total FROM test_multi_pk_filter WHERE
+    greptime_timestamp BETWEEN '2023-05-15 10:00:00' AND '2023-05-15 11:00:00' AND flag = 1 AND namespace = 'thermostat_v2' AND env='dev'
+    ORDER BY greptime_timestamp;
+
+DROP TABLE test_multi_pk_filter;
+
+CREATE TABLE IF NOT EXISTS `test_multi_pk_null` ( `namespace` STRING NULL, `env` STRING NULL DEFAULT 'NULL', `total` BIGINT NULL, `greptime_timestamp` TIMESTAMP(9) NOT NULL, TIME INDEX (`greptime_timestamp`), PRIMARY KEY (`namespace`, `env`) ) ENGINE=mito;
+
+INSERT INTO test_multi_pk_null
+    (namespace, env, total, greptime_timestamp)
+    VALUES ('thermostat_v2', 'production', 5289, '2023-05-15 10:00:00');
+INSERT INTO test_multi_pk_null
+    (namespace, env, total, greptime_timestamp)
+    VALUES ('thermostat_v2', 'production', 421, '2023-05-15 10:05:00');
+
+ADMIN FLUSH_TABLE('test_multi_pk_null');
+
+SELECT * FROM test_multi_pk_null WHERE env IS NOT NULL;
+
+DROP TABLE test_multi_pk_null;