// Source listing: tantivy/src/store/writer.rs
// Snapshot: 2021-08-19 23:26:34 +01:00 — 163 lines, 5.8 KiB, Rust

use super::index::SkipIndexBuilder;
use super::StoreReader;
use super::{compressors::Compressor, footer::DocStoreFooter};
use crate::directory::TerminatingWrite;
use crate::directory::WritePtr;
use crate::schema::Document;
use crate::store::index::Checkpoint;
use crate::DocId;
use common::CountingWriter;
use common::{BinarySerializable, VInt};
use std::io::{self, Write};
/// Size threshold, in bytes, for the uncompressed block being filled: once the
/// block grows past this many bytes it is compressed and flushed to the writer.
const BLOCK_SIZE: usize = 16 * 1024;
/// Writes tantivy's [`Store`](./index.html).
///
/// Unlike most other `tantivy` components, the doc store is not written when
/// the segment is finalized. Documents are buffered into a block, and that
/// block is compressed and written out as soon as it grows large enough.
///
/// Only the skip index over the written blocks is kept in memory; it is
/// serialized when the writer is closed.
pub struct StoreWriter {
    /// Id that the next stored document will implicitly receive.
    doc: DocId,
    /// Id of the first document held in `current_block`.
    first_doc_in_block: DocId,
    /// Codec applied to every block before it is written.
    compressor: Compressor,
    /// In-memory builder for the skip index of block checkpoints.
    offset_index_writer: SkipIndexBuilder,
    /// Destination of the compressed blocks, wrapped so that the number of
    /// bytes written (and hence block offsets) can be tracked.
    writer: CountingWriter<WritePtr>,
    /// Uncompressed, length-prefixed documents of the block being filled.
    current_block: Vec<u8>,
    /// Scratch buffer reused both for document serialization and for block
    /// compression.
    intermediary_buffer: Vec<u8>,
}
impl StoreWriter {
    /// Creates a store writer.
    ///
    /// Compressed blocks are written to `writer` as documents are added; each
    /// block is encoded with `compressor`.
    pub fn new(writer: WritePtr, compressor: Compressor) -> StoreWriter {
        StoreWriter {
            compressor,
            doc: 0,
            first_doc_in_block: 0,
            offset_index_writer: SkipIndexBuilder::new(),
            writer: CountingWriter::wrap(writer),
            intermediary_buffer: Vec::new(),
            current_block: Vec::new(),
        }
    }

    /// Returns the compressor used to encode blocks.
    pub(crate) fn compressor(&self) -> Compressor {
        self.compressor
    }

    /// Returns the memory, in bytes, reserved by the writer's internal buffers.
    ///
    /// Only the capacities of the scratch buffer and of the block being filled
    /// are counted; the in-memory skip index builder is not included.
    pub fn mem_usage(&self) -> usize {
        self.intermediary_buffer.capacity() + self.current_block.capacity()
    }

    /// Stores the bytes of an already serialized document.
    ///
    /// The document id is implicitly the current number of documents.
    ///
    /// # Errors
    ///
    /// Returns an error if the document cannot be appended to the current
    /// block, or if a full block cannot be compressed and written.
    pub fn store_bytes(&mut self, serialized_document: &[u8]) -> io::Result<()> {
        Self::append_document(&mut self.current_block, serialized_document)?;
        self.on_document_appended()
    }

    /// Serializes and stores a new document.
    ///
    /// The document id is implicitly the current number of documents.
    ///
    /// # Errors
    ///
    /// Returns an error if serialization fails, or if a full block cannot be
    /// compressed and written.
    pub fn store(&mut self, stored_document: &Document) -> io::Result<()> {
        self.intermediary_buffer.clear();
        stored_document.serialize(&mut self.intermediary_buffer)?;
        // `append_document` takes the two buffers as disjoint field borrows, so
        // the reusable scratch buffer can be passed directly. A fresh buffer
        // per document would cost indexing throughput.
        Self::append_document(&mut self.current_block, &self.intermediary_buffer)?;
        self.on_document_appended()
    }

    /// Appends one document to `block`, prefixed by its byte length as a `VInt`.
    fn append_document(block: &mut Vec<u8>, serialized_document: &[u8]) -> io::Result<()> {
        VInt(serialized_document.len() as u64).serialize(block)?;
        block.write_all(serialized_document)
    }

    /// Counts a newly appended document and flushes the current block once it
    /// exceeds `BLOCK_SIZE` bytes.
    fn on_document_appended(&mut self) -> io::Result<()> {
        self.doc += 1;
        if self.current_block.len() > BLOCK_SIZE {
            self.write_and_compress_block()?;
        }
        Ok(())
    }

    /// Stacks a store reader on top of the documents written so far.
    ///
    /// This is an optimization over iterating over the reader's documents and
    /// storing them one by one. The reader's block data is copied verbatim, so
    /// it is never decompressed and recompressed. Any partially filled block is
    /// flushed first, so the stacked blocks start on a block boundary.
    ///
    /// NOTE(review): because blocks are copied as-is, this presumably requires
    /// the reader's blocks to use the same compressor as this writer — confirm
    /// that callers check this.
    pub fn stack(&mut self, store_reader: &StoreReader) -> io::Result<()> {
        if !self.current_block.is_empty() {
            self.write_and_compress_block()?;
        }
        assert_eq!(
            self.first_doc_in_block, self.doc,
            "the current block must be empty before stacking a store reader"
        );
        let doc_shift = self.doc;
        let start_shift = self.writer.written_bytes() as usize;
        // Bulk write all of the blocks of the given reader.
        self.writer
            .write_all(store_reader.block_data()?.as_slice())?;
        // Concatenate the index of `store_reader`, after translating its doc
        // ids and byte offsets by what this writer already holds.
        for mut checkpoint in store_reader.block_checkpoints() {
            checkpoint.doc_range.start += doc_shift;
            checkpoint.doc_range.end += doc_shift;
            checkpoint.byte_range.start += start_shift;
            checkpoint.byte_range.end += start_shift;
            self.register_checkpoint(checkpoint);
        }
        Ok(())
    }

    /// Records a written block in the skip index and starts a new block right
    /// after its last document.
    fn register_checkpoint(&mut self, checkpoint: Checkpoint) {
        // Read the end before moving the checkpoint into the index, so no clone
        // is needed.
        let end_doc = checkpoint.doc_range.end;
        self.offset_index_writer.insert(checkpoint);
        self.first_doc_in_block = end_doc;
        self.doc = end_doc;
    }

    /// Compresses the current block, writes it out, and registers its
    /// checkpoint (doc id range and byte range in the output).
    fn write_and_compress_block(&mut self) -> io::Result<()> {
        assert!(self.doc > 0);
        self.intermediary_buffer.clear();
        self.compressor
            .compress(&self.current_block[..], &mut self.intermediary_buffer)?;
        let start_offset = self.writer.written_bytes() as usize;
        self.writer.write_all(&self.intermediary_buffer)?;
        let end_offset = self.writer.written_bytes() as usize;
        let end_doc = self.doc;
        self.register_checkpoint(Checkpoint {
            doc_range: self.first_doc_in_block..end_doc,
            byte_range: start_offset..end_offset,
        });
        self.current_block.clear();
        Ok(())
    }

    /// Finalizes the store writer.
    ///
    /// Compresses the last unfinished block, if any. Then serializes the skip
    /// index and the footer, and terminates the underlying writer.
    pub fn close(mut self) -> io::Result<()> {
        if !self.current_block.is_empty() {
            self.write_and_compress_block()?;
        }
        // Offset at which the skip index starts; recorded in the footer.
        let skip_index_offset: u64 = self.writer.written_bytes() as u64;
        let footer = DocStoreFooter::new(skip_index_offset, self.compressor);
        self.offset_index_writer.write(&mut self.writer)?;
        footer.serialize(&mut self.writer)?;
        self.writer.terminate()
    }
}