Fixes bug that causes out-of-order sstable key. (#2445)

The previous way to address the problem was to replace \u{0000}
with 0 in different places.

This logic had several flaws:
When done on the serializer side (as it was for the columnar), there was
a collision problem.

If a document in the segment contained a json field with a \0 and
another doc contained the same json field but `0` then we were sending
the same field path twice to the serializer.

Another option would have been to normalize all values on the writer
side.

This PR simplifies the logic and simply ignores json paths containing a
\0, both in the columnar and the inverted index.

Closes #2442
This commit is contained in:
Paul Masurel
2024-07-01 16:40:07 +09:00
committed by GitHub
parent f9ae295507
commit 0f4c2e27cf
8 changed files with 91 additions and 47 deletions

View File

@@ -8,6 +8,7 @@ use std::net::Ipv6Addr;
use column_operation::ColumnOperation; use column_operation::ColumnOperation;
pub(crate) use column_writers::CompatibleNumericalTypes; pub(crate) use column_writers::CompatibleNumericalTypes;
use common::json_path_writer::JSON_END_OF_PATH;
use common::CountingWriter; use common::CountingWriter;
pub(crate) use serializer::ColumnarSerializer; pub(crate) use serializer::ColumnarSerializer;
use stacker::{Addr, ArenaHashMap, MemoryArena}; use stacker::{Addr, ArenaHashMap, MemoryArena};
@@ -247,6 +248,7 @@ impl ColumnarWriter {
} }
pub fn serialize(&mut self, num_docs: RowId, wrt: &mut dyn io::Write) -> io::Result<()> { pub fn serialize(&mut self, num_docs: RowId, wrt: &mut dyn io::Write) -> io::Result<()> {
let mut serializer = ColumnarSerializer::new(wrt); let mut serializer = ColumnarSerializer::new(wrt);
let mut columns: Vec<(&[u8], ColumnType, Addr)> = self let mut columns: Vec<(&[u8], ColumnType, Addr)> = self
.numerical_field_hash_map .numerical_field_hash_map
.iter() .iter()
@@ -260,7 +262,7 @@ impl ColumnarWriter {
columns.extend( columns.extend(
self.bytes_field_hash_map self.bytes_field_hash_map
.iter() .iter()
.map(|(term, addr)| (term, ColumnType::Bytes, addr)), .map(|(column_name, addr)| (column_name, ColumnType::Bytes, addr)),
); );
columns.extend( columns.extend(
self.str_field_hash_map self.str_field_hash_map
@@ -287,6 +289,12 @@ impl ColumnarWriter {
let (arena, buffers, dictionaries) = (&self.arena, &mut self.buffers, &self.dictionaries); let (arena, buffers, dictionaries) = (&self.arena, &mut self.buffers, &self.dictionaries);
let mut symbol_byte_buffer: Vec<u8> = Vec::new(); let mut symbol_byte_buffer: Vec<u8> = Vec::new();
for (column_name, column_type, addr) in columns { for (column_name, column_type, addr) in columns {
if column_name.contains(&JSON_END_OF_PATH) {
// Tantivy uses the b'\0' byte as a separator for nested fields in JSON.
// Column names containing a b'\0' are therefore simply ignored by the columnar
// (as they are by the inverted index).
continue;
}
match column_type { match column_type {
ColumnType::Bool => { ColumnType::Bool => {
let column_writer: ColumnWriter = self.bool_field_hash_map.read(addr); let column_writer: ColumnWriter = self.bool_field_hash_map.read(addr);

View File

@@ -1,6 +1,7 @@
use std::io; use std::io;
use std::io::Write; use std::io::Write;
use common::json_path_writer::JSON_END_OF_PATH;
use common::{BinarySerializable, CountingWriter}; use common::{BinarySerializable, CountingWriter};
use sstable::value::RangeValueWriter; use sstable::value::RangeValueWriter;
use sstable::RangeSSTable; use sstable::RangeSSTable;
@@ -18,13 +19,8 @@ pub struct ColumnarSerializer<W: io::Write> {
/// code. /// code.
fn prepare_key(key: &[u8], column_type: ColumnType, buffer: &mut Vec<u8>) { fn prepare_key(key: &[u8], column_type: ColumnType, buffer: &mut Vec<u8>) {
buffer.clear(); buffer.clear();
// Convert 0 bytes to '0' string, as 0 bytes are reserved for the end of the path. buffer.extend_from_slice(key);
if key.contains(&0u8) { buffer.push(JSON_END_OF_PATH);
buffer.extend(key.iter().map(|&b| if b == 0 { b'0' } else { b }));
} else {
buffer.extend_from_slice(key);
}
buffer.push(0u8);
buffer.push(column_type.to_code()); buffer.push(column_type.to_code());
} }
@@ -97,18 +93,3 @@ impl<'a, W: io::Write> io::Write for ColumnSerializer<'a, W> {
self.columnar_serializer.wrt.write_all(buf) self.columnar_serializer.wrt.write_all(buf)
} }
} }
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_prepare_key_bytes() {
let mut buffer: Vec<u8> = b"somegarbage".to_vec();
prepare_key(b"root\0child", ColumnType::Str, &mut buffer);
assert_eq!(buffer.len(), 12);
assert_eq!(&buffer[..10], b"root0child");
assert_eq!(buffer[10], 0u8);
assert_eq!(buffer[11], ColumnType::Str.to_code());
}
}

View File

@@ -1,4 +1,4 @@
use common::json_path_writer::JSON_PATH_SEGMENT_SEP; use common::json_path_writer::{JSON_END_OF_PATH, JSON_PATH_SEGMENT_SEP};
use common::{replace_in_place, JsonPathWriter}; use common::{replace_in_place, JsonPathWriter};
use rustc_hash::FxHashMap; use rustc_hash::FxHashMap;
@@ -83,6 +83,9 @@ fn index_json_object<'a, V: Value<'a>>(
positions_per_path: &mut IndexingPositionsPerPath, positions_per_path: &mut IndexingPositionsPerPath,
) { ) {
for (json_path_segment, json_value_visitor) in json_visitor { for (json_path_segment, json_value_visitor) in json_visitor {
if json_path_segment.as_bytes().contains(&JSON_END_OF_PATH) {
continue;
}
json_path_writer.push(json_path_segment); json_path_writer.push(json_path_segment);
index_json_value( index_json_value(
doc, doc,

View File

@@ -815,8 +815,9 @@ mod tests {
use crate::indexer::NoMergePolicy; use crate::indexer::NoMergePolicy;
use crate::query::{QueryParser, TermQuery}; use crate::query::{QueryParser, TermQuery};
use crate::schema::{ use crate::schema::{
self, Facet, FacetOptions, IndexRecordOption, IpAddrOptions, NumericOptions, self, Facet, FacetOptions, IndexRecordOption, IpAddrOptions, JsonObjectOptions,
TextFieldIndexing, TextOptions, Value, FAST, INDEXED, STORED, STRING, TEXT, NumericOptions, Schema, TextFieldIndexing, TextOptions, Value, FAST, INDEXED, STORED,
STRING, TEXT,
}; };
use crate::store::DOCSTORE_CACHE_CAPACITY; use crate::store::DOCSTORE_CACHE_CAPACITY;
use crate::{ use crate::{
@@ -2378,11 +2379,11 @@ mod tests {
#[test] #[test]
fn test_bug_1617_2() { fn test_bug_1617_2() {
assert!(test_operation_strategy( test_operation_strategy(
&[ &[
IndexingOp::AddDoc { IndexingOp::AddDoc {
id: 13, id: 13,
value: Default::default() value: Default::default(),
}, },
IndexingOp::DeleteDoc { id: 13 }, IndexingOp::DeleteDoc { id: 13 },
IndexingOp::Commit, IndexingOp::Commit,
@@ -2390,9 +2391,9 @@ mod tests {
IndexingOp::Commit, IndexingOp::Commit,
IndexingOp::Merge, IndexingOp::Merge,
], ],
true true,
) )
.is_ok()); .unwrap();
} }
#[test] #[test]
@@ -2490,4 +2491,46 @@ mod tests {
Ok(()) Ok(())
} }
#[test]
fn test_bug_2442_reserved_character_fast_field() -> crate::Result<()> {
let mut schema_builder = schema::Schema::builder();
let json_field = schema_builder.add_json_field("json", FAST | TEXT);
let schema = schema_builder.build();
let index = Index::builder().schema(schema).create_in_ram()?;
let mut index_writer = index.writer_for_tests()?;
index_writer.set_merge_policy(Box::new(NoMergePolicy));
index_writer
.add_document(doc!(
json_field=>json!({"\u{0000}B":"1"})
))
.unwrap();
index_writer
.add_document(doc!(
json_field=>json!({" A":"1"})
))
.unwrap();
index_writer.commit()?;
Ok(())
}
#[test]
fn test_bug_2442_reserved_character_columnar() -> crate::Result<()> {
let mut schema_builder = Schema::builder();
let options = JsonObjectOptions::from(FAST).set_expand_dots_enabled();
let field = schema_builder.add_json_field("json", options);
let index = Index::create_in_ram(schema_builder.build());
let mut index_writer = index.writer_for_tests().unwrap();
index_writer
.add_document(doc!(field=>json!({"\u{0000}": "A"})))
.unwrap();
index_writer
.add_document(doc!(field=>json!({format!("\u{0000}\u{0000}"): "A"})))
.unwrap();
index_writer.commit().unwrap();
Ok(())
}
} }

View File

@@ -145,15 +145,27 @@ mod tests_mmap {
} }
} }
#[test] #[test]
fn test_json_field_null_byte() { fn test_json_field_null_byte_is_ignored() {
// Test when field name contains a zero byte, which has special meaning in tantivy. let mut schema_builder = Schema::builder();
// As a workaround, we convert the zero byte to the ASCII character '0'. let options = JsonObjectOptions::from(TEXT | FAST).set_expand_dots_enabled();
// https://github.com/quickwit-oss/tantivy/issues/2340 let field = schema_builder.add_json_field("json", options);
// https://github.com/quickwit-oss/tantivy/issues/2193 let index = Index::create_in_ram(schema_builder.build());
let field_name_in = "\u{0000}"; let mut index_writer = index.writer_for_tests().unwrap();
let field_name_out = "0"; index_writer
test_json_field_name(field_name_in, field_name_out); .add_document(doc!(field=>json!({"key": "test1", "invalidkey\u{0000}": "test2"})))
.unwrap();
index_writer.commit().unwrap();
let reader = index.reader().unwrap();
let searcher = reader.searcher();
let segment_reader = searcher.segment_reader(0);
let inv_indexer = segment_reader.inverted_index(field).unwrap();
let term_dict = inv_indexer.terms();
assert_eq!(term_dict.num_terms(), 1);
let mut term_bytes = Vec::new();
term_dict.ord_to_term(0, &mut term_bytes).unwrap();
assert_eq!(term_bytes, b"key\0stest1");
} }
#[test] #[test]
fn test_json_field_1byte() { fn test_json_field_1byte() {
// Test when field name contains a '1' byte, which has special meaning in tantivy. // Test when field name contains a '1' byte, which has special meaning in tantivy.

View File

@@ -38,7 +38,8 @@ impl PathToUnorderedId {
#[cold] #[cold]
fn insert_new_path(&mut self, path: &str) -> u32 { fn insert_new_path(&mut self, path: &str) -> u32 {
let next_id = self.map.len() as u32; let next_id = self.map.len() as u32;
self.map.insert(path.to_string(), next_id); let new_path = path.to_string();
self.map.insert(new_path, next_id);
next_id next_id
} }

View File

@@ -59,7 +59,7 @@ impl<Rec: Recorder> PostingsWriter for JsonPostingsWriter<Rec> {
/// The actual serialization format is handled by the `PostingsSerializer`. /// The actual serialization format is handled by the `PostingsSerializer`.
fn serialize( fn serialize(
&self, &self,
term_addrs: &[(Field, OrderedPathId, &[u8], Addr)], ordered_term_addrs: &[(Field, OrderedPathId, &[u8], Addr)],
ordered_id_to_path: &[&str], ordered_id_to_path: &[&str],
ctx: &IndexingContext, ctx: &IndexingContext,
serializer: &mut FieldSerializer, serializer: &mut FieldSerializer,
@@ -69,7 +69,7 @@ impl<Rec: Recorder> PostingsWriter for JsonPostingsWriter<Rec> {
term_buffer.clear_with_field_and_type(Type::Json, Field::from_field_id(0)); term_buffer.clear_with_field_and_type(Type::Json, Field::from_field_id(0));
let mut prev_term_id = u32::MAX; let mut prev_term_id = u32::MAX;
let mut term_path_len = 0; // this will be set in the first iteration let mut term_path_len = 0; // this will be set in the first iteration
for (_field, path_id, term, addr) in term_addrs { for (_field, path_id, term, addr) in ordered_term_addrs {
if prev_term_id != path_id.path_id() { if prev_term_id != path_id.path_id() {
term_buffer.truncate_value_bytes(0); term_buffer.truncate_value_bytes(0);
term_buffer.append_path(ordered_id_to_path[path_id.path_id() as usize].as_bytes()); term_buffer.append_path(ordered_id_to_path[path_id.path_id() as usize].as_bytes());

View File

@@ -249,12 +249,8 @@ impl Term {
#[inline] #[inline]
pub fn append_path(&mut self, bytes: &[u8]) -> &mut [u8] { pub fn append_path(&mut self, bytes: &[u8]) -> &mut [u8] {
let len_before = self.0.len(); let len_before = self.0.len();
if bytes.contains(&0u8) { assert!(!bytes.contains(&JSON_END_OF_PATH));
self.0 self.0.extend_from_slice(bytes);
.extend(bytes.iter().map(|&b| if b == 0 { b'0' } else { b }));
} else {
self.0.extend_from_slice(bytes);
}
&mut self.0[len_before..] &mut self.0[len_before..]
} }
} }