Compare commits

..

4 Commits

Author SHA1 Message Date
Paul Masurel
98d17c9c77 rem broken seg
brotli by default
2021-01-09 09:30:44 +09:00
Paul Masurel
5583e5d0ad test for seg3 2021-01-09 09:11:57 +09:00
Paul Masurel
1fcf656ed7 added test with other broken segment 2021-01-09 08:55:40 +09:00
Paul Masurel
68fa58d74b DO NOT MERGE 2021-01-09 08:55:40 +09:00
10 changed files with 89 additions and 60 deletions

View File

@@ -73,7 +73,7 @@ debug-assertions = true
overflow-checks = true
[features]
default = ["mmap"]
default = ["mmap", "brotli"]
mmap = ["fs2", "tempfile", "memmap"]
brotli-compression = ["brotli"]
lz4-compression = ["lz4"]

View File

@@ -44,12 +44,12 @@ impl VecWriter {
impl Drop for VecWriter {
    /// Panics if the writer still holds unflushed data when dropped.
    ///
    /// `Drop` has no way to report an I/O error, so relying on drop to flush
    /// would silently lose data. Requiring an explicit flush turns that
    /// programming error into a loud failure instead.
    fn drop(&mut self) {
        if !self.is_flushed {
            panic!(
                "You forgot to flush {:?} before its writer got Drop. Do not rely on drop.",
                self.path
            )
        }
    }
}

View File

@@ -25,10 +25,9 @@ use futures::future::Future;
use futures::future::TryFutureExt;
use std::borrow::BorrowMut;
use std::collections::HashSet;
use std::io::{self, Write};
use std::io::Write;
use std::ops::Deref;
use std::path::PathBuf;
use std::process;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::sync::RwLock;
@@ -410,13 +409,6 @@ impl SegmentUpdater {
let _send_result = merging_future_send.send(segment_meta);
}
Err(e) => {
if let crate::TantivyError::IOError(ref io_err) = &e {
if io_err.kind() == io::ErrorKind::InvalidData {
println!(" SEGMENTS THAT CAUSE THE BUG {:?}", merge_operation.segment_ids());
error!(" SEGMENTS THAT CAUSE THE BUG {:?}", merge_operation.segment_ids());
process::exit(1);
}
}
warn!(
"Merge of {:?} was cancelled: {:?}",
merge_operation.segment_ids().to_vec(),
@@ -431,9 +423,7 @@ impl SegmentUpdater {
});
Ok(merging_future_recv
.unwrap_or_else(|e| {
Err(crate::TantivyError::SystemError("Merge failed".to_string()))
}))
.unwrap_or_else(|_| Err(crate::TantivyError::SystemError("Merge failed".to_string()))))
}
async fn consider_merge_options(&self) {

View File

@@ -28,7 +28,8 @@ pub struct Checkpoint {
// NOTE(review): this span is a rendered diff hunk, not plain source — the
// pre-change single-line condition and the post-change two-line condition
// both appear in the body below, so it does not compile as-is.
impl Checkpoint {
    // Returns true when `self` starts exactly where `other` ends, in both
    // doc id and byte offset — i.e. the two checkpoints are contiguous.
    pub(crate) fn follows(&self, other: &Checkpoint) -> bool {
        (self.start_doc == other.end_doc) && (self.start_offset == other.end_offset)
        (self.start_doc == other.end_doc) &&
        (self.start_offset == other.end_offset)
    }
}
@@ -95,7 +96,7 @@ mod tests {
Checkpoint {
start_doc: 0,
end_doc: 3,
start_offset: 0,
start_offset: 4,
end_offset: 9,
},
Checkpoint {
@@ -200,21 +201,19 @@ mod tests {
Ok(())
}
// NOTE(review): rendered diff hunk — the old version (pushes into a separate
// `output` Vec seeded with a leading 0) and the new in-place version
// (`mut vals`, no leading 0) are interleaved below; this span does not
// compile as-is. Both compute a prefix sum: turn deltas into running totals.
fn integrate_delta(vals: Vec<u64>) -> Vec<u64> {
    let mut output = Vec::with_capacity(vals.len() + 1);
    output.push(0u64);
fn integrate_delta(mut vals: Vec<u64>) -> Vec<u64> {
    let mut prev = 0u64;
    for val in vals {
        let new_val = val + prev;
    for val in vals.iter_mut() {
        let new_val = *val + prev;
        prev = new_val;
        output.push(new_val);
        *val = new_val;
    }
    output
    vals
}
// Generates a sequence of n valid checkpoints, with n < max_len.
fn monotonic_checkpoints(max_len: usize) -> BoxedStrategy<Vec<Checkpoint>> {
(0..max_len)
(1..max_len)
.prop_flat_map(move |len: usize| {
(
proptest::collection::vec(1u64..20u64, len as usize).prop_map(integrate_delta),

View File

@@ -77,28 +77,6 @@ impl SkipIndex {
SkipIndex { layers }
}
pub fn is_valid(&self) -> bool {
let checkpoints: Vec<Checkpoint> = self.checkpoints().collect();
let mut prev_checkpoint = Checkpoint {
start_doc: 0u32,
end_doc: 0u32,
start_offset: 0u64,
end_offset: 0u64,
};
for checkpoint in checkpoints {
if !checkpoint.follows(&prev_checkpoint) {
return false;
}
prev_checkpoint = checkpoint;
}
true
}
// Deserializes a `SkipIndex` from a raw byte slice by copying it into an
// owned buffer and delegating to `SkipIndex::open`.
pub(crate) fn from_bytes(data: &[u8]) -> SkipIndex {
    let owned_data = OwnedBytes::new(data.to_vec());
    SkipIndex::open(owned_data)
}
pub(crate) fn checkpoints<'a>(&'a self) -> impl Iterator<Item = Checkpoint> + 'a {
self.layers
.last()

View File

@@ -1,6 +1,6 @@
use crate::common::{BinarySerializable, VInt};
use crate::store::index::block::CheckpointBlock;
use crate::store::index::{Checkpoint, SkipIndex, CHECKPOINT_PERIOD};
use crate::store::index::{Checkpoint, CHECKPOINT_PERIOD};
use std::io;
use std::io::Write;
@@ -87,8 +87,7 @@ impl SkipIndexBuilder {
}
}
pub fn write<W: Write>(mut self, real_output: &mut W) -> io::Result<()> {
let mut output: Vec<u8> = Vec::new();
pub fn write<W: Write>(mut self, output: &mut W) -> io::Result<()> {
let mut last_pointer = None;
for skip_layer in self.layers.iter_mut() {
if let Some(checkpoint) = last_pointer {
@@ -109,14 +108,10 @@ impl SkipIndexBuilder {
layer_offset += layer_buffer.len() as u64;
layer_sizes.push(VInt(layer_offset));
}
layer_sizes.serialize(&mut output)?;
layer_sizes.serialize(output)?;
for layer_buffer in layer_buffers {
output.write_all(&layer_buffer[..])?;
}
if !SkipIndex::from_bytes(&output).is_valid() {
return Err(io::Error::new(io::ErrorKind::InvalidData, "about to write invalid skip index"));
}
real_output.write_all(&output)?;
Ok(())
}
}

View File

@@ -36,6 +36,8 @@ and should rely on either
mod index;
mod reader;
mod writer;
mod tests_store;
pub use self::reader::StoreReader;
pub use self::writer::StoreWriter;

View File

@@ -50,6 +50,22 @@ impl StoreReader {
self.skip_index.checkpoints()
}
/// Decodes and returns every document in the store, in storage order.
///
/// Walks all checkpoints of the skip index, reads each block, and splits it
/// into length-prefixed documents (a VInt byte length followed by that many
/// bytes of serialized document).
///
/// NOTE(review): block reads and deserialization use `unwrap()`, so a
/// corrupted store panics instead of returning an error — acceptable for a
/// debugging helper, not for production use.
pub fn documents(&self) -> Vec<Document> {
    let mut documents = Vec::new();
    for checkpoint in self.skip_index.checkpoints() {
        // (removed leftover debug `println!` of each checkpoint)
        let block = self.read_block(&checkpoint).unwrap();
        let mut cursor = &block[..];
        while !cursor.is_empty() {
            let doc_length = VInt::deserialize(&mut cursor).unwrap().val() as usize;
            let doc = Document::deserialize(&mut &cursor[..doc_length]).unwrap();
            documents.push(doc);
            cursor = &cursor[doc_length..];
        }
    }
    documents
}
// Returns the checkpoint of the block containing `doc_id`, if any, by
// seeking in the skip index.
fn block_checkpoint(&self, doc_id: DocId) -> Option<Checkpoint> {
    self.skip_index.seek(doc_id)
}
@@ -104,7 +120,6 @@ impl StoreReader {
let doc_length = VInt::deserialize(&mut cursor)?.val() as usize;
cursor = &cursor[doc_length..];
}
let doc_length = VInt::deserialize(&mut cursor)?.val() as usize;
cursor = &cursor[..doc_length];
Ok(Document::deserialize(&mut cursor)?)

50
src/store/tests_store.rs Normal file
View File

@@ -0,0 +1,50 @@
use std::path::Path;
use crate::HasLen;
use crate::directory::{Directory, ManagedDirectory, MmapDirectory, RAMDirectory};
use crate::fastfield::DeleteBitSet;
use super::{StoreReader, StoreWriter};
// Debugging test against a broken-segment fixture checked in under
// `src/store/broken_seg` (part of the "DO NOT MERGE" investigation branch).
// NOTE(review): depends on local fixture files and a hard-coded doc id —
// it will fail in any checkout without those files; remove before merging.
#[test]
fn test_toto2() -> crate::Result<()> {
    // Open the fixture directory and confirm the .store file's checksum is
    // intact before trying to decode it.
    let directory = ManagedDirectory::wrap(MmapDirectory::open("src/store/broken_seg")?)?;
    let path = Path::new("b6029ade1b954ea1acad15b432eaacb9.store");
    assert!(directory.validate_checksum(path)?);
    let store_file = directory.open_read(path)?;
    let store = StoreReader::open(store_file)?;
    // Decode every stored document; panics inside `documents()` on corruption.
    let documents = store.documents();
    // for doc in documents {
    //     println!("{:?}", doc);
    // }
    // Presumably the doc id that reproduces the bug — TODO confirm.
    let doc= store.get(15_086)?;
    Ok(())
}
// Second debugging test: fetches a single document from a patched copy of
// another broken .store file.
// NOTE(review): the checksum is validated on the ORIGINAL file but the
// ".patched" variant is then opened — presumably intentional for the
// investigation, but easy to misread. Depends on local fixture files
// ("DO NOT MERGE" branch); remove before merging.
#[test]
fn test_toto() -> crate::Result<()> {
    let directory = ManagedDirectory::wrap(MmapDirectory::open("src/store/broken_seg")?)?;
    assert!(directory.validate_checksum(Path::new("e6ece22e5bca4e0dbe7ce3e4dcbd5bbf.store"))?);
    let store_file = directory.open_read(Path::new("e6ece22e5bca4e0dbe7ce3e4dcbd5bbf.store.patched"))?;
    let store = StoreReader::open(store_file)?;
    let doc= store.get(53)?;
    println!("{:?}", doc);
    // Disabled round-trip check: re-serialize all documents into a RAM
    // directory and read them back.
    // let documents = store.documents();
    // let ram_directory = RAMDirectory::create();
    // let path = Path::new("store");
    // let store_wrt = ram_directory.open_write(path)?;
    // let mut store_writer = StoreWriter::new(store_wrt);
    // for doc in &documents {
    //     store_writer.store(doc)?;
    // }
    // store_writer.close()?;
    // let store_data = ram_directory.open_read(path)?;
    // let new_store = StoreReader::open(store_data)?;
    // for doc in 0..59 {
    //     println!("{}", doc);
    //     let doc = new_store.get(doc)?;
    //     println!("{:?}", doc);
    // }
    Ok(())
}