diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs
index b9b8b5635..55e0e0b88 100644
--- a/src/indexer/merger.rs
+++ b/src/indexer/merger.rs
@@ -335,12 +335,16 @@ impl IndexMerger {
     fn write_storable_fields(&self, store_writer: &mut StoreWriter) -> Result<()> {
         for reader in &self.readers {
             let store_reader = reader.get_store_reader();
-            for doc_id in 0..reader.max_doc() {
-                if !reader.is_deleted(doc_id) {
-                    let doc = store_reader.get(doc_id)?;
-                    let field_values: Vec<&FieldValue> = doc.field_values().iter().collect();
-                    store_writer.store(&field_values)?;
+            if reader.num_deleted_docs() > 0 {
+                for doc_id in 0..reader.max_doc() {
+                    if !reader.is_deleted(doc_id) {
+                        let doc = store_reader.get(doc_id)?;
+                        let field_values: Vec<&FieldValue> = doc.field_values().iter().collect();
+                        store_writer.store(&field_values)?;
+                    }
                 }
+            } else {
+                store_writer.stack(store_reader)?;
             }
         }
         Ok(())
diff --git a/src/store/reader.rs b/src/store/reader.rs
index 14c9b5d25..c96324149 100644
--- a/src/store/reader.rs
+++ b/src/store/reader.rs
@@ -34,22 +34,33 @@ impl StoreReader {
         }
     }
 
-    fn block_offset(&self, doc_id: DocId) -> (DocId, u64) {
+    pub fn block_index(&self) -> SkipList {
         SkipList::from(self.offset_index_source.as_slice())
+    }
+
+    fn block_offset(&self, doc_id: DocId) -> (DocId, u64) {
+        self.block_index()
             .seek(doc_id + 1)
             .unwrap_or((0u32, 0u64))
     }
 
+    pub fn block_data(&self) -> &[u8] {
+        self.data.as_slice()
+    }
+
+    pub fn compressed_block(&self, addr: usize) -> &[u8] {
+        let total_buffer = self.data.as_slice();
+        let mut buffer = &total_buffer[addr..];
+        let block_len = u32::deserialize(&mut buffer).expect("") as usize;
+        &buffer[..block_len]
+    }
+
     fn read_block(&self, block_offset: usize) -> io::Result<()> {
         if block_offset != *self.current_block_offset.borrow() {
             let mut current_block_mut = self.current_block.borrow_mut();
             current_block_mut.clear();
-            let total_buffer = self.data.as_slice();
-            let mut cursor = &total_buffer[block_offset..];
-            let block_length = u32::deserialize(&mut cursor).unwrap();
-            let block_array: &[u8] = &total_buffer
-                [(block_offset + 4 as usize)..(block_offset + 4 + block_length as usize)];
-            let mut lz4_decoder = lz4::Decoder::new(block_array)?;
+            let compressed_block = self.compressed_block(block_offset);
+            let mut lz4_decoder = lz4::Decoder::new(compressed_block)?;
             *self.current_block_offset.borrow_mut() = usize::max_value();
             lz4_decoder.read_to_end(&mut current_block_mut).map(|_| ())?;
             *self.current_block_offset.borrow_mut() = block_offset;
diff --git a/src/store/writer.rs b/src/store/writer.rs
index 6dacfd6a1..7d0482272 100644
--- a/src/store/writer.rs
+++ b/src/store/writer.rs
@@ -3,6 +3,7 @@ use DocId;
 use schema::FieldValue;
 use common::BinarySerializable;
 use std::io::{self, Write};
+use super::StoreReader;
 use lz4;
 use datastruct::SkipListBuilder;
 use common::CountingWriter;
@@ -60,6 +61,35 @@ impl StoreWriter {
         Ok(())
     }
 
+    /// Stacks a store reader on top of the documents written so far.
+    /// This method is an optimization compared to iterating over the documents
+    /// in the store and adding them one by one, as the store's data will
+    /// not be decompressed and then recompressed.
+    pub fn stack(&mut self, store_reader: &StoreReader) -> io::Result<()> {
+        if !self.current_block.is_empty() {
+            self.write_and_compress_block()?;
+            self.offset_index_writer.insert(
+                self.doc,
+                &(self.writer.written_bytes() as u64),
+            )?;
+        }
+        let doc_offset = self.doc;
+        let start_offset = self.writer.written_bytes() as u64;
+
+        // just bulk write all of the blocks of the given reader.
+        self.writer.write_all(store_reader.block_data())?;
+
+        // concatenate the index of the `store_reader`, after translating
+        // its start doc id and its start file offset.
+        for (next_doc_id, block_addr) in store_reader.block_index() {
+            self.doc = doc_offset + next_doc_id;
+            self.offset_index_writer.insert(
+                self.doc,
+                &(start_offset + block_addr))?;
+        }
+        Ok(())
+    }
+
     fn write_and_compress_block(&mut self) -> io::Result<()> {
         self.intermediary_buffer.clear();
         {
diff --git a/src/termdict/streamdict/delta_encoder.rs b/src/termdict/streamdict/delta_encoder.rs
index b3d73e3b8..cf47105f5 100644
--- a/src/termdict/streamdict/delta_encoder.rs
+++ b/src/termdict/streamdict/delta_encoder.rs
@@ -49,19 +49,26 @@ impl TermDeltaDecoder {
         }
     }
+
+    // `code`: the first bit represents whether the prefix / suffix lengths
+    // can be encoded together on a single byte (the next one).
+
     #[inline(always)]
     pub fn decode<'a>(&mut self, code: u8, mut cursor: &'a [u8]) -> &'a [u8] {
-        let (prefix_len, suffix_len): (usize, usize) = if (code & 1u8) == 1u8 {
-            let b = cursor[0];
-            cursor = &cursor[1..];
-            let prefix_len = (b & 15u8) as usize;
-            let suffix_len = (b >> 4u8) as usize;
-            (prefix_len, suffix_len)
-        } else {
-            let prefix_len = u32::deserialize(&mut cursor).unwrap();
-            let suffix_len = u32::deserialize(&mut cursor).unwrap();
-            (prefix_len as usize, suffix_len as usize)
-        };
+        let (prefix_len, suffix_len): (usize, usize) =
+            if (code & 1u8) == 1u8 {
+                let b = cursor[0];
+                cursor = &cursor[1..];
+                let prefix_len = (b & 15u8) as usize;
+                let suffix_len = (b >> 4u8) as usize;
+                (prefix_len, suffix_len)
+            } else {
+                let prefix_len = u32::deserialize(&mut cursor).unwrap();
+                let suffix_len = u32::deserialize(&mut cursor).unwrap();
+                (prefix_len as usize, suffix_len as usize)
+            };
         unsafe { self.term.set_len(prefix_len) };
         self.term.extend_from_slice(&(*cursor)[..suffix_len]);
         &cursor[suffix_len..]
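Reviewer note: the hunk above reads back a nibble-packed pair of lengths. When both the prefix and suffix lengths fit on 4 bits they share a single byte (low nibble = prefix, high nibble = suffix) and bit 0 of the code byte is set; otherwise both lengths are stored in full. The sketch below is a standalone illustration of that scheme, not tantivy code: the helper names, the little-endian u32 fallback (standing in for `BinarySerializable`), and the single-flag code byte are simplifying assumptions.

```rust
// Sketch of the nibble-packing scheme read back by `TermDeltaDecoder::decode`.
fn encode_lengths(prefix_len: usize, suffix_len: usize, out: &mut Vec<u8>) -> u8 {
    if prefix_len < 16 && suffix_len < 16 {
        // Both lengths fit on 4 bits: pack them into one byte, flag it in the code.
        out.push((prefix_len as u8) | ((suffix_len as u8) << 4));
        1u8
    } else {
        // Fallback: write both lengths in full (endianness is an assumption here).
        out.extend_from_slice(&(prefix_len as u32).to_le_bytes());
        out.extend_from_slice(&(suffix_len as u32).to_le_bytes());
        0u8
    }
}

fn decode_lengths(code: u8, cursor: &[u8]) -> ((usize, usize), &[u8]) {
    if (code & 1u8) == 1u8 {
        let b = cursor[0];
        (((b & 15u8) as usize, (b >> 4u8) as usize), &cursor[1..])
    } else {
        let prefix = u32::from_le_bytes([cursor[0], cursor[1], cursor[2], cursor[3]]) as usize;
        let suffix = u32::from_le_bytes([cursor[4], cursor[5], cursor[6], cursor[7]]) as usize;
        ((prefix, suffix), &cursor[8..])
    }
}

fn main() {
    let mut buf = Vec::new();
    let code = encode_lengths(3, 9, &mut buf);
    assert_eq!(decode_lengths(code, &buf).0, (3, 9));
}
```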
@@ -75,8 +82,8 @@ impl TermDeltaDecoder {
 #[derive(Default)]
 pub struct DeltaTermInfo {
     pub doc_freq: u32,
-    pub delta_postings_offset: u32,
-    pub delta_positions_offset: u32,
+    pub delta_postings_offset: u64,
+    pub delta_positions_offset: u64,
     pub positions_inner_offset: u8,
 }
@@ -101,7 +108,7 @@ impl TermInfoDeltaEncoder {
         let mut delta_term_info = DeltaTermInfo {
             doc_freq: term_info.doc_freq,
             delta_postings_offset: term_info.postings_offset - self.term_info.postings_offset,
-            delta_positions_offset: 0,
+            delta_positions_offset: 0u64,
             positions_inner_offset: 0,
         };
         if self.has_positions {
@@ -152,7 +159,7 @@ impl TermInfoDeltaDecoder {
         let mut v: u64 = unsafe { *(cursor.as_ptr() as *const u64) };
         let doc_freq: u32 = (v as u32) & make_mask(num_bytes_docfreq);
         v >>= (num_bytes_docfreq as u64) * 8u64;
-        let delta_postings_offset: u32 = (v as u32) & make_mask(num_bytes_postings_offset);
+        let delta_postings_offset: u64 = v & make_mask(num_bytes_postings_offset);
         cursor = &cursor[num_bytes_docfreq + num_bytes_postings_offset..];
         self.term_info.doc_freq = doc_freq;
         self.term_info.postings_offset += delta_postings_offset;
diff --git a/src/tokenizer/tokenizer.rs b/src/tokenizer/tokenizer.rs
index 50fc42a39..8526e812b 100644
--- a/src/tokenizer/tokenizer.rs
+++ b/src/tokenizer/tokenizer.rs
@@ -195,6 +195,10 @@ pub trait TokenStream {
     /// Returns a mutable reference to the current token.
     fn token_mut(&mut self) -> &mut Token;
 
+    /// Helper to iterate over tokens. It
+    /// simply combines a call to `.advance()`
+    /// and `.token()`.
+    ///
     /// ```
    /// # extern crate tantivy;
     /// # use tantivy::tokenizer::*;
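Reviewer note: the core of the `StoreWriter::stack` hunk in src/store/writer.rs is that the stacked reader's compressed block data is appended verbatim, and only its skip-index entries are rebased by the documents and bytes already present in the destination writer. A minimal toy sketch of that translation (helper name and numbers are hypothetical, not tantivy's API):

```rust
// Rebase each (last doc id of block, block byte offset) entry of a stacked
// store's index by the docs/bytes already written in the destination.
fn rebase_index(index: &[(u32, u64)], doc_offset: u32, start_offset: u64) -> Vec<(u32, u64)> {
    index
        .iter()
        .map(|&(doc, addr)| (doc_offset + doc, start_offset + addr))
        .collect()
}

fn main() {
    // Destination writer already holds 100 docs spanning 4_096 bytes of blocks.
    let stacked = rebase_index(&[(10, 0), (25, 2_000)], 100, 4_096);
    assert_eq!(stacked, vec![(110, 4_096), (125, 6_096)]);
}
```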