term hashmap per field, smaller page size

This commit is contained in:
Pascal Seitz
2022-05-01 13:13:09 +08:00
parent 7e3c0c5392
commit 2d1beeb6be
9 changed files with 119 additions and 79 deletions

View File

@@ -149,6 +149,7 @@ impl SegmentWriter {
pub fn mem_usage(&self) -> usize {
self.ctx.mem_usage()
+ self.fieldnorms_writer.mem_usage()
+ self.per_field_postings_writers.mem_usage()
+ self.fast_field_writers.mem_usage()
+ self.segment_serializer.mem_usage()
}

View File

@@ -1,11 +1,8 @@
use crate::postings::stacker::{MemoryArena, TermHashMap};
use crate::postings::stacker::MemoryArena;
/// IndexingContext contains all of the transient memory arenas
/// required for building the inverted index.
pub(crate) struct IndexingContext {
/// The term index is an adhoc hashmap,
/// itself backed by a dedicated memory arena.
pub term_index: TermHashMap,
/// Arena is a memory arena that stores posting lists / term frequencies / positions.
pub arena: MemoryArena,
}
@@ -13,15 +10,13 @@ pub(crate) struct IndexingContext {
impl IndexingContext {
/// Create a new IndexingContext given the size of the term hash map.
pub(crate) fn new(table_size: usize) -> IndexingContext {
let term_index = TermHashMap::new(table_size);
IndexingContext {
arena: MemoryArena::new(),
term_index,
}
}
/// Returns the memory usage for the inverted index memory arenas, in bytes.
pub(crate) fn mem_usage(&self) -> usize {
self.term_index.mem_usage() + self.arena.mem_usage()
self.arena.mem_usage()
}
}

View File

@@ -13,6 +13,8 @@ use crate::schema::Type;
use crate::tokenizer::TokenStream;
use crate::{DocId, Term};
use super::stacker::TermHashMap;
#[derive(Default)]
pub(crate) struct JsonPostingsWriter<Rec: Recorder> {
str_posting_writer: SpecializedPostingsWriter<Rec>,
@@ -26,6 +28,14 @@ impl<Rec: Recorder> From<JsonPostingsWriter<Rec>> for Box<dyn PostingsWriter> {
}
impl<Rec: Recorder> PostingsWriter for JsonPostingsWriter<Rec> {
fn mem_usage(&self) -> usize {
self.str_posting_writer.mem_usage() + self.non_str_posting_writer.mem_usage()
}
fn term_map(&self) -> &TermHashMap {
self.str_posting_writer.term_map()
}
fn subscribe(
&mut self,
doc: crate::DocId,
@@ -74,6 +84,7 @@ impl<Rec: Recorder> PostingsWriter for JsonPostingsWriter<Rec> {
doc_id_map,
&mut buffer_lender,
ctx,
&self.str_posting_writer.term_map,
serializer,
)?;
} else {
@@ -83,6 +94,7 @@ impl<Rec: Recorder> PostingsWriter for JsonPostingsWriter<Rec> {
doc_id_map,
&mut buffer_lender,
ctx,
&self.str_posting_writer.term_map,
serializer,
)?;
}

View File

@@ -10,9 +10,10 @@ pub(crate) struct PerFieldPostingsWriter {
impl PerFieldPostingsWriter {
pub fn for_schema(schema: &Schema) -> Self {
let num_fields = schema.num_fields();
let per_field_postings_writers = schema
.fields()
.map(|(_, field_entry)| posting_writer_from_field_entry(field_entry))
.map(|(_, field_entry)| posting_writer_from_field_entry(field_entry, num_fields))
.collect();
PerFieldPostingsWriter {
per_field_postings_writers,
@@ -26,9 +27,19 @@ impl PerFieldPostingsWriter {
pub(crate) fn get_for_field_mut(&mut self, field: Field) -> &mut dyn PostingsWriter {
self.per_field_postings_writers[field.field_id() as usize].as_mut()
}
pub(crate) fn mem_usage(&self) -> usize {
self.per_field_postings_writers
.iter()
.map(|postings_writer| postings_writer.mem_usage())
.sum()
}
}
fn posting_writer_from_field_entry(field_entry: &FieldEntry) -> Box<dyn PostingsWriter> {
fn posting_writer_from_field_entry(
field_entry: &FieldEntry,
_num_fields: usize,
) -> Box<dyn PostingsWriter> {
match *field_entry.field_type() {
FieldType::Str(ref text_options) => text_options
.get_indexing_options()

View File

@@ -5,7 +5,7 @@ use std::ops::Range;
use fnv::FnvHashMap;
use super::stacker::Addr;
use super::stacker::{Addr, TermHashMap};
use crate::fastfield::MultiValuedFastFieldWriter;
use crate::fieldnorm::FieldNormReaders;
use crate::indexer::doc_id_mapping::DocIdMapping;
@@ -21,31 +21,6 @@ use crate::DocId;
const POSITION_GAP: u32 = 1;
fn make_field_partition(
term_offsets: &[(Term<&[u8]>, Addr, UnorderedTermId)],
) -> Vec<(Field, Range<usize>)> {
let term_offsets_it = term_offsets
.iter()
.map(|(term, _, _)| term.field())
.enumerate();
let mut prev_field_opt = None;
let mut fields = vec![];
let mut offsets = vec![];
for (offset, field) in term_offsets_it {
if Some(field) != prev_field_opt {
prev_field_opt = Some(field);
fields.push(field);
offsets.push(offset);
}
}
offsets.push(term_offsets.len());
let mut field_offsets = vec![];
for i in 0..fields.len() {
field_offsets.push((fields[i], offsets[i]..offsets[i + 1]));
}
field_offsets
}
/// Serialize the inverted index.
/// It pushes all term, one field at a time, towards the
/// postings serializer.
@@ -57,23 +32,23 @@ pub(crate) fn serialize_postings(
schema: &Schema,
serializer: &mut InvertedIndexSerializer,
) -> crate::Result<HashMap<Field, FnvHashMap<UnorderedTermId, TermOrdinal>>> {
let mut term_offsets: Vec<(Term<&[u8]>, Addr, UnorderedTermId)> =
Vec::with_capacity(ctx.term_index.len());
term_offsets.extend(ctx.term_index.iter());
term_offsets.sort_unstable_by_key(|(k, _, _)| k.clone());
let mut unordered_term_mappings: HashMap<Field, FnvHashMap<UnorderedTermId, TermOrdinal>> =
HashMap::new();
let field_offsets = make_field_partition(&term_offsets);
for (field, byte_offsets) in field_offsets {
for (field, _) in schema.fields() {
let postings_writer = per_field_postings_writers.get_for_field(field);
let mut term_offsets: Vec<(Term<&[u8]>, Addr, UnorderedTermId)> =
Vec::with_capacity(postings_writer.term_map().len());
term_offsets.extend(postings_writer.term_map().iter());
term_offsets.sort_unstable_by_key(|(k, _, _)| k.clone());
let field_entry = schema.get_field_entry(field);
match *field_entry.field_type() {
FieldType::Str(_) | FieldType::Facet(_) => {
// populating the (unordered term ord) -> (ordered term ord) mapping
// for the field.
let unordered_term_ids = term_offsets[byte_offsets.clone()]
.iter()
.map(|&(_, _, bucket)| bucket);
let unordered_term_ids = term_offsets.iter().map(|&(_, _, bucket)| bucket);
let mapping: FnvHashMap<UnorderedTermId, TermOrdinal> = unordered_term_ids
.enumerate()
.map(|(term_ord, unord_term_id)| {
@@ -87,16 +62,10 @@ pub(crate) fn serialize_postings(
FieldType::JsonObject(_) => {}
}
let postings_writer = per_field_postings_writers.get_for_field(field);
let fieldnorm_reader = fieldnorm_readers.get_field(field)?;
let mut field_serializer =
serializer.new_field(field, postings_writer.total_num_tokens(), fieldnorm_reader)?;
postings_writer.serialize(
&term_offsets[byte_offsets],
doc_id_map,
&ctx,
&mut field_serializer,
)?;
postings_writer.serialize(&term_offsets, doc_id_map, &ctx, &mut field_serializer)?;
field_serializer.close()?;
}
Ok(unordered_term_mappings)
@@ -128,6 +97,10 @@ pub(crate) trait PostingsWriter {
ctx: &mut IndexingContext,
) -> UnorderedTermId;
fn mem_usage(&self) -> usize;
fn term_map(&self) -> &TermHashMap;
/// Serializes the postings on disk.
/// The actual serialization format is handled by the `PostingsSerializer`.
fn serialize(
@@ -188,6 +161,7 @@ pub(crate) trait PostingsWriter {
pub(crate) struct SpecializedPostingsWriter<Rec: Recorder> {
total_num_tokens: u64,
_recorder_type: PhantomData<Rec>,
pub(crate) term_map: TermHashMap,
}
impl<Rec: Recorder> From<SpecializedPostingsWriter<Rec>> for Box<dyn PostingsWriter> {
@@ -206,9 +180,10 @@ impl<Rec: Recorder> SpecializedPostingsWriter<Rec> {
doc_id_map: Option<&DocIdMapping>,
buffer_lender: &mut BufferLender,
ctx: &IndexingContext,
term_index: &TermHashMap,
serializer: &mut FieldSerializer,
) -> io::Result<()> {
let recorder: Rec = ctx.term_index.read(addr);
let recorder: Rec = term_index.read(addr);
let term_doc_freq = recorder.term_doc_freq().unwrap_or(0u32);
serializer.new_term(term.value_bytes(), term_doc_freq)?;
recorder.serialize(&ctx.arena, doc_id_map, serializer, buffer_lender);
@@ -218,6 +193,14 @@ impl<Rec: Recorder> SpecializedPostingsWriter<Rec> {
}
impl<Rec: Recorder> PostingsWriter for SpecializedPostingsWriter<Rec> {
fn mem_usage(&self) -> usize {
self.term_map.mem_usage()
}
fn term_map(&self) -> &TermHashMap {
&self.term_map
}
fn subscribe(
&mut self,
doc: DocId,
@@ -227,23 +210,24 @@ impl<Rec: Recorder> PostingsWriter for SpecializedPostingsWriter<Rec> {
) -> UnorderedTermId {
debug_assert!(term.as_slice().len() >= 4);
self.total_num_tokens += 1;
let (term_index, arena) = (&mut ctx.term_index, &mut ctx.arena);
term_index.mutate_or_create(term.as_slice(), |opt_recorder: Option<Rec>| {
if let Some(mut recorder) = opt_recorder {
let current_doc = recorder.current_doc();
if current_doc != doc {
recorder.close_doc(arena);
let arena = &mut ctx.arena;
self.term_map
.mutate_or_create(term.as_slice(), |opt_recorder: Option<Rec>| {
if let Some(mut recorder) = opt_recorder {
let current_doc = recorder.current_doc();
if current_doc != doc {
recorder.close_doc(arena);
recorder.new_doc(doc, arena);
}
recorder.record_position(position, arena);
recorder
} else {
let mut recorder = Rec::default();
recorder.new_doc(doc, arena);
recorder.record_position(position, arena);
recorder
}
recorder.record_position(position, arena);
recorder
} else {
let mut recorder = Rec::default();
recorder.new_doc(doc, arena);
recorder.record_position(position, arena);
recorder
}
}) as UnorderedTermId
}) as UnorderedTermId
}
fn serialize(
@@ -255,7 +239,15 @@ impl<Rec: Recorder> PostingsWriter for SpecializedPostingsWriter<Rec> {
) -> io::Result<()> {
let mut buffer_lender = BufferLender::default();
for (term, addr, _) in term_addrs {
Self::serialize_one_term(term, *addr, doc_id_map, &mut buffer_lender, ctx, serializer)?;
Self::serialize_one_term(
term,
*addr,
doc_id_map,
&mut buffer_lender,
ctx,
&self.term_map,
serializer,
)?;
}
Ok(())
}

View File

@@ -24,7 +24,7 @@
//! stores your object using `ptr::write_unaligned` and `ptr::read_unaligned`.
use std::{mem, ptr};
const NUM_BITS_PAGE_ADDR: usize = 20;
const NUM_BITS_PAGE_ADDR: usize = 17;
const PAGE_SIZE: usize = 1 << NUM_BITS_PAGE_ADDR; // pages are 1 MB large
/// Represents a pointer into the `MemoryArena`
@@ -46,6 +46,7 @@ impl Addr {
}
/// Returns the `Addr` object for `addr + offset`
#[inline]
pub fn offset(self, offset: u32) -> Addr {
Addr(self.0.wrapping_add(offset))
}
@@ -54,20 +55,24 @@ impl Addr {
Addr((page_id << NUM_BITS_PAGE_ADDR | local_addr) as u32)
}
#[inline]
fn page_id(self) -> usize {
(self.0 as usize) >> NUM_BITS_PAGE_ADDR
}
#[inline]
fn page_local_addr(self) -> usize {
(self.0 as usize) & (PAGE_SIZE - 1)
}
/// Returns true if and only if the `Addr` is null.
#[inline]
pub fn is_null(self) -> bool {
self.0 == u32::max_value()
}
}
#[inline]
pub fn store<Item: Copy + 'static>(dest: &mut [u8], val: Item) {
assert_eq!(dest.len(), std::mem::size_of::<Item>());
unsafe {
@@ -75,6 +80,7 @@ pub fn store<Item: Copy + 'static>(dest: &mut [u8], val: Item) {
}
}
#[inline]
pub fn load<Item: Copy + 'static>(data: &[u8]) -> Item {
assert_eq!(data.len(), std::mem::size_of::<Item>());
unsafe { ptr::read_unaligned(data.as_ptr() as *const Item) }
@@ -110,6 +116,7 @@ impl MemoryArena {
self.pages.len() * PAGE_SIZE
}
#[inline]
pub fn write_at<Item: Copy + 'static>(&mut self, addr: Addr, val: Item) {
let dest = self.slice_mut(addr, std::mem::size_of::<Item>());
store(dest, val);
@@ -120,6 +127,7 @@ impl MemoryArena {
/// # Panics
///
/// If the address is erroneous
#[inline]
pub fn read<Item: Copy + 'static>(&self, addr: Addr) -> Item {
load(self.slice(addr, mem::size_of::<Item>()))
}
@@ -128,6 +136,7 @@ impl MemoryArena {
self.pages[addr.page_id()].slice(addr.page_local_addr(), len)
}
#[inline]
pub fn slice_from(&self, addr: Addr) -> &[u8] {
self.pages[addr.page_id()].slice_from(addr.page_local_addr())
}

View File

@@ -36,6 +36,7 @@ impl Default for KeyValue {
}
impl KeyValue {
#[inline]
fn is_empty(self) -> bool {
self.key_value_addr.is_null()
}
@@ -57,6 +58,12 @@ pub struct TermHashMap {
len: usize,
}
impl Default for TermHashMap {
fn default() -> Self {
Self::new(1 << 10)
}
}
struct QuadraticProbing {
hash: usize,
i: usize,

View File

@@ -347,7 +347,9 @@ impl Schema {
impl Serialize for Schema {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where S: Serializer {
where
S: Serializer,
{
let mut seq = serializer.serialize_seq(Some(self.0.fields.len()))?;
for e in &self.0.fields {
seq.serialize_element(e)?;
@@ -358,7 +360,9 @@ impl Serialize for Schema {
impl<'de> Deserialize<'de> for Schema {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where D: Deserializer<'de> {
where
D: Deserializer<'de>,
{
struct SchemaVisitor;
impl<'de> Visitor<'de> for SchemaVisitor {
@@ -369,7 +373,9 @@ impl<'de> Deserialize<'de> for Schema {
}
fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
where A: SeqAccess<'de> {
where
A: SeqAccess<'de>,
{
let mut schema = SchemaBuilder {
fields: Vec::with_capacity(seq.size_hint().unwrap_or(0)),
fields_map: HashMap::with_capacity(seq.size_hint().unwrap_or(0)),

View File

@@ -34,7 +34,8 @@ pub const JSON_END_OF_PATH: u8 = 0u8;
/// It actually wraps a `Vec<u8>`.
#[derive(Clone)]
pub struct Term<B = Vec<u8>>(B)
where B: AsRef<[u8]>;
where
B: AsRef<[u8]>;
impl AsMut<Vec<u8>> for Term {
fn as_mut(&mut self) -> &mut Vec<u8> {
@@ -164,7 +165,8 @@ impl Term {
}
impl<B> Ord for Term<B>
where B: AsRef<[u8]>
where
B: AsRef<[u8]>,
{
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.as_slice().cmp(other.as_slice())
@@ -172,7 +174,8 @@ where B: AsRef<[u8]>
}
impl<B> PartialOrd for Term<B>
where B: AsRef<[u8]>
where
B: AsRef<[u8]>,
{
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
@@ -180,7 +183,8 @@ where B: AsRef<[u8]>
}
impl<B> PartialEq for Term<B>
where B: AsRef<[u8]>
where
B: AsRef<[u8]>,
{
fn eq(&self, other: &Self) -> bool {
self.as_slice() == other.as_slice()
@@ -190,7 +194,8 @@ where B: AsRef<[u8]>
impl<B> Eq for Term<B> where B: AsRef<[u8]> {}
impl<B> Hash for Term<B>
where B: AsRef<[u8]>
where
B: AsRef<[u8]>,
{
fn hash<H: Hasher>(&self, state: &mut H) {
self.0.as_ref().hash(state)
@@ -198,7 +203,8 @@ where B: AsRef<[u8]>
}
impl<B> Term<B>
where B: AsRef<[u8]>
where
B: AsRef<[u8]>,
{
/// Wraps a object holding bytes
pub fn wrap(data: B) -> Term<B> {
@@ -399,7 +405,8 @@ fn debug_value_bytes(typ: Type, bytes: &[u8], f: &mut fmt::Formatter) -> fmt::Re
}
impl<B> fmt::Debug for Term<B>
where B: AsRef<[u8]>
where
B: AsRef<[u8]>,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let field_id = self.field().field_id();