diff --git a/examples/basic_search.rs b/examples/basic_search.rs index cbdb36ce8..44b4eec34 100644 --- a/examples/basic_search.rs +++ b/examples/basic_search.rs @@ -73,7 +73,7 @@ fn main() -> tantivy::Result<()> { // multithreaded. // // Here we give tantivy a budget of `50MB`. - // Using a bigger heap for the indexer may increase + // Using a bigger memory_arena for the indexer may increase // throughput, but 50 MB is already plenty. let mut index_writer = index.writer(50_000_000)?; diff --git a/examples/custom_tokenizer.rs b/examples/custom_tokenizer.rs index 35f78f97e..439487d6a 100644 --- a/examples/custom_tokenizer.rs +++ b/examples/custom_tokenizer.rs @@ -62,7 +62,7 @@ fn main() -> tantivy::Result<()> { // multithreaded. // // Here we use a buffer of 50MB per thread. Using a bigger - // heap for the indexer can increase its throughput. + // memory arena for the indexer can increase its throughput. let mut index_writer = index.writer(50_000_000)?; index_writer.add_document(doc!( title => "The Old Man and the Sea", diff --git a/src/core/index.rs b/src/core/index.rs index a5cc0feb3..d9b234655 100644 --- a/src/core/index.rs +++ b/src/core/index.rs @@ -15,7 +15,7 @@ use crate::directory::error::OpenReadError; use crate::directory::MmapDirectory; use crate::directory::{Directory, ManagedDirectory, RamDirectory, INDEX_WRITER_LOCK}; use crate::error::{DataCorruption, TantivyError}; -use crate::indexer::index_writer::{HEAP_SIZE_MIN, MAX_NUM_THREAD}; +use crate::indexer::index_writer::{MAX_NUM_THREAD, MEMORY_ARENA_NUM_BYTES_MIN}; use crate::indexer::segment_updater::save_new_metas; use crate::reader::{IndexReader, IndexReaderBuilder}; use crate::schema::{Field, FieldType, Schema}; @@ -397,17 +397,18 @@ impl Index { /// - `num_threads` defines the number of indexing workers that /// should work at the same time. 
/// - /// - `overall_heap_size_in_bytes` sets the amount of memory + /// - `overall_memory_arena_in_bytes` sets the amount of memory /// allocated for all indexing thread. - /// Each thread will receive a budget of `overall_heap_size_in_bytes / num_threads`. + /// Each thread will receive a budget of `overall_memory_arena_in_bytes / num_threads`. /// /// # Errors /// If the lockfile already exists, returns `Error::DirectoryLockBusy` or an `Error::IoError`. - /// If the heap size per thread is too small or too big, returns `TantivyError::InvalidArgument` + /// If the memory arena per thread is too small or too big, returns + /// `TantivyError::InvalidArgument` pub fn writer_with_num_threads( &self, num_threads: usize, - overall_heap_size_in_bytes: usize, + overall_memory_arena_in_bytes: usize, ) -> crate::Result { let directory_lock = self .directory @@ -423,18 +424,18 @@ impl Index { ), ) })?; - let heap_size_in_bytes_per_thread = overall_heap_size_in_bytes / num_threads; + let memory_arena_in_bytes_per_thread = overall_memory_arena_in_bytes / num_threads; IndexWriter::new( self, num_threads, - heap_size_in_bytes_per_thread, + memory_arena_in_bytes_per_thread, directory_lock, ) } /// Helper to create an index writer for tests. /// - /// That index writer only simply has a single thread and a heap of 10 MB. + /// That index writer simply has a single thread and a memory arena of 10 MB. /// Using a single thread gives us a deterministic allocation of DocId. #[cfg(test)] pub fn writer_for_tests(&self) -> crate::Result { @@ -445,19 +446,20 @@ impl Index { /// /// Tantivy will automatically define the number of threads to use, but /// no more than 8 threads. - /// `overall_heap_size_in_bytes` is the total target memory usage that will be split + /// `memory_arena_num_bytes` is the total target memory usage that will be split /// between a given number of threads. /// /// # Errors /// If the lockfile already exists, returns `Error::FileAlreadyExists`.
- /// If the heap size per thread is too small or too big, returns `TantivyError::InvalidArgument` - pub fn writer(&self, overall_heap_size_in_bytes: usize) -> crate::Result { + /// If the memory arena per thread is too small or too big, returns + /// `TantivyError::InvalidArgument` + pub fn writer(&self, memory_arena_num_bytes: usize) -> crate::Result { let mut num_threads = std::cmp::min(num_cpus::get(), MAX_NUM_THREAD); - let heap_size_in_bytes_per_thread = overall_heap_size_in_bytes / num_threads; - if heap_size_in_bytes_per_thread < HEAP_SIZE_MIN { - num_threads = (overall_heap_size_in_bytes / HEAP_SIZE_MIN).max(1); + let memory_arena_num_bytes_per_thread = memory_arena_num_bytes / num_threads; + if memory_arena_num_bytes_per_thread < MEMORY_ARENA_NUM_BYTES_MIN { + num_threads = (memory_arena_num_bytes / MEMORY_ARENA_NUM_BYTES_MIN).max(1); } - self.writer_with_num_threads(num_threads, overall_heap_size_in_bytes) + self.writer_with_num_threads(num_threads, memory_arena_num_bytes) } /// Accessor to the index settings diff --git a/src/fastfield/multivalued/writer.rs b/src/fastfield/multivalued/writer.rs index 6114cedc3..d94a0bf6f 100644 --- a/src/fastfield/multivalued/writer.rs +++ b/src/fastfield/multivalued/writer.rs @@ -79,7 +79,7 @@ impl MultiValuedFastFieldWriter { // facets are indexed in the `SegmentWriter` as we encode their unordered id. if !self.is_facet { for field_value in doc.field_values() { - if field_value.field() == self.field { + if field_value.field == self.field { self.add_val(value_to_u64(field_value.value())); } } diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 8d6c0c40e..d2b29c1b3 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -26,13 +26,13 @@ use crate::indexer::{MergePolicy, SegmentEntry, SegmentWriter}; use crate::schema::{Document, IndexRecordOption, Term}; use crate::Opstamp; -// Size of the margin for the heap. 
A segment is closed when the remaining memory -// in the heap goes below MARGIN_IN_BYTES. +// Size of the margin for the `memory_arena`. A segment is closed when the remaining memory +// in the `memory_arena` goes below MARGIN_IN_BYTES. pub const MARGIN_IN_BYTES: usize = 1_000_000; // We impose the memory per thread to be at least 3 MB. -pub const HEAP_SIZE_MIN: usize = ((MARGIN_IN_BYTES as u32) * 3u32) as usize; -pub const HEAP_SIZE_MAX: usize = u32::max_value() as usize - MARGIN_IN_BYTES; +pub const MEMORY_ARENA_NUM_BYTES_MIN: usize = ((MARGIN_IN_BYTES as u32) * 3u32) as usize; +pub const MEMORY_ARENA_NUM_BYTES_MAX: usize = u32::max_value() as usize - MARGIN_IN_BYTES; // We impose the number of index writter thread to be at most this. pub const MAX_NUM_THREAD: usize = 8; @@ -61,7 +61,7 @@ pub struct IndexWriter { index: Index, - heap_size_in_bytes_per_thread: usize, + memory_arena_in_bytes_per_thread: usize, workers_join_handle: Vec>>, @@ -179,10 +179,10 @@ fn index_documents( ) -> crate::Result<()> { let schema = segment.schema(); - let mut segment_writer = SegmentWriter::for_segment(memory_budget, segment.clone(), &schema)?; + let mut segment_writer = SegmentWriter::for_segment(memory_budget, segment.clone(), schema)?; for document_group in grouped_document_iterator { for doc in document_group { - segment_writer.add_document(doc, &schema)?; + segment_writer.add_document(doc)?; } let mem_usage = segment_writer.mem_usage(); if mem_usage >= memory_budget - MARGIN_IN_BYTES { @@ -268,22 +268,26 @@ impl IndexWriter { /// should work at the same time. /// # Errors /// If the lockfile already exists, returns `Error::FileAlreadyExists`. 
- /// If the heap size per thread is too small or too big, returns `TantivyError::InvalidArgument` + /// If the memory arena per thread is too small or too big, returns + /// `TantivyError::InvalidArgument` pub(crate) fn new( index: &Index, num_threads: usize, - heap_size_in_bytes_per_thread: usize, + memory_arena_in_bytes_per_thread: usize, directory_lock: DirectoryLock, ) -> crate::Result { - if heap_size_in_bytes_per_thread < HEAP_SIZE_MIN { + if memory_arena_in_bytes_per_thread < MEMORY_ARENA_NUM_BYTES_MIN { let err_msg = format!( - "The heap size per thread needs to be at least {}.", - HEAP_SIZE_MIN + "The memory arena in bytes per thread needs to be at least {}.", + MEMORY_ARENA_NUM_BYTES_MIN ); return Err(TantivyError::InvalidArgument(err_msg)); } - if heap_size_in_bytes_per_thread >= HEAP_SIZE_MAX { - let err_msg = format!("The heap size per thread cannot exceed {}", HEAP_SIZE_MAX); + if memory_arena_in_bytes_per_thread >= MEMORY_ARENA_NUM_BYTES_MAX { + let err_msg = format!( + "The memory arena in bytes per thread cannot exceed {}", + MEMORY_ARENA_NUM_BYTES_MAX + ); return Err(TantivyError::InvalidArgument(err_msg)); } let (document_sender, document_receiver): (AddBatchSender, AddBatchReceiver) = @@ -301,7 +305,7 @@ impl IndexWriter { let mut index_writer = IndexWriter { _directory_lock: Some(directory_lock), - heap_size_in_bytes_per_thread, + memory_arena_in_bytes_per_thread, index: index.clone(), index_writer_status: IndexWriterStatus::from(document_receiver), @@ -401,7 +405,7 @@ impl IndexWriter { let mut delete_cursor = self.delete_queue.cursor(); - let mem_budget = self.heap_size_in_bytes_per_thread; + let mem_budget = self.memory_arena_in_bytes_per_thread; let index = self.index.clone(); let join_handle: JoinHandle> = thread::Builder::new() .name(format!("thrd-tantivy-index{}", self.worker_id)) @@ -560,7 +564,7 @@ impl IndexWriter { let new_index_writer: IndexWriter = IndexWriter::new( &self.index, self.num_threads, - 
self.heap_size_in_bytes_per_thread, + self.memory_arena_in_bytes_per_thread, directory_lock, )?; diff --git a/src/indexer/segment_writer.rs b/src/indexer/segment_writer.rs index fd7194d62..d2b4fd213 100644 --- a/src/indexer/segment_writer.rs +++ b/src/indexer/segment_writer.rs @@ -4,31 +4,33 @@ use crate::core::Segment; use crate::fastfield::FastFieldsWriter; use crate::fieldnorm::{FieldNormReaders, FieldNormsWriter}; use crate::indexer::segment_serializer::SegmentSerializer; -use crate::postings::{compute_table_size, MultiFieldPostingsWriter}; -use crate::schema::{Field, FieldEntry, FieldType, Schema, Term, Type, Value}; +use crate::postings::{ + compute_table_size, serialize_postings, IndexingContext, PerFieldPostingsWriter, PostingsWriter, +}; +use crate::schema::{Field, FieldEntry, FieldType, FieldValue, Schema, Term, Type, Value}; use crate::store::{StoreReader, StoreWriter}; use crate::tokenizer::{ BoxTokenStream, FacetTokenizer, PreTokenizedStream, TextAnalyzer, TokenStreamChain, Tokenizer, }; -use crate::{DocId, Opstamp, SegmentComponent}; +use crate::{DocId, Document, Opstamp, SegmentComponent}; /// Computes the initial size of the hash table. /// -/// Returns a number of bit `b`, such that the recommended initial table size is 2^b. -fn initial_table_size(per_thread_memory_budget: usize) -> crate::Result { +/// Returns the recommended initial table size as a power of 2. +/// +/// Note this is a very dumb way to compute log2, but it is easier to proofread that way. +fn compute_initial_table_size(per_thread_memory_budget: usize) -> crate::Result { let table_memory_upper_bound = per_thread_memory_budget / 3; - if let Some(limit) = (10..) - .take_while(|num_bits: &usize| compute_table_size(*num_bits) < table_memory_upper_bound) + (10..20) // We cap it at 2^19 = 512K capacity. + .map(|power| 1 << power) + .take_while(|capacity| compute_table_size(*capacity) < table_memory_upper_bound) .last() - { - Ok(limit.min(19)) // we cap it at 2^19 = 512K. 
- } else { - Err(crate::TantivyError::InvalidArgument(format!( - "per thread memory budget (={}) is too small. Raise the memory budget or lower the \ - number of threads.", - per_thread_memory_budget - ))) - } + .ok_or_else(|| { + crate::TantivyError::InvalidArgument(format!( + "per thread memory budget (={per_thread_memory_budget}) is too small. Raise the \ + memory budget or lower the number of threads." + )) + }) } fn remap_doc_opstamps( @@ -52,13 +54,15 @@ fn remap_doc_opstamps( /// The segment is layed on disk when the segment gets `finalized`. pub struct SegmentWriter { pub(crate) max_doc: DocId, - pub(crate) multifield_postings: MultiFieldPostingsWriter, + pub(crate) indexing_context: IndexingContext, + pub(crate) per_field_postings_writers: PerFieldPostingsWriter, pub(crate) segment_serializer: SegmentSerializer, pub(crate) fast_field_writers: FastFieldsWriter, pub(crate) fieldnorms_writer: FieldNormsWriter, pub(crate) doc_opstamps: Vec, tokenizers: Vec>, term_buffer: Term, + schema: Schema, } impl SegmentWriter { @@ -66,20 +70,20 @@ impl SegmentWriter { /// /// The arguments are defined as follows /// - /// - heap: most of the segment writer data (terms, and postings lists recorders) - /// is stored in a user-defined heap object. This makes it possible for the user to define - /// the flushing behavior as a buffer limit + /// - memory_budget_in_bytes: most of the segment writer data (terms, and postings lists recorders) + /// is stored in a memory arena. This makes it possible for the user to define + /// the flushing behavior as a memory limit.
/// - segment: The segment being written /// - schema pub fn for_segment( - memory_budget: usize, + memory_budget_in_bytes: usize, segment: Segment, - schema: &Schema, + schema: Schema, ) -> crate::Result { let tokenizer_manager = segment.index().tokenizers().clone(); - let table_num_bits = initial_table_size(memory_budget)?; + let table_size = compute_initial_table_size(memory_budget_in_bytes)?; let segment_serializer = SegmentSerializer::for_segment(segment, false)?; - let multifield_postings = MultiFieldPostingsWriter::new(schema, table_num_bits); + let per_field_postings_writers = PerFieldPostingsWriter::for_schema(&schema); let tokenizers = schema .fields() .map( @@ -96,13 +100,15 @@ impl SegmentWriter { .collect(); Ok(SegmentWriter { max_doc: 0, - multifield_postings, - fieldnorms_writer: FieldNormsWriter::for_schema(schema), + indexing_context: IndexingContext::new(table_size), + per_field_postings_writers, + fieldnorms_writer: FieldNormsWriter::for_schema(&schema), segment_serializer, - fast_field_writers: FastFieldsWriter::from_schema(schema), + fast_field_writers: FastFieldsWriter::from_schema(&schema), doc_opstamps: Vec::with_capacity(1_000), tokenizers, term_buffer: Term::new(), + schema, }) } @@ -122,9 +128,11 @@ impl SegmentWriter { .map(|sort_by_field| get_doc_id_mapping_from_field(sort_by_field, &self)) .transpose()?; remap_and_write( - &self.multifield_postings, + &self.per_field_postings_writers, + self.indexing_context, &self.fast_field_writers, &self.fieldnorms_writer, + &self.schema, self.segment_serializer, mapping.as_ref(), )?; @@ -133,28 +141,16 @@ impl SegmentWriter { } pub fn mem_usage(&self) -> usize { - self.multifield_postings.mem_usage() + self.indexing_context.mem_usage() + self.fieldnorms_writer.mem_usage() + self.fast_field_writers.mem_usage() + self.segment_serializer.mem_usage() } - /// Indexes a new document - /// - /// As a user, you should rather use `IndexWriter`'s add_document. 
- pub fn add_document( - &mut self, - add_operation: AddOperation, - schema: &Schema, - ) -> crate::Result<()> { + fn index_document(&mut self, doc: &Document) -> crate::Result<()> { let doc_id = self.max_doc; - let mut doc = add_operation.document; - self.doc_opstamps.push(add_operation.opstamp); - - self.fast_field_writers.add_document(&doc); - for (field, values) in doc.get_sorted_field_values() { - let field_entry = schema.get_field_entry(field); + let field_entry = self.schema.get_field_entry(field); let make_schema_error = || { crate::TantivyError::SchemaError(format!( "Expected a {:?} for field {:?}", @@ -165,8 +161,10 @@ impl SegmentWriter { if !field_entry.is_indexed() { continue; } - let (term_buffer, multifield_postings) = - (&mut self.term_buffer, &mut self.multifield_postings); + let (term_buffer, indexing_context) = + (&mut self.term_buffer, &mut self.indexing_context); + let postings_writer: &mut dyn PostingsWriter = + self.per_field_postings_writers.get_for_field_mut(field); match *field_entry.field_type() { FieldType::Facet(_) => { term_buffer.set_field(Type::Facet, field); @@ -178,8 +176,13 @@ impl SegmentWriter { .token_stream(facet_str) .process(&mut |token| { term_buffer.set_text(&token.text); - let unordered_term_id = - multifield_postings.subscribe(doc_id, term_buffer); + let unordered_term_id = postings_writer.subscribe( + doc_id, + 0u32, + term_buffer, + indexing_context, + ); + // TODO pass indexing context directly in subscribe function unordered_term_id_opt = Some(unordered_term_id); }); if let Some(unordered_term_id) = unordered_term_id_opt { @@ -222,11 +225,12 @@ impl SegmentWriter { 0 } else { let mut token_stream = TokenStreamChain::new(offsets, token_streams); - multifield_postings.index_text( + postings_writer.index_text( doc_id, field, &mut token_stream, term_buffer, + indexing_context, ) }; self.fieldnorms_writer.record(doc_id, field, num_tokens); @@ -236,7 +240,7 @@ impl SegmentWriter { term_buffer.set_field(Type::U64, field); 
let u64_val = value.as_u64().ok_or_else(make_schema_error)?; term_buffer.set_u64(u64_val); - multifield_postings.subscribe(doc_id, term_buffer); + postings_writer.subscribe(doc_id, 0u32, term_buffer, indexing_context); } } FieldType::Date(_) => { @@ -244,7 +248,7 @@ impl SegmentWriter { term_buffer.set_field(Type::Date, field); let date_val = value.as_date().ok_or_else(make_schema_error)?; term_buffer.set_i64(date_val.timestamp()); - multifield_postings.subscribe(doc_id, term_buffer); + postings_writer.subscribe(doc_id, 0u32, term_buffer, indexing_context); } } FieldType::I64(_) => { @@ -252,7 +256,7 @@ impl SegmentWriter { term_buffer.set_field(Type::I64, field); let i64_val = value.as_i64().ok_or_else(make_schema_error)?; term_buffer.set_i64(i64_val); - multifield_postings.subscribe(doc_id, term_buffer); + postings_writer.subscribe(doc_id, 0u32, term_buffer, indexing_context); } } FieldType::F64(_) => { @@ -260,7 +264,7 @@ impl SegmentWriter { term_buffer.set_field(Type::F64, field); let f64_val = value.as_f64().ok_or_else(make_schema_error)?; term_buffer.set_f64(f64_val); - multifield_postings.subscribe(doc_id, term_buffer); + postings_writer.subscribe(doc_id, 0u32, term_buffer, indexing_context); } } FieldType::Bytes(_) => { @@ -268,15 +272,25 @@ impl SegmentWriter { term_buffer.set_field(Type::Bytes, field); let bytes = value.as_bytes().ok_or_else(make_schema_error)?; term_buffer.set_bytes(bytes); - self.multifield_postings.subscribe(doc_id, term_buffer); + postings_writer.subscribe(doc_id, 0u32, term_buffer, indexing_context); } } } } - doc.filter_fields(|field| schema.get_field_entry(field).is_stored()); - doc.prepare_for_store(); + Ok(()) + } + + /// Indexes a new document + /// + /// As a user, you should rather use `IndexWriter`'s add_document. 
+ pub fn add_document(&mut self, add_operation: AddOperation) -> crate::Result<()> { + let doc = add_operation.document; + self.doc_opstamps.push(add_operation.opstamp); + self.fast_field_writers.add_document(&doc); + self.index_document(&doc)?; + let prepared_doc = prepare_doc_for_store(doc, &self.schema); let doc_writer = self.segment_serializer.get_store_writer(); - doc_writer.store(&doc)?; + doc_writer.store(&prepared_doc)?; self.max_doc += 1; Ok(()) } @@ -308,9 +322,11 @@ impl SegmentWriter { /// /// `doc_id_map` is used to map to the new doc_id order. fn remap_and_write( - multifield_postings: &MultiFieldPostingsWriter, + per_field_postings_writers: &PerFieldPostingsWriter, + indexing_context: IndexingContext, fast_field_writers: &FastFieldsWriter, fieldnorms_writer: &FieldNormsWriter, + schema: &Schema, mut serializer: SegmentSerializer, doc_id_map: Option<&DocIdMapping>, ) -> crate::Result<()> { @@ -321,10 +337,13 @@ fn remap_and_write( .segment() .open_read(SegmentComponent::FieldNorms)?; let fieldnorm_readers = FieldNormReaders::open(fieldnorm_data)?; - let term_ord_map = multifield_postings.serialize( - serializer.get_postings_serializer(), + let term_ord_map = serialize_postings( + indexing_context, + per_field_postings_writers, fieldnorm_readers, doc_id_map, + schema, + serializer.get_postings_serializer(), )?; fast_field_writers.serialize( serializer.get_fast_field_serializer(), @@ -348,7 +367,6 @@ fn remap_and_write( .segment() .open_read(SegmentComponent::TempStore)?, )?; - for old_doc_id in doc_id_map.iter_old_doc_ids() { let doc_bytes = store_read.get_document_bytes(old_doc_id)?; serializer.get_store_writer().store_bytes(&doc_bytes)?; @@ -360,15 +378,70 @@ fn remap_and_write( Ok(()) } +/// Prepares Document for being stored in the document store +/// +/// Method transforms PreTokenizedString values into String +/// values. 
+pub fn prepare_doc_for_store(doc: Document, schema: &Schema) -> Document { + Document::from( + doc.into_iter() + .filter(|field_value| schema.get_field_entry(field_value.field()).is_stored()) + .map(|field_value| match field_value { + FieldValue { + field, + value: Value::PreTokStr(pre_tokenized_text), + } => FieldValue { + field, + value: Value::Str(pre_tokenized_text.text), + }, + field_value => field_value, + }) + .collect::>(), + ) +} + #[cfg(test)] mod tests { - use super::initial_table_size; + use super::compute_initial_table_size; + use crate::schema::{Schema, STORED, TEXT}; + use crate::tokenizer::{PreTokenizedString, Token}; + use crate::Document; #[test] fn test_hashmap_size() { - assert_eq!(initial_table_size(100_000).unwrap(), 11); - assert_eq!(initial_table_size(1_000_000).unwrap(), 14); - assert_eq!(initial_table_size(10_000_000).unwrap(), 17); - assert_eq!(initial_table_size(1_000_000_000).unwrap(), 19); + assert_eq!(compute_initial_table_size(100_000).unwrap(), 1 << 11); + assert_eq!(compute_initial_table_size(1_000_000).unwrap(), 1 << 14); + assert_eq!(compute_initial_table_size(10_000_000).unwrap(), 1 << 17); + assert_eq!(compute_initial_table_size(1_000_000_000).unwrap(), 1 << 19); + assert_eq!(compute_initial_table_size(4_000_000_000).unwrap(), 1 << 19); + } + + #[test] + fn test_prepare_for_store() { + let mut schema_builder = Schema::builder(); + let text_field = schema_builder.add_text_field("title", TEXT | STORED); + let schema = schema_builder.build(); + let mut doc = Document::default(); + let pre_tokenized_text = PreTokenizedString { + text: String::from("A"), + tokens: vec![Token { + offset_from: 0, + offset_to: 1, + position: 0, + text: String::from("A"), + position_length: 1, + }], + }; + + doc.add_pre_tokenized_text(text_field, pre_tokenized_text); + doc.add_text(text_field, "title"); + let prepared_doc = super::prepare_doc_for_store(doc, &schema); + + assert_eq!(prepared_doc.field_values().len(), 2); + 
assert_eq!(prepared_doc.field_values()[0].value().as_text(), Some("A")); + assert_eq!( + prepared_doc.field_values()[1].value().as_text(), + Some("title") + ); } } diff --git a/src/macros.rs b/src/macros.rs index 303e79a3e..b4332128c 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -52,7 +52,7 @@ macro_rules! doc( { let mut document = $crate::Document::default(); $( - document.add($crate::schema::FieldValue::new($field, $value.into())); + document.add_field_value($field, $value); )* document } diff --git a/src/postings/indexing_context.rs b/src/postings/indexing_context.rs new file mode 100644 index 000000000..9fc8a922f --- /dev/null +++ b/src/postings/indexing_context.rs @@ -0,0 +1,27 @@ +use crate::postings::stacker::{MemoryArena, TermHashMap}; + +/// IndexingContext contains all of the transient memory arenas +/// required for building the inverted index. +pub(crate) struct IndexingContext { + /// The term index is an adhoc hashmap, + /// itself backed by a dedicated memory arena. + pub term_index: TermHashMap, + /// Arena is a memory arena that stores posting lists / term frequencies / positions. + pub arena: MemoryArena, +} + +impl IndexingContext { + /// Create a new IndexingContext given the size of the term hash map. + pub(crate) fn new(table_size: usize) -> IndexingContext { + let term_index = TermHashMap::new(table_size); + IndexingContext { + arena: MemoryArena::new(), + term_index, + } + } + + /// Returns the memory usage for the inverted index memory arenas, in bytes. 
+ pub(crate) fn mem_usage(&self) -> usize { + self.term_index.mem_usage() + self.arena.mem_usage() + } +} diff --git a/src/postings/mod.rs b/src/postings/mod.rs index a66fe5f74..39b8c2f52 100644 --- a/src/postings/mod.rs +++ b/src/postings/mod.rs @@ -6,6 +6,8 @@ pub(crate) use self::block_search::branchless_binary_search; mod block_segment_postings; pub(crate) mod compression; +mod indexing_context; +mod per_field_postings_writer; mod postings; mod postings_writer; mod recorder; @@ -16,8 +18,10 @@ mod stacker; mod term_info; pub use self::block_segment_postings::BlockSegmentPostings; +pub(crate) use self::indexing_context::IndexingContext; +pub(crate) use self::per_field_postings_writer::PerFieldPostingsWriter; pub use self::postings::Postings; -pub(crate) use self::postings_writer::MultiFieldPostingsWriter; +pub(crate) use self::postings_writer::{serialize_postings, PostingsWriter}; pub use self::segment_postings::SegmentPostings; pub use self::serializer::{FieldSerializer, InvertedIndexSerializer}; pub(crate) use self::skip::{BlockInfo, SkipReader}; @@ -222,7 +226,7 @@ pub mod tests { { let mut segment_writer = - SegmentWriter::for_segment(3_000_000, segment.clone(), &schema).unwrap(); + SegmentWriter::for_segment(3_000_000, segment.clone(), schema).unwrap(); { // checking that position works if the field has two values let op = AddOperation { @@ -232,14 +236,14 @@ pub mod tests { text_field => "d d d d a" ), }; - segment_writer.add_document(op, &schema)?; + segment_writer.add_document(op)?; } { let op = AddOperation { opstamp: 1u64, document: doc!(text_field => "b a"), }; - segment_writer.add_document(op, &schema).unwrap(); + segment_writer.add_document(op).unwrap(); } for i in 2..1000 { let mut text: String = "e ".repeat(i); @@ -248,7 +252,7 @@ pub mod tests { opstamp: 2u64, document: doc!(text_field => text), }; - segment_writer.add_document(op, &schema).unwrap(); + segment_writer.add_document(op).unwrap(); } segment_writer.finalize()?; } diff --git 
a/src/postings/per_field_postings_writer.rs b/src/postings/per_field_postings_writer.rs new file mode 100644 index 000000000..4c32cb51e --- /dev/null +++ b/src/postings/per_field_postings_writer.rs @@ -0,0 +1,53 @@ +use crate::postings::postings_writer::SpecializedPostingsWriter; +use crate::postings::recorder::{NothingRecorder, TermFrequencyRecorder, TfAndPositionRecorder}; +use crate::postings::PostingsWriter; +use crate::schema::{Field, FieldEntry, FieldType, IndexRecordOption, Schema}; + +pub(crate) struct PerFieldPostingsWriter { + per_field_postings_writers: Vec>, +} + +impl PerFieldPostingsWriter { + pub fn for_schema(schema: &Schema) -> Self { + let per_field_postings_writers = schema + .fields() + .map(|(_, field_entry)| posting_writer_from_field_entry(field_entry)) + .collect(); + PerFieldPostingsWriter { + per_field_postings_writers, + } + } + + pub(crate) fn get_for_field(&self, field: Field) -> &dyn PostingsWriter { + self.per_field_postings_writers[field.field_id() as usize].as_ref() + } + + pub(crate) fn get_for_field_mut(&mut self, field: Field) -> &mut dyn PostingsWriter { + self.per_field_postings_writers[field.field_id() as usize].as_mut() + } +} + +fn posting_writer_from_field_entry(field_entry: &FieldEntry) -> Box { + match *field_entry.field_type() { + FieldType::Str(ref text_options) => text_options + .get_indexing_options() + .map(|indexing_options| match indexing_options.index_option() { + IndexRecordOption::Basic => { + SpecializedPostingsWriter::::new_boxed() + } + IndexRecordOption::WithFreqs => { + SpecializedPostingsWriter::::new_boxed() + } + IndexRecordOption::WithFreqsAndPositions => { + SpecializedPostingsWriter::::new_boxed() + } + }) + .unwrap_or_else(SpecializedPostingsWriter::::new_boxed), + FieldType::U64(_) + | FieldType::I64(_) + | FieldType::F64(_) + | FieldType::Date(_) + | FieldType::Bytes(_) + | FieldType::Facet(_) => SpecializedPostingsWriter::::new_boxed(), + } +} diff --git a/src/postings/postings_writer.rs 
b/src/postings/postings_writer.rs index 72726888d..6e391ab12 100644 --- a/src/postings/postings_writer.rs +++ b/src/postings/postings_writer.rs @@ -1,54 +1,23 @@ use std::collections::HashMap; use std::io; use std::marker::PhantomData; -use std::ops::{DerefMut, Range}; +use std::ops::Range; use fnv::FnvHashMap; -use super::stacker::{Addr, MemoryArena, TermHashMap}; +use super::stacker::Addr; use crate::fieldnorm::FieldNormReaders; use crate::indexer::doc_id_mapping::DocIdMapping; -use crate::postings::recorder::{ - BufferLender, NothingRecorder, Recorder, TermFrequencyRecorder, TfAndPositionRecorder, +use crate::postings::recorder::{BufferLender, Recorder}; +use crate::postings::{ + FieldSerializer, IndexingContext, InvertedIndexSerializer, PerFieldPostingsWriter, + UnorderedTermId, }; -use crate::postings::{FieldSerializer, InvertedIndexSerializer, UnorderedTermId}; -use crate::schema::{Field, FieldEntry, FieldType, IndexRecordOption, Schema, Term, Type}; +use crate::schema::{Field, FieldType, Schema, Term, Type}; use crate::termdict::TermOrdinal; use crate::tokenizer::{Token, TokenStream, MAX_TOKEN_LEN}; use crate::DocId; -fn posting_from_field_entry(field_entry: &FieldEntry) -> Box { - match *field_entry.field_type() { - FieldType::Str(ref text_options) => text_options - .get_indexing_options() - .map(|indexing_options| match indexing_options.index_option() { - IndexRecordOption::Basic => { - SpecializedPostingsWriter::::new_boxed() - } - IndexRecordOption::WithFreqs => { - SpecializedPostingsWriter::::new_boxed() - } - IndexRecordOption::WithFreqsAndPositions => { - SpecializedPostingsWriter::::new_boxed() - } - }) - .unwrap_or_else(SpecializedPostingsWriter::::new_boxed), - FieldType::U64(_) - | FieldType::I64(_) - | FieldType::F64(_) - | FieldType::Date(_) - | FieldType::Bytes(_) - | FieldType::Facet(_) => SpecializedPostingsWriter::::new_boxed(), - } -} - -pub struct MultiFieldPostingsWriter { - heap: MemoryArena, - schema: Schema, - term_index: TermHashMap, 
- per_field_postings_writers: Vec>, - } - fn make_field_partition( term_offsets: &[(Term<&[u8]>, Addr, UnorderedTermId)], ) -> Vec<(Field, Range)> { @@ -74,133 +43,81 @@ fn make_field_partition( field_offsets } -impl MultiFieldPostingsWriter { - /// Create a new `MultiFieldPostingsWriter` given - /// a schema and a heap. - pub fn new(schema: &Schema, table_bits: usize) -> MultiFieldPostingsWriter { - let term_index = TermHashMap::new(table_bits); - let per_field_postings_writers: Vec<_> = schema - .fields() - .map(|(_, field_entry)| posting_from_field_entry(field_entry)) - .collect(); - MultiFieldPostingsWriter { - heap: MemoryArena::new(), - schema: schema.clone(), - term_index, - per_field_postings_writers, - } - } +/// Serialize the inverted index. +/// It pushes all terms, one field at a time, towards the +/// postings serializer. +pub(crate) fn serialize_postings( + indexing_context: IndexingContext, + per_field_postings_writers: &PerFieldPostingsWriter, + fieldnorm_readers: FieldNormReaders, + doc_id_map: Option<&DocIdMapping>, + schema: &Schema, + serializer: &mut InvertedIndexSerializer, +) -> crate::Result>> { + let mut term_offsets: Vec<(Term<&[u8]>, Addr, UnorderedTermId)> = + Vec::with_capacity(indexing_context.term_index.len()); + term_offsets.extend(indexing_context.term_index.iter()); + term_offsets.sort_unstable_by_key(|(k, _, _)| k.clone()); - pub fn mem_usage(&self) -> usize { - self.term_index.mem_usage() + self.heap.mem_usage() - } + let mut unordered_term_mappings: HashMap> = + HashMap::new(); - pub fn index_text( - &mut self, - doc: DocId, - field: Field, - token_stream: &mut dyn TokenStream, - term_buffer: &mut Term, - ) -> u32 { - let postings_writer = - self.per_field_postings_writers[field.field_id() as usize].deref_mut(); - postings_writer.index_text( - &mut self.term_index, - doc, - field, - token_stream, - &mut self.heap, - term_buffer, - ) - } + let field_offsets = make_field_partition(&term_offsets); - pub fn subscribe(&mut self, doc:
DocId, term: &Term) -> UnorderedTermId { - let postings_writer = - self.per_field_postings_writers[term.field().field_id() as usize].deref_mut(); - postings_writer.subscribe(&mut self.term_index, doc, 0u32, term, &mut self.heap) - } - - /// Serialize the inverted index. - /// It pushes all term, one field at a time, towards the - /// postings serializer. - pub fn serialize( - &self, - serializer: &mut InvertedIndexSerializer, - fieldnorm_readers: FieldNormReaders, - doc_id_map: Option<&DocIdMapping>, - ) -> crate::Result>> { - let mut term_offsets: Vec<(Term<&[u8]>, Addr, UnorderedTermId)> = - Vec::with_capacity(self.term_index.len()); - term_offsets.extend(self.term_index.iter()); - term_offsets.sort_unstable_by_key(|(k, _, _)| k.clone()); - - let mut unordered_term_mappings: HashMap> = - HashMap::new(); - - let field_offsets = make_field_partition(&term_offsets); - - for (field, byte_offsets) in field_offsets { - let field_entry = self.schema.get_field_entry(field); - - match *field_entry.field_type() { - FieldType::Str(_) | FieldType::Facet(_) => { - // populating the (unordered term ord) -> (ordered term ord) mapping - // for the field. - let unordered_term_ids = term_offsets[byte_offsets.clone()] - .iter() - .map(|&(_, _, bucket)| bucket); - let mapping: FnvHashMap = unordered_term_ids - .enumerate() - .map(|(term_ord, unord_term_id)| { - (unord_term_id as UnorderedTermId, term_ord as TermOrdinal) - }) - .collect(); - unordered_term_mappings.insert(field, mapping); - } - FieldType::U64(_) | FieldType::I64(_) | FieldType::F64(_) | FieldType::Date(_) => {} - FieldType::Bytes(_) => {} + for (field, byte_offsets) in field_offsets { + let field_entry = schema.get_field_entry(field); + match *field_entry.field_type() { + FieldType::Str(_) | FieldType::Facet(_) => { + // populating the (unordered term ord) -> (ordered term ord) mapping + // for the field. 
+ let unordered_term_ids = term_offsets[byte_offsets.clone()] + .iter() + .map(|&(_, _, bucket)| bucket); + let mapping: FnvHashMap = unordered_term_ids + .enumerate() + .map(|(term_ord, unord_term_id)| { + (unord_term_id as UnorderedTermId, term_ord as TermOrdinal) + }) + .collect(); + unordered_term_mappings.insert(field, mapping); } - - let postings_writer = - self.per_field_postings_writers[field.field_id() as usize].as_ref(); - let fieldnorm_reader = fieldnorm_readers.get_field(field)?; - let mut field_serializer = serializer.new_field( - field, - postings_writer.total_num_tokens(), - fieldnorm_reader, - )?; - postings_writer.serialize( - &term_offsets[byte_offsets], - &mut field_serializer, - &self.term_index.heap, - &self.heap, - doc_id_map, - )?; - field_serializer.close()?; + FieldType::U64(_) | FieldType::I64(_) | FieldType::F64(_) | FieldType::Date(_) => {} + FieldType::Bytes(_) => {} } - Ok(unordered_term_mappings) + + let postings_writer = per_field_postings_writers.get_for_field(field); + let fieldnorm_reader = fieldnorm_readers.get_field(field)?; + let mut field_serializer = + serializer.new_field(field, postings_writer.total_num_tokens(), fieldnorm_reader)?; + postings_writer.serialize( + &term_offsets[byte_offsets], + doc_id_map, + &indexing_context, + &mut field_serializer, + )?; + field_serializer.close()?; } + Ok(unordered_term_mappings) } /// The `PostingsWriter` is in charge of receiving documenting /// and building a `Segment` in anonymous memory. /// /// `PostingsWriter` writes in a `MemoryArena`. -pub trait PostingsWriter { +pub(crate) trait PostingsWriter { /// Record that a document contains a term at a given position. /// /// * doc - the document id /// * pos - the term position (expressed in tokens) /// * term - the term - /// * heap - heap used to store the postings informations as well as the terms - /// in the hashmap. 
+ /// * indexing_context - Contains a term hashmap and a memory arena to store all necessary + /// posting list information. fn subscribe( &mut self, - term_index: &mut TermHashMap, doc: DocId, pos: u32, term: &Term, - heap: &mut MemoryArena, + indexing_context: &mut IndexingContext, ) -> UnorderedTermId; /// Serializes the postings on disk. @@ -208,28 +125,26 @@ pub trait PostingsWriter { fn serialize( &self, term_addrs: &[(Term<&[u8]>, Addr, UnorderedTermId)], - serializer: &mut FieldSerializer<'_>, - term_heap: &MemoryArena, - heap: &MemoryArena, doc_id_map: Option<&DocIdMapping>, + indexing_context: &IndexingContext, + serializer: &mut FieldSerializer, ) -> io::Result<()>; /// Tokenize a text and subscribe all of its token. fn index_text( &mut self, - term_index: &mut TermHashMap, doc_id: DocId, field: Field, token_stream: &mut dyn TokenStream, - heap: &mut MemoryArena, term_buffer: &mut Term, + indexing_context: &mut IndexingContext, ) -> u32 { term_buffer.set_field(Type::Str, field); let mut sink = |token: &Token| { // We skip all tokens with a len greater than u16. if token.text.len() <= MAX_TOKEN_LEN { term_buffer.set_text(token.text.as_str()); - self.subscribe(term_index, doc_id, token.position as u32, term_buffer, heap); + self.subscribe(doc_id, token.position as u32, term_buffer, indexing_context); } else { warn!( "A token exceeding MAX_TOKEN_LEN ({}>{}) was dropped. Search for \ @@ -261,7 +176,7 @@ impl SpecializedPostingsWriter { } } - /// Builds a `SpecializedPostingsWriter` storing its data in a heap. + /// Builds a `SpecializedPostingsWriter` storing its data in a memory arena. 
pub fn new_boxed() -> Box { Box::new(SpecializedPostingsWriter::::new()) } @@ -270,27 +185,30 @@ impl SpecializedPostingsWriter { impl PostingsWriter for SpecializedPostingsWriter { fn subscribe( &mut self, - term_index: &mut TermHashMap, doc: DocId, position: u32, term: &Term, - heap: &mut MemoryArena, + indexing_context: &mut IndexingContext, ) -> UnorderedTermId { debug_assert!(term.as_slice().len() >= 4); self.total_num_tokens += 1; + let (term_index, arena) = ( + &mut indexing_context.term_index, + &mut indexing_context.arena, + ); term_index.mutate_or_create(term.as_slice(), |opt_recorder: Option| { if let Some(mut recorder) = opt_recorder { let current_doc = recorder.current_doc(); if current_doc != doc { - recorder.close_doc(heap); - recorder.new_doc(doc, heap); + recorder.close_doc(arena); + recorder.new_doc(doc, arena); } - recorder.record_position(position, heap); + recorder.record_position(position, arena); recorder } else { let mut recorder = Rec::new(); - recorder.new_doc(doc, heap); - recorder.record_position(position, heap); + recorder.new_doc(doc, arena); + recorder.record_position(position, arena); recorder } }) as UnorderedTermId @@ -299,17 +217,21 @@ impl PostingsWriter for SpecializedPostingsWriter fn serialize( &self, term_addrs: &[(Term<&[u8]>, Addr, UnorderedTermId)], - serializer: &mut FieldSerializer<'_>, - termdict_heap: &MemoryArena, - heap: &MemoryArena, doc_id_map: Option<&DocIdMapping>, + indexing_context: &IndexingContext, + serializer: &mut FieldSerializer, ) -> io::Result<()> { let mut buffer_lender = BufferLender::default(); for (term, addr, _) in term_addrs { - let recorder: Rec = termdict_heap.read(*addr); + let recorder: Rec = indexing_context.term_index.read(*addr); let term_doc_freq = recorder.term_doc_freq().unwrap_or(0u32); serializer.new_term(term.value_bytes(), term_doc_freq)?; - recorder.serialize(&mut buffer_lender, serializer, heap, doc_id_map); + recorder.serialize( + &indexing_context.arena, + doc_id_map, + 
serializer, + &mut buffer_lender, + ); serializer.close_term()?; } Ok(()) diff --git a/src/postings/recorder.rs b/src/postings/recorder.rs index c2c12c560..a8f1f6575 100644 --- a/src/postings/recorder.rs +++ b/src/postings/recorder.rs @@ -63,19 +63,19 @@ pub(crate) trait Recorder: Copy + 'static { fn current_doc(&self) -> u32; /// Starts recording information about a new document /// This method shall only be called if the term is within the document. - fn new_doc(&mut self, doc: DocId, heap: &mut MemoryArena); + fn new_doc(&mut self, doc: DocId, arena: &mut MemoryArena); /// Record the position of a term. For each document, /// this method will be called `term_freq` times. - fn record_position(&mut self, position: u32, heap: &mut MemoryArena); + fn record_position(&mut self, position: u32, arena: &mut MemoryArena); /// Close the document. It will help record the term frequency. - fn close_doc(&mut self, heap: &mut MemoryArena); + fn close_doc(&mut self, arena: &mut MemoryArena); /// Pushes the postings information to the serializer. fn serialize( &self, - buffer_lender: &mut BufferLender, - serializer: &mut FieldSerializer<'_>, - heap: &MemoryArena, + arena: &MemoryArena, doc_id_map: Option<&DocIdMapping>, + serializer: &mut FieldSerializer<'_>, + buffer_lender: &mut BufferLender, ); /// Returns the number of document containing this term. 
/// @@ -102,24 +102,24 @@ impl Recorder for NothingRecorder { self.current_doc } - fn new_doc(&mut self, doc: DocId, heap: &mut MemoryArena) { + fn new_doc(&mut self, doc: DocId, arena: &mut MemoryArena) { self.current_doc = doc; - let _ = write_u32_vint(doc, &mut self.stack.writer(heap)); + let _ = write_u32_vint(doc, &mut self.stack.writer(arena)); } - fn record_position(&mut self, _position: u32, _heap: &mut MemoryArena) {} + fn record_position(&mut self, _position: u32, _arena: &mut MemoryArena) {} - fn close_doc(&mut self, _heap: &mut MemoryArena) {} + fn close_doc(&mut self, _arena: &mut MemoryArena) {} fn serialize( &self, - buffer_lender: &mut BufferLender, - serializer: &mut FieldSerializer<'_>, - heap: &MemoryArena, + arena: &MemoryArena, doc_id_map: Option<&DocIdMapping>, + serializer: &mut FieldSerializer<'_>, + buffer_lender: &mut BufferLender, ) { let (buffer, doc_ids) = buffer_lender.lend_all(); - self.stack.read_to_end(heap, buffer); + self.stack.read_to_end(arena, buffer); // TODO avoid reading twice. 
if let Some(doc_id_map) = doc_id_map { doc_ids.extend( @@ -166,31 +166,31 @@ impl Recorder for TermFrequencyRecorder { self.current_doc } - fn new_doc(&mut self, doc: DocId, heap: &mut MemoryArena) { + fn new_doc(&mut self, doc: DocId, arena: &mut MemoryArena) { self.term_doc_freq += 1; self.current_doc = doc; - let _ = write_u32_vint(doc, &mut self.stack.writer(heap)); + let _ = write_u32_vint(doc, &mut self.stack.writer(arena)); } - fn record_position(&mut self, _position: u32, _heap: &mut MemoryArena) { + fn record_position(&mut self, _position: u32, _arena: &mut MemoryArena) { self.current_tf += 1; } - fn close_doc(&mut self, heap: &mut MemoryArena) { + fn close_doc(&mut self, arena: &mut MemoryArena) { debug_assert!(self.current_tf > 0); - let _ = write_u32_vint(self.current_tf, &mut self.stack.writer(heap)); + let _ = write_u32_vint(self.current_tf, &mut self.stack.writer(arena)); self.current_tf = 0; } fn serialize( &self, - buffer_lender: &mut BufferLender, - serializer: &mut FieldSerializer<'_>, - heap: &MemoryArena, + arena: &MemoryArena, doc_id_map: Option<&DocIdMapping>, + serializer: &mut FieldSerializer<'_>, + buffer_lender: &mut BufferLender, ) { let buffer = buffer_lender.lend_u8(); - self.stack.read_to_end(heap, buffer); + self.stack.read_to_end(arena, buffer); let mut u32_it = VInt32Reader::new(&buffer[..]); if let Some(doc_id_map) = doc_id_map { let mut doc_id_and_tf = vec![]; @@ -236,29 +236,29 @@ impl Recorder for TfAndPositionRecorder { self.current_doc } - fn new_doc(&mut self, doc: DocId, heap: &mut MemoryArena) { + fn new_doc(&mut self, doc: DocId, arena: &mut MemoryArena) { self.current_doc = doc; self.term_doc_freq += 1u32; - let _ = write_u32_vint(doc, &mut self.stack.writer(heap)); + let _ = write_u32_vint(doc, &mut self.stack.writer(arena)); } - fn record_position(&mut self, position: u32, heap: &mut MemoryArena) { - let _ = write_u32_vint(position + 1u32, &mut self.stack.writer(heap)); + fn record_position(&mut self, position: u32, 
arena: &mut MemoryArena) { + let _ = write_u32_vint(position + 1u32, &mut self.stack.writer(arena)); } - fn close_doc(&mut self, heap: &mut MemoryArena) { - let _ = write_u32_vint(POSITION_END, &mut self.stack.writer(heap)); + fn close_doc(&mut self, arena: &mut MemoryArena) { + let _ = write_u32_vint(POSITION_END, &mut self.stack.writer(arena)); } fn serialize( &self, - buffer_lender: &mut BufferLender, - serializer: &mut FieldSerializer<'_>, - heap: &MemoryArena, + arena: &MemoryArena, doc_id_map: Option<&DocIdMapping>, + serializer: &mut FieldSerializer<'_>, + buffer_lender: &mut BufferLender, ) { let (buffer_u8, buffer_positions) = buffer_lender.lend_all(); - self.stack.read_to_end(heap, buffer_u8); + self.stack.read_to_end(arena, buffer_u8); let mut u32_it = VInt32Reader::new(&buffer_u8[..]); let mut doc_id_and_positions = vec![]; while let Some(doc) = u32_it.next() { diff --git a/src/postings/stacker/expull.rs b/src/postings/stacker/expull.rs index f0e8c29e4..56134641b 100644 --- a/src/postings/stacker/expull.rs +++ b/src/postings/stacker/expull.rs @@ -65,12 +65,12 @@ pub struct ExpUnrolledLinkedList { pub struct ExpUnrolledLinkedListWriter<'a> { eull: &'a mut ExpUnrolledLinkedList, - heap: &'a mut MemoryArena, + arena: &'a mut MemoryArena, } fn ensure_capacity<'a>( eull: &'a mut ExpUnrolledLinkedList, - heap: &'a mut MemoryArena, + arena: &'a mut MemoryArena, ) -> &'a mut [u8] { if eull.len <= FIRST_BLOCK as u32 { // We are still hitting the inline block. @@ -78,22 +78,22 @@ fn ensure_capacity<'a>( return &mut eull.inlined_data[eull.len as usize..FIRST_BLOCK]; } // We need to allocate a new block! 
- let new_block_addr: Addr = heap.allocate_space(FIRST_BLOCK + mem::size_of::()); + let new_block_addr: Addr = arena.allocate_space(FIRST_BLOCK + mem::size_of::()); store(&mut eull.inlined_data[FIRST_BLOCK..], new_block_addr); eull.tail = new_block_addr; - return heap.slice_mut(eull.tail, FIRST_BLOCK); + return arena.slice_mut(eull.tail, FIRST_BLOCK); } let len = match len_to_capacity(eull.len) { CapacityResult::NeedAlloc(new_block_len) => { let new_block_addr: Addr = - heap.allocate_space(new_block_len as usize + mem::size_of::()); - heap.write_at(eull.tail, new_block_addr); + arena.allocate_space(new_block_len as usize + mem::size_of::()); + arena.write_at(eull.tail, new_block_addr); eull.tail = new_block_addr; new_block_len } CapacityResult::Available(available) => available, }; - heap.slice_mut(eull.tail, len as usize) + arena.slice_mut(eull.tail, len as usize) } impl<'a> ExpUnrolledLinkedListWriter<'a> { @@ -106,7 +106,7 @@ impl<'a> ExpUnrolledLinkedListWriter<'a> { while !buf.is_empty() { let add_len: usize; { - let output_buf = ensure_capacity(self.eull, self.heap); + let output_buf = ensure_capacity(self.eull, self.arena); add_len = buf.len().min(output_buf.len()); output_buf[..add_len].copy_from_slice(&buf[..add_len]); } @@ -146,11 +146,11 @@ impl ExpUnrolledLinkedList { } #[inline] - pub fn writer<'a>(&'a mut self, heap: &'a mut MemoryArena) -> ExpUnrolledLinkedListWriter<'a> { - ExpUnrolledLinkedListWriter { eull: self, heap } + pub fn writer<'a>(&'a mut self, arena: &'a mut MemoryArena) -> ExpUnrolledLinkedListWriter<'a> { + ExpUnrolledLinkedListWriter { eull: self, arena } } - pub fn read_to_end(&self, heap: &MemoryArena, output: &mut Vec) { + pub fn read_to_end(&self, arena: &MemoryArena, output: &mut Vec) { let len = self.len as usize; if len <= FIRST_BLOCK { output.extend_from_slice(&self.inlined_data[..len]); @@ -164,14 +164,14 @@ impl ExpUnrolledLinkedList { CapacityResult::Available(capacity) => capacity, CapacityResult::NeedAlloc(capacity) => 
capacity, } as usize; - let data = heap.slice(addr, cap); + let data = arena.slice(addr, cap); if cur + cap >= len { output.extend_from_slice(&data[..(len - cur)]); return; } output.extend_from_slice(data); cur += cap; - addr = heap.read(addr.offset(cap as u32)); + addr = arena.read(addr.offset(cap as u32)); } } } @@ -185,33 +185,33 @@ mod tests { use super::{len_to_capacity, *}; #[test] - fn test_stack() { - let mut heap = MemoryArena::new(); + fn test_eull() { + let mut arena = MemoryArena::new(); let mut stack = ExpUnrolledLinkedList::new(); - stack.writer(&mut heap).extend_from_slice(&[1u8]); - stack.writer(&mut heap).extend_from_slice(&[2u8]); - stack.writer(&mut heap).extend_from_slice(&[3u8, 4u8]); - stack.writer(&mut heap).extend_from_slice(&[5u8]); + stack.writer(&mut arena).extend_from_slice(&[1u8]); + stack.writer(&mut arena).extend_from_slice(&[2u8]); + stack.writer(&mut arena).extend_from_slice(&[3u8, 4u8]); + stack.writer(&mut arena).extend_from_slice(&[5u8]); { let mut buffer = Vec::new(); - stack.read_to_end(&heap, &mut buffer); + stack.read_to_end(&arena, &mut buffer); assert_eq!(&buffer[..], &[1u8, 2u8, 3u8, 4u8, 5u8]); } } #[test] - fn test_stack_long() { - let mut heap = MemoryArena::new(); - let mut stack = ExpUnrolledLinkedList::new(); + fn test_eull_long() { + let mut arena = MemoryArena::new(); + let mut eull = ExpUnrolledLinkedList::new(); let data: Vec = (0..100).collect(); for &el in &data { - assert!(stack - .writer(&mut heap) + assert!(eull + .writer(&mut arena) .write_u32::(el) .is_ok()); } let mut buffer = Vec::new(); - stack.read_to_end(&heap, &mut buffer); + eull.read_to_end(&arena, &mut buffer); let mut result = vec![]; let mut remaining = &buffer[..]; while !remaining.is_empty() { @@ -222,8 +222,8 @@ mod tests { } #[test] - fn test_stack_interlaced() { - let mut heap = MemoryArena::new(); + fn test_eull_interlaced() { + let mut eull = MemoryArena::new(); let mut stack = ExpUnrolledLinkedList::new(); let mut stack2 = 
ExpUnrolledLinkedList::new(); @@ -231,11 +231,11 @@ mod tests { let mut vec2: Vec = vec![]; for i in 0..9 { - assert!(stack.writer(&mut heap).write_u32::(i).is_ok()); + assert!(stack.writer(&mut eull).write_u32::(i).is_ok()); assert!(vec1.write_u32::(i).is_ok()); if i % 2 == 0 { assert!(stack2 - .writer(&mut heap) + .writer(&mut eull) .write_u32::(i) .is_ok()); assert!(vec2.write_u32::(i).is_ok()); @@ -243,8 +243,8 @@ mod tests { } let mut res1 = vec![]; let mut res2 = vec![]; - stack.read_to_end(&heap, &mut res1); - stack2.read_to_end(&heap, &mut res2); + stack.read_to_end(&eull, &mut res1); + stack2.read_to_end(&eull, &mut res2); assert_eq!(&vec1[..], &res1[..]); assert_eq!(&vec2[..], &res2[..]); } @@ -331,7 +331,7 @@ mod bench { #[bench] fn bench_push_stack(bench: &mut Bencher) { bench.iter(|| { - let mut heap = MemoryArena::new(); + let mut arena = MemoryArena::new(); let mut stacks: Vec = iter::repeat_with(ExpUnrolledLinkedList::new) .take(NUM_STACK) @@ -339,7 +339,7 @@ mod bench { for s in 0..NUM_STACK { for i in 0u32..STACK_SIZE { let t = s * 392017 % NUM_STACK; - let _ = stacks[t].writer(&mut heap).write_u32::(i); + let _ = stacks[t].writer(&mut arena).write_u32::(i); } } }); diff --git a/src/postings/stacker/memory_arena.rs b/src/postings/stacker/memory_arena.rs index 94d24e5dd..1a468da18 100644 --- a/src/postings/stacker/memory_arena.rs +++ b/src/postings/stacker/memory_arena.rs @@ -81,6 +81,7 @@ pub fn load(data: &[u8]) -> Item { } /// The `MemoryArena` +#[allow(clippy::new_without_default)] pub struct MemoryArena { pages: Vec, } @@ -114,7 +115,7 @@ impl MemoryArena { store(dest, val); } - /// Read an item in the heap at the given `address`. + /// Read an item in the memory arena at the given `address`. 
/// # Panics /// diff --git a/src/postings/stacker/mod.rs b/src/postings/stacker/mod.rs index be80510d7..19fff9216 100644 --- a/src/postings/stacker/mod.rs +++ b/src/postings/stacker/mod.rs @@ -2,6 +2,6 @@ mod expull; mod memory_arena; mod term_hashmap; -pub use self::expull::ExpUnrolledLinkedList; -pub use self::memory_arena::{Addr, MemoryArena}; -pub use self::term_hashmap::{compute_table_size, TermHashMap}; +pub(crate) use self::expull::ExpUnrolledLinkedList; +pub(crate) use self::memory_arena::{Addr, MemoryArena}; +pub(crate) use self::term_hashmap::{compute_table_size, TermHashMap}; diff --git a/src/postings/stacker/term_hashmap.rs b/src/postings/stacker/term_hashmap.rs index 70cd24a2b..bf12cf695 100644 --- a/src/postings/stacker/term_hashmap.rs +++ b/src/postings/stacker/term_hashmap.rs @@ -9,14 +9,15 @@ use crate::postings::UnorderedTermId; use crate::Term; /// Returns the actual memory size in bytes -/// required to create a table of size $2^num_bits$. -pub fn compute_table_size(num_bits: usize) -> usize { - (1 << num_bits) * mem::size_of::() +/// required to create a table with a given capacity +/// (expressed as a number of entries). +pub(crate) fn compute_table_size(capacity: usize) -> usize { + capacity * mem::size_of::() } /// `KeyValue` is the item stored in the hash table. -/// The key is actually a `BytesRef` object stored in an external heap. -/// The `value_addr` also points to an address in the heap. +/// The key is actually a `BytesRef` object stored in an external memory arena. +/// The `value_addr` also points to an address in the memory arena. #[derive(Copy, Clone)] struct KeyValue { key_value_addr: Addr, @@ -43,14 +44,14 @@ impl KeyValue { /// Customized `HashMap` with string keys /// /// This `HashMap` takes String as keys. Keys are -/// stored in a user defined heap. +/// stored in a user defined memory arena.
/// /// The quirky API has the benefit of avoiding /// the computation of the hash of the key twice, /// or copying the key as long as there is no insert. pub struct TermHashMap { table: Box<[KeyValue]>, - pub heap: MemoryArena, + memory_arena: MemoryArena, mask: usize, occupied: Vec, len: usize, @@ -91,20 +92,37 @@ impl<'a> Iterator for Iter<'a> { } } +/// Returns the greatest power of two lower or equal to `n`. +/// `n` must be strictly positive. +/// +/// # Panics if n == 0 +fn compute_previous_power_of_two(n: usize) -> usize { + assert!(n > 0); + let msb = (63u32 - n.leading_zeros()) as u8; + 1 << msb +} + impl TermHashMap { - pub fn new(num_bucket_power_of_2: usize) -> TermHashMap { - let heap = MemoryArena::new(); - let table_size = 1 << num_bucket_power_of_2; - let table: Vec = iter::repeat(KeyValue::default()).take(table_size).collect(); + pub(crate) fn new(table_size: usize) -> TermHashMap { + assert!(table_size > 0); + let table_size_power_of_2 = compute_previous_power_of_two(table_size); + let memory_arena = MemoryArena::new(); + let table: Vec = iter::repeat(KeyValue::default()) + .take(table_size_power_of_2) + .collect(); TermHashMap { table: table.into_boxed_slice(), - heap, - mask: table_size - 1, - occupied: Vec::with_capacity(table_size / 2), + memory_arena, + mask: table_size_power_of_2 - 1, + occupied: Vec::with_capacity(table_size_power_of_2 / 2), len: 0, } } + pub fn read(&self, addr: Addr) -> Item { + self.memory_arena.read(addr) + } + fn probe(&self, hash: u32) -> QuadraticProbing { QuadraticProbing::compute(hash as usize, self.mask) } @@ -119,7 +137,7 @@ impl TermHashMap { #[inline] fn get_key_value(&self, addr: Addr) -> (&[u8], Addr) { - let data = self.heap.slice_from(addr); + let data = self.memory_arena.slice_from(addr); let key_bytes_len = NativeEndian::read_u16(data) as usize; let key_bytes: &[u8] = &data[2..][..key_bytes_len]; (key_bytes, addr.offset(2u32 + key_bytes_len as u32)) @@ -209,9 +227,9 @@ impl TermHashMap { //
The key does not exists yet. let val = updater(None); let num_bytes = std::mem::size_of::() + key.len() + std::mem::size_of::(); - let key_addr = self.heap.allocate_space(num_bytes); + let key_addr = self.memory_arena.allocate_space(num_bytes); { - let data = self.heap.slice_mut(key_addr, num_bytes); + let data = self.memory_arena.slice_mut(key_addr, num_bytes); NativeEndian::write_u16(data, key.len() as u16); let stop = 2 + key.len(); data[2..stop].copy_from_slice(key); @@ -220,9 +238,9 @@ impl TermHashMap { return self.set_bucket(hash, key_addr, bucket); } else if kv.hash == hash { if let Some(val_addr) = self.get_value_addr_if_key_match(key, kv.key_value_addr) { - let v = self.heap.read(val_addr); + let v = self.memory_arena.read(val_addr); let new_v = updater(Some(v)); - self.heap.write_at(val_addr, new_v); + self.memory_arena.write_at(val_addr, new_v); return kv.unordered_term_id; } } @@ -235,11 +253,11 @@ mod tests { use std::collections::HashMap; - use super::TermHashMap; + use super::{compute_previous_power_of_two, TermHashMap}; #[test] fn test_hash_map() { - let mut hash_map: TermHashMap = TermHashMap::new(18); + let mut hash_map: TermHashMap = TermHashMap::new(1 << 18); hash_map.mutate_or_create(b"abc", |opt_val: Option| { assert_eq!(opt_val, None); 3u32 @@ -255,9 +273,17 @@ mod tests { let mut vanilla_hash_map = HashMap::new(); let iter_values = hash_map.iter(); for (key, addr, _) in iter_values { - let val: u32 = hash_map.heap.read(addr); + let val: u32 = hash_map.memory_arena.read(addr); vanilla_hash_map.insert(key.to_owned(), val); } assert_eq!(vanilla_hash_map.len(), 2); } + + #[test] + fn test_compute_previous_power_of_two() { + assert_eq!(compute_previous_power_of_two(8), 8); + assert_eq!(compute_previous_power_of_two(9), 8); + assert_eq!(compute_previous_power_of_two(7), 4); + assert_eq!(compute_previous_power_of_two(u64::MAX as usize), 1 << 63); + } } diff --git a/src/schema/document.rs b/src/schema/document.rs index e7e41225a..2833e27d7 100644 
--- a/src/schema/document.rs +++ b/src/schema/document.rs @@ -26,7 +26,6 @@ impl From> for Document { Document { field_values } } } - impl PartialEq for Document { fn eq(&self, other: &Document) -> bool { // super slow, but only here for tests @@ -51,6 +50,16 @@ impl PartialEq for Document { impl Eq for Document {} +impl IntoIterator for Document { + type Item = FieldValue; + + type IntoIter = std::vec::IntoIter; + + fn into_iter(self) -> Self::IntoIter { + self.field_values.into_iter() + } +} + impl Document { /// Creates a new, empty document object pub fn new() -> Document { @@ -67,63 +76,54 @@ impl Document { self.field_values.is_empty() } - /// Retain only the field that are matching the - /// predicate given in argument. - pub fn filter_fields bool>(&mut self, predicate: P) { - self.field_values - .retain(|field_value| predicate(field_value.field())); - } - /// Adding a facet to the document. pub fn add_facet(&mut self, field: Field, path: F) where Facet: From { let facet = Facet::from(path); let value = Value::Facet(facet); - self.add(FieldValue::new(field, value)); + self.add_field_value(field, value); } /// Add a text field. pub fn add_text(&mut self, field: Field, text: S) { - self.add(FieldValue::new(field, Value::Str(text.to_string()))); + let value = Value::Str(text.to_string()); + self.add_field_value(field, value); } /// Add a pre-tokenized text field. 
- pub fn add_pre_tokenized_text( - &mut self, - field: Field, - pre_tokenized_text: &PreTokenizedString, - ) { - let value = Value::PreTokStr(pre_tokenized_text.clone()); - self.add(FieldValue::new(field, value)); + pub fn add_pre_tokenized_text(&mut self, field: Field, pre_tokenized_text: PreTokenizedString) { + self.add_field_value(field, pre_tokenized_text); } /// Add a u64 field pub fn add_u64(&mut self, field: Field, value: u64) { - self.add(FieldValue::new(field, Value::U64(value))); + self.add_field_value(field, value); } /// Add a i64 field pub fn add_i64(&mut self, field: Field, value: i64) { - self.add(FieldValue::new(field, Value::I64(value))); + self.add_field_value(field, value); } /// Add a f64 field pub fn add_f64(&mut self, field: Field, value: f64) { - self.add(FieldValue::new(field, Value::F64(value))); + self.add_field_value(field, value); } /// Add a date field - pub fn add_date(&mut self, field: Field, value: &DateTime) { - self.add(FieldValue::new(field, Value::Date(*value))); + pub fn add_date(&mut self, field: Field, value: DateTime) { + self.add_field_value(field, value); } /// Add a bytes field pub fn add_bytes>>(&mut self, field: Field, value: T) { - self.add(FieldValue::new(field, Value::Bytes(value.into()))) + self.add_field_value(field, value.into()) } - /// Add a field value - pub fn add(&mut self, field_value: FieldValue) { + /// Add a (field, value) to the document. + pub fn add_field_value>(&mut self, field: Field, typed_val: T) { + let value = typed_val.into(); + let field_value = FieldValue { field, value }; self.field_values.push(field_value); } @@ -180,21 +180,6 @@ impl Document { pub fn get_first(&self, field: Field) -> Option<&Value> { self.get_all(field).next() } - - /// Prepares Document for being stored in the document store - /// - /// Method transforms PreTokenizedString values into String - /// values. 
- pub fn prepare_for_store(&mut self) { - for field_value in &mut self.field_values { - if let Value::PreTokStr(pre_tokenized_text) = field_value.value() { - *field_value = FieldValue::new( - field_value.field(), - Value::Str(pre_tokenized_text.text.clone()), //< TODO somehow remove .clone() - ); - } - } - } } impl BinarySerializable for Document { @@ -220,7 +205,6 @@ impl BinarySerializable for Document { mod tests { use crate::schema::*; - use crate::tokenizer::{PreTokenizedString, Token}; #[test] fn test_doc() { @@ -230,38 +214,4 @@ mod tests { doc.add_text(text_field, "My title"); assert_eq!(doc.field_values().len(), 1); } - - #[test] - fn test_prepare_for_store() { - let mut schema_builder = Schema::builder(); - let text_field = schema_builder.add_text_field("title", TEXT); - let mut doc = Document::default(); - - let pre_tokenized_text = PreTokenizedString { - text: String::from("A"), - tokens: vec![Token { - offset_from: 0, - offset_to: 1, - position: 0, - text: String::from("A"), - position_length: 1, - }], - }; - - doc.add_pre_tokenized_text(text_field, &pre_tokenized_text); - doc.add_text(text_field, "title"); - doc.prepare_for_store(); - - assert_eq!(doc.field_values().len(), 2); - - match doc.field_values()[0].value() { - Value::Str(ref text) => assert_eq!(text, "A"), - _ => panic!("Incorrect variant of Value"), - } - - match doc.field_values()[1].value() { - Value::Str(ref text) => assert_eq!(text, "title"), - _ => panic!("Incorrect variant of Value"), - } - } } diff --git a/src/schema/field_type.rs b/src/schema/field_type.rs index bd6c8fa9d..6b995773a 100644 --- a/src/schema/field_type.rs +++ b/src/schema/field_type.rs @@ -303,7 +303,7 @@ mod tests { let naive_date = NaiveDate::from_ymd(1982, 9, 17); let naive_time = NaiveTime::from_hms(13, 20, 00); let date_time = DateTime::from_utc(NaiveDateTime::new(naive_date, naive_time), Utc); - doc.add_date(date_field, &date_time); + doc.add_date(date_field, date_time); let doc_json = schema.to_json(&doc); 
assert_eq!(doc_json, r#"{"date":["1982-09-17T13:20:00+00:00"]}"#); } diff --git a/src/schema/field_value.rs b/src/schema/field_value.rs index b3ac6fd6a..5d3199f1b 100644 --- a/src/schema/field_value.rs +++ b/src/schema/field_value.rs @@ -5,10 +5,11 @@ use common::BinarySerializable; use crate::schema::{Field, Value}; /// `FieldValue` holds together a `Field` and its `Value`. +#[allow(missing_docs)] #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] pub struct FieldValue { - field: Field, - value: Value, + pub field: Field, + pub value: Value, } impl FieldValue { @@ -28,6 +29,12 @@ impl FieldValue { } } +impl From for Value { + fn from(field_value: FieldValue) -> Self { + field_value.value + } +} + impl BinarySerializable for FieldValue { fn serialize(&self, writer: &mut W) -> io::Result<()> { self.field.serialize(writer)?; @@ -37,6 +44,6 @@ impl BinarySerializable for FieldValue { fn deserialize(reader: &mut R) -> io::Result { let field = Field::deserialize(reader)?; let value = Value::deserialize(reader)?; - Ok(FieldValue::new(field, value)) + Ok(FieldValue { field, value }) } } diff --git a/src/schema/schema.rs b/src/schema/schema.rs index decc6ae00..4f62d57e6 100644 --- a/src/schema/schema.rs +++ b/src/schema/schema.rs @@ -271,8 +271,7 @@ impl Schema { for (field_name, values) in named_doc.0 { if let Some(field) = self.get_field(&field_name) { for value in values { - let field_value = FieldValue::new(field, value); - document.add(field_value); + document.add_field_value(field, value); } } } @@ -320,14 +319,14 @@ impl Schema { let value = field_type .value_from_json(json_item) .map_err(|e| DocParsingError::ValueError(field_name.clone(), e))?; - doc.add(FieldValue::new(field, value)); + doc.add_field_value(field, value); } } _ => { let value = field_type .value_from_json(json_value) .map_err(|e| DocParsingError::ValueError(field_name.clone(), e))?; - doc.add(FieldValue::new(field, value)); + doc.add_field_value(field, value); } } } diff 
--git a/src/store/mod.rs b/src/store/mod.rs index 7a9c48634..ed142ab3c 100644 --- a/src/store/mod.rs +++ b/src/store/mod.rs @@ -60,9 +60,7 @@ pub mod tests { use super::*; use crate::directory::{Directory, RamDirectory, WritePtr}; use crate::fastfield::AliveBitSet; - use crate::schema::{ - self, Document, FieldValue, Schema, TextFieldIndexing, TextOptions, STORED, TEXT, - }; + use crate::schema::{self, Document, Schema, TextFieldIndexing, TextOptions, STORED, TEXT}; use crate::{Index, Term}; const LOREM: &str = "Doc Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do \ @@ -86,18 +84,9 @@ pub mod tests { { let mut store_writer = StoreWriter::new(writer, compressor); for i in 0..num_docs { - let mut fields: Vec = Vec::new(); - { - let field_value = FieldValue::new(field_body, From::from(LOREM.to_string())); - fields.push(field_value); - } - { - let title_text = format!("Doc {}", i); - let field_value = FieldValue::new(field_title, From::from(title_text)); - fields.push(field_value); - } - // let fields_refs: Vec<&FieldValue> = fields.iter().collect(); - let doc = Document::from(fields); + let mut doc = Document::default(); + doc.add_field_value(field_body, LOREM.to_string()); + doc.add_field_value(field_title, format!("Doc {i}")); store_writer.store(&doc).unwrap(); } store_writer.close().unwrap();