fix: read filter's column (#7579)

* fix: read columns need for filter

Signed-off-by: discord9 <discord9@163.com>

* c

Signed-off-by: discord9 <discord9@163.com>

* feat: add support for explicit read columns in projection mappers

Signed-off-by: discord9 <discord9@163.com>

* test: add compatibility tests for projection mappers

Signed-off-by: discord9 <discord9@163.com>

* c

Signed-off-by: discord9 <discord9@163.com>

* fix: rename variable for clarity and improve column ID retrieval logic

Signed-off-by: discord9 <discord9@163.com>

* fix: update scan input construction to include read column IDs

Signed-off-by: discord9 <discord9@163.com>

* chore: per review

Signed-off-by: discord9 <discord9@163.com>

* test: sqlness for projection filter

Signed-off-by: discord9 <discord9@163.com>

* refactor: per review

Signed-off-by: discord9 <discord9@163.com>

* chore: more redacting

Signed-off-by: discord9 <discord9@163.com>

* chore: more redact

Signed-off-by: discord9 <discord9@163.com>

* c

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-01-20 16:19:50 +08:00
committed by GitHub
parent e7c19a87cd
commit aa3daf7053
7 changed files with 2188 additions and 134 deletions

View File

@@ -740,19 +740,17 @@ impl CompactionSstReaderBuilder<'_> {
}
fn build_scan_input(self, flat_format: bool) -> Result<ScanInput> {
let mut scan_input = ScanInput::new(
self.sst_layer,
ProjectionMapper::all(&self.metadata, flat_format)?,
)
.with_files(self.inputs.to_vec())
.with_append_mode(self.append_mode)
// We use special cache strategy for compaction.
.with_cache(CacheStrategy::Compaction(self.cache))
.with_filter_deleted(self.filter_deleted)
// We ignore file not found error during compaction.
.with_ignore_file_not_found(true)
.with_merge_mode(self.merge_mode)
.with_flat_format(flat_format);
let mapper = ProjectionMapper::all(&self.metadata, flat_format)?;
let mut scan_input = ScanInput::new(self.sst_layer, mapper)
.with_files(self.inputs.to_vec())
.with_append_mode(self.append_mode)
// We use special cache strategy for compaction.
.with_cache(CacheStrategy::Compaction(self.cache))
.with_filter_deleted(self.filter_deleted)
// We ignore file not found error during compaction.
.with_ignore_file_not_found(true)
.with_merge_mode(self.merge_mode)
.with_flat_format(flat_format);
// This serves as a workaround of https://github.com/GreptimeTeam/greptimedb/issues/3944
// by converting time ranges into predicate.

View File

@@ -1417,6 +1417,53 @@ mod tests {
.await;
}
/// Checks a [CompatReader] built from a mapper whose read columns are a
/// superset of its output projection: the source batch only carries field 2,
/// while the expected batch carries fields 3, 2 and 4.
#[tokio::test]
async fn test_compat_reader_projection_read_superset() {
// Metadata of the source data: ts, tag 1 and field 2.
let reader_meta = Arc::new(new_metadata(
&[
(
0,
SemanticType::Timestamp,
ConcreteDataType::timestamp_millisecond_datatype(),
),
(1, SemanticType::Tag, ConcreteDataType::string_datatype()),
(2, SemanticType::Field, ConcreteDataType::int64_datatype()),
],
&[1],
));
// Metadata the reader is expected to produce: adds fields 3 and 4.
let expect_meta = Arc::new(new_metadata(
&[
(
0,
SemanticType::Timestamp,
ConcreteDataType::timestamp_millisecond_datatype(),
),
(1, SemanticType::Tag, ConcreteDataType::string_datatype()),
(3, SemanticType::Field, ConcreteDataType::int64_datatype()),
(2, SemanticType::Field, ConcreteDataType::int64_datatype()),
(4, SemanticType::Field, ConcreteDataType::int64_datatype()),
],
&[1],
));
// Output: tag_1, field_3, field_2. Read also includes field_4.
// NOTE(review): the projection iterator holds column positions in
// `expect_meta`, while the trailing vector holds column ids. Assuming
// `new_metadata` keeps the listed order, positions 3 and 2 refer to ids 2
// and 3 respectively, so confirm the output order stated above.
let mapper = ProjectionMapper::new_with_read_columns(
&expect_meta,
[1, 3, 2].into_iter(),
false,
vec![1, 3, 2, 4],
)
.unwrap();
let k1 = encode_key(&[Some("a")]);
// The source only provides field 2.
let source_reader = VecBatchReader::new(&[new_batch(&k1, &[(2, false)], 1000, 3)]);
let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap();
// The result lists fields in the order of the read column ids (3, 2, 4).
// The boolean flag presumably marks columns filled by the compat layer;
// verify against `new_batch`.
check_reader_result(
&mut compat_reader,
&[new_batch(&k1, &[(3, true), (2, false), (4, true)], 1000, 3)],
)
.await;
}
#[tokio::test]
async fn test_compat_reader_different_pk_encoding() {
let mut reader_meta = new_metadata(
@@ -1569,6 +1616,100 @@ mod tests {
assert_eq!(expected_batch, result);
}
/// Checks [FlatCompatBatch] when the mapper reads a column (field 3) that is
/// neither in the output projection nor in the actual SST metadata: the
/// converted batch must match the expected schema with an all-null column
/// added for it.
#[test]
fn test_flat_compat_batch_with_read_projection_superset() {
// Metadata of the data being read: ts, tag 1 and field 2.
let actual_metadata = Arc::new(new_metadata(
&[
(
0,
SemanticType::Timestamp,
ConcreteDataType::timestamp_millisecond_datatype(),
),
(1, SemanticType::Tag, ConcreteDataType::string_datatype()),
(2, SemanticType::Field, ConcreteDataType::int64_datatype()),
],
&[1],
));
let expected_metadata = Arc::new(new_metadata(
&[
(
0,
SemanticType::Timestamp,
ConcreteDataType::timestamp_millisecond_datatype(),
),
(1, SemanticType::Tag, ConcreteDataType::string_datatype()),
(2, SemanticType::Field, ConcreteDataType::int64_datatype()),
// Adds a new field.
(3, SemanticType::Field, ConcreteDataType::int64_datatype()),
],
&[1],
));
// Output projection: tag_1, field_2. Read also includes field_3.
let mapper = FlatProjectionMapper::new_with_read_columns(
&expected_metadata,
vec![1, 2],
vec![1, 2, 3],
)
.unwrap();
// The read format is built against the actual metadata but asks for the
// same read column ids as the mapper.
let read_format = FlatReadFormat::new(
actual_metadata.clone(),
[1, 2, 3].into_iter(),
None,
"test",
false,
)
.unwrap();
let format_projection = read_format.format_projection();
// The first unwrap checks the `Result`; the second asserts that a compat
// adapter is actually returned for this schema difference.
let compat_batch =
FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection, false)
.unwrap()
.unwrap();
let mut tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
tag_builder.append_value("tag1");
tag_builder.append_value("tag1");
let tag_dict_array = Arc::new(tag_builder.finish());
let k1 = encode_key(&[Some("tag1")]);
// Two input rows laid out for the schema derived from the actual metadata.
let input_columns: Vec<ArrayRef> = vec![
tag_dict_array.clone(),
Arc::new(Int64Array::from(vec![100, 200])),
Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
build_flat_test_pk_array(&[&k1, &k1]),
Arc::new(UInt64Array::from_iter_values([1, 2])),
Arc::new(UInt8Array::from_iter_values([
OpType::Put as u8,
OpType::Put as u8,
])),
];
let input_schema =
to_flat_sst_arrow_schema(&actual_metadata, &FlatSchemaOptions::default());
let input_batch = RecordBatch::try_new(input_schema, input_columns).unwrap();
let result = compat_batch.compat(input_batch).unwrap();
let expected_schema =
to_flat_sst_arrow_schema(&expected_metadata, &FlatSchemaOptions::default());
// Same rows as the input, plus an all-null Int64 column after the existing
// Int64 column.
let expected_columns: Vec<ArrayRef> = vec![
tag_dict_array.clone(),
Arc::new(Int64Array::from(vec![100, 200])),
Arc::new(Int64Array::from(vec![None::<i64>, None::<i64>])),
Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
build_flat_test_pk_array(&[&k1, &k1]),
Arc::new(UInt64Array::from_iter_values([1, 2])),
Arc::new(UInt8Array::from_iter_values([
OpType::Put as u8,
OpType::Put as u8,
])),
];
let expected_batch = RecordBatch::try_new(expected_schema, expected_columns).unwrap();
assert_eq!(expected_batch, result);
}
#[test]
fn test_flat_compat_batch_with_different_pk_encoding() {
let mut actual_metadata = new_metadata(

View File

@@ -29,6 +29,7 @@ use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::ColumnId;
use crate::error::{InvalidRequestSnafu, Result};
use crate::read::projection::read_column_ids_from_projection;
use crate::sst::parquet::flat_format::sst_column_id_indices;
use crate::sst::parquet::format::FormatProjection;
use crate::sst::{
@@ -44,12 +45,11 @@ pub struct FlatProjectionMapper {
metadata: RegionMetadataRef,
/// Schema for converted [RecordBatch] to return.
output_schema: SchemaRef,
/// Ids of columns to project. It keeps ids in the same order as the `projection`
/// indices to build the mapper.
/// Ids of columns to read from memtables and SSTs.
/// The mapper won't deduplicate the column ids.
///
/// Note that this doesn't contain the `__table_id` and `__tsid`.
column_ids: Vec<ColumnId>,
read_column_ids: Vec<ColumnId>,
/// Ids and DataTypes of columns of the expected batch.
/// We can use this to check if the batch is compatible with the expected schema.
///
@@ -72,30 +72,36 @@ impl FlatProjectionMapper {
metadata: &RegionMetadataRef,
projection: impl Iterator<Item = usize>,
) -> Result<Self> {
let mut projection: Vec<_> = projection.collect();
let projection: Vec<_> = projection.collect();
let read_column_ids = read_column_ids_from_projection(metadata, &projection)?;
Self::new_with_read_columns(metadata, projection, read_column_ids)
}
/// Returns a new mapper with output projection and explicit read columns.
pub fn new_with_read_columns(
metadata: &RegionMetadataRef,
projection: Vec<usize>,
read_column_ids: Vec<ColumnId>,
) -> Result<Self> {
// If the original projection is empty.
let is_empty_projection = projection.is_empty();
if is_empty_projection {
// If the projection is empty, we still read the time index column.
projection.push(metadata.time_index_column_pos());
}
// Output column schemas for the projection.
let mut column_schemas = Vec::with_capacity(projection.len());
// Column ids of the projection without deduplication.
let mut column_ids = Vec::with_capacity(projection.len());
// Column ids of the output projection without deduplication.
let mut output_column_ids = Vec::with_capacity(projection.len());
for idx in &projection {
// For each projection index, we get the column id for projection.
let column = metadata
.column_metadatas
.get(*idx)
.context(InvalidRequestSnafu {
region_id: metadata.region_id,
reason: format!("projection index {} is out of bound", idx),
})?;
let column =
metadata
.column_metadatas
.get(*idx)
.with_context(|| InvalidRequestSnafu {
region_id: metadata.region_id,
reason: format!("projection index {} is out of bound", idx),
})?;
column_ids.push(column.column_id);
output_column_ids.push(column.column_id);
// Safety: idx is valid.
column_schemas.push(metadata.schema.column_schemas()[*idx].clone());
}
@@ -107,7 +113,7 @@ impl FlatProjectionMapper {
&id_to_index,
// All columns with internal columns.
metadata.column_metadatas.len() + 3,
column_ids.iter().copied(),
read_column_ids.iter().copied(),
);
let batch_schema = flat_projected_columns(metadata, &format_projection);
@@ -115,38 +121,46 @@ impl FlatProjectionMapper {
// Safety: We get the column id from the metadata.
let input_arrow_schema = compute_input_arrow_schema(metadata, &batch_schema);
if is_empty_projection {
// If projection is empty, we don't output any column.
return Ok(FlatProjectionMapper {
metadata: metadata.clone(),
output_schema: Arc::new(Schema::new(vec![])),
column_ids,
batch_schema: vec![],
is_empty_projection,
batch_indices: vec![],
input_arrow_schema,
});
}
// If projection is empty, we don't output any column.
let output_schema = if is_empty_projection {
Arc::new(Schema::new(vec![]))
} else {
// Safety: Columns come from existing schema.
Arc::new(Schema::new(column_schemas))
};
// Safety: Columns come from existing schema.
let output_schema = Arc::new(Schema::new(column_schemas));
let batch_indices: Vec<_> = column_ids
.iter()
.map(|id| {
// Safety: The map is computed from `projection` itself.
format_projection
.column_id_to_projected_index
.get(id)
.copied()
.unwrap()
})
.collect();
let batch_indices = if is_empty_projection {
vec![]
} else {
output_column_ids
.iter()
.map(|id| {
// Safety: The map is computed from the read projection.
format_projection
.column_id_to_projected_index
.get(id)
.copied()
.with_context(|| {
let name = metadata
.column_by_id(*id)
.map(|column| column.column_schema.name.clone())
.unwrap_or_else(|| id.to_string());
InvalidRequestSnafu {
region_id: metadata.region_id,
reason: format!(
"output column {} is missing in read projection",
name
),
}
})
})
.collect::<Result<Vec<_>>>()?
};
Ok(FlatProjectionMapper {
metadata: metadata.clone(),
output_schema,
column_ids,
read_column_ids,
batch_schema,
is_empty_projection,
batch_indices,
@@ -167,7 +181,7 @@ impl FlatProjectionMapper {
/// Returns ids of projected columns that we need to read
/// from memtables and SSTs.
pub(crate) fn column_ids(&self) -> &[ColumnId] {
&self.column_ids
&self.read_column_ids
}
/// Returns the field column start index in output batch.

View File

@@ -65,6 +65,29 @@ impl ProjectionMapper {
}
}
/// Builds a mapper from an output projection plus an explicit list of column
/// ids to read.
///
/// `projection` holds the positions of the output columns and `flat_format`
/// selects which mapper variant is created. `read_column_ids` is forwarded
/// unchanged to that variant's constructor, and any error it reports is
/// returned as is.
pub fn new_with_read_columns(
    metadata: &RegionMetadataRef,
    projection: impl Iterator<Item = usize>,
    flat_format: bool,
    read_column_ids: Vec<ColumnId>,
) -> Result<Self> {
    let output_projection = projection.collect::<Vec<_>>();
    let mapper = if flat_format {
        let flat = FlatProjectionMapper::new_with_read_columns(
            metadata,
            output_projection,
            read_column_ids,
        )?;
        ProjectionMapper::Flat(flat)
    } else {
        let primary_key = PrimaryKeyProjectionMapper::new_with_read_columns(
            metadata,
            output_projection,
            read_column_ids,
        )?;
        ProjectionMapper::PrimaryKey(primary_key)
    };
    Ok(mapper)
}
/// Returns a new mapper without projection.
pub fn all(metadata: &RegionMetadataRef, flat_format: bool) -> Result<Self> {
if flat_format {
@@ -147,10 +170,9 @@ pub struct PrimaryKeyProjectionMapper {
codec: Arc<dyn PrimaryKeyCodec>,
/// Schema for converted [RecordBatch].
output_schema: SchemaRef,
/// Ids of columns to project. It keeps ids in the same order as the `projection`
/// indices to build the mapper.
column_ids: Vec<ColumnId>,
/// Ids and DataTypes of field columns in the [Batch].
/// Ids of columns to read from memtables and SSTs.
read_column_ids: Vec<ColumnId>,
/// Ids and DataTypes of field columns in the read [Batch].
batch_fields: Vec<(ColumnId, ConcreteDataType)>,
/// `true` If the original projection is empty.
is_empty_projection: bool,
@@ -165,50 +187,46 @@ impl PrimaryKeyProjectionMapper {
metadata: &RegionMetadataRef,
projection: impl Iterator<Item = usize>,
) -> Result<PrimaryKeyProjectionMapper> {
let mut projection: Vec<_> = projection.collect();
let projection: Vec<_> = projection.collect();
let read_column_ids = read_column_ids_from_projection(metadata, &projection)?;
Self::new_with_read_columns(metadata, projection, read_column_ids)
}
/// Returns a new mapper with output projection and explicit read columns.
pub fn new_with_read_columns(
metadata: &RegionMetadataRef,
projection: Vec<usize>,
read_column_ids: Vec<ColumnId>,
) -> Result<PrimaryKeyProjectionMapper> {
// If the original projection is empty.
let is_empty_projection = projection.is_empty();
if is_empty_projection {
// If the projection is empty, we still read the time index column.
projection.push(metadata.time_index_column_pos());
}
let mut column_schemas = Vec::with_capacity(projection.len());
let mut column_ids = Vec::with_capacity(projection.len());
for idx in &projection {
// For each projection index, we get the column id for projection.
let column = metadata
.column_metadatas
.get(*idx)
.context(InvalidRequestSnafu {
region_id: metadata.region_id,
reason: format!("projection index {} is out of bound", idx),
})?;
column_ids.push(column.column_id);
// Safety: idx is valid.
column_schemas.push(metadata.schema.column_schemas()[*idx].clone());
// For each projection index, we get the column schema for projection
column_schemas.push(
metadata
.schema
.column_schemas()
.get(*idx)
.with_context(|| InvalidRequestSnafu {
region_id: metadata.region_id,
reason: format!("projection index {} is out of bound", idx),
})?
.clone(),
);
}
let codec = build_primary_key_codec(metadata);
if is_empty_projection {
// If projection is empty, we don't output any column.
return Ok(PrimaryKeyProjectionMapper {
metadata: metadata.clone(),
batch_indices: vec![],
has_tags: false,
codec,
output_schema: Arc::new(Schema::new(vec![])),
column_ids,
batch_fields: vec![],
is_empty_projection,
});
}
// Safety: Columns come from existing schema.
let output_schema = Arc::new(Schema::new(column_schemas));
// Get fields in each batch.
let batch_fields = Batch::projected_fields(metadata, &column_ids);
// If projection is empty, we don't output any column.
let output_schema = if is_empty_projection {
Arc::new(Schema::new(vec![]))
} else {
// Safety: Columns come from existing schema.
Arc::new(Schema::new(column_schemas))
};
// Get fields in each read batch.
let batch_fields = Batch::projected_fields(metadata, &read_column_ids);
// Field column id to its index in batch.
let field_id_to_index: HashMap<_, _> = batch_fields
@@ -219,28 +237,37 @@ impl PrimaryKeyProjectionMapper {
// For each projected column, compute its index in batches.
let mut batch_indices = Vec::with_capacity(projection.len());
let mut has_tags = false;
for idx in &projection {
// Safety: idx is valid.
let column = &metadata.column_metadatas[*idx];
// Get column index in a batch by its semantic type and column id.
let batch_index = match column.semantic_type {
SemanticType::Tag => {
// Safety: It is a primary key column.
let index = metadata.primary_key_index(column.column_id).unwrap();
// We need to output a tag.
has_tags = true;
// We always read all primary key so the column always exists and the tag
// index is always valid.
BatchIndex::Tag((index, column.column_id))
}
SemanticType::Timestamp => BatchIndex::Timestamp,
SemanticType::Field => {
// Safety: It is a field column so it should be in `field_id_to_index`.
let index = field_id_to_index[&column.column_id];
BatchIndex::Field(index)
}
};
batch_indices.push(batch_index);
if !is_empty_projection {
for idx in &projection {
// Safety: idx is valid.
let column = &metadata.column_metadatas[*idx];
// Get column index in a batch by its semantic type and column id.
let batch_index = match column.semantic_type {
SemanticType::Tag => {
// Safety: It is a primary key column.
let index = metadata.primary_key_index(column.column_id).unwrap();
// We need to output a tag.
has_tags = true;
// We always read all primary key so the column always exists and the tag
// index is always valid.
BatchIndex::Tag((index, column.column_id))
}
SemanticType::Timestamp => BatchIndex::Timestamp,
SemanticType::Field => {
let index = *field_id_to_index.get(&column.column_id).context(
InvalidRequestSnafu {
region_id: metadata.region_id,
reason: format!(
"field column {} is missing in read projection",
column.column_schema.name
),
},
)?;
BatchIndex::Field(index)
}
};
batch_indices.push(batch_index);
}
}
Ok(PrimaryKeyProjectionMapper {
@@ -249,7 +276,7 @@ impl PrimaryKeyProjectionMapper {
has_tags,
codec,
output_schema,
column_ids,
read_column_ids,
batch_fields,
is_empty_projection,
})
@@ -273,7 +300,7 @@ impl PrimaryKeyProjectionMapper {
/// Returns ids of projected columns that we need to read
/// from memtables and SSTs.
pub(crate) fn column_ids(&self) -> &[ColumnId] {
&self.column_ids
&self.read_column_ids
}
/// Returns ids of fields in [Batch]es the mapper expects to convert.
@@ -361,6 +388,29 @@ impl PrimaryKeyProjectionMapper {
}
}
/// Translates projection indices into the ids of the columns to read.
///
/// An empty projection yields only the id of the time index column.
/// Otherwise the ids follow the order of `projection` and are not
/// deduplicated.
///
/// # Errors
///
/// Returns an invalid request error if an index does not refer to a column of
/// `metadata`.
pub(crate) fn read_column_ids_from_projection(
    metadata: &RegionMetadataRef,
    projection: &[usize],
) -> Result<Vec<ColumnId>> {
    if projection.is_empty() {
        return Ok(vec![metadata.time_index_column().column_id]);
    }

    projection
        .iter()
        .map(|idx| {
            metadata
                .column_metadatas
                .get(*idx)
                .map(|column| column.column_id)
                .with_context(|| InvalidRequestSnafu {
                    region_id: metadata.region_id,
                    reason: format!("projection index {} is out of bound", idx),
                })
        })
        .collect()
}
/// Index of a vector in a [Batch].
#[derive(Debug, Clone, Copy)]
enum BatchIndex {
@@ -581,6 +631,43 @@ mod tests {
assert_eq!(expect, print_record_batch(record_batch));
}
/// Checks that a primary key mapper built with explicit read columns reports
/// the read ids from `column_ids()` while converting batches into the output
/// columns only.
#[test]
fn test_projection_mapper_read_superset() {
let metadata = Arc::new(
TestRegionMetadataBuilder::default()
.num_tags(2)
.num_fields(2)
.build(),
);
// Output columns v1, k0. Read also includes v0.
let mapper = ProjectionMapper::new_with_read_columns(
&metadata,
[4, 1].into_iter(),
false,
vec![4, 1, 3],
)
.unwrap();
// `column_ids()` exposes the read columns, not just the output projection.
assert_eq!([4, 1, 3], mapper.column_ids());
// The batch carries both fields (ids 3 and 4) even though only one of them
// is in the output.
let batch = new_batch(0, &[1, 2], &[(3, 3), (4, 4)], 3);
let cache = CacheManager::builder().vector_cache_size(1024).build();
let cache = CacheStrategy::EnableAll(Arc::new(cache));
let record_batch = mapper
.as_primary_key()
.unwrap()
.convert(&batch, &cache)
.unwrap();
// Only the two output columns appear in the converted record batch.
let expect = "\
+----+----+
| v1 | k0 |
+----+----+
| 4 | 1 |
| 4 | 1 |
| 4 | 1 |
+----+----+";
assert_eq!(expect, print_record_batch(record_batch));
}
#[test]
fn test_projection_mapper_empty_projection() {
let metadata = Arc::new(
@@ -781,6 +868,37 @@ mod tests {
assert_eq!(expect, print_record_batch(record_batch));
}
/// Flat format counterpart of the read superset test: the mapper reports the
/// explicit read ids from `column_ids()` and converts a flat batch into the
/// output columns only.
#[test]
fn test_flat_projection_mapper_read_superset() {
let metadata = Arc::new(
TestRegionMetadataBuilder::default()
.num_tags(2)
.num_fields(2)
.build(),
);
// Output columns v1, k0. Read also includes v0.
let mapper = ProjectionMapper::new_with_read_columns(
&metadata,
[4, 1].into_iter(),
true,
vec![4, 1, 3],
)
.unwrap();
// `column_ids()` exposes the read columns, not just the output projection.
assert_eq!([4, 1, 3], mapper.column_ids());
// The flat batch carries both fields (ids 3 and 4) and the tag with id 1.
let batch = new_flat_batch(None, &[(1, 1)], &[(3, 3), (4, 4)], 3);
let record_batch = mapper.as_flat().unwrap().convert(&batch).unwrap();
// Only the two output columns appear in the converted record batch.
let expect = "\
+----+----+
| v1 | k0 |
+----+----+
| 4 | 1 |
| 4 | 1 |
| 4 | 1 |
+----+----+";
assert_eq!(expect, print_record_batch(record_batch));
}
#[test]
fn test_flat_projection_mapper_empty_projection() {
let metadata = Arc::new(
@@ -794,7 +912,10 @@ mod tests {
assert_eq!([0], mapper.column_ids()); // Should still read the time index column
assert!(mapper.output_schema().is_empty());
let flat_mapper = mapper.as_flat().unwrap();
assert!(flat_mapper.batch_schema().is_empty());
assert_eq!(
[(0, ConcreteDataType::timestamp_millisecond_datatype())],
flat_mapper.batch_schema()
);
let batch = new_flat_batch(Some(0), &[], &[], 3);
let record_batch = flat_mapper.convert(&batch).unwrap();

View File

@@ -33,11 +33,11 @@ use datafusion_expr::utils::expr_to_columns;
use futures::StreamExt;
use partition::expr::PartitionExpr;
use smallvec::SmallVec;
use snafu::ResultExt;
use snafu::{OptionExt as _, ResultExt};
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::region_engine::{PartitionRange, RegionScannerRef};
use store_api::storage::{
RegionId, ScanRequest, SequenceRange, TimeSeriesDistribution, TimeSeriesRowSelector,
ColumnId, RegionId, ScanRequest, SequenceRange, TimeSeriesDistribution, TimeSeriesRowSelector,
};
use table::predicate::{Predicate, build_time_range_predicate};
use tokio::sync::{Semaphore, mpsc};
@@ -46,7 +46,7 @@ use tokio_stream::wrappers::ReceiverStream;
use crate::access_layer::AccessLayerRef;
use crate::cache::CacheStrategy;
use crate::config::{DEFAULT_MAX_CONCURRENT_SCAN_FILES, DEFAULT_SCAN_CHANNEL_SIZE};
use crate::error::{InvalidPartitionExprSnafu, Result};
use crate::error::{InvalidPartitionExprSnafu, InvalidRequestSnafu, Result};
#[cfg(feature = "enterprise")]
use crate::extension::{BoxedExtensionRange, BoxedExtensionRangeProvider};
use crate::memtable::{MemtableRange, RangesOptions};
@@ -410,11 +410,25 @@ impl ScanRegion {
let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?;
let flat_format = self.use_flat_format();
let read_column_ids = match &self.request.projection {
Some(p) => self.build_read_column_ids(p, &predicate)?,
None => self
.version
.metadata
.column_metadatas
.iter()
.map(|col| col.column_id)
.collect(),
};
// The mapper always computes projected column ids as the schema of SSTs may change.
let mapper = match &self.request.projection {
Some(p) => {
ProjectionMapper::new(&self.version.metadata, p.iter().copied(), flat_format)?
}
Some(p) => ProjectionMapper::new_with_read_columns(
&self.version.metadata,
p.iter().copied(),
flat_format,
read_column_ids.clone(),
)?,
None => ProjectionMapper::all(&self.version.metadata, flat_format)?,
};
@@ -462,7 +476,7 @@ impl ScanRegion {
continue;
}
let ranges_in_memtable = m.ranges(
Some(mapper.column_ids()),
Some(read_column_ids.as_slice()),
RangesOptions::default()
.with_predicate(predicate.clone())
.with_sequence(SequenceRange::new(
@@ -512,7 +526,6 @@ impl ScanRegion {
search.k.saturating_mul(VECTOR_INDEX_OVERFETCH_MULTIPLIER)
}
});
let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?;
if flat_format {
// The batch is already large enough so we use a small channel size here.
@@ -571,6 +584,72 @@ impl ScanRegion {
build_time_range_predicate(&time_index.column_schema.name, unit, &self.request.filters)
}
/// Returns the ids of all columns to read for the projection and the filters.
///
/// The result starts with the projected columns in projection order (without
/// deduplication), or with the time index column if the projection is empty.
/// Columns referenced by the request filters or by the region partition
/// expression are then appended in the order they appear in the region
/// metadata, skipping those already present.
///
/// Filters whose columns cannot be extracted and referenced names that do not
/// exist in the region are reported with a warning and otherwise ignored.
///
/// # Errors
///
/// Returns an invalid request error if a projection index is out of bound.
fn build_read_column_ids(
    &self,
    projection: &[usize],
    predicate: &PredicateGroup,
) -> Result<Vec<ColumnId>> {
    let metadata = &self.version.metadata;
    // A Vec keeps the order of columns; the set only tracks membership.
    let mut read_column_ids = Vec::with_capacity(projection.len().max(1));
    let mut seen = HashSet::new();

    for idx in projection {
        let column =
            metadata
                .column_metadatas
                .get(*idx)
                .with_context(|| InvalidRequestSnafu {
                    region_id: metadata.region_id,
                    reason: format!("projection index {} is out of bound", idx),
                })?;
        seen.insert(column.column_id);
        // Keep the projection order.
        read_column_ids.push(column.column_id);
    }

    if projection.is_empty() {
        // Nothing is projected, but the time index column is still read.
        let time_index = metadata.time_index_column().column_id;
        if seen.insert(time_index) {
            read_column_ids.push(time_index);
        }
    }

    // Names of columns referenced by filters and the partition expression.
    let mut extra_names = HashSet::new();
    let mut filter_columns = HashSet::new();
    for expr in &self.request.filters {
        filter_columns.clear();
        if let Err(err) = expr_to_columns(expr, &mut filter_columns) {
            // Best effort: skip this filter, but leave a trace because its
            // columns will be missing from the read set.
            warn!(
                "Failed to collect columns of filter {} in region {}: {:?}",
                expr, metadata.region_id, err
            );
            continue;
        }
        extra_names.extend(filter_columns.iter().map(|column| column.name.clone()));
    }
    if let Some(expr) = predicate.region_partition_expr() {
        expr.collect_column_names(&mut extra_names);
    }

    if !extra_names.is_empty() {
        // Walk the metadata so extra columns are appended in schema order.
        for column in &metadata.column_metadatas {
            // `remove` tells whether the name was referenced, and consuming it
            // leaves only unknown names behind for the warning below.
            let referenced = extra_names.remove(column.column_schema.name.as_str());
            if referenced && seen.insert(column.column_id) {
                read_column_ids.push(column.column_id);
            }
        }
        if !extra_names.is_empty() {
            warn!(
                "Some columns in filters are not found in region {}: {:?}",
                metadata.region_id, extra_names
            );
        }
    }

    Ok(read_column_ids)
}
/// Partitions filters into two groups: non-field filters and field filters.
/// Returns `(non_field_filters, field_filters)`.
fn partition_by_field_filters(&self) -> (Vec<Expr>, Vec<Expr>) {
@@ -729,6 +808,10 @@ pub struct ScanInput {
access_layer: AccessLayerRef,
/// Maps projected Batches to RecordBatches.
pub(crate) mapper: Arc<ProjectionMapper>,
/// Column ids to read from memtables and SSTs.
/// Notice this is different from the columns in `mapper` which are projected columns.
/// But this read columns might also include non-projected columns needed for filtering.
pub(crate) read_column_ids: Vec<ColumnId>,
/// Time range filter for time index.
time_range: Option<TimestampRange>,
/// Predicate to push down.
@@ -783,6 +866,7 @@ impl ScanInput {
pub(crate) fn new(access_layer: AccessLayerRef, mapper: ProjectionMapper) -> ScanInput {
ScanInput {
access_layer,
read_column_ids: mapper.column_ids().to_vec(),
mapper: Arc::new(mapper),
time_range: None,
predicate: PredicateGroup::default(),
@@ -1064,7 +1148,7 @@ impl ScanInput {
.access_layer
.read_sst(file.clone())
.predicate(predicate)
.projection(Some(self.mapper.column_ids().to_vec()))
.projection(Some(self.read_column_ids.clone()))
.cache(self.cache_strategy.clone())
.inverted_index_appliers(self.inverted_index_appliers.clone())
.bloom_filter_index_appliers(self.bloom_filter_index_appliers.clone())
@@ -1622,3 +1706,107 @@ impl PredicateGroup {
}
}
}
/// Unit tests for `ScanRegion::build_read_column_ids`.
#[cfg(test)]
mod tests {
use std::sync::Arc;
use datafusion_expr::{col, lit};
use store_api::storage::ScanRequest;
use super::*;
use crate::memtable::time_partition::TimePartitions;
use crate::region::version::VersionBuilder;
use crate::test_util::memtable_util::{EmptyMemtableBuilder, metadata_with_primary_key};
use crate::test_util::scheduler_util::SchedulerEnv;
/// Builds a version for `metadata` whose mutable memtables are created by
/// an [EmptyMemtableBuilder].
fn new_version(metadata: RegionMetadataRef) -> VersionRef {
let mutable = Arc::new(TimePartitions::new(
metadata.clone(),
Arc::new(EmptyMemtableBuilder::default()),
0,
None,
));
Arc::new(VersionBuilder::new(metadata, mutable).build())
}
/// Columns used only by filters must be added to the read columns.
#[tokio::test]
async fn test_build_read_column_ids_includes_filters() {
let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
let version = new_version(metadata.clone());
let env = SchedulerEnv::new().await;
// Project a single column and filter on three columns outside the projection.
let request = ScanRequest {
projection: Some(vec![4]),
filters: vec![
col("v0").gt(lit(1)),
col("ts").gt(lit(0)),
col("k0").eq(lit("foo")),
],
..Default::default()
};
let scan_region = ScanRegion::new(
version,
env.access_layer.clone(),
request,
CacheStrategy::Disabled,
);
let predicate =
PredicateGroup::new(metadata.as_ref(), &scan_region.request.filters).unwrap();
let projection = scan_region.request.projection.as_ref().unwrap();
let read_ids = scan_region
.build_read_column_ids(projection, &predicate)
.unwrap();
// The projected id comes first; filter columns follow in metadata order,
// not in the order the filters were written.
assert_eq!(vec![4, 0, 2, 3], read_ids);
}
/// An empty projection without filters reads only the time index column.
#[tokio::test]
async fn test_build_read_column_ids_empty_projection() {
let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
let version = new_version(metadata.clone());
let env = SchedulerEnv::new().await;
let request = ScanRequest {
projection: Some(vec![]),
..Default::default()
};
let scan_region = ScanRegion::new(
version,
env.access_layer.clone(),
request,
CacheStrategy::Disabled,
);
let predicate =
PredicateGroup::new(metadata.as_ref(), &scan_region.request.filters).unwrap();
let projection = scan_region.request.projection.as_ref().unwrap();
let read_ids = scan_region
.build_read_column_ids(projection, &predicate)
.unwrap();
// Empty projection should still read the time index column (id 2 in this test schema).
assert_eq!(vec![2], read_ids);
}
/// Projected columns keep the order of the projection; filter columns are
/// appended after them.
#[tokio::test]
async fn test_build_read_column_ids_keeps_projection_order() {
let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
let version = new_version(metadata.clone());
let env = SchedulerEnv::new().await;
// The projection is deliberately not in schema order.
let request = ScanRequest {
projection: Some(vec![4, 1]),
filters: vec![col("v0").gt(lit(1))],
..Default::default()
};
let scan_region = ScanRegion::new(
version,
env.access_layer.clone(),
request,
CacheStrategy::Disabled,
);
let predicate =
PredicateGroup::new(metadata.as_ref(), &scan_region.request.filters).unwrap();
let projection = scan_region.request.projection.as_ref().unwrap();
let read_ids = scan_region
.build_read_column_ids(projection, &predicate)
.unwrap();
// Projection order preserved, extra columns appended in schema order.
assert_eq!(vec![4, 1, 3], read_ids);
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,425 @@
-- Tests for scenarios where the filter column is not in the projection (SELECT list).
-- This tests the query optimizer's ability to handle column pruning when:
-- 1. A column is used in the WHERE clause but not in SELECT
-- 2. Multiple columns are used in WHERE but only a subset of them in SELECT
-- 3. Aggregations have filters on non-projected columns
-- Schema: `ts` is the time index, (`host`, `region`) is the primary key, and the
-- three `*_usage` columns are DOUBLE columns outside the primary key.
CREATE TABLE filter_prune_test (
ts TIMESTAMP TIME INDEX,
host STRING,
`region` STRING,
cpu_usage DOUBLE,
mem_usage DOUBLE,
disk_usage DOUBLE,
PRIMARY KEY (host, `region`)
);
-- Six rows spanning three hosts (host1, host2, host3) and three regions
-- (us-east, us-west, eu-west); every (host, region, ts) combination is distinct.
INSERT INTO filter_prune_test VALUES
(1000, 'host1', 'us-east', 10.5, 20.0, 30.5),
(2000, 'host1', 'us-east', 15.5, 25.0, 35.5),
(3000, 'host1', 'us-west', 20.5, 30.0, 40.5),
(4000, 'host2', 'us-east', 25.5, 35.0, 45.5),
(5000, 'host2', 'us-west', 30.5, 40.0, 50.5),
(6000, 'host3', 'eu-west', 35.5, 45.0, 55.5);
-- Section: queries issued before any ADMIN FLUSH_TABLE call on filter_prune_test.
-- Every case runs the query once for its rows and once under EXPLAIN ANALYZE VERBOSE.
-- The REPLACE directives placed ahead of each EXPLAIN rewrite the matching parts of
-- its output (dash and whitespace runs, peers, metrics, region ids, num_ranges,
-- RepartitionExec details, flat_format) -- presumably because those parts vary
-- between runs or environments (TODO confirm).
-- Basic case: filter column (host) not in SELECT projection
SELECT cpu_usage FROM filter_prune_test WHERE host = 'host1' ORDER BY ts;
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE num_ranges=\d+ num_ranges=REDACTED
-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
-- SQLNESS REPLACE "flat_format":\s\w+, "flat_format": REDACTED,
EXPLAIN ANALYZE VERBOSE SELECT cpu_usage FROM filter_prune_test WHERE host = 'host1' ORDER BY ts;
-- Filter on multiple columns (host, region) but only select value columns
SELECT mem_usage, cpu_usage FROM filter_prune_test WHERE host = 'host1' AND `region` = 'us-east' ORDER BY ts;
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE num_ranges=\d+ num_ranges=REDACTED
-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
-- SQLNESS REPLACE "flat_format":\s\w+, "flat_format": REDACTED,
EXPLAIN ANALYZE VERBOSE SELECT mem_usage, cpu_usage FROM filter_prune_test WHERE host = 'host1' AND `region` = 'us-east' ORDER BY ts;
-- Filter on value column (cpu_usage) not in projection
SELECT host, `region` FROM filter_prune_test WHERE cpu_usage > 20.0 ORDER BY ts;
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE num_ranges=\d+ num_ranges=REDACTED
-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
-- SQLNESS REPLACE "flat_format":\s\w+, "flat_format": REDACTED,
EXPLAIN ANALYZE VERBOSE SELECT host, `region` FROM filter_prune_test WHERE cpu_usage > 20.0 ORDER BY ts;
-- Filter on time index but time index not in projection
SELECT host, cpu_usage FROM filter_prune_test WHERE ts > 2000 ORDER BY ts;
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE num_ranges=\d+ num_ranges=REDACTED
-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
-- SQLNESS REPLACE "flat_format":\s\w+, "flat_format": REDACTED,
EXPLAIN ANALYZE VERBOSE SELECT host, cpu_usage FROM filter_prune_test WHERE ts > 2000 ORDER BY ts;
-- Complex filter: multiple columns in WHERE, only one in SELECT
SELECT mem_usage FROM filter_prune_test WHERE host = 'host1' AND cpu_usage > 10.0 AND `region` = 'us-east' ORDER BY ts;
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE num_ranges=\d+ num_ranges=REDACTED
-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
-- SQLNESS REPLACE "flat_format":\s\w+, "flat_format": REDACTED,
EXPLAIN ANALYZE VERBOSE SELECT mem_usage FROM filter_prune_test WHERE host = 'host1' AND cpu_usage > 10.0 AND `region` = 'us-east' ORDER BY ts;
-- Aggregation with filter on non-projected column
SELECT SUM(cpu_usage) as total_cpu FROM filter_prune_test WHERE host = 'host1';
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE num_ranges=\d+ num_ranges=REDACTED
-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
-- SQLNESS REPLACE "flat_format":\s\w+, "flat_format": REDACTED,
EXPLAIN ANALYZE VERBOSE SELECT SUM(cpu_usage) as total_cpu FROM filter_prune_test WHERE host = 'host1';
-- GROUP BY with filter on column not in projection or grouping
SELECT `region`, AVG(cpu_usage) as avg_cpu FROM filter_prune_test WHERE mem_usage > 25.0 GROUP BY `region` ORDER BY `region`;
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE num_ranges=\d+ num_ranges=REDACTED
-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
-- SQLNESS REPLACE "flat_format":\s\w+, "flat_format": REDACTED,
EXPLAIN ANALYZE VERBOSE SELECT `region`, AVG(cpu_usage) as avg_cpu FROM filter_prune_test WHERE mem_usage > 25.0 GROUP BY `region` ORDER BY `region`;
-- Filter with IN clause on non-projected column
SELECT cpu_usage FROM filter_prune_test WHERE host IN ('host1', 'host2') ORDER BY ts;
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE num_ranges=\d+ num_ranges=REDACTED
-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
-- SQLNESS REPLACE "flat_format":\s\w+, "flat_format": REDACTED,
EXPLAIN ANALYZE VERBOSE SELECT cpu_usage FROM filter_prune_test WHERE host IN ('host1', 'host2') ORDER BY ts;
-- Filter with BETWEEN on non-projected column
SELECT host FROM filter_prune_test WHERE cpu_usage BETWEEN 15.0 AND 30.0 ORDER BY ts;
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE num_ranges=\d+ num_ranges=REDACTED
-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
-- SQLNESS REPLACE "flat_format":\s\w+, "flat_format": REDACTED,
EXPLAIN ANALYZE VERBOSE SELECT host FROM filter_prune_test WHERE cpu_usage BETWEEN 15.0 AND 30.0 ORDER BY ts;
-- Filter with LIKE on non-projected column
SELECT cpu_usage FROM filter_prune_test WHERE host LIKE 'host%' AND `region` LIKE 'us-%' ORDER BY ts;
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE num_ranges=\d+ num_ranges=REDACTED
-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
-- SQLNESS REPLACE "flat_format":\s\w+, "flat_format": REDACTED,
EXPLAIN ANALYZE VERBOSE SELECT cpu_usage FROM filter_prune_test WHERE host LIKE 'host%' AND `region` LIKE 'us-%' ORDER BY ts;
-- Filter with OR conditions on non-projected columns
SELECT cpu_usage FROM filter_prune_test WHERE host = 'host1' OR `region` = 'eu-west' ORDER BY ts;
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE num_ranges=\d+ num_ranges=REDACTED
-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
-- SQLNESS REPLACE "flat_format":\s\w+, "flat_format": REDACTED,
EXPLAIN ANALYZE VERBOSE SELECT cpu_usage FROM filter_prune_test WHERE host = 'host1' OR `region` = 'eu-west' ORDER BY ts;
-- Subquery with filter on non-projected column
-- (the inner filter is on host, the outer filter on region; the outer SELECT list has neither)
SELECT cpu_usage FROM (SELECT * FROM filter_prune_test WHERE host = 'host1') sub WHERE `region` = 'us-east' ORDER BY ts;
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE num_ranges=\d+ num_ranges=REDACTED
-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
-- SQLNESS REPLACE "flat_format":\s\w+, "flat_format": REDACTED,
EXPLAIN ANALYZE VERBOSE SELECT cpu_usage FROM (SELECT * FROM filter_prune_test WHERE host = 'host1') sub WHERE `region` = 'us-east' ORDER BY ts;
-- Repeat with data flushed to verify column pruning works with on-disk data
-- The queries below are the same twelve cases that run before this flush. Each
-- EXPLAIN carries one extra REPLACE rule that redacts the `"files": [{...` part of
-- the output -- presumably because the flushed file identifiers differ between
-- runs (TODO confirm).
ADMIN FLUSH_TABLE('filter_prune_test');
-- Basic case: filter column (host) not in SELECT projection
SELECT cpu_usage FROM filter_prune_test WHERE host = 'host1' ORDER BY ts;
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE \"files\":\s\[\{.* \"file\":REDACTED
-- SQLNESS REPLACE num_ranges=\d+ num_ranges=REDACTED
-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
-- SQLNESS REPLACE "flat_format":\s\w+, "flat_format": REDACTED,
EXPLAIN ANALYZE VERBOSE SELECT cpu_usage FROM filter_prune_test WHERE host = 'host1' ORDER BY ts;
-- Filter on multiple columns (host, region) but only select value columns
SELECT mem_usage, cpu_usage FROM filter_prune_test WHERE host = 'host1' AND `region` = 'us-east' ORDER BY ts;
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE \"files\":\s\[\{.* \"file\":REDACTED
-- SQLNESS REPLACE num_ranges=\d+ num_ranges=REDACTED
-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
-- SQLNESS REPLACE "flat_format":\s\w+, "flat_format": REDACTED,
EXPLAIN ANALYZE VERBOSE SELECT mem_usage, cpu_usage FROM filter_prune_test WHERE host = 'host1' AND `region` = 'us-east' ORDER BY ts;
-- Filter on value column (cpu_usage) not in projection
SELECT host, `region` FROM filter_prune_test WHERE cpu_usage > 20.0 ORDER BY ts;
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE \"files\":\s\[\{.* \"file\":REDACTED
-- SQLNESS REPLACE num_ranges=\d+ num_ranges=REDACTED
-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
-- SQLNESS REPLACE "flat_format":\s\w+, "flat_format": REDACTED,
EXPLAIN ANALYZE VERBOSE SELECT host, `region` FROM filter_prune_test WHERE cpu_usage > 20.0 ORDER BY ts;
-- Filter on time index but time index not in projection
SELECT host, cpu_usage FROM filter_prune_test WHERE ts > 2000 ORDER BY ts;
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE \"files\":\s\[\{.* \"file\":REDACTED
-- SQLNESS REPLACE num_ranges=\d+ num_ranges=REDACTED
-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
-- SQLNESS REPLACE "flat_format":\s\w+, "flat_format": REDACTED,
EXPLAIN ANALYZE VERBOSE SELECT host, cpu_usage FROM filter_prune_test WHERE ts > 2000 ORDER BY ts;
-- Complex filter: multiple columns in WHERE, only one in SELECT
SELECT mem_usage FROM filter_prune_test WHERE host = 'host1' AND cpu_usage > 10.0 AND `region` = 'us-east' ORDER BY ts;
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE \"files\":\s\[\{.* \"file\":REDACTED
-- SQLNESS REPLACE num_ranges=\d+ num_ranges=REDACTED
-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
-- SQLNESS REPLACE "flat_format":\s\w+, "flat_format": REDACTED,
EXPLAIN ANALYZE VERBOSE SELECT mem_usage FROM filter_prune_test WHERE host = 'host1' AND cpu_usage > 10.0 AND `region` = 'us-east' ORDER BY ts;
-- Aggregation with filter on non-projected column
SELECT SUM(cpu_usage) as total_cpu FROM filter_prune_test WHERE host = 'host1';
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE \"files\":\s\[\{.* \"file\":REDACTED
-- SQLNESS REPLACE num_ranges=\d+ num_ranges=REDACTED
-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
-- SQLNESS REPLACE "flat_format":\s\w+, "flat_format": REDACTED,
EXPLAIN ANALYZE VERBOSE SELECT SUM(cpu_usage) as total_cpu FROM filter_prune_test WHERE host = 'host1';
-- GROUP BY with filter on column not in projection or grouping
SELECT `region`, AVG(cpu_usage) as avg_cpu FROM filter_prune_test WHERE mem_usage > 25.0 GROUP BY `region` ORDER BY `region`;
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE \"files\":\s\[\{.* \"file\":REDACTED
-- SQLNESS REPLACE num_ranges=\d+ num_ranges=REDACTED
-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
-- SQLNESS REPLACE "flat_format":\s\w+, "flat_format": REDACTED,
EXPLAIN ANALYZE VERBOSE SELECT `region`, AVG(cpu_usage) as avg_cpu FROM filter_prune_test WHERE mem_usage > 25.0 GROUP BY `region` ORDER BY `region`;
-- Filter with IN clause on non-projected column
SELECT cpu_usage FROM filter_prune_test WHERE host IN ('host1', 'host2') ORDER BY ts;
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE \"files\":\s\[\{.* \"file\":REDACTED
-- SQLNESS REPLACE num_ranges=\d+ num_ranges=REDACTED
-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
-- SQLNESS REPLACE "flat_format":\s\w+, "flat_format": REDACTED,
EXPLAIN ANALYZE VERBOSE SELECT cpu_usage FROM filter_prune_test WHERE host IN ('host1', 'host2') ORDER BY ts;
-- Filter with BETWEEN on non-projected column
SELECT host FROM filter_prune_test WHERE cpu_usage BETWEEN 15.0 AND 30.0 ORDER BY ts;
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE \"files\":\s\[\{.* \"file\":REDACTED
-- SQLNESS REPLACE num_ranges=\d+ num_ranges=REDACTED
-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
-- SQLNESS REPLACE "flat_format":\s\w+, "flat_format": REDACTED,
EXPLAIN ANALYZE VERBOSE SELECT host FROM filter_prune_test WHERE cpu_usage BETWEEN 15.0 AND 30.0 ORDER BY ts;
-- Filter with LIKE on non-projected column
SELECT cpu_usage FROM filter_prune_test WHERE host LIKE 'host%' AND `region` LIKE 'us-%' ORDER BY ts;
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE \"files\":\s\[\{.* \"file\":REDACTED
-- SQLNESS REPLACE num_ranges=\d+ num_ranges=REDACTED
-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
-- SQLNESS REPLACE "flat_format":\s\w+, "flat_format": REDACTED,
EXPLAIN ANALYZE VERBOSE SELECT cpu_usage FROM filter_prune_test WHERE host LIKE 'host%' AND `region` LIKE 'us-%' ORDER BY ts;
-- Filter with OR conditions on non-projected columns
SELECT cpu_usage FROM filter_prune_test WHERE host = 'host1' OR `region` = 'eu-west' ORDER BY ts;
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE \"files\":\s\[\{.* \"file\":REDACTED
-- SQLNESS REPLACE num_ranges=\d+ num_ranges=REDACTED
-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
-- SQLNESS REPLACE "flat_format":\s\w+, "flat_format": REDACTED,
EXPLAIN ANALYZE VERBOSE SELECT cpu_usage FROM filter_prune_test WHERE host = 'host1' OR `region` = 'eu-west' ORDER BY ts;
-- Subquery with filter on non-projected column
SELECT cpu_usage FROM (SELECT * FROM filter_prune_test WHERE host = 'host1') sub WHERE `region` = 'us-east' ORDER BY ts;
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE \"files\":\s\[\{.* \"file\":REDACTED
-- SQLNESS REPLACE num_ranges=\d+ num_ranges=REDACTED
-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
-- SQLNESS REPLACE "flat_format":\s\w+, "flat_format": REDACTED,
EXPLAIN ANALYZE VERBOSE SELECT cpu_usage FROM (SELECT * FROM filter_prune_test WHERE host = 'host1') sub WHERE `region` = 'us-east' ORDER BY ts;
-- Clean up the first test table.
DROP TABLE filter_prune_test;
-- Test with data flushed to files to verify file-level column pruning
-- `tag_key` is the only primary key column; field1..field3 are DOUBLE columns.
-- The table is filled by three inserts: two of them are each followed by a flush,
-- the third is not.
CREATE TABLE filter_prune_files (
ts TIMESTAMP TIME INDEX,
tag_key STRING PRIMARY KEY,
field1 DOUBLE,
field2 DOUBLE,
field3 DOUBLE
);
-- First batch: ts 1000..3000, tags 'a' and 'b'; flushed right after.
INSERT INTO filter_prune_files VALUES
(1000, 'a', 1.0, 2.0, 3.0),
(2000, 'a', 4.0, 5.0, 6.0),
(3000, 'b', 7.0, 8.0, 9.0);
ADMIN FLUSH_TABLE('filter_prune_files');
-- Second batch goes to files after flush
INSERT INTO filter_prune_files VALUES
(4000, 'b', 10.0, 11.0, 12.0),
(5000, 'c', 13.0, 14.0, 15.0);
ADMIN FLUSH_TABLE('filter_prune_files');
-- Third batch stays in memtable (no flush)
INSERT INTO filter_prune_files VALUES
(6000, 'c', 16.0, 17.0, 18.0),
(7000, 'd', 19.0, 20.0, 21.0);
-- Query after flush and additional memtable writes: filter on tag (not in projection), select only field1
-- The rows with tag_key = 'a' (ts 1000 and 2000) were both inserted before the first flush.
SELECT field1 FROM filter_prune_files WHERE tag_key = 'a' ORDER BY ts;
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE \"files\":\s\[\{.* \"file\":REDACTED
-- SQLNESS REPLACE num_ranges=\d+ num_ranges=REDACTED
-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
-- SQLNESS REPLACE "flat_format":\s\w+, "flat_format": REDACTED,
EXPLAIN ANALYZE VERBOSE SELECT field1 FROM filter_prune_files WHERE tag_key = 'a' ORDER BY ts;
-- Query: filter on field2 (not in projection), select only field1
-- field2 > 5.0 matches ts 3000 through 7000, i.e. rows from all three inserts
-- (the row with field2 = 5.0 is excluded by the strict comparison).
SELECT field1 FROM filter_prune_files WHERE field2 > 5.0 ORDER BY ts;
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE \"files\":\s\[\{.* \"file\":REDACTED
-- SQLNESS REPLACE num_ranges=\d+ num_ranges=REDACTED
-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
-- SQLNESS REPLACE "flat_format":\s\w+, "flat_format": REDACTED,
EXPLAIN ANALYZE VERBOSE SELECT field1 FROM filter_prune_files WHERE field2 > 5.0 ORDER BY ts;
-- Complex: filter on tag and field2, select only field3
-- Matches ts 3000 and 4000: one row from each of the two flushed inserts.
SELECT field3 FROM filter_prune_files WHERE tag_key = 'b' AND field2 > 7.0 ORDER BY ts;
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE \"files\":\s\[\{.* \"file\":REDACTED
-- SQLNESS REPLACE num_ranges=\d+ num_ranges=REDACTED
-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
-- SQLNESS REPLACE "flat_format":\s\w+, "flat_format": REDACTED,
EXPLAIN ANALYZE VERBOSE SELECT field3 FROM filter_prune_files WHERE tag_key = 'b' AND field2 > 7.0 ORDER BY ts;
-- Clean up the second test table.
DROP TABLE filter_prune_files;