mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2025-12-23 02:29:57 +00:00
collect json paths in indexing (#2231)
* collect json paths in indexing * remove unsafe iter_mut_keys
This commit is contained in:
@@ -118,6 +118,7 @@ pub fn u64_to_f64(val: u64) -> f64 {
|
||||
///
|
||||
/// This function assumes that the needle is rarely contained in the bytes string
|
||||
/// and offers a fast path if the needle is not present.
|
||||
#[inline]
|
||||
pub fn replace_in_place(needle: u8, replacement: u8, bytes: &mut [u8]) {
|
||||
if !bytes.contains(&needle) {
|
||||
return;
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
use columnar::MonotonicallyMappableToU64;
|
||||
use common::{replace_in_place, JsonPathWriter};
|
||||
use murmurhash32::murmurhash2;
|
||||
use rustc_hash::FxHashMap;
|
||||
|
||||
use crate::fastfield::FastValue;
|
||||
@@ -58,13 +57,12 @@ struct IndexingPositionsPerPath {
|
||||
}
|
||||
|
||||
impl IndexingPositionsPerPath {
|
||||
fn get_position(&mut self, term: &Term) -> &mut IndexingPosition {
|
||||
self.positions_per_path
|
||||
.entry(murmurhash2(term.serialized_term()))
|
||||
.or_default()
|
||||
fn get_position_from_id(&mut self, id: u32) -> &mut IndexingPosition {
|
||||
self.positions_per_path.entry(id).or_default()
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(crate) fn index_json_values<'a, V: Value<'a>>(
|
||||
doc: DocId,
|
||||
json_visitors: impl Iterator<Item = crate::Result<V::ObjectIter>>,
|
||||
@@ -72,9 +70,11 @@ pub(crate) fn index_json_values<'a, V: Value<'a>>(
|
||||
expand_dots_enabled: bool,
|
||||
term_buffer: &mut Term,
|
||||
postings_writer: &mut dyn PostingsWriter,
|
||||
json_path_writer: &mut JsonPathWriter,
|
||||
ctx: &mut IndexingContext,
|
||||
) -> crate::Result<()> {
|
||||
let mut json_term_writer = JsonTermWriter::wrap(term_buffer, expand_dots_enabled);
|
||||
json_path_writer.clear();
|
||||
json_path_writer.set_expand_dots(expand_dots_enabled);
|
||||
let mut positions_per_path: IndexingPositionsPerPath = Default::default();
|
||||
for json_visitor_res in json_visitors {
|
||||
let json_visitor = json_visitor_res?;
|
||||
@@ -82,7 +82,8 @@ pub(crate) fn index_json_values<'a, V: Value<'a>>(
|
||||
doc,
|
||||
json_visitor,
|
||||
text_analyzer,
|
||||
&mut json_term_writer,
|
||||
term_buffer,
|
||||
json_path_writer,
|
||||
postings_writer,
|
||||
ctx,
|
||||
&mut positions_per_path,
|
||||
@@ -91,75 +92,117 @@ pub(crate) fn index_json_values<'a, V: Value<'a>>(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn index_json_object<'a, V: Value<'a>>(
|
||||
doc: DocId,
|
||||
json_visitor: V::ObjectIter,
|
||||
text_analyzer: &mut TextAnalyzer,
|
||||
json_term_writer: &mut JsonTermWriter,
|
||||
term_buffer: &mut Term,
|
||||
json_path_writer: &mut JsonPathWriter,
|
||||
postings_writer: &mut dyn PostingsWriter,
|
||||
ctx: &mut IndexingContext,
|
||||
positions_per_path: &mut IndexingPositionsPerPath,
|
||||
) {
|
||||
for (json_path_segment, json_value_visitor) in json_visitor {
|
||||
json_term_writer.push_path_segment(json_path_segment);
|
||||
json_path_writer.push(json_path_segment);
|
||||
index_json_value(
|
||||
doc,
|
||||
json_value_visitor,
|
||||
text_analyzer,
|
||||
json_term_writer,
|
||||
term_buffer,
|
||||
json_path_writer,
|
||||
postings_writer,
|
||||
ctx,
|
||||
positions_per_path,
|
||||
);
|
||||
json_term_writer.pop_path_segment();
|
||||
json_path_writer.pop();
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn index_json_value<'a, V: Value<'a>>(
|
||||
doc: DocId,
|
||||
json_value: V,
|
||||
text_analyzer: &mut TextAnalyzer,
|
||||
json_term_writer: &mut JsonTermWriter,
|
||||
term_buffer: &mut Term,
|
||||
json_path_writer: &mut JsonPathWriter,
|
||||
postings_writer: &mut dyn PostingsWriter,
|
||||
ctx: &mut IndexingContext,
|
||||
positions_per_path: &mut IndexingPositionsPerPath,
|
||||
) {
|
||||
let set_path_id = |term_buffer: &mut Term, unordered_id: u32| {
|
||||
term_buffer.truncate_value_bytes(0);
|
||||
term_buffer.append_bytes(&unordered_id.to_be_bytes());
|
||||
};
|
||||
let set_type = |term_buffer: &mut Term, typ: Type| {
|
||||
term_buffer.append_bytes(&[typ.to_code()]);
|
||||
};
|
||||
|
||||
match json_value.as_value() {
|
||||
ReferenceValue::Leaf(leaf) => match leaf {
|
||||
ReferenceValueLeaf::Null => {}
|
||||
ReferenceValueLeaf::Str(val) => {
|
||||
let mut token_stream = text_analyzer.token_stream(val);
|
||||
let unordered_id = ctx
|
||||
.path_to_unordered_id
|
||||
.get_or_allocate_unordered_id(json_path_writer.as_str());
|
||||
|
||||
// TODO: make sure the chain position works out.
|
||||
json_term_writer.close_path_and_set_type(Type::Str);
|
||||
let indexing_position = positions_per_path.get_position(json_term_writer.term());
|
||||
set_path_id(term_buffer, unordered_id);
|
||||
set_type(term_buffer, Type::Str);
|
||||
let indexing_position = positions_per_path.get_position_from_id(unordered_id);
|
||||
postings_writer.index_text(
|
||||
doc,
|
||||
&mut *token_stream,
|
||||
json_term_writer.term_buffer,
|
||||
term_buffer,
|
||||
ctx,
|
||||
indexing_position,
|
||||
);
|
||||
}
|
||||
ReferenceValueLeaf::U64(val) => {
|
||||
json_term_writer.set_fast_value(val);
|
||||
postings_writer.subscribe(doc, 0u32, json_term_writer.term(), ctx);
|
||||
set_path_id(
|
||||
term_buffer,
|
||||
ctx.path_to_unordered_id
|
||||
.get_or_allocate_unordered_id(json_path_writer.as_str()),
|
||||
);
|
||||
term_buffer.append_type_and_fast_value(val);
|
||||
postings_writer.subscribe(doc, 0u32, term_buffer, ctx);
|
||||
}
|
||||
ReferenceValueLeaf::I64(val) => {
|
||||
json_term_writer.set_fast_value(val);
|
||||
postings_writer.subscribe(doc, 0u32, json_term_writer.term(), ctx);
|
||||
set_path_id(
|
||||
term_buffer,
|
||||
ctx.path_to_unordered_id
|
||||
.get_or_allocate_unordered_id(json_path_writer.as_str()),
|
||||
);
|
||||
term_buffer.append_type_and_fast_value(val);
|
||||
postings_writer.subscribe(doc, 0u32, term_buffer, ctx);
|
||||
}
|
||||
ReferenceValueLeaf::F64(val) => {
|
||||
json_term_writer.set_fast_value(val);
|
||||
postings_writer.subscribe(doc, 0u32, json_term_writer.term(), ctx);
|
||||
set_path_id(
|
||||
term_buffer,
|
||||
ctx.path_to_unordered_id
|
||||
.get_or_allocate_unordered_id(json_path_writer.as_str()),
|
||||
);
|
||||
term_buffer.append_type_and_fast_value(val);
|
||||
postings_writer.subscribe(doc, 0u32, term_buffer, ctx);
|
||||
}
|
||||
ReferenceValueLeaf::Bool(val) => {
|
||||
json_term_writer.set_fast_value(val);
|
||||
postings_writer.subscribe(doc, 0u32, json_term_writer.term(), ctx);
|
||||
set_path_id(
|
||||
term_buffer,
|
||||
ctx.path_to_unordered_id
|
||||
.get_or_allocate_unordered_id(json_path_writer.as_str()),
|
||||
);
|
||||
term_buffer.append_type_and_fast_value(val);
|
||||
postings_writer.subscribe(doc, 0u32, term_buffer, ctx);
|
||||
}
|
||||
ReferenceValueLeaf::Date(val) => {
|
||||
json_term_writer.set_fast_value(val);
|
||||
postings_writer.subscribe(doc, 0u32, json_term_writer.term(), ctx);
|
||||
set_path_id(
|
||||
term_buffer,
|
||||
ctx.path_to_unordered_id
|
||||
.get_or_allocate_unordered_id(json_path_writer.as_str()),
|
||||
);
|
||||
term_buffer.append_type_and_fast_value(val);
|
||||
postings_writer.subscribe(doc, 0u32, term_buffer, ctx);
|
||||
}
|
||||
ReferenceValueLeaf::PreTokStr(_) => {
|
||||
unimplemented!(
|
||||
@@ -182,7 +225,8 @@ fn index_json_value<'a, V: Value<'a>>(
|
||||
doc,
|
||||
val,
|
||||
text_analyzer,
|
||||
json_term_writer,
|
||||
term_buffer,
|
||||
json_path_writer,
|
||||
postings_writer,
|
||||
ctx,
|
||||
positions_per_path,
|
||||
@@ -194,7 +238,8 @@ fn index_json_value<'a, V: Value<'a>>(
|
||||
doc,
|
||||
object,
|
||||
text_analyzer,
|
||||
json_term_writer,
|
||||
term_buffer,
|
||||
json_path_writer,
|
||||
postings_writer,
|
||||
ctx,
|
||||
positions_per_path,
|
||||
@@ -361,6 +406,7 @@ impl<'a> JsonTermWriter<'a> {
|
||||
self.term_buffer.append_bytes(&[typ.to_code()]);
|
||||
}
|
||||
|
||||
// TODO: Remove this function and use JsonPathWriter instead.
|
||||
pub fn push_path_segment(&mut self, segment: &str) {
|
||||
// the path stack should never be empty.
|
||||
self.trim_to_end_of_path();
|
||||
|
||||
@@ -5,6 +5,7 @@
|
||||
//! [`Index::writer`](crate::Index::writer).
|
||||
|
||||
pub(crate) mod delete_queue;
|
||||
pub(crate) mod path_to_unordered_id;
|
||||
|
||||
pub(crate) mod doc_id_mapping;
|
||||
mod doc_opstamp_mapping;
|
||||
|
||||
92
src/indexer/path_to_unordered_id.rs
Normal file
92
src/indexer/path_to_unordered_id.rs
Normal file
@@ -0,0 +1,92 @@
|
||||
use fnv::FnvHashMap;
|
||||
|
||||
/// `Field` is represented by an unsigned 32-bit integer type.
|
||||
/// The schema holds the mapping between field names and `Field` objects.
|
||||
#[derive(Copy, Default, Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Hash)]
|
||||
pub struct OrderedPathId(u32);
|
||||
|
||||
impl OrderedPathId {
|
||||
/// Create a new field object for the given PathId.
|
||||
pub const fn from_ordered_id(field_id: u32) -> OrderedPathId {
|
||||
OrderedPathId(field_id)
|
||||
}
|
||||
|
||||
/// Returns a u32 identifying uniquely a path within a schema.
|
||||
pub const fn path_id(self) -> u32 {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
impl From<u32> for OrderedPathId {
|
||||
fn from(id: u32) -> Self {
|
||||
Self(id)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub(crate) struct PathToUnorderedId {
|
||||
map: FnvHashMap<String, u32>,
|
||||
}
|
||||
|
||||
impl PathToUnorderedId {
|
||||
#[inline]
|
||||
pub(crate) fn get_or_allocate_unordered_id(&mut self, path: &str) -> u32 {
|
||||
if let Some(id) = self.map.get(path) {
|
||||
return *id;
|
||||
}
|
||||
self.insert_new_path(path)
|
||||
}
|
||||
#[cold]
|
||||
fn insert_new_path(&mut self, path: &str) -> u32 {
|
||||
let next_id = self.map.len() as u32;
|
||||
self.map.insert(path.to_string(), next_id);
|
||||
next_id
|
||||
}
|
||||
|
||||
/// Retuns ids which reflect the lexical order of the paths.
|
||||
///
|
||||
/// The returned vec can be indexed with the unordered id to get the ordered id.
|
||||
pub(crate) fn unordered_id_to_ordered_id(&self) -> Vec<OrderedPathId> {
|
||||
let mut sorted_ids: Vec<(&str, &u32)> =
|
||||
self.map.iter().map(|(k, v)| (k.as_str(), v)).collect();
|
||||
sorted_ids.sort_unstable_by_key(|(path, _)| *path);
|
||||
let mut result = vec![OrderedPathId::default(); sorted_ids.len()];
|
||||
for (ordered, unordered) in sorted_ids.iter().map(|(_k, v)| v).enumerate() {
|
||||
result[**unordered as usize] = OrderedPathId::from_ordered_id(ordered as u32);
|
||||
}
|
||||
result
|
||||
}
|
||||
|
||||
/// Retuns the paths so they can be queried by the ordered id (which is the index).
|
||||
pub(crate) fn ordered_id_to_path(&self) -> Vec<&str> {
|
||||
let mut paths = self.map.keys().map(String::as_str).collect::<Vec<_>>();
|
||||
paths.sort_unstable();
|
||||
paths
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn path_to_unordered_test() {
|
||||
let mut path_to_id = PathToUnorderedId::default();
|
||||
let terms = vec!["b", "a", "b", "c"];
|
||||
let ids = terms
|
||||
.iter()
|
||||
.map(|term| path_to_id.get_or_allocate_unordered_id(term))
|
||||
.collect::<Vec<u32>>();
|
||||
assert_eq!(ids, vec![0, 1, 0, 2]);
|
||||
let ordered_ids = ids
|
||||
.iter()
|
||||
.map(|id| path_to_id.unordered_id_to_ordered_id()[*id as usize])
|
||||
.collect::<Vec<OrderedPathId>>();
|
||||
assert_eq!(ordered_ids, vec![1.into(), 0.into(), 1.into(), 2.into()]);
|
||||
// Fetch terms
|
||||
let terms_fetched = ordered_ids
|
||||
.iter()
|
||||
.map(|id| path_to_id.ordered_id_to_path()[id.path_id() as usize])
|
||||
.collect::<Vec<&str>>();
|
||||
assert_eq!(terms_fetched, terms);
|
||||
}
|
||||
}
|
||||
@@ -1,4 +1,5 @@
|
||||
use columnar::MonotonicallyMappableToU64;
|
||||
use common::JsonPathWriter;
|
||||
use itertools::Itertools;
|
||||
use tokenizer_api::BoxTokenStream;
|
||||
|
||||
@@ -66,6 +67,7 @@ pub struct SegmentWriter {
|
||||
pub(crate) segment_serializer: SegmentSerializer,
|
||||
pub(crate) fast_field_writers: FastFieldsWriter,
|
||||
pub(crate) fieldnorms_writer: FieldNormsWriter,
|
||||
pub(crate) json_path_writer: JsonPathWriter,
|
||||
pub(crate) doc_opstamps: Vec<Opstamp>,
|
||||
per_field_text_analyzers: Vec<TextAnalyzer>,
|
||||
term_buffer: Term,
|
||||
@@ -116,6 +118,7 @@ impl SegmentWriter {
|
||||
ctx: IndexingContext::new(table_size),
|
||||
per_field_postings_writers,
|
||||
fieldnorms_writer: FieldNormsWriter::for_schema(&schema),
|
||||
json_path_writer: JsonPathWriter::default(),
|
||||
segment_serializer,
|
||||
fast_field_writers: FastFieldsWriter::from_schema_and_tokenizer_manager(
|
||||
&schema,
|
||||
@@ -144,6 +147,7 @@ impl SegmentWriter {
|
||||
.map(|sort_by_field| get_doc_id_mapping_from_field(sort_by_field, &self))
|
||||
.transpose()?;
|
||||
remap_and_write(
|
||||
self.schema,
|
||||
&self.per_field_postings_writers,
|
||||
self.ctx,
|
||||
self.fast_field_writers,
|
||||
@@ -355,6 +359,7 @@ impl SegmentWriter {
|
||||
json_options.is_expand_dots_enabled(),
|
||||
term_buffer,
|
||||
postings_writer,
|
||||
&mut self.json_path_writer,
|
||||
ctx,
|
||||
)?;
|
||||
}
|
||||
@@ -422,6 +427,7 @@ impl SegmentWriter {
|
||||
///
|
||||
/// `doc_id_map` is used to map to the new doc_id order.
|
||||
fn remap_and_write(
|
||||
schema: Schema,
|
||||
per_field_postings_writers: &PerFieldPostingsWriter,
|
||||
ctx: IndexingContext,
|
||||
fast_field_writers: FastFieldsWriter,
|
||||
@@ -439,6 +445,7 @@ fn remap_and_write(
|
||||
let fieldnorm_readers = FieldNormReaders::open(fieldnorm_data)?;
|
||||
serialize_postings(
|
||||
ctx,
|
||||
schema,
|
||||
per_field_postings_writers,
|
||||
fieldnorm_readers,
|
||||
doc_id_map,
|
||||
@@ -489,11 +496,11 @@ mod tests {
|
||||
use tempfile::TempDir;
|
||||
|
||||
use super::compute_initial_table_size;
|
||||
use crate::collector::Count;
|
||||
use crate::collector::{Count, TopDocs};
|
||||
use crate::core::json_utils::JsonTermWriter;
|
||||
use crate::directory::RamDirectory;
|
||||
use crate::postings::TermInfo;
|
||||
use crate::query::PhraseQuery;
|
||||
use crate::query::{PhraseQuery, QueryParser};
|
||||
use crate::schema::document::Value;
|
||||
use crate::schema::{
|
||||
Document, IndexRecordOption, Schema, TextFieldIndexing, TextOptions, Type, STORED, STRING,
|
||||
@@ -552,6 +559,43 @@ mod tests {
|
||||
assert_eq!(doc.field_values()[0].value().as_str(), Some("A"));
|
||||
assert_eq!(doc.field_values()[1].value().as_str(), Some("title"));
|
||||
}
|
||||
#[test]
|
||||
fn test_simple_json_indexing() {
|
||||
let mut schema_builder = Schema::builder();
|
||||
let json_field = schema_builder.add_json_field("json", STORED | STRING);
|
||||
let schema = schema_builder.build();
|
||||
let index = Index::create_in_ram(schema.clone());
|
||||
let mut writer = index.writer_for_tests().unwrap();
|
||||
writer
|
||||
.add_document(doc!(json_field=>json!({"my_field": "b"})))
|
||||
.unwrap();
|
||||
writer
|
||||
.add_document(doc!(json_field=>json!({"my_field": "a"})))
|
||||
.unwrap();
|
||||
writer
|
||||
.add_document(doc!(json_field=>json!({"my_field": "b"})))
|
||||
.unwrap();
|
||||
writer.commit().unwrap();
|
||||
|
||||
let query_parser = QueryParser::for_index(&index, vec![json_field]);
|
||||
let text_query = query_parser.parse_query("my_field:a").unwrap();
|
||||
let score_docs: Vec<(_, DocAddress)> = index
|
||||
.reader()
|
||||
.unwrap()
|
||||
.searcher()
|
||||
.search(&text_query, &TopDocs::with_limit(4))
|
||||
.unwrap();
|
||||
assert_eq!(score_docs.len(), 1);
|
||||
|
||||
let text_query = query_parser.parse_query("my_field:b").unwrap();
|
||||
let score_docs: Vec<(_, DocAddress)> = index
|
||||
.reader()
|
||||
.unwrap()
|
||||
.searcher()
|
||||
.search(&text_query, &TopDocs::with_limit(4))
|
||||
.unwrap();
|
||||
assert_eq!(score_docs.len(), 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_json_indexing() {
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
use stacker::{ArenaHashMap, MemoryArena};
|
||||
|
||||
use crate::indexer::path_to_unordered_id::PathToUnorderedId;
|
||||
|
||||
/// IndexingContext contains all of the transient memory arenas
|
||||
/// required for building the inverted index.
|
||||
pub(crate) struct IndexingContext {
|
||||
@@ -8,6 +10,7 @@ pub(crate) struct IndexingContext {
|
||||
pub term_index: ArenaHashMap,
|
||||
/// Arena is a memory arena that stores posting lists / term frequencies / positions.
|
||||
pub arena: MemoryArena,
|
||||
pub path_to_unordered_id: PathToUnorderedId,
|
||||
}
|
||||
|
||||
impl IndexingContext {
|
||||
@@ -17,6 +20,7 @@ impl IndexingContext {
|
||||
IndexingContext {
|
||||
arena: MemoryArena::default(),
|
||||
term_index,
|
||||
path_to_unordered_id: PathToUnorderedId::default(),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -3,10 +3,11 @@ use std::io;
|
||||
use stacker::Addr;
|
||||
|
||||
use crate::indexer::doc_id_mapping::DocIdMapping;
|
||||
use crate::indexer::path_to_unordered_id::OrderedPathId;
|
||||
use crate::postings::postings_writer::SpecializedPostingsWriter;
|
||||
use crate::postings::recorder::{BufferLender, DocIdRecorder, Recorder};
|
||||
use crate::postings::{FieldSerializer, IndexingContext, IndexingPosition, PostingsWriter};
|
||||
use crate::schema::Type;
|
||||
use crate::schema::{Field, Type, JSON_END_OF_PATH};
|
||||
use crate::tokenizer::TokenStream;
|
||||
use crate::{DocId, Term};
|
||||
|
||||
@@ -54,18 +55,24 @@ impl<Rec: Recorder> PostingsWriter for JsonPostingsWriter<Rec> {
|
||||
/// The actual serialization format is handled by the `PostingsSerializer`.
|
||||
fn serialize(
|
||||
&self,
|
||||
term_addrs: &[(Term<&[u8]>, Addr)],
|
||||
term_addrs: &[(Field, OrderedPathId, &[u8], Addr)],
|
||||
ordered_id_to_path: &[&str],
|
||||
doc_id_map: Option<&DocIdMapping>,
|
||||
ctx: &IndexingContext,
|
||||
serializer: &mut FieldSerializer,
|
||||
) -> io::Result<()> {
|
||||
let mut term_buffer = Term::with_capacity(48);
|
||||
let mut buffer_lender = BufferLender::default();
|
||||
for (term, addr) in term_addrs {
|
||||
if let Some(json_value) = term.value().as_json_value_bytes() {
|
||||
for (_field, path_id, term, addr) in term_addrs {
|
||||
term_buffer.clear_with_field_and_type(Type::Json, Field::from_field_id(0));
|
||||
term_buffer.append_bytes(ordered_id_to_path[path_id.path_id() as usize].as_bytes());
|
||||
term_buffer.append_bytes(&[JSON_END_OF_PATH]);
|
||||
term_buffer.append_bytes(term);
|
||||
if let Some(json_value) = term_buffer.value().as_json_value_bytes() {
|
||||
let typ = json_value.typ();
|
||||
if typ == Type::Str {
|
||||
SpecializedPostingsWriter::<Rec>::serialize_one_term(
|
||||
term,
|
||||
term_buffer.serialized_value_bytes(),
|
||||
*addr,
|
||||
doc_id_map,
|
||||
&mut buffer_lender,
|
||||
@@ -74,7 +81,7 @@ impl<Rec: Recorder> PostingsWriter for JsonPostingsWriter<Rec> {
|
||||
)?;
|
||||
} else {
|
||||
SpecializedPostingsWriter::<DocIdRecorder>::serialize_one_term(
|
||||
term,
|
||||
term_buffer.serialized_value_bytes(),
|
||||
*addr,
|
||||
doc_id_map,
|
||||
&mut buffer_lender,
|
||||
|
||||
@@ -6,20 +6,23 @@ use stacker::Addr;
|
||||
|
||||
use crate::fieldnorm::FieldNormReaders;
|
||||
use crate::indexer::doc_id_mapping::DocIdMapping;
|
||||
use crate::indexer::path_to_unordered_id::OrderedPathId;
|
||||
use crate::postings::recorder::{BufferLender, Recorder};
|
||||
use crate::postings::{
|
||||
FieldSerializer, IndexingContext, InvertedIndexSerializer, PerFieldPostingsWriter,
|
||||
};
|
||||
use crate::schema::{Field, Term};
|
||||
use crate::schema::{Field, Schema, Term, Type};
|
||||
use crate::tokenizer::{Token, TokenStream, MAX_TOKEN_LEN};
|
||||
use crate::DocId;
|
||||
|
||||
const POSITION_GAP: u32 = 1;
|
||||
|
||||
fn make_field_partition(term_offsets: &[(Term<&[u8]>, Addr)]) -> Vec<(Field, Range<usize>)> {
|
||||
fn make_field_partition(
|
||||
term_offsets: &[(Field, OrderedPathId, &[u8], Addr)],
|
||||
) -> Vec<(Field, Range<usize>)> {
|
||||
let term_offsets_it = term_offsets
|
||||
.iter()
|
||||
.map(|(term, _)| term.field())
|
||||
.map(|(field, _, _, _)| *field)
|
||||
.enumerate();
|
||||
let mut prev_field_opt = None;
|
||||
let mut fields = vec![];
|
||||
@@ -44,19 +47,36 @@ fn make_field_partition(term_offsets: &[(Term<&[u8]>, Addr)]) -> Vec<(Field, Ran
|
||||
/// postings serializer.
|
||||
pub(crate) fn serialize_postings(
|
||||
ctx: IndexingContext,
|
||||
schema: Schema,
|
||||
per_field_postings_writers: &PerFieldPostingsWriter,
|
||||
fieldnorm_readers: FieldNormReaders,
|
||||
doc_id_map: Option<&DocIdMapping>,
|
||||
serializer: &mut InvertedIndexSerializer,
|
||||
) -> crate::Result<()> {
|
||||
let mut term_offsets: Vec<(Term<&[u8]>, Addr)> = Vec::with_capacity(ctx.term_index.len());
|
||||
term_offsets.extend(
|
||||
ctx.term_index
|
||||
.iter()
|
||||
.map(|(bytes, addr)| (Term::wrap(bytes), addr)),
|
||||
);
|
||||
term_offsets.sort_unstable_by_key(|(k, _)| k.clone());
|
||||
// Replace unordered ids by ordered ids to be able to sort
|
||||
let unordered_id_to_ordered_id: Vec<OrderedPathId> =
|
||||
ctx.path_to_unordered_id.unordered_id_to_ordered_id();
|
||||
|
||||
let mut term_offsets: Vec<(Field, OrderedPathId, &[u8], Addr)> =
|
||||
Vec::with_capacity(ctx.term_index.len());
|
||||
term_offsets.extend(ctx.term_index.iter().map(|(key, addr)| {
|
||||
let field = Term::wrap(key).field();
|
||||
if schema.get_field_entry(field).field_type().value_type() == Type::Json {
|
||||
let byte_range_path = 5..5 + 4;
|
||||
let unordered_id = u32::from_be_bytes(key[byte_range_path.clone()].try_into().unwrap());
|
||||
let path_id = unordered_id_to_ordered_id[unordered_id as usize];
|
||||
(field, path_id, &key[byte_range_path.end..], addr)
|
||||
} else {
|
||||
(field, 0.into(), &key[5..], addr)
|
||||
}
|
||||
}));
|
||||
// Sort by field, path, and term
|
||||
term_offsets.sort_unstable_by(
|
||||
|(field1, path_id1, bytes1, _), (field2, path_id2, bytes2, _)| {
|
||||
(field1, path_id1, bytes1).cmp(&(field2, path_id2, bytes2))
|
||||
},
|
||||
);
|
||||
let ordered_id_to_path = ctx.path_to_unordered_id.ordered_id_to_path();
|
||||
let field_offsets = make_field_partition(&term_offsets);
|
||||
for (field, byte_offsets) in field_offsets {
|
||||
let postings_writer = per_field_postings_writers.get_for_field(field);
|
||||
@@ -65,12 +85,14 @@ pub(crate) fn serialize_postings(
|
||||
serializer.new_field(field, postings_writer.total_num_tokens(), fieldnorm_reader)?;
|
||||
postings_writer.serialize(
|
||||
&term_offsets[byte_offsets],
|
||||
&ordered_id_to_path,
|
||||
doc_id_map,
|
||||
&ctx,
|
||||
&mut field_serializer,
|
||||
)?;
|
||||
field_serializer.close()?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -98,7 +120,8 @@ pub(crate) trait PostingsWriter: Send + Sync {
|
||||
/// The actual serialization format is handled by the `PostingsSerializer`.
|
||||
fn serialize(
|
||||
&self,
|
||||
term_addrs: &[(Term<&[u8]>, Addr)],
|
||||
term_addrs: &[(Field, OrderedPathId, &[u8], Addr)],
|
||||
ordered_id_to_path: &[&str],
|
||||
doc_id_map: Option<&DocIdMapping>,
|
||||
ctx: &IndexingContext,
|
||||
serializer: &mut FieldSerializer,
|
||||
@@ -162,7 +185,7 @@ impl<Rec: Recorder> From<SpecializedPostingsWriter<Rec>> for Box<dyn PostingsWri
|
||||
impl<Rec: Recorder> SpecializedPostingsWriter<Rec> {
|
||||
#[inline]
|
||||
pub(crate) fn serialize_one_term(
|
||||
term: &Term<&[u8]>,
|
||||
term: &[u8],
|
||||
addr: Addr,
|
||||
doc_id_map: Option<&DocIdMapping>,
|
||||
buffer_lender: &mut BufferLender,
|
||||
@@ -171,7 +194,7 @@ impl<Rec: Recorder> SpecializedPostingsWriter<Rec> {
|
||||
) -> io::Result<()> {
|
||||
let recorder: Rec = ctx.term_index.read(addr);
|
||||
let term_doc_freq = recorder.term_doc_freq().unwrap_or(0u32);
|
||||
serializer.new_term(term.serialized_value_bytes(), term_doc_freq)?;
|
||||
serializer.new_term(term, term_doc_freq)?;
|
||||
recorder.serialize(&ctx.arena, doc_id_map, serializer, buffer_lender);
|
||||
serializer.close_term()?;
|
||||
Ok(())
|
||||
@@ -204,13 +227,14 @@ impl<Rec: Recorder> PostingsWriter for SpecializedPostingsWriter<Rec> {
|
||||
|
||||
fn serialize(
|
||||
&self,
|
||||
term_addrs: &[(Term<&[u8]>, Addr)],
|
||||
term_addrs: &[(Field, OrderedPathId, &[u8], Addr)],
|
||||
_ordered_id_to_path: &[&str],
|
||||
doc_id_map: Option<&DocIdMapping>,
|
||||
ctx: &IndexingContext,
|
||||
serializer: &mut FieldSerializer,
|
||||
) -> io::Result<()> {
|
||||
let mut buffer_lender = BufferLender::default();
|
||||
for (term, addr) in term_addrs {
|
||||
for (_field, _path_id, term, addr) in term_addrs {
|
||||
Self::serialize_one_term(term, *addr, doc_id_map, &mut buffer_lender, ctx, serializer)?;
|
||||
}
|
||||
Ok(())
|
||||
|
||||
@@ -93,6 +93,7 @@ impl Type {
|
||||
}
|
||||
|
||||
/// Returns a 1 byte code used to identify the type.
|
||||
#[inline]
|
||||
pub fn to_code(&self) -> u8 {
|
||||
*self as u8
|
||||
}
|
||||
@@ -115,6 +116,7 @@ impl Type {
|
||||
|
||||
/// Interprets a 1byte code as a type.
|
||||
/// Returns `None` if the code is invalid.
|
||||
#[inline]
|
||||
pub fn from_code(code: u8) -> Option<Self> {
|
||||
match code {
|
||||
b's' => Some(Type::Str),
|
||||
|
||||
@@ -278,6 +278,7 @@ fn locate_splitting_dots(field_path: &str) -> Vec<usize> {
|
||||
|
||||
impl Schema {
|
||||
/// Return the `FieldEntry` associated with a `Field`.
|
||||
#[inline]
|
||||
pub fn get_field_entry(&self, field: Field) -> &FieldEntry {
|
||||
&self.0.fields[field.field_id() as usize]
|
||||
}
|
||||
|
||||
@@ -3,7 +3,7 @@ use std::hash::{Hash, Hasher};
|
||||
use std::net::Ipv6Addr;
|
||||
use std::{fmt, str};
|
||||
|
||||
use columnar::MonotonicallyMappableToU128;
|
||||
use columnar::{MonotonicallyMappableToU128, MonotonicallyMappableToU64};
|
||||
|
||||
use super::date_time_options::DATE_TIME_PRECISION_INDEXED;
|
||||
use super::Field;
|
||||
@@ -170,6 +170,18 @@ impl Term {
|
||||
self.set_bytes(val.to_u64().to_be_bytes().as_ref());
|
||||
}
|
||||
|
||||
pub(crate) fn append_type_and_fast_value<T: FastValue>(&mut self, val: T) {
|
||||
self.0.push(T::to_type().to_code());
|
||||
let value = if T::to_type() == Type::Date {
|
||||
DateTime::from_u64(val.to_u64())
|
||||
.truncate(DATE_TIME_PRECISION_INDEXED)
|
||||
.to_u64()
|
||||
} else {
|
||||
val.to_u64()
|
||||
};
|
||||
self.0.extend(value.to_be_bytes().as_ref());
|
||||
}
|
||||
|
||||
/// Sets a `Ipv6Addr` value in the term.
|
||||
pub fn set_ip_addr(&mut self, val: Ipv6Addr) {
|
||||
self.set_bytes(val.to_u128().to_be_bytes().as_ref());
|
||||
|
||||
@@ -23,7 +23,7 @@ type HashType = u64;
|
||||
/// The `value_addr` also points to an address in the memory arena.
|
||||
#[derive(Copy, Clone)]
|
||||
struct KeyValue {
|
||||
key_value_addr: Addr,
|
||||
pub(crate) key_value_addr: Addr,
|
||||
hash: HashType,
|
||||
}
|
||||
|
||||
@@ -58,7 +58,7 @@ impl KeyValue {
|
||||
/// or copying the key as long as there is no insert.
|
||||
pub struct ArenaHashMap {
|
||||
table: Vec<KeyValue>,
|
||||
memory_arena: MemoryArena,
|
||||
pub memory_arena: MemoryArena,
|
||||
mask: usize,
|
||||
len: usize,
|
||||
}
|
||||
|
||||
@@ -148,6 +148,11 @@ impl MemoryArena {
|
||||
self.get_page(addr.page_id())
|
||||
.slice_from(addr.page_local_addr())
|
||||
}
|
||||
#[inline]
|
||||
pub fn slice_from_mut(&mut self, addr: Addr) -> &mut [u8] {
|
||||
self.get_page_mut(addr.page_id())
|
||||
.slice_from_mut(addr.page_local_addr())
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn slice_mut(&mut self, addr: Addr, len: usize) -> &mut [u8] {
|
||||
@@ -206,6 +211,10 @@ impl Page {
|
||||
fn slice_from(&self, local_addr: usize) -> &[u8] {
|
||||
&self.data[local_addr..]
|
||||
}
|
||||
#[inline]
|
||||
fn slice_from_mut(&mut self, local_addr: usize) -> &mut [u8] {
|
||||
&mut self.data[local_addr..]
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn slice_mut(&mut self, local_addr: usize, len: usize) -> &mut [u8] {
|
||||
|
||||
Reference in New Issue
Block a user