feat: split field filters and index appliers

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
evenyag
2025-10-27 21:18:17 +08:00
parent c004e1c932
commit a2f8893c9e
3 changed files with 264 additions and 292 deletions

View File

@@ -466,12 +466,22 @@ impl ScanRegion {
self.version.options.append_mode,
);
// Remove field filters for LastNonNull mode after logging the request.
self.maybe_remove_field_filters();
// // Remove field filters for LastNonNull mode after logging the request.
// self.maybe_remove_field_filters();
let inverted_index_applier = self.build_invereted_index_applier(&self.request.filters);
let bloom_filter_applier = self.build_bloom_filter_applier(&self.request.filters);
let fulltext_index_applier = self.build_fulltext_index_applier(&self.request.filters);
let (non_field_filters, field_filters) = self.partition_by_field_filters();
let inverted_index_appliers = [
self.build_invereted_index_applier(&non_field_filters),
self.build_invereted_index_applier(&field_filters),
];
let bloom_filter_appliers = [
self.build_bloom_filter_applier(&non_field_filters),
self.build_bloom_filter_applier(&field_filters),
];
let fulltext_index_appliers = [
self.build_fulltext_index_applier(&non_field_filters),
self.build_fulltext_index_applier(&field_filters),
];
let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?;
if self.flat_format {
@@ -485,9 +495,9 @@ impl ScanRegion {
.with_memtables(mem_range_builders)
.with_files(files)
.with_cache(self.cache_strategy)
.with_inverted_index_applier(inverted_index_applier)
.with_bloom_filter_index_applier(bloom_filter_applier)
.with_fulltext_index_applier(fulltext_index_applier)
.with_inverted_index_appliers(inverted_index_appliers)
.with_bloom_filter_index_appliers(bloom_filter_appliers)
.with_fulltext_index_appliers(fulltext_index_appliers)
.with_parallel_scan_channel_size(self.parallel_scan_channel_size)
.with_max_concurrent_scan_files(self.max_concurrent_scan_files)
.with_start_time(self.start_time)
@@ -527,76 +537,30 @@ impl ScanRegion {
build_time_range_predicate(&time_index.column_schema.name, unit, &self.request.filters)
}
/// Remove field filters if the merge mode is [MergeMode::LastNonNull].
fn maybe_remove_field_filters(&mut self) {
if self.version.options.merge_mode() != MergeMode::LastNonNull {
return;
}
// TODO(yingwen): We can ignore field filters only when there are multiple sources in the same time window.
/// Partitions filters into two groups: non-field filters and field filters.
/// Returns `(non_field_filters, field_filters)`.
fn partition_by_field_filters(&self) -> (Vec<Expr>, Vec<Expr>) {
let field_columns = self
.version
.metadata
.field_columns()
.map(|col| &col.column_schema.name)
.collect::<HashSet<_>>();
// Columns in the expr.
let mut columns = HashSet::new();
self.request.filters.retain(|expr| {
self.request.filters.iter().cloned().partition(|expr| {
columns.clear();
// `expr_to_columns` won't return error.
if expr_to_columns(expr, &mut columns).is_err() {
return false;
// If we can't extract columns, treat it as non-field filter
return true;
}
for column in &columns {
if field_columns.contains(&column.name) {
// This expr uses the field column.
return false;
}
}
true
});
}
/// Collects and returns filters that don't reference field columns.
/// Returns a new vector containing only non-field filters.
/// If the region is append-only, returns None.
fn filters_without_field_filters(&self) -> Option<Vec<Expr>> {
if self.version.options.append_mode {
return None;
}
let field_columns = self
.version
.metadata
.field_columns()
.map(|col| &col.column_schema.name)
.collect::<HashSet<_>>();
let mut columns = HashSet::new();
let filters = self
.request
.filters
.iter()
.filter(|expr| {
columns.clear();
// `expr_to_columns` won't return error.
if expr_to_columns(expr, &mut columns).is_err() {
return false;
}
// Keep the filter if it doesn't reference any field columns
for column in &columns {
if field_columns.contains(&column.name) {
return false;
}
}
true
})
.cloned()
.collect();
Some(filters)
// Return true for non-field filters (partition puts true cases in first vec)
!columns
.iter()
.any(|column| field_columns.contains(&column.name))
})
}
/// Use the latest schema to build the inverted index applier.
@@ -725,9 +689,9 @@ pub struct ScanInput {
/// Maximum number of SST files to scan concurrently.
pub(crate) max_concurrent_scan_files: usize,
/// Index appliers.
inverted_index_applier: Option<InvertedIndexApplierRef>,
bloom_filter_index_applier: Option<BloomFilterIndexApplierRef>,
fulltext_index_applier: Option<FulltextIndexApplierRef>,
inverted_index_appliers: [Option<InvertedIndexApplierRef>; 2],
bloom_filter_index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
fulltext_index_appliers: [Option<FulltextIndexApplierRef>; 2],
/// Start time of the query.
pub(crate) query_start: Option<Instant>,
/// The region is using append mode.
@@ -764,9 +728,9 @@ impl ScanInput {
ignore_file_not_found: false,
parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
inverted_index_applier: None,
bloom_filter_index_applier: None,
fulltext_index_applier: None,
inverted_index_appliers: [None, None],
bloom_filter_index_appliers: [None, None],
fulltext_index_appliers: [None, None],
query_start: None,
append_mode: false,
filter_deleted: true,
@@ -843,33 +807,33 @@ impl ScanInput {
self
}
/// Sets invereted index applier.
/// Sets inverted index appliers.
#[must_use]
pub(crate) fn with_inverted_index_applier(
pub(crate) fn with_inverted_index_appliers(
mut self,
applier: Option<InvertedIndexApplierRef>,
appliers: [Option<InvertedIndexApplierRef>; 2],
) -> Self {
self.inverted_index_applier = applier;
self.inverted_index_appliers = appliers;
self
}
/// Sets bloom filter applier.
/// Sets bloom filter appliers.
#[must_use]
pub(crate) fn with_bloom_filter_index_applier(
pub(crate) fn with_bloom_filter_index_appliers(
mut self,
applier: Option<BloomFilterIndexApplierRef>,
appliers: [Option<BloomFilterIndexApplierRef>; 2],
) -> Self {
self.bloom_filter_index_applier = applier;
self.bloom_filter_index_appliers = appliers;
self
}
/// Sets fulltext index applier.
/// Sets fulltext index appliers.
#[must_use]
pub(crate) fn with_fulltext_index_applier(
pub(crate) fn with_fulltext_index_appliers(
mut self,
applier: Option<FulltextIndexApplierRef>,
appliers: [Option<FulltextIndexApplierRef>; 2],
) -> Self {
self.fulltext_index_applier = applier;
self.fulltext_index_appliers = appliers;
self
}
@@ -998,9 +962,9 @@ impl ScanInput {
.predicate(predicate)
.projection(Some(self.mapper.column_ids().to_vec()))
.cache(self.cache_strategy.clone())
.inverted_index_applier(self.inverted_index_applier.clone())
.bloom_filter_index_applier(self.bloom_filter_index_applier.clone())
.fulltext_index_applier(self.fulltext_index_applier.clone())
.inverted_index_appliers(self.inverted_index_appliers.clone())
.bloom_filter_index_appliers(self.bloom_filter_index_appliers.clone())
.fulltext_index_appliers(self.fulltext_index_appliers.clone())
.expected_metadata(Some(self.mapper.metadata().clone()))
.flat_format(self.flat_format)
.compaction(self.compaction)

View File

@@ -850,8 +850,8 @@ mod tests {
object_store.clone(),
)
.predicate(Some(Predicate::new(preds)))
.inverted_index_applier(inverted_index_applier.clone())
.bloom_filter_index_applier(bloom_filter_applier.clone())
.inverted_index_appliers([inverted_index_applier.clone(), None])
.bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
.cache(CacheStrategy::EnableAll(cache.clone()));
let mut metrics = ReaderMetrics::default();
@@ -906,8 +906,8 @@ mod tests {
object_store.clone(),
)
.predicate(Some(Predicate::new(preds)))
.inverted_index_applier(inverted_index_applier.clone())
.bloom_filter_index_applier(bloom_filter_applier.clone())
.inverted_index_appliers([inverted_index_applier.clone(), None])
.bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
.cache(CacheStrategy::EnableAll(cache.clone()));
let mut metrics = ReaderMetrics::default();
@@ -963,8 +963,8 @@ mod tests {
object_store.clone(),
)
.predicate(Some(Predicate::new(preds)))
.inverted_index_applier(inverted_index_applier.clone())
.bloom_filter_index_applier(bloom_filter_applier.clone())
.inverted_index_appliers([inverted_index_applier.clone(), None])
.bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
.cache(CacheStrategy::EnableAll(cache.clone()));
let mut metrics = ReaderMetrics::default();

View File

@@ -106,9 +106,9 @@ pub struct ParquetReaderBuilder {
/// Strategy to cache SST data.
cache_strategy: CacheStrategy,
/// Index appliers.
inverted_index_applier: Option<InvertedIndexApplierRef>,
bloom_filter_index_applier: Option<BloomFilterIndexApplierRef>,
fulltext_index_applier: Option<FulltextIndexApplierRef>,
inverted_index_appliers: [Option<InvertedIndexApplierRef>; 2],
bloom_filter_index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
fulltext_index_appliers: [Option<FulltextIndexApplierRef>; 2],
/// Expected metadata of the region while reading the SST.
/// This is usually the latest metadata of the region. The reader uses
/// it to get the correct column id of a column by name.
@@ -135,9 +135,9 @@ impl ParquetReaderBuilder {
predicate: None,
projection: None,
cache_strategy: CacheStrategy::Disabled,
inverted_index_applier: None,
bloom_filter_index_applier: None,
fulltext_index_applier: None,
inverted_index_appliers: [None, None],
bloom_filter_index_appliers: [None, None],
fulltext_index_appliers: [None, None],
expected_metadata: None,
flat_format: false,
compaction: false,
@@ -167,33 +167,33 @@ impl ParquetReaderBuilder {
self
}
/// Attaches the inverted index applier to the builder.
/// Attaches the inverted index appliers to the builder.
#[must_use]
pub(crate) fn inverted_index_applier(
pub(crate) fn inverted_index_appliers(
mut self,
index_applier: Option<InvertedIndexApplierRef>,
index_appliers: [Option<InvertedIndexApplierRef>; 2],
) -> Self {
self.inverted_index_applier = index_applier;
self.inverted_index_appliers = index_appliers;
self
}
/// Attaches the bloom filter index applier to the builder.
/// Attaches the bloom filter index appliers to the builder.
#[must_use]
pub(crate) fn bloom_filter_index_applier(
pub(crate) fn bloom_filter_index_appliers(
mut self,
index_applier: Option<BloomFilterIndexApplierRef>,
index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
) -> Self {
self.bloom_filter_index_applier = index_applier;
self.bloom_filter_index_appliers = index_appliers;
self
}
/// Attaches the fulltext index applier to the builder.
/// Attaches the fulltext index appliers to the builder.
#[must_use]
pub(crate) fn fulltext_index_applier(
pub(crate) fn fulltext_index_appliers(
mut self,
index_applier: Option<FulltextIndexApplierRef>,
index_appliers: [Option<FulltextIndexApplierRef>; 2],
) -> Self {
self.fulltext_index_applier = index_applier;
self.fulltext_index_appliers = index_appliers;
self
}
@@ -461,49 +461,51 @@ impl ParquetReaderBuilder {
output: &mut RowGroupSelection,
metrics: &mut ReaderFilterMetrics,
) -> bool {
let Some(index_applier) = &self.fulltext_index_applier else {
return false;
};
if !self.file_handle.meta_ref().fulltext_index_available() {
return false;
}
let predicate_key = index_applier.predicate_key();
// Fast path: return early if the result is in the cache.
let cached = self
.cache_strategy
.index_result_cache()
.and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
if let Some(result) = cached.as_ref()
&& all_required_row_groups_searched(output, result)
{
apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
return true;
}
// Slow path: apply the index from the file.
let file_size_hint = self.file_handle.meta_ref().index_file_size();
let apply_res = index_applier
.apply_fine(self.file_handle.file_id(), Some(file_size_hint))
.await;
let selection = match apply_res {
Ok(Some(res)) => RowGroupSelection::from_row_ids(res, row_group_size, num_row_groups),
Ok(None) => return false,
Err(err) => {
handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
return false;
let mut pruned = false;
for index_applier in self.fulltext_index_appliers.iter().flatten() {
let predicate_key = index_applier.predicate_key();
// Fast path: return early if the result is in the cache.
let cached = self
.cache_strategy
.index_result_cache()
.and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
if let Some(result) = cached.as_ref()
&& all_required_row_groups_searched(output, result)
{
apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
pruned = true;
continue;
}
};
self.apply_index_result_and_update_cache(
predicate_key,
self.file_handle.file_id().file_id(),
selection,
output,
metrics,
INDEX_TYPE_FULLTEXT,
);
true
// Slow path: apply the index from the file.
let file_size_hint = self.file_handle.meta_ref().index_file_size();
let apply_res = index_applier
.apply_fine(self.file_handle.file_id(), Some(file_size_hint))
.await;
let selection = match apply_res {
Ok(Some(res)) => RowGroupSelection::from_row_ids(res, row_group_size, num_row_groups),
Ok(None) => continue,
Err(err) => {
handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
continue;
}
};
self.apply_index_result_and_update_cache(
predicate_key,
self.file_handle.file_id().file_id(),
selection,
output,
metrics,
INDEX_TYPE_FULLTEXT,
);
pruned = true;
}
pruned
}
/// Applies index to prune row groups.
@@ -518,52 +520,54 @@ impl ParquetReaderBuilder {
output: &mut RowGroupSelection,
metrics: &mut ReaderFilterMetrics,
) -> bool {
let Some(index_applier) = &self.inverted_index_applier else {
return false;
};
if !self.file_handle.meta_ref().inverted_index_available() {
return false;
}
let predicate_key = index_applier.predicate_key();
// Fast path: return early if the result is in the cache.
let cached = self
.cache_strategy
.index_result_cache()
.and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
if let Some(result) = cached.as_ref()
&& all_required_row_groups_searched(output, result)
{
apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_INVERTED);
return true;
}
// Slow path: apply the index from the file.
let file_size_hint = self.file_handle.meta_ref().index_file_size();
let apply_res = index_applier
.apply(self.file_handle.file_id(), Some(file_size_hint))
.await;
let selection = match apply_res {
Ok(output) => RowGroupSelection::from_inverted_index_apply_output(
row_group_size,
num_row_groups,
output,
),
Err(err) => {
handle_index_error!(err, self.file_handle, INDEX_TYPE_INVERTED);
return false;
let mut pruned = false;
for index_applier in self.inverted_index_appliers.iter().flatten() {
let predicate_key = index_applier.predicate_key();
// Fast path: return early if the result is in the cache.
let cached = self
.cache_strategy
.index_result_cache()
.and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
if let Some(result) = cached.as_ref()
&& all_required_row_groups_searched(output, result)
{
apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_INVERTED);
pruned = true;
continue;
}
};
self.apply_index_result_and_update_cache(
predicate_key,
self.file_handle.file_id().file_id(),
selection,
output,
metrics,
INDEX_TYPE_INVERTED,
);
true
// Slow path: apply the index from the file.
let file_size_hint = self.file_handle.meta_ref().index_file_size();
let apply_res = index_applier
.apply(self.file_handle.file_id(), Some(file_size_hint))
.await;
let selection = match apply_res {
Ok(output) => RowGroupSelection::from_inverted_index_apply_output(
row_group_size,
num_row_groups,
output,
),
Err(err) => {
handle_index_error!(err, self.file_handle, INDEX_TYPE_INVERTED);
continue;
}
};
self.apply_index_result_and_update_cache(
predicate_key,
self.file_handle.file_id().file_id(),
selection,
output,
metrics,
INDEX_TYPE_INVERTED,
);
pruned = true;
}
pruned
}
async fn prune_row_groups_by_bloom_filter(
@@ -573,64 +577,66 @@ impl ParquetReaderBuilder {
output: &mut RowGroupSelection,
metrics: &mut ReaderFilterMetrics,
) -> bool {
let Some(index_applier) = &self.bloom_filter_index_applier else {
return false;
};
if !self.file_handle.meta_ref().bloom_filter_index_available() {
return false;
}
let predicate_key = index_applier.predicate_key();
// Fast path: return early if the result is in the cache.
let cached = self
.cache_strategy
.index_result_cache()
.and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
if let Some(result) = cached.as_ref()
&& all_required_row_groups_searched(output, result)
{
apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_BLOOM);
return true;
}
// Slow path: apply the index from the file.
let file_size_hint = self.file_handle.meta_ref().index_file_size();
let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
(
rg.num_rows() as usize,
// Optimize: only search the row group that required by `output` and not stored in `cached`.
output.contains_non_empty_row_group(i)
&& cached
.as_ref()
.map(|c| !c.contains_row_group(i))
.unwrap_or(true),
)
});
let apply_res = index_applier
.apply(self.file_handle.file_id(), Some(file_size_hint), rgs)
.await;
let mut selection = match apply_res {
Ok(apply_output) => RowGroupSelection::from_row_ranges(apply_output, row_group_size),
Err(err) => {
handle_index_error!(err, self.file_handle, INDEX_TYPE_BLOOM);
return false;
let mut pruned = false;
for index_applier in self.bloom_filter_index_appliers.iter().flatten() {
let predicate_key = index_applier.predicate_key();
// Fast path: return early if the result is in the cache.
let cached = self
.cache_strategy
.index_result_cache()
.and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
if let Some(result) = cached.as_ref()
&& all_required_row_groups_searched(output, result)
{
apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_BLOOM);
pruned = true;
continue;
}
};
// New searched row groups are added to `selection`, concat them with `cached`.
if let Some(cached) = cached.as_ref() {
selection.concat(cached);
// Slow path: apply the index from the file.
let file_size_hint = self.file_handle.meta_ref().index_file_size();
let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
(
rg.num_rows() as usize,
// Optimize: only search the row group that required by `output` and not stored in `cached`.
output.contains_non_empty_row_group(i)
&& cached
.as_ref()
.map(|c| !c.contains_row_group(i))
.unwrap_or(true),
)
});
let apply_res = index_applier
.apply(self.file_handle.file_id(), Some(file_size_hint), rgs)
.await;
let mut selection = match apply_res {
Ok(apply_output) => RowGroupSelection::from_row_ranges(apply_output, row_group_size),
Err(err) => {
handle_index_error!(err, self.file_handle, INDEX_TYPE_BLOOM);
continue;
}
};
// New searched row groups are added to `selection`, concat them with `cached`.
if let Some(cached) = cached.as_ref() {
selection.concat(cached);
}
self.apply_index_result_and_update_cache(
predicate_key,
self.file_handle.file_id().file_id(),
selection,
output,
metrics,
INDEX_TYPE_BLOOM,
);
pruned = true;
}
self.apply_index_result_and_update_cache(
predicate_key,
self.file_handle.file_id().file_id(),
selection,
output,
metrics,
INDEX_TYPE_BLOOM,
);
true
pruned
}
async fn prune_row_groups_by_fulltext_bloom(
@@ -640,67 +646,69 @@ impl ParquetReaderBuilder {
output: &mut RowGroupSelection,
metrics: &mut ReaderFilterMetrics,
) -> bool {
let Some(index_applier) = &self.fulltext_index_applier else {
return false;
};
if !self.file_handle.meta_ref().fulltext_index_available() {
return false;
}
let predicate_key = index_applier.predicate_key();
// Fast path: return early if the result is in the cache.
let cached = self
.cache_strategy
.index_result_cache()
.and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
if let Some(result) = cached.as_ref()
&& all_required_row_groups_searched(output, result)
{
apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
return true;
}
// Slow path: apply the index from the file.
let file_size_hint = self.file_handle.meta_ref().index_file_size();
let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
(
rg.num_rows() as usize,
// Optimize: only search the row group that required by `output` and not stored in `cached`.
output.contains_non_empty_row_group(i)
&& cached
.as_ref()
.map(|c| !c.contains_row_group(i))
.unwrap_or(true),
)
});
let apply_res = index_applier
.apply_coarse(self.file_handle.file_id(), Some(file_size_hint), rgs)
.await;
let mut selection = match apply_res {
Ok(Some(apply_output)) => {
RowGroupSelection::from_row_ranges(apply_output, row_group_size)
let mut pruned = false;
for index_applier in self.fulltext_index_appliers.iter().flatten() {
let predicate_key = index_applier.predicate_key();
// Fast path: return early if the result is in the cache.
let cached = self
.cache_strategy
.index_result_cache()
.and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
if let Some(result) = cached.as_ref()
&& all_required_row_groups_searched(output, result)
{
apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
pruned = true;
continue;
}
Ok(None) => return false,
Err(err) => {
handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
return false;
// Slow path: apply the index from the file.
let file_size_hint = self.file_handle.meta_ref().index_file_size();
let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
(
rg.num_rows() as usize,
// Optimize: only search the row group that required by `output` and not stored in `cached`.
output.contains_non_empty_row_group(i)
&& cached
.as_ref()
.map(|c| !c.contains_row_group(i))
.unwrap_or(true),
)
});
let apply_res = index_applier
.apply_coarse(self.file_handle.file_id(), Some(file_size_hint), rgs)
.await;
let mut selection = match apply_res {
Ok(Some(apply_output)) => {
RowGroupSelection::from_row_ranges(apply_output, row_group_size)
}
Ok(None) => continue,
Err(err) => {
handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
continue;
}
};
// New searched row groups are added to `selection`, concat them with `cached`.
if let Some(cached) = cached.as_ref() {
selection.concat(cached);
}
};
// New searched row groups are added to `selection`, concat them with `cached`.
if let Some(cached) = cached.as_ref() {
selection.concat(cached);
self.apply_index_result_and_update_cache(
predicate_key,
self.file_handle.file_id().file_id(),
selection,
output,
metrics,
INDEX_TYPE_FULLTEXT,
);
pruned = true;
}
self.apply_index_result_and_update_cache(
predicate_key,
self.file_handle.file_id().file_id(),
selection,
output,
metrics,
INDEX_TYPE_FULLTEXT,
);
true
pruned
}
/// Prunes row groups by min-max index.