add JsonTermSerializer

This commit is contained in:
Pascal Seitz
2024-04-20 18:56:27 +08:00
parent 87b9f0678c
commit 2d7483e3d4
2 changed files with 45 additions and 110 deletions

View File

@@ -8,8 +8,8 @@ use crate::indexer::path_to_unordered_id::OrderedPathId;
use crate::postings::postings_writer::SpecializedPostingsWriter;
use crate::postings::recorder::{BufferLender, DocIdRecorder, Recorder};
use crate::postings::{FieldSerializer, IndexingContext, IndexingPosition, PostingsWriter};
use crate::schema::indexing_term::{IndexingTerm, ValueBytes};
use crate::schema::{Field, Type};
use crate::schema::indexing_term::IndexingTerm;
use crate::schema::{Field, Type, ValueBytes};
use crate::tokenizer::TokenStream;
use crate::DocId;
@@ -67,26 +67,25 @@ impl<Rec: Recorder> PostingsWriter for JsonPostingsWriter<Rec> {
ctx: &IndexingContext,
serializer: &mut FieldSerializer,
) -> io::Result<()> {
let mut term_buffer = IndexingTerm::with_capacity(48);
let mut term_buffer = JsonTermSerializer(Vec::with_capacity(48));
let mut buffer_lender = BufferLender::default();
term_buffer.clear_with_field_and_type(Type::Json, Field::from_field_id(0));
let mut prev_term_id = u32::MAX;
let mut term_path_len = 0; // this will be set in the first iteration
for (_field, path_id, term, addr) in term_addrs {
if prev_term_id != path_id.path_id() {
term_buffer.truncate_value_bytes(0);
term_buffer.clear();
term_buffer.append_path(ordered_id_to_path[path_id.path_id() as usize].as_bytes());
term_buffer.append_bytes(&[JSON_END_OF_PATH]);
term_path_len = term_buffer.len_bytes();
term_path_len = term_buffer.len();
prev_term_id = path_id.path_id();
}
term_buffer.truncate_value_bytes(term_path_len);
term_buffer.truncate(term_path_len);
term_buffer.append_bytes(term);
let json_value = ValueBytes::wrap(term);
let typ = json_value.typ();
if typ == Type::Str {
SpecializedPostingsWriter::<Rec>::serialize_one_term(
term_buffer.serialized_value_bytes(),
term_buffer.as_bytes(),
*addr,
doc_id_map,
&mut buffer_lender,
@@ -95,7 +94,7 @@ impl<Rec: Recorder> PostingsWriter for JsonPostingsWriter<Rec> {
)?;
} else {
SpecializedPostingsWriter::<DocIdRecorder>::serialize_one_term(
term_buffer.serialized_value_bytes(),
term_buffer.as_bytes(),
*addr,
doc_id_map,
&mut buffer_lender,
@@ -111,3 +110,40 @@ impl<Rec: Recorder> PostingsWriter for JsonPostingsWriter<Rec> {
self.str_posting_writer.total_num_tokens() + self.non_str_posting_writer.total_num_tokens()
}
}
struct JsonTermSerializer(Vec<u8>);
impl JsonTermSerializer {
#[inline]
pub fn append_path(&mut self, bytes: &[u8]) {
if bytes.contains(&0u8) {
self.0
.extend(bytes.iter().map(|&b| if b == 0 { b'0' } else { b }));
} else {
self.0.extend_from_slice(bytes);
}
}
/// Appends value bytes to the Term.
///
/// This function returns the segment that has just been added.
#[inline]
pub fn append_bytes(&mut self, bytes: &[u8]) -> &mut [u8] {
let len_before = self.0.len();
self.0.extend_from_slice(bytes);
&mut self.0[len_before..]
}
fn clear(&mut self) {
self.0.clear();
}
fn truncate(&mut self, len: usize) {
self.0.truncate(len);
}
fn len(&self) -> usize {
self.0.len()
}
fn as_bytes(&self) -> &[u8] {
&self.0
}
}

View File

@@ -1,4 +1,3 @@
use std::hash::{Hash, Hasher};
use std::net::Ipv6Addr;
use columnar::{MonotonicallyMappableToU128, MonotonicallyMappableToU64};
@@ -128,23 +127,6 @@ impl IndexingTerm {
self.0.extend_from_slice(bytes);
&mut self.0[len_before..]
}
/// Appends json path bytes to the Term.
/// If the path contains 0 bytes, they are replaced by a "0" string.
/// The 0 byte is used to mark the end of the path.
///
/// This function returns the segment that has just been added.
#[inline]
pub fn append_path(&mut self, bytes: &[u8]) -> &mut [u8] {
let len_before = self.0.len();
if bytes.contains(&0u8) {
self.0
.extend(bytes.iter().map(|&b| if b == 0 { b'0' } else { b }));
} else {
self.0.extend_from_slice(bytes);
}
&mut self.0[len_before..]
}
}
impl<B> IndexingTerm<B>
@@ -162,16 +144,6 @@ where
Field::from_field_id(u32::from_be_bytes(field_id_bytes))
}
/// Returns the serialized representation of the value.
/// (this does neither include the field id nor the value type.)
///
/// If the term is a string, its value is utf-8 encoded.
/// If the term is a u64, its value is encoded according
/// to `byteorder::BigEndian`.
pub fn serialized_value_bytes(&self) -> &[u8] {
&self.0.as_ref()[TERM_METADATA_LENGTH..]
}
/// Returns the serialized representation of Term.
/// This includes field_id, value type and value.
///
@@ -182,76 +154,3 @@ where
self.0.as_ref()
}
}
/// ValueBytes represents a serialized value.
/// The value can be of any type of [`Type`] (e.g. string, u64, f64, bool, date, JSON).
/// The serialized representation matches the lexographical order of the type.
///
/// The `ValueBytes` format is as follow:
/// `[type code: u8][serialized value]`
///
/// For JSON `ValueBytes` equals to:
/// `[type code=JSON][JSON path][JSON_END_OF_PATH][ValueBytes]`
///
/// The nested ValueBytes in JSON is never of type JSON. (there's no recursion)
#[derive(Clone)]
pub struct ValueBytes<B>(B)
where
B: AsRef<[u8]>;
impl<B> ValueBytes<B>
where
B: AsRef<[u8]>,
{
/// Wraps a object holding bytes
pub fn wrap(data: B) -> ValueBytes<B> {
ValueBytes(data)
}
fn typ_code(&self) -> u8 {
self.0.as_ref()[0]
}
/// Return the type of the term.
pub fn typ(&self) -> Type {
Type::from_code(self.typ_code()).expect("The term has an invalid type code")
}
}
impl<B> Ord for IndexingTerm<B>
where
B: AsRef<[u8]>,
{
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.serialized_term().cmp(other.serialized_term())
}
}
impl<B> PartialOrd for IndexingTerm<B>
where
B: AsRef<[u8]>,
{
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl<B> PartialEq for IndexingTerm<B>
where
B: AsRef<[u8]>,
{
fn eq(&self, other: &Self) -> bool {
self.serialized_term() == other.serialized_term()
}
}
impl<B> Eq for IndexingTerm<B> where B: AsRef<[u8]> {}
impl<B> Hash for IndexingTerm<B>
where
B: AsRef<[u8]>,
{
fn hash<H: Hasher>(&self, state: &mut H) {
self.0.as_ref().hash(state)
}
}