Compare commits

...

9 Commits

Author SHA1 Message Date
Paul Masurel
acfb057462 Fail fast if the skip index being written is broken. 2021-01-11 12:38:13 +09:00
Paul Masurel
b17a10546a Minor change in unit test. 2021-01-11 11:33:59 +09:00
Paul Masurel
bf6e6e8a7c Merge pull request #972 from tantivy-search/issue/969
Issue/969
2021-01-07 22:49:31 +09:00
Paul Masurel
203b0256a3 Minor renaming 2021-01-07 22:47:57 +09:00
Paul Masurel
caf2a38b7e Closes #969.
The segment stacking optimization is not updating "first_doc_in_block".
2021-01-07 22:43:56 +09:00
Paul Masurel
96f24b078e Added failing unit test. 2021-01-07 22:43:28 +09:00
Paul Masurel
332b50a4eb Merge pull request #970 from tantivy-search/functional-test-store
Added a functional long running test to test store merging.
2021-01-07 14:27:08 +09:00
Paul Masurel
8ca0954b3b Added a functional long running test to test store merging. 2021-01-07 14:07:15 +09:00
Paul Masurel
36343e2de8 Merge pull request #968 from tantivy-search/add-bench-analyzer
added a simple bench for the default analyzer
2021-01-06 21:33:39 +09:00
9 changed files with 206 additions and 66 deletions

View File

@@ -44,12 +44,12 @@ impl VecWriter {
impl Drop for VecWriter {
fn drop(&mut self) {
if !self.is_flushed {
panic!(
"You forgot to flush {:?} before its writter got Drop. Do not rely on drop.",
self.path
)
}
// if !self.is_flushed {
// panic!(
// "You forgot to flush {:?} before its writter got Drop. Do not rely on drop.",
// self.path
// )
// }
}
}

View File

@@ -1,31 +1,76 @@
use rand::thread_rng;
use std::collections::HashSet;
use crate::schema::*;
use crate::Index;
use crate::Searcher;
use crate::{doc, schema::*};
use rand::thread_rng;
use rand::Rng;
use std::collections::HashSet;
fn check_index_content(searcher: &Searcher, vals: &HashSet<u64>) {
fn check_index_content(searcher: &Searcher, vals: &[u64]) -> crate::Result<()> {
assert!(searcher.segment_readers().len() < 20);
assert_eq!(searcher.num_docs() as usize, vals.len());
for segment_reader in searcher.segment_readers() {
let store_reader = segment_reader.get_store_reader()?;
for doc_id in 0..segment_reader.max_doc() {
let _doc = store_reader.get(doc_id)?;
}
}
Ok(())
}
#[test]
#[ignore]
fn test_indexing() {
fn test_functional_store() -> crate::Result<()> {
let mut schema_builder = Schema::builder();
let id_field = schema_builder.add_u64_field("id", INDEXED | STORED);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let reader = index.reader()?;
let mut rng = thread_rng();
let mut index_writer = index.writer_with_num_threads(3, 12_000_000)?;
let mut doc_set: Vec<u64> = Vec::new();
let mut doc_id = 0u64;
for iteration in 0..500 {
dbg!(iteration);
let num_docs: usize = rng.gen_range(0..4);
if doc_set.len() >= 1 {
let doc_to_remove_id = rng.gen_range(0..doc_set.len());
let removed_doc_id = doc_set.swap_remove(doc_to_remove_id);
index_writer.delete_term(Term::from_field_u64(id_field, removed_doc_id));
}
for _ in 0..num_docs {
doc_set.push(doc_id);
index_writer.add_document(doc!(id_field=>doc_id));
doc_id += 1;
}
index_writer.commit()?;
reader.reload()?;
let searcher = reader.searcher();
check_index_content(&searcher, &doc_set)?;
}
Ok(())
}
#[test]
#[ignore]
fn test_functional_indexing() -> crate::Result<()> {
let mut schema_builder = Schema::builder();
let id_field = schema_builder.add_u64_field("id", INDEXED);
let multiples_field = schema_builder.add_u64_field("multiples", INDEXED);
let schema = schema_builder.build();
let index = Index::create_from_tempdir(schema).unwrap();
let reader = index.reader().unwrap();
let index = Index::create_from_tempdir(schema)?;
let reader = index.reader()?;
let mut rng = thread_rng();
let mut index_writer = index.writer_with_num_threads(3, 120_000_000).unwrap();
let mut index_writer = index.writer_with_num_threads(3, 120_000_000)?;
let mut committed_docs: HashSet<u64> = HashSet::new();
let mut uncommitted_docs: HashSet<u64> = HashSet::new();
@@ -33,13 +78,16 @@ fn test_indexing() {
for _ in 0..200 {
let random_val = rng.gen_range(0..20);
if random_val == 0 {
index_writer.commit().expect("Commit failed");
index_writer.commit()?;
committed_docs.extend(&uncommitted_docs);
uncommitted_docs.clear();
reader.reload().unwrap();
reader.reload()?;
let searcher = reader.searcher();
// check that everything is correct.
check_index_content(&searcher, &committed_docs);
check_index_content(
&searcher,
&committed_docs.iter().cloned().collect::<Vec<u64>>(),
)?;
} else {
if committed_docs.remove(&random_val) || uncommitted_docs.remove(&random_val) {
let doc_id_term = Term::from_field_u64(id_field, random_val);
@@ -55,4 +103,5 @@ fn test_indexing() {
}
}
}
Ok(())
}

View File

@@ -25,9 +25,10 @@ use futures::future::Future;
use futures::future::TryFutureExt;
use std::borrow::BorrowMut;
use std::collections::HashSet;
use std::io::Write;
use std::io::{self, Write};
use std::ops::Deref;
use std::path::PathBuf;
use std::process;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::sync::RwLock;
@@ -409,6 +410,13 @@ impl SegmentUpdater {
let _send_result = merging_future_send.send(segment_meta);
}
Err(e) => {
if let crate::TantivyError::IOError(ref io_err) = &e {
if io_err.kind() == io::ErrorKind::InvalidData {
println!(" SEGMENTS THAT CAUSE THE BUG {:?}", merge_operation.segment_ids());
error!(" SEGMENTS THAT CAUSE THE BUG {:?}", merge_operation.segment_ids());
process::exit(1);
}
}
warn!(
"Merge of {:?} was cancelled: {:?}",
merge_operation.segment_ids().to_vec(),
@@ -423,7 +431,9 @@ impl SegmentUpdater {
});
Ok(merging_future_recv
.unwrap_or_else(|_| Err(crate::TantivyError::SystemError("Merge failed".to_string()))))
.unwrap_or_else(|e| {
Err(crate::TantivyError::SystemError("Merge failed".to_string()))
}))
}
async fn consider_merge_options(&self) {

View File

@@ -43,6 +43,9 @@ impl CheckpointBlock {
/// Adding another checkpoint in the block.
pub fn push(&mut self, checkpoint: Checkpoint) {
if let Some(prev_checkpoint) = self.checkpoints.last() {
assert!(checkpoint.follows(prev_checkpoint));
}
self.checkpoints.push(checkpoint);
}

View File

@@ -26,6 +26,12 @@ pub struct Checkpoint {
pub end_offset: u64,
}
impl Checkpoint {
pub(crate) fn follows(&self, other: &Checkpoint) -> bool {
(self.start_doc == other.end_doc) && (self.start_offset == other.end_offset)
}
}
impl fmt::Debug for Checkpoint {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
@@ -39,13 +45,16 @@ impl fmt::Debug for Checkpoint {
#[cfg(test)]
mod tests {
use std::io;
use std::{io, iter};
use futures::executor::block_on;
use proptest::strategy::{BoxedStrategy, Strategy};
use crate::directory::OwnedBytes;
use crate::indexer::NoMergePolicy;
use crate::schema::{SchemaBuilder, STORED, STRING};
use crate::store::index::Checkpoint;
use crate::DocId;
use crate::{DocAddress, DocId, Index, Term};
use super::{SkipIndex, SkipIndexBuilder};
@@ -54,7 +63,7 @@ mod tests {
let mut output: Vec<u8> = Vec::new();
let skip_index_builder: SkipIndexBuilder = SkipIndexBuilder::new();
skip_index_builder.write(&mut output)?;
let skip_index: SkipIndex = SkipIndex::from(OwnedBytes::new(output));
let skip_index: SkipIndex = SkipIndex::open(OwnedBytes::new(output));
let mut skip_cursor = skip_index.checkpoints();
assert!(skip_cursor.next().is_none());
Ok(())
@@ -72,7 +81,7 @@ mod tests {
};
skip_index_builder.insert(checkpoint);
skip_index_builder.write(&mut output)?;
let skip_index: SkipIndex = SkipIndex::from(OwnedBytes::new(output));
let skip_index: SkipIndex = SkipIndex::open(OwnedBytes::new(output));
let mut skip_cursor = skip_index.checkpoints();
assert_eq!(skip_cursor.next(), Some(checkpoint));
assert_eq!(skip_cursor.next(), None);
@@ -86,7 +95,7 @@ mod tests {
Checkpoint {
start_doc: 0,
end_doc: 3,
start_offset: 4,
start_offset: 0,
end_offset: 9,
},
Checkpoint {
@@ -121,7 +130,7 @@ mod tests {
}
skip_index_builder.write(&mut output)?;
let skip_index: SkipIndex = SkipIndex::from(OwnedBytes::new(output));
let skip_index: SkipIndex = SkipIndex::open(OwnedBytes::new(output));
assert_eq!(
&skip_index.checkpoints().collect::<Vec<_>>()[..],
&checkpoints[..]
@@ -133,6 +142,40 @@ mod tests {
(doc as u64) * (doc as u64)
}
#[test]
fn test_merge_store_with_stacking_reproducing_issue969() -> crate::Result<()> {
let mut schema_builder = SchemaBuilder::default();
let text = schema_builder.add_text_field("text", STORED | STRING);
let body = schema_builder.add_text_field("body", STORED);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut index_writer = index.writer_for_tests()?;
index_writer.set_merge_policy(Box::new(NoMergePolicy));
let long_text: String = iter::repeat("abcdefghijklmnopqrstuvwxyz")
.take(1_000)
.collect();
for _ in 0..20 {
index_writer.add_document(doc!(body=>long_text.clone()));
}
index_writer.commit()?;
index_writer.add_document(doc!(text=>"testb"));
for _ in 0..10 {
index_writer.add_document(doc!(text=>"testd", body=>long_text.clone()));
}
index_writer.commit()?;
index_writer.delete_term(Term::from_field_text(text, "testb"));
index_writer.commit()?;
let segment_ids = index.searchable_segment_ids()?;
block_on(index_writer.merge(&segment_ids))?;
let reader = index.reader()?;
let searcher = reader.searcher();
assert_eq!(searcher.num_docs(), 30);
for i in 0..searcher.num_docs() as u32 {
let _doc = searcher.doc(DocAddress(0u32, i))?;
}
Ok(())
}
#[test]
fn test_skip_index_long() -> io::Result<()> {
let mut output: Vec<u8> = Vec::new();
@@ -150,26 +193,28 @@ mod tests {
}
skip_index_builder.write(&mut output)?;
assert_eq!(output.len(), 4035);
let resulting_checkpoints: Vec<Checkpoint> = SkipIndex::from(OwnedBytes::new(output))
let resulting_checkpoints: Vec<Checkpoint> = SkipIndex::open(OwnedBytes::new(output))
.checkpoints()
.collect();
assert_eq!(&resulting_checkpoints, &checkpoints);
Ok(())
}
fn integrate_delta(mut vals: Vec<u64>) -> Vec<u64> {
fn integrate_delta(vals: Vec<u64>) -> Vec<u64> {
let mut output = Vec::with_capacity(vals.len() + 1);
output.push(0u64);
let mut prev = 0u64;
for val in vals.iter_mut() {
let new_val = *val + prev;
for val in vals {
let new_val = val + prev;
prev = new_val;
*val = new_val;
output.push(new_val);
}
vals
output
}
// Generates a sequence of n valid checkpoints, with n < max_len.
fn monotonic_checkpoints(max_len: usize) -> BoxedStrategy<Vec<Checkpoint>> {
(1..max_len)
(0..max_len)
.prop_flat_map(move |len: usize| {
(
proptest::collection::vec(1u64..20u64, len as usize).prop_map(integrate_delta),
@@ -221,7 +266,7 @@ mod tests {
}
let mut buffer = Vec::new();
skip_index_builder.write(&mut buffer).unwrap();
let skip_index = SkipIndex::from(OwnedBytes::new(buffer));
let skip_index = SkipIndex::open(OwnedBytes::new(buffer));
let iter_checkpoints: Vec<Checkpoint> = skip_index.checkpoints().collect();
assert_eq!(&checkpoints[..], &iter_checkpoints[..]);
test_skip_index_aux(skip_index, &checkpoints[..]);

View File

@@ -59,6 +59,46 @@ pub struct SkipIndex {
}
impl SkipIndex {
pub fn open(mut data: OwnedBytes) -> SkipIndex {
let offsets: Vec<u64> = Vec::<VInt>::deserialize(&mut data)
.unwrap()
.into_iter()
.map(|el| el.0)
.collect();
let mut start_offset = 0;
let mut layers = Vec::new();
for end_offset in offsets {
let layer = Layer {
data: data.slice(start_offset as usize, end_offset as usize),
};
layers.push(layer);
start_offset = end_offset;
}
SkipIndex { layers }
}
pub fn is_valid(&self) -> bool {
let checkpoints: Vec<Checkpoint> = self.checkpoints().collect();
let mut prev_checkpoint = Checkpoint {
start_doc: 0u32,
end_doc: 0u32,
start_offset: 0u64,
end_offset: 0u64,
};
for checkpoint in checkpoints {
if !checkpoint.follows(&prev_checkpoint) {
return false;
}
prev_checkpoint = checkpoint;
}
true
}
pub(crate) fn from_bytes(data: &[u8]) -> SkipIndex {
let data = OwnedBytes::new(data.to_owned());
SkipIndex::open(data)
}
pub(crate) fn checkpoints<'a>(&'a self) -> impl Iterator<Item = Checkpoint> + 'a {
self.layers
.last()
@@ -90,22 +130,3 @@ impl SkipIndex {
Some(cur_checkpoint)
}
}
impl From<OwnedBytes> for SkipIndex {
fn from(mut data: OwnedBytes) -> SkipIndex {
let offsets: Vec<u64> = Vec::<VInt>::deserialize(&mut data)
.unwrap()
.into_iter()
.map(|el| el.0)
.collect();
let mut start_offset = 0;
let mut layers = Vec::new();
for end_offset in offsets {
layers.push(Layer {
data: data.slice(start_offset as usize, end_offset as usize),
});
start_offset = end_offset;
}
SkipIndex { layers }
}
}

View File

@@ -1,6 +1,6 @@
use crate::common::{BinarySerializable, VInt};
use crate::store::index::block::CheckpointBlock;
use crate::store::index::{Checkpoint, CHECKPOINT_PERIOD};
use crate::store::index::{Checkpoint, SkipIndex, CHECKPOINT_PERIOD};
use std::io;
use std::io::Write;
@@ -28,18 +28,20 @@ impl LayerBuilder {
///
/// If the block was empty to begin with, simply return None.
fn flush_block(&mut self) -> Option<Checkpoint> {
self.block.doc_interval().map(|(start_doc, end_doc)| {
if let Some((start_doc, end_doc)) = self.block.doc_interval() {
let start_offset = self.buffer.len() as u64;
self.block.serialize(&mut self.buffer);
let end_offset = self.buffer.len() as u64;
self.block.clear();
Checkpoint {
Some(Checkpoint {
start_doc,
end_doc,
start_offset,
end_offset,
}
})
})
} else {
None
}
}
fn push(&mut self, checkpoint: Checkpoint) {
@@ -48,7 +50,7 @@ impl LayerBuilder {
fn insert(&mut self, checkpoint: Checkpoint) -> Option<Checkpoint> {
self.push(checkpoint);
let emit_skip_info = (self.block.len() % CHECKPOINT_PERIOD) == 0;
let emit_skip_info = self.block.len() >= CHECKPOINT_PERIOD;
if emit_skip_info {
self.flush_block()
} else {
@@ -85,7 +87,8 @@ impl SkipIndexBuilder {
}
}
pub fn write<W: Write>(mut self, output: &mut W) -> io::Result<()> {
pub fn write<W: Write>(mut self, real_output: &mut W) -> io::Result<()> {
let mut output: Vec<u8> = Vec::new();
let mut last_pointer = None;
for skip_layer in self.layers.iter_mut() {
if let Some(checkpoint) = last_pointer {
@@ -106,10 +109,14 @@ impl SkipIndexBuilder {
layer_offset += layer_buffer.len() as u64;
layer_sizes.push(VInt(layer_offset));
}
layer_sizes.serialize(output)?;
layer_sizes.serialize(&mut output)?;
for layer_buffer in layer_buffers {
output.write_all(&layer_buffer[..])?;
}
if !SkipIndex::from_bytes(&output).is_valid() {
return Err(io::Error::new(io::ErrorKind::InvalidData, "about to write invalid skip index"));
}
real_output.write_all(&output)?;
Ok(())
}
}

View File

@@ -35,7 +35,7 @@ impl StoreReader {
let (data_file, offset_index_file) = split_file(store_file)?;
let index_data = offset_index_file.read_bytes()?;
let space_usage = StoreSpaceUsage::new(data_file.len(), offset_index_file.len());
let skip_index = SkipIndex::from(index_data);
let skip_index = SkipIndex::open(index_data);
Ok(StoreReader {
data: data_file,
cache: Arc::new(Mutex::new(LruCache::new(LRU_CACHE_CAPACITY))),

View File

@@ -72,6 +72,7 @@ impl StoreWriter {
if !self.current_block.is_empty() {
self.write_and_compress_block()?;
}
assert_eq!(self.first_doc_in_block, self.doc);
let doc_shift = self.doc;
let start_shift = self.writer.written_bytes() as u64;
@@ -86,12 +87,17 @@ impl StoreWriter {
checkpoint.end_doc += doc_shift;
checkpoint.start_offset += start_shift;
checkpoint.end_offset += start_shift;
self.offset_index_writer.insert(checkpoint);
self.doc = checkpoint.end_doc;
self.register_checkpoint(checkpoint);
}
Ok(())
}
fn register_checkpoint(&mut self, checkpoint: Checkpoint) {
self.offset_index_writer.insert(checkpoint);
self.first_doc_in_block = checkpoint.end_doc;
self.doc = checkpoint.end_doc;
}
fn write_and_compress_block(&mut self) -> io::Result<()> {
assert!(self.doc > 0);
self.intermediary_buffer.clear();
@@ -100,14 +106,13 @@ impl StoreWriter {
self.writer.write_all(&self.intermediary_buffer)?;
let end_offset = self.writer.written_bytes();
let end_doc = self.doc;
self.offset_index_writer.insert(Checkpoint {
self.register_checkpoint(Checkpoint {
start_doc: self.first_doc_in_block,
end_doc,
start_offset,
end_offset,
});
self.current_block.clear();
self.first_doc_in_block = self.doc;
Ok(())
}