Compare commits

..

2 Commits

Author SHA1 Message Date
Paul Masurel
24954d4b27 Fixes bug that causes out-of-order sstable key.
The previous way to address the problem was to replace \u{0000}
with 0 in different places.

This logic had several flaws:
Done on the serializer side (like it was for the columnar), there was
a collision problem.

If a document in the segment contained a json field with a \0 and
antoher doc contained the same json field but `0` then we were sending
the same field path twice to the serializer.

Another option would have been to normalizes all values on the writer
side.

This PR simplifies the logic and simply ignore json path containing a
\0, both in the columnar and the inverted index.

Closes #2442
2024-06-25 14:37:40 +09:00
Paul Masurel
e453848134 Recycling buffer in PrefixPhraseScorer (#2443) 2024-06-24 17:11:53 +09:00
10 changed files with 67 additions and 56 deletions

View File

@@ -8,6 +8,7 @@ use std::net::Ipv6Addr;
use column_operation::ColumnOperation;
pub(crate) use column_writers::CompatibleNumericalTypes;
use common::json_path_writer::JSON_END_OF_PATH;
use common::CountingWriter;
pub(crate) use serializer::ColumnarSerializer;
use stacker::{Addr, ArenaHashMap, MemoryArena};
@@ -283,12 +284,17 @@ impl ColumnarWriter {
.iter()
.map(|(column_name, addr)| (column_name, ColumnType::DateTime, addr)),
);
// TODO: replace JSON_END_OF_PATH with b'0' in columns
columns.sort_unstable_by_key(|(column_name, col_type, _)| (*column_name, *col_type));
let (arena, buffers, dictionaries) = (&self.arena, &mut self.buffers, &self.dictionaries);
let mut symbol_byte_buffer: Vec<u8> = Vec::new();
for (column_name, column_type, addr) in columns {
if column_name.contains(&JSON_END_OF_PATH) {
// Tantivy uses b'0' as a separator for nested fields in JSON.
// Column names with a b'0' are not simply ignored by the columnar (and the inverted
// index).
continue;
}
match column_type {
ColumnType::Bool => {
let column_writer: ColumnWriter = self.bool_field_hash_map.read(addr);

View File

@@ -93,18 +93,3 @@ impl<'a, W: io::Write> io::Write for ColumnSerializer<'a, W> {
self.columnar_serializer.wrt.write_all(buf)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_prepare_key_bytes() {
let mut buffer: Vec<u8> = b"somegarbage".to_vec();
prepare_key(b"root\0child", ColumnType::Str, &mut buffer);
assert_eq!(buffer.len(), 12);
assert_eq!(&buffer[..10], b"root0child");
assert_eq!(buffer[10], 0u8);
assert_eq!(buffer[11], ColumnType::Str.to_code());
}
}

View File

@@ -1,4 +1,4 @@
use common::json_path_writer::JSON_PATH_SEGMENT_SEP;
use common::json_path_writer::{JSON_END_OF_PATH, JSON_PATH_SEGMENT_SEP};
use common::{replace_in_place, JsonPathWriter};
use rustc_hash::FxHashMap;
@@ -83,6 +83,9 @@ fn index_json_object<'a, V: Value<'a>>(
positions_per_path: &mut IndexingPositionsPerPath,
) {
for (json_path_segment, json_value_visitor) in json_visitor {
if json_path_segment.as_bytes().contains(&JSON_END_OF_PATH) {
continue;
}
json_path_writer.push(json_path_segment);
index_json_value(
doc,

View File

@@ -815,8 +815,9 @@ mod tests {
use crate::indexer::NoMergePolicy;
use crate::query::{QueryParser, TermQuery};
use crate::schema::{
self, Facet, FacetOptions, IndexRecordOption, IpAddrOptions, NumericOptions,
TextFieldIndexing, TextOptions, Value, FAST, INDEXED, STORED, STRING, TEXT,
self, Facet, FacetOptions, IndexRecordOption, IpAddrOptions, JsonObjectOptions,
NumericOptions, Schema, TextFieldIndexing, TextOptions, Value, FAST, INDEXED, STORED,
STRING, TEXT,
};
use crate::store::DOCSTORE_CACHE_CAPACITY;
use crate::{
@@ -2378,11 +2379,11 @@ mod tests {
#[test]
fn test_bug_1617_2() {
assert!(test_operation_strategy(
test_operation_strategy(
&[
IndexingOp::AddDoc {
id: 13,
value: Default::default()
value: Default::default(),
},
IndexingOp::DeleteDoc { id: 13 },
IndexingOp::Commit,
@@ -2390,9 +2391,9 @@ mod tests {
IndexingOp::Commit,
IndexingOp::Merge,
],
true
true,
)
.is_ok());
.unwrap();
}
#[test]
@@ -2492,9 +2493,9 @@ mod tests {
}
#[test]
fn test_bug_2442() -> crate::Result<()> {
fn test_bug_2442_reserved_character_fast_field() -> crate::Result<()> {
let mut schema_builder = schema::Schema::builder();
let json_field = schema_builder.add_json_field("json", TEXT | FAST);
let json_field = schema_builder.add_json_field("json", FAST | TEXT);
let schema = schema_builder.build();
let index = Index::builder().schema(schema).create_in_ram()?;
@@ -2515,4 +2516,21 @@ mod tests {
Ok(())
}
#[test]
fn test_bug_2442_reserved_character_columnar() -> crate::Result<()> {
let mut schema_builder = Schema::builder();
let options = JsonObjectOptions::from(FAST).set_expand_dots_enabled();
let field = schema_builder.add_json_field("json", options);
let index = Index::create_in_ram(schema_builder.build());
let mut index_writer = index.writer_for_tests().unwrap();
index_writer
.add_document(doc!(field=>json!({"\u{0000}": "A"})))
.unwrap();
index_writer
.add_document(doc!(field=>json!({format!("\u{0000}\u{0000}"): "A"})))
.unwrap();
index_writer.commit().unwrap();
Ok(())
}
}

View File

@@ -145,15 +145,27 @@ mod tests_mmap {
}
}
#[test]
fn test_json_field_null_byte() {
// Test when field name contains a zero byte, which has special meaning in tantivy.
// As a workaround, we convert the zero byte to the ASCII character '0'.
// https://github.com/quickwit-oss/tantivy/issues/2340
// https://github.com/quickwit-oss/tantivy/issues/2193
let field_name_in = "\u{0000}";
let field_name_out = "0";
test_json_field_name(field_name_in, field_name_out);
fn test_json_field_null_byte_is_ignored() {
let mut schema_builder = Schema::builder();
let options = JsonObjectOptions::from(TEXT | FAST).set_expand_dots_enabled();
let field = schema_builder.add_json_field("json", options);
let index = Index::create_in_ram(schema_builder.build());
let mut index_writer = index.writer_for_tests().unwrap();
index_writer
.add_document(doc!(field=>json!({"key": "test1", "invalidkey\u{0000}": "test2"})))
.unwrap();
index_writer.commit().unwrap();
let reader = index.reader().unwrap();
let searcher = reader.searcher();
let segment_reader = searcher.segment_reader(0);
let inv_indexer = segment_reader.inverted_index(field).unwrap();
let term_dict = inv_indexer.terms();
assert_eq!(term_dict.num_terms(), 1);
let mut term_bytes = Vec::new();
term_dict.ord_to_term(0, &mut term_bytes).unwrap();
assert_eq!(term_bytes, b"key\0stest1");
}
#[test]
fn test_json_field_1byte() {
// Test when field name contains a '1' byte, which has special meaning in tantivy.

View File

@@ -1,5 +1,3 @@
use common::json_path_writer::JSON_END_OF_PATH;
use common::replace_in_place;
use fnv::FnvHashMap;
/// `Field` is represented by an unsigned 32-bit integer type.
@@ -40,13 +38,7 @@ impl PathToUnorderedId {
#[cold]
fn insert_new_path(&mut self, path: &str) -> u32 {
let next_id = self.map.len() as u32;
let mut new_path = path.to_string();
// The unsafe below is safe as long as b'.' and JSON_PATH_SEGMENT_SEP are
// valid single byte ut8 strings.
// By utf-8 design, they cannot be part of another codepoint.
unsafe { replace_in_place(JSON_END_OF_PATH, b'0', new_path.as_bytes_mut()) };
let new_path = path.to_string();
self.map.insert(new_path, next_id);
next_id
}

View File

@@ -59,7 +59,7 @@ impl<Rec: Recorder> PostingsWriter for JsonPostingsWriter<Rec> {
/// The actual serialization format is handled by the `PostingsSerializer`.
fn serialize(
&self,
term_addrs: &[(Field, OrderedPathId, &[u8], Addr)],
ordered_term_addrs: &[(Field, OrderedPathId, &[u8], Addr)],
ordered_id_to_path: &[&str],
ctx: &IndexingContext,
serializer: &mut FieldSerializer,
@@ -69,7 +69,7 @@ impl<Rec: Recorder> PostingsWriter for JsonPostingsWriter<Rec> {
term_buffer.clear_with_field_and_type(Type::Json, Field::from_field_id(0));
let mut prev_term_id = u32::MAX;
let mut term_path_len = 0; // this will be set in the first iteration
for (_field, path_id, term, addr) in term_addrs {
for (_field, path_id, term, addr) in ordered_term_addrs {
if prev_term_id != path_id.path_id() {
term_buffer.truncate_value_bytes(0);
term_buffer.append_path(ordered_id_to_path[path_id.path_id() as usize].as_bytes());

View File

@@ -15,6 +15,7 @@ pub trait Postings: DocSet + 'static {
fn term_freq(&self) -> u32;
/// Returns the positions offsetted with a given value.
/// It is not necessary to clear the `output` before calling this method.
/// The output vector will be resized to the `term_freq`.
fn positions_with_offset(&mut self, offset: u32, output: &mut Vec<u32>);

View File

@@ -97,6 +97,7 @@ pub struct PhrasePrefixScorer<TPostings: Postings> {
suffixes: Vec<TPostings>,
suffix_offset: u32,
phrase_count: u32,
suffix_position_buffer: Vec<u32>,
}
impl<TPostings: Postings> PhrasePrefixScorer<TPostings> {
@@ -140,6 +141,7 @@ impl<TPostings: Postings> PhrasePrefixScorer<TPostings> {
suffixes,
suffix_offset: (max_offset - suffix_pos) as u32,
phrase_count: 0,
suffix_position_buffer: Vec::with_capacity(100),
};
if phrase_prefix_scorer.doc() != TERMINATED && !phrase_prefix_scorer.matches_prefix() {
phrase_prefix_scorer.advance();
@@ -153,7 +155,6 @@ impl<TPostings: Postings> PhrasePrefixScorer<TPostings> {
fn matches_prefix(&mut self) -> bool {
let mut count = 0;
let mut positions = Vec::new();
let current_doc = self.doc();
let pos_matching = self.phrase_scorer.get_intersection();
for suffix in &mut self.suffixes {
@@ -162,8 +163,8 @@ impl<TPostings: Postings> PhrasePrefixScorer<TPostings> {
}
let doc = suffix.seek(current_doc);
if doc == current_doc {
suffix.positions_with_offset(self.suffix_offset, &mut positions);
count += intersection_count(pos_matching, &positions);
suffix.positions_with_offset(self.suffix_offset, &mut self.suffix_position_buffer);
count += intersection_count(pos_matching, &self.suffix_position_buffer);
}
}
self.phrase_count = count as u32;

View File

@@ -249,15 +249,8 @@ impl Term {
#[inline]
pub fn append_path(&mut self, bytes: &[u8]) -> &mut [u8] {
let len_before = self.0.len();
if bytes.contains(&JSON_END_OF_PATH) {
self.0.extend(
bytes
.iter()
.map(|&b| if b == JSON_END_OF_PATH { b'0' } else { b }),
);
} else {
self.0.extend_from_slice(bytes);
}
assert!(!bytes.contains(&JSON_END_OF_PATH));
self.0.extend_from_slice(bytes);
&mut self.0[len_before..]
}
}