mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-01-06 01:02:55 +00:00
Compare commits
1 Commits
range
...
fail-fast-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
acfb057462 |
@@ -44,12 +44,12 @@ impl VecWriter {
|
|||||||
|
|
||||||
impl Drop for VecWriter {
|
impl Drop for VecWriter {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
if !self.is_flushed {
|
// if !self.is_flushed {
|
||||||
panic!(
|
// panic!(
|
||||||
"You forgot to flush {:?} before its writter got Drop. Do not rely on drop.",
|
// "You forgot to flush {:?} before its writter got Drop. Do not rely on drop.",
|
||||||
self.path
|
// self.path
|
||||||
)
|
// )
|
||||||
}
|
// }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -25,9 +25,10 @@ use futures::future::Future;
|
|||||||
use futures::future::TryFutureExt;
|
use futures::future::TryFutureExt;
|
||||||
use std::borrow::BorrowMut;
|
use std::borrow::BorrowMut;
|
||||||
use std::collections::HashSet;
|
use std::collections::HashSet;
|
||||||
use std::io::Write;
|
use std::io::{self, Write};
|
||||||
use std::ops::Deref;
|
use std::ops::Deref;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
|
use std::process;
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::sync::RwLock;
|
use std::sync::RwLock;
|
||||||
@@ -409,6 +410,13 @@ impl SegmentUpdater {
|
|||||||
let _send_result = merging_future_send.send(segment_meta);
|
let _send_result = merging_future_send.send(segment_meta);
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
if let crate::TantivyError::IOError(ref io_err) = &e {
|
||||||
|
if io_err.kind() == io::ErrorKind::InvalidData {
|
||||||
|
println!(" SEGMENTS THAT CAUSE THE BUG {:?}", merge_operation.segment_ids());
|
||||||
|
error!(" SEGMENTS THAT CAUSE THE BUG {:?}", merge_operation.segment_ids());
|
||||||
|
process::exit(1);
|
||||||
|
}
|
||||||
|
}
|
||||||
warn!(
|
warn!(
|
||||||
"Merge of {:?} was cancelled: {:?}",
|
"Merge of {:?} was cancelled: {:?}",
|
||||||
merge_operation.segment_ids().to_vec(),
|
merge_operation.segment_ids().to_vec(),
|
||||||
@@ -423,7 +431,9 @@ impl SegmentUpdater {
|
|||||||
});
|
});
|
||||||
|
|
||||||
Ok(merging_future_recv
|
Ok(merging_future_recv
|
||||||
.unwrap_or_else(|_| Err(crate::TantivyError::SystemError("Merge failed".to_string()))))
|
.unwrap_or_else(|e| {
|
||||||
|
Err(crate::TantivyError::SystemError("Merge failed".to_string()))
|
||||||
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn consider_merge_options(&self) {
|
async fn consider_merge_options(&self) {
|
||||||
|
|||||||
@@ -77,6 +77,28 @@ impl SkipIndex {
|
|||||||
SkipIndex { layers }
|
SkipIndex { layers }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn is_valid(&self) -> bool {
|
||||||
|
let checkpoints: Vec<Checkpoint> = self.checkpoints().collect();
|
||||||
|
let mut prev_checkpoint = Checkpoint {
|
||||||
|
start_doc: 0u32,
|
||||||
|
end_doc: 0u32,
|
||||||
|
start_offset: 0u64,
|
||||||
|
end_offset: 0u64,
|
||||||
|
};
|
||||||
|
for checkpoint in checkpoints {
|
||||||
|
if !checkpoint.follows(&prev_checkpoint) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
prev_checkpoint = checkpoint;
|
||||||
|
}
|
||||||
|
true
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn from_bytes(data: &[u8]) -> SkipIndex {
|
||||||
|
let data = OwnedBytes::new(data.to_owned());
|
||||||
|
SkipIndex::open(data)
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) fn checkpoints<'a>(&'a self) -> impl Iterator<Item = Checkpoint> + 'a {
|
pub(crate) fn checkpoints<'a>(&'a self) -> impl Iterator<Item = Checkpoint> + 'a {
|
||||||
self.layers
|
self.layers
|
||||||
.last()
|
.last()
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
use crate::common::{BinarySerializable, VInt};
|
use crate::common::{BinarySerializable, VInt};
|
||||||
use crate::store::index::block::CheckpointBlock;
|
use crate::store::index::block::CheckpointBlock;
|
||||||
use crate::store::index::{Checkpoint, CHECKPOINT_PERIOD};
|
use crate::store::index::{Checkpoint, SkipIndex, CHECKPOINT_PERIOD};
|
||||||
use std::io;
|
use std::io;
|
||||||
use std::io::Write;
|
use std::io::Write;
|
||||||
|
|
||||||
@@ -87,7 +87,8 @@ impl SkipIndexBuilder {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn write<W: Write>(mut self, output: &mut W) -> io::Result<()> {
|
pub fn write<W: Write>(mut self, real_output: &mut W) -> io::Result<()> {
|
||||||
|
let mut output: Vec<u8> = Vec::new();
|
||||||
let mut last_pointer = None;
|
let mut last_pointer = None;
|
||||||
for skip_layer in self.layers.iter_mut() {
|
for skip_layer in self.layers.iter_mut() {
|
||||||
if let Some(checkpoint) = last_pointer {
|
if let Some(checkpoint) = last_pointer {
|
||||||
@@ -108,10 +109,14 @@ impl SkipIndexBuilder {
|
|||||||
layer_offset += layer_buffer.len() as u64;
|
layer_offset += layer_buffer.len() as u64;
|
||||||
layer_sizes.push(VInt(layer_offset));
|
layer_sizes.push(VInt(layer_offset));
|
||||||
}
|
}
|
||||||
layer_sizes.serialize(output)?;
|
layer_sizes.serialize(&mut output)?;
|
||||||
for layer_buffer in layer_buffers {
|
for layer_buffer in layer_buffers {
|
||||||
output.write_all(&layer_buffer[..])?;
|
output.write_all(&layer_buffer[..])?;
|
||||||
}
|
}
|
||||||
|
if !SkipIndex::from_bytes(&output).is_valid() {
|
||||||
|
return Err(io::Error::new(io::ErrorKind::InvalidData, "about to write invalid skip index"));
|
||||||
|
}
|
||||||
|
real_output.write_all(&output)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user