mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-14 03:50:39 +00:00
fix(mito2): schema-safe inverted index pruning (#8089)
* fix(mito2): skip inverted index on per-SST type mismatch to avoid false negatives
* restore INDEX_APPLY_MEMORY_USAGE
* fix: cr
* fix: cr
This commit is contained in:
@@ -20,15 +20,18 @@ use std::time::Instant;
|
||||
|
||||
use common_base::range_read::RangeReader;
|
||||
use common_telemetry::warn;
|
||||
use datatypes::data_type::ConcreteDataType;
|
||||
use index::inverted_index::format::reader::{InvertedIndexBlobReader, InvertedIndexReadMetrics};
|
||||
use index::inverted_index::search::index_apply::{
|
||||
ApplyOutput, IndexApplier, IndexNotFoundStrategy, SearchContext,
|
||||
ApplyOutput, IndexApplier, IndexNotFoundStrategy, PredicatesIndexApplier, SearchContext,
|
||||
};
|
||||
use index::inverted_index::search::predicate::Predicate;
|
||||
use index::target::IndexTarget;
|
||||
use object_store::ObjectStore;
|
||||
use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
|
||||
use puffin::puffin_manager::{PuffinManager, PuffinReader};
|
||||
use snafu::ResultExt;
|
||||
use store_api::metadata::RegionMetadataRef;
|
||||
use store_api::region_request::PathType;
|
||||
use store_api::storage::ColumnId;
|
||||
|
||||
@@ -37,7 +40,8 @@ use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
|
||||
use crate::cache::index::inverted_index::{CachedInvertedIndexBlobReader, InvertedIndexCacheRef};
|
||||
use crate::cache::index::result_cache::PredicateKey;
|
||||
use crate::error::{
|
||||
ApplyInvertedIndexSnafu, MetadataSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu, Result,
|
||||
ApplyInvertedIndexSnafu, BuildIndexApplierSnafu, MetadataSnafu, PuffinBuildReaderSnafu,
|
||||
PuffinReadBlobSnafu, Result,
|
||||
};
|
||||
use crate::metrics::{INDEX_APPLY_ELAPSED, INDEX_APPLY_MEMORY_USAGE};
|
||||
use crate::sst::file::RegionIndexId;
|
||||
@@ -121,10 +125,6 @@ pub(crate) struct InvertedIndexApplier {
|
||||
/// The cache of index files.
|
||||
file_cache: Option<FileCacheRef>,
|
||||
|
||||
/// Predefined index applier used to apply predicates to index files
|
||||
/// and return the relevant row group ids for further scan.
|
||||
index_applier: Box<dyn IndexApplier>,
|
||||
|
||||
/// The puffin manager factory.
|
||||
puffin_manager_factory: PuffinManagerFactory,
|
||||
|
||||
@@ -134,35 +134,49 @@ pub(crate) struct InvertedIndexApplier {
|
||||
/// Puffin metadata cache.
|
||||
puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
|
||||
|
||||
/// Predicate key. Used to identify the predicate and fetch result from cache.
|
||||
predicate_key: PredicateKey,
|
||||
/// All collected predicates.
|
||||
predicates: BTreeMap<ColumnId, Vec<Predicate>>,
|
||||
|
||||
/// Default apply plan built from all collected predicates.
|
||||
default_plan: SstApplyPlan,
|
||||
|
||||
/// Expected predicate column types from the latest region metadata.
|
||||
expected_predicate_col_types: BTreeMap<ColumnId, ConcreteDataType>,
|
||||
}
|
||||
|
||||
pub(crate) type InvertedIndexApplierRef = Arc<InvertedIndexApplier>;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct SstApplyPlan {
|
||||
pub predicate_key: PredicateKey,
|
||||
pub index_applier: Arc<PredicatesIndexApplier>,
|
||||
}
|
||||
|
||||
impl InvertedIndexApplier {
|
||||
/// Creates a new `InvertedIndexApplier`.
|
||||
pub fn new(
|
||||
table_dir: String,
|
||||
path_type: PathType,
|
||||
store: ObjectStore,
|
||||
index_applier: Box<dyn IndexApplier>,
|
||||
puffin_manager_factory: PuffinManagerFactory,
|
||||
predicates: BTreeMap<ColumnId, Vec<Predicate>>,
|
||||
) -> Self {
|
||||
INDEX_APPLY_MEMORY_USAGE.add(index_applier.memory_usage() as i64);
|
||||
expected_predicate_col_types: BTreeMap<ColumnId, ConcreteDataType>,
|
||||
) -> Result<Self> {
|
||||
let default_plan = Self::build_apply_plan(&predicates)?;
|
||||
INDEX_APPLY_MEMORY_USAGE.add(default_plan.index_applier.memory_usage() as i64);
|
||||
|
||||
Self {
|
||||
Ok(Self {
|
||||
table_dir,
|
||||
path_type,
|
||||
store,
|
||||
file_cache: None,
|
||||
index_applier,
|
||||
puffin_manager_factory,
|
||||
inverted_index_cache: None,
|
||||
puffin_metadata_cache: None,
|
||||
predicate_key: PredicateKey::new_inverted(Arc::new(predicates)),
|
||||
}
|
||||
predicates,
|
||||
default_plan,
|
||||
expected_predicate_col_types,
|
||||
})
|
||||
}
|
||||
|
||||
/// Sets the file cache.
|
||||
@@ -186,11 +200,12 @@ impl InvertedIndexApplier {
|
||||
self
|
||||
}
|
||||
|
||||
/// Applies predicates to the provided SST file id and returns the relevant row group ids.
|
||||
/// Applies predicates to one SST file with the provided index applier.
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `file_id` - The region file ID to apply predicates to
|
||||
/// * `file_size_hint` - Optional hint for file size to avoid extra metadata reads
|
||||
/// * `index_applier` - Inverted index applier produced by `plan_for_sst`.
|
||||
/// * `metrics` - Optional mutable reference to collect metrics on demand
|
||||
#[tracing::instrument(
|
||||
skip_all,
|
||||
@@ -200,6 +215,7 @@ impl InvertedIndexApplier {
|
||||
&self,
|
||||
file_id: RegionIndexId,
|
||||
file_size_hint: Option<u64>,
|
||||
index_applier: &PredicatesIndexApplier,
|
||||
mut metrics: Option<&mut InvertedIndexApplyMetrics>,
|
||||
) -> Result<ApplyOutput> {
|
||||
let start = Instant::now();
|
||||
@@ -231,7 +247,7 @@ impl InvertedIndexApplier {
|
||||
InvertedIndexBlobReader::new(blob),
|
||||
index_cache.clone(),
|
||||
);
|
||||
self.index_applier
|
||||
index_applier
|
||||
.apply(
|
||||
context,
|
||||
&mut index_reader,
|
||||
@@ -243,7 +259,7 @@ impl InvertedIndexApplier {
|
||||
.context(ApplyInvertedIndexSnafu)
|
||||
} else {
|
||||
let mut index_reader = InvertedIndexBlobReader::new(blob);
|
||||
self.index_applier
|
||||
index_applier
|
||||
.apply(
|
||||
context,
|
||||
&mut index_reader,
|
||||
@@ -344,82 +360,141 @@ impl InvertedIndexApplier {
|
||||
.context(PuffinBuildReaderSnafu)
|
||||
}
|
||||
|
||||
/// Returns the predicate key.
|
||||
pub fn predicate_key(&self) -> &PredicateKey {
|
||||
&self.predicate_key
|
||||
/// Builds a per-SST apply plan.
|
||||
///
|
||||
/// Returns `None` when no compatible predicate remains for this SST.
|
||||
pub fn plan_for_sst(&self, sst_metadata: &RegionMetadataRef) -> Result<Option<SstApplyPlan>> {
|
||||
let mut compatible_predicates = BTreeMap::new();
|
||||
let mut has_type_mismatch = false;
|
||||
|
||||
for (col_id, expected) in &self.expected_predicate_col_types {
|
||||
if let Some(sst_col) = sst_metadata.column_by_id(*col_id)
|
||||
&& sst_col.column_schema.data_type != *expected
|
||||
{
|
||||
has_type_mismatch = true;
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Some(predicates) = self.predicates.get(col_id) {
|
||||
compatible_predicates.insert(*col_id, predicates.clone());
|
||||
}
|
||||
}
|
||||
|
||||
if compatible_predicates.is_empty() {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
if !has_type_mismatch {
|
||||
return Ok(Some(self.default_plan.clone()));
|
||||
}
|
||||
|
||||
let plan = Self::build_apply_plan(&compatible_predicates)?;
|
||||
Ok(Some(plan))
|
||||
}
|
||||
|
||||
fn build_apply_plan(
|
||||
predicates_by_col: &BTreeMap<ColumnId, Vec<Predicate>>,
|
||||
) -> Result<SstApplyPlan> {
|
||||
let predicates = predicates_by_col
|
||||
.iter()
|
||||
.map(|(col_id, preds)| (format!("{}", IndexTarget::ColumnId(*col_id)), preds.clone()))
|
||||
.collect();
|
||||
|
||||
let index_applier =
|
||||
PredicatesIndexApplier::try_from(predicates).context(BuildIndexApplierSnafu)?;
|
||||
|
||||
let predicate_key = PredicateKey::new_inverted(Arc::new(predicates_by_col.clone()));
|
||||
Ok(SstApplyPlan {
|
||||
predicate_key,
|
||||
index_applier: Arc::new(index_applier),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for InvertedIndexApplier {
|
||||
fn drop(&mut self) {
|
||||
INDEX_APPLY_MEMORY_USAGE.sub(self.index_applier.memory_usage() as i64);
|
||||
INDEX_APPLY_MEMORY_USAGE.sub(self.default_plan.index_applier.memory_usage() as i64);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use api::v1::SemanticType;
|
||||
use datatypes::data_type::ConcreteDataType;
|
||||
use datatypes::schema::ColumnSchema;
|
||||
use futures::io::Cursor;
|
||||
use index::bitmap::Bitmap;
|
||||
use index::inverted_index::search::index_apply::MockIndexApplier;
|
||||
use index::inverted_index::search::predicate::RegexMatchPredicate;
|
||||
use object_store::services::Memory;
|
||||
use puffin::puffin_manager::PuffinWriter;
|
||||
use store_api::storage::FileId;
|
||||
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
|
||||
use store_api::storage::{FileId, RegionId};
|
||||
|
||||
use super::*;
|
||||
use crate::sst::index::RegionFileId;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_index_applier_apply_basic() {
|
||||
async fn test_plan_for_sst() {
|
||||
let (_d, puffin_manager_factory) =
|
||||
PuffinManagerFactory::new_for_test_async("test_index_applier_apply_basic_").await;
|
||||
PuffinManagerFactory::new_for_test_async("test_plan_for_sst_basic_").await;
|
||||
let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
|
||||
let file_id = RegionFileId::new(0.into(), FileId::random());
|
||||
let index_id = RegionIndexId::new(file_id, 0);
|
||||
let table_dir = "table_dir".to_string();
|
||||
|
||||
let puffin_manager = puffin_manager_factory.build(
|
||||
object_store.clone(),
|
||||
RegionFilePathFactory::new(table_dir.clone(), PathType::Bare),
|
||||
let mut predicates = BTreeMap::new();
|
||||
predicates.insert(
|
||||
1,
|
||||
vec![Predicate::RegexMatch(RegexMatchPredicate {
|
||||
pattern: "foo".to_string(),
|
||||
})],
|
||||
);
|
||||
let mut writer = puffin_manager.writer(&index_id).await.unwrap();
|
||||
writer
|
||||
.put_blob(
|
||||
INDEX_BLOB_TYPE,
|
||||
Cursor::new(vec![]),
|
||||
Default::default(),
|
||||
Default::default(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
writer.finish().await.unwrap();
|
||||
|
||||
let mut mock_index_applier = MockIndexApplier::new();
|
||||
mock_index_applier.expect_memory_usage().returning(|| 100);
|
||||
mock_index_applier.expect_apply().returning(|_, _, _| {
|
||||
Ok(ApplyOutput {
|
||||
matched_segment_ids: Bitmap::new_bitvec(),
|
||||
total_row_count: 100,
|
||||
segment_row_count: 10,
|
||||
})
|
||||
});
|
||||
let expected_predicate_col_types =
|
||||
BTreeMap::from_iter([(1, ConcreteDataType::string_datatype())]);
|
||||
|
||||
let sst_index_applier = InvertedIndexApplier::new(
|
||||
table_dir.clone(),
|
||||
table_dir,
|
||||
PathType::Bare,
|
||||
object_store,
|
||||
Box::new(mock_index_applier),
|
||||
puffin_manager_factory,
|
||||
Default::default(),
|
||||
);
|
||||
let output = sst_index_applier.apply(index_id, None, None).await.unwrap();
|
||||
assert_eq!(
|
||||
output,
|
||||
ApplyOutput {
|
||||
matched_segment_ids: Bitmap::new_bitvec(),
|
||||
total_row_count: 100,
|
||||
segment_row_count: 10,
|
||||
}
|
||||
predicates,
|
||||
expected_predicate_col_types,
|
||||
)
|
||||
.unwrap();
|
||||
let plan = sst_index_applier
|
||||
.plan_for_sst(&mock_region_metadata())
|
||||
.unwrap();
|
||||
assert!(plan.is_some());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_plan_for_sst_type_mismatch() {
|
||||
let (_d, puffin_manager_factory) =
|
||||
PuffinManagerFactory::new_for_test_async("test_plan_for_sst_type_mismatch_").await;
|
||||
let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
|
||||
let table_dir = "table_dir".to_string();
|
||||
|
||||
let mut predicates = BTreeMap::new();
|
||||
predicates.insert(
|
||||
1,
|
||||
vec![Predicate::RegexMatch(RegexMatchPredicate {
|
||||
pattern: "foo".to_string(),
|
||||
})],
|
||||
);
|
||||
// Column id 1 is String in `mock_region_metadata`, set expected type to Int64.
|
||||
let expected_predicate_col_types =
|
||||
BTreeMap::from_iter([(1, ConcreteDataType::int64_datatype())]);
|
||||
|
||||
let sst_index_applier = InvertedIndexApplier::new(
|
||||
table_dir,
|
||||
PathType::Bare,
|
||||
object_store,
|
||||
puffin_manager_factory,
|
||||
predicates,
|
||||
expected_predicate_col_types,
|
||||
)
|
||||
.unwrap();
|
||||
let plan = sst_index_applier
|
||||
.plan_for_sst(&mock_region_metadata())
|
||||
.unwrap();
|
||||
assert!(plan.is_none());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -448,19 +523,52 @@ mod tests {
|
||||
.unwrap();
|
||||
writer.finish().await.unwrap();
|
||||
|
||||
let mut mock_index_applier = MockIndexApplier::new();
|
||||
mock_index_applier.expect_memory_usage().returning(|| 100);
|
||||
mock_index_applier.expect_apply().never();
|
||||
|
||||
let mut predicates = BTreeMap::new();
|
||||
predicates.insert(
|
||||
1,
|
||||
vec![Predicate::RegexMatch(RegexMatchPredicate {
|
||||
pattern: "foo".to_string(),
|
||||
})],
|
||||
);
|
||||
let expected_predicate_col_types =
|
||||
BTreeMap::from_iter([(1, ConcreteDataType::string_datatype())]);
|
||||
let sst_index_applier = InvertedIndexApplier::new(
|
||||
table_dir.clone(),
|
||||
PathType::Bare,
|
||||
object_store,
|
||||
Box::new(mock_index_applier),
|
||||
puffin_manager_factory,
|
||||
Default::default(),
|
||||
);
|
||||
let res = sst_index_applier.apply(index_id, None, None).await;
|
||||
predicates,
|
||||
expected_predicate_col_types,
|
||||
)
|
||||
.unwrap();
|
||||
let plan = sst_index_applier
|
||||
.plan_for_sst(&mock_region_metadata())
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
let res = sst_index_applier
|
||||
.apply(index_id, None, &plan.index_applier, None)
|
||||
.await;
|
||||
assert!(format!("{:?}", res.unwrap_err()).contains("Blob not found"));
|
||||
}
|
||||
|
||||
fn mock_region_metadata() -> RegionMetadataRef {
|
||||
let mut builder = RegionMetadataBuilder::new(RegionId::new(1024, 1));
|
||||
builder
|
||||
.push_column_metadata(ColumnMetadata {
|
||||
column_schema: ColumnSchema::new("tag", ConcreteDataType::string_datatype(), false),
|
||||
semantic_type: SemanticType::Tag,
|
||||
column_id: 1,
|
||||
})
|
||||
.push_column_metadata(ColumnMetadata {
|
||||
column_schema: ColumnSchema::new(
|
||||
"ts",
|
||||
ConcreteDataType::timestamp_millisecond_datatype(),
|
||||
false,
|
||||
),
|
||||
semantic_type: SemanticType::Timestamp,
|
||||
column_id: 2,
|
||||
})
|
||||
.primary_key(vec![1]);
|
||||
Arc::new(builder.build().unwrap())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -25,9 +25,7 @@ use datafusion_common::ScalarValue;
|
||||
use datafusion_expr::{BinaryExpr, Expr, Operator};
|
||||
use datatypes::data_type::ConcreteDataType;
|
||||
use datatypes::value::Value;
|
||||
use index::inverted_index::search::index_apply::PredicatesIndexApplier;
|
||||
use index::inverted_index::search::predicate::Predicate;
|
||||
use index::target::IndexTarget;
|
||||
use mito_codec::index::IndexValueCodec;
|
||||
use mito_codec::row_converter::SortField;
|
||||
use object_store::ObjectStore;
|
||||
@@ -39,9 +37,7 @@ use store_api::storage::ColumnId;
|
||||
|
||||
use crate::cache::file_cache::FileCacheRef;
|
||||
use crate::cache::index::inverted_index::InvertedIndexCacheRef;
|
||||
use crate::error::{
|
||||
BuildIndexApplierSnafu, ColumnNotFoundSnafu, ConvertValueSnafu, EncodeSnafu, Result,
|
||||
};
|
||||
use crate::error::{ColumnNotFoundSnafu, ConvertValueSnafu, EncodeSnafu, Result};
|
||||
use crate::sst::index::inverted_index::applier::InvertedIndexApplier;
|
||||
use crate::sst::index::puffin_manager::PuffinManagerFactory;
|
||||
|
||||
@@ -137,33 +133,38 @@ impl<'a> InvertedIndexApplierBuilder<'a> {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let predicates = self
|
||||
.output
|
||||
.iter()
|
||||
.map(|(column_id, predicates)| {
|
||||
(
|
||||
format!("{}", IndexTarget::ColumnId(*column_id)),
|
||||
predicates.clone(),
|
||||
)
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
let applier = PredicatesIndexApplier::try_from(predicates);
|
||||
let expected_predicate_column_types = self.expected_predicate_column_types();
|
||||
|
||||
Ok(Some(
|
||||
InvertedIndexApplier::new(
|
||||
self.table_dir,
|
||||
self.path_type,
|
||||
self.object_store,
|
||||
Box::new(applier.context(BuildIndexApplierSnafu)?),
|
||||
self.puffin_manager_factory,
|
||||
self.output,
|
||||
)
|
||||
expected_predicate_column_types,
|
||||
)?
|
||||
.with_file_cache(self.file_cache)
|
||||
.with_puffin_metadata_cache(self.puffin_metadata_cache)
|
||||
.with_index_cache(self.inverted_index_cache),
|
||||
))
|
||||
}
|
||||
|
||||
/// Returns `(column_id, data_type)` pairs for predicate columns
|
||||
/// collected in `self.output`.
|
||||
///
|
||||
/// The data types are resolved from the latest region manifest. Columns
|
||||
/// that no longer exist in the latest metadata are skipped.
|
||||
fn expected_predicate_column_types(&self) -> BTreeMap<ColumnId, ConcreteDataType> {
|
||||
self.output
|
||||
.keys()
|
||||
.filter_map(|col_id| {
|
||||
let col = self.metadata.column_by_id(*col_id)?;
|
||||
Some((*col_id, col.column_schema.data_type.clone()))
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Recursively traverses expressions to collect predicates.
|
||||
/// Results are stored in `self.output`.
|
||||
fn traverse_and_collect(&mut self, expr: &Expr) {
|
||||
|
||||
@@ -614,9 +614,11 @@ mod tests {
|
||||
.build(&[expr])
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
let sst_metadata = Arc::new(region_metadata.clone());
|
||||
let plan = applier.plan_for_sst(&sst_metadata).unwrap().unwrap();
|
||||
Box::pin(async move {
|
||||
applier
|
||||
.apply(index_id, None, None)
|
||||
.apply(index_id, None, &plan.index_applier, None)
|
||||
.await
|
||||
.unwrap()
|
||||
.matched_segment_ids
|
||||
|
||||
@@ -929,11 +929,14 @@ mod tests {
|
||||
assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 3);
|
||||
assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 0);
|
||||
assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 30);
|
||||
let plan = inverted_index_applier
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.plan_for_sst(&metadata)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
let cached = index_result_cache
|
||||
.get(
|
||||
inverted_index_applier.unwrap().predicate_key(),
|
||||
handle.file_id().file_id(),
|
||||
)
|
||||
.get(&plan.predicate_key, handle.file_id().file_id())
|
||||
.unwrap();
|
||||
// inverted index will search all row groups
|
||||
assert!(cached.contains_row_group(0));
|
||||
|
||||
@@ -681,6 +681,7 @@ impl ParquetReaderBuilder {
|
||||
}
|
||||
|
||||
self.prune_row_groups_by_inverted_index(
|
||||
read_format.metadata(),
|
||||
row_group_size,
|
||||
num_row_groups,
|
||||
&mut output,
|
||||
@@ -807,6 +808,7 @@ impl ParquetReaderBuilder {
|
||||
/// the correctness of the index.
|
||||
async fn prune_row_groups_by_inverted_index(
|
||||
&self,
|
||||
sst_metadata: &RegionMetadataRef,
|
||||
row_group_size: usize,
|
||||
num_row_groups: usize,
|
||||
output: &mut RowGroupSelection,
|
||||
@@ -825,12 +827,19 @@ impl ParquetReaderBuilder {
|
||||
&self.inverted_index_appliers[..]
|
||||
};
|
||||
for index_applier in appliers.iter().flatten() {
|
||||
let predicate_key = index_applier.predicate_key();
|
||||
let Ok(Some(plan)) = index_applier
|
||||
.plan_for_sst(sst_metadata)
|
||||
.inspect_err(|e| warn!(e; "failed to build compatible plan for sst"))
|
||||
else {
|
||||
continue;
|
||||
};
|
||||
|
||||
// Fast path: return early if the result is in the cache.
|
||||
let cached = self
|
||||
.cache_strategy
|
||||
.index_result_cache()
|
||||
.and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
|
||||
let cached = self.cache_strategy.index_result_cache().and_then(|cache| {
|
||||
let file_id = self.file_handle.file_id().file_id();
|
||||
cache.get(&plan.predicate_key, file_id)
|
||||
});
|
||||
|
||||
if let Some(result) = cached.as_ref()
|
||||
&& all_required_row_groups_searched(output, result)
|
||||
{
|
||||
@@ -847,9 +856,11 @@ impl ParquetReaderBuilder {
|
||||
.apply(
|
||||
self.file_handle.index_id(),
|
||||
Some(file_size_hint),
|
||||
&plan.index_applier,
|
||||
metrics.inverted_index_apply_metrics.as_mut(),
|
||||
)
|
||||
.await;
|
||||
|
||||
let selection = match apply_res {
|
||||
Ok(apply_output) => RowGroupSelection::from_inverted_index_apply_output(
|
||||
row_group_size,
|
||||
@@ -863,7 +874,7 @@ impl ParquetReaderBuilder {
|
||||
};
|
||||
|
||||
self.apply_index_result_and_update_cache(
|
||||
predicate_key,
|
||||
&plan.predicate_key,
|
||||
self.file_handle.file_id().file_id(),
|
||||
selection,
|
||||
output,
|
||||
@@ -1832,8 +1843,21 @@ impl SimpleFilterContext {
|
||||
match sst_meta.column_by_id(column.column_id) {
|
||||
Some(sst_column) => {
|
||||
debug_assert_eq!(column.semantic_type, sst_column.semantic_type);
|
||||
|
||||
(column, MaybeFilter::Filter(filter))
|
||||
// Schema evolution can make field columns with the same id have
|
||||
// different concrete data types across SSTs. In that case,
|
||||
// evaluating this simple filter against current SST column may
|
||||
// raise an invalid cross-type comparison error (e.g. Float64 == Utf8).
|
||||
let maybe_filter = if sst_column.column_schema.data_type
|
||||
== column.column_schema.data_type
|
||||
{
|
||||
MaybeFilter::Filter(filter)
|
||||
} else {
|
||||
// Altering tag or timestamp column types is not allowed,
|
||||
// so only field columns can reach this branch.
|
||||
debug_assert_eq!(column.semantic_type, SemanticType::Field);
|
||||
return None;
|
||||
};
|
||||
(column, maybe_filter)
|
||||
}
|
||||
None => {
|
||||
// If the column is not present in the SST metadata, we evaluate the filter
|
||||
@@ -2162,6 +2186,7 @@ mod tests {
|
||||
use parquet::arrow::ArrowWriter;
|
||||
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
|
||||
use store_api::region_request::PathType;
|
||||
use store_api::storage::RegionId;
|
||||
use table::predicate::Predicate;
|
||||
|
||||
use super::*;
|
||||
@@ -2326,6 +2351,64 @@ mod tests {
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_simple_filter_context_drops_mismatched_field_filter() {
|
||||
let (sst_metadata, latest_metadata) = mock_metadata();
|
||||
let ctx = SimpleFilterContext::new_opt(
|
||||
&sst_metadata,
|
||||
Some(latest_metadata.as_ref()),
|
||||
&col("field_0").eq(lit(1_i64)),
|
||||
);
|
||||
|
||||
assert!(ctx.is_none());
|
||||
}
|
||||
|
||||
fn mock_metadata() -> (RegionMetadataRef, RegionMetadataRef) {
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let make_tag_0 = || ColumnMetadata {
|
||||
column_schema: ColumnSchema::new(
|
||||
"tag_0".to_string(),
|
||||
ConcreteDataType::string_datatype(),
|
||||
true,
|
||||
),
|
||||
semantic_type: SemanticType::Tag,
|
||||
column_id: 0,
|
||||
};
|
||||
let make_ts = || ColumnMetadata {
|
||||
column_schema: ColumnSchema::new(
|
||||
"ts".to_string(),
|
||||
ConcreteDataType::timestamp_millisecond_datatype(),
|
||||
false,
|
||||
),
|
||||
semantic_type: SemanticType::Timestamp,
|
||||
column_id: 2,
|
||||
};
|
||||
let make_field_0 = |data_type| ColumnMetadata {
|
||||
column_schema: ColumnSchema::new("field_0".to_string(), data_type, true),
|
||||
semantic_type: SemanticType::Field,
|
||||
column_id: 1,
|
||||
};
|
||||
|
||||
let mut sst_builder = RegionMetadataBuilder::new(region_id);
|
||||
sst_builder
|
||||
.push_column_metadata(make_tag_0())
|
||||
.push_column_metadata(make_field_0(ConcreteDataType::uint64_datatype()))
|
||||
.push_column_metadata(make_ts())
|
||||
.primary_key(vec![0]);
|
||||
let sst_metadata = Arc::new(sst_builder.build().unwrap());
|
||||
|
||||
let mut expected_builder = RegionMetadataBuilder::new(region_id);
|
||||
expected_builder
|
||||
.push_column_metadata(make_tag_0())
|
||||
.push_column_metadata(make_field_0(ConcreteDataType::int64_datatype()))
|
||||
.push_column_metadata(make_ts())
|
||||
.primary_key(vec![0]);
|
||||
|
||||
let expected_metadata = Arc::new(expected_builder.build().unwrap());
|
||||
|
||||
(sst_metadata, expected_metadata)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_physical_filter_context_skips_renamed_column() {
|
||||
let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
|
||||
|
||||
@@ -0,0 +1,52 @@
|
||||
-- Regression test for issue #8074
|
||||
-- https://github.com/GreptimeTeam/greptimedb/issues/8074
|
||||
CREATE TABLE monitoring_data (
|
||||
host STRING INVERTED INDEX,
|
||||
`region` STRING,
|
||||
cpu_usage DOUBLE INVERTED INDEX,
|
||||
`timestamp` TIMESTAMP TIME INDEX
|
||||
) WITH ('append_mode'='true');
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
INSERT INTO monitoring_data (host, region, cpu_usage, `timestamp`) VALUES
|
||||
('web-01', 'us-east', 12.5, '2026-05-06 10:00:00'),
|
||||
('web-02', 'us-east', 18.3, '2026-05-06 10:00:00'),
|
||||
('web-03', 'us-east', 91.2, '2026-05-06 10:00:00'),
|
||||
('web-04', 'us-west', 73.8, '2026-05-06 10:00:00');
|
||||
|
||||
Affected Rows: 4
|
||||
|
||||
INSERT INTO monitoring_data (host, region, cpu_usage, `timestamp`) VALUES
|
||||
('web-01', 'us-east', 15.2, '2026-05-06 10:01:00'),
|
||||
('web-02', 'us-east', 23.7, '2026-05-06 10:01:00'),
|
||||
('web-03', 'us-east', 94.5, '2026-05-06 10:01:00'),
|
||||
('web-04', 'us-west', 78.1, '2026-05-06 10:01:00');
|
||||
|
||||
Affected Rows: 4
|
||||
|
||||
ADMIN FLUSH_TABLE('monitoring_data');
|
||||
|
||||
+--------------------------------------+
|
||||
| ADMIN FLUSH_TABLE('monitoring_data') |
|
||||
+--------------------------------------+
|
||||
| 0 |
|
||||
+--------------------------------------+
|
||||
|
||||
ALTER TABLE monitoring_data
|
||||
MODIFY COLUMN cpu_usage STRING;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
SELECT host FROM monitoring_data WHERE cpu_usage = '23.7' ORDER BY host;
|
||||
|
||||
+--------+
|
||||
| host |
|
||||
+--------+
|
||||
| web-02 |
|
||||
+--------+
|
||||
|
||||
DROP TABLE monitoring_data;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
@@ -0,0 +1,29 @@
|
||||
-- Regression test for issue #8074
|
||||
-- https://github.com/GreptimeTeam/greptimedb/issues/8074
|
||||
CREATE TABLE monitoring_data (
|
||||
host STRING INVERTED INDEX,
|
||||
`region` STRING,
|
||||
cpu_usage DOUBLE INVERTED INDEX,
|
||||
`timestamp` TIMESTAMP TIME INDEX
|
||||
) WITH ('append_mode'='true');
|
||||
|
||||
INSERT INTO monitoring_data (host, region, cpu_usage, `timestamp`) VALUES
|
||||
('web-01', 'us-east', 12.5, '2026-05-06 10:00:00'),
|
||||
('web-02', 'us-east', 18.3, '2026-05-06 10:00:00'),
|
||||
('web-03', 'us-east', 91.2, '2026-05-06 10:00:00'),
|
||||
('web-04', 'us-west', 73.8, '2026-05-06 10:00:00');
|
||||
|
||||
INSERT INTO monitoring_data (host, region, cpu_usage, `timestamp`) VALUES
|
||||
('web-01', 'us-east', 15.2, '2026-05-06 10:01:00'),
|
||||
('web-02', 'us-east', 23.7, '2026-05-06 10:01:00'),
|
||||
('web-03', 'us-east', 94.5, '2026-05-06 10:01:00'),
|
||||
('web-04', 'us-west', 78.1, '2026-05-06 10:01:00');
|
||||
|
||||
ADMIN FLUSH_TABLE('monitoring_data');
|
||||
|
||||
ALTER TABLE monitoring_data
|
||||
MODIFY COLUMN cpu_usage STRING;
|
||||
|
||||
SELECT host FROM monitoring_data WHERE cpu_usage = '23.7' ORDER BY host;
|
||||
|
||||
DROP TABLE monitoring_data;
|
||||
Reference in New Issue
Block a user