From b60d8621506a59bbf58f5ba3b50b8760c80e7dc8 Mon Sep 17 00:00:00 2001 From: PSeitz Date: Mon, 13 Nov 2023 05:14:27 +0100 Subject: [PATCH 01/12] docid deltas while indexing (#2249) * docid deltas while indexing storing deltas is especially helpful for repetitive data like logs. In those cases, recording a doc on a term costed 4 bytes instead of 1 byte now. HDFS Indexing 1.1GB Total memory consumption: Before: 760 MB Now: 590 MB * use scan for delta decoding --- src/postings/recorder.rs | 77 ++++++++++++++++++++-------------------- 1 file changed, 39 insertions(+), 38 deletions(-) diff --git a/src/postings/recorder.rs b/src/postings/recorder.rs index bc65010d9..9620f155b 100644 --- a/src/postings/recorder.rs +++ b/src/postings/recorder.rs @@ -82,21 +82,12 @@ pub(crate) trait Recorder: Copy + Default + Send + Sync + 'static { } /// Only records the doc ids -#[derive(Clone, Copy)] +#[derive(Clone, Copy, Default)] pub struct DocIdRecorder { stack: ExpUnrolledLinkedList, current_doc: DocId, } -impl Default for DocIdRecorder { - fn default() -> Self { - DocIdRecorder { - stack: ExpUnrolledLinkedList::default(), - current_doc: u32::MAX, - } - } -} - impl Recorder for DocIdRecorder { #[inline] fn current_doc(&self) -> DocId { @@ -105,8 +96,9 @@ impl Recorder for DocIdRecorder { #[inline] fn new_doc(&mut self, doc: DocId, arena: &mut MemoryArena) { + let delta = doc - self.current_doc; self.current_doc = doc; - self.stack.writer(arena).write_u32_vint(doc); + self.stack.writer(arena).write_u32_vint(delta); } #[inline] @@ -123,21 +115,20 @@ impl Recorder for DocIdRecorder { buffer_lender: &mut BufferLender, ) { let (buffer, doc_ids) = buffer_lender.lend_all(); - self.stack.read_to_end(arena, buffer); // TODO avoid reading twice. + self.stack.read_to_end(arena, buffer); if let Some(doc_id_map) = doc_id_map { - doc_ids.extend( - VInt32Reader::new(&buffer[..]) - .map(|old_doc_id| doc_id_map.get_new_doc_id(old_doc_id)), - ); + let iter = get_sum_reader(VInt32Reader::new(&buffer[..])); + doc_ids.extend(iter.map(|old_doc_id| doc_id_map.get_new_doc_id(old_doc_id))); doc_ids.sort_unstable(); for doc in doc_ids { serializer.write_doc(*doc, 0u32, &[][..]); } } else { - for doc in VInt32Reader::new(&buffer[..]) { - serializer.write_doc(doc, 0u32, &[][..]); + let iter = get_sum_reader(VInt32Reader::new(&buffer[..])); + for doc_id in iter { + serializer.write_doc(doc_id, 0u32, &[][..]); } } } @@ -147,6 +138,15 @@ impl Recorder for DocIdRecorder { } } +/// Takes an Iterator of delta encoded elements and returns an iterator +/// that yields the sum of the elements. +fn get_sum_reader(iter: impl Iterator) -> impl Iterator { + iter.scan(0, |state, delta| { + *state += delta; + Some(*state) + }) +} + /// Recorder encoding document ids, and term frequencies #[derive(Clone, Copy, Default)] pub struct TermFrequencyRecorder { @@ -164,9 +164,10 @@ impl Recorder for TermFrequencyRecorder { #[inline] fn new_doc(&mut self, doc: DocId, arena: &mut MemoryArena) { + let delta = doc - self.current_doc; self.term_doc_freq += 1; self.current_doc = doc; - self.stack.writer(arena).write_u32_vint(doc); + self.stack.writer(arena).write_u32_vint(delta); } #[inline] @@ -193,9 +194,12 @@ impl Recorder for TermFrequencyRecorder { let mut u32_it = VInt32Reader::new(&buffer[..]); if let Some(doc_id_map) = doc_id_map { let mut doc_id_and_tf = vec![]; - while let Some(old_doc_id) = u32_it.next() { + let mut prev_doc = 0; + while let Some(delta_doc_id) = u32_it.next() { + let doc_id = prev_doc + delta_doc_id; + prev_doc = doc_id; let term_freq = u32_it.next().unwrap_or(self.current_tf); - doc_id_and_tf.push((doc_id_map.get_new_doc_id(old_doc_id), term_freq)); + doc_id_and_tf.push((doc_id_map.get_new_doc_id(doc_id), term_freq)); } doc_id_and_tf.sort_unstable_by_key(|&(doc_id, _)| doc_id); @@ -203,9 +207,12 @@ impl Recorder for TermFrequencyRecorder { serializer.write_doc(doc_id, tf, &[][..]); } } else { - while let Some(doc) = u32_it.next() { + let mut prev_doc = 0; + while let Some(delta_doc_id) = u32_it.next() { + let doc_id = prev_doc + delta_doc_id; + prev_doc = doc_id; let term_freq = u32_it.next().unwrap_or(self.current_tf); - serializer.write_doc(doc, term_freq, &[][..]); + serializer.write_doc(doc_id, term_freq, &[][..]); } } } @@ -216,23 +223,13 @@ impl Recorder for TermFrequencyRecorder { } /// Recorder encoding term frequencies as well as positions. -#[derive(Clone, Copy)] +#[derive(Clone, Copy, Default)] pub struct TfAndPositionRecorder { stack: ExpUnrolledLinkedList, current_doc: DocId, term_doc_freq: u32, } -impl Default for TfAndPositionRecorder { - fn default() -> Self { - TfAndPositionRecorder { - stack: ExpUnrolledLinkedList::default(), - current_doc: u32::MAX, - term_doc_freq: 0u32, - } - } -} - impl Recorder for TfAndPositionRecorder { #[inline] fn current_doc(&self) -> DocId { @@ -241,9 +238,10 @@ impl Recorder for TfAndPositionRecorder { #[inline] fn new_doc(&mut self, doc: DocId, arena: &mut MemoryArena) { + let delta = doc - self.current_doc; self.current_doc = doc; self.term_doc_freq += 1u32; - self.stack.writer(arena).write_u32_vint(doc); + self.stack.writer(arena).write_u32_vint(delta); } #[inline] @@ -269,7 +267,10 @@ impl Recorder for TfAndPositionRecorder { self.stack.read_to_end(arena, buffer_u8); let mut u32_it = VInt32Reader::new(&buffer_u8[..]); let mut doc_id_and_positions = vec![]; - while let Some(doc) = u32_it.next() { + let mut prev_doc = 0; + while let Some(delta_doc_id) = u32_it.next() { + let doc_id = prev_doc + delta_doc_id; + prev_doc = doc_id; let mut prev_position_plus_one = 1u32; buffer_positions.clear(); loop { @@ -287,9 +288,9 @@ impl Recorder for TfAndPositionRecorder { if let Some(doc_id_map) = doc_id_map { // this simple variant to remap may consume to much memory doc_id_and_positions - .push((doc_id_map.get_new_doc_id(doc), buffer_positions.to_vec())); + .push((doc_id_map.get_new_doc_id(doc_id), buffer_positions.to_vec())); } else { - serializer.write_doc(doc, buffer_positions.len() as u32, buffer_positions); + serializer.write_doc(doc_id, buffer_positions.len() as u32, buffer_positions); } } if doc_id_map.is_some() { From 6b59ec6fd5d615a43869ccf4627d95ae10625d72 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Tue, 14 Nov 2023 15:48:31 +0900 Subject: [PATCH 02/12] Fix bug occuring when merging JSON object indexed with positions. In JSON Object field the presence of term frequencies depend on the field. Typically, a string with postiions indexed will have positions while numbers won't. The presence or absence of term freqs for a given term is unfortunately encoded in a very passive way. It is given by the presence of extra information in the skip info, or the lack of term freqs after decoding vint blocks. Before, after writing a segment, we would encode the segment correctly (without any term freq for number in json object field). However during merge, we would get the default term freq=1 value. (this is default in the absence of encoded term freqs) The merger would then proceed and attempt to decode 1 position when there are in fact none. This PR requires to explictly tell the posting serialize whether term frequencies should be serialized for each new term. Closes #2251 --- src/core/tests.rs | 138 ++++++++++++++++++++++++++- src/indexer/merger.rs | 45 ++++++++- src/postings/json_postings_writer.rs | 4 + src/postings/mod.rs | 2 +- src/postings/postings_writer.rs | 6 +- src/postings/recorder.rs | 9 ++ src/postings/segment_postings.rs | 12 ++- src/postings/serializer.rs | 14 ++- 8 files changed, 213 insertions(+), 17 deletions(-) diff --git a/src/core/tests.rs b/src/core/tests.rs index f2a26e038..e215c31f4 100644 --- a/src/core/tests.rs +++ b/src/core/tests.rs @@ -1,12 +1,13 @@ use crate::collector::Count; use crate::directory::{RamDirectory, WatchCallback}; -use crate::indexer::NoMergePolicy; +use crate::indexer::{LogMergePolicy, NoMergePolicy}; +use crate::json_utils::JsonTermWriter; use crate::query::TermQuery; -use crate::schema::{Field, IndexRecordOption, Schema, INDEXED, STRING, TEXT}; +use crate::schema::{Field, IndexRecordOption, Schema, Type, INDEXED, STRING, TEXT}; use crate::tokenizer::TokenizerManager; use crate::{ - Directory, Index, IndexBuilder, IndexReader, IndexSettings, IndexWriter, ReloadPolicy, - SegmentId, TantivyDocument, Term, + Directory, DocSet, Index, IndexBuilder, IndexReader, IndexSettings, IndexWriter, Postings, + ReloadPolicy, SegmentId, TantivyDocument, Term, }; #[test] @@ -344,3 +345,132 @@ fn test_merging_segment_update_docfreq() { let term_info = inv_index.get_term_info(&term).unwrap().unwrap(); assert_eq!(term_info.doc_freq, 12); } + +// motivated by https://github.com/quickwit-oss/quickwit/issues/4130 +#[test] +fn test_positions_merge_bug_non_text_json_vint() { + let mut schema_builder = Schema::builder(); + let field = schema_builder.add_json_field("dynamic", TEXT); + let schema = schema_builder.build(); + let index = Index::create_in_ram(schema.clone()); + let mut writer: IndexWriter = index.writer_for_tests().unwrap(); + let mut merge_policy = LogMergePolicy::default(); + merge_policy.set_min_num_segments(2); + writer.set_merge_policy(Box::new(merge_policy)); + // Here a string would work. + let doc_json = r#"{"tenant_id":75}"#; + let vals = serde_json::from_str(doc_json).unwrap(); + let mut doc = TantivyDocument::default(); + doc.add_object(field, vals); + writer.add_document(doc.clone()).unwrap(); + writer.commit().unwrap(); + writer.add_document(doc.clone()).unwrap(); + writer.commit().unwrap(); + writer.wait_merging_threads().unwrap(); + let reader = index.reader().unwrap(); + assert_eq!(reader.searcher().segment_readers().len(), 1); +} + +// Same as above but with bitpacked blocks +#[test] +fn test_positions_merge_bug_non_text_json_bitpacked_block() { + let mut schema_builder = Schema::builder(); + let field = schema_builder.add_json_field("dynamic", TEXT); + let schema = schema_builder.build(); + let index = Index::create_in_ram(schema.clone()); + let mut writer: IndexWriter = index.writer_for_tests().unwrap(); + let mut merge_policy = LogMergePolicy::default(); + merge_policy.set_min_num_segments(2); + writer.set_merge_policy(Box::new(merge_policy)); + // Here a string would work. + let doc_json = r#"{"tenant_id":75}"#; + let vals = serde_json::from_str(doc_json).unwrap(); + let mut doc = TantivyDocument::default(); + doc.add_object(field, vals); + for _ in 0..128 { + writer.add_document(doc.clone()).unwrap(); + } + writer.commit().unwrap(); + writer.add_document(doc.clone()).unwrap(); + writer.commit().unwrap(); + writer.wait_merging_threads().unwrap(); + let reader = index.reader().unwrap(); + assert_eq!(reader.searcher().segment_readers().len(), 1); +} + +#[test] +fn test_non_text_json_term_freq() { + let mut schema_builder = Schema::builder(); + let field = schema_builder.add_json_field("dynamic", TEXT); + let schema = schema_builder.build(); + let index = Index::create_in_ram(schema.clone()); + let mut writer: IndexWriter = index.writer_for_tests().unwrap(); + // Here a string would work. + let doc_json = r#"{"tenant_id":75}"#; + let vals = serde_json::from_str(doc_json).unwrap(); + let mut doc = TantivyDocument::default(); + doc.add_object(field, vals); + writer.add_document(doc.clone()).unwrap(); + writer.commit().unwrap(); + let reader = index.reader().unwrap(); + assert_eq!(reader.searcher().segment_readers().len(), 1); + let searcher = reader.searcher(); + let segment_reader = searcher.segment_reader(0u32); + let inv_idx = segment_reader.inverted_index(field).unwrap(); + let mut term = Term::with_type_and_field(Type::Json, field); + let mut json_term_writer = JsonTermWriter::wrap(&mut term, false); + json_term_writer.push_path_segment("tenant_id"); + json_term_writer.close_path_and_set_type(Type::U64); + json_term_writer.set_fast_value(75u64); + let postings = inv_idx + .read_postings( + &json_term_writer.term(), + IndexRecordOption::WithFreqsAndPositions, + ) + .unwrap() + .unwrap(); + assert_eq!(postings.doc(), 0); + assert_eq!(postings.term_freq(), 1u32); +} + +#[test] +fn test_non_text_json_term_freq_bitpacked() { + let mut schema_builder = Schema::builder(); + let field = schema_builder.add_json_field("dynamic", TEXT); + let schema = schema_builder.build(); + let index = Index::create_in_ram(schema.clone()); + let mut writer: IndexWriter = index.writer_for_tests().unwrap(); + // Here a string would work. + let doc_json = r#"{"tenant_id":75}"#; + let vals = serde_json::from_str(doc_json).unwrap(); + let mut doc = TantivyDocument::default(); + doc.add_object(field, vals); + let num_docs = 132; + for _ in 0..num_docs { + writer.add_document(doc.clone()).unwrap(); + } + writer.commit().unwrap(); + let reader = index.reader().unwrap(); + assert_eq!(reader.searcher().segment_readers().len(), 1); + let searcher = reader.searcher(); + let segment_reader = searcher.segment_reader(0u32); + let inv_idx = segment_reader.inverted_index(field).unwrap(); + let mut term = Term::with_type_and_field(Type::Json, field); + let mut json_term_writer = JsonTermWriter::wrap(&mut term, false); + json_term_writer.push_path_segment("tenant_id"); + json_term_writer.close_path_and_set_type(Type::U64); + json_term_writer.set_fast_value(75u64); + let mut postings = inv_idx + .read_postings( + &json_term_writer.term(), + IndexRecordOption::WithFreqsAndPositions, + ) + .unwrap() + .unwrap(); + assert_eq!(postings.doc(), 0); + assert_eq!(postings.term_freq(), 1u32); + for i in 1..num_docs { + assert_eq!(postings.advance(), i); + assert_eq!(postings.term_freq(), 1u32); + } +} diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index 6c7837a49..87bc4c8c8 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -552,7 +552,41 @@ impl IndexMerger { continue; } - field_serializer.new_term(term_bytes, total_doc_freq)?; + // This should never happen as we early exited for total_doc_freq == 0. + assert!(!segment_postings_containing_the_term.is_empty()); + + let has_term_freq = { + let has_term_freq = !segment_postings_containing_the_term[0] + .1 + .block_cursor + .freqs() + .is_empty(); + for (_, postings) in &segment_postings_containing_the_term[1..] { + // This may look at a strange way to test whether we have term freq or not. + // With JSON object, the schema is not sufficient to know whether a term + // has its term frequency encoded or not: + // strings may have term frequencies, while number terms never have one. + // + // Ideally, we should have burnt one bit of two in the `TermInfo`. + // However, we preferred not changing the codec too much and detect this + // instead by + // - looking at the size of the skip data for bitpacked blocks + // - observing the absence of remaining data after reading the docs for vint + // blocks. + // + // Overall the reliable way to know if we have actual frequencies loaded or not + // is to check whether the actual decoded array is empty or not. + if has_term_freq != !postings.block_cursor.freqs().is_empty() { + return Err(DataCorruption::comment_only( + "Term freqs are inconsistent across segments", + ) + .into()); + } + } + has_term_freq + }; + + field_serializer.new_term(term_bytes, total_doc_freq, has_term_freq)?; // We can now serialize this postings, by pushing each document to the // postings serializer. @@ -567,8 +601,13 @@ impl IndexMerger { if let Some(remapped_doc_id) = old_to_new_doc_id[doc as usize] { // we make sure to only write the term if // there is at least one document. - let term_freq = segment_postings.term_freq(); - segment_postings.positions(&mut positions_buffer); + let term_freq = if has_term_freq { + segment_postings.positions(&mut positions_buffer); + segment_postings.term_freq() + } else { + 0u32 + }; + // if doc_id_mapping exists, the doc_ids are reordered, they are // not just stacked. The field serializer expects monotonically increasing // doc_ids, so we collect and sort them first, before writing. diff --git a/src/postings/json_postings_writer.rs b/src/postings/json_postings_writer.rs index db337f458..9f0d8eb06 100644 --- a/src/postings/json_postings_writer.rs +++ b/src/postings/json_postings_writer.rs @@ -11,6 +11,10 @@ use crate::schema::{Field, Type, JSON_END_OF_PATH}; use crate::tokenizer::TokenStream; use crate::{DocId, Term}; +/// The `JsonPostingsWriter` is odd in that it relies on a hidden contract: +/// +/// `subscribe` is called directly to index non-text tokens, while +/// `index_text` is used to index text. #[derive(Default)] pub(crate) struct JsonPostingsWriter { str_posting_writer: SpecializedPostingsWriter, diff --git a/src/postings/mod.rs b/src/postings/mod.rs index 53f51ad2a..32c4b7bd8 100644 --- a/src/postings/mod.rs +++ b/src/postings/mod.rs @@ -63,7 +63,7 @@ pub mod tests { let mut segment = index.new_segment(); let mut posting_serializer = InvertedIndexSerializer::open(&mut segment)?; let mut field_serializer = posting_serializer.new_field(text_field, 120 * 4, None)?; - field_serializer.new_term("abc".as_bytes(), 12u32)?; + field_serializer.new_term("abc".as_bytes(), 12u32, true)?; for doc_id in 0u32..120u32 { let delta_positions = vec![1, 2, 3, 2]; field_serializer.write_doc(doc_id, 4, &delta_positions); diff --git a/src/postings/postings_writer.rs b/src/postings/postings_writer.rs index c51d4d834..943bc11b5 100644 --- a/src/postings/postings_writer.rs +++ b/src/postings/postings_writer.rs @@ -194,7 +194,11 @@ impl SpecializedPostingsWriter { ) -> io::Result<()> { let recorder: Rec = ctx.term_index.read(addr); let term_doc_freq = recorder.term_doc_freq().unwrap_or(0u32); - serializer.new_term(term, term_doc_freq)?; + serializer.new_term( + term, + term_doc_freq, + recorder.has_term_freq(), + )?; recorder.serialize(&ctx.arena, doc_id_map, serializer, buffer_lender); serializer.close_term()?; Ok(()) diff --git a/src/postings/recorder.rs b/src/postings/recorder.rs index 9620f155b..767441f64 100644 --- a/src/postings/recorder.rs +++ b/src/postings/recorder.rs @@ -79,6 +79,11 @@ pub(crate) trait Recorder: Copy + Default + Send + Sync + 'static { /// /// Returns `None` if not available. fn term_doc_freq(&self) -> Option; + + #[inline] + fn has_term_freq(&self) -> bool { + true + } } /// Only records the doc ids @@ -136,6 +141,10 @@ impl Recorder for DocIdRecorder { fn term_doc_freq(&self) -> Option { None } + + fn has_term_freq(&self) -> bool { + false + } } /// Takes an Iterator of delta encoded elements and returns an iterator diff --git a/src/postings/segment_postings.rs b/src/postings/segment_postings.rs index 2b9ee8c5c..3d91cf2ee 100644 --- a/src/postings/segment_postings.rs +++ b/src/postings/segment_postings.rs @@ -71,7 +71,7 @@ impl SegmentPostings { { let mut postings_serializer = PostingsSerializer::new(&mut buffer, 0.0, IndexRecordOption::Basic, None); - postings_serializer.new_term(docs.len() as u32); + postings_serializer.new_term(docs.len() as u32, false); for &doc in docs { postings_serializer.write_doc(doc, 1u32); } @@ -120,7 +120,7 @@ impl SegmentPostings { IndexRecordOption::WithFreqs, fieldnorm_reader, ); - postings_serializer.new_term(doc_and_tfs.len() as u32); + postings_serializer.new_term(doc_and_tfs.len() as u32, true); for &(doc, tf) in doc_and_tfs { postings_serializer.write_doc(doc, tf); } @@ -238,14 +238,18 @@ impl Postings for SegmentPostings { } fn positions_with_offset(&mut self, offset: u32, output: &mut Vec) { - let term_freq = self.term_freq() as usize; + let term_freq = self.term_freq(); if let Some(position_reader) = self.position_reader.as_mut() { + debug_assert!( + !self.block_cursor.freqs().is_empty(), + "No positions available" + ); let read_offset = self.block_cursor.position_offset() + (self.block_cursor.freqs()[..self.cur] .iter() .cloned() .sum::() as u64); - output.resize(term_freq, 0u32); + output.resize(term_freq as usize, 0u32); position_reader.read(read_offset, &mut output[..]); let mut cum = offset; for output_mut in output.iter_mut() { diff --git a/src/postings/serializer.rs b/src/postings/serializer.rs index bf0d4d2ef..f433e8ed1 100644 --- a/src/postings/serializer.rs +++ b/src/postings/serializer.rs @@ -168,7 +168,12 @@ impl<'a> FieldSerializer<'a> { /// * term - the term. It needs to come after the previous term according to the lexicographical /// order. /// * term_doc_freq - return the number of document containing the term. - pub fn new_term(&mut self, term: &[u8], term_doc_freq: u32) -> io::Result<()> { + pub fn new_term( + &mut self, + term: &[u8], + term_doc_freq: u32, + record_term_freq: bool, + ) -> io::Result<()> { assert!( !self.term_open, "Called new_term, while the previous term was not closed." @@ -177,7 +182,8 @@ impl<'a> FieldSerializer<'a> { self.postings_serializer.clear(); self.current_term_info = self.current_term_info(); self.term_dictionary_builder.insert_key(term)?; - self.postings_serializer.new_term(term_doc_freq); + self.postings_serializer + .new_term(term_doc_freq, record_term_freq); Ok(()) } @@ -330,10 +336,10 @@ impl PostingsSerializer { } } - pub fn new_term(&mut self, term_doc_freq: u32) { + pub fn new_term(&mut self, term_doc_freq: u32, record_term_freq: bool) { self.bm25_weight = None; - self.term_has_freq = self.mode.has_freq() && term_doc_freq != 0; + self.term_has_freq = self.mode.has_freq() && record_term_freq; if !self.term_has_freq { return; } From 828632e8c4dbe054a396fe5ae9c0f23526bed5bc Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Tue, 14 Nov 2023 15:05:16 +0100 Subject: [PATCH 03/12] rustfmt --- src/postings/postings_writer.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/postings/postings_writer.rs b/src/postings/postings_writer.rs index 943bc11b5..264392889 100644 --- a/src/postings/postings_writer.rs +++ b/src/postings/postings_writer.rs @@ -194,11 +194,7 @@ impl SpecializedPostingsWriter { ) -> io::Result<()> { let recorder: Rec = ctx.term_index.read(addr); let term_doc_freq = recorder.term_doc_freq().unwrap_or(0u32); - serializer.new_term( - term, - term_doc_freq, - recorder.has_term_freq(), - )?; + serializer.new_term(term, term_doc_freq, recorder.has_term_freq())?; recorder.serialize(&ctx.arena, doc_id_map, serializer, buffer_lender); serializer.close_term()?; Ok(()) From 7a2c5804b12dfc31bd997667cdbf9153255d2a4a Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Wed, 15 Nov 2023 01:03:08 +0100 Subject: [PATCH 04/12] Update itertools requirement from 0.11.0 to 0.12.0 (#2255) Updates the requirements on [itertools](https://github.com/rust-itertools/itertools) to permit the latest version. - [Changelog](https://github.com/rust-itertools/itertools/blob/master/CHANGELOG.md) - [Commits](https://github.com/rust-itertools/itertools/compare/v0.11.0...v0.12.0) --- updated-dependencies: - dependency-name: itertools dependency-type: direct:production ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- Cargo.toml | 2 +- columnar/Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c1512334e..1dfb6320c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,7 +49,7 @@ smallvec = "1.8.0" rayon = "1.5.2" lru = "0.12.0" fastdivide = "0.4.0" -itertools = "0.11.0" +itertools = "0.12.0" measure_time = "0.8.2" async-trait = "0.1.53" arc-swap = "1.5.0" diff --git a/columnar/Cargo.toml b/columnar/Cargo.toml index 43da09afd..6728283df 100644 --- a/columnar/Cargo.toml +++ b/columnar/Cargo.toml @@ -9,7 +9,7 @@ description = "column oriented storage for tantivy" categories = ["database-implementations", "data-structures", "compression"] [dependencies] -itertools = "0.11.0" +itertools = "0.12.0" fnv = "1.0.7" fastdivide = "0.4.0" From 6d9a7b7eb07be58a4db076b517e7979632a3b76a Mon Sep 17 00:00:00 2001 From: Chris Tam Date: Tue, 14 Nov 2023 19:03:44 -0500 Subject: [PATCH 05/12] Derive Debug for SchemaBuilder (#2254) --- src/schema/schema.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/schema/schema.rs b/src/schema/schema.rs index ed3a60193..c6bf7b932 100644 --- a/src/schema/schema.rs +++ b/src/schema/schema.rs @@ -30,7 +30,7 @@ use crate::TantivyError; /// let body_field = schema_builder.add_text_field("body", TEXT); /// let schema = schema_builder.build(); /// ``` -#[derive(Default)] +#[derive(Debug, Default)] pub struct SchemaBuilder { fields: Vec, fields_map: HashMap, From 9caab45136405c75c3f03ed2ee70473063b5b367 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Wed, 15 Nov 2023 10:43:36 +0900 Subject: [PATCH 06/12] Preparing for 0.21.2 release. (#2256) --- CHANGELOG.md | 7 +++++++ Cargo.toml | 2 +- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e8410c180..d63ccef7d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,10 @@ +Tantivy 0.21.2 +================================ +#### Bugfixes +- Bugfix: Merge operations would panic for JsonObject with position enabled, when they contain numbers or booleans. [#2251](https://github.com/quickwit-oss/tantivy/issues/2251). +#### Features/Improvements + + Tantivy 0.21.1 ================================ #### Bugfixes diff --git a/Cargo.toml b/Cargo.toml index 1dfb6320c..bf5dc14f3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "tantivy" -version = "0.21.0" +version = "0.21.2" authors = ["Paul Masurel "] license = "MIT" categories = ["database-implementations", "data-structures"] From 0aae31d7d75eea3453feb6f202642690bd9fe29e Mon Sep 17 00:00:00 2001 From: PSeitz Date: Thu, 16 Nov 2023 13:47:36 +0100 Subject: [PATCH 07/12] reduce number of allocations (#2257) * reduce number of allocations Explanation makes up around 50% of all allocations (numbers not perf). It's created during serialization but not called. - Make Explanation optional in BM25 - Avoid allocations when using Explanation * use Cow --- src/postings/serializer.rs | 2 +- src/query/bm25.rs | 27 +++++++++++++++++--- src/query/boost_query.rs | 5 ++-- src/query/const_score_query.rs | 6 ++--- src/query/explanation.rs | 40 ++++++++++++++++++++---------- src/query/term_query/term_query.rs | 2 +- 6 files changed, 58 insertions(+), 24 deletions(-) diff --git a/src/postings/serializer.rs b/src/postings/serializer.rs index f433e8ed1..b9bf8f0d3 100644 --- a/src/postings/serializer.rs +++ b/src/postings/serializer.rs @@ -355,7 +355,7 @@ impl PostingsSerializer { return; } - self.bm25_weight = Some(Bm25Weight::for_one_term( + self.bm25_weight = Some(Bm25Weight::for_one_term_without_explain( term_doc_freq as u64, num_docs_in_segment, self.avg_fieldnorm, diff --git a/src/query/bm25.rs b/src/query/bm25.rs index e06cf28d6..532660574 100644 --- a/src/query/bm25.rs +++ b/src/query/bm25.rs @@ -77,7 +77,7 @@ pub struct Bm25Params { /// A struct used for computing BM25 scores. #[derive(Clone)] pub struct Bm25Weight { - idf_explain: Explanation, + idf_explain: Option, weight: Score, cache: [Score; 256], average_fieldnorm: Score, @@ -147,11 +147,30 @@ impl Bm25Weight { idf_explain.add_const("N, total number of docs", total_num_docs as Score); Bm25Weight::new(idf_explain, avg_fieldnorm) } + /// Construct a [Bm25Weight] for a single term. + /// This method does not carry the [Explanation] for the idf. + pub fn for_one_term_without_explain( + term_doc_freq: u64, + total_num_docs: u64, + avg_fieldnorm: Score, + ) -> Bm25Weight { + let idf = idf(term_doc_freq, total_num_docs); + Bm25Weight::new_without_explain(idf, avg_fieldnorm) + } pub(crate) fn new(idf_explain: Explanation, average_fieldnorm: Score) -> Bm25Weight { let weight = idf_explain.value() * (1.0 + K1); Bm25Weight { - idf_explain, + idf_explain: Some(idf_explain), + weight, + cache: compute_tf_cache(average_fieldnorm), + average_fieldnorm, + } + } + pub(crate) fn new_without_explain(idf: f32, average_fieldnorm: Score) -> Bm25Weight { + let weight = idf * (1.0 + K1); + Bm25Weight { + idf_explain: None, weight, cache: compute_tf_cache(average_fieldnorm), average_fieldnorm, @@ -202,7 +221,9 @@ impl Bm25Weight { let mut explanation = Explanation::new("TermQuery, product of...", score); explanation.add_detail(Explanation::new("(K1+1)", K1 + 1.0)); - explanation.add_detail(self.idf_explain.clone()); + if let Some(idf_explain) = &self.idf_explain { + explanation.add_detail(idf_explain.clone()); + } explanation.add_detail(tf_explanation); explanation } diff --git a/src/query/boost_query.rs b/src/query/boost_query.rs index a72b9d4d2..e7c25114f 100644 --- a/src/query/boost_query.rs +++ b/src/query/boost_query.rs @@ -74,7 +74,8 @@ impl Weight for BoostWeight { fn explain(&self, reader: &SegmentReader, doc: u32) -> crate::Result { let underlying_explanation = self.weight.explain(reader, doc)?; let score = underlying_explanation.value() * self.boost; - let mut explanation = Explanation::new(format!("Boost x{} of ...", self.boost), score); + let mut explanation = + Explanation::new_with_string(format!("Boost x{} of ...", self.boost), score); explanation.add_detail(underlying_explanation); Ok(explanation) } @@ -151,7 +152,7 @@ mod tests { let explanation = query.explain(&searcher, DocAddress::new(0, 0u32)).unwrap(); assert_eq!( explanation.to_pretty_json(), - "{\n \"value\": 0.2,\n \"description\": \"Boost x0.2 of ...\",\n \"details\": [\n {\n \"value\": 1.0,\n \"description\": \"AllQuery\",\n \"context\": []\n }\n ],\n \"context\": []\n}" + "{\n \"value\": 0.2,\n \"description\": \"Boost x0.2 of ...\",\n \"details\": [\n {\n \"value\": 1.0,\n \"description\": \"AllQuery\"\n }\n ]\n}" ); Ok(()) } diff --git a/src/query/const_score_query.rs b/src/query/const_score_query.rs index 4201048fd..80f81fdfc 100644 --- a/src/query/const_score_query.rs +++ b/src/query/const_score_query.rs @@ -164,11 +164,9 @@ mod tests { "details": [ { "value": 1.0, - "description": "AllQuery", - "context": [] + "description": "AllQuery" } - ], - "context": [] + ] }"# ); Ok(()) diff --git a/src/query/explanation.rs b/src/query/explanation.rs index df9818e0c..7d6559a27 100644 --- a/src/query/explanation.rs +++ b/src/query/explanation.rs @@ -1,3 +1,4 @@ +use std::borrow::Cow; use std::fmt; use serde::Serialize; @@ -16,12 +17,12 @@ pub(crate) fn does_not_match(doc: DocId) -> TantivyError { #[derive(Clone, Serialize)] pub struct Explanation { value: Score, - description: String, - #[serde(skip_serializing_if = "Vec::is_empty")] - details: Vec, - context: Vec, + description: Cow<'static, str>, + #[serde(skip_serializing_if = "Option::is_none")] + details: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + context: Option>, } - impl fmt::Debug for Explanation { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "Explanation({})", self.to_pretty_json()) @@ -30,12 +31,21 @@ impl fmt::Debug for Explanation { impl Explanation { /// Creates a new explanation object. - pub fn new(description: T, value: Score) -> Explanation { + pub fn new_with_string(description: String, value: Score) -> Explanation { Explanation { value, - description: description.to_string(), - details: vec![], - context: vec![], + description: Cow::Owned(description), + details: None, + context: None, + } + } + /// Creates a new explanation object. + pub fn new(description: &'static str, value: Score) -> Explanation { + Explanation { + value, + description: Cow::Borrowed(description), + details: None, + context: None, } } @@ -48,17 +58,21 @@ impl Explanation { /// /// Details are treated as child of the current node. pub fn add_detail(&mut self, child_explanation: Explanation) { - self.details.push(child_explanation); + self.details + .get_or_insert_with(Vec::new) + .push(child_explanation); } /// Adds some extra context to the explanation. pub fn add_context(&mut self, context: String) { - self.context.push(context); + self.context.get_or_insert_with(Vec::new).push(context); } /// Shortcut for `self.details.push(Explanation::new(name, value));` - pub fn add_const(&mut self, name: T, value: Score) { - self.details.push(Explanation::new(name, value)); + pub fn add_const(&mut self, name: &'static str, value: Score) { + self.details + .get_or_insert_with(Vec::new) + .push(Explanation::new(name, value)); } /// Returns an indented json representation of the explanation tree for debug usage. diff --git a/src/query/term_query/term_query.rs b/src/query/term_query/term_query.rs index b07405292..832d07895 100644 --- a/src/query/term_query/term_query.rs +++ b/src/query/term_query/term_query.rs @@ -101,7 +101,7 @@ impl TermQuery { .. } => Bm25Weight::for_terms(statistics_provider, &[self.term.clone()])?, EnableScoring::Disabled { .. } => { - Bm25Weight::new(Explanation::new("".to_string(), 1.0f32), 1.0f32) + Bm25Weight::new(Explanation::new("", 1.0f32), 1.0f32) } }; let scoring_enabled = enable_scoring.is_scoring_enabled(); From 47009ed2d3e18a20ec42e95abcd59e46a1110546 Mon Sep 17 00:00:00 2001 From: PSeitz Date: Mon, 20 Nov 2023 02:59:59 +0100 Subject: [PATCH 08/12] remove unused deps (#2264) found with cargo machete remove pprof (doesn't work) --- Cargo.toml | 4 ---- benches/index-bench.rs | 5 ++--- columnar/Cargo.toml | 1 - columnar/columnar-cli/Cargo.toml | 1 - 4 files changed, 2 insertions(+), 9 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index bf5dc14f3..d6d6b918f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,7 +43,6 @@ rustc-hash = "1.1.0" thiserror = "1.0.30" htmlescape = "0.3.1" fail = { version = "0.5.0", optional = true } -murmurhash32 = "0.3.0" time = { version = "0.3.10", features = ["serde-well-known"] } smallvec = "1.8.0" rayon = "1.5.2" @@ -51,7 +50,6 @@ lru = "0.12.0" fastdivide = "0.4.0" itertools = "0.12.0" measure_time = "0.8.2" -async-trait = "0.1.53" arc-swap = "1.5.0" columnar = { version= "0.2", path="./columnar", package ="tantivy-columnar" } @@ -75,7 +73,6 @@ matches = "0.1.9" pretty_assertions = "1.2.1" proptest = "1.0.0" test-log = "0.2.10" -env_logger = "0.10.0" futures = "0.3.21" paste = "1.0.11" more-asserts = "0.3.1" @@ -83,7 +80,6 @@ rand_distr = "0.4.3" [target.'cfg(not(windows))'.dev-dependencies] criterion = { version = "0.5" } -pprof = { version= "0.13", features = ["flamegraph", "criterion"] } [dev-dependencies.fail] version = "0.5.0" diff --git a/benches/index-bench.rs b/benches/index-bench.rs index 007859597..00a181982 100644 --- a/benches/index-bench.rs +++ b/benches/index-bench.rs @@ -1,5 +1,4 @@ use criterion::{criterion_group, criterion_main, Criterion, Throughput}; -use pprof::criterion::{Output, PProfProfiler}; use tantivy::schema::{TantivyDocument, FAST, INDEXED, STORED, STRING, TEXT}; use tantivy::{tokenizer, Index, IndexWriter}; @@ -253,12 +252,12 @@ criterion_group! { } criterion_group! { name = gh_benches; - config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None))); + config = Criterion::default(); targets = gh_index_benchmark } criterion_group! { name = wiki_benches; - config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None))); + config = Criterion::default(); targets = wiki_index_benchmark } criterion_main!(benches, gh_benches, wiki_benches); diff --git a/columnar/Cargo.toml b/columnar/Cargo.toml index 6728283df..c100f185a 100644 --- a/columnar/Cargo.toml +++ b/columnar/Cargo.toml @@ -10,7 +10,6 @@ categories = ["database-implementations", "data-structures", "compression"] [dependencies] itertools = "0.12.0" -fnv = "1.0.7" fastdivide = "0.4.0" stacker = { version= "0.2", path = "../stacker", package="tantivy-stacker"} diff --git a/columnar/columnar-cli/Cargo.toml b/columnar/columnar-cli/Cargo.toml index 0c1fd9b67..277734b93 100644 --- a/columnar/columnar-cli/Cargo.toml +++ b/columnar/columnar-cli/Cargo.toml @@ -8,7 +8,6 @@ license = "MIT" columnar = {path="../", package="tantivy-columnar"} serde_json = "1" serde_json_borrow = {git="https://github.com/PSeitz/serde_json_borrow/"} -serde = "1" [workspace] members = [] From 054f49dc318baefe07e9167206fcec94cfdab5b5 Mon Sep 17 00:00:00 2001 From: PSeitz Date: Mon, 20 Nov 2023 03:00:57 +0100 Subject: [PATCH 09/12] support escaped dot, add agg test (#2250) add agg test for nested JSON allow escaping of dot --- src/aggregation/agg_tests.rs | 59 ++++++++++++++++++++++++++++++++++++ src/core/json_utils.rs | 2 +- src/fastfield/mod.rs | 7 +++++ src/schema/schema.rs | 8 +++++ 4 files changed, 75 insertions(+), 1 deletion(-) diff --git a/src/aggregation/agg_tests.rs b/src/aggregation/agg_tests.rs index df3c85b50..1fea7fe6f 100644 --- a/src/aggregation/agg_tests.rs +++ b/src/aggregation/agg_tests.rs @@ -624,6 +624,65 @@ fn test_aggregation_on_json_object() { ); } +#[test] +fn test_aggregation_on_nested_json_object() { + let mut schema_builder = Schema::builder(); + let json = schema_builder.add_json_field("json.blub", FAST); + let schema = schema_builder.build(); + let index = Index::create_in_ram(schema); + let mut index_writer: IndexWriter = index.writer_for_tests().unwrap(); + index_writer + .add_document(doc!(json => json!({"color.dot": "red", "color": {"nested":"red"} }))) + .unwrap(); + index_writer + .add_document(doc!(json => json!({"color.dot": "blue", "color": {"nested":"blue"} }))) + .unwrap(); + index_writer.commit().unwrap(); + let reader = index.reader().unwrap(); + let searcher = reader.searcher(); + + let agg: Aggregations = serde_json::from_value(json!({ + "jsonagg1": { + "terms": { + "field": "json\\.blub.color\\.dot", + } + }, + "jsonagg2": { + "terms": { + "field": "json\\.blub.color.nested", + } + } + + })) + .unwrap(); + + let aggregation_collector = get_collector(agg); + let aggregation_results = searcher.search(&AllQuery, &aggregation_collector).unwrap(); + let aggregation_res_json = serde_json::to_value(aggregation_results).unwrap(); + assert_eq!( + &aggregation_res_json, + &serde_json::json!({ + "jsonagg1": { + "buckets": [ + {"doc_count": 1, "key": "blue"}, + {"doc_count": 1, "key": "red"} + ], + "doc_count_error_upper_bound": 0, + "sum_other_doc_count": 0 + }, + "jsonagg2": { + "buckets": [ + {"doc_count": 1, "key": "blue"}, + {"doc_count": 1, "key": "red"} + ], + "doc_count_error_upper_bound": 0, + "sum_other_doc_count": 0 + } + + }) + ); +} + #[test] fn test_aggregation_on_json_object_empty_columns() { let mut schema_builder = Schema::builder(); diff --git a/src/core/json_utils.rs b/src/core/json_utils.rs index 6a5387cf0..a9060bc6a 100644 --- a/src/core/json_utils.rs +++ b/src/core/json_utils.rs @@ -320,7 +320,7 @@ pub struct JsonTermWriter<'a> { /// In other words, /// - `k8s.node` ends up as `["k8s", "node"]`. /// - `k8s\.node` ends up as `["k8s.node"]`. -fn split_json_path(json_path: &str) -> Vec { +pub fn split_json_path(json_path: &str) -> Vec { let mut escaped_state: bool = false; let mut json_path_segments = Vec::new(); let mut buffer = String::new(); diff --git a/src/fastfield/mod.rs b/src/fastfield/mod.rs index 1c9b3d5af..65e4c03bf 100644 --- a/src/fastfield/mod.rs +++ b/src/fastfield/mod.rs @@ -1288,11 +1288,18 @@ mod tests { index_writer.commit().unwrap(); let searcher = index.reader().unwrap().searcher(); let fast_field_reader = searcher.segment_reader(0u32).fast_fields(); + // Supported for now, maybe dropped in the future. let column = fast_field_reader .column_opt::("jsonfield.attr.age") .unwrap() .unwrap(); let vals: Vec = column.values_for_doc(0u32).collect(); assert_eq!(&vals, &[33]); + let column = fast_field_reader + .column_opt::("jsonfield\\.attr.age") + .unwrap() + .unwrap(); + let vals: Vec = column.values_for_doc(0u32).collect(); + assert_eq!(&vals, &[33]); } } diff --git a/src/schema/schema.rs b/src/schema/schema.rs index c6bf7b932..e19c942ef 100644 --- a/src/schema/schema.rs +++ b/src/schema/schema.rs @@ -8,6 +8,7 @@ use serde::{Deserialize, Deserializer, Serialize, Serializer}; use super::ip_options::IpAddrOptions; use super::*; +use crate::json_utils::split_json_path; use crate::schema::bytes_options::BytesOptions; use crate::TantivyError; @@ -328,12 +329,19 @@ impl Schema { if let Some(field) = self.0.fields_map.get(full_path) { return Some((*field, "")); } + let mut splitting_period_pos: Vec = locate_splitting_dots(full_path); while let Some(pos) = splitting_period_pos.pop() { let (prefix, suffix) = full_path.split_at(pos); + if let Some(field) = self.0.fields_map.get(prefix) { return Some((*field, &suffix[1..])); } + // JSON path may contain a dot, for now we try both variants to find the field. + let prefix = split_json_path(prefix).join("."); + if let Some(field) = self.0.fields_map.get(&prefix) { + return Some((*field, &suffix[1..])); + } } None } From daad2dc1519d10914503ba8a7c02044b9e0030ba Mon Sep 17 00:00:00 2001 From: BlackHoleFox Date: Mon, 20 Nov 2023 02:40:44 -0600 Subject: [PATCH 10/12] Take string references instead of owned values building Facet paths (#2265) --- src/schema/facet.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/schema/facet.rs b/src/schema/facet.rs index 845829b30..d609e8d70 100644 --- a/src/schema/facet.rs +++ b/src/schema/facet.rs @@ -131,16 +131,16 @@ impl Facet { pub fn from_path(path: Path) -> Facet where Path: IntoIterator, - Path::Item: ToString, + Path::Item: AsRef, { let mut facet_string: String = String::with_capacity(100); let mut step_it = path.into_iter(); if let Some(step) = step_it.next() { - facet_string.push_str(&step.to_string()); + facet_string.push_str(step.as_ref()); } for step in step_it { facet_string.push(FACET_SEP_CHAR); - facet_string.push_str(&step.to_string()); + facet_string.push_str(step.as_ref()); } Facet(facet_string) } From 07573a7f19657b118a9d5b571ae15fca8e590103 Mon Sep 17 00:00:00 2001 From: PSeitz Date: Tue, 21 Nov 2023 16:06:57 +0100 Subject: [PATCH 11/12] update fst (#2267) update fst to 0.5 (deduplicates regex-syntax in the dep tree) deps cleanup --- Cargo.toml | 4 ++-- sstable/Cargo.toml | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d6d6b918f..7bb412a7c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,7 +22,7 @@ crc32fast = "1.3.2" once_cell = "1.10.0" regex = { version = "1.5.5", default-features = false, features = ["std", "unicode"] } aho-corasick = "1.0" -tantivy-fst = "0.4.0" +tantivy-fst = "0.5" memmap2 = { version = "0.9.0", optional = true } lz4_flex = { version = "0.11", default-features = false, optional = true } zstd = { version = "0.13", optional = true, default-features = false } @@ -79,7 +79,7 @@ more-asserts = "0.3.1" rand_distr = "0.4.3" [target.'cfg(not(windows))'.dev-dependencies] -criterion = { version = "0.5" } +criterion = { version = "0.5", default-features = false } [dev-dependencies.fail] version = "0.5.0" diff --git a/sstable/Cargo.toml b/sstable/Cargo.toml index 10e94f75c..643d6b976 100644 --- a/sstable/Cargo.toml +++ b/sstable/Cargo.toml @@ -11,13 +11,13 @@ description = "sstables for tantivy" [dependencies] common = {version= "0.6", path="../common", package="tantivy-common"} -tantivy-fst = "0.4" +tantivy-fst = "0.5" # experimental gives us access to Decompressor::upper_bound zstd = { version = "0.13", features = ["experimental"] } [dev-dependencies] proptest = "1" -criterion = "0.5" +criterion = { version = "0.5", default-features = false } names = "0.14" rand = "0.8" From 1a9fc10be981b681f891359757926684ada2405c Mon Sep 17 00:00:00 2001 From: PSeitz Date: Wed, 22 Nov 2023 12:29:53 +0100 Subject: [PATCH 12/12] add fields_metadata to SegmentReader, add columnar docs (#2222) * add fields_metadata to SegmentReader, add columnar docs * use schema to resolve field, add test * normalize paths * merge for FieldsMetadata, add fields_metadata on Index * Update src/core/segment_reader.rs Co-authored-by: Paul Masurel * merge code paths * add Hash * move function oustide --------- Co-authored-by: Paul Masurel --- columnar/src/lib.rs | 19 ++ src/core/index.rs | 23 +++ src/core/inverted_index_reader.rs | 2 +- src/core/json_utils.rs | 8 + src/core/mod.rs | 2 +- src/core/segment_reader.rs | 283 +++++++++++++++++++++++++++++- src/indexer/mod.rs | 254 ++++++++++++++++++++++++++- src/lib.rs | 6 +- src/schema/field_entry.rs | 8 + src/schema/field_type.rs | 18 +- src/schema/schema.rs | 1 + 11 files changed, 608 insertions(+), 16 deletions(-) diff --git a/columnar/src/lib.rs b/columnar/src/lib.rs index a4b1430d6..a20b8363b 100644 --- a/columnar/src/lib.rs +++ b/columnar/src/lib.rs @@ -1,3 +1,22 @@ +//! # Tantivy-Columnar +//! +//! `tantivy-columnar`provides a columnar storage for tantivy. +//! The crate allows for efficient read operations on specific columns rather than entire records. +//! +//! ## Overview +//! +//! - **columnar**: Reading, writing, and merging multiple columns: +//! - **[ColumnarWriter]**: Makes it possible to create a new columnar. +//! - **[ColumnarReader]**: The ColumnarReader makes it possible to access a set of columns +//! associated to field names. +//! - **[merge_columnar]**: Contains the functionalities to merge multiple ColumnarReader or +//! segments into a single one. +//! +//! - **column**: A single column, which contains +//! - [column_index]: Resolves the rows for a document id. Manages the cardinality of the +//! column. +//! - [column_values]: Stores the values of a column in a dense format. + #![cfg_attr(all(feature = "unstable", test), feature(test))] #[cfg(test)] diff --git a/src/core/index.rs b/src/core/index.rs index 08da51444..efdff505b 100644 --- a/src/core/index.rs +++ b/src/core/index.rs @@ -23,6 +23,7 @@ use crate::reader::{IndexReader, IndexReaderBuilder}; use crate::schema::document::Document; use crate::schema::{Field, FieldType, Schema}; use crate::tokenizer::{TextAnalyzer, TokenizerManager}; +use crate::{merge_field_meta_data, FieldMetadata, SegmentReader}; fn load_metas( directory: &dyn Directory, @@ -489,6 +490,28 @@ impl Index { self.inventory.all() } + /// Returns the list of fields that have been indexed in the Index. + /// The field list includes the field defined in the schema as well as the fields + /// that have been indexed as a part of a JSON field. + /// The returned field name is the full field name, including the name of the JSON field. + /// + /// The returned field names can be used in queries. + /// + /// Notice: If your data contains JSON fields this is **very expensive**, as it requires + /// browsing through the inverted index term dictionary and the columnar field dictionary. + /// + /// Disclaimer: Some fields may not be listed here. For instance, if the schema contains a json + /// field that is not indexed nor a fast field but is stored, it is possible for the field + /// to not be listed. + pub fn fields_metadata(&self) -> crate::Result> { + let segments = self.searchable_segments()?; + let fields_metadata: Vec> = segments + .into_iter() + .map(|segment| SegmentReader::open(&segment)?.fields_metadata()) + .collect::>()?; + Ok(merge_field_meta_data(fields_metadata, &self.schema())) + } + /// Creates a new segment_meta (Advanced user only). /// /// As long as the `SegmentMeta` lives, the files associated with the diff --git a/src/core/inverted_index_reader.rs b/src/core/inverted_index_reader.rs index 5326ce7e9..059ec988c 100644 --- a/src/core/inverted_index_reader.rs +++ b/src/core/inverted_index_reader.rs @@ -75,7 +75,7 @@ impl InvertedIndexReader { /// /// Notice: This requires a full scan and therefore **very expensive**. /// TODO: Move to sstable to use the index. - pub fn list_fields(&self) -> io::Result> { + pub fn list_encoded_fields(&self) -> io::Result> { let mut stream = self.termdict.stream()?; let mut fields = Vec::new(); let mut fields_set = FnvHashSet::default(); diff --git a/src/core/json_utils.rs b/src/core/json_utils.rs index a9060bc6a..09059ddbf 100644 --- a/src/core/json_utils.rs +++ b/src/core/json_utils.rs @@ -62,6 +62,14 @@ impl IndexingPositionsPerPath { } } +/// Convert JSON_PATH_SEGMENT_SEP to a dot. +pub fn json_path_sep_to_dot(path: &mut str) { + // This is safe since we are replacing a ASCII character by another ASCII character. + unsafe { + replace_in_place(JSON_PATH_SEGMENT_SEP, b'.', path.as_bytes_mut()); + } +} + #[allow(clippy::too_many_arguments)] pub(crate) fn index_json_values<'a, V: Value<'a>>( doc: DocId, diff --git a/src/core/mod.rs b/src/core/mod.rs index b0b674c2e..6a98f6fe0 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -25,7 +25,7 @@ pub use self::searcher::{Searcher, SearcherGeneration}; pub use self::segment::Segment; pub use self::segment_component::SegmentComponent; pub use self::segment_id::SegmentId; -pub use self::segment_reader::SegmentReader; +pub use self::segment_reader::{merge_field_meta_data, FieldMetadata, SegmentReader}; pub use self::single_segment_index_writer::SingleSegmentIndexWriter; /// The meta file contains all the information about the list of segments and the schema diff --git a/src/core/segment_reader.rs b/src/core/segment_reader.rs index db3170a2c..cae1b537d 100644 --- a/src/core/segment_reader.rs +++ b/src/core/segment_reader.rs @@ -1,12 +1,17 @@ use std::collections::HashMap; +use std::ops::BitOrAssign; use std::sync::{Arc, RwLock}; use std::{fmt, io}; +use fnv::FnvHashMap; +use itertools::Itertools; + use crate::core::{InvertedIndexReader, Segment, SegmentComponent, SegmentId}; use crate::directory::{CompositeFile, FileSlice}; use crate::error::DataCorruption; use crate::fastfield::{intersect_alive_bitsets, AliveBitSet, FacetReader, FastFieldReaders}; use crate::fieldnorm::{FieldNormReader, FieldNormReaders}; +use crate::json_utils::json_path_sep_to_dot; use crate::schema::{Field, IndexRecordOption, Schema, Type}; use crate::space_usage::SegmentSpaceUsage; use crate::store::StoreReader; @@ -280,6 +285,103 @@ impl SegmentReader { Ok(inv_idx_reader) } + /// Returns the list of fields that have been indexed in the segment. + /// The field list includes the field defined in the schema as well as the fields + /// that have been indexed as a part of a JSON field. + /// The returned field name is the full field name, including the name of the JSON field. + /// + /// The returned field names can be used in queries. + /// + /// Notice: If your data contains JSON fields this is **very expensive**, as it requires + /// browsing through the inverted index term dictionary and the columnar field dictionary. + /// + /// Disclaimer: Some fields may not be listed here. For instance, if the schema contains a json + /// field that is not indexed nor a fast field but is stored, it is possible for the field + /// to not be listed. + pub fn fields_metadata(&self) -> crate::Result> { + let mut indexed_fields: Vec = Vec::new(); + let mut map_to_canonical = FnvHashMap::default(); + for (field, field_entry) in self.schema().fields() { + let field_name = field_entry.name().to_string(); + let is_indexed = field_entry.is_indexed(); + + if is_indexed { + let is_json = field_entry.field_type().value_type() == Type::Json; + if is_json { + let inv_index = self.inverted_index(field)?; + let encoded_fields_in_index = inv_index.list_encoded_fields()?; + let mut build_path = |field_name: &str, mut json_path: String| { + // In this case we need to map the potential fast field to the field name + // accepted by the query parser. + let create_canonical = + !field_entry.is_expand_dots_enabled() && json_path.contains('.'); + if create_canonical { + // Without expand dots enabled dots need to be escaped. + let escaped_json_path = json_path.replace('.', "\\."); + let full_path = format!("{}.{}", field_name, escaped_json_path); + let full_path_unescaped = format!("{}.{}", field_name, &json_path); + map_to_canonical.insert(full_path_unescaped, full_path.to_string()); + full_path + } else { + // With expand dots enabled, we can use '.' instead of '\u{1}'. + json_path_sep_to_dot(&mut json_path); + format!("{}.{}", field_name, json_path) + } + }; + indexed_fields.extend( + encoded_fields_in_index + .into_iter() + .map(|(name, typ)| (build_path(&field_name, name), typ)) + .map(|(field_name, typ)| FieldMetadata { + indexed: true, + stored: false, + field_name, + fast: false, + typ, + }), + ); + } else { + indexed_fields.push(FieldMetadata { + indexed: true, + stored: false, + field_name: field_name.to_string(), + fast: false, + typ: field_entry.field_type().value_type(), + }); + } + } + } + let mut fast_fields: Vec = self + .fast_fields() + .columnar() + .iter_columns()? + .map(|(mut field_name, handle)| { + json_path_sep_to_dot(&mut field_name); + // map to canonical path, to avoid similar but different entries. + // Eventually we should just accept '.' seperated for all cases. + let field_name = map_to_canonical + .get(&field_name) + .unwrap_or(&field_name) + .to_string(); + FieldMetadata { + indexed: false, + stored: false, + field_name, + fast: true, + typ: Type::from(handle.column_type()), + } + }) + .collect(); + // Since the type is encoded differently in the fast field and in the inverted index, + // the order of the fields is not guaranteed to be the same. Therefore, we sort the fields. + // If we are sure that the order is the same, we can remove this sort. + indexed_fields.sort_unstable(); + fast_fields.sort_unstable(); + let merged = merge_field_meta_data(vec![indexed_fields, fast_fields], &self.schema); + + Ok(merged) + } + /// Returns the segment id pub fn segment_id(&self) -> SegmentId { self.segment_id @@ -330,6 +432,65 @@ impl SegmentReader { } } +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] +/// FieldMetadata +pub struct FieldMetadata { + /// The field name + // Notice: Don't reorder the declaration of 1.field_name 2.typ, as it is used for ordering by + // field_name then typ. + pub field_name: String, + /// The field type + // Notice: Don't reorder the declaration of 1.field_name 2.typ, as it is used for ordering by + // field_name then typ. + pub typ: Type, + /// Is the field indexed for search + pub indexed: bool, + /// Is the field stored in the doc store + pub stored: bool, + /// Is the field stored in the columnar storage + pub fast: bool, +} +impl BitOrAssign for FieldMetadata { + fn bitor_assign(&mut self, rhs: Self) { + assert!(self.field_name == rhs.field_name); + assert!(self.typ == rhs.typ); + self.indexed |= rhs.indexed; + self.stored |= rhs.stored; + self.fast |= rhs.fast; + } +} + +// Maybe too slow for the high cardinality case +fn is_field_stored(field_name: &str, schema: &Schema) -> bool { + schema + .find_field(field_name) + .map(|(field, _path)| schema.get_field_entry(field).is_stored()) + .unwrap_or(false) +} + +/// Helper to merge the field metadata from multiple segments. +pub fn merge_field_meta_data( + field_metadatas: Vec>, + schema: &Schema, +) -> Vec { + let mut merged_field_metadata = Vec::new(); + for (_key, mut group) in &field_metadatas + .into_iter() + .kmerge_by(|left, right| left < right) + // TODO: Remove allocation + .group_by(|el| (el.field_name.to_string(), el.typ)) + { + let mut merged: FieldMetadata = group.next().unwrap(); + for el in group { + merged |= el; + } + // Currently is_field_stored is maybe too slow for the high cardinality case + merged.stored = is_field_stored(&merged.field_name, schema); + merged_field_metadata.push(merged); + } + merged_field_metadata +} + fn intersect_alive_bitset( left_opt: Option, right_opt: Option, @@ -353,9 +514,127 @@ impl fmt::Debug for SegmentReader { #[cfg(test)] mod test { + use super::*; use crate::core::Index; - use crate::schema::{Schema, Term, STORED, TEXT}; - use crate::{DocId, IndexWriter}; + use crate::schema::{Schema, SchemaBuilder, Term, STORED, TEXT}; + use crate::{DocId, FieldMetadata, IndexWriter}; + + #[test] + fn test_merge_field_meta_data_same() { + let schema = SchemaBuilder::new().build(); + let field_metadata1 = FieldMetadata { + field_name: "a".to_string(), + typ: crate::schema::Type::Str, + indexed: true, + stored: false, + fast: true, + }; + let field_metadata2 = FieldMetadata { + field_name: "a".to_string(), + typ: crate::schema::Type::Str, + indexed: true, + stored: false, + fast: true, + }; + let res = merge_field_meta_data( + vec![vec![field_metadata1.clone()], vec![field_metadata2]], + &schema, + ); + assert_eq!(res, vec![field_metadata1]); + } + #[test] + fn test_merge_field_meta_data_different() { + let schema = SchemaBuilder::new().build(); + let field_metadata1 = FieldMetadata { + field_name: "a".to_string(), + typ: crate::schema::Type::Str, + indexed: false, + stored: false, + fast: true, + }; + let field_metadata2 = FieldMetadata { + field_name: "b".to_string(), + typ: crate::schema::Type::Str, + indexed: false, + stored: false, + fast: true, + }; + let field_metadata3 = FieldMetadata { + field_name: "a".to_string(), + typ: crate::schema::Type::Str, + indexed: true, + stored: false, + fast: false, + }; + let res = merge_field_meta_data( + vec![ + vec![field_metadata1.clone(), field_metadata2.clone()], + vec![field_metadata3], + ], + &schema, + ); + let field_metadata_expected1 = FieldMetadata { + field_name: "a".to_string(), + typ: crate::schema::Type::Str, + indexed: true, + stored: false, + fast: true, + }; + assert_eq!(res, vec![field_metadata_expected1, field_metadata2.clone()]); + } + #[test] + fn test_merge_field_meta_data_merge() { + use pretty_assertions::assert_eq; + let get_meta_data = |name: &str, typ: Type| FieldMetadata { + field_name: name.to_string(), + typ, + indexed: false, + stored: false, + fast: true, + }; + let schema = SchemaBuilder::new().build(); + let mut metas = vec![get_meta_data("d", Type::Str), get_meta_data("e", Type::U64)]; + metas.sort(); + let res = merge_field_meta_data(vec![vec![get_meta_data("e", Type::Str)], metas], &schema); + assert_eq!( + res, + vec![ + get_meta_data("d", Type::Str), + get_meta_data("e", Type::Str), + get_meta_data("e", Type::U64), + ] + ); + } + #[test] + fn test_merge_field_meta_data_bitxor() { + let field_metadata1 = FieldMetadata { + field_name: "a".to_string(), + typ: crate::schema::Type::Str, + indexed: false, + stored: false, + fast: true, + }; + let field_metadata2 = FieldMetadata { + field_name: "a".to_string(), + typ: crate::schema::Type::Str, + indexed: true, + stored: false, + fast: false, + }; + let field_metadata_expected = FieldMetadata { + field_name: "a".to_string(), + typ: crate::schema::Type::Str, + indexed: true, + stored: false, + fast: true, + }; + let mut res1 = field_metadata1.clone(); + res1 |= field_metadata2.clone(); + let mut res2 = field_metadata2.clone(); + res2 |= field_metadata1; + assert_eq!(res1, field_metadata_expected); + assert_eq!(res2, field_metadata_expected); + } #[test] fn test_num_alive() -> crate::Result<()> { diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs index 13731444a..204ce134b 100644 --- a/src/indexer/mod.rs +++ b/src/indexer/mod.rs @@ -59,10 +59,13 @@ type AddBatchReceiver = channel::Receiver>; #[cfg(test)] mod tests_mmap { - use crate::collector::Count; - use crate::query::QueryParser; - use crate::schema::{JsonObjectOptions, Schema, Type, TEXT}; - use crate::{Index, IndexWriter, Term}; + use crate::aggregation::agg_req::Aggregations; + use crate::aggregation::agg_result::AggregationResults; + use crate::aggregation::AggregationCollector; + use crate::collector::{Count, TopDocs}; + use crate::query::{AllQuery, QueryParser}; + use crate::schema::{JsonObjectOptions, Schema, Type, FAST, INDEXED, STORED, TEXT}; + use crate::{FieldMetadata, Index, IndexWriter, Term}; #[test] fn test_advance_delete_bug() -> crate::Result<()> { @@ -173,8 +176,7 @@ mod tests_mmap { #[test] fn test_json_field_list_fields() { let mut schema_builder = Schema::builder(); - let json_options: JsonObjectOptions = - JsonObjectOptions::from(TEXT).set_expand_dots_enabled(); + let json_options: JsonObjectOptions = JsonObjectOptions::from(TEXT); let json_field = schema_builder.add_json_field("json", json_options); let index = Index::create_in_ram(schema_builder.build()); let mut index_writer = index.writer_for_tests().unwrap(); @@ -193,9 +195,9 @@ mod tests_mmap { let reader = &searcher.segment_readers()[0]; let inverted_index = reader.inverted_index(json_field).unwrap(); assert_eq!( - inverted_index.list_fields().unwrap(), + inverted_index.list_encoded_fields().unwrap(), [ - ("k8s\u{1}container\u{1}name".to_string(), Type::Str), + ("k8s.container.name".to_string(), Type::Str), ("sub\u{1}a".to_string(), Type::I64), ("sub\u{1}b".to_string(), Type::I64), ("suber\u{1}a".to_string(), Type::I64), @@ -205,4 +207,240 @@ mod tests_mmap { ] ); } + + #[test] + fn test_json_fields_metadata_expanded_dots_one_segment() { + test_json_fields_metadata(true, true); + } + #[test] + fn test_json_fields_metadata_expanded_dots_multi_segment() { + test_json_fields_metadata(true, false); + } + #[test] + fn test_json_fields_metadata_no_expanded_dots_one_segment() { + test_json_fields_metadata(false, true); + } + #[test] + fn test_json_fields_metadata_no_expanded_dots_multi_segment() { + test_json_fields_metadata(false, false); + } + + fn test_json_fields_metadata(expanded_dots: bool, one_segment: bool) { + use pretty_assertions::assert_eq; + let mut schema_builder = Schema::builder(); + let json_options: JsonObjectOptions = + JsonObjectOptions::from(TEXT).set_fast(None).set_stored(); + let json_options = if expanded_dots { + json_options.set_expand_dots_enabled() + } else { + json_options + }; + schema_builder.add_json_field("json.confusing", json_options.clone()); + let json_field = schema_builder.add_json_field("json.shadow", json_options.clone()); + let json_field2 = schema_builder.add_json_field("json", json_options.clone()); + schema_builder.add_json_field("empty_json", json_options); + let number_field = schema_builder.add_u64_field("numbers", FAST); + schema_builder.add_u64_field("empty", FAST | INDEXED | STORED); + let index = Index::create_in_ram(schema_builder.build()); + let mut index_writer = index.writer_for_tests().unwrap(); + let json = + serde_json::json!({"k8s.container.name": "a", "val": "a", "sub": {"a": 1, "b": 1}}); + index_writer.add_document(doc!(json_field=>json)).unwrap(); + let json = + serde_json::json!({"k8s.container.name": "a", "val": "a", "suber": {"a": 1, "b": 1}}); + if !one_segment { + index_writer.commit().unwrap(); + } + index_writer.add_document(doc!(json_field=>json)).unwrap(); + let json = serde_json::json!({"k8s.container.name": "a", "k8s.container.name": "a", "val": "a", "suber": {"a": "a", "b": 1}}); + index_writer + .add_document(doc!(number_field => 50u64, json_field=>json, json_field2=>json!({"shadow": {"val": "a"}}))) + .unwrap(); + index_writer.commit().unwrap(); + let reader = index.reader().unwrap(); + + let searcher = reader.searcher(); + assert_eq!(searcher.num_docs(), 3); + + let fields_metadata = index.fields_metadata().unwrap(); + assert_eq!( + fields_metadata, + [ + FieldMetadata { + field_name: "empty".to_string(), + indexed: true, + stored: true, + fast: true, + typ: Type::U64 + }, + FieldMetadata { + field_name: if expanded_dots { + "json.shadow.k8s.container.name".to_string() + } else { + "json.shadow.k8s\\.container\\.name".to_string() + }, + indexed: true, + stored: true, + fast: true, + typ: Type::Str + }, + FieldMetadata { + field_name: "json.shadow.sub.a".to_string(), + indexed: true, + stored: true, + fast: true, + typ: Type::I64 + }, + FieldMetadata { + field_name: "json.shadow.sub.b".to_string(), + indexed: true, + stored: true, + fast: true, + typ: Type::I64 + }, + FieldMetadata { + field_name: "json.shadow.suber.a".to_string(), + indexed: true, + stored: true, + fast: true, + typ: Type::I64 + }, + FieldMetadata { + field_name: "json.shadow.suber.a".to_string(), + indexed: true, + stored: true, + fast: true, + typ: Type::Str + }, + FieldMetadata { + field_name: "json.shadow.suber.b".to_string(), + indexed: true, + stored: true, + fast: true, + typ: Type::I64 + }, + FieldMetadata { + field_name: "json.shadow.val".to_string(), + indexed: true, + stored: true, + fast: true, + typ: Type::Str + }, + FieldMetadata { + field_name: "numbers".to_string(), + indexed: false, + stored: false, + fast: true, + typ: Type::U64 + } + ] + ); + let query_parser = QueryParser::for_index(&index, vec![]); + // Test if returned field name can be queried + for indexed_field in fields_metadata.iter().filter(|meta| meta.indexed) { + let val = if indexed_field.typ == Type::Str { + "a" + } else { + "1" + }; + let query_str = &format!("{}:{}", indexed_field.field_name, val); + let query = query_parser.parse_query(query_str).unwrap(); + let count_docs = searcher.search(&*query, &TopDocs::with_limit(2)).unwrap(); + if indexed_field.field_name.contains("empty") || indexed_field.typ == Type::Json { + assert_eq!(count_docs.len(), 0); + } else { + assert!(!count_docs.is_empty(), "{}", indexed_field.field_name); + } + } + // Test if returned field name can be used for aggregation + for fast_field in fields_metadata.iter().filter(|meta| meta.fast) { + let agg_req_str = json!( + { + "termagg": { + "terms": { + "field": fast_field.field_name, + } + } + }); + + let agg_req: Aggregations = serde_json::from_value(agg_req_str).unwrap(); + let collector = AggregationCollector::from_aggs(agg_req, Default::default()); + let agg_res: AggregationResults = searcher.search(&AllQuery, &collector).unwrap(); + let res = serde_json::to_value(agg_res).unwrap(); + if !fast_field.field_name.contains("empty") && fast_field.typ != Type::Json { + assert!( + !res["termagg"]["buckets"].as_array().unwrap().is_empty(), + "{}", + fast_field.field_name + ); + } + } + } + + #[test] + fn test_json_field_shadowing_field_name_bug() { + /// This test is only there to display a bug on addressing a field if it gets shadowed + /// The issues only occurs if the field name that shadows contains a dot. + /// + /// Happens independently of the `expand_dots` option. Since that option does not + /// affect the field name itself. + use pretty_assertions::assert_eq; + let mut schema_builder = Schema::builder(); + let json_options: JsonObjectOptions = + JsonObjectOptions::from(TEXT).set_fast(None).set_stored(); + // let json_options = json_options.set_expand_dots_enabled(); + let json_field_shadow = schema_builder.add_json_field("json.shadow", json_options.clone()); + let json_field = schema_builder.add_json_field("json", json_options.clone()); + let index = Index::create_in_ram(schema_builder.build()); + let mut index_writer = index.writer_for_tests().unwrap(); + index_writer + .add_document( + doc!(json_field_shadow=>json!({"val": "b"}), json_field=>json!({"shadow": {"val": "a"}})), + ) + .unwrap(); + index_writer.commit().unwrap(); + let reader = index.reader().unwrap(); + + let searcher = reader.searcher(); + + let fields_and_vals = vec![ + // Only way to address or it gets shadowed by `json.shadow` field + ("json.shadow\u{1}val".to_string(), "a"), // Succeeds + //("json.shadow.val".to_string(), "a"), // Fails + ("json.shadow.val".to_string(), "b"), // Succeeds + ]; + + let query_parser = QueryParser::for_index(&index, vec![]); + // Test if field name can be queried + for (indexed_field, val) in fields_and_vals.iter() { + let query_str = &format!("{}:{}", indexed_field, val); + let query = query_parser.parse_query(query_str).unwrap(); + let count_docs = searcher.search(&*query, &TopDocs::with_limit(2)).unwrap(); + assert!(!count_docs.is_empty(), "{}:{}", indexed_field, val); + } + // Test if field name can be used for aggregation + for (field_name, val) in fields_and_vals.iter() { + let agg_req_str = json!( + { + "termagg": { + "terms": { + "field": field_name, + } + } + }); + + let agg_req: Aggregations = serde_json::from_value(agg_req_str).unwrap(); + let collector = AggregationCollector::from_aggs(agg_req, Default::default()); + let agg_res: AggregationResults = searcher.search(&AllQuery, &collector).unwrap(); + let res = serde_json::to_value(agg_res).unwrap(); + assert_eq!( + res["termagg"]["buckets"].as_array().unwrap()[0]["key"] + .as_str() + .unwrap(), + *val, + "{}", + field_name + ); + } + } } diff --git a/src/lib.rs b/src/lib.rs index c682643ec..e14d02a6c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -221,9 +221,9 @@ pub use self::snippet::{Snippet, SnippetGenerator}; #[doc(hidden)] pub use crate::core::json_utils; pub use crate::core::{ - Executor, Index, IndexBuilder, IndexMeta, IndexSettings, IndexSortByField, InvertedIndexReader, - Order, Searcher, SearcherGeneration, Segment, SegmentComponent, SegmentId, SegmentMeta, - SegmentReader, SingleSegmentIndexWriter, + merge_field_meta_data, Executor, FieldMetadata, Index, IndexBuilder, IndexMeta, IndexSettings, + IndexSortByField, InvertedIndexReader, Order, Searcher, SearcherGeneration, Segment, + SegmentComponent, SegmentId, SegmentMeta, SegmentReader, SingleSegmentIndexWriter, }; pub use crate::directory::Directory; pub use crate::indexer::IndexWriter; diff --git a/src/schema/field_entry.rs b/src/schema/field_entry.rs index beb23b4f0..9fa643ca0 100644 --- a/src/schema/field_entry.rs +++ b/src/schema/field_entry.rs @@ -108,6 +108,14 @@ impl FieldEntry { self.field_type.is_fast() } + /// Returns true if the field has the expand dots option set (for json fields) + pub fn is_expand_dots_enabled(&self) -> bool { + match self.field_type { + FieldType::JsonObject(ref options) => options.is_expand_dots_enabled(), + _ => false, + } + } + /// Returns true if the field is stored #[inline] pub fn is_stored(&self) -> bool { diff --git a/src/schema/field_type.rs b/src/schema/field_type.rs index 04e71394b..828cf238e 100644 --- a/src/schema/field_type.rs +++ b/src/schema/field_type.rs @@ -3,6 +3,7 @@ use std::str::FromStr; use base64::engine::general_purpose::STANDARD as BASE64; use base64::Engine; +use columnar::ColumnType; use serde::{Deserialize, Serialize}; use serde_json::Value as JsonValue; use thiserror::Error; @@ -47,7 +48,7 @@ pub enum ValueParsingError { /// /// Contrary to FieldType, this does /// not include the way the field must be indexed. -#[derive(Copy, Clone, Debug, Eq, PartialEq)] +#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] #[repr(u8)] pub enum Type { /// `&str` @@ -72,6 +73,21 @@ pub enum Type { IpAddr = b'p', } +impl From for Type { + fn from(value: ColumnType) -> Self { + match value { + ColumnType::Str => Type::Str, + ColumnType::U64 => Type::U64, + ColumnType::I64 => Type::I64, + ColumnType::F64 => Type::F64, + ColumnType::Bool => Type::Bool, + ColumnType::DateTime => Type::Date, + ColumnType::Bytes => Type::Bytes, + ColumnType::IpAddr => Type::IpAddr, + } + } +} + const ALL_TYPES: [Type; 10] = [ Type::Str, Type::U64, diff --git a/src/schema/schema.rs b/src/schema/schema.rs index e19c942ef..9fec25c05 100644 --- a/src/schema/schema.rs +++ b/src/schema/schema.rs @@ -357,6 +357,7 @@ impl Schema { pub fn find_field_with_default<'a>( &self, full_path: &'a str, + default_field_opt: Option, ) -> Option<(Field, &'a str)> { let (field, json_path) = self