From acfb057462422db52f7800e954a5df2fceaf735a Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Mon, 11 Jan 2021 11:51:19 +0900 Subject: [PATCH] Fail fast if the skip index being written is broken. --- src/directory/ram_directory.rs | 12 ++++++------ src/indexer/segment_updater.rs | 14 ++++++++++++-- src/store/index/skip_index.rs | 22 ++++++++++++++++++++++ src/store/index/skip_index_builder.rs | 11 ++++++++--- 4 files changed, 48 insertions(+), 11 deletions(-) diff --git a/src/directory/ram_directory.rs b/src/directory/ram_directory.rs index b3c4d05e5..6e7685c80 100644 --- a/src/directory/ram_directory.rs +++ b/src/directory/ram_directory.rs @@ -44,12 +44,12 @@ impl VecWriter { impl Drop for VecWriter { fn drop(&mut self) { - if !self.is_flushed { - panic!( - "You forgot to flush {:?} before its writter got Drop. Do not rely on drop.", - self.path - ) - } + // if !self.is_flushed { + // panic!( + // "You forgot to flush {:?} before its writter got Drop. Do not rely on drop.", + // self.path + // ) + // } } } diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs index d0cb240bc..f4349e655 100644 --- a/src/indexer/segment_updater.rs +++ b/src/indexer/segment_updater.rs @@ -25,9 +25,10 @@ use futures::future::Future; use futures::future::TryFutureExt; use std::borrow::BorrowMut; use std::collections::HashSet; -use std::io::Write; +use std::io::{self, Write}; use std::ops::Deref; use std::path::PathBuf; +use std::process; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::sync::RwLock; @@ -409,6 +410,13 @@ impl SegmentUpdater { let _send_result = merging_future_send.send(segment_meta); } Err(e) => { + if let crate::TantivyError::IOError(ref io_err) = &e { + if io_err.kind() == io::ErrorKind::InvalidData { + println!(" SEGMENTS THAT CAUSE THE BUG {:?}", merge_operation.segment_ids()); + error!(" SEGMENTS THAT CAUSE THE BUG {:?}", merge_operation.segment_ids()); + process::exit(1); + } + } warn!( "Merge of {:?} was cancelled: {:?}", merge_operation.segment_ids().to_vec(), @@ -423,7 +431,9 @@ impl SegmentUpdater { }); Ok(merging_future_recv - .unwrap_or_else(|_| Err(crate::TantivyError::SystemError("Merge failed".to_string())))) + .unwrap_or_else(|e| { + Err(crate::TantivyError::SystemError("Merge failed".to_string())) + })) } async fn consider_merge_options(&self) { diff --git a/src/store/index/skip_index.rs b/src/store/index/skip_index.rs index e8b431246..30a7e668a 100644 --- a/src/store/index/skip_index.rs +++ b/src/store/index/skip_index.rs @@ -77,6 +77,28 @@ impl SkipIndex { SkipIndex { layers } } + pub fn is_valid(&self) -> bool { + let checkpoints: Vec = self.checkpoints().collect(); + let mut prev_checkpoint = Checkpoint { + start_doc: 0u32, + end_doc: 0u32, + start_offset: 0u64, + end_offset: 0u64, + }; + for checkpoint in checkpoints { + if !checkpoint.follows(&prev_checkpoint) { + return false; + } + prev_checkpoint = checkpoint; + } + true + } + + pub(crate) fn from_bytes(data: &[u8]) -> SkipIndex { + let data = OwnedBytes::new(data.to_owned()); + SkipIndex::open(data) + } + pub(crate) fn checkpoints<'a>(&'a self) -> impl Iterator + 'a { self.layers .last() diff --git a/src/store/index/skip_index_builder.rs b/src/store/index/skip_index_builder.rs index 6d46dabed..5d6d42ea0 100644 --- a/src/store/index/skip_index_builder.rs +++ b/src/store/index/skip_index_builder.rs @@ -1,6 +1,6 @@ use crate::common::{BinarySerializable, VInt}; use crate::store::index::block::CheckpointBlock; -use crate::store::index::{Checkpoint, CHECKPOINT_PERIOD}; +use crate::store::index::{Checkpoint, SkipIndex, CHECKPOINT_PERIOD}; use std::io; use std::io::Write; @@ -87,7 +87,8 @@ impl SkipIndexBuilder { } } - pub fn write(mut self, output: &mut W) -> io::Result<()> { + pub fn write(mut self, real_output: &mut W) -> io::Result<()> { + let mut output: Vec = Vec::new(); let mut last_pointer = None; for skip_layer in self.layers.iter_mut() { if let Some(checkpoint) = last_pointer { @@ -108,10 +109,14 @@ impl SkipIndexBuilder { layer_offset += layer_buffer.len() as u64; layer_sizes.push(VInt(layer_offset)); } - layer_sizes.serialize(output)?; + layer_sizes.serialize(&mut output)?; for layer_buffer in layer_buffers { output.write_all(&layer_buffer[..])?; } + if !SkipIndex::from_bytes(&output).is_valid() { + return Err(io::Error::new(io::ErrorKind::InvalidData, "about to write invalid skip index")); + } + real_output.write_all(&output)?; Ok(()) } }