split term and indexing term

This commit is contained in:
Pascal Seitz
2024-04-18 23:38:21 +08:00
parent 0e9fced336
commit 87b9f0678c
7 changed files with 328 additions and 82 deletions

View File

@@ -4,6 +4,7 @@ use rustc_hash::FxHashMap;
use crate::postings::{IndexingContext, IndexingPosition, PostingsWriter};
use crate::schema::document::{ReferenceValue, ReferenceValueLeaf, Value};
use crate::schema::indexing_term::IndexingTerm;
use crate::schema::{Field, Type};
use crate::time::format_description::well_known::Rfc3339;
use crate::time::{OffsetDateTime, UtcOffset};
@@ -74,7 +75,7 @@ pub(crate) fn index_json_values<'a, V: Value<'a>>(
json_visitors: impl Iterator<Item = crate::Result<V::ObjectIter>>,
text_analyzer: &mut TextAnalyzer,
expand_dots_enabled: bool,
term_buffer: &mut Term,
term_buffer: &mut IndexingTerm,
postings_writer: &mut dyn PostingsWriter,
json_path_writer: &mut JsonPathWriter,
ctx: &mut IndexingContext,
@@ -103,7 +104,7 @@ fn index_json_object<'a, V: Value<'a>>(
doc: DocId,
json_visitor: V::ObjectIter,
text_analyzer: &mut TextAnalyzer,
term_buffer: &mut Term,
term_buffer: &mut IndexingTerm,
json_path_writer: &mut JsonPathWriter,
postings_writer: &mut dyn PostingsWriter,
ctx: &mut IndexingContext,
@@ -130,17 +131,17 @@ fn index_json_value<'a, V: Value<'a>>(
doc: DocId,
json_value: V,
text_analyzer: &mut TextAnalyzer,
term_buffer: &mut Term,
term_buffer: &mut IndexingTerm,
json_path_writer: &mut JsonPathWriter,
postings_writer: &mut dyn PostingsWriter,
ctx: &mut IndexingContext,
positions_per_path: &mut IndexingPositionsPerPath,
) {
let set_path_id = |term_buffer: &mut Term, unordered_id: u32| {
let set_path_id = |term_buffer: &mut IndexingTerm, unordered_id: u32| {
term_buffer.truncate_value_bytes(0);
term_buffer.append_bytes(&unordered_id.to_be_bytes());
};
let set_type = |term_buffer: &mut Term, typ: Type| {
let set_type = |term_buffer: &mut IndexingTerm, typ: Type| {
term_buffer.append_bytes(&[typ.to_code()]);
};
@@ -211,18 +212,16 @@ fn index_json_value<'a, V: Value<'a>>(
postings_writer.subscribe(doc, 0u32, term_buffer, ctx);
}
ReferenceValueLeaf::PreTokStr(_) => {
unimplemented!(
"Pre-tokenized string support in dynamic fields is not yet implemented"
)
unimplemented!("Pre-tokenized string support in JSON fields is not yet implemented")
}
ReferenceValueLeaf::Bytes(_) => {
unimplemented!("Bytes support in dynamic fields is not yet implemented")
unimplemented!("Bytes support in JSON fields is not yet implemented")
}
ReferenceValueLeaf::Facet(_) => {
unimplemented!("Facet support in dynamic fields is not yet implemented")
unimplemented!("Facet support in JSON fields is not yet implemented")
}
ReferenceValueLeaf::IpAddr(_) => {
unimplemented!("IP address support in dynamic fields is not yet implemented")
unimplemented!("IP address support in JSON fields is not yet implemented")
}
},
ReferenceValue::Array(elements) => {

View File

@@ -15,7 +15,8 @@ use crate::postings::{
PerFieldPostingsWriter, PostingsWriter,
};
use crate::schema::document::{Document, ReferenceValue, Value};
use crate::schema::{FieldEntry, FieldType, Schema, Term, DATE_TIME_PRECISION_INDEXED};
use crate::schema::indexing_term::IndexingTerm;
use crate::schema::{FieldEntry, FieldType, Schema, DATE_TIME_PRECISION_INDEXED};
use crate::store::{StoreReader, StoreWriter};
use crate::tokenizer::{FacetTokenizer, PreTokenizedStream, TextAnalyzer, Tokenizer};
use crate::{DocId, Opstamp, SegmentComponent, TantivyError};
@@ -70,7 +71,7 @@ pub struct SegmentWriter {
pub(crate) json_path_writer: JsonPathWriter,
pub(crate) doc_opstamps: Vec<Opstamp>,
per_field_text_analyzers: Vec<TextAnalyzer>,
term_buffer: Term,
term_buffer: IndexingTerm,
schema: Schema,
}
@@ -126,7 +127,7 @@ impl SegmentWriter {
)?,
doc_opstamps: Vec::with_capacity(1_000),
per_field_text_analyzers,
term_buffer: Term::with_capacity(16),
term_buffer: IndexingTerm::with_capacity(16),
schema,
})
}

View File

@@ -8,9 +8,10 @@ use crate::indexer::path_to_unordered_id::OrderedPathId;
use crate::postings::postings_writer::SpecializedPostingsWriter;
use crate::postings::recorder::{BufferLender, DocIdRecorder, Recorder};
use crate::postings::{FieldSerializer, IndexingContext, IndexingPosition, PostingsWriter};
use crate::schema::indexing_term::{IndexingTerm, ValueBytes};
use crate::schema::{Field, Type};
use crate::tokenizer::TokenStream;
use crate::{DocId, Term};
use crate::DocId;
/// The `JsonPostingsWriter` is odd in that it relies on a hidden contract:
///
@@ -34,7 +35,7 @@ impl<Rec: Recorder> PostingsWriter for JsonPostingsWriter<Rec> {
&mut self,
doc: crate::DocId,
pos: u32,
term: &crate::Term,
term: &IndexingTerm,
ctx: &mut IndexingContext,
) {
self.non_str_posting_writer.subscribe(doc, pos, term, ctx);
@@ -44,7 +45,7 @@ impl<Rec: Recorder> PostingsWriter for JsonPostingsWriter<Rec> {
&mut self,
doc_id: DocId,
token_stream: &mut dyn TokenStream,
term_buffer: &mut Term,
term_buffer: &mut IndexingTerm,
ctx: &mut IndexingContext,
indexing_position: &mut IndexingPosition,
) {
@@ -66,7 +67,7 @@ impl<Rec: Recorder> PostingsWriter for JsonPostingsWriter<Rec> {
ctx: &IndexingContext,
serializer: &mut FieldSerializer,
) -> io::Result<()> {
let mut term_buffer = Term::with_capacity(48);
let mut term_buffer = IndexingTerm::with_capacity(48);
let mut buffer_lender = BufferLender::default();
term_buffer.clear_with_field_and_type(Type::Json, Field::from_field_id(0));
let mut prev_term_id = u32::MAX;
@@ -81,27 +82,26 @@ impl<Rec: Recorder> PostingsWriter for JsonPostingsWriter<Rec> {
}
term_buffer.truncate_value_bytes(term_path_len);
term_buffer.append_bytes(term);
if let Some(json_value) = term_buffer.value().as_json_value_bytes() {
let typ = json_value.typ();
if typ == Type::Str {
SpecializedPostingsWriter::<Rec>::serialize_one_term(
term_buffer.serialized_value_bytes(),
*addr,
doc_id_map,
&mut buffer_lender,
ctx,
serializer,
)?;
} else {
SpecializedPostingsWriter::<DocIdRecorder>::serialize_one_term(
term_buffer.serialized_value_bytes(),
*addr,
doc_id_map,
&mut buffer_lender,
ctx,
serializer,
)?;
}
let json_value = ValueBytes::wrap(term);
let typ = json_value.typ();
if typ == Type::Str {
SpecializedPostingsWriter::<Rec>::serialize_one_term(
term_buffer.serialized_value_bytes(),
*addr,
doc_id_map,
&mut buffer_lender,
ctx,
serializer,
)?;
} else {
SpecializedPostingsWriter::<DocIdRecorder>::serialize_one_term(
term_buffer.serialized_value_bytes(),
*addr,
doc_id_map,
&mut buffer_lender,
ctx,
serializer,
)?;
}
}
Ok(())

View File

@@ -11,7 +11,8 @@ use crate::postings::recorder::{BufferLender, Recorder};
use crate::postings::{
FieldSerializer, IndexingContext, InvertedIndexSerializer, PerFieldPostingsWriter,
};
use crate::schema::{Field, Schema, Term, Type};
use crate::schema::indexing_term::IndexingTerm;
use crate::schema::{Field, Schema, Type};
use crate::tokenizer::{Token, TokenStream, MAX_TOKEN_LEN};
use crate::DocId;
@@ -60,7 +61,7 @@ pub(crate) fn serialize_postings(
let mut term_offsets: Vec<(Field, OrderedPathId, &[u8], Addr)> =
Vec::with_capacity(ctx.term_index.len());
term_offsets.extend(ctx.term_index.iter().map(|(key, addr)| {
let field = Term::wrap(key).field();
let field = IndexingTerm::wrap(key).field();
if schema.get_field_entry(field).field_type().value_type() == Type::Json {
let byte_range_path = 5..5 + 4;
let unordered_id = u32::from_be_bytes(key[byte_range_path.clone()].try_into().unwrap());
@@ -114,7 +115,7 @@ pub(crate) trait PostingsWriter: Send + Sync {
/// * term - the term
/// * ctx - Contains a term hashmap and a memory arena to store all necessary posting list
/// information.
fn subscribe(&mut self, doc: DocId, pos: u32, term: &Term, ctx: &mut IndexingContext);
fn subscribe(&mut self, doc: DocId, pos: u32, term: &IndexingTerm, ctx: &mut IndexingContext);
/// Serializes the postings on disk.
/// The actual serialization format is handled by the `PostingsSerializer`.
@@ -132,7 +133,7 @@ pub(crate) trait PostingsWriter: Send + Sync {
&mut self,
doc_id: DocId,
token_stream: &mut dyn TokenStream,
term_buffer: &mut Term,
term_buffer: &mut IndexingTerm,
ctx: &mut IndexingContext,
indexing_position: &mut IndexingPosition,
) {
@@ -203,7 +204,13 @@ impl<Rec: Recorder> SpecializedPostingsWriter<Rec> {
impl<Rec: Recorder> PostingsWriter for SpecializedPostingsWriter<Rec> {
#[inline]
fn subscribe(&mut self, doc: DocId, position: u32, term: &Term, ctx: &mut IndexingContext) {
fn subscribe(
&mut self,
doc: DocId,
position: u32,
term: &IndexingTerm,
ctx: &mut IndexingContext,
) {
debug_assert!(term.serialized_term().len() >= 4);
self.total_num_tokens += 1;
let (term_index, arena) = (&mut ctx.term_index, &mut ctx.arena);

257
src/schema/indexing_term.rs Normal file
View File

@@ -0,0 +1,257 @@
use std::hash::{Hash, Hasher};
use std::net::Ipv6Addr;
use columnar::{MonotonicallyMappableToU128, MonotonicallyMappableToU64};
use super::date_time_options::DATE_TIME_PRECISION_INDEXED;
use super::Field;
use crate::fastfield::FastValue;
use crate::schema::Type;
use crate::DateTime;
/// Term represents the value that the token can take.
/// It's a serialized representation over different types.
///
/// It actually wraps a `Vec<u8>`. The first 5 bytes are metadata.
/// 4 bytes are the field id, and the last byte is the type.
///
/// The serialized value `ValueBytes` is considered everything after the 4 first bytes (term id).
#[derive(Clone)]
pub struct IndexingTerm<B = Vec<u8>>(B)
where
B: AsRef<[u8]>;
/// The number of bytes used as metadata by `Term`.
const TERM_METADATA_LENGTH: usize = 5;
impl IndexingTerm {
/// Create a new Term with a buffer with a given capacity.
pub fn with_capacity(capacity: usize) -> IndexingTerm {
let mut data = Vec::with_capacity(TERM_METADATA_LENGTH + capacity);
data.resize(TERM_METADATA_LENGTH, 0u8);
IndexingTerm(data)
}
/// Panics when the term is not empty... ie: some value is set.
/// Use `clear_with_field_and_type` in that case.
///
/// Sets field and the type.
pub(crate) fn set_field_and_type(&mut self, field: Field, typ: Type) {
assert!(self.is_empty());
self.0[0..4].clone_from_slice(field.field_id().to_be_bytes().as_ref());
self.0[4] = typ.to_code();
}
/// Is empty if there are no value bytes.
pub fn is_empty(&self) -> bool {
self.0.len() == TERM_METADATA_LENGTH
}
/// Removes the value_bytes and set the field and type code.
pub(crate) fn clear_with_field_and_type(&mut self, typ: Type, field: Field) {
self.truncate_value_bytes(0);
self.set_field_and_type(field, typ);
}
/// Sets a u64 value in the term.
///
/// U64 are serialized using (8-byte) BigEndian
/// representation.
/// The use of BigEndian has the benefit of preserving
/// the natural order of the values.
pub fn set_u64(&mut self, val: u64) {
self.set_fast_value(val);
}
/// Sets a `i64` value in the term.
pub fn set_i64(&mut self, val: i64) {
self.set_fast_value(val);
}
/// Sets a `f64` value in the term.
pub fn set_f64(&mut self, val: f64) {
self.set_fast_value(val);
}
/// Sets a `bool` value in the term.
pub fn set_bool(&mut self, val: bool) {
self.set_fast_value(val);
}
fn set_fast_value<T: FastValue>(&mut self, val: T) {
self.set_bytes(val.to_u64().to_be_bytes().as_ref());
}
/// Append a type marker + fast value to a term.
/// This is used in JSON type to append a fast value after the path.
///
/// It will not clear existing bytes.
pub(crate) fn append_type_and_fast_value<T: FastValue>(&mut self, val: T) {
self.0.push(T::to_type().to_code());
let value = if T::to_type() == Type::Date {
DateTime::from_u64(val.to_u64())
.truncate(DATE_TIME_PRECISION_INDEXED)
.to_u64()
} else {
val.to_u64()
};
self.0.extend(value.to_be_bytes().as_ref());
}
/// Sets a `Ipv6Addr` value in the term.
pub fn set_ip_addr(&mut self, val: Ipv6Addr) {
self.set_bytes(val.to_u128().to_be_bytes().as_ref());
}
/// Sets the value of a `Bytes` field.
pub fn set_bytes(&mut self, bytes: &[u8]) {
self.truncate_value_bytes(0);
self.0.extend(bytes);
}
/// Truncates the value bytes of the term. Value and field type stays the same.
pub fn truncate_value_bytes(&mut self, len: usize) {
self.0.truncate(len + TERM_METADATA_LENGTH);
}
/// The length of the bytes.
pub fn len_bytes(&self) -> usize {
self.0.len() - TERM_METADATA_LENGTH
}
/// Appends value bytes to the Term.
///
/// This function returns the segment that has just been added.
#[inline]
pub fn append_bytes(&mut self, bytes: &[u8]) -> &mut [u8] {
let len_before = self.0.len();
self.0.extend_from_slice(bytes);
&mut self.0[len_before..]
}
/// Appends json path bytes to the Term.
/// If the path contains 0 bytes, they are replaced by a "0" string.
/// The 0 byte is used to mark the end of the path.
///
/// This function returns the segment that has just been added.
#[inline]
pub fn append_path(&mut self, bytes: &[u8]) -> &mut [u8] {
let len_before = self.0.len();
if bytes.contains(&0u8) {
self.0
.extend(bytes.iter().map(|&b| if b == 0 { b'0' } else { b }));
} else {
self.0.extend_from_slice(bytes);
}
&mut self.0[len_before..]
}
}
impl<B> IndexingTerm<B>
where
B: AsRef<[u8]>,
{
/// Wraps a object holding bytes
pub fn wrap(data: B) -> IndexingTerm<B> {
IndexingTerm(data)
}
/// Returns the field.
pub fn field(&self) -> Field {
let field_id_bytes: [u8; 4] = (&self.0.as_ref()[..4]).try_into().unwrap();
Field::from_field_id(u32::from_be_bytes(field_id_bytes))
}
/// Returns the serialized representation of the value.
/// (this does neither include the field id nor the value type.)
///
/// If the term is a string, its value is utf-8 encoded.
/// If the term is a u64, its value is encoded according
/// to `byteorder::BigEndian`.
pub fn serialized_value_bytes(&self) -> &[u8] {
&self.0.as_ref()[TERM_METADATA_LENGTH..]
}
/// Returns the serialized representation of Term.
/// This includes field_id, value type and value.
///
/// Do NOT rely on this byte representation in the index.
/// This value is likely to change in the future.
#[inline]
pub fn serialized_term(&self) -> &[u8] {
self.0.as_ref()
}
}
/// ValueBytes represents a serialized value.
/// The value can be of any type of [`Type`] (e.g. string, u64, f64, bool, date, JSON).
/// The serialized representation matches the lexographical order of the type.
///
/// The `ValueBytes` format is as follow:
/// `[type code: u8][serialized value]`
///
/// For JSON `ValueBytes` equals to:
/// `[type code=JSON][JSON path][JSON_END_OF_PATH][ValueBytes]`
///
/// The nested ValueBytes in JSON is never of type JSON. (there's no recursion)
#[derive(Clone)]
pub struct ValueBytes<B>(B)
where
B: AsRef<[u8]>;
impl<B> ValueBytes<B>
where
B: AsRef<[u8]>,
{
/// Wraps a object holding bytes
pub fn wrap(data: B) -> ValueBytes<B> {
ValueBytes(data)
}
fn typ_code(&self) -> u8 {
self.0.as_ref()[0]
}
/// Return the type of the term.
pub fn typ(&self) -> Type {
Type::from_code(self.typ_code()).expect("The term has an invalid type code")
}
}
impl<B> Ord for IndexingTerm<B>
where
B: AsRef<[u8]>,
{
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.serialized_term().cmp(other.serialized_term())
}
}
impl<B> PartialOrd for IndexingTerm<B>
where
B: AsRef<[u8]>,
{
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl<B> PartialEq for IndexingTerm<B>
where
B: AsRef<[u8]>,
{
fn eq(&self, other: &Self) -> bool {
self.serialized_term() == other.serialized_term()
}
}
impl<B> Eq for IndexingTerm<B> where B: AsRef<[u8]> {}
impl<B> Hash for IndexingTerm<B>
where
B: AsRef<[u8]>,
{
fn hash<H: Hasher>(&self, state: &mut H) {
self.0.as_ref().hash(state)
}
}

View File

@@ -109,6 +109,7 @@
pub mod document;
mod facet;
mod facet_options;
pub(crate) mod indexing_term;
mod schema;
pub(crate) mod term;

View File

@@ -20,7 +20,8 @@ use crate::DateTime;
/// The serialized value `ValueBytes` is considered everything after the 4 first bytes (term id).
#[derive(Clone)]
pub struct Term<B = Vec<u8>>(B)
where B: AsRef<[u8]>;
where
B: AsRef<[u8]>;
/// The number of bytes used as metadata by `Term`.
const TERM_METADATA_LENGTH: usize = 5;
@@ -115,12 +116,6 @@ impl Term {
Term::with_bytes_and_field_and_payload(Type::Bytes, field, bytes)
}
/// Removes the value_bytes and set the field and type code.
pub(crate) fn clear_with_field_and_type(&mut self, typ: Type, field: Field) {
self.truncate_value_bytes(0);
self.set_field_and_type(field, typ);
}
/// Removes the value_bytes and set the type code.
pub fn clear_with_type(&mut self, typ: Type) {
self.truncate_value_bytes(0);
@@ -202,11 +197,6 @@ impl Term {
self.0.truncate(len + TERM_METADATA_LENGTH);
}
/// The length of the bytes.
pub fn len_bytes(&self) -> usize {
self.0.len() - TERM_METADATA_LENGTH
}
/// Appends value bytes to the Term.
///
/// This function returns the segment that has just been added.
@@ -216,27 +206,11 @@ impl Term {
self.0.extend_from_slice(bytes);
&mut self.0[len_before..]
}
/// Appends json path bytes to the Term.
/// If the path contains 0 bytes, they are replaced by a "0" string.
/// The 0 byte is used to mark the end of the path.
///
/// This function returns the segment that has just been added.
#[inline]
pub fn append_path(&mut self, bytes: &[u8]) -> &mut [u8] {
let len_before = self.0.len();
if bytes.contains(&0u8) {
self.0
.extend(bytes.iter().map(|&b| if b == 0 { b'0' } else { b }));
} else {
self.0.extend_from_slice(bytes);
}
&mut self.0[len_before..]
}
}
impl<B> Term<B>
where B: AsRef<[u8]>
where
B: AsRef<[u8]>,
{
/// Wraps a object holding bytes
pub fn wrap(data: B) -> Term<B> {
@@ -260,7 +234,7 @@ where B: AsRef<[u8]>
/// If the term is a string, its value is utf-8 encoded.
/// If the term is a u64, its value is encoded according
/// to `byteorder::BigEndian`.
pub fn serialized_value_bytes(&self) -> &[u8] {
pub(crate) fn serialized_value_bytes(&self) -> &[u8] {
&self.0.as_ref()[TERM_METADATA_LENGTH..]
}
@@ -294,10 +268,12 @@ where B: AsRef<[u8]>
/// The nested ValueBytes in JSON is never of type JSON. (there's no recursion)
#[derive(Clone)]
pub struct ValueBytes<B>(B)
where B: AsRef<[u8]>;
where
B: AsRef<[u8]>;
impl<B> ValueBytes<B>
where B: AsRef<[u8]>
where
B: AsRef<[u8]>,
{
/// Wraps a object holding bytes
pub fn wrap(data: B) -> ValueBytes<B> {
@@ -503,7 +479,8 @@ where B: AsRef<[u8]>
}
impl<B> Ord for Term<B>
where B: AsRef<[u8]>
where
B: AsRef<[u8]>,
{
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.serialized_term().cmp(other.serialized_term())
@@ -511,7 +488,8 @@ where B: AsRef<[u8]>
}
impl<B> PartialOrd for Term<B>
where B: AsRef<[u8]>
where
B: AsRef<[u8]>,
{
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
@@ -519,7 +497,8 @@ where B: AsRef<[u8]>
}
impl<B> PartialEq for Term<B>
where B: AsRef<[u8]>
where
B: AsRef<[u8]>,
{
fn eq(&self, other: &Self) -> bool {
self.serialized_term() == other.serialized_term()
@@ -529,7 +508,8 @@ where B: AsRef<[u8]>
impl<B> Eq for Term<B> where B: AsRef<[u8]> {}
impl<B> Hash for Term<B>
where B: AsRef<[u8]>
where
B: AsRef<[u8]>,
{
fn hash<H: Hasher>(&self, state: &mut H) {
self.0.as_ref().hash(state)
@@ -544,7 +524,8 @@ fn write_opt<T: std::fmt::Debug>(f: &mut fmt::Formatter, val_opt: Option<T>) ->
}
impl<B> fmt::Debug for Term<B>
where B: AsRef<[u8]>
where
B: AsRef<[u8]>,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let field_id = self.field().field_id();