diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index c8e23f94a1..90df3a5f55 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -466,12 +466,22 @@ impl ScanRegion { self.version.options.append_mode, ); - // Remove field filters for LastNonNull mode after logging the request. - self.maybe_remove_field_filters(); + // // Remove field filters for LastNonNull mode after logging the request. + // self.maybe_remove_field_filters(); - let inverted_index_applier = self.build_invereted_index_applier(&self.request.filters); - let bloom_filter_applier = self.build_bloom_filter_applier(&self.request.filters); - let fulltext_index_applier = self.build_fulltext_index_applier(&self.request.filters); + let (non_field_filters, field_filters) = self.partition_by_field_filters(); + let inverted_index_appliers = [ + self.build_invereted_index_applier(&non_field_filters), + self.build_invereted_index_applier(&field_filters), + ]; + let bloom_filter_appliers = [ + self.build_bloom_filter_applier(&non_field_filters), + self.build_bloom_filter_applier(&field_filters), + ]; + let fulltext_index_appliers = [ + self.build_fulltext_index_applier(&non_field_filters), + self.build_fulltext_index_applier(&field_filters), + ]; let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?; if self.flat_format { @@ -485,9 +495,9 @@ impl ScanRegion { .with_memtables(mem_range_builders) .with_files(files) .with_cache(self.cache_strategy) - .with_inverted_index_applier(inverted_index_applier) - .with_bloom_filter_index_applier(bloom_filter_applier) - .with_fulltext_index_applier(fulltext_index_applier) + .with_inverted_index_appliers(inverted_index_appliers) + .with_bloom_filter_index_appliers(bloom_filter_appliers) + .with_fulltext_index_appliers(fulltext_index_appliers) .with_parallel_scan_channel_size(self.parallel_scan_channel_size) .with_max_concurrent_scan_files(self.max_concurrent_scan_files) .with_start_time(self.start_time) @@ -527,76 +537,30 @@ impl ScanRegion { build_time_range_predicate(&time_index.column_schema.name, unit, &self.request.filters) } - /// Remove field filters if the merge mode is [MergeMode::LastNonNull]. - fn maybe_remove_field_filters(&mut self) { - if self.version.options.merge_mode() != MergeMode::LastNonNull { - return; - } - - // TODO(yingwen): We can ignore field filters only when there are multiple sources in the same time window. + /// Partitions filters into two groups: non-field filters and field filters. + /// Returns `(non_field_filters, field_filters)`. + fn partition_by_field_filters(&self) -> (Vec, Vec) { let field_columns = self .version .metadata .field_columns() .map(|col| &col.column_schema.name) .collect::>(); - // Columns in the expr. + let mut columns = HashSet::new(); - self.request.filters.retain(|expr| { + self.request.filters.iter().cloned().partition(|expr| { columns.clear(); // `expr_to_columns` won't return error. if expr_to_columns(expr, &mut columns).is_err() { - return false; + // If we can't extract columns, treat it as non-field filter + return true; } - for column in &columns { - if field_columns.contains(&column.name) { - // This expr uses the field column. - return false; - } - } - true - }); - } - - /// Collects and returns filters that don't reference field columns. - /// Returns a new vector containing only non-field filters. - /// If the region is append-only, returns None. - fn filters_without_field_filters(&self) -> Option> { - if self.version.options.append_mode { - return None; - } - - let field_columns = self - .version - .metadata - .field_columns() - .map(|col| &col.column_schema.name) - .collect::>(); - - let mut columns = HashSet::new(); - - let filters = self - .request - .filters - .iter() - .filter(|expr| { - columns.clear(); - // `expr_to_columns` won't return error. - if expr_to_columns(expr, &mut columns).is_err() { - return false; - } - // Keep the filter if it doesn't reference any field columns - for column in &columns { - if field_columns.contains(&column.name) { - return false; - } - } - true - }) - .cloned() - .collect(); - Some(filters) + // Return true for non-field filters (partition puts true cases in first vec) + !columns + .iter() + .any(|column| field_columns.contains(&column.name)) + }) } /// Use the latest schema to build the inverted index applier. @@ -725,9 +689,9 @@ pub struct ScanInput { /// Maximum number of SST files to scan concurrently. pub(crate) max_concurrent_scan_files: usize, /// Index appliers. - inverted_index_applier: Option, - bloom_filter_index_applier: Option, - fulltext_index_applier: Option, + inverted_index_appliers: [Option; 2], + bloom_filter_index_appliers: [Option; 2], + fulltext_index_appliers: [Option; 2], /// Start time of the query. pub(crate) query_start: Option, /// The region is using append mode. @@ -764,9 +728,9 @@ impl ScanInput { ignore_file_not_found: false, parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE, max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES, - inverted_index_applier: None, - bloom_filter_index_applier: None, - fulltext_index_applier: None, + inverted_index_appliers: [None, None], + bloom_filter_index_appliers: [None, None], + fulltext_index_appliers: [None, None], query_start: None, append_mode: false, filter_deleted: true, @@ -843,33 +807,33 @@ impl ScanInput { self } - /// Sets invereted index applier. + /// Sets inverted index appliers. #[must_use] - pub(crate) fn with_inverted_index_applier( + pub(crate) fn with_inverted_index_appliers( mut self, - applier: Option, + appliers: [Option; 2], ) -> Self { - self.inverted_index_applier = applier; + self.inverted_index_appliers = appliers; self } - /// Sets bloom filter applier. + /// Sets bloom filter appliers. #[must_use] - pub(crate) fn with_bloom_filter_index_applier( + pub(crate) fn with_bloom_filter_index_appliers( mut self, - applier: Option, + appliers: [Option; 2], ) -> Self { - self.bloom_filter_index_applier = applier; + self.bloom_filter_index_appliers = appliers; self } - /// Sets fulltext index applier. + /// Sets fulltext index appliers. #[must_use] - pub(crate) fn with_fulltext_index_applier( + pub(crate) fn with_fulltext_index_appliers( mut self, - applier: Option, + appliers: [Option; 2], ) -> Self { - self.fulltext_index_applier = applier; + self.fulltext_index_appliers = appliers; self } @@ -998,9 +962,9 @@ impl ScanInput { .predicate(predicate) .projection(Some(self.mapper.column_ids().to_vec())) .cache(self.cache_strategy.clone()) - .inverted_index_applier(self.inverted_index_applier.clone()) - .bloom_filter_index_applier(self.bloom_filter_index_applier.clone()) - .fulltext_index_applier(self.fulltext_index_applier.clone()) + .inverted_index_appliers(self.inverted_index_appliers.clone()) + .bloom_filter_index_appliers(self.bloom_filter_index_appliers.clone()) + .fulltext_index_appliers(self.fulltext_index_appliers.clone()) .expected_metadata(Some(self.mapper.metadata().clone())) .flat_format(self.flat_format) .compaction(self.compaction) diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index 5a11a15e70..a421f947a0 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -850,8 +850,8 @@ mod tests { object_store.clone(), ) .predicate(Some(Predicate::new(preds))) - .inverted_index_applier(inverted_index_applier.clone()) - .bloom_filter_index_applier(bloom_filter_applier.clone()) + .inverted_index_appliers([inverted_index_applier.clone(), None]) + .bloom_filter_index_appliers([bloom_filter_applier.clone(), None]) .cache(CacheStrategy::EnableAll(cache.clone())); let mut metrics = ReaderMetrics::default(); @@ -906,8 +906,8 @@ mod tests { object_store.clone(), ) .predicate(Some(Predicate::new(preds))) - .inverted_index_applier(inverted_index_applier.clone()) - .bloom_filter_index_applier(bloom_filter_applier.clone()) + .inverted_index_appliers([inverted_index_applier.clone(), None]) + .bloom_filter_index_appliers([bloom_filter_applier.clone(), None]) .cache(CacheStrategy::EnableAll(cache.clone())); let mut metrics = ReaderMetrics::default(); @@ -963,8 +963,8 @@ mod tests { object_store.clone(), ) .predicate(Some(Predicate::new(preds))) - .inverted_index_applier(inverted_index_applier.clone()) - .bloom_filter_index_applier(bloom_filter_applier.clone()) + .inverted_index_appliers([inverted_index_applier.clone(), None]) + .bloom_filter_index_appliers([bloom_filter_applier.clone(), None]) .cache(CacheStrategy::EnableAll(cache.clone())); let mut metrics = ReaderMetrics::default(); diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 21fc6511f8..ce0fe3a63f 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -106,9 +106,9 @@ pub struct ParquetReaderBuilder { /// Strategy to cache SST data. cache_strategy: CacheStrategy, /// Index appliers. - inverted_index_applier: Option, - bloom_filter_index_applier: Option, - fulltext_index_applier: Option, + inverted_index_appliers: [Option; 2], + bloom_filter_index_appliers: [Option; 2], + fulltext_index_appliers: [Option; 2], /// Expected metadata of the region while reading the SST. /// This is usually the latest metadata of the region. The reader use /// it get the correct column id of a column by name. @@ -135,9 +135,9 @@ impl ParquetReaderBuilder { predicate: None, projection: None, cache_strategy: CacheStrategy::Disabled, - inverted_index_applier: None, - bloom_filter_index_applier: None, - fulltext_index_applier: None, + inverted_index_appliers: [None, None], + bloom_filter_index_appliers: [None, None], + fulltext_index_appliers: [None, None], expected_metadata: None, flat_format: false, compaction: false, @@ -167,33 +167,33 @@ impl ParquetReaderBuilder { self } - /// Attaches the inverted index applier to the builder. + /// Attaches the inverted index appliers to the builder. #[must_use] - pub(crate) fn inverted_index_applier( + pub(crate) fn inverted_index_appliers( mut self, - index_applier: Option, + index_appliers: [Option; 2], ) -> Self { - self.inverted_index_applier = index_applier; + self.inverted_index_appliers = index_appliers; self } - /// Attaches the bloom filter index applier to the builder. + /// Attaches the bloom filter index appliers to the builder. #[must_use] - pub(crate) fn bloom_filter_index_applier( + pub(crate) fn bloom_filter_index_appliers( mut self, - index_applier: Option, + index_appliers: [Option; 2], ) -> Self { - self.bloom_filter_index_applier = index_applier; + self.bloom_filter_index_appliers = index_appliers; self } - /// Attaches the fulltext index applier to the builder. + /// Attaches the fulltext index appliers to the builder. #[must_use] - pub(crate) fn fulltext_index_applier( + pub(crate) fn fulltext_index_appliers( mut self, - index_applier: Option, + index_appliers: [Option; 2], ) -> Self { - self.fulltext_index_applier = index_applier; + self.fulltext_index_appliers = index_appliers; self } @@ -461,49 +461,51 @@ impl ParquetReaderBuilder { output: &mut RowGroupSelection, metrics: &mut ReaderFilterMetrics, ) -> bool { - let Some(index_applier) = &self.fulltext_index_applier else { - return false; - }; if !self.file_handle.meta_ref().fulltext_index_available() { return false; } - let predicate_key = index_applier.predicate_key(); - // Fast path: return early if the result is in the cache. - let cached = self - .cache_strategy - .index_result_cache() - .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id())); - if let Some(result) = cached.as_ref() - && all_required_row_groups_searched(output, result) - { - apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT); - return true; - } - - // Slow path: apply the index from the file. - let file_size_hint = self.file_handle.meta_ref().index_file_size(); - let apply_res = index_applier - .apply_fine(self.file_handle.file_id(), Some(file_size_hint)) - .await; - let selection = match apply_res { - Ok(Some(res)) => RowGroupSelection::from_row_ids(res, row_group_size, num_row_groups), - Ok(None) => return false, - Err(err) => { - handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT); - return false; + let mut pruned = false; + for index_applier in self.fulltext_index_appliers.iter().flatten() { + let predicate_key = index_applier.predicate_key(); + // Fast path: return early if the result is in the cache. + let cached = self + .cache_strategy + .index_result_cache() + .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id())); + if let Some(result) = cached.as_ref() + && all_required_row_groups_searched(output, result) + { + apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT); + pruned = true; + continue; } - }; - self.apply_index_result_and_update_cache( - predicate_key, - self.file_handle.file_id().file_id(), - selection, - output, - metrics, - INDEX_TYPE_FULLTEXT, - ); - true + // Slow path: apply the index from the file. + let file_size_hint = self.file_handle.meta_ref().index_file_size(); + let apply_res = index_applier + .apply_fine(self.file_handle.file_id(), Some(file_size_hint)) + .await; + let selection = match apply_res { + Ok(Some(res)) => RowGroupSelection::from_row_ids(res, row_group_size, num_row_groups), + Ok(None) => continue, + Err(err) => { + handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT); + continue; + } + }; + + self.apply_index_result_and_update_cache( + predicate_key, + self.file_handle.file_id().file_id(), + selection, + output, + metrics, + INDEX_TYPE_FULLTEXT, + ); + pruned = true; + } + pruned } /// Applies index to prune row groups. @@ -518,52 +520,54 @@ impl ParquetReaderBuilder { output: &mut RowGroupSelection, metrics: &mut ReaderFilterMetrics, ) -> bool { - let Some(index_applier) = &self.inverted_index_applier else { - return false; - }; if !self.file_handle.meta_ref().inverted_index_available() { return false; } - let predicate_key = index_applier.predicate_key(); - // Fast path: return early if the result is in the cache. - let cached = self - .cache_strategy - .index_result_cache() - .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id())); - if let Some(result) = cached.as_ref() - && all_required_row_groups_searched(output, result) - { - apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_INVERTED); - return true; - } - - // Slow path: apply the index from the file. - let file_size_hint = self.file_handle.meta_ref().index_file_size(); - let apply_res = index_applier - .apply(self.file_handle.file_id(), Some(file_size_hint)) - .await; - let selection = match apply_res { - Ok(output) => RowGroupSelection::from_inverted_index_apply_output( - row_group_size, - num_row_groups, - output, - ), - Err(err) => { - handle_index_error!(err, self.file_handle, INDEX_TYPE_INVERTED); - return false; + let mut pruned = false; + for index_applier in self.inverted_index_appliers.iter().flatten() { + let predicate_key = index_applier.predicate_key(); + // Fast path: return early if the result is in the cache. + let cached = self + .cache_strategy + .index_result_cache() + .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id())); + if let Some(result) = cached.as_ref() + && all_required_row_groups_searched(output, result) + { + apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_INVERTED); + pruned = true; + continue; } - }; - self.apply_index_result_and_update_cache( - predicate_key, - self.file_handle.file_id().file_id(), - selection, - output, - metrics, - INDEX_TYPE_INVERTED, - ); - true + // Slow path: apply the index from the file. + let file_size_hint = self.file_handle.meta_ref().index_file_size(); + let apply_res = index_applier + .apply(self.file_handle.file_id(), Some(file_size_hint)) + .await; + let selection = match apply_res { + Ok(output) => RowGroupSelection::from_inverted_index_apply_output( + row_group_size, + num_row_groups, + output, + ), + Err(err) => { + handle_index_error!(err, self.file_handle, INDEX_TYPE_INVERTED); + continue; + } + }; + + self.apply_index_result_and_update_cache( + predicate_key, + self.file_handle.file_id().file_id(), + selection, + output, + metrics, + INDEX_TYPE_INVERTED, + ); + pruned = true; + } + pruned } async fn prune_row_groups_by_bloom_filter( @@ -573,64 +577,66 @@ impl ParquetReaderBuilder { output: &mut RowGroupSelection, metrics: &mut ReaderFilterMetrics, ) -> bool { - let Some(index_applier) = &self.bloom_filter_index_applier else { - return false; - }; if !self.file_handle.meta_ref().bloom_filter_index_available() { return false; } - let predicate_key = index_applier.predicate_key(); - // Fast path: return early if the result is in the cache. - let cached = self - .cache_strategy - .index_result_cache() - .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id())); - if let Some(result) = cached.as_ref() - && all_required_row_groups_searched(output, result) - { - apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_BLOOM); - return true; - } - - // Slow path: apply the index from the file. - let file_size_hint = self.file_handle.meta_ref().index_file_size(); - let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| { - ( - rg.num_rows() as usize, - // Optimize: only search the row group that required by `output` and not stored in `cached`. - output.contains_non_empty_row_group(i) - && cached - .as_ref() - .map(|c| !c.contains_row_group(i)) - .unwrap_or(true), - ) - }); - let apply_res = index_applier - .apply(self.file_handle.file_id(), Some(file_size_hint), rgs) - .await; - let mut selection = match apply_res { - Ok(apply_output) => RowGroupSelection::from_row_ranges(apply_output, row_group_size), - Err(err) => { - handle_index_error!(err, self.file_handle, INDEX_TYPE_BLOOM); - return false; + let mut pruned = false; + for index_applier in self.bloom_filter_index_appliers.iter().flatten() { + let predicate_key = index_applier.predicate_key(); + // Fast path: return early if the result is in the cache. + let cached = self + .cache_strategy + .index_result_cache() + .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id())); + if let Some(result) = cached.as_ref() + && all_required_row_groups_searched(output, result) + { + apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_BLOOM); + pruned = true; + continue; } - }; - // New searched row groups are added to `selection`, concat them with `cached`. - if let Some(cached) = cached.as_ref() { - selection.concat(cached); + // Slow path: apply the index from the file. + let file_size_hint = self.file_handle.meta_ref().index_file_size(); + let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| { + ( + rg.num_rows() as usize, + // Optimize: only search the row group that required by `output` and not stored in `cached`. + output.contains_non_empty_row_group(i) + && cached + .as_ref() + .map(|c| !c.contains_row_group(i)) + .unwrap_or(true), + ) + }); + let apply_res = index_applier + .apply(self.file_handle.file_id(), Some(file_size_hint), rgs) + .await; + let mut selection = match apply_res { + Ok(apply_output) => RowGroupSelection::from_row_ranges(apply_output, row_group_size), + Err(err) => { + handle_index_error!(err, self.file_handle, INDEX_TYPE_BLOOM); + continue; + } + }; + + // New searched row groups are added to `selection`, concat them with `cached`. + if let Some(cached) = cached.as_ref() { + selection.concat(cached); + } + + self.apply_index_result_and_update_cache( + predicate_key, + self.file_handle.file_id().file_id(), + selection, + output, + metrics, + INDEX_TYPE_BLOOM, + ); + pruned = true; } - - self.apply_index_result_and_update_cache( - predicate_key, - self.file_handle.file_id().file_id(), - selection, - output, - metrics, - INDEX_TYPE_BLOOM, - ); - true + pruned } async fn prune_row_groups_by_fulltext_bloom( @@ -640,67 +646,69 @@ impl ParquetReaderBuilder { output: &mut RowGroupSelection, metrics: &mut ReaderFilterMetrics, ) -> bool { - let Some(index_applier) = &self.fulltext_index_applier else { - return false; - }; if !self.file_handle.meta_ref().fulltext_index_available() { return false; } - let predicate_key = index_applier.predicate_key(); - // Fast path: return early if the result is in the cache. - let cached = self - .cache_strategy - .index_result_cache() - .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id())); - if let Some(result) = cached.as_ref() - && all_required_row_groups_searched(output, result) - { - apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT); - return true; - } - - // Slow path: apply the index from the file. - let file_size_hint = self.file_handle.meta_ref().index_file_size(); - let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| { - ( - rg.num_rows() as usize, - // Optimize: only search the row group that required by `output` and not stored in `cached`. - output.contains_non_empty_row_group(i) - && cached - .as_ref() - .map(|c| !c.contains_row_group(i)) - .unwrap_or(true), - ) - }); - let apply_res = index_applier - .apply_coarse(self.file_handle.file_id(), Some(file_size_hint), rgs) - .await; - let mut selection = match apply_res { - Ok(Some(apply_output)) => { - RowGroupSelection::from_row_ranges(apply_output, row_group_size) + let mut pruned = false; + for index_applier in self.fulltext_index_appliers.iter().flatten() { + let predicate_key = index_applier.predicate_key(); + // Fast path: return early if the result is in the cache. + let cached = self + .cache_strategy + .index_result_cache() + .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id())); + if let Some(result) = cached.as_ref() + && all_required_row_groups_searched(output, result) + { + apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT); + pruned = true; + continue; } - Ok(None) => return false, - Err(err) => { - handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT); - return false; + + // Slow path: apply the index from the file. + let file_size_hint = self.file_handle.meta_ref().index_file_size(); + let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| { + ( + rg.num_rows() as usize, + // Optimize: only search the row group that required by `output` and not stored in `cached`. + output.contains_non_empty_row_group(i) + && cached + .as_ref() + .map(|c| !c.contains_row_group(i)) + .unwrap_or(true), + ) + }); + let apply_res = index_applier + .apply_coarse(self.file_handle.file_id(), Some(file_size_hint), rgs) + .await; + let mut selection = match apply_res { + Ok(Some(apply_output)) => { + RowGroupSelection::from_row_ranges(apply_output, row_group_size) + } + Ok(None) => continue, + Err(err) => { + handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT); + continue; + } + }; + + // New searched row groups are added to `selection`, concat them with `cached`. + if let Some(cached) = cached.as_ref() { + selection.concat(cached); } - }; - // New searched row groups are added to `selection`, concat them with `cached`. - if let Some(cached) = cached.as_ref() { - selection.concat(cached); + self.apply_index_result_and_update_cache( + predicate_key, + self.file_handle.file_id().file_id(), + selection, + output, + metrics, + INDEX_TYPE_FULLTEXT, + ); + pruned = true; } - - self.apply_index_result_and_update_cache( - predicate_key, - self.file_handle.file_id().file_id(), - selection, - output, - metrics, - INDEX_TYPE_FULLTEXT, - ); - true + pruned } /// Prunes row groups by min-max index.