mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-06-03 00:50:41 +00:00
docid deltas while indexing (#2249)
* docid deltas while indexing storing deltas is especially helpful for repetitive data like logs. In those cases, recording a doc on a term costed 4 bytes instead of 1 byte now. HDFS Indexing 1.1GB Total memory consumption: Before: 760 MB Now: 590 MB * use scan for delta decoding
This commit is contained in:
@@ -82,21 +82,12 @@ pub(crate) trait Recorder: Copy + Default + Send + Sync + 'static {
|
||||
}
|
||||
|
||||
/// Only records the doc ids
|
||||
#[derive(Clone, Copy)]
|
||||
#[derive(Clone, Copy, Default)]
|
||||
pub struct DocIdRecorder {
|
||||
stack: ExpUnrolledLinkedList,
|
||||
current_doc: DocId,
|
||||
}
|
||||
|
||||
impl Default for DocIdRecorder {
|
||||
fn default() -> Self {
|
||||
DocIdRecorder {
|
||||
stack: ExpUnrolledLinkedList::default(),
|
||||
current_doc: u32::MAX,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Recorder for DocIdRecorder {
|
||||
#[inline]
|
||||
fn current_doc(&self) -> DocId {
|
||||
@@ -105,8 +96,9 @@ impl Recorder for DocIdRecorder {
|
||||
|
||||
#[inline]
|
||||
fn new_doc(&mut self, doc: DocId, arena: &mut MemoryArena) {
|
||||
let delta = doc - self.current_doc;
|
||||
self.current_doc = doc;
|
||||
self.stack.writer(arena).write_u32_vint(doc);
|
||||
self.stack.writer(arena).write_u32_vint(delta);
|
||||
}
|
||||
|
||||
#[inline]
|
||||
@@ -123,21 +115,20 @@ impl Recorder for DocIdRecorder {
|
||||
buffer_lender: &mut BufferLender,
|
||||
) {
|
||||
let (buffer, doc_ids) = buffer_lender.lend_all();
|
||||
self.stack.read_to_end(arena, buffer);
|
||||
// TODO avoid reading twice.
|
||||
self.stack.read_to_end(arena, buffer);
|
||||
if let Some(doc_id_map) = doc_id_map {
|
||||
doc_ids.extend(
|
||||
VInt32Reader::new(&buffer[..])
|
||||
.map(|old_doc_id| doc_id_map.get_new_doc_id(old_doc_id)),
|
||||
);
|
||||
let iter = get_sum_reader(VInt32Reader::new(&buffer[..]));
|
||||
doc_ids.extend(iter.map(|old_doc_id| doc_id_map.get_new_doc_id(old_doc_id)));
|
||||
doc_ids.sort_unstable();
|
||||
|
||||
for doc in doc_ids {
|
||||
serializer.write_doc(*doc, 0u32, &[][..]);
|
||||
}
|
||||
} else {
|
||||
for doc in VInt32Reader::new(&buffer[..]) {
|
||||
serializer.write_doc(doc, 0u32, &[][..]);
|
||||
let iter = get_sum_reader(VInt32Reader::new(&buffer[..]));
|
||||
for doc_id in iter {
|
||||
serializer.write_doc(doc_id, 0u32, &[][..]);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -147,6 +138,15 @@ impl Recorder for DocIdRecorder {
|
||||
}
|
||||
}
|
||||
|
||||
/// Takes an Iterator of delta encoded elements and returns an iterator
|
||||
/// that yields the sum of the elements.
|
||||
fn get_sum_reader(iter: impl Iterator<Item = u32>) -> impl Iterator<Item = u32> {
|
||||
iter.scan(0, |state, delta| {
|
||||
*state += delta;
|
||||
Some(*state)
|
||||
})
|
||||
}
|
||||
|
||||
/// Recorder encoding document ids, and term frequencies
|
||||
#[derive(Clone, Copy, Default)]
|
||||
pub struct TermFrequencyRecorder {
|
||||
@@ -164,9 +164,10 @@ impl Recorder for TermFrequencyRecorder {
|
||||
|
||||
#[inline]
|
||||
fn new_doc(&mut self, doc: DocId, arena: &mut MemoryArena) {
|
||||
let delta = doc - self.current_doc;
|
||||
self.term_doc_freq += 1;
|
||||
self.current_doc = doc;
|
||||
self.stack.writer(arena).write_u32_vint(doc);
|
||||
self.stack.writer(arena).write_u32_vint(delta);
|
||||
}
|
||||
|
||||
#[inline]
|
||||
@@ -193,9 +194,12 @@ impl Recorder for TermFrequencyRecorder {
|
||||
let mut u32_it = VInt32Reader::new(&buffer[..]);
|
||||
if let Some(doc_id_map) = doc_id_map {
|
||||
let mut doc_id_and_tf = vec![];
|
||||
while let Some(old_doc_id) = u32_it.next() {
|
||||
let mut prev_doc = 0;
|
||||
while let Some(delta_doc_id) = u32_it.next() {
|
||||
let doc_id = prev_doc + delta_doc_id;
|
||||
prev_doc = doc_id;
|
||||
let term_freq = u32_it.next().unwrap_or(self.current_tf);
|
||||
doc_id_and_tf.push((doc_id_map.get_new_doc_id(old_doc_id), term_freq));
|
||||
doc_id_and_tf.push((doc_id_map.get_new_doc_id(doc_id), term_freq));
|
||||
}
|
||||
doc_id_and_tf.sort_unstable_by_key(|&(doc_id, _)| doc_id);
|
||||
|
||||
@@ -203,9 +207,12 @@ impl Recorder for TermFrequencyRecorder {
|
||||
serializer.write_doc(doc_id, tf, &[][..]);
|
||||
}
|
||||
} else {
|
||||
while let Some(doc) = u32_it.next() {
|
||||
let mut prev_doc = 0;
|
||||
while let Some(delta_doc_id) = u32_it.next() {
|
||||
let doc_id = prev_doc + delta_doc_id;
|
||||
prev_doc = doc_id;
|
||||
let term_freq = u32_it.next().unwrap_or(self.current_tf);
|
||||
serializer.write_doc(doc, term_freq, &[][..]);
|
||||
serializer.write_doc(doc_id, term_freq, &[][..]);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -216,23 +223,13 @@ impl Recorder for TermFrequencyRecorder {
|
||||
}
|
||||
|
||||
/// Recorder encoding term frequencies as well as positions.
|
||||
#[derive(Clone, Copy)]
|
||||
#[derive(Clone, Copy, Default)]
|
||||
pub struct TfAndPositionRecorder {
|
||||
stack: ExpUnrolledLinkedList,
|
||||
current_doc: DocId,
|
||||
term_doc_freq: u32,
|
||||
}
|
||||
|
||||
impl Default for TfAndPositionRecorder {
|
||||
fn default() -> Self {
|
||||
TfAndPositionRecorder {
|
||||
stack: ExpUnrolledLinkedList::default(),
|
||||
current_doc: u32::MAX,
|
||||
term_doc_freq: 0u32,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Recorder for TfAndPositionRecorder {
|
||||
#[inline]
|
||||
fn current_doc(&self) -> DocId {
|
||||
@@ -241,9 +238,10 @@ impl Recorder for TfAndPositionRecorder {
|
||||
|
||||
#[inline]
|
||||
fn new_doc(&mut self, doc: DocId, arena: &mut MemoryArena) {
|
||||
let delta = doc - self.current_doc;
|
||||
self.current_doc = doc;
|
||||
self.term_doc_freq += 1u32;
|
||||
self.stack.writer(arena).write_u32_vint(doc);
|
||||
self.stack.writer(arena).write_u32_vint(delta);
|
||||
}
|
||||
|
||||
#[inline]
|
||||
@@ -269,7 +267,10 @@ impl Recorder for TfAndPositionRecorder {
|
||||
self.stack.read_to_end(arena, buffer_u8);
|
||||
let mut u32_it = VInt32Reader::new(&buffer_u8[..]);
|
||||
let mut doc_id_and_positions = vec![];
|
||||
while let Some(doc) = u32_it.next() {
|
||||
let mut prev_doc = 0;
|
||||
while let Some(delta_doc_id) = u32_it.next() {
|
||||
let doc_id = prev_doc + delta_doc_id;
|
||||
prev_doc = doc_id;
|
||||
let mut prev_position_plus_one = 1u32;
|
||||
buffer_positions.clear();
|
||||
loop {
|
||||
@@ -287,9 +288,9 @@ impl Recorder for TfAndPositionRecorder {
|
||||
if let Some(doc_id_map) = doc_id_map {
|
||||
// this simple variant to remap may consume to much memory
|
||||
doc_id_and_positions
|
||||
.push((doc_id_map.get_new_doc_id(doc), buffer_positions.to_vec()));
|
||||
.push((doc_id_map.get_new_doc_id(doc_id), buffer_positions.to_vec()));
|
||||
} else {
|
||||
serializer.write_doc(doc, buffer_positions.len() as u32, buffer_positions);
|
||||
serializer.write_doc(doc_id, buffer_positions.len() as u32, buffer_positions);
|
||||
}
|
||||
}
|
||||
if doc_id_map.is_some() {
|
||||
|
||||
Reference in New Issue
Block a user