Compare commits

...

4 Commits

Author SHA1 Message Date
Pascal Seitz
54fc557a6d use a single memory arena to store all terms 2022-05-03 20:11:28 +08:00
Pascal Seitz
7f5a409d6f remove term encoding in term bytes 2022-05-03 20:04:53 +08:00
Pascal Seitz
50ee43ab79 refactor Term 2022-05-03 20:04:53 +08:00
Pascal Seitz
2d1beeb6be term hashmap per field, smaller page size 2022-05-03 20:04:53 +08:00
12 changed files with 210 additions and 286 deletions

View File

@@ -57,7 +57,7 @@ struct IndexingPositionsPerPath {
impl IndexingPositionsPerPath {
fn get_position(&mut self, term: &Term) -> &mut IndexingPosition {
self.positions_per_path
.entry(murmurhash2(term.as_slice()))
.entry(murmurhash2(term.value_bytes()))
.or_insert_with(Default::default)
}
}
@@ -208,7 +208,7 @@ impl<'a> JsonTermWriter<'a> {
pub fn wrap(term_buffer: &'a mut Term) -> Self {
term_buffer.clear_with_type(Type::Json);
let mut path_stack = Vec::with_capacity(10);
path_stack.push(5);
path_stack.push(5); // magic number?
Self {
term_buffer,
path_stack,
@@ -250,8 +250,8 @@ impl<'a> JsonTermWriter<'a> {
/// Returns the json path of the term being currently built.
#[cfg(test)]
pub(crate) fn path(&self) -> &[u8] {
let end_of_path = self.path_stack.last().cloned().unwrap_or(6);
&self.term().as_slice()[5..end_of_path - 1]
let end_of_path = self.path_stack.last().cloned().unwrap_or(6); // TODO remove magic number
&self.term().value_bytes()[..end_of_path - 1]
}
pub fn set_fast_value<T: FastValue>(&mut self, val: T) {
@@ -321,10 +321,7 @@ mod tests {
let mut json_writer = JsonTermWriter::wrap(&mut term);
json_writer.push_path_segment("color");
json_writer.set_str("red");
assert_eq!(
json_writer.term().as_slice(),
b"\x00\x00\x00\x01jcolor\x00sred"
)
assert_eq!(json_writer.term().value_bytes(), b"color\x00sred")
}
#[test]
@@ -336,8 +333,8 @@ mod tests {
json_writer.push_path_segment("color");
json_writer.set_fast_value(-4i64);
assert_eq!(
json_writer.term().as_slice(),
b"\x00\x00\x00\x01jcolor\x00i\x7f\xff\xff\xff\xff\xff\xff\xfc"
json_writer.term().value_bytes(),
b"color\x00i\x7f\xff\xff\xff\xff\xff\xff\xfc"
)
}
@@ -350,8 +347,8 @@ mod tests {
json_writer.push_path_segment("color");
json_writer.set_fast_value(4u64);
assert_eq!(
json_writer.term().as_slice(),
b"\x00\x00\x00\x01jcolor\x00u\x00\x00\x00\x00\x00\x00\x00\x04"
json_writer.term().value_bytes(),
b"color\x00u\x00\x00\x00\x00\x00\x00\x00\x04"
)
}
@@ -364,8 +361,8 @@ mod tests {
json_writer.push_path_segment("color");
json_writer.set_fast_value(4.0f64);
assert_eq!(
json_writer.term().as_slice(),
b"\x00\x00\x00\x01jcolor\x00f\xc0\x10\x00\x00\x00\x00\x00\x00"
json_writer.term().value_bytes(),
b"color\x00f\xc0\x10\x00\x00\x00\x00\x00\x00"
)
}
@@ -380,8 +377,8 @@ mod tests {
json_writer.push_path_segment("color");
json_writer.set_str("red");
assert_eq!(
json_writer.term().as_slice(),
b"\x00\x00\x00\x01jattribute\x01color\x00sred"
json_writer.term().value_bytes(),
b"attribute\x01color\x00sred"
)
}
@@ -395,10 +392,7 @@ mod tests {
json_writer.push_path_segment("hue");
json_writer.pop_path_segment();
json_writer.set_str("red");
assert_eq!(
json_writer.term().as_slice(),
b"\x00\x00\x00\x01jcolor\x00sred"
)
assert_eq!(json_writer.term().value_bytes(), b"color\x00sred")
}
#[test]

View File

@@ -6,8 +6,7 @@ use crate::fieldnorm::{FieldNormReaders, FieldNormsWriter};
use crate::indexer::json_term_writer::index_json_values;
use crate::indexer::segment_serializer::SegmentSerializer;
use crate::postings::{
compute_table_size, serialize_postings, IndexingContext, IndexingPosition,
PerFieldPostingsWriter, PostingsWriter,
serialize_postings, IndexingContext, IndexingPosition, PerFieldPostingsWriter, PostingsWriter,
};
use crate::schema::{FieldEntry, FieldType, FieldValue, Schema, Term, Value};
use crate::store::{StoreReader, StoreWriter};
@@ -16,25 +15,6 @@ use crate::tokenizer::{
};
use crate::{DocId, Document, Opstamp, SegmentComponent};
/// Computes the initial size of the hash table.
///
/// Returns the recommended initial table size as a power of 2.
///
/// Note this is a very dumb way to compute log2, but it is easier to proofread that way.
fn compute_initial_table_size(per_thread_memory_budget: usize) -> crate::Result<usize> {
let table_memory_upper_bound = per_thread_memory_budget / 3;
(10..20) // We cap it at 2^19 = 512K capacity.
.map(|power| 1 << power)
.take_while(|capacity| compute_table_size(*capacity) < table_memory_upper_bound)
.last()
.ok_or_else(|| {
crate::TantivyError::InvalidArgument(format!(
"per thread memory budget (={per_thread_memory_budget}) is too small. Raise the \
memory budget or lower the number of threads."
))
})
}
fn remap_doc_opstamps(
opstamps: Vec<Opstamp>,
doc_id_mapping_opt: Option<&DocIdMapping>,
@@ -78,12 +58,11 @@ impl SegmentWriter {
/// - segment: The segment being written
/// - schema
pub fn for_segment(
memory_budget_in_bytes: usize,
_memory_budget_in_bytes: usize,
segment: Segment,
schema: Schema,
) -> crate::Result<SegmentWriter> {
let tokenizer_manager = segment.index().tokenizers().clone();
let table_size = compute_initial_table_size(memory_budget_in_bytes)?;
let segment_serializer = SegmentSerializer::for_segment(segment, false)?;
let per_field_postings_writers = PerFieldPostingsWriter::for_schema(&schema);
let per_field_text_analyzers = schema
@@ -106,7 +85,7 @@ impl SegmentWriter {
.collect();
Ok(SegmentWriter {
max_doc: 0,
ctx: IndexingContext::new(table_size),
ctx: IndexingContext::new(),
per_field_postings_writers,
fieldnorms_writer: FieldNormsWriter::for_schema(&schema),
segment_serializer,
@@ -149,6 +128,7 @@ impl SegmentWriter {
pub fn mem_usage(&self) -> usize {
self.ctx.mem_usage()
+ self.fieldnorms_writer.mem_usage()
+ self.per_field_postings_writers.mem_usage()
+ self.fast_field_writers.mem_usage()
+ self.segment_serializer.mem_usage()
}
@@ -223,7 +203,7 @@ impl SegmentWriter {
let mut indexing_position = IndexingPosition::default();
for mut token_stream in token_streams {
assert_eq!(term_buffer.as_slice().len(), 5);
// assert_eq!(term_buffer.as_slice().len(), 5);
postings_writer.index_text(
doc_id,
&mut *token_stream,
@@ -418,7 +398,6 @@ pub fn prepare_doc_for_store(doc: Document, schema: &Schema) -> Document {
#[cfg(test)]
mod tests {
use super::compute_initial_table_size;
use crate::collector::Count;
use crate::indexer::json_term_writer::JsonTermWriter;
use crate::postings::TermInfo;
@@ -429,15 +408,6 @@ mod tests {
use crate::tokenizer::{PreTokenizedString, Token};
use crate::{DateTime, DocAddress, DocSet, Document, Index, Postings, Term, TERMINATED};
#[test]
fn test_hashmap_size() {
assert_eq!(compute_initial_table_size(100_000).unwrap(), 1 << 11);
assert_eq!(compute_initial_table_size(1_000_000).unwrap(), 1 << 14);
assert_eq!(compute_initial_table_size(10_000_000).unwrap(), 1 << 17);
assert_eq!(compute_initial_table_size(1_000_000_000).unwrap(), 1 << 19);
assert_eq!(compute_initial_table_size(4_000_000_000).unwrap(), 1 << 19);
}
#[test]
fn test_prepare_for_store() {
let mut schema_builder = Schema::builder();

View File

@@ -1,27 +1,24 @@
use crate::postings::stacker::{MemoryArena, TermHashMap};
use crate::postings::stacker::MemoryArena;
/// IndexingContext contains all of the transient memory arenas
/// required for building the inverted index.
pub(crate) struct IndexingContext {
/// The term index is an adhoc hashmap,
/// itself backed by a dedicated memory arena.
pub term_index: TermHashMap,
/// Arena is a memory arena that stores posting lists / term frequencies / positions.
pub arena: MemoryArena,
pub arena_terms: MemoryArena,
}
impl IndexingContext {
/// Create a new IndexingContext given the size of the term hash map.
pub(crate) fn new(table_size: usize) -> IndexingContext {
let term_index = TermHashMap::new(table_size);
pub(crate) fn new() -> IndexingContext {
IndexingContext {
arena: MemoryArena::new(),
term_index,
arena_terms: MemoryArena::new(),
}
}
/// Returns the memory usage for the inverted index memory arenas, in bytes.
pub(crate) fn mem_usage(&self) -> usize {
self.term_index.mem_usage() + self.arena.mem_usage()
self.arena.mem_usage() + self.arena_terms.mem_usage()
}
}

View File

@@ -1,5 +1,6 @@
use std::io;
use super::stacker::TermHashMap;
use crate::fastfield::MultiValuedFastFieldWriter;
use crate::indexer::doc_id_mapping::DocIdMapping;
use crate::postings::postings_writer::SpecializedPostingsWriter;
@@ -26,6 +27,14 @@ impl<Rec: Recorder> From<JsonPostingsWriter<Rec>> for Box<dyn PostingsWriter> {
}
impl<Rec: Recorder> PostingsWriter for JsonPostingsWriter<Rec> {
fn mem_usage(&self) -> usize {
self.str_posting_writer.mem_usage() + self.non_str_posting_writer.mem_usage()
}
fn term_map(&self) -> &TermHashMap {
self.str_posting_writer.term_map()
}
fn subscribe(
&mut self,
doc: crate::DocId,
@@ -74,6 +83,7 @@ impl<Rec: Recorder> PostingsWriter for JsonPostingsWriter<Rec> {
doc_id_map,
&mut buffer_lender,
ctx,
&self.str_posting_writer.term_map,
serializer,
)?;
} else {
@@ -83,6 +93,7 @@ impl<Rec: Recorder> PostingsWriter for JsonPostingsWriter<Rec> {
doc_id_map,
&mut buffer_lender,
ctx,
&self.str_posting_writer.term_map,
serializer,
)?;
}

View File

@@ -26,7 +26,6 @@ pub(crate) use self::postings_writer::{serialize_postings, IndexingPosition, Pos
pub use self::segment_postings::SegmentPostings;
pub use self::serializer::{FieldSerializer, InvertedIndexSerializer};
pub(crate) use self::skip::{BlockInfo, SkipReader};
pub(crate) use self::stacker::compute_table_size;
pub use self::term_info::TermInfo;
pub(crate) type UnorderedTermId = u64;

View File

@@ -10,9 +10,10 @@ pub(crate) struct PerFieldPostingsWriter {
impl PerFieldPostingsWriter {
pub fn for_schema(schema: &Schema) -> Self {
let num_fields = schema.num_fields();
let per_field_postings_writers = schema
.fields()
.map(|(_, field_entry)| posting_writer_from_field_entry(field_entry))
.map(|(_, field_entry)| posting_writer_from_field_entry(field_entry, num_fields))
.collect();
PerFieldPostingsWriter {
per_field_postings_writers,
@@ -26,9 +27,19 @@ impl PerFieldPostingsWriter {
pub(crate) fn get_for_field_mut(&mut self, field: Field) -> &mut dyn PostingsWriter {
self.per_field_postings_writers[field.field_id() as usize].as_mut()
}
pub(crate) fn mem_usage(&self) -> usize {
self.per_field_postings_writers
.iter()
.map(|postings_writer| postings_writer.mem_usage())
.sum()
}
}
fn posting_writer_from_field_entry(field_entry: &FieldEntry) -> Box<dyn PostingsWriter> {
fn posting_writer_from_field_entry(
field_entry: &FieldEntry,
_num_fields: usize,
) -> Box<dyn PostingsWriter> {
match *field_entry.field_type() {
FieldType::Str(ref text_options) => text_options
.get_indexing_options()

View File

@@ -1,11 +1,10 @@
use std::collections::HashMap;
use std::io;
use std::marker::PhantomData;
use std::ops::Range;
use fnv::FnvHashMap;
use super::stacker::Addr;
use super::stacker::{Addr, TermHashMap};
use crate::fastfield::MultiValuedFastFieldWriter;
use crate::fieldnorm::FieldNormReaders;
use crate::indexer::doc_id_mapping::DocIdMapping;
@@ -21,31 +20,6 @@ use crate::DocId;
const POSITION_GAP: u32 = 1;
fn make_field_partition(
term_offsets: &[(Term<&[u8]>, Addr, UnorderedTermId)],
) -> Vec<(Field, Range<usize>)> {
let term_offsets_it = term_offsets
.iter()
.map(|(term, _, _)| term.field())
.enumerate();
let mut prev_field_opt = None;
let mut fields = vec![];
let mut offsets = vec![];
for (offset, field) in term_offsets_it {
if Some(field) != prev_field_opt {
prev_field_opt = Some(field);
fields.push(field);
offsets.push(offset);
}
}
offsets.push(term_offsets.len());
let mut field_offsets = vec![];
for i in 0..fields.len() {
field_offsets.push((fields[i], offsets[i]..offsets[i + 1]));
}
field_offsets
}
/// Serialize the inverted index.
/// It pushes all term, one field at a time, towards the
/// postings serializer.
@@ -57,23 +31,23 @@ pub(crate) fn serialize_postings(
schema: &Schema,
serializer: &mut InvertedIndexSerializer,
) -> crate::Result<HashMap<Field, FnvHashMap<UnorderedTermId, TermOrdinal>>> {
let mut term_offsets: Vec<(Term<&[u8]>, Addr, UnorderedTermId)> =
Vec::with_capacity(ctx.term_index.len());
term_offsets.extend(ctx.term_index.iter());
term_offsets.sort_unstable_by_key(|(k, _, _)| k.clone());
let mut unordered_term_mappings: HashMap<Field, FnvHashMap<UnorderedTermId, TermOrdinal>> =
HashMap::new();
let field_offsets = make_field_partition(&term_offsets);
for (field, byte_offsets) in field_offsets {
for (field, _) in schema.fields() {
let postings_writer = per_field_postings_writers.get_for_field(field);
let mut term_offsets: Vec<(Term<&[u8]>, Addr, UnorderedTermId)> =
Vec::with_capacity(postings_writer.term_map().len());
term_offsets.extend(postings_writer.term_map().iter(&ctx.arena_terms));
term_offsets.sort_unstable_by_key(|(k, _, _)| k.clone());
let field_entry = schema.get_field_entry(field);
match *field_entry.field_type() {
FieldType::Str(_) | FieldType::Facet(_) => {
// populating the (unordered term ord) -> (ordered term ord) mapping
// for the field.
let unordered_term_ids = term_offsets[byte_offsets.clone()]
.iter()
.map(|&(_, _, bucket)| bucket);
let unordered_term_ids = term_offsets.iter().map(|&(_, _, bucket)| bucket);
let mapping: FnvHashMap<UnorderedTermId, TermOrdinal> = unordered_term_ids
.enumerate()
.map(|(term_ord, unord_term_id)| {
@@ -87,16 +61,10 @@ pub(crate) fn serialize_postings(
FieldType::JsonObject(_) => {}
}
let postings_writer = per_field_postings_writers.get_for_field(field);
let fieldnorm_reader = fieldnorm_readers.get_field(field)?;
let mut field_serializer =
serializer.new_field(field, postings_writer.total_num_tokens(), fieldnorm_reader)?;
postings_writer.serialize(
&term_offsets[byte_offsets],
doc_id_map,
&ctx,
&mut field_serializer,
)?;
postings_writer.serialize(&term_offsets, doc_id_map, &ctx, &mut field_serializer)?;
field_serializer.close()?;
}
Ok(unordered_term_mappings)
@@ -128,6 +96,10 @@ pub(crate) trait PostingsWriter {
ctx: &mut IndexingContext,
) -> UnorderedTermId;
fn mem_usage(&self) -> usize;
fn term_map(&self) -> &TermHashMap;
/// Serializes the postings on disk.
/// The actual serialization format is handled by the `PostingsSerializer`.
fn serialize(
@@ -148,7 +120,7 @@ pub(crate) trait PostingsWriter {
indexing_position: &mut IndexingPosition,
mut term_id_fast_field_writer_opt: Option<&mut MultiValuedFastFieldWriter>,
) {
let end_of_path_idx = term_buffer.as_slice().len();
let end_of_path_idx = term_buffer.value_bytes().len();
let mut num_tokens = 0;
let mut end_position = 0;
token_stream.process(&mut |token: &Token| {
@@ -188,6 +160,7 @@ pub(crate) trait PostingsWriter {
pub(crate) struct SpecializedPostingsWriter<Rec: Recorder> {
total_num_tokens: u64,
_recorder_type: PhantomData<Rec>,
pub(crate) term_map: TermHashMap,
}
impl<Rec: Recorder> From<SpecializedPostingsWriter<Rec>> for Box<dyn PostingsWriter> {
@@ -206,9 +179,10 @@ impl<Rec: Recorder> SpecializedPostingsWriter<Rec> {
doc_id_map: Option<&DocIdMapping>,
buffer_lender: &mut BufferLender,
ctx: &IndexingContext,
term_index: &TermHashMap,
serializer: &mut FieldSerializer,
) -> io::Result<()> {
let recorder: Rec = ctx.term_index.read(addr);
let recorder: Rec = term_index.read(addr, &ctx.arena_terms);
let term_doc_freq = recorder.term_doc_freq().unwrap_or(0u32);
serializer.new_term(term.value_bytes(), term_doc_freq)?;
recorder.serialize(&ctx.arena, doc_id_map, serializer, buffer_lender);
@@ -218,6 +192,14 @@ impl<Rec: Recorder> SpecializedPostingsWriter<Rec> {
}
impl<Rec: Recorder> PostingsWriter for SpecializedPostingsWriter<Rec> {
fn mem_usage(&self) -> usize {
self.term_map.mem_usage()
}
fn term_map(&self) -> &TermHashMap {
&self.term_map
}
fn subscribe(
&mut self,
doc: DocId,
@@ -225,25 +207,30 @@ impl<Rec: Recorder> PostingsWriter for SpecializedPostingsWriter<Rec> {
term: &Term,
ctx: &mut IndexingContext,
) -> UnorderedTermId {
debug_assert!(term.as_slice().len() >= 4);
//debug_assert!(term.value_bytes().len() >= 1);
self.total_num_tokens += 1;
let (term_index, arena) = (&mut ctx.term_index, &mut ctx.arena);
term_index.mutate_or_create(term.as_slice(), |opt_recorder: Option<Rec>| {
if let Some(mut recorder) = opt_recorder {
let current_doc = recorder.current_doc();
if current_doc != doc {
recorder.close_doc(arena);
let arena = &mut ctx.arena;
let arena_terms = &mut ctx.arena_terms;
self.term_map.mutate_or_create(
term.value_bytes(),
arena_terms,
|opt_recorder: Option<Rec>| {
if let Some(mut recorder) = opt_recorder {
let current_doc = recorder.current_doc();
if current_doc != doc {
recorder.close_doc(arena);
recorder.new_doc(doc, arena);
}
recorder.record_position(position, arena);
recorder
} else {
let mut recorder = Rec::default();
recorder.new_doc(doc, arena);
recorder.record_position(position, arena);
recorder
}
recorder.record_position(position, arena);
recorder
} else {
let mut recorder = Rec::default();
recorder.new_doc(doc, arena);
recorder.record_position(position, arena);
recorder
}
}) as UnorderedTermId
},
) as UnorderedTermId
}
fn serialize(
@@ -255,7 +242,15 @@ impl<Rec: Recorder> PostingsWriter for SpecializedPostingsWriter<Rec> {
) -> io::Result<()> {
let mut buffer_lender = BufferLender::default();
for (term, addr, _) in term_addrs {
Self::serialize_one_term(term, *addr, doc_id_map, &mut buffer_lender, ctx, serializer)?;
Self::serialize_one_term(
term,
*addr,
doc_id_map,
&mut buffer_lender,
ctx,
&self.term_map,
serializer,
)?;
}
Ok(())
}

View File

@@ -46,6 +46,7 @@ impl Addr {
}
/// Returns the `Addr` object for `addr + offset`
#[inline]
pub fn offset(self, offset: u32) -> Addr {
Addr(self.0.wrapping_add(offset))
}
@@ -54,20 +55,24 @@ impl Addr {
Addr((page_id << NUM_BITS_PAGE_ADDR | local_addr) as u32)
}
#[inline]
fn page_id(self) -> usize {
(self.0 as usize) >> NUM_BITS_PAGE_ADDR
}
#[inline]
fn page_local_addr(self) -> usize {
(self.0 as usize) & (PAGE_SIZE - 1)
}
/// Returns true if and only if the `Addr` is null.
#[inline]
pub fn is_null(self) -> bool {
self.0 == u32::max_value()
}
}
#[inline]
pub fn store<Item: Copy + 'static>(dest: &mut [u8], val: Item) {
assert_eq!(dest.len(), std::mem::size_of::<Item>());
unsafe {
@@ -75,6 +80,7 @@ pub fn store<Item: Copy + 'static>(dest: &mut [u8], val: Item) {
}
}
#[inline]
pub fn load<Item: Copy + 'static>(data: &[u8]) -> Item {
assert_eq!(data.len(), std::mem::size_of::<Item>());
unsafe { ptr::read_unaligned(data.as_ptr() as *const Item) }
@@ -110,6 +116,7 @@ impl MemoryArena {
self.pages.len() * PAGE_SIZE
}
#[inline]
pub fn write_at<Item: Copy + 'static>(&mut self, addr: Addr, val: Item) {
let dest = self.slice_mut(addr, std::mem::size_of::<Item>());
store(dest, val);
@@ -120,6 +127,7 @@ impl MemoryArena {
/// # Panics
///
/// If the address is erroneous
#[inline]
pub fn read<Item: Copy + 'static>(&self, addr: Addr) -> Item {
load(self.slice(addr, mem::size_of::<Item>()))
}
@@ -128,6 +136,7 @@ impl MemoryArena {
self.pages[addr.page_id()].slice(addr.page_local_addr(), len)
}
#[inline]
pub fn slice_from(&self, addr: Addr) -> &[u8] {
self.pages[addr.page_id()].slice_from(addr.page_local_addr())
}

View File

@@ -4,4 +4,4 @@ mod term_hashmap;
pub(crate) use self::expull::ExpUnrolledLinkedList;
pub(crate) use self::memory_arena::{Addr, MemoryArena};
pub(crate) use self::term_hashmap::{compute_table_size, TermHashMap};
pub(crate) use self::term_hashmap::TermHashMap;

View File

@@ -1,6 +1,6 @@
use std::convert::TryInto;
use std::{iter, mem, slice};
use byteorder::{ByteOrder, NativeEndian};
use murmurhash32::murmurhash2;
use super::{Addr, MemoryArena};
@@ -8,13 +8,6 @@ use crate::postings::stacker::memory_arena::store;
use crate::postings::UnorderedTermId;
use crate::Term;
/// Returns the actual memory size in bytes
/// required to create a table with a given capacity.
/// required to create a table of size
pub(crate) fn compute_table_size(capacity: usize) -> usize {
capacity * mem::size_of::<KeyValue>()
}
/// `KeyValue` is the item stored in the hash table.
/// The key is actually a `BytesRef` object stored in an external memory arena.
/// The `value_addr` also points to an address in the memory arena.
@@ -36,6 +29,7 @@ impl Default for KeyValue {
}
impl KeyValue {
#[inline]
fn is_empty(self) -> bool {
self.key_value_addr.is_null()
}
@@ -51,12 +45,17 @@ impl KeyValue {
/// or copying the key as long as there is no insert.
pub struct TermHashMap {
table: Box<[KeyValue]>,
memory_arena: MemoryArena,
mask: usize,
occupied: Vec<usize>,
len: usize,
}
impl Default for TermHashMap {
fn default() -> Self {
Self::new(1 << 10)
}
}
struct QuadraticProbing {
hash: usize,
i: usize,
@@ -75,18 +74,21 @@ impl QuadraticProbing {
}
}
pub struct Iter<'a> {
pub struct Iter<'a, 'm> {
hashmap: &'a TermHashMap,
memory_arena: &'m MemoryArena,
inner: slice::Iter<'a, usize>,
}
impl<'a> Iterator for Iter<'a> {
type Item = (Term<&'a [u8]>, Addr, UnorderedTermId);
impl<'a, 'm> Iterator for Iter<'a, 'm> {
type Item = (Term<&'m [u8]>, Addr, UnorderedTermId);
fn next(&mut self) -> Option<Self::Item> {
self.inner.next().cloned().map(move |bucket: usize| {
let kv = self.hashmap.table[bucket];
let (key, offset): (&'a [u8], Addr) = self.hashmap.get_key_value(kv.key_value_addr);
let (key, offset): (&'m [u8], Addr) = self
.hashmap
.get_key_value(kv.key_value_addr, self.memory_arena);
(Term::wrap(key), offset, kv.unordered_term_id)
})
}
@@ -106,21 +108,19 @@ impl TermHashMap {
pub(crate) fn new(table_size: usize) -> TermHashMap {
assert!(table_size > 0);
let table_size_power_of_2 = compute_previous_power_of_two(table_size);
let memory_arena = MemoryArena::new();
let table: Vec<KeyValue> = iter::repeat(KeyValue::default())
.take(table_size_power_of_2)
.collect();
TermHashMap {
table: table.into_boxed_slice(),
memory_arena,
mask: table_size_power_of_2 - 1,
occupied: Vec::with_capacity(table_size_power_of_2 / 2),
len: 0,
}
}
pub fn read<Item: Copy + 'static>(&self, addr: Addr) -> Item {
self.memory_arena.read(addr)
pub fn read<Item: Copy + 'static>(&self, addr: Addr, memory_arena: &MemoryArena) -> Item {
memory_arena.read(addr)
}
fn probe(&self, hash: u32) -> QuadraticProbing {
@@ -129,6 +129,8 @@ impl TermHashMap {
pub fn mem_usage(&self) -> usize {
self.table.len() * mem::size_of::<KeyValue>()
+ self.occupied.len()
* std::mem::size_of_val(&self.occupied.get(0).cloned().unwrap_or_default())
}
fn is_saturated(&self) -> bool {
@@ -136,16 +138,22 @@ impl TermHashMap {
}
#[inline]
fn get_key_value(&self, addr: Addr) -> (&[u8], Addr) {
let data = self.memory_arena.slice_from(addr);
let key_bytes_len = NativeEndian::read_u16(data) as usize;
let key_bytes: &[u8] = &data[2..][..key_bytes_len];
fn get_key_value<'m>(&self, addr: Addr, memory_arena: &'m MemoryArena) -> (&'m [u8], Addr) {
let data = memory_arena.slice_from(addr);
let (key_bytes_len_enc, data) = data.split_at(2);
let key_bytes_len: u16 = u16::from_ne_bytes(key_bytes_len_enc.try_into().unwrap());
let key_bytes: &[u8] = &data[..key_bytes_len as usize];
(key_bytes, addr.offset(2u32 + key_bytes_len as u32))
}
#[inline]
fn get_value_addr_if_key_match(&self, target_key: &[u8], addr: Addr) -> Option<Addr> {
let (stored_key, value_addr) = self.get_key_value(addr);
fn get_value_addr_if_key_match(
&self,
target_key: &[u8],
addr: Addr,
memory_arena: &mut MemoryArena,
) -> Option<Addr> {
let (stored_key, value_addr) = self.get_key_value(addr, memory_arena);
if stored_key == target_key {
Some(value_addr)
} else {
@@ -169,10 +177,11 @@ impl TermHashMap {
self.len
}
pub fn iter(&self) -> Iter<'_> {
pub fn iter<'a, 'm>(&'a self, memory_arena: &'m MemoryArena) -> Iter<'a, 'm> {
Iter {
inner: self.occupied.iter(),
hashmap: self,
memory_arena,
}
}
@@ -209,6 +218,7 @@ impl TermHashMap {
pub fn mutate_or_create<V, TMutator>(
&mut self,
key: &[u8],
memory_arena: &mut MemoryArena,
mut updater: TMutator,
) -> UnorderedTermId
where
@@ -219,28 +229,33 @@ impl TermHashMap {
self.resize();
}
let hash = murmurhash2(key);
let mut probe = self.probe(hash);
loop {
let bucket = probe.next_probe();
let kv: KeyValue = self.table[bucket];
if kv.is_empty() {
// The key does not exists yet.
let val = updater(None);
let num_bytes = std::mem::size_of::<u16>() + key.len() + std::mem::size_of::<V>();
let key_addr = self.memory_arena.allocate_space(num_bytes);
let key_addr = memory_arena.allocate_space(num_bytes);
{
let data = self.memory_arena.slice_mut(key_addr, num_bytes);
NativeEndian::write_u16(data, key.len() as u16);
let stop = 2 + key.len();
data[2..stop].copy_from_slice(key);
let data = memory_arena.slice_mut(key_addr, num_bytes);
let (key_len, data) = data.split_at_mut(2);
key_len.copy_from_slice(&(key.len() as u16).to_le_bytes());
let stop = key.len();
data[..key.len()].copy_from_slice(key);
store(&mut data[stop..], val);
}
return self.set_bucket(hash, key_addr, bucket);
} else if kv.hash == hash {
if let Some(val_addr) = self.get_value_addr_if_key_match(key, kv.key_value_addr) {
let v = self.memory_arena.read(val_addr);
if let Some(val_addr) =
self.get_value_addr_if_key_match(key, kv.key_value_addr, memory_arena)
{
let v = memory_arena.read(val_addr);
let new_v = updater(Some(v));
self.memory_arena.write_at(val_addr, new_v);
memory_arena.write_at(val_addr, new_v);
return kv.unordered_term_id;
}
}
@@ -254,26 +269,28 @@ mod tests {
use std::collections::HashMap;
use super::{compute_previous_power_of_two, TermHashMap};
use crate::postings::stacker::MemoryArena;
#[test]
fn test_hash_map() {
let mut arena = MemoryArena::new();
let mut hash_map: TermHashMap = TermHashMap::new(1 << 18);
hash_map.mutate_or_create(b"abc", |opt_val: Option<u32>| {
hash_map.mutate_or_create(b"abc", &mut arena, |opt_val: Option<u32>| {
assert_eq!(opt_val, None);
3u32
});
hash_map.mutate_or_create(b"abcd", |opt_val: Option<u32>| {
hash_map.mutate_or_create(b"abcd", &mut arena, |opt_val: Option<u32>| {
assert_eq!(opt_val, None);
4u32
});
hash_map.mutate_or_create(b"abc", |opt_val: Option<u32>| {
hash_map.mutate_or_create(b"abc", &mut arena, |opt_val: Option<u32>| {
assert_eq!(opt_val, Some(3u32));
5u32
});
let mut vanilla_hash_map = HashMap::new();
let iter_values = hash_map.iter();
let iter_values = hash_map.iter(&arena);
for (key, addr, _) in iter_values {
let val: u32 = hash_map.memory_arena.read(addr);
let val: u32 = arena.read(addr);
vanilla_hash_map.insert(key.to_owned(), val);
}
assert_eq!(vanilla_hash_map.len(), 2);

View File

@@ -718,7 +718,7 @@ fn generate_literals_for_json_object(
}
json_term_writer.close_path_and_set_type(Type::Str);
drop(json_term_writer);
let term_num_bytes = term.as_slice().len();
let term_num_bytes = term.value_bytes().len();
let mut token_stream = text_analyzer.token_stream(phrase);
let mut terms: Vec<(usize, Term)> = Vec::new();
token_stream.process(&mut |token| {

View File

@@ -17,7 +17,7 @@ use crate::DateTime;
///
/// - <value> is, if this is not the json term, a binary representation specific to the type.
/// If it is a JSON Term, then it is preprended with the path that leads to this leaf value.
const FAST_VALUE_TERM_LEN: usize = 4 + 1 + 8;
const FAST_VALUE_TERM_LEN: usize = 8;
/// Separates the different segments of
/// the json path.
@@ -33,22 +33,33 @@ pub const JSON_END_OF_PATH: u8 = 0u8;
///
/// It actually wraps a `Vec<u8>`.
#[derive(Clone)]
pub struct Term<B = Vec<u8>>(B)
where B: AsRef<[u8]>;
pub struct Term<B = Vec<u8>> {
data: B,
field: Field,
field_type: Type,
}
impl AsMut<Vec<u8>> for Term {
fn as_mut(&mut self) -> &mut Vec<u8> {
&mut self.0
&mut self.data
}
}
impl Term {
pub(crate) fn new() -> Term {
Term(Vec::with_capacity(100))
Self::with_capacity(32)
}
pub(crate) fn with_capacity(cap: usize) -> Term {
Term {
data: Vec::with_capacity(cap),
field: Field::from_field_id(0),
field_type: Type::Str,
}
}
fn from_fast_value<T: FastValue>(field: Field, val: &T) -> Term {
let mut term = Term(vec![0u8; FAST_VALUE_TERM_LEN]);
let mut term = Term::with_capacity(FAST_VALUE_TERM_LEN);
term.set_field(T::to_type(), field);
term.set_u64(val.to_u64());
term
@@ -86,9 +97,9 @@ impl Term {
}
fn create_bytes_term(typ: Type, field: Field, bytes: &[u8]) -> Term {
let mut term = Term(vec![0u8; 5 + bytes.len()]);
let mut term = Term::with_capacity(bytes.len());
term.set_field(typ, field);
term.0.extend_from_slice(bytes);
term.data.extend_from_slice(bytes);
term
}
@@ -98,10 +109,9 @@ impl Term {
}
pub(crate) fn set_field(&mut self, typ: Type, field: Field) {
self.0.clear();
self.0
.extend_from_slice(field.field_id().to_be_bytes().as_ref());
self.0.push(typ.to_code());
self.field = field;
self.field_type = typ;
self.data.clear();
}
/// Sets a u64 value in the term.
@@ -112,11 +122,9 @@ impl Term {
/// the natural order of the values.
pub fn set_u64(&mut self, val: u64) {
self.set_fast_value(val);
self.set_bytes(val.to_be_bytes().as_ref());
}
fn set_fast_value<T: FastValue>(&mut self, val: T) {
self.0.resize(FAST_VALUE_TERM_LEN, 0u8);
self.set_bytes(val.to_u64().to_be_bytes().as_ref());
}
@@ -137,8 +145,8 @@ impl Term {
/// Sets the value of a `Bytes` field.
pub fn set_bytes(&mut self, bytes: &[u8]) {
self.0.resize(5, 0u8);
self.0.extend(bytes);
self.data.clear();
self.data.extend(bytes);
}
/// Set the texts only, keeping the field untouched.
@@ -148,18 +156,18 @@ impl Term {
/// Removes the value_bytes and set the type code.
pub fn clear_with_type(&mut self, typ: Type) {
self.truncate(5);
self.0[4] = typ.to_code();
self.data.clear();
self.field_type = typ;
}
/// Truncate the term right after the field and the type code.
pub fn truncate(&mut self, len: usize) {
self.0.truncate(len);
self.data.truncate(len);
}
/// Truncate the term right after the field and the type code.
pub fn append_bytes(&mut self, bytes: &[u8]) {
self.0.extend_from_slice(bytes);
self.data.extend_from_slice(bytes);
}
}
@@ -167,7 +175,7 @@ impl<B> Ord for Term<B>
where B: AsRef<[u8]>
{
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.as_slice().cmp(other.as_slice())
self.value_bytes().cmp(other.value_bytes())
}
}
@@ -183,7 +191,7 @@ impl<B> PartialEq for Term<B>
where B: AsRef<[u8]>
{
fn eq(&self, other: &Self) -> bool {
self.as_slice() == other.as_slice()
self.value_bytes() == other.value_bytes()
}
}
@@ -193,7 +201,7 @@ impl<B> Hash for Term<B>
where B: AsRef<[u8]>
{
fn hash<H: Hasher>(&self, state: &mut H) {
self.0.as_ref().hash(state)
self.data.as_ref().hash(state)
}
}
@@ -202,14 +210,15 @@ where B: AsRef<[u8]>
{
/// Wraps a object holding bytes
pub fn wrap(data: B) -> Term<B> {
Term(data)
Term {
data,
field: Field::from_field_id(0),
field_type: Type::Str,
}
}
fn typ_code(&self) -> u8 {
*self
.as_slice()
.get(4)
.expect("the byte representation is too short")
self.field_type as u8
}
/// Return the type of the term.
@@ -219,55 +228,7 @@ where B: AsRef<[u8]>
/// Returns the field.
pub fn field(&self) -> Field {
let mut field_id_bytes = [0u8; 4];
field_id_bytes.copy_from_slice(&self.0.as_ref()[..4]);
Field::from_field_id(u32::from_be_bytes(field_id_bytes))
}
/// Returns the `u64` value stored in a term.
///
/// Returns None if the term is not of the u64 type, or if the term byte representation
/// is invalid.
pub fn as_u64(&self) -> Option<u64> {
self.get_fast_type::<u64>()
}
fn get_fast_type<T: FastValue>(&self) -> Option<T> {
if self.typ() != T::to_type() {
return None;
}
let mut value_bytes = [0u8; 8];
let bytes = self.value_bytes();
if bytes.len() != 8 {
return None;
}
value_bytes.copy_from_slice(self.value_bytes());
let value_u64 = u64::from_be_bytes(value_bytes);
Some(FastValue::from_u64(value_u64))
}
/// Returns the `i64` value stored in a term.
///
/// Returns None if the term is not of the i64 type, or if the term byte representation
/// is invalid.
pub fn as_i64(&self) -> Option<i64> {
self.get_fast_type::<i64>()
}
/// Returns the `f64` value stored in a term.
///
/// Returns None if the term is not of the f64 type, or if the term byte representation
/// is invalid.
pub fn as_f64(&self) -> Option<f64> {
self.get_fast_type::<f64>()
}
/// Returns the `Date` value stored in a term.
///
/// Returns None if the term is not of the Date type, or if the term byte representation
/// is invalid.
pub fn as_date(&self) -> Option<DateTime> {
self.get_fast_type::<DateTime>()
self.field
}
/// Returns the text associated with the term.
@@ -275,43 +236,12 @@ where B: AsRef<[u8]>
/// Returns None if the field is not of string type
/// or if the bytes are not valid utf-8.
pub fn as_str(&self) -> Option<&str> {
if self.as_slice().len() < 5 {
return None;
}
if self.typ() != Type::Str {
return None;
}
str::from_utf8(self.value_bytes()).ok()
}
/// Returns the facet associated with the term.
///
/// Returns None if the field is not of facet type
/// or if the bytes are not valid utf-8.
pub fn as_facet(&self) -> Option<Facet> {
if self.as_slice().len() < 5 {
return None;
}
if self.typ() != Type::Facet {
return None;
}
let facet_encode_str = str::from_utf8(self.value_bytes()).ok()?;
Some(Facet::from_encoded_string(facet_encode_str.to_string()))
}
/// Returns the bytes associated with the term.
///
/// Returns None if the field is not of bytes type.
pub fn as_bytes(&self) -> Option<&[u8]> {
if self.as_slice().len() < 5 {
return None;
}
if self.typ() != Type::Bytes {
return None;
}
Some(self.value_bytes())
}
/// Returns the serialized value of the term.
/// (this does not include the field.)
///
@@ -319,15 +249,7 @@ where B: AsRef<[u8]>
/// If the term is a u64, its value is encoded according
/// to `byteorder::LittleEndian`.
pub fn value_bytes(&self) -> &[u8] {
&self.0.as_ref()[5..]
}
/// Returns the underlying `&[u8]`.
///
/// Do NOT rely on this byte representation in the index.
/// This value is likely to change in the future.
pub(crate) fn as_slice(&self) -> &[u8] {
self.0.as_ref()
&self.data.as_ref()
}
}
@@ -434,7 +356,6 @@ mod tests {
let term = Term::from_field_u64(count_field, 983u64);
assert_eq!(term.field(), count_field);
assert_eq!(term.typ(), Type::U64);
assert_eq!(term.as_slice().len(), super::FAST_VALUE_TERM_LEN);
assert_eq!(term.as_u64(), Some(983u64))
assert_eq!(term.value_bytes().len(), super::FAST_VALUE_TERM_LEN);
}
}