Adding an IndexingContext object (#1268)

This commit is contained in:
Paul Masurel
2022-02-04 15:08:01 +09:00
committed by GitHub
parent 13a4473faa
commit bdedefe07d
21 changed files with 516 additions and 459 deletions

View File

@@ -73,7 +73,7 @@ fn main() -> tantivy::Result<()> {
// multithreaded.
//
// Here we give tantivy a budget of `50MB`.
// Using a bigger heap for the indexer may increase
// Using a bigger memory_arena for the indexer may increase
// throughput, but 50 MB is already plenty.
let mut index_writer = index.writer(50_000_000)?;

View File

@@ -62,7 +62,7 @@ fn main() -> tantivy::Result<()> {
// multithreaded.
//
// Here we use a buffer of 50MB per thread. Using a bigger
// heap for the indexer can increase its throughput.
// memory arena for the indexer can increase its throughput.
let mut index_writer = index.writer(50_000_000)?;
index_writer.add_document(doc!(
title => "The Old Man and the Sea",

View File

@@ -15,7 +15,7 @@ use crate::directory::error::OpenReadError;
use crate::directory::MmapDirectory;
use crate::directory::{Directory, ManagedDirectory, RamDirectory, INDEX_WRITER_LOCK};
use crate::error::{DataCorruption, TantivyError};
use crate::indexer::index_writer::{HEAP_SIZE_MIN, MAX_NUM_THREAD};
use crate::indexer::index_writer::{MAX_NUM_THREAD, MEMORY_ARENA_NUM_BYTES_MIN};
use crate::indexer::segment_updater::save_new_metas;
use crate::reader::{IndexReader, IndexReaderBuilder};
use crate::schema::{Field, FieldType, Schema};
@@ -397,17 +397,18 @@ impl Index {
/// - `num_threads` defines the number of indexing workers that
/// should work at the same time.
///
/// - `overall_heap_size_in_bytes` sets the amount of memory
/// - `overall_memory_arena_in_bytes` sets the amount of memory
/// allocated for all indexing threads.
/// Each thread will receive a budget of `overall_heap_size_in_bytes / num_threads`.
/// Each thread will receive a budget of `overall_memory_arena_in_bytes / num_threads`.
///
/// # Errors
/// If the lockfile already exists, returns `Error::DirectoryLockBusy` or an `Error::IoError`.
/// If the heap size per thread is too small or too big, returns `TantivyError::InvalidArgument`
/// If the memory arena per thread is too small or too big, returns
/// `TantivyError::InvalidArgument`
pub fn writer_with_num_threads(
&self,
num_threads: usize,
overall_heap_size_in_bytes: usize,
overall_memory_arena_in_bytes: usize,
) -> crate::Result<IndexWriter> {
let directory_lock = self
.directory
@@ -423,18 +424,18 @@ impl Index {
),
)
})?;
let heap_size_in_bytes_per_thread = overall_heap_size_in_bytes / num_threads;
let memory_arena_in_bytes_per_thread = overall_memory_arena_in_bytes / num_threads;
IndexWriter::new(
self,
num_threads,
heap_size_in_bytes_per_thread,
memory_arena_in_bytes_per_thread,
directory_lock,
)
}
/// Helper to create an index writer for tests.
///
/// That index writer only simply has a single thread and a heap of 10 MB.
/// That index writer simply has a single thread and a memory arena of 10 MB.
/// Using a single thread gives us a deterministic allocation of DocId.
#[cfg(test)]
pub fn writer_for_tests(&self) -> crate::Result<IndexWriter> {
@@ -445,19 +446,20 @@ impl Index {
///
/// Tantivy will automatically define the number of threads to use, but
/// no more than 8 threads.
/// `overall_heap_size_in_bytes` is the total target memory usage that will be split
/// `overall_memory_arena_in_bytes` is the total target memory usage that will be split
/// between a given number of threads.
///
/// # Errors
/// If the lockfile already exists, returns `Error::FileAlreadyExists`.
/// If the heap size per thread is too small or too big, returns `TantivyError::InvalidArgument`
pub fn writer(&self, overall_heap_size_in_bytes: usize) -> crate::Result<IndexWriter> {
/// If the memory arena per thread is too small or too big, returns
/// `TantivyError::InvalidArgument`
pub fn writer(&self, memory_arena_num_bytes: usize) -> crate::Result<IndexWriter> {
let mut num_threads = std::cmp::min(num_cpus::get(), MAX_NUM_THREAD);
let heap_size_in_bytes_per_thread = overall_heap_size_in_bytes / num_threads;
if heap_size_in_bytes_per_thread < HEAP_SIZE_MIN {
num_threads = (overall_heap_size_in_bytes / HEAP_SIZE_MIN).max(1);
let memory_arena_num_bytes_per_thread = memory_arena_num_bytes / num_threads;
if memory_arena_num_bytes_per_thread < MEMORY_ARENA_NUM_BYTES_MIN {
num_threads = (memory_arena_num_bytes / MEMORY_ARENA_NUM_BYTES_MIN).max(1);
}
self.writer_with_num_threads(num_threads, overall_heap_size_in_bytes)
self.writer_with_num_threads(num_threads, memory_arena_num_bytes)
}
/// Accessor to the index settings

View File

@@ -79,7 +79,7 @@ impl MultiValuedFastFieldWriter {
// facets are indexed in the `SegmentWriter` as we encode their unordered id.
if !self.is_facet {
for field_value in doc.field_values() {
if field_value.field() == self.field {
if field_value.field == self.field {
self.add_val(value_to_u64(field_value.value()));
}
}

View File

@@ -26,13 +26,13 @@ use crate::indexer::{MergePolicy, SegmentEntry, SegmentWriter};
use crate::schema::{Document, IndexRecordOption, Term};
use crate::Opstamp;
// Size of the margin for the heap. A segment is closed when the remaining memory
// in the heap goes below MARGIN_IN_BYTES.
// Size of the margin for the `memory_arena`. A segment is closed when the remaining memory
// in the `memory_arena` goes below MARGIN_IN_BYTES.
pub const MARGIN_IN_BYTES: usize = 1_000_000;
// We impose the memory per thread to be at least 3 MB.
pub const HEAP_SIZE_MIN: usize = ((MARGIN_IN_BYTES as u32) * 3u32) as usize;
pub const HEAP_SIZE_MAX: usize = u32::max_value() as usize - MARGIN_IN_BYTES;
pub const MEMORY_ARENA_NUM_BYTES_MIN: usize = ((MARGIN_IN_BYTES as u32) * 3u32) as usize;
pub const MEMORY_ARENA_NUM_BYTES_MAX: usize = u32::max_value() as usize - MARGIN_IN_BYTES;
// We impose the number of index writer threads to be at most this.
pub const MAX_NUM_THREAD: usize = 8;
@@ -61,7 +61,7 @@ pub struct IndexWriter {
index: Index,
heap_size_in_bytes_per_thread: usize,
memory_arena_in_bytes_per_thread: usize,
workers_join_handle: Vec<JoinHandle<crate::Result<()>>>,
@@ -179,10 +179,10 @@ fn index_documents(
) -> crate::Result<()> {
let schema = segment.schema();
let mut segment_writer = SegmentWriter::for_segment(memory_budget, segment.clone(), &schema)?;
let mut segment_writer = SegmentWriter::for_segment(memory_budget, segment.clone(), schema)?;
for document_group in grouped_document_iterator {
for doc in document_group {
segment_writer.add_document(doc, &schema)?;
segment_writer.add_document(doc)?;
}
let mem_usage = segment_writer.mem_usage();
if mem_usage >= memory_budget - MARGIN_IN_BYTES {
@@ -268,22 +268,26 @@ impl IndexWriter {
/// should work at the same time.
/// # Errors
/// If the lockfile already exists, returns `Error::FileAlreadyExists`.
/// If the heap size per thread is too small or too big, returns `TantivyError::InvalidArgument`
/// If the memory arena per thread is too small or too big, returns
/// `TantivyError::InvalidArgument`
pub(crate) fn new(
index: &Index,
num_threads: usize,
heap_size_in_bytes_per_thread: usize,
memory_arena_in_bytes_per_thread: usize,
directory_lock: DirectoryLock,
) -> crate::Result<IndexWriter> {
if heap_size_in_bytes_per_thread < HEAP_SIZE_MIN {
if memory_arena_in_bytes_per_thread < MEMORY_ARENA_NUM_BYTES_MIN {
let err_msg = format!(
"The heap size per thread needs to be at least {}.",
HEAP_SIZE_MIN
"The memory arena in bytes per thread needs to be at least {}.",
MEMORY_ARENA_NUM_BYTES_MIN
);
return Err(TantivyError::InvalidArgument(err_msg));
}
if heap_size_in_bytes_per_thread >= HEAP_SIZE_MAX {
let err_msg = format!("The heap size per thread cannot exceed {}", HEAP_SIZE_MAX);
if memory_arena_in_bytes_per_thread >= MEMORY_ARENA_NUM_BYTES_MAX {
let err_msg = format!(
"The memory arena in bytes per thread cannot exceed {}",
MEMORY_ARENA_NUM_BYTES_MAX
);
return Err(TantivyError::InvalidArgument(err_msg));
}
let (document_sender, document_receiver): (AddBatchSender, AddBatchReceiver) =
@@ -301,7 +305,7 @@ impl IndexWriter {
let mut index_writer = IndexWriter {
_directory_lock: Some(directory_lock),
heap_size_in_bytes_per_thread,
memory_arena_in_bytes_per_thread,
index: index.clone(),
index_writer_status: IndexWriterStatus::from(document_receiver),
@@ -401,7 +405,7 @@ impl IndexWriter {
let mut delete_cursor = self.delete_queue.cursor();
let mem_budget = self.heap_size_in_bytes_per_thread;
let mem_budget = self.memory_arena_in_bytes_per_thread;
let index = self.index.clone();
let join_handle: JoinHandle<crate::Result<()>> = thread::Builder::new()
.name(format!("thrd-tantivy-index{}", self.worker_id))
@@ -560,7 +564,7 @@ impl IndexWriter {
let new_index_writer: IndexWriter = IndexWriter::new(
&self.index,
self.num_threads,
self.heap_size_in_bytes_per_thread,
self.memory_arena_in_bytes_per_thread,
directory_lock,
)?;

View File

@@ -4,31 +4,33 @@ use crate::core::Segment;
use crate::fastfield::FastFieldsWriter;
use crate::fieldnorm::{FieldNormReaders, FieldNormsWriter};
use crate::indexer::segment_serializer::SegmentSerializer;
use crate::postings::{compute_table_size, MultiFieldPostingsWriter};
use crate::schema::{Field, FieldEntry, FieldType, Schema, Term, Type, Value};
use crate::postings::{
compute_table_size, serialize_postings, IndexingContext, PerFieldPostingsWriter, PostingsWriter,
};
use crate::schema::{Field, FieldEntry, FieldType, FieldValue, Schema, Term, Type, Value};
use crate::store::{StoreReader, StoreWriter};
use crate::tokenizer::{
BoxTokenStream, FacetTokenizer, PreTokenizedStream, TextAnalyzer, TokenStreamChain, Tokenizer,
};
use crate::{DocId, Opstamp, SegmentComponent};
use crate::{DocId, Document, Opstamp, SegmentComponent};
/// Computes the initial size of the hash table.
///
/// Returns a number of bit `b`, such that the recommended initial table size is 2^b.
fn initial_table_size(per_thread_memory_budget: usize) -> crate::Result<usize> {
/// Returns the recommended initial table size as a power of 2.
///
/// Note this is a very dumb way to compute log2, but it is easier to proofread that way.
fn compute_initial_table_size(per_thread_memory_budget: usize) -> crate::Result<usize> {
let table_memory_upper_bound = per_thread_memory_budget / 3;
if let Some(limit) = (10..)
.take_while(|num_bits: &usize| compute_table_size(*num_bits) < table_memory_upper_bound)
(10..20) // We cap it at 2^19 = 512K capacity.
.map(|power| 1 << power)
.take_while(|capacity| compute_table_size(*capacity) < table_memory_upper_bound)
.last()
{
Ok(limit.min(19)) // we cap it at 2^19 = 512K.
} else {
Err(crate::TantivyError::InvalidArgument(format!(
"per thread memory budget (={}) is too small. Raise the memory budget or lower the \
number of threads.",
per_thread_memory_budget
)))
}
.ok_or_else(|| {
crate::TantivyError::InvalidArgument(format!(
"per thread memory budget (={per_thread_memory_budget}) is too small. Raise the \
memory budget or lower the number of threads."
))
})
}
fn remap_doc_opstamps(
@@ -52,13 +54,15 @@ fn remap_doc_opstamps(
/// The segment is laid out on disk when the segment gets `finalized`.
pub struct SegmentWriter {
pub(crate) max_doc: DocId,
pub(crate) multifield_postings: MultiFieldPostingsWriter,
pub(crate) indexing_context: IndexingContext,
pub(crate) per_field_postings_writers: PerFieldPostingsWriter,
pub(crate) segment_serializer: SegmentSerializer,
pub(crate) fast_field_writers: FastFieldsWriter,
pub(crate) fieldnorms_writer: FieldNormsWriter,
pub(crate) doc_opstamps: Vec<Opstamp>,
tokenizers: Vec<Option<TextAnalyzer>>,
term_buffer: Term,
schema: Schema,
}
impl SegmentWriter {
@@ -66,20 +70,20 @@ impl SegmentWriter {
///
/// The arguments are defined as follows
///
/// - heap: most of the segment writer data (terms, and postings lists recorders)
/// is stored in a user-defined heap object. This makes it possible for the user to define
/// the flushing behavior as a buffer limit
/// - memory_budget: most of the segment writer data (terms, and postings lists recorders)
/// is stored in a memory arena. This makes it possible for the user to define
/// the flushing behavior as a memory limit.
/// - segment: The segment being written
/// - schema
pub fn for_segment(
memory_budget: usize,
memory_budget_in_bytes: usize,
segment: Segment,
schema: &Schema,
schema: Schema,
) -> crate::Result<SegmentWriter> {
let tokenizer_manager = segment.index().tokenizers().clone();
let table_num_bits = initial_table_size(memory_budget)?;
let table_size = compute_initial_table_size(memory_budget_in_bytes)?;
let segment_serializer = SegmentSerializer::for_segment(segment, false)?;
let multifield_postings = MultiFieldPostingsWriter::new(schema, table_num_bits);
let per_field_postings_writers = PerFieldPostingsWriter::for_schema(&schema);
let tokenizers = schema
.fields()
.map(
@@ -96,13 +100,15 @@ impl SegmentWriter {
.collect();
Ok(SegmentWriter {
max_doc: 0,
multifield_postings,
fieldnorms_writer: FieldNormsWriter::for_schema(schema),
indexing_context: IndexingContext::new(table_size),
per_field_postings_writers,
fieldnorms_writer: FieldNormsWriter::for_schema(&schema),
segment_serializer,
fast_field_writers: FastFieldsWriter::from_schema(schema),
fast_field_writers: FastFieldsWriter::from_schema(&schema),
doc_opstamps: Vec::with_capacity(1_000),
tokenizers,
term_buffer: Term::new(),
schema,
})
}
@@ -122,9 +128,11 @@ impl SegmentWriter {
.map(|sort_by_field| get_doc_id_mapping_from_field(sort_by_field, &self))
.transpose()?;
remap_and_write(
&self.multifield_postings,
&self.per_field_postings_writers,
self.indexing_context,
&self.fast_field_writers,
&self.fieldnorms_writer,
&self.schema,
self.segment_serializer,
mapping.as_ref(),
)?;
@@ -133,28 +141,16 @@ impl SegmentWriter {
}
pub fn mem_usage(&self) -> usize {
self.multifield_postings.mem_usage()
self.indexing_context.mem_usage()
+ self.fieldnorms_writer.mem_usage()
+ self.fast_field_writers.mem_usage()
+ self.segment_serializer.mem_usage()
}
/// Indexes a new document
///
/// As a user, you should rather use `IndexWriter`'s add_document.
pub fn add_document(
&mut self,
add_operation: AddOperation,
schema: &Schema,
) -> crate::Result<()> {
fn index_document(&mut self, doc: &Document) -> crate::Result<()> {
let doc_id = self.max_doc;
let mut doc = add_operation.document;
self.doc_opstamps.push(add_operation.opstamp);
self.fast_field_writers.add_document(&doc);
for (field, values) in doc.get_sorted_field_values() {
let field_entry = schema.get_field_entry(field);
let field_entry = self.schema.get_field_entry(field);
let make_schema_error = || {
crate::TantivyError::SchemaError(format!(
"Expected a {:?} for field {:?}",
@@ -165,8 +161,10 @@ impl SegmentWriter {
if !field_entry.is_indexed() {
continue;
}
let (term_buffer, multifield_postings) =
(&mut self.term_buffer, &mut self.multifield_postings);
let (term_buffer, indexing_context) =
(&mut self.term_buffer, &mut self.indexing_context);
let postings_writer: &mut dyn PostingsWriter =
self.per_field_postings_writers.get_for_field_mut(field);
match *field_entry.field_type() {
FieldType::Facet(_) => {
term_buffer.set_field(Type::Facet, field);
@@ -178,8 +176,13 @@ impl SegmentWriter {
.token_stream(facet_str)
.process(&mut |token| {
term_buffer.set_text(&token.text);
let unordered_term_id =
multifield_postings.subscribe(doc_id, term_buffer);
let unordered_term_id = postings_writer.subscribe(
doc_id,
0u32,
term_buffer,
indexing_context,
);
// TODO pass indexing context directly in subscribe function
unordered_term_id_opt = Some(unordered_term_id);
});
if let Some(unordered_term_id) = unordered_term_id_opt {
@@ -222,11 +225,12 @@ impl SegmentWriter {
0
} else {
let mut token_stream = TokenStreamChain::new(offsets, token_streams);
multifield_postings.index_text(
postings_writer.index_text(
doc_id,
field,
&mut token_stream,
term_buffer,
indexing_context,
)
};
self.fieldnorms_writer.record(doc_id, field, num_tokens);
@@ -236,7 +240,7 @@ impl SegmentWriter {
term_buffer.set_field(Type::U64, field);
let u64_val = value.as_u64().ok_or_else(make_schema_error)?;
term_buffer.set_u64(u64_val);
multifield_postings.subscribe(doc_id, term_buffer);
postings_writer.subscribe(doc_id, 0u32, term_buffer, indexing_context);
}
}
FieldType::Date(_) => {
@@ -244,7 +248,7 @@ impl SegmentWriter {
term_buffer.set_field(Type::Date, field);
let date_val = value.as_date().ok_or_else(make_schema_error)?;
term_buffer.set_i64(date_val.timestamp());
multifield_postings.subscribe(doc_id, term_buffer);
postings_writer.subscribe(doc_id, 0u32, term_buffer, indexing_context);
}
}
FieldType::I64(_) => {
@@ -252,7 +256,7 @@ impl SegmentWriter {
term_buffer.set_field(Type::I64, field);
let i64_val = value.as_i64().ok_or_else(make_schema_error)?;
term_buffer.set_i64(i64_val);
multifield_postings.subscribe(doc_id, term_buffer);
postings_writer.subscribe(doc_id, 0u32, term_buffer, indexing_context);
}
}
FieldType::F64(_) => {
@@ -260,7 +264,7 @@ impl SegmentWriter {
term_buffer.set_field(Type::F64, field);
let f64_val = value.as_f64().ok_or_else(make_schema_error)?;
term_buffer.set_f64(f64_val);
multifield_postings.subscribe(doc_id, term_buffer);
postings_writer.subscribe(doc_id, 0u32, term_buffer, indexing_context);
}
}
FieldType::Bytes(_) => {
@@ -268,15 +272,25 @@ impl SegmentWriter {
term_buffer.set_field(Type::Bytes, field);
let bytes = value.as_bytes().ok_or_else(make_schema_error)?;
term_buffer.set_bytes(bytes);
self.multifield_postings.subscribe(doc_id, term_buffer);
postings_writer.subscribe(doc_id, 0u32, term_buffer, indexing_context);
}
}
}
}
doc.filter_fields(|field| schema.get_field_entry(field).is_stored());
doc.prepare_for_store();
Ok(())
}
/// Indexes a new document
///
/// As a user, you should rather use `IndexWriter`'s add_document.
pub fn add_document(&mut self, add_operation: AddOperation) -> crate::Result<()> {
let doc = add_operation.document;
self.doc_opstamps.push(add_operation.opstamp);
self.fast_field_writers.add_document(&doc);
self.index_document(&doc)?;
let prepared_doc = prepare_doc_for_store(doc, &self.schema);
let doc_writer = self.segment_serializer.get_store_writer();
doc_writer.store(&doc)?;
doc_writer.store(&prepared_doc)?;
self.max_doc += 1;
Ok(())
}
@@ -308,9 +322,11 @@ impl SegmentWriter {
///
/// `doc_id_map` is used to map to the new doc_id order.
fn remap_and_write(
multifield_postings: &MultiFieldPostingsWriter,
per_field_postings_writers: &PerFieldPostingsWriter,
indexing_context: IndexingContext,
fast_field_writers: &FastFieldsWriter,
fieldnorms_writer: &FieldNormsWriter,
schema: &Schema,
mut serializer: SegmentSerializer,
doc_id_map: Option<&DocIdMapping>,
) -> crate::Result<()> {
@@ -321,10 +337,13 @@ fn remap_and_write(
.segment()
.open_read(SegmentComponent::FieldNorms)?;
let fieldnorm_readers = FieldNormReaders::open(fieldnorm_data)?;
let term_ord_map = multifield_postings.serialize(
serializer.get_postings_serializer(),
let term_ord_map = serialize_postings(
indexing_context,
per_field_postings_writers,
fieldnorm_readers,
doc_id_map,
schema,
serializer.get_postings_serializer(),
)?;
fast_field_writers.serialize(
serializer.get_fast_field_serializer(),
@@ -348,7 +367,6 @@ fn remap_and_write(
.segment()
.open_read(SegmentComponent::TempStore)?,
)?;
for old_doc_id in doc_id_map.iter_old_doc_ids() {
let doc_bytes = store_read.get_document_bytes(old_doc_id)?;
serializer.get_store_writer().store_bytes(&doc_bytes)?;
@@ -360,15 +378,70 @@ fn remap_and_write(
Ok(())
}
/// Prepares Document for being stored in the document store
///
/// Method transforms PreTokenizedString values into String
/// values.
pub fn prepare_doc_for_store(doc: Document, schema: &Schema) -> Document {
Document::from(
doc.into_iter()
.filter(|field_value| schema.get_field_entry(field_value.field()).is_stored())
.map(|field_value| match field_value {
FieldValue {
field,
value: Value::PreTokStr(pre_tokenized_text),
} => FieldValue {
field,
value: Value::Str(pre_tokenized_text.text),
},
field_value => field_value,
})
.collect::<Vec<_>>(),
)
}
#[cfg(test)]
mod tests {
use super::initial_table_size;
use super::compute_initial_table_size;
use crate::schema::{Schema, STORED, TEXT};
use crate::tokenizer::{PreTokenizedString, Token};
use crate::Document;
#[test]
fn test_hashmap_size() {
assert_eq!(initial_table_size(100_000).unwrap(), 11);
assert_eq!(initial_table_size(1_000_000).unwrap(), 14);
assert_eq!(initial_table_size(10_000_000).unwrap(), 17);
assert_eq!(initial_table_size(1_000_000_000).unwrap(), 19);
assert_eq!(compute_initial_table_size(100_000).unwrap(), 1 << 11);
assert_eq!(compute_initial_table_size(1_000_000).unwrap(), 1 << 14);
assert_eq!(compute_initial_table_size(10_000_000).unwrap(), 1 << 17);
assert_eq!(compute_initial_table_size(1_000_000_000).unwrap(), 1 << 19);
assert_eq!(compute_initial_table_size(4_000_000_000).unwrap(), 1 << 19);
}
#[test]
fn test_prepare_for_store() {
    let mut schema_builder = Schema::builder();
    let text_field = schema_builder.add_text_field("title", TEXT | STORED);
    let schema = schema_builder.build();

    // A pre-tokenized value made of the single token "A".
    let single_token = Token {
        offset_from: 0,
        offset_to: 1,
        position: 0,
        text: String::from("A"),
        position_length: 1,
    };
    let pre_tokenized_text = PreTokenizedString {
        text: String::from("A"),
        tokens: vec![single_token],
    };

    // The same stored field receives a pre-tokenized value, then a plain text value.
    let mut doc = Document::default();
    doc.add_pre_tokenized_text(text_field, pre_tokenized_text);
    doc.add_text(text_field, "title");

    let prepared_doc = super::prepare_doc_for_store(doc, &schema);
    let stored_values = prepared_doc.field_values();
    assert_eq!(stored_values.len(), 2);
    // The pre-tokenized value must have been turned into plain text.
    assert_eq!(stored_values[0].value().as_text(), Some("A"));
    assert_eq!(stored_values[1].value().as_text(), Some("title"));
}
}

View File

@@ -52,7 +52,7 @@ macro_rules! doc(
{
let mut document = $crate::Document::default();
$(
document.add($crate::schema::FieldValue::new($field, $value.into()));
document.add_field_value($field, $value);
)*
document
}

View File

@@ -0,0 +1,27 @@
use crate::postings::stacker::{MemoryArena, TermHashMap};
/// Groups the transient memory arenas needed while the inverted index
/// of a segment is being built.
pub(crate) struct IndexingContext {
    /// Ad hoc hash map used as the term index; it is backed by its own
    /// dedicated memory arena.
    pub term_index: TermHashMap,
    /// Memory arena holding posting lists, term frequencies and positions.
    pub arena: MemoryArena,
}

impl IndexingContext {
    /// Creates an `IndexingContext` whose term hash map is built with the
    /// given `table_size`, along with a fresh memory arena.
    pub(crate) fn new(table_size: usize) -> IndexingContext {
        IndexingContext {
            term_index: TermHashMap::new(table_size),
            arena: MemoryArena::new(),
        }
    }

    /// Memory used by the term hash map and the arena combined, in bytes.
    pub(crate) fn mem_usage(&self) -> usize {
        let term_index_num_bytes = self.term_index.mem_usage();
        let arena_num_bytes = self.arena.mem_usage();
        term_index_num_bytes + arena_num_bytes
    }
}

View File

@@ -6,6 +6,8 @@ pub(crate) use self::block_search::branchless_binary_search;
mod block_segment_postings;
pub(crate) mod compression;
mod indexing_context;
mod per_field_postings_writer;
mod postings;
mod postings_writer;
mod recorder;
@@ -16,8 +18,10 @@ mod stacker;
mod term_info;
pub use self::block_segment_postings::BlockSegmentPostings;
pub(crate) use self::indexing_context::IndexingContext;
pub(crate) use self::per_field_postings_writer::PerFieldPostingsWriter;
pub use self::postings::Postings;
pub(crate) use self::postings_writer::MultiFieldPostingsWriter;
pub(crate) use self::postings_writer::{serialize_postings, PostingsWriter};
pub use self::segment_postings::SegmentPostings;
pub use self::serializer::{FieldSerializer, InvertedIndexSerializer};
pub(crate) use self::skip::{BlockInfo, SkipReader};
@@ -222,7 +226,7 @@ pub mod tests {
{
let mut segment_writer =
SegmentWriter::for_segment(3_000_000, segment.clone(), &schema).unwrap();
SegmentWriter::for_segment(3_000_000, segment.clone(), schema).unwrap();
{
// checking that position works if the field has two values
let op = AddOperation {
@@ -232,14 +236,14 @@ pub mod tests {
text_field => "d d d d a"
),
};
segment_writer.add_document(op, &schema)?;
segment_writer.add_document(op)?;
}
{
let op = AddOperation {
opstamp: 1u64,
document: doc!(text_field => "b a"),
};
segment_writer.add_document(op, &schema).unwrap();
segment_writer.add_document(op).unwrap();
}
for i in 2..1000 {
let mut text: String = "e ".repeat(i);
@@ -248,7 +252,7 @@ pub mod tests {
opstamp: 2u64,
document: doc!(text_field => text),
};
segment_writer.add_document(op, &schema).unwrap();
segment_writer.add_document(op).unwrap();
}
segment_writer.finalize()?;
}

View File

@@ -0,0 +1,53 @@
use crate::postings::postings_writer::SpecializedPostingsWriter;
use crate::postings::recorder::{NothingRecorder, TermFrequencyRecorder, TfAndPositionRecorder};
use crate::postings::PostingsWriter;
use crate::schema::{Field, FieldEntry, FieldType, IndexRecordOption, Schema};
/// Holds one boxed `PostingsWriter` per field of a schema.
///
/// Writers are stored in the order in which `Schema::fields()` yields the
/// fields, and are looked up by field id.
pub(crate) struct PerFieldPostingsWriter {
    per_field_postings_writers: Vec<Box<dyn PostingsWriter>>,
}

impl PerFieldPostingsWriter {
    /// Creates the postings writer of every field declared in `schema`.
    pub fn for_schema(schema: &Schema) -> Self {
        let mut per_field_postings_writers: Vec<Box<dyn PostingsWriter>> = Vec::new();
        for (_field, field_entry) in schema.fields() {
            per_field_postings_writers.push(posting_writer_from_field_entry(field_entry));
        }
        PerFieldPostingsWriter {
            per_field_postings_writers,
        }
    }

    /// Shared access to the postings writer of `field`.
    ///
    /// Panics if the field id is out of range for the stored writers.
    pub(crate) fn get_for_field(&self, field: Field) -> &dyn PostingsWriter {
        let field_ord = field.field_id() as usize;
        &*self.per_field_postings_writers[field_ord]
    }

    /// Exclusive access to the postings writer of `field`.
    ///
    /// Panics if the field id is out of range for the stored writers.
    pub(crate) fn get_for_field_mut(&mut self, field: Field) -> &mut dyn PostingsWriter {
        let field_ord = field.field_id() as usize;
        &mut *self.per_field_postings_writers[field_ord]
    }
}
/// Picks the postings writer matching the way a field is indexed.
///
/// Text fields with indexing options get a writer whose recorder follows
/// their `IndexRecordOption`. Text fields without indexing options, and all
/// non-text fields, get a writer backed by a `NothingRecorder`.
fn posting_writer_from_field_entry(field_entry: &FieldEntry) -> Box<dyn PostingsWriter> {
    // Only text fields can carry an explicit record option.
    let record_option = match *field_entry.field_type() {
        FieldType::Str(ref text_options) => text_options
            .get_indexing_options()
            .map(|indexing_options| indexing_options.index_option()),
        FieldType::U64(_)
        | FieldType::I64(_)
        | FieldType::F64(_)
        | FieldType::Date(_)
        | FieldType::Bytes(_)
        | FieldType::Facet(_) => None,
    };
    match record_option {
        Some(IndexRecordOption::WithFreqs) => {
            SpecializedPostingsWriter::<TermFrequencyRecorder>::new_boxed()
        }
        Some(IndexRecordOption::WithFreqsAndPositions) => {
            SpecializedPostingsWriter::<TfAndPositionRecorder>::new_boxed()
        }
        Some(IndexRecordOption::Basic) | None => {
            SpecializedPostingsWriter::<NothingRecorder>::new_boxed()
        }
    }
}

View File

@@ -1,54 +1,23 @@
use std::collections::HashMap;
use std::io;
use std::marker::PhantomData;
use std::ops::{DerefMut, Range};
use std::ops::Range;
use fnv::FnvHashMap;
use super::stacker::{Addr, MemoryArena, TermHashMap};
use super::stacker::Addr;
use crate::fieldnorm::FieldNormReaders;
use crate::indexer::doc_id_mapping::DocIdMapping;
use crate::postings::recorder::{
BufferLender, NothingRecorder, Recorder, TermFrequencyRecorder, TfAndPositionRecorder,
use crate::postings::recorder::{BufferLender, Recorder};
use crate::postings::{
FieldSerializer, IndexingContext, InvertedIndexSerializer, PerFieldPostingsWriter,
UnorderedTermId,
};
use crate::postings::{FieldSerializer, InvertedIndexSerializer, UnorderedTermId};
use crate::schema::{Field, FieldEntry, FieldType, IndexRecordOption, Schema, Term, Type};
use crate::schema::{Field, FieldType, Schema, Term, Type};
use crate::termdict::TermOrdinal;
use crate::tokenizer::{Token, TokenStream, MAX_TOKEN_LEN};
use crate::DocId;
fn posting_from_field_entry(field_entry: &FieldEntry) -> Box<dyn PostingsWriter> {
match *field_entry.field_type() {
FieldType::Str(ref text_options) => text_options
.get_indexing_options()
.map(|indexing_options| match indexing_options.index_option() {
IndexRecordOption::Basic => {
SpecializedPostingsWriter::<NothingRecorder>::new_boxed()
}
IndexRecordOption::WithFreqs => {
SpecializedPostingsWriter::<TermFrequencyRecorder>::new_boxed()
}
IndexRecordOption::WithFreqsAndPositions => {
SpecializedPostingsWriter::<TfAndPositionRecorder>::new_boxed()
}
})
.unwrap_or_else(SpecializedPostingsWriter::<NothingRecorder>::new_boxed),
FieldType::U64(_)
| FieldType::I64(_)
| FieldType::F64(_)
| FieldType::Date(_)
| FieldType::Bytes(_)
| FieldType::Facet(_) => SpecializedPostingsWriter::<NothingRecorder>::new_boxed(),
}
}
pub struct MultiFieldPostingsWriter {
heap: MemoryArena,
schema: Schema,
term_index: TermHashMap,
per_field_postings_writers: Vec<Box<dyn PostingsWriter>>,
}
fn make_field_partition(
term_offsets: &[(Term<&[u8]>, Addr, UnorderedTermId)],
) -> Vec<(Field, Range<usize>)> {
@@ -74,133 +43,81 @@ fn make_field_partition(
field_offsets
}
impl MultiFieldPostingsWriter {
/// Create a new `MultiFieldPostingsWriter` given
/// a schema and a heap.
pub fn new(schema: &Schema, table_bits: usize) -> MultiFieldPostingsWriter {
let term_index = TermHashMap::new(table_bits);
let per_field_postings_writers: Vec<_> = schema
.fields()
.map(|(_, field_entry)| posting_from_field_entry(field_entry))
.collect();
MultiFieldPostingsWriter {
heap: MemoryArena::new(),
schema: schema.clone(),
term_index,
per_field_postings_writers,
}
}
/// Serialize the inverted index.
/// It pushes all term, one field at a time, towards the
/// postings serializer.
pub(crate) fn serialize_postings(
indexing_context: IndexingContext,
per_field_postings_writers: &PerFieldPostingsWriter,
fieldnorm_readers: FieldNormReaders,
doc_id_map: Option<&DocIdMapping>,
schema: &Schema,
serializer: &mut InvertedIndexSerializer,
) -> crate::Result<HashMap<Field, FnvHashMap<UnorderedTermId, TermOrdinal>>> {
let mut term_offsets: Vec<(Term<&[u8]>, Addr, UnorderedTermId)> =
Vec::with_capacity(indexing_context.term_index.len());
term_offsets.extend(indexing_context.term_index.iter());
term_offsets.sort_unstable_by_key(|(k, _, _)| k.clone());
pub fn mem_usage(&self) -> usize {
self.term_index.mem_usage() + self.heap.mem_usage()
}
let mut unordered_term_mappings: HashMap<Field, FnvHashMap<UnorderedTermId, TermOrdinal>> =
HashMap::new();
pub fn index_text(
&mut self,
doc: DocId,
field: Field,
token_stream: &mut dyn TokenStream,
term_buffer: &mut Term,
) -> u32 {
let postings_writer =
self.per_field_postings_writers[field.field_id() as usize].deref_mut();
postings_writer.index_text(
&mut self.term_index,
doc,
field,
token_stream,
&mut self.heap,
term_buffer,
)
}
let field_offsets = make_field_partition(&term_offsets);
pub fn subscribe(&mut self, doc: DocId, term: &Term) -> UnorderedTermId {
let postings_writer =
self.per_field_postings_writers[term.field().field_id() as usize].deref_mut();
postings_writer.subscribe(&mut self.term_index, doc, 0u32, term, &mut self.heap)
}
/// Serialize the inverted index.
/// It pushes all term, one field at a time, towards the
/// postings serializer.
pub fn serialize(
&self,
serializer: &mut InvertedIndexSerializer,
fieldnorm_readers: FieldNormReaders,
doc_id_map: Option<&DocIdMapping>,
) -> crate::Result<HashMap<Field, FnvHashMap<UnorderedTermId, TermOrdinal>>> {
let mut term_offsets: Vec<(Term<&[u8]>, Addr, UnorderedTermId)> =
Vec::with_capacity(self.term_index.len());
term_offsets.extend(self.term_index.iter());
term_offsets.sort_unstable_by_key(|(k, _, _)| k.clone());
let mut unordered_term_mappings: HashMap<Field, FnvHashMap<UnorderedTermId, TermOrdinal>> =
HashMap::new();
let field_offsets = make_field_partition(&term_offsets);
for (field, byte_offsets) in field_offsets {
let field_entry = self.schema.get_field_entry(field);
match *field_entry.field_type() {
FieldType::Str(_) | FieldType::Facet(_) => {
// populating the (unordered term ord) -> (ordered term ord) mapping
// for the field.
let unordered_term_ids = term_offsets[byte_offsets.clone()]
.iter()
.map(|&(_, _, bucket)| bucket);
let mapping: FnvHashMap<UnorderedTermId, TermOrdinal> = unordered_term_ids
.enumerate()
.map(|(term_ord, unord_term_id)| {
(unord_term_id as UnorderedTermId, term_ord as TermOrdinal)
})
.collect();
unordered_term_mappings.insert(field, mapping);
}
FieldType::U64(_) | FieldType::I64(_) | FieldType::F64(_) | FieldType::Date(_) => {}
FieldType::Bytes(_) => {}
for (field, byte_offsets) in field_offsets {
let field_entry = schema.get_field_entry(field);
match *field_entry.field_type() {
FieldType::Str(_) | FieldType::Facet(_) => {
// populating the (unordered term ord) -> (ordered term ord) mapping
// for the field.
let unordered_term_ids = term_offsets[byte_offsets.clone()]
.iter()
.map(|&(_, _, bucket)| bucket);
let mapping: FnvHashMap<UnorderedTermId, TermOrdinal> = unordered_term_ids
.enumerate()
.map(|(term_ord, unord_term_id)| {
(unord_term_id as UnorderedTermId, term_ord as TermOrdinal)
})
.collect();
unordered_term_mappings.insert(field, mapping);
}
let postings_writer =
self.per_field_postings_writers[field.field_id() as usize].as_ref();
let fieldnorm_reader = fieldnorm_readers.get_field(field)?;
let mut field_serializer = serializer.new_field(
field,
postings_writer.total_num_tokens(),
fieldnorm_reader,
)?;
postings_writer.serialize(
&term_offsets[byte_offsets],
&mut field_serializer,
&self.term_index.heap,
&self.heap,
doc_id_map,
)?;
field_serializer.close()?;
FieldType::U64(_) | FieldType::I64(_) | FieldType::F64(_) | FieldType::Date(_) => {}
FieldType::Bytes(_) => {}
}
Ok(unordered_term_mappings)
let postings_writer = per_field_postings_writers.get_for_field(field);
let fieldnorm_reader = fieldnorm_readers.get_field(field)?;
let mut field_serializer =
serializer.new_field(field, postings_writer.total_num_tokens(), fieldnorm_reader)?;
postings_writer.serialize(
&term_offsets[byte_offsets],
doc_id_map,
&indexing_context,
&mut field_serializer,
)?;
field_serializer.close()?;
}
Ok(unordered_term_mappings)
}
/// The `PostingsWriter` is in charge of receiving documents
/// and building a `Segment` in anonymous memory.
///
/// `PostingsWriter` writes in a `MemoryArena`.
pub trait PostingsWriter {
pub(crate) trait PostingsWriter {
/// Record that a document contains a term at a given position.
///
/// * doc - the document id
/// * pos - the term position (expressed in tokens)
/// * term - the term
/// * heap - heap used to store the postings informations as well as the terms
/// in the hashmap.
/// * indexing_context - Contains a term hashmap and a memory arena to store all necessary
/// posting list information.
fn subscribe(
&mut self,
term_index: &mut TermHashMap,
doc: DocId,
pos: u32,
term: &Term,
heap: &mut MemoryArena,
indexing_context: &mut IndexingContext,
) -> UnorderedTermId;
/// Serializes the postings on disk.
@@ -208,28 +125,26 @@ pub trait PostingsWriter {
fn serialize(
&self,
term_addrs: &[(Term<&[u8]>, Addr, UnorderedTermId)],
serializer: &mut FieldSerializer<'_>,
term_heap: &MemoryArena,
heap: &MemoryArena,
doc_id_map: Option<&DocIdMapping>,
indexing_context: &IndexingContext,
serializer: &mut FieldSerializer,
) -> io::Result<()>;
/// Tokenize a text and subscribe all of its tokens.
fn index_text(
&mut self,
term_index: &mut TermHashMap,
doc_id: DocId,
field: Field,
token_stream: &mut dyn TokenStream,
heap: &mut MemoryArena,
term_buffer: &mut Term,
indexing_context: &mut IndexingContext,
) -> u32 {
term_buffer.set_field(Type::Str, field);
let mut sink = |token: &Token| {
// We skip all tokens with a len greater than u16.
if token.text.len() <= MAX_TOKEN_LEN {
term_buffer.set_text(token.text.as_str());
self.subscribe(term_index, doc_id, token.position as u32, term_buffer, heap);
self.subscribe(doc_id, token.position as u32, term_buffer, indexing_context);
} else {
warn!(
"A token exceeding MAX_TOKEN_LEN ({}>{}) was dropped. Search for \
@@ -261,7 +176,7 @@ impl<Rec: Recorder + 'static> SpecializedPostingsWriter<Rec> {
}
}
/// Builds a `SpecializedPostingsWriter` storing its data in a heap.
/// Builds a `SpecializedPostingsWriter` storing its data in a memory arena.
pub fn new_boxed() -> Box<dyn PostingsWriter> {
Box::new(SpecializedPostingsWriter::<Rec>::new())
}
@@ -270,27 +185,30 @@ impl<Rec: Recorder + 'static> SpecializedPostingsWriter<Rec> {
impl<Rec: Recorder + 'static> PostingsWriter for SpecializedPostingsWriter<Rec> {
fn subscribe(
&mut self,
term_index: &mut TermHashMap,
doc: DocId,
position: u32,
term: &Term,
heap: &mut MemoryArena,
indexing_context: &mut IndexingContext,
) -> UnorderedTermId {
debug_assert!(term.as_slice().len() >= 4);
self.total_num_tokens += 1;
let (term_index, arena) = (
&mut indexing_context.term_index,
&mut indexing_context.arena,
);
term_index.mutate_or_create(term.as_slice(), |opt_recorder: Option<Rec>| {
if let Some(mut recorder) = opt_recorder {
let current_doc = recorder.current_doc();
if current_doc != doc {
recorder.close_doc(heap);
recorder.new_doc(doc, heap);
recorder.close_doc(arena);
recorder.new_doc(doc, arena);
}
recorder.record_position(position, heap);
recorder.record_position(position, arena);
recorder
} else {
let mut recorder = Rec::new();
recorder.new_doc(doc, heap);
recorder.record_position(position, heap);
recorder.new_doc(doc, arena);
recorder.record_position(position, arena);
recorder
}
}) as UnorderedTermId
@@ -299,17 +217,21 @@ impl<Rec: Recorder + 'static> PostingsWriter for SpecializedPostingsWriter<Rec>
fn serialize(
&self,
term_addrs: &[(Term<&[u8]>, Addr, UnorderedTermId)],
serializer: &mut FieldSerializer<'_>,
termdict_heap: &MemoryArena,
heap: &MemoryArena,
doc_id_map: Option<&DocIdMapping>,
indexing_context: &IndexingContext,
serializer: &mut FieldSerializer,
) -> io::Result<()> {
let mut buffer_lender = BufferLender::default();
for (term, addr, _) in term_addrs {
let recorder: Rec = termdict_heap.read(*addr);
let recorder: Rec = indexing_context.term_index.read(*addr);
let term_doc_freq = recorder.term_doc_freq().unwrap_or(0u32);
serializer.new_term(term.value_bytes(), term_doc_freq)?;
recorder.serialize(&mut buffer_lender, serializer, heap, doc_id_map);
recorder.serialize(
&indexing_context.arena,
doc_id_map,
serializer,
&mut buffer_lender,
);
serializer.close_term()?;
}
Ok(())

View File

@@ -63,19 +63,19 @@ pub(crate) trait Recorder: Copy + 'static {
fn current_doc(&self) -> u32;
/// Starts recording information about a new document
/// This method shall only be called if the term is within the document.
fn new_doc(&mut self, doc: DocId, heap: &mut MemoryArena);
fn new_doc(&mut self, doc: DocId, arena: &mut MemoryArena);
/// Record the position of a term. For each document,
/// this method will be called `term_freq` times.
fn record_position(&mut self, position: u32, heap: &mut MemoryArena);
fn record_position(&mut self, position: u32, arena: &mut MemoryArena);
/// Close the document. It will help record the term frequency.
fn close_doc(&mut self, heap: &mut MemoryArena);
fn close_doc(&mut self, arena: &mut MemoryArena);
/// Pushes the postings information to the serializer.
fn serialize(
&self,
buffer_lender: &mut BufferLender,
serializer: &mut FieldSerializer<'_>,
heap: &MemoryArena,
arena: &MemoryArena,
doc_id_map: Option<&DocIdMapping>,
serializer: &mut FieldSerializer<'_>,
buffer_lender: &mut BufferLender,
);
/// Returns the number of documents containing this term.
///
@@ -102,24 +102,24 @@ impl Recorder for NothingRecorder {
self.current_doc
}
fn new_doc(&mut self, doc: DocId, heap: &mut MemoryArena) {
fn new_doc(&mut self, doc: DocId, arena: &mut MemoryArena) {
self.current_doc = doc;
let _ = write_u32_vint(doc, &mut self.stack.writer(heap));
let _ = write_u32_vint(doc, &mut self.stack.writer(arena));
}
fn record_position(&mut self, _position: u32, _heap: &mut MemoryArena) {}
fn record_position(&mut self, _position: u32, _arena: &mut MemoryArena) {}
fn close_doc(&mut self, _heap: &mut MemoryArena) {}
fn close_doc(&mut self, _arena: &mut MemoryArena) {}
fn serialize(
&self,
buffer_lender: &mut BufferLender,
serializer: &mut FieldSerializer<'_>,
heap: &MemoryArena,
arena: &MemoryArena,
doc_id_map: Option<&DocIdMapping>,
serializer: &mut FieldSerializer<'_>,
buffer_lender: &mut BufferLender,
) {
let (buffer, doc_ids) = buffer_lender.lend_all();
self.stack.read_to_end(heap, buffer);
self.stack.read_to_end(arena, buffer);
// TODO avoid reading twice.
if let Some(doc_id_map) = doc_id_map {
doc_ids.extend(
@@ -166,31 +166,31 @@ impl Recorder for TermFrequencyRecorder {
self.current_doc
}
fn new_doc(&mut self, doc: DocId, heap: &mut MemoryArena) {
fn new_doc(&mut self, doc: DocId, arena: &mut MemoryArena) {
self.term_doc_freq += 1;
self.current_doc = doc;
let _ = write_u32_vint(doc, &mut self.stack.writer(heap));
let _ = write_u32_vint(doc, &mut self.stack.writer(arena));
}
fn record_position(&mut self, _position: u32, _heap: &mut MemoryArena) {
fn record_position(&mut self, _position: u32, _arena: &mut MemoryArena) {
self.current_tf += 1;
}
fn close_doc(&mut self, heap: &mut MemoryArena) {
fn close_doc(&mut self, arena: &mut MemoryArena) {
debug_assert!(self.current_tf > 0);
let _ = write_u32_vint(self.current_tf, &mut self.stack.writer(heap));
let _ = write_u32_vint(self.current_tf, &mut self.stack.writer(arena));
self.current_tf = 0;
}
fn serialize(
&self,
buffer_lender: &mut BufferLender,
serializer: &mut FieldSerializer<'_>,
heap: &MemoryArena,
arena: &MemoryArena,
doc_id_map: Option<&DocIdMapping>,
serializer: &mut FieldSerializer<'_>,
buffer_lender: &mut BufferLender,
) {
let buffer = buffer_lender.lend_u8();
self.stack.read_to_end(heap, buffer);
self.stack.read_to_end(arena, buffer);
let mut u32_it = VInt32Reader::new(&buffer[..]);
if let Some(doc_id_map) = doc_id_map {
let mut doc_id_and_tf = vec![];
@@ -236,29 +236,29 @@ impl Recorder for TfAndPositionRecorder {
self.current_doc
}
fn new_doc(&mut self, doc: DocId, heap: &mut MemoryArena) {
fn new_doc(&mut self, doc: DocId, arena: &mut MemoryArena) {
self.current_doc = doc;
self.term_doc_freq += 1u32;
let _ = write_u32_vint(doc, &mut self.stack.writer(heap));
let _ = write_u32_vint(doc, &mut self.stack.writer(arena));
}
fn record_position(&mut self, position: u32, heap: &mut MemoryArena) {
let _ = write_u32_vint(position + 1u32, &mut self.stack.writer(heap));
fn record_position(&mut self, position: u32, arena: &mut MemoryArena) {
let _ = write_u32_vint(position + 1u32, &mut self.stack.writer(arena));
}
fn close_doc(&mut self, heap: &mut MemoryArena) {
let _ = write_u32_vint(POSITION_END, &mut self.stack.writer(heap));
fn close_doc(&mut self, arena: &mut MemoryArena) {
let _ = write_u32_vint(POSITION_END, &mut self.stack.writer(arena));
}
fn serialize(
&self,
buffer_lender: &mut BufferLender,
serializer: &mut FieldSerializer<'_>,
heap: &MemoryArena,
arena: &MemoryArena,
doc_id_map: Option<&DocIdMapping>,
serializer: &mut FieldSerializer<'_>,
buffer_lender: &mut BufferLender,
) {
let (buffer_u8, buffer_positions) = buffer_lender.lend_all();
self.stack.read_to_end(heap, buffer_u8);
self.stack.read_to_end(arena, buffer_u8);
let mut u32_it = VInt32Reader::new(&buffer_u8[..]);
let mut doc_id_and_positions = vec![];
while let Some(doc) = u32_it.next() {

View File

@@ -65,12 +65,12 @@ pub struct ExpUnrolledLinkedList {
pub struct ExpUnrolledLinkedListWriter<'a> {
eull: &'a mut ExpUnrolledLinkedList,
heap: &'a mut MemoryArena,
arena: &'a mut MemoryArena,
}
fn ensure_capacity<'a>(
eull: &'a mut ExpUnrolledLinkedList,
heap: &'a mut MemoryArena,
arena: &'a mut MemoryArena,
) -> &'a mut [u8] {
if eull.len <= FIRST_BLOCK as u32 {
// We are still hitting the inline block.
@@ -78,22 +78,22 @@ fn ensure_capacity<'a>(
return &mut eull.inlined_data[eull.len as usize..FIRST_BLOCK];
}
// We need to allocate a new block!
let new_block_addr: Addr = heap.allocate_space(FIRST_BLOCK + mem::size_of::<Addr>());
let new_block_addr: Addr = arena.allocate_space(FIRST_BLOCK + mem::size_of::<Addr>());
store(&mut eull.inlined_data[FIRST_BLOCK..], new_block_addr);
eull.tail = new_block_addr;
return heap.slice_mut(eull.tail, FIRST_BLOCK);
return arena.slice_mut(eull.tail, FIRST_BLOCK);
}
let len = match len_to_capacity(eull.len) {
CapacityResult::NeedAlloc(new_block_len) => {
let new_block_addr: Addr =
heap.allocate_space(new_block_len as usize + mem::size_of::<Addr>());
heap.write_at(eull.tail, new_block_addr);
arena.allocate_space(new_block_len as usize + mem::size_of::<Addr>());
arena.write_at(eull.tail, new_block_addr);
eull.tail = new_block_addr;
new_block_len
}
CapacityResult::Available(available) => available,
};
heap.slice_mut(eull.tail, len as usize)
arena.slice_mut(eull.tail, len as usize)
}
impl<'a> ExpUnrolledLinkedListWriter<'a> {
@@ -106,7 +106,7 @@ impl<'a> ExpUnrolledLinkedListWriter<'a> {
while !buf.is_empty() {
let add_len: usize;
{
let output_buf = ensure_capacity(self.eull, self.heap);
let output_buf = ensure_capacity(self.eull, self.arena);
add_len = buf.len().min(output_buf.len());
output_buf[..add_len].copy_from_slice(&buf[..add_len]);
}
@@ -146,11 +146,11 @@ impl ExpUnrolledLinkedList {
}
#[inline]
pub fn writer<'a>(&'a mut self, heap: &'a mut MemoryArena) -> ExpUnrolledLinkedListWriter<'a> {
ExpUnrolledLinkedListWriter { eull: self, heap }
pub fn writer<'a>(&'a mut self, arena: &'a mut MemoryArena) -> ExpUnrolledLinkedListWriter<'a> {
ExpUnrolledLinkedListWriter { eull: self, arena }
}
pub fn read_to_end(&self, heap: &MemoryArena, output: &mut Vec<u8>) {
pub fn read_to_end(&self, arena: &MemoryArena, output: &mut Vec<u8>) {
let len = self.len as usize;
if len <= FIRST_BLOCK {
output.extend_from_slice(&self.inlined_data[..len]);
@@ -164,14 +164,14 @@ impl ExpUnrolledLinkedList {
CapacityResult::Available(capacity) => capacity,
CapacityResult::NeedAlloc(capacity) => capacity,
} as usize;
let data = heap.slice(addr, cap);
let data = arena.slice(addr, cap);
if cur + cap >= len {
output.extend_from_slice(&data[..(len - cur)]);
return;
}
output.extend_from_slice(data);
cur += cap;
addr = heap.read(addr.offset(cap as u32));
addr = arena.read(addr.offset(cap as u32));
}
}
}
@@ -185,33 +185,33 @@ mod tests {
use super::{len_to_capacity, *};
#[test]
fn test_stack() {
let mut heap = MemoryArena::new();
fn test_eull() {
let mut arena = MemoryArena::new();
let mut stack = ExpUnrolledLinkedList::new();
stack.writer(&mut heap).extend_from_slice(&[1u8]);
stack.writer(&mut heap).extend_from_slice(&[2u8]);
stack.writer(&mut heap).extend_from_slice(&[3u8, 4u8]);
stack.writer(&mut heap).extend_from_slice(&[5u8]);
stack.writer(&mut arena).extend_from_slice(&[1u8]);
stack.writer(&mut arena).extend_from_slice(&[2u8]);
stack.writer(&mut arena).extend_from_slice(&[3u8, 4u8]);
stack.writer(&mut arena).extend_from_slice(&[5u8]);
{
let mut buffer = Vec::new();
stack.read_to_end(&heap, &mut buffer);
stack.read_to_end(&arena, &mut buffer);
assert_eq!(&buffer[..], &[1u8, 2u8, 3u8, 4u8, 5u8]);
}
}
#[test]
fn test_stack_long() {
let mut heap = MemoryArena::new();
let mut stack = ExpUnrolledLinkedList::new();
fn test_eull_long() {
let mut arena = MemoryArena::new();
let mut eull = ExpUnrolledLinkedList::new();
let data: Vec<u32> = (0..100).collect();
for &el in &data {
assert!(stack
.writer(&mut heap)
assert!(eull
.writer(&mut arena)
.write_u32::<LittleEndian>(el)
.is_ok());
}
let mut buffer = Vec::new();
stack.read_to_end(&heap, &mut buffer);
eull.read_to_end(&arena, &mut buffer);
let mut result = vec![];
let mut remaining = &buffer[..];
while !remaining.is_empty() {
@@ -222,8 +222,8 @@ mod tests {
}
#[test]
fn test_stack_interlaced() {
let mut heap = MemoryArena::new();
fn test_eull_interlaced() {
let mut eull = MemoryArena::new();
let mut stack = ExpUnrolledLinkedList::new();
let mut stack2 = ExpUnrolledLinkedList::new();
@@ -231,11 +231,11 @@ mod tests {
let mut vec2: Vec<u8> = vec![];
for i in 0..9 {
assert!(stack.writer(&mut heap).write_u32::<LittleEndian>(i).is_ok());
assert!(stack.writer(&mut eull).write_u32::<LittleEndian>(i).is_ok());
assert!(vec1.write_u32::<LittleEndian>(i).is_ok());
if i % 2 == 0 {
assert!(stack2
.writer(&mut heap)
.writer(&mut eull)
.write_u32::<LittleEndian>(i)
.is_ok());
assert!(vec2.write_u32::<LittleEndian>(i).is_ok());
@@ -243,8 +243,8 @@ mod tests {
}
let mut res1 = vec![];
let mut res2 = vec![];
stack.read_to_end(&heap, &mut res1);
stack2.read_to_end(&heap, &mut res2);
stack.read_to_end(&eull, &mut res1);
stack2.read_to_end(&eull, &mut res2);
assert_eq!(&vec1[..], &res1[..]);
assert_eq!(&vec2[..], &res2[..]);
}
@@ -331,7 +331,7 @@ mod bench {
#[bench]
fn bench_push_stack(bench: &mut Bencher) {
bench.iter(|| {
let mut heap = MemoryArena::new();
let mut arena = MemoryArena::new();
let mut stacks: Vec<ExpUnrolledLinkedList> =
iter::repeat_with(ExpUnrolledLinkedList::new)
.take(NUM_STACK)
@@ -339,7 +339,7 @@ mod bench {
for s in 0..NUM_STACK {
for i in 0u32..STACK_SIZE {
let t = s * 392017 % NUM_STACK;
let _ = stacks[t].writer(&mut heap).write_u32::<NativeEndian>(i);
let _ = stacks[t].writer(&mut arena).write_u32::<NativeEndian>(i);
}
}
});

View File

@@ -81,6 +81,7 @@ pub fn load<Item: Copy + 'static>(data: &[u8]) -> Item {
}
/// The `MemoryArena`
///
/// Its storage is held as a list of `Page`s.
// NOTE(review): the lint below is presumably silenced because the type exposes `new()`
// without implementing `Default` — confirm.
#[allow(clippy::new_without_default)]
pub struct MemoryArena {
// Pages owned by the arena.
pages: Vec<Page>,
}
@@ -114,7 +115,7 @@ impl MemoryArena {
store(dest, val);
}
/// Read an item in the heap at the given `address`.
/// Read an item in the memory arena at the given `address`.
///
/// # Panics
///

View File

@@ -2,6 +2,6 @@ mod expull;
mod memory_arena;
mod term_hashmap;
pub use self::expull::ExpUnrolledLinkedList;
pub use self::memory_arena::{Addr, MemoryArena};
pub use self::term_hashmap::{compute_table_size, TermHashMap};
pub(crate) use self::expull::ExpUnrolledLinkedList;
pub(crate) use self::memory_arena::{Addr, MemoryArena};
pub(crate) use self::term_hashmap::{compute_table_size, TermHashMap};

View File

@@ -9,14 +9,15 @@ use crate::postings::UnorderedTermId;
use crate::Term;
/// Returns the actual memory size in bytes
/// required to create a table of size $2^num_bits$.
pub fn compute_table_size(num_bits: usize) -> usize {
(1 << num_bits) * mem::size_of::<KeyValue>()
/// required to create a table with a given capacity.
pub(crate) fn compute_table_size(capacity: usize) -> usize {
capacity * mem::size_of::<KeyValue>()
}
/// `KeyValue` is the item stored in the hash table.
/// The key is actually a `BytesRef` object stored in an external heap.
/// The `value_addr` also points to an address in the heap.
/// The key is actually a `BytesRef` object stored in an external memory arena.
/// The `value_addr` also points to an address in the memory arena.
#[derive(Copy, Clone)]
struct KeyValue {
key_value_addr: Addr,
@@ -43,14 +44,14 @@ impl KeyValue {
/// Customized `HashMap` with string keys
///
/// This `HashMap` takes String as keys. Keys are
/// stored in a user defined heap.
/// stored in a user defined memory arena.
///
/// The quirky API has the benefit of avoiding
/// the computation of the hash of the key twice,
/// or copying the key as long as there is no insert.
pub struct TermHashMap {
table: Box<[KeyValue]>,
pub heap: MemoryArena,
memory_arena: MemoryArena,
mask: usize,
occupied: Vec<usize>,
len: usize,
@@ -91,20 +92,37 @@ impl<'a> Iterator for Iter<'a> {
}
}
/// Returns the greatest power of two lower or equal to `n`.
///
/// For instance, `9` yields `8`, `8` yields `8` and `7` yields `4`.
///
/// # Panics
///
/// Panics if `n == 0`, as no power of two is lower or equal to zero.
fn compute_previous_power_of_two(n: usize) -> usize {
    assert!(n > 0);
    // Index of the most significant set bit. Using `usize::BITS` rather than a
    // hard-coded 63 keeps the shift in range on targets where `usize` is not
    // 64 bits wide. Since `n > 0`, `leading_zeros()` is at most `BITS - 1`, so
    // the subtraction cannot underflow.
    let msb = usize::BITS - 1 - n.leading_zeros();
    1 << msb
}
impl TermHashMap {
pub fn new(num_bucket_power_of_2: usize) -> TermHashMap {
let heap = MemoryArena::new();
let table_size = 1 << num_bucket_power_of_2;
let table: Vec<KeyValue> = iter::repeat(KeyValue::default()).take(table_size).collect();
pub(crate) fn new(table_size: usize) -> TermHashMap {
assert!(table_size > 0);
let table_size_power_of_2 = compute_previous_power_of_two(table_size);
let memory_arena = MemoryArena::new();
let table: Vec<KeyValue> = iter::repeat(KeyValue::default())
.take(table_size_power_of_2)
.collect();
TermHashMap {
table: table.into_boxed_slice(),
heap,
mask: table_size - 1,
occupied: Vec::with_capacity(table_size / 2),
memory_arena,
mask: table_size_power_of_2 - 1,
occupied: Vec::with_capacity(table_size_power_of_2 / 2),
len: 0,
}
}
/// Reads the `Item` stored at address `addr` in this map's memory arena.
///
/// This simply forwards to the `read` method of the internal `MemoryArena`.
pub fn read<Item: Copy + 'static>(&self, addr: Addr) -> Item {
self.memory_arena.read(addr)
}
fn probe(&self, hash: u32) -> QuadraticProbing {
QuadraticProbing::compute(hash as usize, self.mask)
}
@@ -119,7 +137,7 @@ impl TermHashMap {
#[inline]
fn get_key_value(&self, addr: Addr) -> (&[u8], Addr) {
let data = self.heap.slice_from(addr);
let data = self.memory_arena.slice_from(addr);
let key_bytes_len = NativeEndian::read_u16(data) as usize;
let key_bytes: &[u8] = &data[2..][..key_bytes_len];
(key_bytes, addr.offset(2u32 + key_bytes_len as u32))
@@ -209,9 +227,9 @@ impl TermHashMap {
// The key does not exists yet.
let val = updater(None);
let num_bytes = std::mem::size_of::<u16>() + key.len() + std::mem::size_of::<V>();
let key_addr = self.heap.allocate_space(num_bytes);
let key_addr = self.memory_arena.allocate_space(num_bytes);
{
let data = self.heap.slice_mut(key_addr, num_bytes);
let data = self.memory_arena.slice_mut(key_addr, num_bytes);
NativeEndian::write_u16(data, key.len() as u16);
let stop = 2 + key.len();
data[2..stop].copy_from_slice(key);
@@ -220,9 +238,9 @@ impl TermHashMap {
return self.set_bucket(hash, key_addr, bucket);
} else if kv.hash == hash {
if let Some(val_addr) = self.get_value_addr_if_key_match(key, kv.key_value_addr) {
let v = self.heap.read(val_addr);
let v = self.memory_arena.read(val_addr);
let new_v = updater(Some(v));
self.heap.write_at(val_addr, new_v);
self.memory_arena.write_at(val_addr, new_v);
return kv.unordered_term_id;
}
}
@@ -235,11 +253,11 @@ mod tests {
use std::collections::HashMap;
use super::TermHashMap;
use super::{compute_previous_power_of_two, TermHashMap};
#[test]
fn test_hash_map() {
let mut hash_map: TermHashMap = TermHashMap::new(18);
let mut hash_map: TermHashMap = TermHashMap::new(1 << 18);
hash_map.mutate_or_create(b"abc", |opt_val: Option<u32>| {
assert_eq!(opt_val, None);
3u32
@@ -255,9 +273,17 @@ mod tests {
let mut vanilla_hash_map = HashMap::new();
let iter_values = hash_map.iter();
for (key, addr, _) in iter_values {
let val: u32 = hash_map.heap.read(addr);
let val: u32 = hash_map.memory_arena.read(addr);
vanilla_hash_map.insert(key.to_owned(), val);
}
assert_eq!(vanilla_hash_map.len(), 2);
}
/// Checks `compute_previous_power_of_two` on exact powers of two, on values
/// right around them, and on the largest representable `usize`.
///
/// The upper-bound case is expressed with `usize::MAX` / `usize::BITS` so the
/// test does not assume a 64-bit `usize`.
#[test]
fn test_compute_previous_power_of_two() {
    assert_eq!(compute_previous_power_of_two(1), 1);
    assert_eq!(compute_previous_power_of_two(8), 8);
    assert_eq!(compute_previous_power_of_two(9), 8);
    assert_eq!(compute_previous_power_of_two(7), 4);
    assert_eq!(
        compute_previous_power_of_two(usize::MAX),
        1 << (usize::BITS - 1)
    );
}
}

View File

@@ -26,7 +26,6 @@ impl From<Vec<FieldValue>> for Document {
Document { field_values }
}
}
impl PartialEq for Document {
fn eq(&self, other: &Document) -> bool {
// super slow, but only here for tests
@@ -51,6 +50,16 @@ impl PartialEq for Document {
impl Eq for Document {}
impl IntoIterator for Document {
type Item = FieldValue;
type IntoIter = std::vec::IntoIter<FieldValue>;
fn into_iter(self) -> Self::IntoIter {
self.field_values.into_iter()
}
}
impl Document {
/// Creates a new, empty document object
pub fn new() -> Document {
@@ -67,63 +76,54 @@ impl Document {
self.field_values.is_empty()
}
/// Retain only the field that are matching the
/// predicate given in argument.
pub fn filter_fields<P: Fn(Field) -> bool>(&mut self, predicate: P) {
self.field_values
.retain(|field_value| predicate(field_value.field()));
}
/// Adding a facet to the document.
pub fn add_facet<F>(&mut self, field: Field, path: F)
where Facet: From<F> {
let facet = Facet::from(path);
let value = Value::Facet(facet);
self.add(FieldValue::new(field, value));
self.add_field_value(field, value);
}
/// Add a text field.
pub fn add_text<S: ToString>(&mut self, field: Field, text: S) {
self.add(FieldValue::new(field, Value::Str(text.to_string())));
let value = Value::Str(text.to_string());
self.add_field_value(field, value);
}
/// Add a pre-tokenized text field.
pub fn add_pre_tokenized_text(
&mut self,
field: Field,
pre_tokenized_text: &PreTokenizedString,
) {
let value = Value::PreTokStr(pre_tokenized_text.clone());
self.add(FieldValue::new(field, value));
pub fn add_pre_tokenized_text(&mut self, field: Field, pre_tokenized_text: PreTokenizedString) {
self.add_field_value(field, pre_tokenized_text);
}
/// Add a u64 field
pub fn add_u64(&mut self, field: Field, value: u64) {
self.add(FieldValue::new(field, Value::U64(value)));
self.add_field_value(field, value);
}
/// Add a i64 field
pub fn add_i64(&mut self, field: Field, value: i64) {
self.add(FieldValue::new(field, Value::I64(value)));
self.add_field_value(field, value);
}
/// Add a f64 field
pub fn add_f64(&mut self, field: Field, value: f64) {
self.add(FieldValue::new(field, Value::F64(value)));
self.add_field_value(field, value);
}
/// Add a date field
pub fn add_date(&mut self, field: Field, value: &DateTime) {
self.add(FieldValue::new(field, Value::Date(*value)));
pub fn add_date(&mut self, field: Field, value: DateTime) {
self.add_field_value(field, value);
}
/// Add a bytes field
pub fn add_bytes<T: Into<Vec<u8>>>(&mut self, field: Field, value: T) {
self.add(FieldValue::new(field, Value::Bytes(value.into())))
self.add_field_value(field, value.into())
}
/// Add a field value
pub fn add(&mut self, field_value: FieldValue) {
/// Add a (field, value) to the document.
pub fn add_field_value<T: Into<Value>>(&mut self, field: Field, typed_val: T) {
let value = typed_val.into();
let field_value = FieldValue { field, value };
self.field_values.push(field_value);
}
@@ -180,21 +180,6 @@ impl Document {
pub fn get_first(&self, field: Field) -> Option<&Value> {
self.get_all(field).next()
}
/// Prepares Document for being stored in the document store
///
/// Method transforms PreTokenizedString values into String
/// values.
pub fn prepare_for_store(&mut self) {
for field_value in &mut self.field_values {
if let Value::PreTokStr(pre_tokenized_text) = field_value.value() {
*field_value = FieldValue::new(
field_value.field(),
Value::Str(pre_tokenized_text.text.clone()), //< TODO somehow remove .clone()
);
}
}
}
}
impl BinarySerializable for Document {
@@ -220,7 +205,6 @@ impl BinarySerializable for Document {
mod tests {
use crate::schema::*;
use crate::tokenizer::{PreTokenizedString, Token};
#[test]
fn test_doc() {
@@ -230,38 +214,4 @@ mod tests {
doc.add_text(text_field, "My title");
assert_eq!(doc.field_values().len(), 1);
}
#[test]
fn test_prepare_for_store() {
let mut schema_builder = Schema::builder();
let text_field = schema_builder.add_text_field("title", TEXT);
let mut doc = Document::default();
let pre_tokenized_text = PreTokenizedString {
text: String::from("A"),
tokens: vec![Token {
offset_from: 0,
offset_to: 1,
position: 0,
text: String::from("A"),
position_length: 1,
}],
};
doc.add_pre_tokenized_text(text_field, &pre_tokenized_text);
doc.add_text(text_field, "title");
doc.prepare_for_store();
assert_eq!(doc.field_values().len(), 2);
match doc.field_values()[0].value() {
Value::Str(ref text) => assert_eq!(text, "A"),
_ => panic!("Incorrect variant of Value"),
}
match doc.field_values()[1].value() {
Value::Str(ref text) => assert_eq!(text, "title"),
_ => panic!("Incorrect variant of Value"),
}
}
}

View File

@@ -303,7 +303,7 @@ mod tests {
let naive_date = NaiveDate::from_ymd(1982, 9, 17);
let naive_time = NaiveTime::from_hms(13, 20, 00);
let date_time = DateTime::from_utc(NaiveDateTime::new(naive_date, naive_time), Utc);
doc.add_date(date_field, &date_time);
doc.add_date(date_field, date_time);
let doc_json = schema.to_json(&doc);
assert_eq!(doc_json, r#"{"date":["1982-09-17T13:20:00+00:00"]}"#);
}

View File

@@ -5,10 +5,11 @@ use common::BinarySerializable;
use crate::schema::{Field, Value};
/// `FieldValue` holds together a `Field` and its `Value`.
#[allow(missing_docs)]
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct FieldValue {
field: Field,
value: Value,
pub field: Field,
pub value: Value,
}
impl FieldValue {
@@ -28,6 +29,12 @@ impl FieldValue {
}
}
impl From<FieldValue> for Value {
fn from(field_value: FieldValue) -> Self {
field_value.value
}
}
impl BinarySerializable for FieldValue {
fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
self.field.serialize(writer)?;
@@ -37,6 +44,6 @@ impl BinarySerializable for FieldValue {
fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
let field = Field::deserialize(reader)?;
let value = Value::deserialize(reader)?;
Ok(FieldValue::new(field, value))
Ok(FieldValue { field, value })
}
}

View File

@@ -271,8 +271,7 @@ impl Schema {
for (field_name, values) in named_doc.0 {
if let Some(field) = self.get_field(&field_name) {
for value in values {
let field_value = FieldValue::new(field, value);
document.add(field_value);
document.add_field_value(field, value);
}
}
}
@@ -320,14 +319,14 @@ impl Schema {
let value = field_type
.value_from_json(json_item)
.map_err(|e| DocParsingError::ValueError(field_name.clone(), e))?;
doc.add(FieldValue::new(field, value));
doc.add_field_value(field, value);
}
}
_ => {
let value = field_type
.value_from_json(json_value)
.map_err(|e| DocParsingError::ValueError(field_name.clone(), e))?;
doc.add(FieldValue::new(field, value));
doc.add_field_value(field, value);
}
}
}

View File

@@ -60,9 +60,7 @@ pub mod tests {
use super::*;
use crate::directory::{Directory, RamDirectory, WritePtr};
use crate::fastfield::AliveBitSet;
use crate::schema::{
self, Document, FieldValue, Schema, TextFieldIndexing, TextOptions, STORED, TEXT,
};
use crate::schema::{self, Document, Schema, TextFieldIndexing, TextOptions, STORED, TEXT};
use crate::{Index, Term};
const LOREM: &str = "Doc Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do \
@@ -86,18 +84,9 @@ pub mod tests {
{
let mut store_writer = StoreWriter::new(writer, compressor);
for i in 0..num_docs {
let mut fields: Vec<FieldValue> = Vec::new();
{
let field_value = FieldValue::new(field_body, From::from(LOREM.to_string()));
fields.push(field_value);
}
{
let title_text = format!("Doc {}", i);
let field_value = FieldValue::new(field_title, From::from(title_text));
fields.push(field_value);
}
// let fields_refs: Vec<&FieldValue> = fields.iter().collect();
let doc = Document::from(fields);
let mut doc = Document::default();
doc.add_field_value(field_body, LOREM.to_string());
doc.add_field_value(field_title, format!("Doc {i}"));
store_writer.store(&doc).unwrap();
}
store_writer.close().unwrap();