change format for store to make it faster with small documents (#1569)

* use new format for docstore blocks

* move index to end of block

it makes writing the block faster due to one less memcopy
This commit is contained in:
trinity-1686a
2022-10-04 09:58:55 +02:00
committed by GitHub
parent 4cf911d56a
commit 5945dbf0bd
2 changed files with 61 additions and 58 deletions

View File

@@ -1,10 +1,10 @@
use std::io;
use std::iter::Sum;
use std::ops::AddAssign;
use std::ops::{AddAssign, Range};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use common::{BinarySerializable, HasLen, VInt};
use common::{BinarySerializable, HasLen};
use lru::LruCache;
use ownedbytes::OwnedBytes;
@@ -211,17 +211,10 @@ impl StoreReader {
doc_id: DocId,
checkpoint: &Checkpoint,
) -> crate::Result<OwnedBytes> {
let mut cursor = &block[..];
let cursor_len_before = cursor.len();
for _ in checkpoint.doc_range.start..doc_id {
let doc_length = VInt::deserialize(&mut cursor)?.val() as usize;
cursor = &cursor[doc_length..];
}
let doc_pos = doc_id - checkpoint.doc_range.start;
let doc_length = VInt::deserialize(&mut cursor)?.val() as usize;
let start_pos = cursor_len_before - cursor.len();
let end_pos = cursor_len_before - cursor.len() + doc_length;
Ok(block.slice(start_pos..end_pos))
let range = block_read_index(&block, doc_pos)?;
Ok(block.slice(range))
}
/// Iterator over all Documents in their order as they are stored in the doc store.
@@ -254,9 +247,7 @@ impl StoreReader {
let mut curr_block = curr_checkpoint
.as_ref()
.map(|checkpoint| self.read_block(checkpoint).map_err(|e| e.kind())); // map error in order to enable cloning
let mut block_start_pos = 0;
let mut num_skipped = 0;
let mut reset_block_pos = false;
let mut doc_pos = 0;
(0..last_doc_id)
.filter_map(move |doc_id| {
// filter_map is only used to resolve lifetime issues between the two closures on
@@ -268,24 +259,19 @@ impl StoreReader {
curr_block = curr_checkpoint
.as_ref()
.map(|checkpoint| self.read_block(checkpoint).map_err(|e| e.kind()));
reset_block_pos = true;
num_skipped = 0;
doc_pos = 0;
}
let alive = alive_bitset.map_or(true, |bitset| bitset.is_alive(doc_id));
if alive {
let ret = Some((curr_block.clone(), num_skipped, reset_block_pos));
// the map block will move over the num_skipped, so we reset to 0
num_skipped = 0;
reset_block_pos = false;
ret
let res = if alive {
Some((curr_block.clone(), doc_pos))
} else {
// we keep the number of skipped documents to move forward in the map block
num_skipped += 1;
None
}
};
doc_pos += 1;
res
})
.map(move |(block, num_skipped, reset_block_pos)| {
.map(move |(block, doc_pos)| {
let block = block
.ok_or_else(|| {
DataCorruption::comment_only(
@@ -296,30 +282,9 @@ impl StoreReader {
.map_err(|error_kind| {
std::io::Error::new(error_kind, "error when reading block in doc store")
})?;
// this flag is set, when filter_map moved to the next block
if reset_block_pos {
block_start_pos = 0;
}
let mut cursor = &block[block_start_pos..];
let mut pos = 0;
// move forward 1 doc + num_skipped in block and return length of current doc
let doc_length = loop {
let doc_length = VInt::deserialize(&mut cursor)?.val() as usize;
let num_bytes_read = block[block_start_pos..].len() - cursor.len();
block_start_pos += num_bytes_read;
pos += 1;
if pos == num_skipped + 1 {
break doc_length;
} else {
block_start_pos += doc_length;
cursor = &block[block_start_pos..];
}
};
let end_pos = block_start_pos + doc_length;
let doc_bytes = block.slice(block_start_pos..end_pos);
block_start_pos = end_pos;
Ok(doc_bytes)
let range = block_read_index(&block, doc_pos)?;
Ok(block.slice(range))
})
}
@@ -329,6 +294,28 @@ impl StoreReader {
}
}
fn block_read_index(block: &[u8], doc_pos: u32) -> crate::Result<Range<usize>> {
let doc_pos = doc_pos as usize;
let size_of_u32 = std::mem::size_of::<u32>();
let index_len_pos = block.len() - size_of_u32;
let index_len = u32::deserialize(&mut &block[index_len_pos..])? as usize;
if doc_pos > index_len {
return Err(crate::TantivyError::InternalError(
"Attempted to read doc from wrong block".to_owned(),
));
}
let index_start = block.len() - (index_len + 1) * size_of_u32;
let index = &block[index_start..index_start + index_len * size_of_u32];
let start_offset = u32::deserialize(&mut &index[doc_pos * size_of_u32..])? as usize;
let end_offset = u32::deserialize(&mut &index[(doc_pos + 1) * size_of_u32..])
.unwrap_or(index_start as u32) as usize;
Ok(start_offset..end_offset)
}
#[cfg(feature = "quickwit")]
impl StoreReader {
/// Advanced API.
@@ -427,7 +414,7 @@ mod tests {
assert_eq!(store.cache_stats().cache_hits, 1);
assert_eq!(store.cache_stats().cache_misses, 2);
assert_eq!(store.cache.peek_lru(), Some(9210));
assert_eq!(store.cache.peek_lru(), Some(11163));
Ok(())
}

View File

@@ -1,6 +1,6 @@
use std::io::{self, Write};
use common::{BinarySerializable, VInt};
use common::BinarySerializable;
use super::compressors::Compressor;
use super::StoreReader;
@@ -22,6 +22,7 @@ pub struct StoreWriter {
num_docs_in_current_block: DocId,
intermediary_buffer: Vec<u8>,
current_block: Vec<u8>,
doc_pos: Vec<u32>,
block_compressor: BlockCompressor,
}
@@ -42,6 +43,7 @@ impl StoreWriter {
block_size,
num_docs_in_current_block: 0,
intermediary_buffer: Vec::new(),
doc_pos: Vec::new(),
current_block: Vec::new(),
block_compressor,
})
@@ -53,12 +55,17 @@ impl StoreWriter {
/// The memory used (inclusive childs)
pub fn mem_usage(&self) -> usize {
self.intermediary_buffer.capacity() + self.current_block.capacity()
self.intermediary_buffer.capacity()
+ self.current_block.capacity()
+ self.doc_pos.capacity() * std::mem::size_of::<u32>()
}
/// Checks if the current block is full, and if so, compresses and flushes it.
fn check_flush_block(&mut self) -> io::Result<()> {
if self.current_block.len() > self.block_size {
// this does not count the VInt storing the index lenght itself, but it is negligible in
// front of everything else.
let index_len = self.doc_pos.len() * std::mem::size_of::<usize>();
if self.current_block.len() + index_len > self.block_size {
self.send_current_block_to_compressor()?;
}
Ok(())
@@ -70,8 +77,19 @@ impl StoreWriter {
if self.current_block.is_empty() {
return Ok(());
}
let size_of_u32 = std::mem::size_of::<u32>();
self.current_block
.reserve((self.doc_pos.len() + 1) * size_of_u32);
for pos in self.doc_pos.iter() {
pos.serialize(&mut self.current_block)?;
}
(self.doc_pos.len() as u32).serialize(&mut self.current_block)?;
self.block_compressor
.compress_block_and_write(&self.current_block, self.num_docs_in_current_block)?;
self.doc_pos.clear();
self.current_block.clear();
self.num_docs_in_current_block = 0;
Ok(())
@@ -87,8 +105,7 @@ impl StoreWriter {
// calling store bytes would be preferable for code reuse, but then we can't use
// intermediary_buffer due to the borrow checker
// a new buffer costs ~1% indexing performance
let doc_num_bytes = self.intermediary_buffer.len();
VInt(doc_num_bytes as u64).serialize_into_vec(&mut self.current_block);
self.doc_pos.push(self.current_block.len() as u32);
self.current_block
.write_all(&self.intermediary_buffer[..])?;
self.num_docs_in_current_block += 1;
@@ -101,8 +118,7 @@ impl StoreWriter {
/// The document id is implicitly the current number
/// of documents.
pub fn store_bytes(&mut self, serialized_document: &[u8]) -> io::Result<()> {
let doc_num_bytes = serialized_document.len();
VInt(doc_num_bytes as u64).serialize_into_vec(&mut self.current_block);
self.doc_pos.push(self.current_block.len() as u32);
self.current_block.extend_from_slice(serialized_document);
self.num_docs_in_current_block += 1;
self.check_flush_block()?;