From 203751f2fe83488c3578fa84188fc10e55ef5006 Mon Sep 17 00:00:00 2001 From: PSeitz-dd Date: Tue, 16 Sep 2025 18:21:03 +0200 Subject: [PATCH] Optimize ExistsQuery for a high number of dynamic columns (#2694) * Optimize ExistsQuery for a high number of dynamic columns The previous algorithm checked _each_ doc in _each_ column for existence. This causes huge cost on JSON fields with e.g. 100k columns. Compute a bitset instead if we have more than one column. add `iter_docs` to the multivalued_index * add benchmark subfields=1 exists_json_union Memory: 89.3 KB (+2.01%) Avg: 0.4865ms (-26.03%) Median: 0.4865ms (-26.03%) [0.4865ms .. 0.4865ms] subfields=2 exists_json_union Memory: 68.1 KB Avg: 1.7048ms (-0.46%) Median: 1.7048ms (-0.46%) [1.7048ms .. 1.7048ms] subfields=3 exists_json_union Memory: 61.8 KB Avg: 2.0742ms (-2.22%) Median: 2.0742ms (-2.22%) [2.0742ms .. 2.0742ms] subfields=4 exists_json_union Memory: 119.8 KB (+103.44%) Avg: 3.9500ms (+42.62%) Median: 3.9500ms (+42.62%) [3.9500ms .. 3.9500ms] subfields=5 exists_json_union Memory: 120.4 KB (+107.65%) Avg: 3.9610ms (+20.65%) Median: 3.9610ms (+20.65%) [3.9610ms .. 3.9610ms] subfields=6 exists_json_union Memory: 120.6 KB (+107.49%) Avg: 3.8903ms (+3.11%) Median: 3.8903ms (+3.11%) [3.8903ms .. 3.8903ms] subfields=7 exists_json_union Memory: 120.9 KB (+106.93%) Avg: 3.6220ms (-16.22%) Median: 3.6220ms (-16.22%) [3.6220ms .. 3.6220ms] subfields=8 exists_json_union Memory: 121.3 KB (+106.23%) Avg: 4.0981ms (-15.97%) Median: 4.0981ms (-15.97%) [4.0981ms .. 4.0981ms] subfields=16 exists_json_union Memory: 123.1 KB (+103.09%) Avg: 4.3483ms (-92.26%) Median: 4.3483ms (-92.26%) [4.3483ms .. 4.3483ms] subfields=256 exists_json_union Memory: 204.6 KB (+19.85%) Avg: 3.8874ms (-99.01%) Median: 3.8874ms (-99.01%) [3.8874ms .. 3.8874ms] subfields=4096 exists_json_union Memory: 2.0 MB Avg: 3.5571ms (-99.90%) Median: 3.5571ms (-99.90%) [3.5571ms .. 
3.5571ms] subfields=65536 exists_json_union Memory: 28.3 MB Avg: 14.4417ms (-99.97%) Median: 14.4417ms (-99.97%) [14.4417ms .. 14.4417ms] subfields=262144 exists_json_union Memory: 113.3 MB Avg: 66.2860ms (-99.95%) Median: 66.2860ms (-99.95%) [66.2860ms .. 66.2860ms] * rename methods --- Cargo.toml | 4 + benches/exists_json.rs | 69 ++++++++++++++ columnar/src/column_index/merge/stacked.rs | 6 +- .../src/column_index/multivalued_index.rs | 26 ++++++ .../src/column_index/optional_index/mod.rs | 7 +- .../src/column_index/optional_index/tests.rs | 6 +- columnar/src/columnar/merge/mod.rs | 2 +- src/query/exist_query.rs | 90 +++++++++++++++++-- 8 files changed, 195 insertions(+), 15 deletions(-) create mode 100644 benches/exists_json.rs diff --git a/Cargo.toml b/Cargo.toml index 3fd18640d..3403df13c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -167,3 +167,7 @@ harness = false [[bench]] name = "agg_bench" harness = false + +[[bench]] +name = "exists_json" +harness = false diff --git a/benches/exists_json.rs b/benches/exists_json.rs new file mode 100644 index 000000000..a0165887e --- /dev/null +++ b/benches/exists_json.rs @@ -0,0 +1,69 @@ +use binggan::plugins::PeakMemAllocPlugin; +use binggan::{black_box, InputGroup, PeakMemAlloc, INSTRUMENTED_SYSTEM}; +use serde_json::json; +use tantivy::collector::Count; +use tantivy::query::ExistsQuery; +use tantivy::schema::{Schema, FAST, TEXT}; +use tantivy::{doc, Index}; + +#[global_allocator] +pub static GLOBAL: &PeakMemAlloc = &INSTRUMENTED_SYSTEM; + +fn main() { + let doc_count: usize = 500_000; + let subfield_counts: &[usize] = &[1, 2, 3, 4, 5, 6, 7, 8, 16, 256, 4096, 65536, 262144]; + + let indices: Vec<(String, Index)> = subfield_counts + .iter() + .map(|&sub_fields| { + ( + format!("subfields={sub_fields}"), + build_index_with_json_subfields(doc_count, sub_fields), + ) + }) + .collect(); + + let mut group = InputGroup::new_with_inputs(indices); + group.add_plugin(PeakMemAllocPlugin::new(GLOBAL)); + + 
group.config().num_iter_group = Some(1); + group.config().num_iter_bench = Some(1); + group.register("exists_json", exists_json_union); + + group.run(); +} + +fn exists_json_union(index: &Index) { + let reader = index.reader().expect("reader"); + let searcher = reader.searcher(); + let query = ExistsQuery::new("json".to_string(), true); + let count = searcher.search(&query, &Count).expect("exists search"); + // Prevents optimizer from eliding the search + black_box(count); +} + +fn build_index_with_json_subfields(num_docs: usize, num_subfields: usize) -> Index { + // Schema: single JSON field stored as FAST to support ExistsQuery. + let mut schema_builder = Schema::builder(); + let json_field = schema_builder.add_json_field("json", TEXT | FAST); + let schema = schema_builder.build(); + + let index = Index::create_from_tempdir(schema).expect("create index"); + { + let mut index_writer = index + .writer_with_num_threads(1, 200_000_000) + .expect("writer"); + for i in 0..num_docs { + let sub = i % num_subfields; + // Only one subpath set per document; rotate subpaths so that + // no single subpath is full, but the union covers all docs. 
+ let v = json!({ format!("field_{sub}"): i as u64 }); + index_writer + .add_document(doc!(json_field => v)) + .expect("add_document"); + } + index_writer.commit().expect("commit"); + } + + index +} diff --git a/columnar/src/column_index/merge/stacked.rs b/columnar/src/column_index/merge/stacked.rs index 8a2742d63..7305c496c 100644 --- a/columnar/src/column_index/merge/stacked.rs +++ b/columnar/src/column_index/merge/stacked.rs @@ -56,7 +56,7 @@ fn get_doc_ids_with_values<'a>( ColumnIndex::Full => Box::new(doc_range), ColumnIndex::Optional(optional_index) => Box::new( optional_index - .iter_docs() + .iter_non_null_docs() .map(move |row| row + doc_range.start), ), ColumnIndex::Multivalued(multivalued_index) => match multivalued_index { @@ -73,7 +73,7 @@ fn get_doc_ids_with_values<'a>( MultiValueIndex::MultiValueIndexV2(multivalued_index) => Box::new( multivalued_index .optional_index - .iter_docs() + .iter_non_null_docs() .map(move |row| row + doc_range.start), ), }, @@ -177,7 +177,7 @@ impl<'a> Iterable for StackedOptionalIndex<'a> { ColumnIndex::Full => Box::new(columnar_row_range), ColumnIndex::Optional(optional_index) => Box::new( optional_index - .iter_docs() + .iter_non_null_docs() .map(move |row_id: RowId| columnar_row_range.start + row_id), ), ColumnIndex::Multivalued(_) => { diff --git a/columnar/src/column_index/multivalued_index.rs b/columnar/src/column_index/multivalued_index.rs index 953aec245..883475a1b 100644 --- a/columnar/src/column_index/multivalued_index.rs +++ b/columnar/src/column_index/multivalued_index.rs @@ -215,6 +215,32 @@ impl MultiValueIndex { } } + /// Returns an iterator over document ids that have at least one value. + pub fn iter_non_null_docs(&self) -> Box<dyn Iterator<Item = DocId> + '_> { + match self { + MultiValueIndex::MultiValueIndexV1(idx) => { + let mut doc: DocId = 0u32; + let num_docs = idx.num_docs(); + Box::new(std::iter::from_fn(move || { + // This is not the most efficient way to do this, but it's legacy code. 
+ while doc < num_docs { + let cur = doc; + doc += 1; + let start = idx.start_index_column.get_val(cur); + let end = idx.start_index_column.get_val(cur + 1); + if end > start { + return Some(cur); + } + } + None + })) + } + MultiValueIndex::MultiValueIndexV2(idx) => { + Box::new(idx.optional_index.iter_non_null_docs()) + } + } + } + /// Converts a list of ranks (row ids of values) in a 1:n index to the corresponding list of /// docids. Positions are converted inplace to docids. /// diff --git a/columnar/src/column_index/optional_index/mod.rs b/columnar/src/column_index/optional_index/mod.rs index b0e3a6793..0bcf9bad4 100644 --- a/columnar/src/column_index/optional_index/mod.rs +++ b/columnar/src/column_index/optional_index/mod.rs @@ -88,7 +88,7 @@ pub struct OptionalIndex { impl Iterable for &OptionalIndex { fn boxed_iter(&self) -> Box + '_> { - Box::new(self.iter_docs()) + Box::new(self.iter_non_null_docs()) } } @@ -280,8 +280,9 @@ impl OptionalIndex { self.num_non_null_docs } - pub fn iter_docs(&self) -> impl Iterator + '_ { - // TODO optimize + pub fn iter_non_null_docs(&self) -> impl Iterator + '_ { + // TODO optimize. We could iterate over the blocks directly. + // We use the dense value ids and retrieve the doc ids via select. 
let mut select_batch = self.select_cursor(); (0..self.num_non_null_docs).map(move |rank| select_batch.select(rank)) } diff --git a/columnar/src/column_index/optional_index/tests.rs b/columnar/src/column_index/optional_index/tests.rs index 160cb454e..5f54f3362 100644 --- a/columnar/src/column_index/optional_index/tests.rs +++ b/columnar/src/column_index/optional_index/tests.rs @@ -164,7 +164,11 @@ fn test_optional_index_large() { fn test_optional_index_iter_aux(row_ids: &[RowId], num_rows: RowId) { let optional_index = OptionalIndex::for_test(num_rows, row_ids); assert_eq!(optional_index.num_docs(), num_rows); - assert!(optional_index.iter_docs().eq(row_ids.iter().copied())); + assert!( + optional_index + .iter_non_null_docs() + .eq(row_ids.iter().copied()) + ); } #[test] diff --git a/columnar/src/columnar/merge/mod.rs b/columnar/src/columnar/merge/mod.rs index 54449f8a4..70ace6b78 100644 --- a/columnar/src/columnar/merge/mod.rs +++ b/columnar/src/columnar/merge/mod.rs @@ -367,7 +367,7 @@ fn is_empty_after_merge( ColumnIndex::Empty { .. 
} => true, ColumnIndex::Full => alive_bitset.len() == 0, ColumnIndex::Optional(optional_index) => { - for doc in optional_index.iter_docs() { + for doc in optional_index.iter_non_null_docs() { if alive_bitset.contains(doc) { return false; } diff --git a/src/query/exist_query.rs b/src/query/exist_query.rs index 21d85ceb4..f97e9e5c7 100644 --- a/src/query/exist_query.rs +++ b/src/query/exist_query.rs @@ -1,12 +1,15 @@ use core::fmt::Debug; use columnar::{ColumnIndex, DynamicColumn}; +use common::BitSet; use super::{ConstScorer, EmptyScorer}; use crate::docset::{DocSet, TERMINATED}; use crate::index::SegmentReader; +use crate::query::all_query::AllScorer; +use crate::query::boost_query::BoostScorer; use crate::query::explanation::does_not_match; -use crate::query::{EnableScoring, Explanation, Query, Scorer, Weight}; +use crate::query::{BitSetDocSet, EnableScoring, Explanation, Query, Scorer, Weight}; use crate::schema::Type; use crate::{DocId, Score, TantivyError}; @@ -113,13 +116,49 @@ impl Weight for ExistsWeight { non_empty_columns.push(column) } } - // TODO: we can optimizer more here since in most cases we will have only one index - if !non_empty_columns.is_empty() { - let docset = ExistsDocSet::new(non_empty_columns, reader.max_doc()); - Ok(Box::new(ConstScorer::new(docset, boost))) - } else { - Ok(Box::new(EmptyScorer)) + if non_empty_columns.is_empty() { + return Ok(Box::new(EmptyScorer)); } + + // If any column is full, all docs match. 
+ let max_doc = reader.max_doc(); + if non_empty_columns + .iter() + .any(|col| matches!(col.column_index(), ColumnIndex::Full)) + { + let all_scorer = AllScorer::new(max_doc); + return Ok(Box::new(BoostScorer::new(all_scorer, boost))); + } + + // If we have only a few dynamic columns (fewer than 4), use ExistsDocSet + // NOTE: A lower number may be better for very sparse columns + if non_empty_columns.len() < 4 { + let docset = ExistsDocSet::new(non_empty_columns, reader.max_doc()); + return Ok(Box::new(ConstScorer::new(docset, boost))); + } + + // If we have many dynamic columns, precompute a bitset of matching docs + let mut doc_bitset = BitSet::with_max_value(max_doc); + for column in &non_empty_columns { + match column.column_index() { + ColumnIndex::Empty { .. } => {} + ColumnIndex::Full => { + // Handled by AllScorer return above. + } + ColumnIndex::Optional(optional_index) => { + for doc in optional_index.iter_non_null_docs() { + doc_bitset.insert(doc); + } + } + ColumnIndex::Multivalued(multi_idx) => { + for doc in multi_idx.iter_non_null_docs() { + doc_bitset.insert(doc); + } + } + } + } + let docset = BitSetDocSet::from(doc_bitset); + Ok(Box::new(ConstScorer::new(docset, boost))) } fn explain(&self, reader: &SegmentReader, doc: DocId) -> crate::Result<Explanation> { @@ -294,6 +333,43 @@ mod tests { Ok(()) } + #[test] + fn test_exists_query_json_union_no_single_full_subpath() -> crate::Result<()> { + // Build docs where no single subpath exists for all docs, but the union does. 
+ let mut schema_builder = Schema::builder(); + let json = schema_builder.add_json_field("json", TEXT | FAST); + let schema = schema_builder.build(); + + let index = Index::create_in_ram(schema); + { + let mut index_writer = index.writer_for_tests()?; + for i in 0u64..100u64 { + if i % 2 == 0 { + // only subpath `a` + index_writer.add_document(doc!(json => json!({"a": i})))?; + } else { + // only subpath `b` + index_writer.add_document(doc!(json => json!({"b": i})))?; + } + } + index_writer.commit()?; + } + let reader = index.reader()?; + let searcher = reader.searcher(); + + // No single subpath is full + assert_eq!(count_existing_fields(&searcher, "json.a", false)?, 50); + assert_eq!(count_existing_fields(&searcher, "json.b", false)?, 50); + + // Root exists with subpaths disabled is zero + assert_eq!(count_existing_fields(&searcher, "json", false)?, 0); + + // Root exists with subpaths enabled should match all docs via union + assert_eq!(count_existing_fields(&searcher, "json", true)?, 100); + + Ok(()) + } + #[test] fn test_exists_query_misc_supported_types() -> crate::Result<()> { let mut schema_builder = Schema::builder();