From e2dae2f43344e75b6d510ef1a3a2ee2168346d6f Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Tue, 25 Jun 2024 08:35:58 +0800 Subject: [PATCH] fix out of order bug --- columnar/src/columnar/writer/mod.rs | 4 +++- columnar/src/columnar/writer/serializer.rs | 10 +++------ src/indexer/index_writer.rs | 25 ++++++++++++++++++++++ src/indexer/path_to_unordered_id.rs | 11 +++++++++- src/schema/term.rs | 9 +++++--- 5 files changed, 47 insertions(+), 12 deletions(-) diff --git a/columnar/src/columnar/writer/mod.rs b/columnar/src/columnar/writer/mod.rs index 239a7422d..44ff4875f 100644 --- a/columnar/src/columnar/writer/mod.rs +++ b/columnar/src/columnar/writer/mod.rs @@ -247,6 +247,7 @@ impl ColumnarWriter { } pub fn serialize(&mut self, num_docs: RowId, wrt: &mut dyn io::Write) -> io::Result<()> { let mut serializer = ColumnarSerializer::new(wrt); + let mut columns: Vec<(&[u8], ColumnType, Addr)> = self .numerical_field_hash_map .iter() @@ -260,7 +261,7 @@ impl ColumnarWriter { columns.extend( self.bytes_field_hash_map .iter() - .map(|(term, addr)| (term, ColumnType::Bytes, addr)), + .map(|(column_name, addr)| (column_name, ColumnType::Bytes, addr)), ); columns.extend( self.str_field_hash_map @@ -282,6 +283,7 @@ impl ColumnarWriter { .iter() .map(|(column_name, addr)| (column_name, ColumnType::DateTime, addr)), ); + // TODO: replace JSON_END_OF_PATH with b'0' in columns columns.sort_unstable_by_key(|(column_name, col_type, _)| (*column_name, *col_type)); let (arena, buffers, dictionaries) = (&self.arena, &mut self.buffers, &self.dictionaries); diff --git a/columnar/src/columnar/writer/serializer.rs b/columnar/src/columnar/writer/serializer.rs index 394e61cd9..73919b40d 100644 --- a/columnar/src/columnar/writer/serializer.rs +++ b/columnar/src/columnar/writer/serializer.rs @@ -1,6 +1,7 @@ use std::io; use std::io::Write; +use common::json_path_writer::JSON_END_OF_PATH; use common::{BinarySerializable, CountingWriter}; use sstable::value::RangeValueWriter; use sstable::RangeSSTable; @@ -18,13 +19,8 @@ pub struct ColumnarSerializer { /// code. fn prepare_key(key: &[u8], column_type: ColumnType, buffer: &mut Vec) { buffer.clear(); - // Convert 0 bytes to '0' string, as 0 bytes are reserved for the end of the path. - if key.contains(&0u8) { - buffer.extend(key.iter().map(|&b| if b == 0 { b'0' } else { b })); - } else { - buffer.extend_from_slice(key); - } - buffer.push(0u8); + buffer.extend_from_slice(key); + buffer.push(JSON_END_OF_PATH); buffer.push(column_type.to_code()); } diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 4792574d0..6290e80fe 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -2490,4 +2490,29 @@ mod tests { Ok(()) } + + #[test] + fn test_bug_2442() -> crate::Result<()> { + let mut schema_builder = schema::Schema::builder(); + let json_field = schema_builder.add_json_field("json", TEXT | FAST); + + let schema = schema_builder.build(); + let index = Index::builder().schema(schema).create_in_ram()?; + let mut index_writer = index.writer_for_tests()?; + index_writer.set_merge_policy(Box::new(NoMergePolicy)); + + index_writer + .add_document(doc!( + json_field=>json!({"\u{0000}B":"1"}) + )) + .unwrap(); + index_writer + .add_document(doc!( + json_field=>json!({" A":"1"}) + )) + .unwrap(); + index_writer.commit()?; + + Ok(()) + } } diff --git a/src/indexer/path_to_unordered_id.rs b/src/indexer/path_to_unordered_id.rs index 054654f94..72fe4bcb2 100644 --- a/src/indexer/path_to_unordered_id.rs +++ b/src/indexer/path_to_unordered_id.rs @@ -1,3 +1,5 @@ +use common::json_path_writer::JSON_END_OF_PATH; +use common::replace_in_place; use fnv::FnvHashMap; /// `Field` is represented by an unsigned 32-bit integer type. @@ -38,7 +40,14 @@ impl PathToUnorderedId { #[cold] fn insert_new_path(&mut self, path: &str) -> u32 { let next_id = self.map.len() as u32; - self.map.insert(path.to_string(), next_id); + let mut new_path = path.to_string(); + + // The unsafe below is safe as long as b'.' and JSON_PATH_SEGMENT_SEP are + // valid single byte ut8 strings. + // By utf-8 design, they cannot be part of another codepoint. + unsafe { replace_in_place(JSON_END_OF_PATH, b'0', new_path.as_bytes_mut()) }; + + self.map.insert(new_path, next_id); next_id } diff --git a/src/schema/term.rs b/src/schema/term.rs index b4d0d6288..c8cdcdbba 100644 --- a/src/schema/term.rs +++ b/src/schema/term.rs @@ -249,9 +249,12 @@ impl Term { #[inline] pub fn append_path(&mut self, bytes: &[u8]) -> &mut [u8] { let len_before = self.0.len(); - if bytes.contains(&0u8) { - self.0 - .extend(bytes.iter().map(|&b| if b == 0 { b'0' } else { b })); + if bytes.contains(&JSON_END_OF_PATH) { + self.0.extend( + bytes + .iter() + .map(|&b| if b == JSON_END_OF_PATH { b'0' } else { b }), + ); } else { self.0.extend_from_slice(bytes); }