From 2d1beeb6bee972e418ad2def2e002b613873487e Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Sun, 1 May 2022 13:13:09 +0800 Subject: [PATCH] term hashmap per field, smaller page size --- src/indexer/segment_writer.rs | 1 + src/postings/indexing_context.rs | 9 +- src/postings/json_postings_writer.rs | 12 +++ src/postings/per_field_postings_writer.rs | 15 ++- src/postings/postings_writer.rs | 110 ++++++++++------------ src/postings/stacker/memory_arena.rs | 11 ++- src/postings/stacker/term_hashmap.rs | 7 ++ src/schema/schema.rs | 12 ++- src/schema/term.rs | 21 +++-- 9 files changed, 119 insertions(+), 79 deletions(-) diff --git a/src/indexer/segment_writer.rs b/src/indexer/segment_writer.rs index 537b4f502..ebdd3750a 100644 --- a/src/indexer/segment_writer.rs +++ b/src/indexer/segment_writer.rs @@ -149,6 +149,7 @@ impl SegmentWriter { pub fn mem_usage(&self) -> usize { self.ctx.mem_usage() + self.fieldnorms_writer.mem_usage() + + self.per_field_postings_writers.mem_usage() + self.fast_field_writers.mem_usage() + self.segment_serializer.mem_usage() } diff --git a/src/postings/indexing_context.rs b/src/postings/indexing_context.rs index 9fc8a922f..3f80f78d6 100644 --- a/src/postings/indexing_context.rs +++ b/src/postings/indexing_context.rs @@ -1,11 +1,8 @@ -use crate::postings::stacker::{MemoryArena, TermHashMap}; +use crate::postings::stacker::MemoryArena; /// IndexingContext contains all of the transient memory arenas /// required for building the inverted index. pub(crate) struct IndexingContext { - /// The term index is an adhoc hashmap, - /// itself backed by a dedicated memory arena. - pub term_index: TermHashMap, /// Arena is a memory arena that stores posting lists / term frequencies / positions. pub arena: MemoryArena, } @@ -13,15 +10,13 @@ pub(crate) struct IndexingContext { impl IndexingContext { /// Create a new IndexingContext given the size of the term hash map. pub(crate) fn new(table_size: usize) -> IndexingContext { - let term_index = TermHashMap::new(table_size); IndexingContext { arena: MemoryArena::new(), - term_index, } } /// Returns the memory usage for the inverted index memory arenas, in bytes. pub(crate) fn mem_usage(&self) -> usize { - self.term_index.mem_usage() + self.arena.mem_usage() + self.arena.mem_usage() } } diff --git a/src/postings/json_postings_writer.rs b/src/postings/json_postings_writer.rs index c5f495c75..204fc586d 100644 --- a/src/postings/json_postings_writer.rs +++ b/src/postings/json_postings_writer.rs @@ -13,6 +13,8 @@ use crate::schema::Type; use crate::tokenizer::TokenStream; use crate::{DocId, Term}; +use super::stacker::TermHashMap; + #[derive(Default)] pub(crate) struct JsonPostingsWriter { str_posting_writer: SpecializedPostingsWriter, @@ -26,6 +28,14 @@ impl From> for Box { } impl PostingsWriter for JsonPostingsWriter { + fn mem_usage(&self) -> usize { + self.str_posting_writer.mem_usage() + self.non_str_posting_writer.mem_usage() + } + + fn term_map(&self) -> &TermHashMap { + self.str_posting_writer.term_map() + } + fn subscribe( &mut self, doc: crate::DocId, @@ -74,6 +84,7 @@ impl PostingsWriter for JsonPostingsWriter { doc_id_map, &mut buffer_lender, ctx, + &self.str_posting_writer.term_map, serializer, )?; } else { @@ -83,6 +94,7 @@ impl PostingsWriter for JsonPostingsWriter { doc_id_map, &mut buffer_lender, ctx, + &self.str_posting_writer.term_map, serializer, )?; } diff --git a/src/postings/per_field_postings_writer.rs b/src/postings/per_field_postings_writer.rs index 04966ab42..5c42f6f9c 100644 --- a/src/postings/per_field_postings_writer.rs +++ b/src/postings/per_field_postings_writer.rs @@ -10,9 +10,10 @@ pub(crate) struct PerFieldPostingsWriter { impl PerFieldPostingsWriter { pub fn for_schema(schema: &Schema) -> Self { + let num_fields = schema.num_fields(); let per_field_postings_writers = schema .fields() - .map(|(_, field_entry)| posting_writer_from_field_entry(field_entry)) + .map(|(_, field_entry)| posting_writer_from_field_entry(field_entry, num_fields)) .collect(); PerFieldPostingsWriter { per_field_postings_writers, @@ -26,9 +27,19 @@ impl PerFieldPostingsWriter { pub(crate) fn get_for_field_mut(&mut self, field: Field) -> &mut dyn PostingsWriter { self.per_field_postings_writers[field.field_id() as usize].as_mut() } + + pub(crate) fn mem_usage(&self) -> usize { + self.per_field_postings_writers + .iter() + .map(|postings_writer| postings_writer.mem_usage()) + .sum() + } } -fn posting_writer_from_field_entry(field_entry: &FieldEntry) -> Box { +fn posting_writer_from_field_entry( + field_entry: &FieldEntry, + _num_fields: usize, +) -> Box { match *field_entry.field_type() { FieldType::Str(ref text_options) => text_options .get_indexing_options() diff --git a/src/postings/postings_writer.rs b/src/postings/postings_writer.rs index debd03208..42b813f7f 100644 --- a/src/postings/postings_writer.rs +++ b/src/postings/postings_writer.rs @@ -5,7 +5,7 @@ use std::ops::Range; use fnv::FnvHashMap; -use super::stacker::Addr; +use super::stacker::{Addr, TermHashMap}; use crate::fastfield::MultiValuedFastFieldWriter; use crate::fieldnorm::FieldNormReaders; use crate::indexer::doc_id_mapping::DocIdMapping; @@ -21,31 +21,6 @@ use crate::DocId; const POSITION_GAP: u32 = 1; -fn make_field_partition( - term_offsets: &[(Term<&[u8]>, Addr, UnorderedTermId)], -) -> Vec<(Field, Range)> { - let term_offsets_it = term_offsets - .iter() - .map(|(term, _, _)| term.field()) - .enumerate(); - let mut prev_field_opt = None; - let mut fields = vec![]; - let mut offsets = vec![]; - for (offset, field) in term_offsets_it { - if Some(field) != prev_field_opt { - prev_field_opt = Some(field); - fields.push(field); - offsets.push(offset); - } - } - offsets.push(term_offsets.len()); - let mut field_offsets = vec![]; - for i in 0..fields.len() { - field_offsets.push((fields[i], offsets[i]..offsets[i + 1])); - } - field_offsets -} - /// Serialize the inverted index. /// It pushes all term, one field at a time, towards the /// postings serializer. @@ -57,23 +32,23 @@ pub(crate) fn serialize_postings( schema: &Schema, serializer: &mut InvertedIndexSerializer, ) -> crate::Result>> { - let mut term_offsets: Vec<(Term<&[u8]>, Addr, UnorderedTermId)> = - Vec::with_capacity(ctx.term_index.len()); - term_offsets.extend(ctx.term_index.iter()); - term_offsets.sort_unstable_by_key(|(k, _, _)| k.clone()); let mut unordered_term_mappings: HashMap> = HashMap::new(); - let field_offsets = make_field_partition(&term_offsets); - for (field, byte_offsets) in field_offsets { + for (field, _) in schema.fields() { + let postings_writer = per_field_postings_writers.get_for_field(field); + + let mut term_offsets: Vec<(Term<&[u8]>, Addr, UnorderedTermId)> = + Vec::with_capacity(postings_writer.term_map().len()); + term_offsets.extend(postings_writer.term_map().iter()); + term_offsets.sort_unstable_by_key(|(k, _, _)| k.clone()); + let field_entry = schema.get_field_entry(field); match *field_entry.field_type() { FieldType::Str(_) | FieldType::Facet(_) => { // populating the (unordered term ord) -> (ordered term ord) mapping // for the field. - let unordered_term_ids = term_offsets[byte_offsets.clone()] - .iter() - .map(|&(_, _, bucket)| bucket); + let unordered_term_ids = term_offsets.iter().map(|&(_, _, bucket)| bucket); let mapping: FnvHashMap = unordered_term_ids .enumerate() .map(|(term_ord, unord_term_id)| { @@ -87,16 +62,10 @@ pub(crate) fn serialize_postings( FieldType::JsonObject(_) => {} } - let postings_writer = per_field_postings_writers.get_for_field(field); let fieldnorm_reader = fieldnorm_readers.get_field(field)?; let mut field_serializer = serializer.new_field(field, postings_writer.total_num_tokens(), fieldnorm_reader)?; - postings_writer.serialize( - &term_offsets[byte_offsets], - doc_id_map, - &ctx, - &mut field_serializer, - )?; + postings_writer.serialize(&term_offsets, doc_id_map, &ctx, &mut field_serializer)?; field_serializer.close()?; } Ok(unordered_term_mappings) @@ -128,6 +97,10 @@ pub(crate) trait PostingsWriter { ctx: &mut IndexingContext, ) -> UnorderedTermId; + fn mem_usage(&self) -> usize; + + fn term_map(&self) -> &TermHashMap; + /// Serializes the postings on disk. /// The actual serialization format is handled by the `PostingsSerializer`. fn serialize( @@ -188,6 +161,7 @@ pub(crate) trait PostingsWriter { pub(crate) struct SpecializedPostingsWriter { total_num_tokens: u64, _recorder_type: PhantomData, + pub(crate) term_map: TermHashMap, } impl From> for Box { @@ -206,9 +180,10 @@ impl SpecializedPostingsWriter { doc_id_map: Option<&DocIdMapping>, buffer_lender: &mut BufferLender, ctx: &IndexingContext, + term_index: &TermHashMap, serializer: &mut FieldSerializer, ) -> io::Result<()> { - let recorder: Rec = ctx.term_index.read(addr); + let recorder: Rec = term_index.read(addr); let term_doc_freq = recorder.term_doc_freq().unwrap_or(0u32); serializer.new_term(term.value_bytes(), term_doc_freq)?; recorder.serialize(&ctx.arena, doc_id_map, serializer, buffer_lender); @@ -218,6 +193,14 @@ impl SpecializedPostingsWriter { } impl PostingsWriter for SpecializedPostingsWriter { + fn mem_usage(&self) -> usize { + self.term_map.mem_usage() + } + + fn term_map(&self) -> &TermHashMap { + &self.term_map + } + fn subscribe( &mut self, doc: DocId, @@ -227,23 +210,24 @@ impl PostingsWriter for SpecializedPostingsWriter { ) -> UnorderedTermId { debug_assert!(term.as_slice().len() >= 4); self.total_num_tokens += 1; - let (term_index, arena) = (&mut ctx.term_index, &mut ctx.arena); - term_index.mutate_or_create(term.as_slice(), |opt_recorder: Option| { - if let Some(mut recorder) = opt_recorder { - let current_doc = recorder.current_doc(); - if current_doc != doc { - recorder.close_doc(arena); + let arena = &mut ctx.arena; + self.term_map + .mutate_or_create(term.as_slice(), |opt_recorder: Option| { + if let Some(mut recorder) = opt_recorder { + let current_doc = recorder.current_doc(); + if current_doc != doc { + recorder.close_doc(arena); + recorder.new_doc(doc, arena); + } + recorder.record_position(position, arena); + recorder + } else { + let mut recorder = Rec::default(); recorder.new_doc(doc, arena); + recorder.record_position(position, arena); + recorder } - recorder.record_position(position, arena); - recorder - } else { - let mut recorder = Rec::default(); - recorder.new_doc(doc, arena); - recorder.record_position(position, arena); - recorder - } - }) as UnorderedTermId + }) as UnorderedTermId } fn serialize( @@ -255,7 +239,15 @@ impl PostingsWriter for SpecializedPostingsWriter { ) -> io::Result<()> { let mut buffer_lender = BufferLender::default(); for (term, addr, _) in term_addrs { - Self::serialize_one_term(term, *addr, doc_id_map, &mut buffer_lender, ctx, serializer)?; + Self::serialize_one_term( + term, + *addr, + doc_id_map, + &mut buffer_lender, + ctx, + &self.term_map, + serializer, + )?; } Ok(()) } diff --git a/src/postings/stacker/memory_arena.rs b/src/postings/stacker/memory_arena.rs index 1a468da18..ffc6a49fe 100644 --- a/src/postings/stacker/memory_arena.rs +++ b/src/postings/stacker/memory_arena.rs @@ -24,7 +24,7 @@ //! stores your object using `ptr::write_unaligned` and `ptr::read_unaligned`. use std::{mem, ptr}; -const NUM_BITS_PAGE_ADDR: usize = 20; +const NUM_BITS_PAGE_ADDR: usize = 17; const PAGE_SIZE: usize = 1 << NUM_BITS_PAGE_ADDR; // pages are 1 MB large /// Represents a pointer into the `MemoryArena` @@ -46,6 +46,7 @@ impl Addr { } /// Returns the `Addr` object for `addr + offset` + #[inline] pub fn offset(self, offset: u32) -> Addr { Addr(self.0.wrapping_add(offset)) } @@ -54,20 +55,24 @@ impl Addr { Addr((page_id << NUM_BITS_PAGE_ADDR | local_addr) as u32) } + #[inline] fn page_id(self) -> usize { (self.0 as usize) >> NUM_BITS_PAGE_ADDR } + #[inline] fn page_local_addr(self) -> usize { (self.0 as usize) & (PAGE_SIZE - 1) } /// Returns true if and only if the `Addr` is null. + #[inline] pub fn is_null(self) -> bool { self.0 == u32::max_value() } } +#[inline] pub fn store(dest: &mut [u8], val: Item) { assert_eq!(dest.len(), std::mem::size_of::()); unsafe { @@ -75,6 +80,7 @@ pub fn store(dest: &mut [u8], val: Item) { } } +#[inline] pub fn load(data: &[u8]) -> Item { assert_eq!(data.len(), std::mem::size_of::()); unsafe { ptr::read_unaligned(data.as_ptr() as *const Item) } @@ -110,6 +116,7 @@ impl MemoryArena { self.pages.len() * PAGE_SIZE } + #[inline] pub fn write_at(&mut self, addr: Addr, val: Item) { let dest = self.slice_mut(addr, std::mem::size_of::()); store(dest, val); @@ -120,6 +127,7 @@ impl MemoryArena { /// # Panics /// /// If the address is erroneous + #[inline] pub fn read(&self, addr: Addr) -> Item { load(self.slice(addr, mem::size_of::())) } @@ -128,6 +136,7 @@ impl MemoryArena { self.pages[addr.page_id()].slice(addr.page_local_addr(), len) } + #[inline] pub fn slice_from(&self, addr: Addr) -> &[u8] { self.pages[addr.page_id()].slice_from(addr.page_local_addr()) } diff --git a/src/postings/stacker/term_hashmap.rs b/src/postings/stacker/term_hashmap.rs index bf12cf695..c2c985caa 100644 --- a/src/postings/stacker/term_hashmap.rs +++ b/src/postings/stacker/term_hashmap.rs @@ -36,6 +36,7 @@ impl Default for KeyValue { } impl KeyValue { + #[inline] fn is_empty(self) -> bool { self.key_value_addr.is_null() } @@ -57,6 +58,12 @@ pub struct TermHashMap { len: usize, } +impl Default for TermHashMap { + fn default() -> Self { + Self::new(1 << 10) + } +} + struct QuadraticProbing { hash: usize, i: usize, diff --git a/src/schema/schema.rs b/src/schema/schema.rs index 235d0412d..f1d10a96b 100644 --- a/src/schema/schema.rs +++ b/src/schema/schema.rs @@ -347,7 +347,9 @@ impl Schema { impl Serialize for Schema { fn serialize(&self, serializer: S) -> Result - where S: Serializer { + where + S: Serializer, + { let mut seq = serializer.serialize_seq(Some(self.0.fields.len()))?; for e in &self.0.fields { seq.serialize_element(e)?; @@ -358,7 +360,9 @@ impl Serialize for Schema { impl<'de> Deserialize<'de> for Schema { fn deserialize(deserializer: D) -> Result - where D: Deserializer<'de> { + where + D: Deserializer<'de>, + { struct SchemaVisitor; impl<'de> Visitor<'de> for SchemaVisitor { @@ -369,7 +373,9 @@ impl<'de> Deserialize<'de> for Schema { } fn visit_seq(self, mut seq: A) -> Result - where A: SeqAccess<'de> { + where + A: SeqAccess<'de>, + { let mut schema = SchemaBuilder { fields: Vec::with_capacity(seq.size_hint().unwrap_or(0)), fields_map: HashMap::with_capacity(seq.size_hint().unwrap_or(0)), diff --git a/src/schema/term.rs b/src/schema/term.rs index 93a5806b2..aa27d5ab8 100644 --- a/src/schema/term.rs +++ b/src/schema/term.rs @@ -34,7 +34,8 @@ pub const JSON_END_OF_PATH: u8 = 0u8; /// It actually wraps a `Vec`. #[derive(Clone)] pub struct Term>(B) -where B: AsRef<[u8]>; +where + B: AsRef<[u8]>; impl AsMut> for Term { fn as_mut(&mut self) -> &mut Vec { @@ -164,7 +165,8 @@ impl Term { } impl Ord for Term -where B: AsRef<[u8]> +where + B: AsRef<[u8]>, { fn cmp(&self, other: &Self) -> std::cmp::Ordering { self.as_slice().cmp(other.as_slice()) @@ -172,7 +174,8 @@ where B: AsRef<[u8]> } impl PartialOrd for Term -where B: AsRef<[u8]> +where + B: AsRef<[u8]>, { fn partial_cmp(&self, other: &Self) -> Option { Some(self.cmp(other)) @@ -180,7 +183,8 @@ where B: AsRef<[u8]> } impl PartialEq for Term -where B: AsRef<[u8]> +where + B: AsRef<[u8]>, { fn eq(&self, other: &Self) -> bool { self.as_slice() == other.as_slice() @@ -190,7 +194,8 @@ where B: AsRef<[u8]> impl Eq for Term where B: AsRef<[u8]> {} impl Hash for Term -where B: AsRef<[u8]> +where + B: AsRef<[u8]>, { fn hash(&self, state: &mut H) { self.0.as_ref().hash(state) @@ -198,7 +203,8 @@ where B: AsRef<[u8]> } impl Term -where B: AsRef<[u8]> +where + B: AsRef<[u8]>, { /// Wraps a object holding bytes pub fn wrap(data: B) -> Term { @@ -399,7 +405,8 @@ fn debug_value_bytes(typ: Type, bytes: &[u8], f: &mut fmt::Formatter) -> fmt::Re } impl fmt::Debug for Term -where B: AsRef<[u8]> +where + B: AsRef<[u8]>, { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { let field_id = self.field().field_id();