From 87b9f0678cf4768d04ec631e7cb4bc6045b1b82b Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Thu, 18 Apr 2024 23:38:21 +0800 Subject: [PATCH] split term and indexing term --- src/core/json_utils.rs | 21 ++- src/indexer/segment_writer.rs | 7 +- src/postings/json_postings_writer.rs | 50 +++--- src/postings/postings_writer.rs | 17 +- src/schema/indexing_term.rs | 257 +++++++++++++++++++++++++++ src/schema/mod.rs | 1 + src/schema/term.rs | 57 ++---- 7 files changed, 328 insertions(+), 82 deletions(-) create mode 100644 src/schema/indexing_term.rs diff --git a/src/core/json_utils.rs b/src/core/json_utils.rs index d7ac29ad7..ae04cf43f 100644 --- a/src/core/json_utils.rs +++ b/src/core/json_utils.rs @@ -4,6 +4,7 @@ use rustc_hash::FxHashMap; use crate::postings::{IndexingContext, IndexingPosition, PostingsWriter}; use crate::schema::document::{ReferenceValue, ReferenceValueLeaf, Value}; +use crate::schema::indexing_term::IndexingTerm; use crate::schema::{Field, Type}; use crate::time::format_description::well_known::Rfc3339; use crate::time::{OffsetDateTime, UtcOffset}; @@ -74,7 +75,7 @@ pub(crate) fn index_json_values<'a, V: Value<'a>>( json_visitors: impl Iterator>, text_analyzer: &mut TextAnalyzer, expand_dots_enabled: bool, - term_buffer: &mut Term, + term_buffer: &mut IndexingTerm, postings_writer: &mut dyn PostingsWriter, json_path_writer: &mut JsonPathWriter, ctx: &mut IndexingContext, @@ -103,7 +104,7 @@ fn index_json_object<'a, V: Value<'a>>( doc: DocId, json_visitor: V::ObjectIter, text_analyzer: &mut TextAnalyzer, - term_buffer: &mut Term, + term_buffer: &mut IndexingTerm, json_path_writer: &mut JsonPathWriter, postings_writer: &mut dyn PostingsWriter, ctx: &mut IndexingContext, @@ -130,17 +131,17 @@ fn index_json_value<'a, V: Value<'a>>( doc: DocId, json_value: V, text_analyzer: &mut TextAnalyzer, - term_buffer: &mut Term, + term_buffer: &mut IndexingTerm, json_path_writer: &mut JsonPathWriter, postings_writer: &mut dyn PostingsWriter, ctx: &mut IndexingContext, positions_per_path: &mut IndexingPositionsPerPath, ) { - let set_path_id = |term_buffer: &mut Term, unordered_id: u32| { + let set_path_id = |term_buffer: &mut IndexingTerm, unordered_id: u32| { term_buffer.truncate_value_bytes(0); term_buffer.append_bytes(&unordered_id.to_be_bytes()); }; - let set_type = |term_buffer: &mut Term, typ: Type| { + let set_type = |term_buffer: &mut IndexingTerm, typ: Type| { term_buffer.append_bytes(&[typ.to_code()]); }; @@ -211,18 +212,16 @@ fn index_json_value<'a, V: Value<'a>>( postings_writer.subscribe(doc, 0u32, term_buffer, ctx); } ReferenceValueLeaf::PreTokStr(_) => { - unimplemented!( - "Pre-tokenized string support in dynamic fields is not yet implemented" - ) + unimplemented!("Pre-tokenized string support in JSON fields is not yet implemented") } ReferenceValueLeaf::Bytes(_) => { - unimplemented!("Bytes support in dynamic fields is not yet implemented") + unimplemented!("Bytes support in JSON fields is not yet implemented") } ReferenceValueLeaf::Facet(_) => { - unimplemented!("Facet support in dynamic fields is not yet implemented") + unimplemented!("Facet support in JSON fields is not yet implemented") } ReferenceValueLeaf::IpAddr(_) => { - unimplemented!("IP address support in dynamic fields is not yet implemented") + unimplemented!("IP address support in JSON fields is not yet implemented") } }, ReferenceValue::Array(elements) => { diff --git a/src/indexer/segment_writer.rs b/src/indexer/segment_writer.rs index 384a939e6..43cdf5fc5 100644 --- a/src/indexer/segment_writer.rs +++ b/src/indexer/segment_writer.rs @@ -15,7 +15,8 @@ use crate::postings::{ PerFieldPostingsWriter, PostingsWriter, }; use crate::schema::document::{Document, ReferenceValue, Value}; -use crate::schema::{FieldEntry, FieldType, Schema, Term, DATE_TIME_PRECISION_INDEXED}; +use crate::schema::indexing_term::IndexingTerm; +use crate::schema::{FieldEntry, FieldType, Schema, DATE_TIME_PRECISION_INDEXED}; use crate::store::{StoreReader, StoreWriter}; use crate::tokenizer::{FacetTokenizer, PreTokenizedStream, TextAnalyzer, Tokenizer}; use crate::{DocId, Opstamp, SegmentComponent, TantivyError}; @@ -70,7 +71,7 @@ pub struct SegmentWriter { pub(crate) json_path_writer: JsonPathWriter, pub(crate) doc_opstamps: Vec, per_field_text_analyzers: Vec, - term_buffer: Term, + term_buffer: IndexingTerm, schema: Schema, } @@ -126,7 +127,7 @@ impl SegmentWriter { )?, doc_opstamps: Vec::with_capacity(1_000), per_field_text_analyzers, - term_buffer: Term::with_capacity(16), + term_buffer: IndexingTerm::with_capacity(16), schema, }) } diff --git a/src/postings/json_postings_writer.rs b/src/postings/json_postings_writer.rs index ed3d5c24f..70e35b9c1 100644 --- a/src/postings/json_postings_writer.rs +++ b/src/postings/json_postings_writer.rs @@ -8,9 +8,10 @@ use crate::indexer::path_to_unordered_id::OrderedPathId; use crate::postings::postings_writer::SpecializedPostingsWriter; use crate::postings::recorder::{BufferLender, DocIdRecorder, Recorder}; use crate::postings::{FieldSerializer, IndexingContext, IndexingPosition, PostingsWriter}; +use crate::schema::indexing_term::{IndexingTerm, ValueBytes}; use crate::schema::{Field, Type}; use crate::tokenizer::TokenStream; -use crate::{DocId, Term}; +use crate::DocId; /// The `JsonPostingsWriter` is odd in that it relies on a hidden contract: /// @@ -34,7 +35,7 @@ impl PostingsWriter for JsonPostingsWriter { &mut self, doc: crate::DocId, pos: u32, - term: &crate::Term, + term: &IndexingTerm, ctx: &mut IndexingContext, ) { self.non_str_posting_writer.subscribe(doc, pos, term, ctx); @@ -44,7 +45,7 @@ impl PostingsWriter for JsonPostingsWriter { &mut self, doc_id: DocId, token_stream: &mut dyn TokenStream, - term_buffer: &mut Term, + term_buffer: &mut IndexingTerm, ctx: &mut IndexingContext, indexing_position: &mut IndexingPosition, ) { @@ -66,7 +67,7 @@ impl PostingsWriter for JsonPostingsWriter { ctx: &IndexingContext, serializer: &mut FieldSerializer, ) -> io::Result<()> { - let mut term_buffer = Term::with_capacity(48); + let mut term_buffer = IndexingTerm::with_capacity(48); let mut buffer_lender = BufferLender::default(); term_buffer.clear_with_field_and_type(Type::Json, Field::from_field_id(0)); let mut prev_term_id = u32::MAX; @@ -81,27 +82,26 @@ impl PostingsWriter for JsonPostingsWriter { } term_buffer.truncate_value_bytes(term_path_len); term_buffer.append_bytes(term); - if let Some(json_value) = term_buffer.value().as_json_value_bytes() { - let typ = json_value.typ(); - if typ == Type::Str { - SpecializedPostingsWriter::::serialize_one_term( - term_buffer.serialized_value_bytes(), - *addr, - doc_id_map, - &mut buffer_lender, - ctx, - serializer, - )?; - } else { - SpecializedPostingsWriter::::serialize_one_term( - term_buffer.serialized_value_bytes(), - *addr, - doc_id_map, - &mut buffer_lender, - ctx, - serializer, - )?; - } + let json_value = ValueBytes::wrap(term); + let typ = json_value.typ(); + if typ == Type::Str { + SpecializedPostingsWriter::::serialize_one_term( + term_buffer.serialized_value_bytes(), + *addr, + doc_id_map, + &mut buffer_lender, + ctx, + serializer, + )?; + } else { + SpecializedPostingsWriter::::serialize_one_term( + term_buffer.serialized_value_bytes(), + *addr, + doc_id_map, + &mut buffer_lender, + ctx, + serializer, + )?; } } Ok(()) diff --git a/src/postings/postings_writer.rs b/src/postings/postings_writer.rs index 264392889..f95524f65 100644 --- a/src/postings/postings_writer.rs +++ b/src/postings/postings_writer.rs @@ -11,7 +11,8 @@ use crate::postings::recorder::{BufferLender, Recorder}; use crate::postings::{ FieldSerializer, IndexingContext, InvertedIndexSerializer, PerFieldPostingsWriter, }; -use crate::schema::{Field, Schema, Term, Type}; +use crate::schema::indexing_term::IndexingTerm; +use crate::schema::{Field, Schema, Type}; use crate::tokenizer::{Token, TokenStream, MAX_TOKEN_LEN}; use crate::DocId; @@ -60,7 +61,7 @@ pub(crate) fn serialize_postings( let mut term_offsets: Vec<(Field, OrderedPathId, &[u8], Addr)> = Vec::with_capacity(ctx.term_index.len()); term_offsets.extend(ctx.term_index.iter().map(|(key, addr)| { - let field = Term::wrap(key).field(); + let field = IndexingTerm::wrap(key).field(); if schema.get_field_entry(field).field_type().value_type() == Type::Json { let byte_range_path = 5..5 + 4; let unordered_id = u32::from_be_bytes(key[byte_range_path.clone()].try_into().unwrap()); @@ -114,7 +115,7 @@ pub(crate) trait PostingsWriter: Send + Sync { /// * term - the term /// * ctx - Contains a term hashmap and a memory arena to store all necessary posting list /// information. - fn subscribe(&mut self, doc: DocId, pos: u32, term: &Term, ctx: &mut IndexingContext); + fn subscribe(&mut self, doc: DocId, pos: u32, term: &IndexingTerm, ctx: &mut IndexingContext); /// Serializes the postings on disk. /// The actual serialization format is handled by the `PostingsSerializer`. @@ -132,7 +133,7 @@ pub(crate) trait PostingsWriter: Send + Sync { &mut self, doc_id: DocId, token_stream: &mut dyn TokenStream, - term_buffer: &mut Term, + term_buffer: &mut IndexingTerm, ctx: &mut IndexingContext, indexing_position: &mut IndexingPosition, ) { @@ -203,7 +204,13 @@ impl SpecializedPostingsWriter { impl PostingsWriter for SpecializedPostingsWriter { #[inline] - fn subscribe(&mut self, doc: DocId, position: u32, term: &Term, ctx: &mut IndexingContext) { + fn subscribe( + &mut self, + doc: DocId, + position: u32, + term: &IndexingTerm, + ctx: &mut IndexingContext, + ) { debug_assert!(term.serialized_term().len() >= 4); self.total_num_tokens += 1; let (term_index, arena) = (&mut ctx.term_index, &mut ctx.arena); diff --git a/src/schema/indexing_term.rs b/src/schema/indexing_term.rs new file mode 100644 index 000000000..5573d1239 --- /dev/null +++ b/src/schema/indexing_term.rs @@ -0,0 +1,257 @@ +use std::hash::{Hash, Hasher}; +use std::net::Ipv6Addr; + +use columnar::{MonotonicallyMappableToU128, MonotonicallyMappableToU64}; + +use super::date_time_options::DATE_TIME_PRECISION_INDEXED; +use super::Field; +use crate::fastfield::FastValue; +use crate::schema::Type; +use crate::DateTime; + +/// Term represents the value that the token can take. +/// It's a serialized representation over different types. +/// +/// It actually wraps a `Vec`. The first 5 bytes are metadata. +/// 4 bytes are the field id, and the last byte is the type. +/// +/// The serialized value `ValueBytes` is considered everything after the 4 first bytes (term id). +#[derive(Clone)] +pub struct IndexingTerm>(B) +where + B: AsRef<[u8]>; + +/// The number of bytes used as metadata by `Term`. +const TERM_METADATA_LENGTH: usize = 5; + +impl IndexingTerm { + /// Create a new Term with a buffer with a given capacity. + pub fn with_capacity(capacity: usize) -> IndexingTerm { + let mut data = Vec::with_capacity(TERM_METADATA_LENGTH + capacity); + data.resize(TERM_METADATA_LENGTH, 0u8); + IndexingTerm(data) + } + + /// Panics when the term is not empty... ie: some value is set. + /// Use `clear_with_field_and_type` in that case. + /// + /// Sets field and the type. + pub(crate) fn set_field_and_type(&mut self, field: Field, typ: Type) { + assert!(self.is_empty()); + self.0[0..4].clone_from_slice(field.field_id().to_be_bytes().as_ref()); + self.0[4] = typ.to_code(); + } + + /// Is empty if there are no value bytes. + pub fn is_empty(&self) -> bool { + self.0.len() == TERM_METADATA_LENGTH + } + + /// Removes the value_bytes and set the field and type code. + pub(crate) fn clear_with_field_and_type(&mut self, typ: Type, field: Field) { + self.truncate_value_bytes(0); + self.set_field_and_type(field, typ); + } + + /// Sets a u64 value in the term. + /// + /// U64 are serialized using (8-byte) BigEndian + /// representation. + /// The use of BigEndian has the benefit of preserving + /// the natural order of the values. + pub fn set_u64(&mut self, val: u64) { + self.set_fast_value(val); + } + + /// Sets a `i64` value in the term. + pub fn set_i64(&mut self, val: i64) { + self.set_fast_value(val); + } + + /// Sets a `f64` value in the term. + pub fn set_f64(&mut self, val: f64) { + self.set_fast_value(val); + } + + /// Sets a `bool` value in the term. + pub fn set_bool(&mut self, val: bool) { + self.set_fast_value(val); + } + + fn set_fast_value(&mut self, val: T) { + self.set_bytes(val.to_u64().to_be_bytes().as_ref()); + } + + /// Append a type marker + fast value to a term. + /// This is used in JSON type to append a fast value after the path. + /// + /// It will not clear existing bytes. + pub(crate) fn append_type_and_fast_value(&mut self, val: T) { + self.0.push(T::to_type().to_code()); + let value = if T::to_type() == Type::Date { + DateTime::from_u64(val.to_u64()) + .truncate(DATE_TIME_PRECISION_INDEXED) + .to_u64() + } else { + val.to_u64() + }; + self.0.extend(value.to_be_bytes().as_ref()); + } + + /// Sets a `Ipv6Addr` value in the term. + pub fn set_ip_addr(&mut self, val: Ipv6Addr) { + self.set_bytes(val.to_u128().to_be_bytes().as_ref()); + } + + /// Sets the value of a `Bytes` field. + pub fn set_bytes(&mut self, bytes: &[u8]) { + self.truncate_value_bytes(0); + self.0.extend(bytes); + } + + /// Truncates the value bytes of the term. Value and field type stays the same. + pub fn truncate_value_bytes(&mut self, len: usize) { + self.0.truncate(len + TERM_METADATA_LENGTH); + } + + /// The length of the bytes. + pub fn len_bytes(&self) -> usize { + self.0.len() - TERM_METADATA_LENGTH + } + + /// Appends value bytes to the Term. + /// + /// This function returns the segment that has just been added. + #[inline] + pub fn append_bytes(&mut self, bytes: &[u8]) -> &mut [u8] { + let len_before = self.0.len(); + self.0.extend_from_slice(bytes); + &mut self.0[len_before..] + } + + /// Appends json path bytes to the Term. + /// If the path contains 0 bytes, they are replaced by a "0" string. + /// The 0 byte is used to mark the end of the path. + /// + /// This function returns the segment that has just been added. + #[inline] + pub fn append_path(&mut self, bytes: &[u8]) -> &mut [u8] { + let len_before = self.0.len(); + if bytes.contains(&0u8) { + self.0 + .extend(bytes.iter().map(|&b| if b == 0 { b'0' } else { b })); + } else { + self.0.extend_from_slice(bytes); + } + &mut self.0[len_before..] + } +} + +impl IndexingTerm +where + B: AsRef<[u8]>, +{ + /// Wraps a object holding bytes + pub fn wrap(data: B) -> IndexingTerm { + IndexingTerm(data) + } + + /// Returns the field. + pub fn field(&self) -> Field { + let field_id_bytes: [u8; 4] = (&self.0.as_ref()[..4]).try_into().unwrap(); + Field::from_field_id(u32::from_be_bytes(field_id_bytes)) + } + + /// Returns the serialized representation of the value. + /// (this does neither include the field id nor the value type.) + /// + /// If the term is a string, its value is utf-8 encoded. + /// If the term is a u64, its value is encoded according + /// to `byteorder::BigEndian`. + pub fn serialized_value_bytes(&self) -> &[u8] { + &self.0.as_ref()[TERM_METADATA_LENGTH..] + } + + /// Returns the serialized representation of Term. + /// This includes field_id, value type and value. + /// + /// Do NOT rely on this byte representation in the index. + /// This value is likely to change in the future. + #[inline] + pub fn serialized_term(&self) -> &[u8] { + self.0.as_ref() + } +} + +/// ValueBytes represents a serialized value. +/// The value can be of any type of [`Type`] (e.g. string, u64, f64, bool, date, JSON). +/// The serialized representation matches the lexographical order of the type. +/// +/// The `ValueBytes` format is as follow: +/// `[type code: u8][serialized value]` +/// +/// For JSON `ValueBytes` equals to: +/// `[type code=JSON][JSON path][JSON_END_OF_PATH][ValueBytes]` +/// +/// The nested ValueBytes in JSON is never of type JSON. (there's no recursion) +#[derive(Clone)] +pub struct ValueBytes(B) +where + B: AsRef<[u8]>; + +impl ValueBytes +where + B: AsRef<[u8]>, +{ + /// Wraps a object holding bytes + pub fn wrap(data: B) -> ValueBytes { + ValueBytes(data) + } + + fn typ_code(&self) -> u8 { + self.0.as_ref()[0] + } + + /// Return the type of the term. + pub fn typ(&self) -> Type { + Type::from_code(self.typ_code()).expect("The term has an invalid type code") + } +} + +impl Ord for IndexingTerm +where + B: AsRef<[u8]>, +{ + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.serialized_term().cmp(other.serialized_term()) + } +} + +impl PartialOrd for IndexingTerm +where + B: AsRef<[u8]>, +{ + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl PartialEq for IndexingTerm +where + B: AsRef<[u8]>, +{ + fn eq(&self, other: &Self) -> bool { + self.serialized_term() == other.serialized_term() + } +} + +impl Eq for IndexingTerm where B: AsRef<[u8]> {} + +impl Hash for IndexingTerm +where + B: AsRef<[u8]>, +{ + fn hash(&self, state: &mut H) { + self.0.as_ref().hash(state) + } +} diff --git a/src/schema/mod.rs b/src/schema/mod.rs index b4c3b037e..56ceb86b7 100644 --- a/src/schema/mod.rs +++ b/src/schema/mod.rs @@ -109,6 +109,7 @@ pub mod document; mod facet; mod facet_options; +pub(crate) mod indexing_term; mod schema; pub(crate) mod term; diff --git a/src/schema/term.rs b/src/schema/term.rs index 3ac5d0ac4..05d0935d5 100644 --- a/src/schema/term.rs +++ b/src/schema/term.rs @@ -20,7 +20,8 @@ use crate::DateTime; /// The serialized value `ValueBytes` is considered everything after the 4 first bytes (term id). #[derive(Clone)] pub struct Term>(B) -where B: AsRef<[u8]>; +where + B: AsRef<[u8]>; /// The number of bytes used as metadata by `Term`. const TERM_METADATA_LENGTH: usize = 5; @@ -115,12 +116,6 @@ impl Term { Term::with_bytes_and_field_and_payload(Type::Bytes, field, bytes) } - /// Removes the value_bytes and set the field and type code. - pub(crate) fn clear_with_field_and_type(&mut self, typ: Type, field: Field) { - self.truncate_value_bytes(0); - self.set_field_and_type(field, typ); - } - /// Removes the value_bytes and set the type code. pub fn clear_with_type(&mut self, typ: Type) { self.truncate_value_bytes(0); @@ -202,11 +197,6 @@ impl Term { self.0.truncate(len + TERM_METADATA_LENGTH); } - /// The length of the bytes. - pub fn len_bytes(&self) -> usize { - self.0.len() - TERM_METADATA_LENGTH - } - /// Appends value bytes to the Term. /// /// This function returns the segment that has just been added. @@ -216,27 +206,11 @@ impl Term { self.0.extend_from_slice(bytes); &mut self.0[len_before..] } - - /// Appends json path bytes to the Term. - /// If the path contains 0 bytes, they are replaced by a "0" string. - /// The 0 byte is used to mark the end of the path. - /// - /// This function returns the segment that has just been added. - #[inline] - pub fn append_path(&mut self, bytes: &[u8]) -> &mut [u8] { - let len_before = self.0.len(); - if bytes.contains(&0u8) { - self.0 - .extend(bytes.iter().map(|&b| if b == 0 { b'0' } else { b })); - } else { - self.0.extend_from_slice(bytes); - } - &mut self.0[len_before..] - } } impl Term -where B: AsRef<[u8]> +where + B: AsRef<[u8]>, { /// Wraps a object holding bytes pub fn wrap(data: B) -> Term { @@ -260,7 +234,7 @@ where B: AsRef<[u8]> /// If the term is a string, its value is utf-8 encoded. /// If the term is a u64, its value is encoded according /// to `byteorder::BigEndian`. - pub fn serialized_value_bytes(&self) -> &[u8] { + pub(crate) fn serialized_value_bytes(&self) -> &[u8] { &self.0.as_ref()[TERM_METADATA_LENGTH..] } @@ -294,10 +268,12 @@ where B: AsRef<[u8]> /// The nested ValueBytes in JSON is never of type JSON. (there's no recursion) #[derive(Clone)] pub struct ValueBytes(B) -where B: AsRef<[u8]>; +where + B: AsRef<[u8]>; impl ValueBytes -where B: AsRef<[u8]> +where + B: AsRef<[u8]>, { /// Wraps a object holding bytes pub fn wrap(data: B) -> ValueBytes { @@ -503,7 +479,8 @@ where B: AsRef<[u8]> } impl Ord for Term -where B: AsRef<[u8]> +where + B: AsRef<[u8]>, { fn cmp(&self, other: &Self) -> std::cmp::Ordering { self.serialized_term().cmp(other.serialized_term()) @@ -511,7 +488,8 @@ where B: AsRef<[u8]> } impl PartialOrd for Term -where B: AsRef<[u8]> +where + B: AsRef<[u8]>, { fn partial_cmp(&self, other: &Self) -> Option { Some(self.cmp(other)) @@ -519,7 +497,8 @@ where B: AsRef<[u8]> } impl PartialEq for Term -where B: AsRef<[u8]> +where + B: AsRef<[u8]>, { fn eq(&self, other: &Self) -> bool { self.serialized_term() == other.serialized_term() @@ -529,7 +508,8 @@ where B: AsRef<[u8]> impl Eq for Term where B: AsRef<[u8]> {} impl Hash for Term -where B: AsRef<[u8]> +where + B: AsRef<[u8]>, { fn hash(&self, state: &mut H) { self.0.as_ref().hash(state) @@ -544,7 +524,8 @@ fn write_opt(f: &mut fmt::Formatter, val_opt: Option) -> } impl fmt::Debug for Term -where B: AsRef<[u8]> +where + B: AsRef<[u8]>, { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { let field_id = self.field().field_id();