Merge pull request #2253 from quickwit-oss/issue/2251-bug-merge-json-object-with-number

Fix bug occuring when merging JSON object indexed with positions.
This commit is contained in:
François Massot
2023-11-14 17:28:29 +01:00
committed by GitHub
8 changed files with 209 additions and 17 deletions

View File

@@ -1,12 +1,13 @@
use crate::collector::Count;
use crate::directory::{RamDirectory, WatchCallback};
use crate::indexer::NoMergePolicy;
use crate::indexer::{LogMergePolicy, NoMergePolicy};
use crate::json_utils::JsonTermWriter;
use crate::query::TermQuery;
use crate::schema::{Field, IndexRecordOption, Schema, INDEXED, STRING, TEXT};
use crate::schema::{Field, IndexRecordOption, Schema, Type, INDEXED, STRING, TEXT};
use crate::tokenizer::TokenizerManager;
use crate::{
Directory, Index, IndexBuilder, IndexReader, IndexSettings, IndexWriter, ReloadPolicy,
SegmentId, TantivyDocument, Term,
Directory, DocSet, Index, IndexBuilder, IndexReader, IndexSettings, IndexWriter, Postings,
ReloadPolicy, SegmentId, TantivyDocument, Term,
};
#[test]
@@ -344,3 +345,132 @@ fn test_merging_segment_update_docfreq() {
let term_info = inv_index.get_term_info(&term).unwrap().unwrap();
assert_eq!(term_info.doc_freq, 12);
}
// motivated by https://github.com/quickwit-oss/quickwit/issues/4130
#[test]
fn test_positions_merge_bug_non_text_json_vint() {
let mut schema_builder = Schema::builder();
let field = schema_builder.add_json_field("dynamic", TEXT);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema.clone());
let mut writer: IndexWriter = index.writer_for_tests().unwrap();
let mut merge_policy = LogMergePolicy::default();
merge_policy.set_min_num_segments(2);
writer.set_merge_policy(Box::new(merge_policy));
// Here a string would work.
let doc_json = r#"{"tenant_id":75}"#;
let vals = serde_json::from_str(doc_json).unwrap();
let mut doc = TantivyDocument::default();
doc.add_object(field, vals);
writer.add_document(doc.clone()).unwrap();
writer.commit().unwrap();
writer.add_document(doc.clone()).unwrap();
writer.commit().unwrap();
writer.wait_merging_threads().unwrap();
let reader = index.reader().unwrap();
assert_eq!(reader.searcher().segment_readers().len(), 1);
}
// Same as above but with bitpacked blocks
#[test]
fn test_positions_merge_bug_non_text_json_bitpacked_block() {
let mut schema_builder = Schema::builder();
let field = schema_builder.add_json_field("dynamic", TEXT);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema.clone());
let mut writer: IndexWriter = index.writer_for_tests().unwrap();
let mut merge_policy = LogMergePolicy::default();
merge_policy.set_min_num_segments(2);
writer.set_merge_policy(Box::new(merge_policy));
// Here a string would work.
let doc_json = r#"{"tenant_id":75}"#;
let vals = serde_json::from_str(doc_json).unwrap();
let mut doc = TantivyDocument::default();
doc.add_object(field, vals);
for _ in 0..128 {
writer.add_document(doc.clone()).unwrap();
}
writer.commit().unwrap();
writer.add_document(doc.clone()).unwrap();
writer.commit().unwrap();
writer.wait_merging_threads().unwrap();
let reader = index.reader().unwrap();
assert_eq!(reader.searcher().segment_readers().len(), 1);
}
#[test]
fn test_non_text_json_term_freq() {
let mut schema_builder = Schema::builder();
let field = schema_builder.add_json_field("dynamic", TEXT);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema.clone());
let mut writer: IndexWriter = index.writer_for_tests().unwrap();
// Here a string would work.
let doc_json = r#"{"tenant_id":75}"#;
let vals = serde_json::from_str(doc_json).unwrap();
let mut doc = TantivyDocument::default();
doc.add_object(field, vals);
writer.add_document(doc.clone()).unwrap();
writer.commit().unwrap();
let reader = index.reader().unwrap();
assert_eq!(reader.searcher().segment_readers().len(), 1);
let searcher = reader.searcher();
let segment_reader = searcher.segment_reader(0u32);
let inv_idx = segment_reader.inverted_index(field).unwrap();
let mut term = Term::with_type_and_field(Type::Json, field);
let mut json_term_writer = JsonTermWriter::wrap(&mut term, false);
json_term_writer.push_path_segment("tenant_id");
json_term_writer.close_path_and_set_type(Type::U64);
json_term_writer.set_fast_value(75u64);
let postings = inv_idx
.read_postings(
&json_term_writer.term(),
IndexRecordOption::WithFreqsAndPositions,
)
.unwrap()
.unwrap();
assert_eq!(postings.doc(), 0);
assert_eq!(postings.term_freq(), 1u32);
}
#[test]
fn test_non_text_json_term_freq_bitpacked() {
let mut schema_builder = Schema::builder();
let field = schema_builder.add_json_field("dynamic", TEXT);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema.clone());
let mut writer: IndexWriter = index.writer_for_tests().unwrap();
// Here a string would work.
let doc_json = r#"{"tenant_id":75}"#;
let vals = serde_json::from_str(doc_json).unwrap();
let mut doc = TantivyDocument::default();
doc.add_object(field, vals);
let num_docs = 132;
for _ in 0..num_docs {
writer.add_document(doc.clone()).unwrap();
}
writer.commit().unwrap();
let reader = index.reader().unwrap();
assert_eq!(reader.searcher().segment_readers().len(), 1);
let searcher = reader.searcher();
let segment_reader = searcher.segment_reader(0u32);
let inv_idx = segment_reader.inverted_index(field).unwrap();
let mut term = Term::with_type_and_field(Type::Json, field);
let mut json_term_writer = JsonTermWriter::wrap(&mut term, false);
json_term_writer.push_path_segment("tenant_id");
json_term_writer.close_path_and_set_type(Type::U64);
json_term_writer.set_fast_value(75u64);
let mut postings = inv_idx
.read_postings(
&json_term_writer.term(),
IndexRecordOption::WithFreqsAndPositions,
)
.unwrap()
.unwrap();
assert_eq!(postings.doc(), 0);
assert_eq!(postings.term_freq(), 1u32);
for i in 1..num_docs {
assert_eq!(postings.advance(), i);
assert_eq!(postings.term_freq(), 1u32);
}
}

View File

@@ -552,7 +552,41 @@ impl IndexMerger {
continue;
}
field_serializer.new_term(term_bytes, total_doc_freq)?;
// This should never happen as we early exited for total_doc_freq == 0.
assert!(!segment_postings_containing_the_term.is_empty());
let has_term_freq = {
let has_term_freq = !segment_postings_containing_the_term[0]
.1
.block_cursor
.freqs()
.is_empty();
for (_, postings) in &segment_postings_containing_the_term[1..] {
// This may look at a strange way to test whether we have term freq or not.
// With JSON object, the schema is not sufficient to know whether a term
// has its term frequency encoded or not:
// strings may have term frequencies, while number terms never have one.
//
// Ideally, we should have burnt one bit of two in the `TermInfo`.
// However, we preferred not changing the codec too much and detect this
// instead by
// - looking at the size of the skip data for bitpacked blocks
// - observing the absence of remaining data after reading the docs for vint
// blocks.
//
// Overall the reliable way to know if we have actual frequencies loaded or not
// is to check whether the actual decoded array is empty or not.
if has_term_freq != !postings.block_cursor.freqs().is_empty() {
return Err(DataCorruption::comment_only(
"Term freqs are inconsistent across segments",
)
.into());
}
}
has_term_freq
};
field_serializer.new_term(term_bytes, total_doc_freq, has_term_freq)?;
// We can now serialize this postings, by pushing each document to the
// postings serializer.
@@ -567,8 +601,13 @@ impl IndexMerger {
if let Some(remapped_doc_id) = old_to_new_doc_id[doc as usize] {
// we make sure to only write the term if
// there is at least one document.
let term_freq = segment_postings.term_freq();
segment_postings.positions(&mut positions_buffer);
let term_freq = if has_term_freq {
segment_postings.positions(&mut positions_buffer);
segment_postings.term_freq()
} else {
0u32
};
// if doc_id_mapping exists, the doc_ids are reordered, they are
// not just stacked. The field serializer expects monotonically increasing
// doc_ids, so we collect and sort them first, before writing.

View File

@@ -11,6 +11,10 @@ use crate::schema::{Field, Type, JSON_END_OF_PATH};
use crate::tokenizer::TokenStream;
use crate::{DocId, Term};
/// The `JsonPostingsWriter` is odd in that it relies on a hidden contract:
///
/// `subscribe` is called directly to index non-text tokens, while
/// `index_text` is used to index text.
#[derive(Default)]
pub(crate) struct JsonPostingsWriter<Rec: Recorder> {
str_posting_writer: SpecializedPostingsWriter<Rec>,

View File

@@ -63,7 +63,7 @@ pub mod tests {
let mut segment = index.new_segment();
let mut posting_serializer = InvertedIndexSerializer::open(&mut segment)?;
let mut field_serializer = posting_serializer.new_field(text_field, 120 * 4, None)?;
field_serializer.new_term("abc".as_bytes(), 12u32)?;
field_serializer.new_term("abc".as_bytes(), 12u32, true)?;
for doc_id in 0u32..120u32 {
let delta_positions = vec![1, 2, 3, 2];
field_serializer.write_doc(doc_id, 4, &delta_positions);

View File

@@ -194,7 +194,7 @@ impl<Rec: Recorder> SpecializedPostingsWriter<Rec> {
) -> io::Result<()> {
let recorder: Rec = ctx.term_index.read(addr);
let term_doc_freq = recorder.term_doc_freq().unwrap_or(0u32);
serializer.new_term(term, term_doc_freq)?;
serializer.new_term(term, term_doc_freq, recorder.has_term_freq())?;
recorder.serialize(&ctx.arena, doc_id_map, serializer, buffer_lender);
serializer.close_term()?;
Ok(())

View File

@@ -79,6 +79,11 @@ pub(crate) trait Recorder: Copy + Default + Send + Sync + 'static {
///
/// Returns `None` if not available.
fn term_doc_freq(&self) -> Option<u32>;
#[inline]
fn has_term_freq(&self) -> bool {
true
}
}
/// Only records the doc ids
@@ -136,6 +141,10 @@ impl Recorder for DocIdRecorder {
fn term_doc_freq(&self) -> Option<u32> {
None
}
fn has_term_freq(&self) -> bool {
false
}
}
/// Takes an Iterator of delta encoded elements and returns an iterator

View File

@@ -71,7 +71,7 @@ impl SegmentPostings {
{
let mut postings_serializer =
PostingsSerializer::new(&mut buffer, 0.0, IndexRecordOption::Basic, None);
postings_serializer.new_term(docs.len() as u32);
postings_serializer.new_term(docs.len() as u32, false);
for &doc in docs {
postings_serializer.write_doc(doc, 1u32);
}
@@ -120,7 +120,7 @@ impl SegmentPostings {
IndexRecordOption::WithFreqs,
fieldnorm_reader,
);
postings_serializer.new_term(doc_and_tfs.len() as u32);
postings_serializer.new_term(doc_and_tfs.len() as u32, true);
for &(doc, tf) in doc_and_tfs {
postings_serializer.write_doc(doc, tf);
}
@@ -238,14 +238,18 @@ impl Postings for SegmentPostings {
}
fn positions_with_offset(&mut self, offset: u32, output: &mut Vec<u32>) {
let term_freq = self.term_freq() as usize;
let term_freq = self.term_freq();
if let Some(position_reader) = self.position_reader.as_mut() {
debug_assert!(
!self.block_cursor.freqs().is_empty(),
"No positions available"
);
let read_offset = self.block_cursor.position_offset()
+ (self.block_cursor.freqs()[..self.cur]
.iter()
.cloned()
.sum::<u32>() as u64);
output.resize(term_freq, 0u32);
output.resize(term_freq as usize, 0u32);
position_reader.read(read_offset, &mut output[..]);
let mut cum = offset;
for output_mut in output.iter_mut() {

View File

@@ -168,7 +168,12 @@ impl<'a> FieldSerializer<'a> {
/// * term - the term. It needs to come after the previous term according to the lexicographical
/// order.
/// * term_doc_freq - return the number of document containing the term.
pub fn new_term(&mut self, term: &[u8], term_doc_freq: u32) -> io::Result<()> {
pub fn new_term(
&mut self,
term: &[u8],
term_doc_freq: u32,
record_term_freq: bool,
) -> io::Result<()> {
assert!(
!self.term_open,
"Called new_term, while the previous term was not closed."
@@ -177,7 +182,8 @@ impl<'a> FieldSerializer<'a> {
self.postings_serializer.clear();
self.current_term_info = self.current_term_info();
self.term_dictionary_builder.insert_key(term)?;
self.postings_serializer.new_term(term_doc_freq);
self.postings_serializer
.new_term(term_doc_freq, record_term_freq);
Ok(())
}
@@ -330,10 +336,10 @@ impl<W: Write> PostingsSerializer<W> {
}
}
pub fn new_term(&mut self, term_doc_freq: u32) {
pub fn new_term(&mut self, term_doc_freq: u32, record_term_freq: bool) {
self.bm25_weight = None;
self.term_has_freq = self.mode.has_freq() && term_doc_freq != 0;
self.term_has_freq = self.mode.has_freq() && record_term_freq;
if !self.term_has_freq {
return;
}