The segment stacking optimization is not updating "first_doc_in_block".
This commit is contained in:
Paul Masurel
2021-01-07 18:36:12 +09:00
parent 96f24b078e
commit caf2a38b7e
4 changed files with 28 additions and 10 deletions

View File

@@ -43,6 +43,9 @@ impl CheckpointBlock {
/// Adding another checkpoint in the block.
pub fn push(&mut self, checkpoint: Checkpoint) {
if let Some(prev_checkpoint) = self.checkpoints.last() {
assert!(checkpoint.follows(prev_checkpoint));
}
self.checkpoints.push(checkpoint);
}

View File

@@ -26,6 +26,13 @@ pub struct Checkpoint {
pub end_offset: u64,
}
impl Checkpoint {
pub(crate) fn follows(&self, other: &Checkpoint) -> bool {
(self.start_doc == other.end_doc) &&
(self.start_offset == other.end_offset)
}
}
impl fmt::Debug for Checkpoint {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(

View File

@@ -28,18 +28,20 @@ impl LayerBuilder {
///
/// If the block was empty to begin with, simply return None.
fn flush_block(&mut self) -> Option<Checkpoint> {
self.block.doc_interval().map(|(start_doc, end_doc)| {
if let Some((start_doc, end_doc)) = self.block.doc_interval() {
let start_offset = self.buffer.len() as u64;
self.block.serialize(&mut self.buffer);
let end_offset = self.buffer.len() as u64;
self.block.clear();
Checkpoint {
Some(Checkpoint {
start_doc,
end_doc,
start_offset,
end_offset,
}
})
})
} else {
None
}
}
fn push(&mut self, checkpoint: Checkpoint) {
@@ -48,7 +50,7 @@ impl LayerBuilder {
fn insert(&mut self, checkpoint: Checkpoint) -> Option<Checkpoint> {
self.push(checkpoint);
let emit_skip_info = (self.block.len() % CHECKPOINT_PERIOD) == 0;
let emit_skip_info = self.block.len() >= CHECKPOINT_PERIOD;
if emit_skip_info {
self.flush_block()
} else {
@@ -57,13 +59,14 @@ impl LayerBuilder {
}
}
#[derive(Default)]
pub struct SkipIndexBuilder {
layers: Vec<LayerBuilder>,
}
impl SkipIndexBuilder {
pub fn new() -> SkipIndexBuilder {
SkipIndexBuilder { layers: Vec::new() }
Self::default()
}
fn get_layer(&mut self, layer_id: usize) -> &mut LayerBuilder {

View File

@@ -72,6 +72,7 @@ impl StoreWriter {
if !self.current_block.is_empty() {
self.write_and_compress_block()?;
}
assert_eq!(self.first_doc_in_block, self.doc);
let doc_shift = self.doc;
let start_shift = self.writer.written_bytes() as u64;
@@ -86,12 +87,17 @@ impl StoreWriter {
checkpoint.end_doc += doc_shift;
checkpoint.start_offset += start_shift;
checkpoint.end_offset += start_shift;
self.offset_index_writer.insert(checkpoint);
self.doc = checkpoint.end_doc;
self.register_checkpoint(checkpoint);
}
Ok(())
}
fn register_checkpoint(&mut self, checkpoint: Checkpoint) {
self.offset_index_writer.insert(checkpoint);
self.first_doc_in_block = checkpoint.end_doc;
self.doc = checkpoint.end_doc;
}
fn write_and_compress_block(&mut self) -> io::Result<()> {
assert!(self.doc > 0);
self.intermediary_buffer.clear();
@@ -100,14 +106,13 @@ impl StoreWriter {
self.writer.write_all(&self.intermediary_buffer)?;
let end_offset = self.writer.written_bytes();
let end_doc = self.doc;
self.offset_index_writer.insert(Checkpoint {
self.register_checkpoint(Checkpoint {
start_doc: self.first_doc_in_block,
end_doc,
start_offset,
end_offset,
});
self.current_block.clear();
self.first_doc_in_block = self.doc;
Ok(())
}