Compare commits


2 Commits

Author SHA1 Message Date
Paul Masurel  acfb057462  Fail fast if the skip index being written is broken.  2021-01-11 12:38:13 +09:00
Paul Masurel  b17a10546a  Minor change in unit test.  2021-01-11 11:33:59 +09:00
10 changed files with 64 additions and 84 deletions

View File

@@ -47,7 +47,6 @@ murmurhash32 = "0.2"
 chrono = "0.4"
 smallvec = "1"
 rayon = "1"
-env_logger = "0.8"
 lru = "0.6"

 [target.'cfg(windows)'.dependencies]

View File

@@ -35,18 +35,12 @@ fn load_metas(
     inventory: &SegmentMetaInventory,
 ) -> crate::Result<IndexMeta> {
     let meta_data = directory.atomic_read(&META_FILEPATH)?;
-    let meta_string = String::from_utf8(meta_data)
-        .map_err(|utf8_err| {
-            DataCorruption::new(
-                META_FILEPATH.to_path_buf(),
-                format!("Meta file is not valid utf-8. {:?}", utf8_err)
-            )
-        })?;
+    let meta_string = String::from_utf8_lossy(&meta_data);
     IndexMeta::deserialize(&meta_string, &inventory)
         .map_err(|e| {
             DataCorruption::new(
                 META_FILEPATH.to_path_buf(),
-                format!("Meta file cannot be deserialized. {:?}. content = {}", e, meta_string),
+                format!("Meta file cannot be deserialized. {:?}.", e),
             )
         })
         .map_err(From::from)

View File

@@ -44,12 +44,12 @@ impl VecWriter {
 impl Drop for VecWriter {
     fn drop(&mut self) {
-        if !self.is_flushed {
-            panic!(
-                "You forgot to flush {:?} before its writter got Drop. Do not rely on drop.",
-                self.path
-            )
-        }
+        // if !self.is_flushed {
+        //     panic!(
+        //         "You forgot to flush {:?} before its writter got Drop. Do not rely on drop.",
+        //         self.path
+        //     )
+        // }
     }
 }

View File

@@ -20,7 +20,6 @@ fn check_index_content(searcher: &Searcher, vals: &[u64]) -> crate::Result<()> {
 #[test]
 #[ignore]
 fn test_functional_store() -> crate::Result<()> {
-    env_logger::init();
     let mut schema_builder = Schema::builder();
     let id_field = schema_builder.add_u64_field("id", INDEXED | STORED);
@@ -36,7 +35,8 @@ fn test_functional_store() -> crate::Result<()> {
     let mut doc_set: Vec<u64> = Vec::new();
     let mut doc_id = 0u64;
-    for iteration in 0.. {
+    for iteration in 0..500 {
+        dbg!(iteration);
         let num_docs: usize = rng.gen_range(0..4);
         if doc_set.len() >= 1 {
             let doc_to_remove_id = rng.gen_range(0..doc_set.len());
@@ -51,7 +51,6 @@ fn test_functional_store() -> crate::Result<()> {
         index_writer.commit()?;
         reader.reload()?;
         let searcher = reader.searcher();
-        println!("#{} - {}", iteration, searcher.segment_readers().len());
         check_index_content(&searcher, &doc_set)?;
     }
     Ok(())

View File

@@ -25,9 +25,10 @@ use futures::future::Future;
 use futures::future::TryFutureExt;
 use std::borrow::BorrowMut;
 use std::collections::HashSet;
-use std::io::Write;
+use std::io::{self, Write};
 use std::ops::Deref;
 use std::path::PathBuf;
+use std::process;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 use std::sync::RwLock;
@@ -409,6 +410,13 @@ impl SegmentUpdater {
                     let _send_result = merging_future_send.send(segment_meta);
                 }
                 Err(e) => {
+                    if let crate::TantivyError::IOError(ref io_err) = &e {
+                        if io_err.kind() == io::ErrorKind::InvalidData {
+                            println!(" SEGMENTS THAT CAUSE THE BUG {:?}", merge_operation.segment_ids());
+                            error!(" SEGMENTS THAT CAUSE THE BUG {:?}", merge_operation.segment_ids());
+                            process::exit(1);
+                        }
+                    }
                     warn!(
                         "Merge of {:?} was cancelled: {:?}",
                         merge_operation.segment_ids().to_vec(),
@@ -423,7 +431,9 @@ impl SegmentUpdater {
         });
         Ok(merging_future_recv
-            .unwrap_or_else(|_| Err(crate::TantivyError::SystemError("Merge failed".to_string()))))
+            .unwrap_or_else(|e| {
+                Err(crate::TantivyError::SystemError("Merge failed".to_string()))
+            }))
     }

     async fn consider_merge_options(&self) {

View File

@@ -1,4 +1,4 @@
-const CHECKPOINT_PERIOD: usize = 2;
+const CHECKPOINT_PERIOD: usize = 8;
 use std::fmt;

 mod block;
@@ -28,8 +28,7 @@ pub struct Checkpoint {
 impl Checkpoint {
     pub(crate) fn follows(&self, other: &Checkpoint) -> bool {
-        (self.start_doc == other.end_doc) &&
-            (self.start_offset == other.end_offset)
+        (self.start_doc == other.end_doc) && (self.start_offset == other.end_offset)
     }
 }
@@ -96,7 +95,7 @@ mod tests {
             Checkpoint {
                 start_doc: 0,
                 end_doc: 3,
-                start_offset: 4,
+                start_offset: 0,
                 end_offset: 9,
             },
             Checkpoint {
@@ -201,19 +200,21 @@ mod tests {
         Ok(())
     }

-    fn integrate_delta(mut vals: Vec<u64>) -> Vec<u64> {
+    fn integrate_delta(vals: Vec<u64>) -> Vec<u64> {
+        let mut output = Vec::with_capacity(vals.len() + 1);
+        output.push(0u64);
         let mut prev = 0u64;
-        for val in vals.iter_mut() {
-            let new_val = *val + prev;
+        for val in vals {
+            let new_val = val + prev;
             prev = new_val;
-            *val = new_val;
+            output.push(new_val);
         }
-        vals
+        output
     }

     // Generates a sequence of n valid checkpoints, with n < max_len.
     fn monotonic_checkpoints(max_len: usize) -> BoxedStrategy<Vec<Checkpoint>> {
-        (1..max_len)
+        (0..max_len)
             .prop_flat_map(move |len: usize| {
                 (
                     proptest::collection::vec(1u64..20u64, len as usize).prop_map(integrate_delta),

View File

@@ -77,6 +77,28 @@ impl SkipIndex {
         SkipIndex { layers }
     }

+    pub fn is_valid(&self) -> bool {
+        let checkpoints: Vec<Checkpoint> = self.checkpoints().collect();
+        let mut prev_checkpoint = Checkpoint {
+            start_doc: 0u32,
+            end_doc: 0u32,
+            start_offset: 0u64,
+            end_offset: 0u64,
+        };
+        for checkpoint in checkpoints {
+            if !checkpoint.follows(&prev_checkpoint) {
+                return false;
+            }
+            prev_checkpoint = checkpoint;
+        }
+        true
+    }
+
+    pub(crate) fn from_bytes(data: &[u8]) -> SkipIndex {
+        let data = OwnedBytes::new(data.to_owned());
+        SkipIndex::open(data)
+    }
+
     pub(crate) fn checkpoints<'a>(&'a self) -> impl Iterator<Item = Checkpoint> + 'a {
         self.layers
             .last()
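
The is_valid method added above enforces a chaining invariant over the skip index checkpoints: starting from a zeroed sentinel, every checkpoint must begin exactly where the previous one ended, in both doc id and byte offset. A standalone sketch of that invariant, using a simplified Checkpoint struct that mirrors the fields shown in the diff rather than tantivy's actual types:

#[derive(Clone, Copy)]
struct Checkpoint {
    start_doc: u32,
    end_doc: u32,
    start_offset: u64,
    end_offset: u64,
}

impl Checkpoint {
    // A checkpoint "follows" another if it starts exactly where the other ends.
    fn follows(&self, other: &Checkpoint) -> bool {
        self.start_doc == other.end_doc && self.start_offset == other.end_offset
    }
}

// Walk the checkpoints in order, starting from a zeroed sentinel, and reject
// any sequence that leaves a gap or overlaps the previous checkpoint.
fn is_valid(checkpoints: &[Checkpoint]) -> bool {
    let mut prev = Checkpoint { start_doc: 0, end_doc: 0, start_offset: 0, end_offset: 0 };
    for checkpoint in checkpoints {
        if !checkpoint.follows(&prev) {
            return false;
        }
        prev = *checkpoint;
    }
    true
}

fn main() {
    let ok = [
        Checkpoint { start_doc: 0, end_doc: 3, start_offset: 0, end_offset: 9 },
        Checkpoint { start_doc: 3, end_doc: 5, start_offset: 9, end_offset: 20 },
    ];
    // Starts at offset 4 instead of 0, so the chain is broken.
    let broken = [Checkpoint { start_doc: 0, end_doc: 3, start_offset: 4, end_offset: 9 }];
    assert!(is_valid(&ok));
    assert!(!is_valid(&broken));
}

Any false result from this walk is what makes the builder below refuse to write the index at all.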

View File

@@ -1,6 +1,6 @@
 use crate::common::{BinarySerializable, VInt};
 use crate::store::index::block::CheckpointBlock;
-use crate::store::index::{Checkpoint, CHECKPOINT_PERIOD};
+use crate::store::index::{Checkpoint, SkipIndex, CHECKPOINT_PERIOD};
 use std::io;
 use std::io::Write;
@@ -87,7 +87,8 @@ impl SkipIndexBuilder {
         }
     }

-    pub fn write<W: Write>(mut self, output: &mut W) -> io::Result<()> {
+    pub fn write<W: Write>(mut self, real_output: &mut W) -> io::Result<()> {
+        let mut output: Vec<u8> = Vec::new();
         let mut last_pointer = None;
         for skip_layer in self.layers.iter_mut() {
             if let Some(checkpoint) = last_pointer {
@@ -108,10 +109,14 @@ impl SkipIndexBuilder {
             layer_offset += layer_buffer.len() as u64;
             layer_sizes.push(VInt(layer_offset));
         }
-        layer_sizes.serialize(output)?;
+        layer_sizes.serialize(&mut output)?;
         for layer_buffer in layer_buffers {
             output.write_all(&layer_buffer[..])?;
         }
+        if !SkipIndex::from_bytes(&output).is_valid() {
+            return Err(io::Error::new(io::ErrorKind::InvalidData, "about to write invalid skip index"));
+        }
+        real_output.write_all(&output)?;
         Ok(())
     }
 }
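
The write change above is the fail-fast step from the commit message: the skip index is serialized into an in-memory buffer, re-opened through SkipIndex::from_bytes, checked with is_valid, and only copied to the real writer when the check passes, so a broken index surfaces as an InvalidData error instead of reaching disk. A generic sketch of this buffer-then-validate pattern, with hypothetical serialize_payload and payload_is_valid helpers standing in for the real serializer and validity check:

use std::io::{self, Write};

// Hypothetical stand-in for the real serializer.
fn serialize_payload(buf: &mut Vec<u8>) -> io::Result<()> {
    buf.extend_from_slice(b"payload");
    Ok(())
}

// Hypothetical stand-in for the real validity check (the role SkipIndex::is_valid plays above).
fn payload_is_valid(bytes: &[u8]) -> bool {
    !bytes.is_empty()
}

// Serialize into a temporary buffer, validate it, and only then commit the
// bytes to the real writer; invalid data is reported before anything is written.
fn write_validated<W: Write>(real_output: &mut W) -> io::Result<()> {
    let mut output: Vec<u8> = Vec::new();
    serialize_payload(&mut output)?;
    if !payload_is_valid(&output) {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "about to write invalid payload",
        ));
    }
    real_output.write_all(&output)
}

fn main() -> io::Result<()> {
    let mut sink: Vec<u8> = Vec::new();
    write_validated(&mut sink)?;
    assert_eq!(sink, b"payload".to_vec());
    Ok(())
}

Buffering costs one extra copy of the index, but the check then runs over exactly the bytes that would have hit disk, which is what lets the InvalidData handling added in the segment updater catch the failure.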

View File

@@ -1,50 +0,0 @@
-use std::path::Path;
-
-use crate::HasLen;
-use crate::directory::{Directory, ManagedDirectory, MmapDirectory, RAMDirectory};
-use crate::fastfield::DeleteBitSet;
-
-use super::{StoreReader, StoreWriter};
-
-#[test]
-fn test_toto2() -> crate::Result<()> {
-    let directory = ManagedDirectory::wrap(MmapDirectory::open("src/store/broken_seg")?)?;
-    let path = Path::new("b6029ade1b954ea1acad15b432eaacb9.store");
-    assert!(directory.validate_checksum(path)?);
-    let store_file = directory.open_read(path)?;
-    let store = StoreReader::open(store_file)?;
-    let documents = store.documents();
-    // for doc in documents {
-    //     println!("{:?}", doc);
-    // }
-    let doc = store.get(15_086)?;
-    Ok(())
-}
-
-#[test]
-fn test_toto() -> crate::Result<()> {
-    let directory = ManagedDirectory::wrap(MmapDirectory::open("src/store/broken_seg")?)?;
-    assert!(directory.validate_checksum(Path::new("e6ece22e5bca4e0dbe7ce3e4dcbd5bbf.store"))?);
-    let store_file = directory.open_read(Path::new("e6ece22e5bca4e0dbe7ce3e4dcbd5bbf.store.patched"))?;
-    let store = StoreReader::open(store_file)?;
-    let doc = store.get(53)?;
-    println!("{:?}", doc);
-    // let documents = store.documents();
-    // let ram_directory = RAMDirectory::create();
-    // let path = Path::new("store");
-    // let store_wrt = ram_directory.open_write(path)?;
-    // let mut store_writer = StoreWriter::new(store_wrt);
-    // for doc in &documents {
-    //     store_writer.store(doc)?;
-    // }
-    // store_writer.close()?;
-    // let store_data = ram_directory.open_read(path)?;
-    // let new_store = StoreReader::open(store_data)?;
-    // for doc in 0..59 {
-    //     println!("{}", doc);
-    //     let doc = new_store.get(doc)?;
-    //     println!("{:?}", doc);
-    // }
-    Ok(())
-}

View File

@@ -10,7 +10,7 @@ use crate::store::index::Checkpoint;
 use crate::DocId;
 use std::io::{self, Write};

-const BLOCK_SIZE: usize = 30;
+const BLOCK_SIZE: usize = 16_384;

 /// Write tantivy's [`Store`](./index.html)
 ///