mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-06-02 16:40:43 +00:00
Refactoring of the skip index.
Merge pull request #927 from tantivy-search/compact-store-index The skip index now identifies both the start and the end offset of blocks. Checkpoints are compressed in blocks, reaching better compression.
This commit is contained in:
@@ -66,10 +66,6 @@ pub(crate) fn compute_num_bits(n: u64) -> u8 {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn is_power_of_2(n: usize) -> bool {
|
||||
(n > 0) && (n & (n - 1) == 0)
|
||||
}
|
||||
|
||||
/// Has length trait
|
||||
pub trait HasLen {
|
||||
/// Return length
|
||||
|
||||
@@ -533,6 +533,7 @@ mod tests {
|
||||
#![proptest_config(ProptestConfig::with_cases(500))]
|
||||
#[ignore]
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn test_block_wand_three_term_scorers((posting_lists, fieldnorms) in gen_term_scorers(3)) {
|
||||
test_block_wand_aux(&posting_lists[..], &fieldnorms[..]);
|
||||
}
|
||||
|
||||
165
src/store/index/block.rs
Normal file
165
src/store/index/block.rs
Normal file
@@ -0,0 +1,165 @@
|
||||
use crate::common::VInt;
|
||||
use crate::store::index::{Checkpoint, CHECKPOINT_PERIOD};
|
||||
use crate::DocId;
|
||||
use std::io;
|
||||
|
||||
/// Represents a block of checkpoints.
|
||||
///
|
||||
/// The DocStore index checkpoints are organized into block
|
||||
/// for code-readability and compression purpose.
|
||||
///
|
||||
/// A block can be of any size.
|
||||
pub struct CheckpointBlock {
|
||||
pub checkpoints: Vec<Checkpoint>,
|
||||
}
|
||||
|
||||
impl Default for CheckpointBlock {
|
||||
fn default() -> CheckpointBlock {
|
||||
CheckpointBlock {
|
||||
checkpoints: Vec::with_capacity(2 * CHECKPOINT_PERIOD),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl CheckpointBlock {
|
||||
/// If non-empty returns [start_doc, end_doc)
|
||||
/// for the overall block.
|
||||
pub fn doc_interval(&self) -> Option<(DocId, DocId)> {
|
||||
let start_doc_opt = self
|
||||
.checkpoints
|
||||
.first()
|
||||
.cloned()
|
||||
.map(|checkpoint| checkpoint.start_doc);
|
||||
let end_doc_opt = self
|
||||
.checkpoints
|
||||
.last()
|
||||
.cloned()
|
||||
.map(|checkpoint| checkpoint.end_doc);
|
||||
match (start_doc_opt, end_doc_opt) {
|
||||
(Some(start_doc), Some(end_doc)) => Some((start_doc, end_doc)),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Adding another checkpoint in the block.
|
||||
pub fn push(&mut self, checkpoint: Checkpoint) {
|
||||
self.checkpoints.push(checkpoint);
|
||||
}
|
||||
|
||||
/// Returns the number of checkpoints in the block.
|
||||
pub fn len(&self) -> usize {
|
||||
self.checkpoints.len()
|
||||
}
|
||||
|
||||
pub fn get(&self, idx: usize) -> Checkpoint {
|
||||
self.checkpoints[idx]
|
||||
}
|
||||
|
||||
pub fn clear(&mut self) {
|
||||
self.checkpoints.clear();
|
||||
}
|
||||
|
||||
pub fn serialize(&mut self, buffer: &mut Vec<u8>) {
|
||||
VInt(self.checkpoints.len() as u64).serialize_into_vec(buffer);
|
||||
if self.checkpoints.is_empty() {
|
||||
return;
|
||||
}
|
||||
VInt(self.checkpoints[0].start_doc as u64).serialize_into_vec(buffer);
|
||||
VInt(self.checkpoints[0].start_offset as u64).serialize_into_vec(buffer);
|
||||
for checkpoint in &self.checkpoints {
|
||||
let delta_doc = checkpoint.end_doc - checkpoint.start_doc;
|
||||
VInt(delta_doc as u64).serialize_into_vec(buffer);
|
||||
VInt(checkpoint.end_offset - checkpoint.start_offset).serialize_into_vec(buffer);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn deserialize(&mut self, data: &mut &[u8]) -> io::Result<()> {
|
||||
if data.is_empty() {
|
||||
return Err(io::Error::new(io::ErrorKind::UnexpectedEof, ""));
|
||||
}
|
||||
self.checkpoints.clear();
|
||||
let len = VInt::deserialize_u64(data)? as usize;
|
||||
if len == 0 {
|
||||
return Ok(());
|
||||
}
|
||||
let mut doc = VInt::deserialize_u64(data)? as DocId;
|
||||
let mut start_offset = VInt::deserialize_u64(data)?;
|
||||
for _ in 0..len {
|
||||
let num_docs = VInt::deserialize_u64(data)? as DocId;
|
||||
let block_num_bytes = VInt::deserialize_u64(data)?;
|
||||
self.checkpoints.push(Checkpoint {
|
||||
start_doc: doc,
|
||||
end_doc: doc + num_docs,
|
||||
start_offset,
|
||||
end_offset: start_offset + block_num_bytes,
|
||||
});
|
||||
doc += num_docs;
|
||||
start_offset += block_num_bytes;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::store::index::block::CheckpointBlock;
|
||||
use crate::store::index::Checkpoint;
|
||||
use crate::DocId;
|
||||
use std::io;
|
||||
|
||||
fn test_aux_ser_deser(checkpoints: &[Checkpoint]) -> io::Result<()> {
|
||||
let mut block = CheckpointBlock::default();
|
||||
for &checkpoint in checkpoints {
|
||||
block.push(checkpoint);
|
||||
}
|
||||
let mut buffer = Vec::new();
|
||||
block.serialize(&mut buffer);
|
||||
let mut block_deser = CheckpointBlock::default();
|
||||
let checkpoint = Checkpoint {
|
||||
start_doc: 0,
|
||||
end_doc: 1,
|
||||
start_offset: 2,
|
||||
end_offset: 3,
|
||||
};
|
||||
block_deser.push(checkpoint); // < check that value is erased before deser
|
||||
let mut data = &buffer[..];
|
||||
block_deser.deserialize(&mut data)?;
|
||||
assert!(data.is_empty());
|
||||
assert_eq!(checkpoints, &block_deser.checkpoints[..]);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_block_serialize_empty() -> io::Result<()> {
|
||||
test_aux_ser_deser(&[])
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_block_serialize_simple() -> io::Result<()> {
|
||||
let checkpoints = vec![Checkpoint {
|
||||
start_doc: 10,
|
||||
end_doc: 12,
|
||||
start_offset: 100,
|
||||
end_offset: 120,
|
||||
}];
|
||||
test_aux_ser_deser(&checkpoints)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_block_serialize() -> io::Result<()> {
|
||||
let offsets: Vec<u64> = (0..11).map(|i| i * i * i).collect();
|
||||
let mut checkpoints = vec![];
|
||||
let mut start_doc = 0;
|
||||
for i in 0..10 {
|
||||
let end_doc = (i * i) as DocId;
|
||||
checkpoints.push(Checkpoint {
|
||||
start_doc,
|
||||
end_doc,
|
||||
start_offset: offsets[i],
|
||||
end_offset: offsets[i + 1],
|
||||
});
|
||||
start_doc = end_doc;
|
||||
}
|
||||
test_aux_ser_deser(&checkpoints)
|
||||
}
|
||||
}
|
||||
230
src/store/index/mod.rs
Normal file
230
src/store/index/mod.rs
Normal file
@@ -0,0 +1,230 @@
|
||||
const CHECKPOINT_PERIOD: usize = 8;
|
||||
|
||||
use std::fmt;
|
||||
mod block;
|
||||
mod skip_index;
|
||||
mod skip_index_builder;
|
||||
|
||||
use crate::DocId;
|
||||
|
||||
pub use self::skip_index::SkipIndex;
|
||||
pub use self::skip_index_builder::SkipIndexBuilder;
|
||||
|
||||
/// A checkpoint contains meta-information about
|
||||
/// a block. Either a block of documents, or another block
|
||||
/// of checkpoints.
|
||||
///
|
||||
/// All of the intervals here defined are semi-open.
|
||||
/// The checkpoint describes that the block within the bytes
|
||||
/// `[start_offset..end_offset)` spans over the docs
|
||||
/// `[start_doc..end_doc)`.
|
||||
#[derive(Clone, Copy, Eq, PartialEq)]
|
||||
pub struct Checkpoint {
|
||||
pub start_doc: DocId,
|
||||
pub end_doc: DocId,
|
||||
pub start_offset: u64,
|
||||
pub end_offset: u64,
|
||||
}
|
||||
|
||||
impl fmt::Debug for Checkpoint {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"(doc=[{}..{}), bytes=[{}..{}))",
|
||||
self.start_doc, self.end_doc, self.start_offset, self.end_offset
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
use std::io;
|
||||
|
||||
use proptest::strategy::{BoxedStrategy, Strategy};
|
||||
|
||||
use crate::directory::OwnedBytes;
|
||||
use crate::store::index::Checkpoint;
|
||||
use crate::DocId;
|
||||
|
||||
use super::{SkipIndex, SkipIndexBuilder};
|
||||
|
||||
#[test]
|
||||
fn test_skip_index_empty() -> io::Result<()> {
|
||||
let mut output: Vec<u8> = Vec::new();
|
||||
let skip_index_builder: SkipIndexBuilder = SkipIndexBuilder::new();
|
||||
skip_index_builder.write(&mut output)?;
|
||||
let skip_index: SkipIndex = SkipIndex::from(OwnedBytes::new(output));
|
||||
let mut skip_cursor = skip_index.checkpoints();
|
||||
assert!(skip_cursor.next().is_none());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_skip_index_single_el() -> io::Result<()> {
|
||||
let mut output: Vec<u8> = Vec::new();
|
||||
let mut skip_index_builder: SkipIndexBuilder = SkipIndexBuilder::new();
|
||||
let checkpoint = Checkpoint {
|
||||
start_doc: 0,
|
||||
end_doc: 2,
|
||||
start_offset: 0,
|
||||
end_offset: 3,
|
||||
};
|
||||
skip_index_builder.insert(checkpoint);
|
||||
skip_index_builder.write(&mut output)?;
|
||||
let skip_index: SkipIndex = SkipIndex::from(OwnedBytes::new(output));
|
||||
let mut skip_cursor = skip_index.checkpoints();
|
||||
assert_eq!(skip_cursor.next(), Some(checkpoint));
|
||||
assert_eq!(skip_cursor.next(), None);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_skip_index() -> io::Result<()> {
|
||||
let mut output: Vec<u8> = Vec::new();
|
||||
let checkpoints = vec![
|
||||
Checkpoint {
|
||||
start_doc: 0,
|
||||
end_doc: 3,
|
||||
start_offset: 4,
|
||||
end_offset: 9,
|
||||
},
|
||||
Checkpoint {
|
||||
start_doc: 3,
|
||||
end_doc: 4,
|
||||
start_offset: 9,
|
||||
end_offset: 25,
|
||||
},
|
||||
Checkpoint {
|
||||
start_doc: 4,
|
||||
end_doc: 6,
|
||||
start_offset: 25,
|
||||
end_offset: 49,
|
||||
},
|
||||
Checkpoint {
|
||||
start_doc: 6,
|
||||
end_doc: 8,
|
||||
start_offset: 49,
|
||||
end_offset: 81,
|
||||
},
|
||||
Checkpoint {
|
||||
start_doc: 8,
|
||||
end_doc: 10,
|
||||
start_offset: 81,
|
||||
end_offset: 100,
|
||||
},
|
||||
];
|
||||
|
||||
let mut skip_index_builder: SkipIndexBuilder = SkipIndexBuilder::new();
|
||||
for &checkpoint in &checkpoints {
|
||||
skip_index_builder.insert(checkpoint);
|
||||
}
|
||||
skip_index_builder.write(&mut output)?;
|
||||
|
||||
let skip_index: SkipIndex = SkipIndex::from(OwnedBytes::new(output));
|
||||
assert_eq!(
|
||||
&skip_index.checkpoints().collect::<Vec<_>>()[..],
|
||||
&checkpoints[..]
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn offset_test(doc: DocId) -> u64 {
|
||||
(doc as u64) * (doc as u64)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_skip_index_long() -> io::Result<()> {
|
||||
let mut output: Vec<u8> = Vec::new();
|
||||
let checkpoints: Vec<Checkpoint> = (0..1000)
|
||||
.map(|i| Checkpoint {
|
||||
start_doc: i,
|
||||
end_doc: i + 1,
|
||||
start_offset: offset_test(i),
|
||||
end_offset: offset_test(i + 1),
|
||||
})
|
||||
.collect();
|
||||
let mut skip_index_builder = SkipIndexBuilder::new();
|
||||
for checkpoint in &checkpoints {
|
||||
skip_index_builder.insert(*checkpoint);
|
||||
}
|
||||
skip_index_builder.write(&mut output)?;
|
||||
assert_eq!(output.len(), 4035);
|
||||
let resulting_checkpoints: Vec<Checkpoint> = SkipIndex::from(OwnedBytes::new(output))
|
||||
.checkpoints()
|
||||
.collect();
|
||||
assert_eq!(&resulting_checkpoints, &checkpoints);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn integrate_delta(mut vals: Vec<u64>) -> Vec<u64> {
|
||||
let mut prev = 0u64;
|
||||
for val in vals.iter_mut() {
|
||||
let new_val = *val + prev;
|
||||
prev = new_val;
|
||||
*val = new_val;
|
||||
}
|
||||
vals
|
||||
}
|
||||
|
||||
// Generates a sequence of n valid checkpoints, with n < max_len.
|
||||
fn monotonic_checkpoints(max_len: usize) -> BoxedStrategy<Vec<Checkpoint>> {
|
||||
(1..max_len)
|
||||
.prop_flat_map(move |len: usize| {
|
||||
(
|
||||
proptest::collection::vec(1u64..20u64, len as usize).prop_map(integrate_delta),
|
||||
proptest::collection::vec(1u64..26u64, len as usize).prop_map(integrate_delta),
|
||||
)
|
||||
.prop_map(|(docs, offsets)| {
|
||||
(0..docs.len() - 1)
|
||||
.map(move |i| Checkpoint {
|
||||
start_doc: docs[i] as DocId,
|
||||
end_doc: docs[i + 1] as DocId,
|
||||
start_offset: offsets[i],
|
||||
end_offset: offsets[i + 1],
|
||||
})
|
||||
.collect::<Vec<Checkpoint>>()
|
||||
})
|
||||
})
|
||||
.boxed()
|
||||
}
|
||||
|
||||
fn seek_manual<I: Iterator<Item = Checkpoint>>(
|
||||
checkpoints: I,
|
||||
target: DocId,
|
||||
) -> Option<Checkpoint> {
|
||||
checkpoints
|
||||
.into_iter()
|
||||
.filter(|checkpoint| checkpoint.end_doc > target)
|
||||
.next()
|
||||
}
|
||||
|
||||
fn test_skip_index_aux(skip_index: SkipIndex, checkpoints: &[Checkpoint]) {
|
||||
if let Some(last_checkpoint) = checkpoints.last() {
|
||||
for doc in 0u32..last_checkpoint.end_doc {
|
||||
let expected = seek_manual(skip_index.checkpoints(), doc);
|
||||
assert_eq!(expected, skip_index.seek(doc), "Doc {}", doc);
|
||||
}
|
||||
assert!(skip_index.seek(last_checkpoint.end_doc).is_none());
|
||||
}
|
||||
}
|
||||
|
||||
use proptest::prelude::*;
|
||||
|
||||
proptest! {
|
||||
#![proptest_config(ProptestConfig::with_cases(20))]
|
||||
#[test]
|
||||
fn test_proptest_skip(checkpoints in monotonic_checkpoints(100)) {
|
||||
let mut skip_index_builder = SkipIndexBuilder::new();
|
||||
for checkpoint in checkpoints.iter().cloned() {
|
||||
skip_index_builder.insert(checkpoint);
|
||||
}
|
||||
let mut buffer = Vec::new();
|
||||
skip_index_builder.write(&mut buffer).unwrap();
|
||||
let skip_index = SkipIndex::from(OwnedBytes::new(buffer));
|
||||
let iter_checkpoints: Vec<Checkpoint> = skip_index.checkpoints().collect();
|
||||
assert_eq!(&checkpoints[..], &iter_checkpoints[..]);
|
||||
test_skip_index_aux(skip_index, &checkpoints[..]);
|
||||
}
|
||||
}
|
||||
}
|
||||
112
src/store/index/skip_index.rs
Normal file
112
src/store/index/skip_index.rs
Normal file
@@ -0,0 +1,112 @@
|
||||
use crate::common::{BinarySerializable, VInt};
|
||||
use crate::directory::OwnedBytes;
|
||||
use crate::store::index::block::CheckpointBlock;
|
||||
use crate::store::index::Checkpoint;
|
||||
use crate::DocId;
|
||||
|
||||
pub struct LayerCursor<'a> {
|
||||
remaining: &'a [u8],
|
||||
block: CheckpointBlock,
|
||||
cursor: usize,
|
||||
}
|
||||
|
||||
impl<'a> Iterator for LayerCursor<'a> {
|
||||
type Item = Checkpoint;
|
||||
|
||||
fn next(&mut self) -> Option<Checkpoint> {
|
||||
if self.cursor == self.block.len() {
|
||||
if self.remaining.is_empty() {
|
||||
return None;
|
||||
}
|
||||
let (block_mut, remaining_mut) = (&mut self.block, &mut self.remaining);
|
||||
if let Err(_) = block_mut.deserialize(remaining_mut) {
|
||||
return None;
|
||||
}
|
||||
self.cursor = 0;
|
||||
}
|
||||
let res = Some(self.block.get(self.cursor));
|
||||
self.cursor += 1;
|
||||
res
|
||||
}
|
||||
}
|
||||
|
||||
struct Layer {
|
||||
data: OwnedBytes,
|
||||
}
|
||||
|
||||
impl Layer {
|
||||
fn cursor<'a>(&'a self) -> impl Iterator<Item = Checkpoint> + 'a {
|
||||
self.cursor_at_offset(0u64)
|
||||
}
|
||||
|
||||
fn cursor_at_offset<'a>(&'a self, start_offset: u64) -> impl Iterator<Item = Checkpoint> + 'a {
|
||||
let data = &self.data.as_slice();
|
||||
LayerCursor {
|
||||
remaining: &data[start_offset as usize..],
|
||||
block: CheckpointBlock::default(),
|
||||
cursor: 0,
|
||||
}
|
||||
}
|
||||
|
||||
fn seek_start_at_offset(&self, target: DocId, offset: u64) -> Option<Checkpoint> {
|
||||
self.cursor_at_offset(offset)
|
||||
.filter(|checkpoint| checkpoint.end_doc > target)
|
||||
.next()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct SkipIndex {
|
||||
layers: Vec<Layer>,
|
||||
}
|
||||
|
||||
impl SkipIndex {
|
||||
pub(crate) fn checkpoints<'a>(&'a self) -> impl Iterator<Item = Checkpoint> + 'a {
|
||||
self.layers
|
||||
.last()
|
||||
.into_iter()
|
||||
.flat_map(|layer| layer.cursor())
|
||||
}
|
||||
|
||||
pub fn seek(&self, target: DocId) -> Option<Checkpoint> {
|
||||
let first_layer_len = self
|
||||
.layers
|
||||
.first()
|
||||
.map(|layer| layer.data.len() as u64)
|
||||
.unwrap_or(0u64);
|
||||
let mut cur_checkpoint = Checkpoint {
|
||||
start_doc: 0u32,
|
||||
end_doc: 1u32,
|
||||
start_offset: 0u64,
|
||||
end_offset: first_layer_len,
|
||||
};
|
||||
for layer in &self.layers {
|
||||
if let Some(checkpoint) =
|
||||
layer.seek_start_at_offset(target, cur_checkpoint.start_offset)
|
||||
{
|
||||
cur_checkpoint = checkpoint;
|
||||
} else {
|
||||
return None;
|
||||
}
|
||||
}
|
||||
Some(cur_checkpoint)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<OwnedBytes> for SkipIndex {
|
||||
fn from(mut data: OwnedBytes) -> SkipIndex {
|
||||
let offsets: Vec<u64> = Vec::<VInt>::deserialize(&mut data)
|
||||
.unwrap()
|
||||
.into_iter()
|
||||
.map(|el| el.0)
|
||||
.collect();
|
||||
let mut start_offset = 0;
|
||||
let mut layers = Vec::new();
|
||||
for end_offset in offsets {
|
||||
layers.push(Layer {
|
||||
data: data.slice(start_offset as usize, end_offset as usize),
|
||||
});
|
||||
start_offset = end_offset;
|
||||
}
|
||||
SkipIndex { layers }
|
||||
}
|
||||
}
|
||||
115
src/store/index/skip_index_builder.rs
Normal file
115
src/store/index/skip_index_builder.rs
Normal file
@@ -0,0 +1,115 @@
|
||||
use crate::common::{BinarySerializable, VInt};
|
||||
use crate::store::index::block::CheckpointBlock;
|
||||
use crate::store::index::{Checkpoint, CHECKPOINT_PERIOD};
|
||||
use std::io;
|
||||
use std::io::Write;
|
||||
|
||||
// Each skip contains iterator over pairs (last doc in block, offset to start of block).
|
||||
|
||||
struct LayerBuilder {
|
||||
buffer: Vec<u8>,
|
||||
pub block: CheckpointBlock,
|
||||
}
|
||||
|
||||
impl LayerBuilder {
|
||||
fn finish(self) -> Vec<u8> {
|
||||
self.buffer
|
||||
}
|
||||
|
||||
fn new() -> LayerBuilder {
|
||||
LayerBuilder {
|
||||
buffer: Vec::new(),
|
||||
block: CheckpointBlock::default(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Serializes the block, and return a checkpoint representing
|
||||
/// the entire block.
|
||||
///
|
||||
/// If the block was empty to begin with, simply return None.
|
||||
fn flush_block(&mut self) -> Option<Checkpoint> {
|
||||
self.block.doc_interval().map(|(start_doc, end_doc)| {
|
||||
let start_offset = self.buffer.len() as u64;
|
||||
self.block.serialize(&mut self.buffer);
|
||||
let end_offset = self.buffer.len() as u64;
|
||||
self.block.clear();
|
||||
Checkpoint {
|
||||
start_doc,
|
||||
end_doc,
|
||||
start_offset,
|
||||
end_offset,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn push(&mut self, checkpoint: Checkpoint) {
|
||||
self.block.push(checkpoint);
|
||||
}
|
||||
|
||||
fn insert(&mut self, checkpoint: Checkpoint) -> Option<Checkpoint> {
|
||||
self.push(checkpoint);
|
||||
let emit_skip_info = (self.block.len() % CHECKPOINT_PERIOD) == 0;
|
||||
if emit_skip_info {
|
||||
self.flush_block()
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct SkipIndexBuilder {
|
||||
layers: Vec<LayerBuilder>,
|
||||
}
|
||||
|
||||
impl SkipIndexBuilder {
|
||||
pub fn new() -> SkipIndexBuilder {
|
||||
SkipIndexBuilder { layers: Vec::new() }
|
||||
}
|
||||
|
||||
fn get_layer(&mut self, layer_id: usize) -> &mut LayerBuilder {
|
||||
if layer_id == self.layers.len() {
|
||||
let layer_builder = LayerBuilder::new();
|
||||
self.layers.push(layer_builder);
|
||||
}
|
||||
&mut self.layers[layer_id]
|
||||
}
|
||||
|
||||
pub fn insert(&mut self, checkpoint: Checkpoint) {
|
||||
let mut skip_pointer = Some(checkpoint);
|
||||
for layer_id in 0.. {
|
||||
if let Some(checkpoint) = skip_pointer {
|
||||
skip_pointer = self.get_layer(layer_id).insert(checkpoint);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn write<W: Write>(mut self, output: &mut W) -> io::Result<()> {
|
||||
let mut last_pointer = None;
|
||||
for skip_layer in self.layers.iter_mut() {
|
||||
if let Some(checkpoint) = last_pointer {
|
||||
skip_layer.push(checkpoint);
|
||||
}
|
||||
last_pointer = skip_layer.flush_block();
|
||||
}
|
||||
let layer_buffers: Vec<Vec<u8>> = self
|
||||
.layers
|
||||
.into_iter()
|
||||
.rev()
|
||||
.map(|layer| layer.finish())
|
||||
.collect();
|
||||
|
||||
let mut layer_offset = 0;
|
||||
let mut layer_sizes = Vec::new();
|
||||
for layer_buffer in &layer_buffers {
|
||||
layer_offset += layer_buffer.len() as u64;
|
||||
layer_sizes.push(VInt(layer_offset));
|
||||
}
|
||||
layer_sizes.serialize(output)?;
|
||||
for layer_buffer in layer_buffers {
|
||||
output.write_all(&layer_buffer[..])?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -33,8 +33,8 @@ and should rely on either
|
||||
|
||||
!*/
|
||||
|
||||
mod index;
|
||||
mod reader;
|
||||
mod skiplist;
|
||||
mod writer;
|
||||
pub use self::reader::StoreReader;
|
||||
pub use self::writer::StoreWriter;
|
||||
|
||||
@@ -1,10 +1,11 @@
|
||||
use super::decompress;
|
||||
use super::skiplist::SkipList;
|
||||
use super::index::SkipIndex;
|
||||
use crate::common::VInt;
|
||||
use crate::common::{BinarySerializable, HasLen};
|
||||
use crate::directory::{FileSlice, OwnedBytes};
|
||||
use crate::schema::Document;
|
||||
use crate::space_usage::StoreSpaceUsage;
|
||||
use crate::store::index::Checkpoint;
|
||||
use crate::DocId;
|
||||
use lru::LruCache;
|
||||
use std::io;
|
||||
@@ -16,69 +17,73 @@ const LRU_CACHE_CAPACITY: usize = 100;
|
||||
|
||||
type Block = Arc<Vec<u8>>;
|
||||
|
||||
type BlockCache = Arc<Mutex<LruCache<usize, Block>>>;
|
||||
type BlockCache = Arc<Mutex<LruCache<u64, Block>>>;
|
||||
|
||||
/// Reads document off tantivy's [`Store`](./index.html)
|
||||
#[derive(Clone)]
|
||||
pub struct StoreReader {
|
||||
data: FileSlice,
|
||||
offset_index_file: OwnedBytes,
|
||||
max_doc: DocId,
|
||||
cache: BlockCache,
|
||||
cache_hits: Arc<AtomicUsize>,
|
||||
cache_misses: Arc<AtomicUsize>,
|
||||
skip_index: Arc<SkipIndex>,
|
||||
space_usage: StoreSpaceUsage,
|
||||
}
|
||||
|
||||
impl StoreReader {
|
||||
/// Opens a store reader
|
||||
// TODO rename open
|
||||
pub fn open(store_file: FileSlice) -> io::Result<StoreReader> {
|
||||
let (data_file, offset_index_file, max_doc) = split_file(store_file)?;
|
||||
let (data_file, offset_index_file) = split_file(store_file)?;
|
||||
let index_data = offset_index_file.read_bytes()?;
|
||||
let space_usage = StoreSpaceUsage::new(data_file.len(), offset_index_file.len());
|
||||
let skip_index = SkipIndex::from(index_data);
|
||||
Ok(StoreReader {
|
||||
data: data_file,
|
||||
offset_index_file: offset_index_file.read_bytes()?,
|
||||
max_doc,
|
||||
cache: Arc::new(Mutex::new(LruCache::new(LRU_CACHE_CAPACITY))),
|
||||
cache_hits: Default::default(),
|
||||
cache_misses: Default::default(),
|
||||
skip_index: Arc::new(skip_index),
|
||||
space_usage,
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn block_index(&self) -> SkipList<'_, u64> {
|
||||
SkipList::from(self.offset_index_file.as_slice())
|
||||
pub(crate) fn block_checkpoints<'a>(&'a self) -> impl Iterator<Item = Checkpoint> + 'a {
|
||||
self.skip_index.checkpoints()
|
||||
}
|
||||
|
||||
fn block_offset(&self, doc_id: DocId) -> (DocId, u64) {
|
||||
self.block_index()
|
||||
.seek(u64::from(doc_id) + 1)
|
||||
.map(|(doc, offset)| (doc as DocId, offset))
|
||||
.unwrap_or((0u32, 0u64))
|
||||
fn block_checkpoint(&self, doc_id: DocId) -> Option<Checkpoint> {
|
||||
self.skip_index.seek(doc_id)
|
||||
}
|
||||
|
||||
pub(crate) fn block_data(&self) -> io::Result<OwnedBytes> {
|
||||
self.data.read_bytes()
|
||||
}
|
||||
|
||||
fn compressed_block(&self, addr: usize) -> io::Result<OwnedBytes> {
|
||||
let (block_len_bytes, block_body) = self.data.slice_from(addr).split(4);
|
||||
let block_len = u32::deserialize(&mut block_len_bytes.read_bytes()?)?;
|
||||
block_body.slice_to(block_len as usize).read_bytes()
|
||||
fn compressed_block(&self, checkpoint: &Checkpoint) -> io::Result<OwnedBytes> {
|
||||
self.data
|
||||
.slice(
|
||||
checkpoint.start_offset as usize,
|
||||
checkpoint.end_offset as usize,
|
||||
)
|
||||
.read_bytes()
|
||||
}
|
||||
|
||||
fn read_block(&self, block_offset: usize) -> io::Result<Block> {
|
||||
if let Some(block) = self.cache.lock().unwrap().get(&block_offset) {
|
||||
fn read_block(&self, checkpoint: &Checkpoint) -> io::Result<Block> {
|
||||
if let Some(block) = self.cache.lock().unwrap().get(&checkpoint.start_offset) {
|
||||
self.cache_hits.fetch_add(1, Ordering::SeqCst);
|
||||
return Ok(block.clone());
|
||||
}
|
||||
|
||||
self.cache_misses.fetch_add(1, Ordering::SeqCst);
|
||||
|
||||
let compressed_block = self.compressed_block(block_offset)?;
|
||||
let compressed_block = self.compressed_block(checkpoint)?;
|
||||
let mut decompressed_block = vec![];
|
||||
decompress(compressed_block.as_slice(), &mut decompressed_block)?;
|
||||
|
||||
let block = Arc::new(decompressed_block);
|
||||
self.cache.lock().unwrap().put(block_offset, block.clone());
|
||||
self.cache
|
||||
.lock()
|
||||
.unwrap()
|
||||
.put(checkpoint.start_offset, block.clone());
|
||||
|
||||
Ok(block)
|
||||
}
|
||||
@@ -91,10 +96,11 @@ impl StoreReader {
|
||||
/// It should not be called to score documents
|
||||
/// for instance.
|
||||
pub fn get(&self, doc_id: DocId) -> crate::Result<Document> {
|
||||
let (first_doc_id, block_offset) = self.block_offset(doc_id);
|
||||
let mut cursor = &self.read_block(block_offset as usize)?[..];
|
||||
|
||||
for _ in first_doc_id..doc_id {
|
||||
let checkpoint = self.block_checkpoint(doc_id).ok_or_else(|| {
|
||||
crate::TantivyError::InvalidArgument(format!("Failed to lookup Doc #{}.", doc_id))
|
||||
})?;
|
||||
let mut cursor = &self.read_block(&checkpoint)?[..];
|
||||
for _ in checkpoint.start_doc..doc_id {
|
||||
let doc_length = VInt::deserialize(&mut cursor)?.val() as usize;
|
||||
cursor = &cursor[doc_length..];
|
||||
}
|
||||
@@ -106,23 +112,16 @@ impl StoreReader {
|
||||
|
||||
/// Summarize total space usage of this store reader.
|
||||
pub fn space_usage(&self) -> StoreSpaceUsage {
|
||||
StoreSpaceUsage::new(self.data.len(), self.offset_index_file.len())
|
||||
self.space_usage.clone()
|
||||
}
|
||||
}
|
||||
|
||||
fn split_file(data: FileSlice) -> io::Result<(FileSlice, FileSlice, DocId)> {
|
||||
let data_len = data.len();
|
||||
let footer_offset = data_len - size_of::<u64>() - size_of::<u32>();
|
||||
let serialized_offset: OwnedBytes = data.slice(footer_offset, data_len).read_bytes()?;
|
||||
fn split_file(data: FileSlice) -> io::Result<(FileSlice, FileSlice)> {
|
||||
let (data, footer_len_bytes) = data.split_from_end(size_of::<u64>());
|
||||
let serialized_offset: OwnedBytes = footer_len_bytes.read_bytes()?;
|
||||
let mut serialized_offset_buf = serialized_offset.as_slice();
|
||||
let offset = u64::deserialize(&mut serialized_offset_buf)?;
|
||||
let offset = offset as usize;
|
||||
let max_doc = u32::deserialize(&mut serialized_offset_buf)?;
|
||||
Ok((
|
||||
data.slice(0, offset),
|
||||
data.slice(offset, footer_offset),
|
||||
max_doc,
|
||||
))
|
||||
let offset = u64::deserialize(&mut serialized_offset_buf)? as usize;
|
||||
Ok(data.split(offset))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -197,7 +196,7 @@ mod tests {
|
||||
.unwrap()
|
||||
.peek_lru()
|
||||
.map(|(&k, _)| k as usize),
|
||||
Some(18862)
|
||||
Some(18806)
|
||||
);
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -1,168 +0,0 @@
|
||||
#![allow(dead_code)]
|
||||
|
||||
mod skiplist;
|
||||
mod skiplist_builder;
|
||||
|
||||
pub use self::skiplist::SkipList;
|
||||
pub use self::skiplist_builder::SkipListBuilder;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
use super::{SkipList, SkipListBuilder};
|
||||
|
||||
#[test]
|
||||
fn test_skiplist() {
|
||||
let mut output: Vec<u8> = Vec::new();
|
||||
let mut skip_list_builder: SkipListBuilder<u32> = SkipListBuilder::new(8);
|
||||
skip_list_builder.insert(2, &3).unwrap();
|
||||
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
|
||||
let mut skip_list: SkipList<'_, u32> = SkipList::from(output.as_slice());
|
||||
assert_eq!(skip_list.next(), Some((2, 3)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_skiplist2() {
|
||||
let mut output: Vec<u8> = Vec::new();
|
||||
let skip_list_builder: SkipListBuilder<u32> = SkipListBuilder::new(8);
|
||||
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
|
||||
let mut skip_list: SkipList<'_, u32> = SkipList::from(output.as_slice());
|
||||
assert_eq!(skip_list.next(), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_skiplist3() {
|
||||
let mut output: Vec<u8> = Vec::new();
|
||||
let mut skip_list_builder: SkipListBuilder<()> = SkipListBuilder::new(2);
|
||||
skip_list_builder.insert(2, &()).unwrap();
|
||||
skip_list_builder.insert(3, &()).unwrap();
|
||||
skip_list_builder.insert(5, &()).unwrap();
|
||||
skip_list_builder.insert(7, &()).unwrap();
|
||||
skip_list_builder.insert(9, &()).unwrap();
|
||||
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
|
||||
let mut skip_list: SkipList<'_, ()> = SkipList::from(output.as_slice());
|
||||
assert_eq!(skip_list.next().unwrap(), (2, ()));
|
||||
assert_eq!(skip_list.next().unwrap(), (3, ()));
|
||||
assert_eq!(skip_list.next().unwrap(), (5, ()));
|
||||
assert_eq!(skip_list.next().unwrap(), (7, ()));
|
||||
assert_eq!(skip_list.next().unwrap(), (9, ()));
|
||||
assert_eq!(skip_list.next(), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_skiplist4() {
|
||||
let mut output: Vec<u8> = Vec::new();
|
||||
let mut skip_list_builder: SkipListBuilder<()> = SkipListBuilder::new(2);
|
||||
skip_list_builder.insert(2, &()).unwrap();
|
||||
skip_list_builder.insert(3, &()).unwrap();
|
||||
skip_list_builder.insert(5, &()).unwrap();
|
||||
skip_list_builder.insert(7, &()).unwrap();
|
||||
skip_list_builder.insert(9, &()).unwrap();
|
||||
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
|
||||
let mut skip_list: SkipList<'_, ()> = SkipList::from(output.as_slice());
|
||||
assert_eq!(skip_list.next().unwrap(), (2, ()));
|
||||
skip_list.seek(5);
|
||||
assert_eq!(skip_list.next().unwrap(), (5, ()));
|
||||
assert_eq!(skip_list.next().unwrap(), (7, ()));
|
||||
assert_eq!(skip_list.next().unwrap(), (9, ()));
|
||||
assert_eq!(skip_list.next(), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_skiplist5() {
|
||||
let mut output: Vec<u8> = Vec::new();
|
||||
let mut skip_list_builder: SkipListBuilder<()> = SkipListBuilder::new(4);
|
||||
skip_list_builder.insert(2, &()).unwrap();
|
||||
skip_list_builder.insert(3, &()).unwrap();
|
||||
skip_list_builder.insert(5, &()).unwrap();
|
||||
skip_list_builder.insert(6, &()).unwrap();
|
||||
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
|
||||
let mut skip_list: SkipList<'_, ()> = SkipList::from(output.as_slice());
|
||||
assert_eq!(skip_list.next().unwrap(), (2, ()));
|
||||
skip_list.seek(6);
|
||||
assert_eq!(skip_list.next().unwrap(), (6, ()));
|
||||
assert_eq!(skip_list.next(), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_skiplist6() {
|
||||
let mut output: Vec<u8> = Vec::new();
|
||||
let mut skip_list_builder: SkipListBuilder<()> = SkipListBuilder::new(2);
|
||||
skip_list_builder.insert(2, &()).unwrap();
|
||||
skip_list_builder.insert(3, &()).unwrap();
|
||||
skip_list_builder.insert(5, &()).unwrap();
|
||||
skip_list_builder.insert(7, &()).unwrap();
|
||||
skip_list_builder.insert(9, &()).unwrap();
|
||||
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
|
||||
let mut skip_list: SkipList<'_, ()> = SkipList::from(output.as_slice());
|
||||
assert_eq!(skip_list.next().unwrap(), (2, ()));
|
||||
skip_list.seek(10);
|
||||
assert_eq!(skip_list.next(), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_skiplist7() {
|
||||
let mut output: Vec<u8> = Vec::new();
|
||||
let mut skip_list_builder: SkipListBuilder<()> = SkipListBuilder::new(4);
|
||||
for i in 0..1000 {
|
||||
skip_list_builder.insert(i, &()).unwrap();
|
||||
}
|
||||
skip_list_builder.insert(1004, &()).unwrap();
|
||||
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
|
||||
let mut skip_list: SkipList<'_, ()> = SkipList::from(output.as_slice());
|
||||
assert_eq!(skip_list.next().unwrap(), (0, ()));
|
||||
skip_list.seek(431);
|
||||
assert_eq!(skip_list.next().unwrap(), (431, ()));
|
||||
skip_list.seek(1003);
|
||||
assert_eq!(skip_list.next().unwrap(), (1004, ()));
|
||||
assert_eq!(skip_list.next(), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_skiplist8() {
|
||||
let mut output: Vec<u8> = Vec::new();
|
||||
let mut skip_list_builder: SkipListBuilder<u64> = SkipListBuilder::new(8);
|
||||
skip_list_builder.insert(2, &3).unwrap();
|
||||
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
|
||||
assert_eq!(output.len(), 11);
|
||||
assert_eq!(output[0], 1u8 + 128u8);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_skiplist9() {
|
||||
let mut output: Vec<u8> = Vec::new();
|
||||
let mut skip_list_builder: SkipListBuilder<u64> = SkipListBuilder::new(4);
|
||||
for i in 0..4 * 4 * 4 {
|
||||
skip_list_builder.insert(i, &i).unwrap();
|
||||
}
|
||||
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
|
||||
assert_eq!(output.len(), 774);
|
||||
assert_eq!(output[0], 4u8 + 128u8);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_skiplist10() {
|
||||
// checking that void gets serialized to nothing.
|
||||
let mut output: Vec<u8> = Vec::new();
|
||||
let mut skip_list_builder: SkipListBuilder<()> = SkipListBuilder::new(4);
|
||||
for i in 0..((4 * 4 * 4) - 1) {
|
||||
skip_list_builder.insert(i, &()).unwrap();
|
||||
}
|
||||
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
|
||||
assert_eq!(output.len(), 230);
|
||||
assert_eq!(output[0], 128u8 + 3u8);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_skiplist11() {
|
||||
// checking that void gets serialized to nothing.
|
||||
let mut output: Vec<u8> = Vec::new();
|
||||
let mut skip_list_builder: SkipListBuilder<()> = SkipListBuilder::new(4);
|
||||
for i in 0..(4 * 4) {
|
||||
skip_list_builder.insert(i, &()).unwrap();
|
||||
}
|
||||
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
|
||||
assert_eq!(output.len(), 65);
|
||||
assert_eq!(output[0], 128u8 + 3u8);
|
||||
}
|
||||
}
|
||||
@@ -1,133 +0,0 @@
|
||||
use crate::common::{BinarySerializable, VInt};
|
||||
use std::cmp::max;
|
||||
use std::marker::PhantomData;
|
||||
|
||||
static EMPTY: [u8; 0] = [];
|
||||
|
||||
struct Layer<'a, T> {
|
||||
data: &'a [u8],
|
||||
cursor: &'a [u8],
|
||||
next_id: Option<u64>,
|
||||
_phantom_: PhantomData<T>,
|
||||
}
|
||||
|
||||
impl<'a, T: BinarySerializable> Iterator for Layer<'a, T> {
|
||||
type Item = (u64, T);
|
||||
|
||||
fn next(&mut self) -> Option<(u64, T)> {
|
||||
if let Some(cur_id) = self.next_id {
|
||||
let cur_val = T::deserialize(&mut self.cursor).unwrap();
|
||||
self.next_id = VInt::deserialize_u64(&mut self.cursor).ok();
|
||||
Some((cur_id, cur_val))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T: BinarySerializable> From<&'a [u8]> for Layer<'a, T> {
|
||||
fn from(data: &'a [u8]) -> Layer<'a, T> {
|
||||
let mut cursor = data;
|
||||
let next_id = VInt::deserialize_u64(&mut cursor).ok();
|
||||
Layer {
|
||||
data,
|
||||
cursor,
|
||||
next_id,
|
||||
_phantom_: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T: BinarySerializable> Layer<'a, T> {
|
||||
fn empty() -> Layer<'a, T> {
|
||||
Layer {
|
||||
data: &EMPTY,
|
||||
cursor: &EMPTY,
|
||||
next_id: None,
|
||||
_phantom_: PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
fn seek_offset(&mut self, offset: usize) {
|
||||
self.cursor = &self.data[offset..];
|
||||
self.next_id = VInt::deserialize_u64(&mut self.cursor).ok();
|
||||
}
|
||||
|
||||
// Returns the last element (key, val)
|
||||
// such that (key < doc_id)
|
||||
//
|
||||
// If there is no such element anymore,
|
||||
// returns None.
|
||||
//
|
||||
// If the element exists, it will be returned
|
||||
// at the next call to `.next()`.
|
||||
fn seek(&mut self, key: u64) -> Option<(u64, T)> {
|
||||
let mut result: Option<(u64, T)> = None;
|
||||
loop {
|
||||
if let Some(next_id) = self.next_id {
|
||||
if next_id < key {
|
||||
if let Some(v) = self.next() {
|
||||
result = Some(v);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct SkipList<'a, T: BinarySerializable> {
|
||||
data_layer: Layer<'a, T>,
|
||||
skip_layers: Vec<Layer<'a, u64>>,
|
||||
}
|
||||
|
||||
impl<'a, T: BinarySerializable> Iterator for SkipList<'a, T> {
|
||||
type Item = (u64, T);
|
||||
|
||||
fn next(&mut self) -> Option<(u64, T)> {
|
||||
self.data_layer.next()
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T: BinarySerializable> SkipList<'a, T> {
|
||||
pub fn seek(&mut self, key: u64) -> Option<(u64, T)> {
|
||||
let mut next_layer_skip: Option<(u64, u64)> = None;
|
||||
for skip_layer in &mut self.skip_layers {
|
||||
if let Some((_, offset)) = next_layer_skip {
|
||||
skip_layer.seek_offset(offset as usize);
|
||||
}
|
||||
next_layer_skip = skip_layer.seek(key);
|
||||
}
|
||||
if let Some((_, offset)) = next_layer_skip {
|
||||
self.data_layer.seek_offset(offset as usize);
|
||||
}
|
||||
self.data_layer.seek(key)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T: BinarySerializable> From<&'a [u8]> for SkipList<'a, T> {
|
||||
fn from(mut data: &'a [u8]) -> SkipList<'a, T> {
|
||||
let offsets: Vec<u64> = Vec::<VInt>::deserialize(&mut data)
|
||||
.unwrap()
|
||||
.into_iter()
|
||||
.map(|el| el.0)
|
||||
.collect();
|
||||
let num_layers = offsets.len();
|
||||
let layers_data: &[u8] = data;
|
||||
let data_layer: Layer<'a, T> = if num_layers == 0 {
|
||||
Layer::empty()
|
||||
} else {
|
||||
let first_layer_data: &[u8] = &layers_data[..offsets[0] as usize];
|
||||
Layer::from(first_layer_data)
|
||||
};
|
||||
let skip_layers = (0..max(1, num_layers) - 1)
|
||||
.map(|i| (offsets[i] as usize, offsets[i + 1] as usize))
|
||||
.map(|(start, stop)| Layer::from(&layers_data[start..stop]))
|
||||
.collect();
|
||||
SkipList {
|
||||
skip_layers,
|
||||
data_layer,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,98 +0,0 @@
|
||||
use crate::common::{is_power_of_2, BinarySerializable, VInt};
|
||||
use std::io;
|
||||
use std::io::Write;
|
||||
use std::marker::PhantomData;
|
||||
|
||||
struct LayerBuilder<T: BinarySerializable> {
|
||||
period_mask: usize,
|
||||
buffer: Vec<u8>,
|
||||
len: usize,
|
||||
_phantom_: PhantomData<T>,
|
||||
}
|
||||
|
||||
impl<T: BinarySerializable> LayerBuilder<T> {
|
||||
fn written_size(&self) -> usize {
|
||||
self.buffer.len()
|
||||
}
|
||||
|
||||
fn write(&self, output: &mut dyn Write) -> Result<(), io::Error> {
|
||||
output.write_all(&self.buffer)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn with_period(period: usize) -> LayerBuilder<T> {
|
||||
assert!(is_power_of_2(period), "The period has to be a power of 2.");
|
||||
LayerBuilder {
|
||||
period_mask: (period - 1),
|
||||
buffer: Vec::new(),
|
||||
len: 0,
|
||||
_phantom_: PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
fn insert(&mut self, key: u64, value: &T) -> io::Result<Option<(u64, u64)>> {
|
||||
self.len += 1;
|
||||
let offset = self.written_size() as u64;
|
||||
VInt(key).serialize_into_vec(&mut self.buffer);
|
||||
value.serialize(&mut self.buffer)?;
|
||||
let emit_skip_info = (self.period_mask & self.len) == 0;
|
||||
if emit_skip_info {
|
||||
Ok(Some((key, offset)))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct SkipListBuilder<T: BinarySerializable> {
|
||||
period: usize,
|
||||
data_layer: LayerBuilder<T>,
|
||||
skip_layers: Vec<LayerBuilder<u64>>,
|
||||
}
|
||||
|
||||
impl<T: BinarySerializable> SkipListBuilder<T> {
|
||||
pub fn new(period: usize) -> SkipListBuilder<T> {
|
||||
SkipListBuilder {
|
||||
period,
|
||||
data_layer: LayerBuilder::with_period(period),
|
||||
skip_layers: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
fn get_skip_layer(&mut self, layer_id: usize) -> &mut LayerBuilder<u64> {
|
||||
if layer_id == self.skip_layers.len() {
|
||||
let layer_builder = LayerBuilder::with_period(self.period);
|
||||
self.skip_layers.push(layer_builder);
|
||||
}
|
||||
&mut self.skip_layers[layer_id]
|
||||
}
|
||||
|
||||
pub fn insert(&mut self, key: u64, dest: &T) -> io::Result<()> {
|
||||
let mut skip_pointer = self.data_layer.insert(key, dest)?;
|
||||
for layer_id in 0.. {
|
||||
if let Some((skip_doc_id, skip_offset)) = skip_pointer {
|
||||
skip_pointer = self
|
||||
.get_skip_layer(layer_id)
|
||||
.insert(skip_doc_id, &skip_offset)?;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn write<W: Write>(self, output: &mut W) -> io::Result<()> {
|
||||
let mut size: u64 = self.data_layer.buffer.len() as u64;
|
||||
let mut layer_sizes = vec![VInt(size)];
|
||||
for layer in self.skip_layers.iter().rev() {
|
||||
size += layer.buffer.len() as u64;
|
||||
layer_sizes.push(VInt(size));
|
||||
}
|
||||
layer_sizes.serialize(output)?;
|
||||
self.data_layer.write(output)?;
|
||||
for layer in self.skip_layers.iter().rev() {
|
||||
layer.write(output)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -1,11 +1,12 @@
|
||||
use super::compress;
|
||||
use super::skiplist::SkipListBuilder;
|
||||
use super::index::SkipIndexBuilder;
|
||||
use super::StoreReader;
|
||||
use crate::common::CountingWriter;
|
||||
use crate::common::{BinarySerializable, VInt};
|
||||
use crate::directory::TerminatingWrite;
|
||||
use crate::directory::WritePtr;
|
||||
use crate::schema::Document;
|
||||
use crate::store::index::Checkpoint;
|
||||
use crate::DocId;
|
||||
use std::io::{self, Write};
|
||||
|
||||
@@ -21,7 +22,8 @@ const BLOCK_SIZE: usize = 16_384;
|
||||
///
|
||||
pub struct StoreWriter {
|
||||
doc: DocId,
|
||||
offset_index_writer: SkipListBuilder<u64>,
|
||||
first_doc_in_block: DocId,
|
||||
offset_index_writer: SkipIndexBuilder,
|
||||
writer: CountingWriter<WritePtr>,
|
||||
intermediary_buffer: Vec<u8>,
|
||||
current_block: Vec<u8>,
|
||||
@@ -35,7 +37,8 @@ impl StoreWriter {
|
||||
pub fn new(writer: WritePtr) -> StoreWriter {
|
||||
StoreWriter {
|
||||
doc: 0,
|
||||
offset_index_writer: SkipListBuilder::new(4),
|
||||
first_doc_in_block: 0,
|
||||
offset_index_writer: SkipIndexBuilder::new(),
|
||||
writer: CountingWriter::wrap(writer),
|
||||
intermediary_buffer: Vec::new(),
|
||||
current_block: Vec::new(),
|
||||
@@ -68,11 +71,9 @@ impl StoreWriter {
|
||||
pub fn stack(&mut self, store_reader: &StoreReader) -> io::Result<()> {
|
||||
if !self.current_block.is_empty() {
|
||||
self.write_and_compress_block()?;
|
||||
self.offset_index_writer
|
||||
.insert(u64::from(self.doc), &(self.writer.written_bytes() as u64))?;
|
||||
}
|
||||
let doc_offset = self.doc;
|
||||
let start_offset = self.writer.written_bytes() as u64;
|
||||
let doc_shift = self.doc;
|
||||
let start_shift = self.writer.written_bytes() as u64;
|
||||
|
||||
// just bulk write all of the block of the given reader.
|
||||
self.writer
|
||||
@@ -80,22 +81,33 @@ impl StoreWriter {
|
||||
|
||||
// concatenate the index of the `store_reader`, after translating
|
||||
// its start doc id and its start file offset.
|
||||
for (next_doc_id, block_addr) in store_reader.block_index() {
|
||||
self.doc = doc_offset + next_doc_id as u32;
|
||||
self.offset_index_writer
|
||||
.insert(u64::from(self.doc), &(start_offset + block_addr))?;
|
||||
for mut checkpoint in store_reader.block_checkpoints() {
|
||||
checkpoint.start_doc += doc_shift;
|
||||
checkpoint.end_doc += doc_shift;
|
||||
checkpoint.start_offset += start_shift;
|
||||
checkpoint.end_offset += start_shift;
|
||||
self.offset_index_writer.insert(checkpoint);
|
||||
self.doc = checkpoint.end_doc;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn write_and_compress_block(&mut self) -> io::Result<()> {
|
||||
assert!(self.doc > 0);
|
||||
self.intermediary_buffer.clear();
|
||||
compress(&self.current_block[..], &mut self.intermediary_buffer)?;
|
||||
(self.intermediary_buffer.len() as u32).serialize(&mut self.writer)?;
|
||||
let start_offset = self.writer.written_bytes();
|
||||
self.writer.write_all(&self.intermediary_buffer)?;
|
||||
self.offset_index_writer
|
||||
.insert(u64::from(self.doc), &(self.writer.written_bytes() as u64))?;
|
||||
let end_offset = self.writer.written_bytes();
|
||||
let end_doc = self.doc;
|
||||
self.offset_index_writer.insert(Checkpoint {
|
||||
start_doc: self.first_doc_in_block,
|
||||
end_doc,
|
||||
start_offset,
|
||||
end_offset,
|
||||
});
|
||||
self.current_block.clear();
|
||||
self.first_doc_in_block = self.doc;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -110,7 +122,6 @@ impl StoreWriter {
|
||||
let header_offset: u64 = self.writer.written_bytes() as u64;
|
||||
self.offset_index_writer.write(&mut self.writer)?;
|
||||
header_offset.serialize(&mut self.writer)?;
|
||||
self.doc.serialize(&mut self.writer)?;
|
||||
self.writer.terminate()
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user