diff --git a/columnar/benches/bench_first_vals.rs b/columnar/benches/bench_first_vals.rs index 0ce02ee6a..7b75bbc10 100644 --- a/columnar/benches/bench_first_vals.rs +++ b/columnar/benches/bench_first_vals.rs @@ -89,13 +89,6 @@ fn main() { black_box(sum); }); - group.register("first_block_fetch", |column| { - let mut block: Vec> = vec![None; 64]; - let fetch_docids = (0..64).collect::>(); - column.first_vals(&fetch_docids, &mut block); - black_box(block[0]); - }); - group.register("first_block_single_calls", |column| { let mut block: Vec> = vec![None; 64]; let fetch_docids = (0..64).collect::>(); diff --git a/src/core/json_utils.rs b/src/core/json_utils.rs index d40a3fcf6..ade65fa11 100644 --- a/src/core/json_utils.rs +++ b/src/core/json_utils.rs @@ -3,6 +3,7 @@ use common::json_path_writer::{JSON_END_OF_PATH, JSON_PATH_SEGMENT_SEP}; use common::{replace_in_place, JsonPathWriter}; use rustc_hash::FxHashMap; +use crate::indexer::indexing_term::IndexingTerm; use crate::postings::{IndexingContext, IndexingPosition, PostingsWriter}; use crate::schema::document::{ReferenceValue, ReferenceValueLeaf, Value}; use crate::schema::{Type, DATE_TIME_PRECISION_INDEXED}; @@ -77,7 +78,7 @@ fn index_json_object<'a, V: Value<'a>>( doc: DocId, json_visitor: V::ObjectIter, text_analyzer: &mut TextAnalyzer, - term_buffer: &mut Term, + term_buffer: &mut IndexingTerm, json_path_writer: &mut JsonPathWriter, postings_writer: &mut dyn PostingsWriter, ctx: &mut IndexingContext, @@ -107,17 +108,17 @@ pub(crate) fn index_json_value<'a, V: Value<'a>>( doc: DocId, json_value: V, text_analyzer: &mut TextAnalyzer, - term_buffer: &mut Term, + term_buffer: &mut IndexingTerm, json_path_writer: &mut JsonPathWriter, postings_writer: &mut dyn PostingsWriter, ctx: &mut IndexingContext, positions_per_path: &mut IndexingPositionsPerPath, ) { - let set_path_id = |term_buffer: &mut Term, unordered_id: u32| { + let set_path_id = |term_buffer: &mut IndexingTerm, unordered_id: u32| { term_buffer.truncate_value_bytes(0); term_buffer.append_bytes(&unordered_id.to_be_bytes()); }; - let set_type = |term_buffer: &mut Term, typ: Type| { + let set_type = |term_buffer: &mut IndexingTerm, typ: Type| { term_buffer.append_bytes(&[typ.to_code()]); }; diff --git a/src/indexer/indexing_term.rs b/src/indexer/indexing_term.rs new file mode 100644 index 000000000..e4ca2b219 --- /dev/null +++ b/src/indexer/indexing_term.rs @@ -0,0 +1,184 @@ +use std::net::Ipv6Addr; + +use columnar::MonotonicallyMappableToU128; + +use crate::fastfield::FastValue; +use crate::schema::{Field, Type}; + +/// Term represents the value that the token can take. +/// It's a serialized representation over different types. +/// +/// It actually wraps a `Vec`. The first 5 bytes are metadata. +/// 4 bytes are the field id, and the last byte is the type. +/// +/// The serialized value `ValueBytes` is considered everything after the 4 first bytes (term id). +#[derive(Clone)] +pub(crate) struct IndexingTerm>(B) +where B: AsRef<[u8]>; + +/// The number of bytes used as metadata by `Term`. +const TERM_METADATA_LENGTH: usize = 5; + +impl IndexingTerm { + /// Create a new Term with a buffer with a given capacity. + pub fn with_capacity(capacity: usize) -> IndexingTerm { + let mut data = Vec::with_capacity(TERM_METADATA_LENGTH + capacity); + data.resize(TERM_METADATA_LENGTH, 0u8); + IndexingTerm(data) + } + + /// Panics when the term is not empty... ie: some value is set. + /// Use `clear_with_field_and_type` in that case. + /// + /// Sets field and the type. + pub(crate) fn set_field_and_type(&mut self, field: Field, typ: Type) { + assert!(self.is_empty()); + self.0[0..4].clone_from_slice(field.field_id().to_be_bytes().as_ref()); + self.0[4] = typ.to_code(); + } + + /// Is empty if there are no value bytes. + pub fn is_empty(&self) -> bool { + self.0.len() == TERM_METADATA_LENGTH + } + + /// Removes the value_bytes and set the field and type code. + pub(crate) fn clear_with_field_and_type(&mut self, typ: Type, field: Field) { + self.truncate_value_bytes(0); + self.set_field_and_type(field, typ); + } + + /// Sets a u64 value in the term. + /// + /// U64 are serialized using (8-byte) BigEndian + /// representation. + /// The use of BigEndian has the benefit of preserving + /// the natural order of the values. + pub fn set_u64(&mut self, val: u64) { + self.set_fast_value(val); + } + + /// Sets a `i64` value in the term. + pub fn set_i64(&mut self, val: i64) { + self.set_fast_value(val); + } + + /// Sets a `f64` value in the term. + pub fn set_f64(&mut self, val: f64) { + self.set_fast_value(val); + } + + /// Sets a `bool` value in the term. + pub fn set_bool(&mut self, val: bool) { + self.set_fast_value(val); + } + + fn set_fast_value(&mut self, val: T) { + self.set_bytes(val.to_u64().to_be_bytes().as_ref()); + } + + /// Append a type marker + fast value to a term. + /// This is used in JSON type to append a fast value after the path. + /// + /// It will not clear existing bytes. + pub fn append_type_and_fast_value(&mut self, val: T) { + self.0.push(T::to_type().to_code()); + let value = val.to_u64(); + self.0.extend(value.to_be_bytes().as_ref()); + } + + /// Sets a `Ipv6Addr` value in the term. + pub fn set_ip_addr(&mut self, val: Ipv6Addr) { + self.set_bytes(val.to_u128().to_be_bytes().as_ref()); + } + + /// Sets the value of a `Bytes` field. + pub fn set_bytes(&mut self, bytes: &[u8]) { + self.truncate_value_bytes(0); + self.0.extend(bytes); + } + + /// Truncates the value bytes of the term. Value and field type stays the same. + pub fn truncate_value_bytes(&mut self, len: usize) { + self.0.truncate(len + TERM_METADATA_LENGTH); + } + + /// The length of the bytes. + pub fn len_bytes(&self) -> usize { + self.0.len() - TERM_METADATA_LENGTH + } + + /// Appends value bytes to the Term. + /// + /// This function returns the segment that has just been added. + #[inline] + pub fn append_bytes(&mut self, bytes: &[u8]) -> &mut [u8] { + let len_before = self.0.len(); + self.0.extend_from_slice(bytes); + &mut self.0[len_before..] + } +} + +impl IndexingTerm +where B: AsRef<[u8]> +{ + /// Returns the serialized representation of Term. + /// This includes field_id, value type and value. + /// + /// Do NOT rely on this byte representation in the index. + /// This value is likely to change in the future. + #[inline] + pub fn serialized_term(&self) -> &[u8] { + self.0.as_ref() + } +} + +#[cfg(test)] +mod tests { + + use crate::schema::*; + + #[test] + pub fn test_term_str() { + let mut schema_builder = Schema::builder(); + schema_builder.add_text_field("text", STRING); + let title_field = schema_builder.add_text_field("title", STRING); + let term = Term::from_field_text(title_field, "test"); + assert_eq!(term.field(), title_field); + assert_eq!(term.typ(), Type::Str); + assert_eq!(term.value().as_str(), Some("test")) + } + + /// Size (in bytes) of the buffer of a fast value (u64, i64, f64, or date) term. + /// + + + /// + /// - is a big endian encoded u32 field id + /// - 's most significant bit expresses whether the term is a json term or not The + /// remaining 7 bits are used to encode the type of the value. If this is a JSON term, the + /// type is the type of the leaf of the json. + /// - is, if this is not the json term, a binary representation specific to the type. + /// If it is a JSON Term, then it is prepended with the path that leads to this leaf value. + const FAST_VALUE_TERM_LEN: usize = 4 + 1 + 8; + + #[test] + pub fn test_term_u64() { + let mut schema_builder = Schema::builder(); + let count_field = schema_builder.add_u64_field("count", INDEXED); + let term = Term::from_field_u64(count_field, 983u64); + assert_eq!(term.field(), count_field); + assert_eq!(term.typ(), Type::U64); + assert_eq!(term.serialized_term().len(), FAST_VALUE_TERM_LEN); + assert_eq!(term.value().as_u64(), Some(983u64)) + } + + #[test] + pub fn test_term_bool() { + let mut schema_builder = Schema::builder(); + let bool_field = schema_builder.add_bool_field("bool", INDEXED); + let term = Term::from_field_bool(bool_field, true); + assert_eq!(term.field(), bool_field); + assert_eq!(term.typ(), Type::Bool); + assert_eq!(term.serialized_term().len(), FAST_VALUE_TERM_LEN); + assert_eq!(term.value().as_bool(), Some(true)) + } +} diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs index b7b1b2340..687a6b119 100644 --- a/src/indexer/mod.rs +++ b/src/indexer/mod.rs @@ -12,6 +12,7 @@ mod doc_opstamp_mapping; mod flat_map_with_buffer; pub(crate) mod index_writer; pub(crate) mod index_writer_status; +pub(crate) mod indexing_term; mod log_merge_policy; mod merge_index_test; mod merge_operation; diff --git a/src/indexer/segment_writer.rs b/src/indexer/segment_writer.rs index ad2ffa067..cfeea1177 100644 --- a/src/indexer/segment_writer.rs +++ b/src/indexer/segment_writer.rs @@ -7,6 +7,7 @@ use super::operation::AddOperation; use crate::fastfield::FastFieldsWriter; use crate::fieldnorm::{FieldNormReaders, FieldNormsWriter}; use crate::index::{Segment, SegmentComponent}; +use crate::indexer::indexing_term::IndexingTerm; use crate::indexer::segment_serializer::SegmentSerializer; use crate::json_utils::{index_json_value, IndexingPositionsPerPath}; use crate::postings::{ @@ -14,7 +15,7 @@ use crate::postings::{ PerFieldPostingsWriter, PostingsWriter, }; use crate::schema::document::{Document, Value}; -use crate::schema::{FieldEntry, FieldType, Schema, Term, DATE_TIME_PRECISION_INDEXED}; +use crate::schema::{FieldEntry, FieldType, Schema, DATE_TIME_PRECISION_INDEXED}; use crate::tokenizer::{FacetTokenizer, PreTokenizedStream, TextAnalyzer, Tokenizer}; use crate::{DocId, Opstamp, TantivyError}; @@ -55,7 +56,7 @@ pub struct SegmentWriter { pub(crate) json_positions_per_path: IndexingPositionsPerPath, pub(crate) doc_opstamps: Vec, per_field_text_analyzers: Vec, - term_buffer: Term, + term_buffer: IndexingTerm, schema: Schema, } @@ -112,7 +113,7 @@ impl SegmentWriter { )?, doc_opstamps: Vec::with_capacity(1_000), per_field_text_analyzers, - term_buffer: Term::with_capacity(16), + term_buffer: IndexingTerm::with_capacity(16), schema, }) } diff --git a/src/postings/json_postings_writer.rs b/src/postings/json_postings_writer.rs index 6d4e8addc..477b73ba6 100644 --- a/src/postings/json_postings_writer.rs +++ b/src/postings/json_postings_writer.rs @@ -3,13 +3,14 @@ use std::io; use common::json_path_writer::JSON_END_OF_PATH; use stacker::Addr; +use crate::indexer::indexing_term::IndexingTerm; use crate::indexer::path_to_unordered_id::OrderedPathId; use crate::postings::postings_writer::SpecializedPostingsWriter; use crate::postings::recorder::{BufferLender, DocIdRecorder, Recorder}; use crate::postings::{FieldSerializer, IndexingContext, IndexingPosition, PostingsWriter}; -use crate::schema::{Field, Type}; +use crate::schema::{Field, Type, ValueBytes}; use crate::tokenizer::TokenStream; -use crate::{DocId, Term}; +use crate::DocId; /// The `JsonPostingsWriter` is odd in that it relies on a hidden contract: /// @@ -33,7 +34,7 @@ impl PostingsWriter for JsonPostingsWriter { &mut self, doc: crate::DocId, pos: u32, - term: &crate::Term, + term: &IndexingTerm, ctx: &mut IndexingContext, ) { self.non_str_posting_writer.subscribe(doc, pos, term, ctx); @@ -43,7 +44,7 @@ impl PostingsWriter for JsonPostingsWriter { &mut self, doc_id: DocId, token_stream: &mut dyn TokenStream, - term_buffer: &mut Term, + term_buffer: &mut IndexingTerm, ctx: &mut IndexingContext, indexing_position: &mut IndexingPosition, ) { @@ -64,40 +65,38 @@ impl PostingsWriter for JsonPostingsWriter { ctx: &IndexingContext, serializer: &mut FieldSerializer, ) -> io::Result<()> { - let mut term_buffer = Term::with_capacity(48); + let mut term_buffer = JsonTermSerializer(Vec::with_capacity(48)); let mut buffer_lender = BufferLender::default(); - term_buffer.clear_with_field_and_type(Type::Json, Field::from_field_id(0)); let mut prev_term_id = u32::MAX; let mut term_path_len = 0; // this will be set in the first iteration for (_field, path_id, term, addr) in ordered_term_addrs { if prev_term_id != path_id.path_id() { - term_buffer.truncate_value_bytes(0); - term_buffer.append_path(ordered_id_to_path[path_id.path_id() as usize].as_bytes()); - term_buffer.append_bytes(&[JSON_END_OF_PATH]); - term_path_len = term_buffer.len_bytes(); + term_buffer.clear(); + term_buffer.append_json_path(ordered_id_to_path[path_id.path_id() as usize]); + term_path_len = term_buffer.len(); prev_term_id = path_id.path_id(); } - term_buffer.truncate_value_bytes(term_path_len); + term_buffer.truncate(term_path_len); term_buffer.append_bytes(term); - if let Some(json_value) = term_buffer.value().as_json_value_bytes() { - let typ = json_value.typ(); - if typ == Type::Str { - SpecializedPostingsWriter::::serialize_one_term( - term_buffer.serialized_value_bytes(), - *addr, - &mut buffer_lender, - ctx, - serializer, - )?; - } else { - SpecializedPostingsWriter::::serialize_one_term( - term_buffer.serialized_value_bytes(), - *addr, - &mut buffer_lender, - ctx, - serializer, - )?; - } + + let json_value = ValueBytes::wrap(term); + let typ = json_value.typ(); + if typ == Type::Str { + SpecializedPostingsWriter::::serialize_one_term( + term_buffer.as_bytes(), + *addr, + &mut buffer_lender, + ctx, + serializer, + )?; + } else { + SpecializedPostingsWriter::::serialize_one_term( + term_buffer.as_bytes(), + *addr, + &mut buffer_lender, + ctx, + serializer, + )?; } } Ok(()) @@ -107,3 +106,48 @@ impl PostingsWriter for JsonPostingsWriter { self.str_posting_writer.total_num_tokens() + self.non_str_posting_writer.total_num_tokens() } } + +struct JsonTermSerializer(Vec); +impl JsonTermSerializer { + /// Appends a JSON path to the Term. + /// The path is terminated by a special end-of-path 0 byte. + #[inline] + pub fn append_json_path(&mut self, path: &str) { + let bytes = path.as_bytes(); + // Replace any occurrence of the end-of-path byte with Ascii '0' byte. + if bytes.contains(&JSON_END_OF_PATH) { + self.0.extend( + bytes + .iter() + .map(|&b| if b == JSON_END_OF_PATH { b'0' } else { b }), + ); + } else { + self.0.extend_from_slice(bytes); + } + self.0.push(JSON_END_OF_PATH); + } + + /// Appends value bytes to the Term. + /// + /// This function returns the segment that has just been added. + #[inline] + pub fn append_bytes(&mut self, bytes: &[u8]) -> &mut [u8] { + let len_before = self.0.len(); + self.0.extend_from_slice(bytes); + &mut self.0[len_before..] + } + + fn clear(&mut self) { + self.0.clear(); + } + fn truncate(&mut self, len: usize) { + self.0.truncate(len); + } + fn len(&self) -> usize { + self.0.len() + } + + fn as_bytes(&self) -> &[u8] { + &self.0 + } +} diff --git a/src/postings/postings_writer.rs b/src/postings/postings_writer.rs index fcbf55f98..1bf8262b9 100644 --- a/src/postings/postings_writer.rs +++ b/src/postings/postings_writer.rs @@ -5,6 +5,7 @@ use std::ops::Range; use stacker::Addr; use crate::fieldnorm::FieldNormReaders; +use crate::indexer::indexing_term::IndexingTerm; use crate::indexer::path_to_unordered_id::OrderedPathId; use crate::postings::recorder::{BufferLender, Recorder}; use crate::postings::{ @@ -111,7 +112,7 @@ pub(crate) trait PostingsWriter: Send + Sync { /// * term - the term /// * ctx - Contains a term hashmap and a memory arena to store all necessary posting list /// information. - fn subscribe(&mut self, doc: DocId, pos: u32, term: &Term, ctx: &mut IndexingContext); + fn subscribe(&mut self, doc: DocId, pos: u32, term: &IndexingTerm, ctx: &mut IndexingContext); /// Serializes the postings on disk. /// The actual serialization format is handled by the `PostingsSerializer`. @@ -128,7 +129,7 @@ pub(crate) trait PostingsWriter: Send + Sync { &mut self, doc_id: DocId, token_stream: &mut dyn TokenStream, - term_buffer: &mut Term, + term_buffer: &mut IndexingTerm, ctx: &mut IndexingContext, indexing_position: &mut IndexingPosition, ) { @@ -198,7 +199,13 @@ impl SpecializedPostingsWriter { impl PostingsWriter for SpecializedPostingsWriter { #[inline] - fn subscribe(&mut self, doc: DocId, position: u32, term: &Term, ctx: &mut IndexingContext) { + fn subscribe( + &mut self, + doc: DocId, + position: u32, + term: &IndexingTerm, + ctx: &mut IndexingContext, + ) { debug_assert!(term.serialized_term().len() >= 4); self.total_num_tokens += 1; let (term_index, arena) = (&mut ctx.term_index, &mut ctx.arena); diff --git a/src/schema/term.rs b/src/schema/term.rs index 0d6305665..2dd78b82a 100644 --- a/src/schema/term.rs +++ b/src/schema/term.rs @@ -95,7 +95,7 @@ impl Term { pub(crate) fn from_fast_value(field: Field, val: &T) -> Term { let mut term = Self::with_type_and_field(T::to_type(), field); - term.set_u64(val.to_u64()); + term.set_bytes(val.to_u64().to_be_bytes().as_ref()); term } @@ -117,7 +117,7 @@ impl Term { /// Builds a term given a field, and a `Ipv6Addr`-value pub fn from_field_ip_addr(field: Field, ip_addr: Ipv6Addr) -> Term { let mut term = Self::with_type_and_field(Type::IpAddr, field); - term.set_ip_addr(ip_addr); + term.set_bytes(ip_addr.to_u128().to_be_bytes().as_ref()); term } @@ -174,52 +174,12 @@ impl Term { Term::with_bytes_and_field_and_payload(Type::Bytes, field, bytes) } - /// Removes the value_bytes and set the field and type code. - pub(crate) fn clear_with_field_and_type(&mut self, typ: Type, field: Field) { - self.truncate_value_bytes(0); - self.set_field_and_type(field, typ); - } - /// Removes the value_bytes and set the type code. pub fn clear_with_type(&mut self, typ: Type) { self.truncate_value_bytes(0); self.0[4] = typ.to_code(); } - /// Sets a u64 value in the term. - /// - /// U64 are serialized using (8-byte) BigEndian - /// representation. - /// The use of BigEndian has the benefit of preserving - /// the natural order of the values. - pub fn set_u64(&mut self, val: u64) { - self.set_fast_value(val); - } - - /// Sets a `i64` value in the term. - pub fn set_i64(&mut self, val: i64) { - self.set_fast_value(val); - } - - /// Sets a `DateTime` value in the term. - pub fn set_date(&mut self, date: DateTime) { - self.set_fast_value(date); - } - - /// Sets a `f64` value in the term. - pub fn set_f64(&mut self, val: f64) { - self.set_fast_value(val); - } - - /// Sets a `bool` value in the term. - pub fn set_bool(&mut self, val: bool) { - self.set_fast_value(val); - } - - fn set_fast_value(&mut self, val: T) { - self.set_bytes(val.to_u64().to_be_bytes().as_ref()); - } - /// Append a type marker + fast value to a term. /// This is used in JSON type to append a fast value after the path. /// @@ -239,11 +199,6 @@ impl Term { self.0.extend(val.as_bytes().as_ref()); } - /// Sets a `Ipv6Addr` value in the term. - pub fn set_ip_addr(&mut self, val: Ipv6Addr) { - self.set_bytes(val.to_u128().to_be_bytes().as_ref()); - } - /// Sets the value of a `Bytes` field. pub fn set_bytes(&mut self, bytes: &[u8]) { self.truncate_value_bytes(0); @@ -269,19 +224,6 @@ impl Term { self.0.extend_from_slice(bytes); &mut self.0[len_before..] } - - /// Appends json path bytes to the Term. - /// If the path contains 0 bytes, they are replaced by a "0" string. - /// The 0 byte is used to mark the end of the path. - /// - /// This function returns the segment that has just been added. - #[inline] - pub fn append_path(&mut self, bytes: &[u8]) -> &mut [u8] { - let len_before = self.0.len(); - assert!(!bytes.contains(&JSON_END_OF_PATH)); - self.0.extend_from_slice(bytes); - &mut self.0[len_before..] - } } impl Term