mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-01-08 01:52:54 +00:00
Issue 227 Faster merge when there are no deletes
This commit is contained in:
@@ -335,12 +335,16 @@ impl IndexMerger {
|
||||
fn write_storable_fields(&self, store_writer: &mut StoreWriter) -> Result<()> {
|
||||
for reader in &self.readers {
|
||||
let store_reader = reader.get_store_reader();
|
||||
for doc_id in 0..reader.max_doc() {
|
||||
if !reader.is_deleted(doc_id) {
|
||||
let doc = store_reader.get(doc_id)?;
|
||||
let field_values: Vec<&FieldValue> = doc.field_values().iter().collect();
|
||||
store_writer.store(&field_values)?;
|
||||
if reader.num_deleted_docs() > 0 {
|
||||
for doc_id in 0..reader.max_doc() {
|
||||
if !reader.is_deleted(doc_id) {
|
||||
let doc = store_reader.get(doc_id)?;
|
||||
let field_values: Vec<&FieldValue> = doc.field_values().iter().collect();
|
||||
store_writer.store(&field_values)?;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
store_writer.stack(store_reader)?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
|
||||
@@ -34,22 +34,33 @@ impl StoreReader {
|
||||
}
|
||||
}
|
||||
|
||||
fn block_offset(&self, doc_id: DocId) -> (DocId, u64) {
|
||||
pub fn block_index(&self) -> SkipList<u64> {
|
||||
SkipList::from(self.offset_index_source.as_slice())
|
||||
}
|
||||
|
||||
fn block_offset(&self, doc_id: DocId) -> (DocId, u64) {
|
||||
self.block_index()
|
||||
.seek(doc_id + 1)
|
||||
.unwrap_or((0u32, 0u64))
|
||||
}
|
||||
|
||||
pub fn block_data(&self) -> &[u8] {
|
||||
self.data.as_slice()
|
||||
}
|
||||
|
||||
pub fn compressed_block(&self, addr: usize) -> &[u8] {
|
||||
let total_buffer = self.data.as_slice();
|
||||
let mut buffer = &total_buffer[addr..];
|
||||
let block_len = u32::deserialize(&mut buffer).expect("") as usize;
|
||||
&buffer[..block_len]
|
||||
}
|
||||
|
||||
fn read_block(&self, block_offset: usize) -> io::Result<()> {
|
||||
if block_offset != *self.current_block_offset.borrow() {
|
||||
let mut current_block_mut = self.current_block.borrow_mut();
|
||||
current_block_mut.clear();
|
||||
let total_buffer = self.data.as_slice();
|
||||
let mut cursor = &total_buffer[block_offset..];
|
||||
let block_length = u32::deserialize(&mut cursor).unwrap();
|
||||
let block_array: &[u8] = &total_buffer
|
||||
[(block_offset + 4 as usize)..(block_offset + 4 + block_length as usize)];
|
||||
let mut lz4_decoder = lz4::Decoder::new(block_array)?;
|
||||
let compressed_block = self.compressed_block(block_offset);
|
||||
let mut lz4_decoder = lz4::Decoder::new(compressed_block)?;
|
||||
*self.current_block_offset.borrow_mut() = usize::max_value();
|
||||
lz4_decoder.read_to_end(&mut current_block_mut).map(|_| ())?;
|
||||
*self.current_block_offset.borrow_mut() = block_offset;
|
||||
|
||||
@@ -3,6 +3,7 @@ use DocId;
|
||||
use schema::FieldValue;
|
||||
use common::BinarySerializable;
|
||||
use std::io::{self, Write};
|
||||
use super::StoreReader;
|
||||
use lz4;
|
||||
use datastruct::SkipListBuilder;
|
||||
use common::CountingWriter;
|
||||
@@ -60,6 +61,35 @@ impl StoreWriter {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Stacks a store reader on top of the documents written so far.
|
||||
/// This method is an optimization compared to iterating over the documents
|
||||
/// in the store and adding them one by one, as the store's data will
|
||||
/// not be decompressed and then recompressed.
|
||||
pub fn stack(&mut self, store_reader: &StoreReader) -> io::Result<()> {
|
||||
if !self.current_block.is_empty() {
|
||||
self.write_and_compress_block()?;
|
||||
self.offset_index_writer.insert(
|
||||
self.doc,
|
||||
&(self.writer.written_bytes() as u64),
|
||||
)?;
|
||||
}
|
||||
let doc_offset = self.doc;
|
||||
let start_offset = self.writer.written_bytes() as u64;
|
||||
|
||||
// just bulk write all of the block of the given reader.
|
||||
self.writer.write_all(store_reader.block_data())?;
|
||||
|
||||
// concatenate the index of the `store_reader`, after translating
|
||||
// its start doc id and its start file offset.
|
||||
for (next_doc_id, block_addr) in store_reader.block_index() {
|
||||
self.doc = doc_offset + next_doc_id;
|
||||
self.offset_index_writer.insert(
|
||||
self.doc,
|
||||
&(start_offset + block_addr))?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn write_and_compress_block(&mut self) -> io::Result<()> {
|
||||
self.intermediary_buffer.clear();
|
||||
{
|
||||
|
||||
@@ -49,19 +49,26 @@ impl TermDeltaDecoder {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// code
|
||||
// first bit represents whether the prefix / suffix len can be encoded
|
||||
// on the same byte. (the next one)
|
||||
//
|
||||
|
||||
#[inline(always)]
|
||||
pub fn decode<'a>(&mut self, code: u8, mut cursor: &'a [u8]) -> &'a [u8] {
|
||||
let (prefix_len, suffix_len): (usize, usize) = if (code & 1u8) == 1u8 {
|
||||
let b = cursor[0];
|
||||
cursor = &cursor[1..];
|
||||
let prefix_len = (b & 15u8) as usize;
|
||||
let suffix_len = (b >> 4u8) as usize;
|
||||
(prefix_len, suffix_len)
|
||||
} else {
|
||||
let prefix_len = u32::deserialize(&mut cursor).unwrap();
|
||||
let suffix_len = u32::deserialize(&mut cursor).unwrap();
|
||||
(prefix_len as usize, suffix_len as usize)
|
||||
};
|
||||
let (prefix_len, suffix_len): (usize, usize) =
|
||||
if (code & 1u8) == 1u8 {
|
||||
let b = cursor[0];
|
||||
cursor = &cursor[1..];
|
||||
let prefix_len = (b & 15u8) as usize;
|
||||
let suffix_len = (b >> 4u8) as usize;
|
||||
(prefix_len, suffix_len)
|
||||
} else {
|
||||
let prefix_len = u32::deserialize(&mut cursor).unwrap();
|
||||
let suffix_len = u32::deserialize(&mut cursor).unwrap();
|
||||
(prefix_len as usize, suffix_len as usize)
|
||||
};
|
||||
unsafe { self.term.set_len(prefix_len) };
|
||||
self.term.extend_from_slice(&(*cursor)[..suffix_len]);
|
||||
&cursor[suffix_len..]
|
||||
@@ -75,8 +82,8 @@ impl TermDeltaDecoder {
|
||||
#[derive(Default)]
|
||||
pub struct DeltaTermInfo {
|
||||
pub doc_freq: u32,
|
||||
pub delta_postings_offset: u32,
|
||||
pub delta_positions_offset: u32,
|
||||
pub delta_postings_offset: u64,
|
||||
pub delta_positions_offset: u64,
|
||||
pub positions_inner_offset: u8,
|
||||
}
|
||||
|
||||
@@ -101,7 +108,7 @@ impl TermInfoDeltaEncoder {
|
||||
let mut delta_term_info = DeltaTermInfo {
|
||||
doc_freq: term_info.doc_freq,
|
||||
delta_postings_offset: term_info.postings_offset - self.term_info.postings_offset,
|
||||
delta_positions_offset: 0,
|
||||
delta_positions_offset: 0u64,
|
||||
positions_inner_offset: 0,
|
||||
};
|
||||
if self.has_positions {
|
||||
@@ -152,7 +159,7 @@ impl TermInfoDeltaDecoder {
|
||||
let mut v: u64 = unsafe { *(cursor.as_ptr() as *const u64) };
|
||||
let doc_freq: u32 = (v as u32) & make_mask(num_bytes_docfreq);
|
||||
v >>= (num_bytes_docfreq as u64) * 8u64;
|
||||
let delta_postings_offset: u32 = (v as u32) & make_mask(num_bytes_postings_offset);
|
||||
let delta_postings_offset: u64 = v & make_mask(num_bytes_postings_offset);
|
||||
cursor = &cursor[num_bytes_docfreq + num_bytes_postings_offset..];
|
||||
self.term_info.doc_freq = doc_freq;
|
||||
self.term_info.postings_offset += delta_postings_offset;
|
||||
|
||||
@@ -195,6 +195,10 @@ pub trait TokenStream {
|
||||
/// Returns a mutable reference to the current token.
|
||||
fn token_mut(&mut self) -> &mut Token;
|
||||
|
||||
/// Helper to iterate over tokens. It
|
||||
/// simply combines a call to `.advance()`
|
||||
/// and `.token()`.
|
||||
///
|
||||
/// ```
|
||||
/// # extern crate tantivy;
|
||||
/// # use tantivy::tokenizer::*;
|
||||
|
||||
Reference in New Issue
Block a user