Compare commits

...

2 Commits

Author SHA1 Message Date
Paul Masurel
7ef5c007c3 Hotfix #681
Fixing the construction of the DeleteBitset.
Cherry picked from master.

Changing version to 0.10.3
2019-11-10 13:49:03 +09:00
Paul Masurel
79894657df Address #656
Broke the reference loop to make sure that the watch_router can
be dropped, and the thread exits.
2019-09-30 12:54:02 +09:00
9 changed files with 128 additions and 66 deletions

View File

@@ -1,7 +1,12 @@
Tantivy 0.11.0
Tantivy 0.10.3
==========================
- Fix crash when committing multiple times with deleted documents. #681 (@brainlock)
Tantivy 0.10.2
=====================
- Added f64 field. Internally reuse u64 code the same way i64 does (@fdb-hiroshima)
- Closes #656. Solving memory leak.
Tantivy 0.10.1
=====================

View File

@@ -1,6 +1,6 @@
[package]
name = "tantivy"
version = "0.10.1"
version = "0.10.3"
authors = ["Paul Masurel <paul.masurel@gmail.com>"]
license = "MIT"
categories = ["database-implementations", "data-structures"]
@@ -86,6 +86,7 @@ travis-ci = { repository = "tantivy-search/tantivy" }
[dev-dependencies.fail]
features = ["failpoints"]
version = "0.3"
# Following the "fail" crate best practises, we isolate
@@ -98,4 +99,4 @@ features = ["failpoints"]
[[test]]
name = "failpoints"
path = "tests/failpoints/mod.rs"
required-features = ["fail/failpoints"]
required-features = ["fail/failpoints"]

View File

@@ -151,6 +151,21 @@ impl SegmentMeta {
self.num_deleted_docs() > 0
}
/// Updates the max_doc value from the `SegmentMeta`.
///
/// This method is only used when updating `max_doc` from 0
/// as we finalize a fresh new segment.
pub(crate) fn with_max_doc(self, max_doc: u32) -> SegmentMeta {
assert_eq!(self.tracked.max_doc, 0);
assert!(self.tracked.deletes.is_none());
let tracked = self.tracked.map(move |inner_meta| InnerSegmentMeta {
segment_id: inner_meta.segment_id,
max_doc,
deletes: None,
});
SegmentMeta { tracked }
}
#[doc(hidden)]
pub fn with_delete_meta(self, num_deleted_docs: u32, opstamp: Opstamp) -> SegmentMeta {
let delete_meta = DeleteMeta {

View File

@@ -50,6 +50,17 @@ impl Segment {
&self.meta
}
/// Updates the max_doc value from the `SegmentMeta`.
///
/// This method is only used when updating `max_doc` from 0
/// as we finalize a fresh new segment.
pub(crate) fn with_max_doc(self, max_doc: u32) -> Segment {
Segment {
index: self.index,
meta: self.meta.with_max_doc(max_doc),
}
}
#[doc(hidden)]
pub fn with_delete_meta(self, num_deleted_docs: u32, opstamp: Opstamp) -> Segment {
Segment {

View File

@@ -141,42 +141,28 @@ impl MmapCache {
}
}
struct InnerWatcherWrapper {
_watcher: Mutex<notify::RecommendedWatcher>,
watcher_router: WatchCallbackList,
}
impl InnerWatcherWrapper {
pub fn new(path: &Path) -> Result<(Self, Receiver<notify::RawEvent>), notify::Error> {
let (tx, watcher_recv): (Sender<RawEvent>, Receiver<RawEvent>) = channel();
// We need to initialize the
let mut watcher = notify::raw_watcher(tx)?;
watcher.watch(path, RecursiveMode::Recursive)?;
let inner = InnerWatcherWrapper {
_watcher: Mutex::new(watcher),
watcher_router: Default::default(),
};
Ok((inner, watcher_recv))
}
}
#[derive(Clone)]
struct WatcherWrapper {
inner: Arc<InnerWatcherWrapper>,
_watcher: Mutex<notify::RecommendedWatcher>,
watcher_router: Arc<WatchCallbackList>,
}
impl WatcherWrapper {
pub fn new(path: &Path) -> Result<Self, OpenDirectoryError> {
let (inner, watcher_recv) = InnerWatcherWrapper::new(path).map_err(|err| match err {
notify::Error::PathNotFound => OpenDirectoryError::DoesNotExist(path.to_owned()),
_ => {
panic!("Unknown error while starting watching directory {:?}", path);
}
})?;
let watcher_wrapper = WatcherWrapper {
inner: Arc::new(inner),
};
let watcher_wrapper_clone = watcher_wrapper.clone();
let (tx, watcher_recv): (Sender<RawEvent>, Receiver<RawEvent>) = channel();
// We need to initialize the
let watcher = notify::raw_watcher(tx)
.and_then(|mut watcher| {
watcher.watch(path, RecursiveMode::Recursive)?;
Ok(watcher)
})
.map_err(|err| match err {
notify::Error::PathNotFound => OpenDirectoryError::DoesNotExist(path.to_owned()),
_ => {
panic!("Unknown error while starting watching directory {:?}", path);
}
})?;
let watcher_router: Arc<WatchCallbackList> = Default::default();
let watcher_router_clone = watcher_router.clone();
thread::Builder::new()
.name("meta-file-watch-thread".to_string())
.spawn(move || {
@@ -187,7 +173,7 @@ impl WatcherWrapper {
// We might want to be more accurate than this at one point.
if let Some(filename) = changed_path.file_name() {
if filename == *META_FILEPATH {
watcher_wrapper_clone.inner.watcher_router.broadcast();
watcher_router_clone.broadcast();
}
}
}
@@ -200,13 +186,15 @@ impl WatcherWrapper {
}
}
}
})
.expect("Failed to spawn thread to watch meta.json");
Ok(watcher_wrapper)
})?;
Ok(WatcherWrapper {
_watcher: Mutex::new(watcher),
watcher_router,
})
}
pub fn watch(&mut self, watch_callback: WatchCallback) -> WatchHandle {
self.inner.watcher_router.subscribe(watch_callback)
self.watcher_router.subscribe(watch_callback)
}
}

View File

@@ -10,11 +10,14 @@ use std::io::Write;
/// Write a delete `BitSet`
///
/// where `delete_bitset` is the set of deleted `DocId`.
pub fn write_delete_bitset(delete_bitset: &BitSet, writer: &mut WritePtr) -> io::Result<()> {
let max_doc = delete_bitset.capacity();
pub fn write_delete_bitset(
delete_bitset: &BitSet,
max_doc: u32,
writer: &mut WritePtr,
) -> io::Result<()> {
let mut byte = 0u8;
let mut shift = 0u8;
for doc in 0..max_doc {
for doc in 0..(max_doc as usize) {
if delete_bitset.contains(doc) {
byte |= 1 << shift;
}
@@ -86,18 +89,17 @@ mod tests {
use bit_set::BitSet;
use std::path::PathBuf;
fn test_delete_bitset_helper(bitset: &BitSet) {
fn test_delete_bitset_helper(bitset: &BitSet, max_doc: u32) {
let test_path = PathBuf::from("test");
let mut directory = RAMDirectory::create();
{
let mut writer = directory.open_write(&*test_path).unwrap();
write_delete_bitset(bitset, &mut writer).unwrap();
write_delete_bitset(bitset, max_doc, &mut writer).unwrap();
}
{
let source = directory.open_read(&test_path).unwrap();
let delete_bitset = DeleteBitSet::open(source);
let n = bitset.capacity();
for doc in 0..n {
for doc in 0..max_doc as usize {
assert_eq!(bitset.contains(doc), delete_bitset.is_deleted(doc as DocId));
}
assert_eq!(delete_bitset.len(), bitset.len());
@@ -110,7 +112,7 @@ mod tests {
let mut bitset = BitSet::with_capacity(10);
bitset.insert(1);
bitset.insert(9);
test_delete_bitset_helper(&bitset);
test_delete_bitset_helper(&bitset, 10);
}
{
let mut bitset = BitSet::with_capacity(8);
@@ -119,7 +121,7 @@ mod tests {
bitset.insert(3);
bitset.insert(5);
bitset.insert(7);
test_delete_bitset_helper(&bitset);
test_delete_bitset_helper(&bitset, 8);
}
}
}

View File

@@ -147,7 +147,6 @@ pub(crate) fn advance_deletes(
};
let delete_cursor = segment_entry.delete_cursor();
compute_deleted_bitset(
&mut delete_bitset,
&segment_reader,
@@ -167,8 +166,8 @@ pub(crate) fn advance_deletes(
if num_deleted_docs > 0 {
segment = segment.with_delete_meta(num_deleted_docs as u32, target_opstamp);
let mut delete_file = segment.open_write(SegmentComponent::DELETE)?;
write_delete_bitset(&delete_bitset, &mut delete_file)?;
}
write_delete_bitset(&delete_bitset, max_doc, &mut delete_file)?;
}
}
segment_entry.set_meta(segment.meta().clone());
Ok(())
@@ -176,13 +175,13 @@ pub(crate) fn advance_deletes(
fn index_documents(
memory_budget: usize,
segment: &Segment,
segment: Segment,
grouped_document_iterator: &mut dyn Iterator<Item = OperationGroup>,
segment_updater: &mut SegmentUpdater,
mut delete_cursor: DeleteCursor,
) -> Result<bool> {
let schema = segment.schema();
let segment_id = segment.id();
let mut segment_writer = SegmentWriter::for_segment(memory_budget, segment.clone(), &schema)?;
for document_group in grouped_document_iterator {
for doc in document_group {
@@ -202,24 +201,28 @@ fn index_documents(
return Ok(false);
}
let num_docs = segment_writer.max_doc();
let max_doc = segment_writer.max_doc();
// this is ensured by the call to peek before starting
// the worker thread.
assert!(num_docs > 0);
assert!(max_doc > 0);
let doc_opstamps: Vec<Opstamp> = segment_writer.finalize()?;
let segment_meta = segment
.index()
.inventory()
.new_segment_meta(segment_id, num_docs);
let segment_with_max_doc = segment.with_max_doc(max_doc);
let last_docstamp: Opstamp = *(doc_opstamps.last().unwrap());
let delete_bitset_opt =
apply_deletes(&segment, &mut delete_cursor, &doc_opstamps, last_docstamp)?;
let delete_bitset_opt = apply_deletes(
&segment_with_max_doc,
&mut delete_cursor,
&doc_opstamps,
last_docstamp,
)?;
let segment_entry = SegmentEntry::new(segment_meta, delete_cursor, delete_bitset_opt);
let segment_entry = SegmentEntry::new(
segment_with_max_doc.meta().clone(),
delete_cursor,
delete_bitset_opt,
);
Ok(segment_updater.add_segment(segment_entry))
}
@@ -236,7 +239,9 @@ fn apply_deletes(
}
let segment_reader = SegmentReader::open(segment)?;
let doc_to_opstamps = DocToOpstampMapping::from(doc_opstamps);
let mut deleted_bitset = BitSet::with_capacity(segment_reader.max_doc() as usize);
let max_doc = segment.meta().max_doc();
let mut deleted_bitset = BitSet::with_capacity(max_doc as usize);
let may_have_deletes = compute_deleted_bitset(
&mut deleted_bitset,
&segment_reader,
@@ -408,7 +413,7 @@ impl IndexWriter {
let segment = index.new_segment();
index_documents(
mem_budget,
&segment,
segment,
&mut document_iterator,
&mut segment_updater,
delete_cursor.clone(),

View File

@@ -28,3 +28,25 @@ pub use self::segment_writer::SegmentWriter;
/// Alias for the default merge policy, which is the `LogMergePolicy`.
pub type DefaultMergePolicy = LogMergePolicy;
#[cfg(test)]
mod tests {
use crate::schema::{self, Schema};
use crate::{Index, Term};
#[test]
fn test_advance_delete_bug() {
let mut schema_builder = Schema::builder();
let text_field = schema_builder.add_text_field("text", schema::TEXT);
let index = Index::create_from_tempdir(schema_builder.build()).unwrap();
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
// there must be one deleted document in the segment
index_writer.add_document(doc!(text_field=>"b"));
index_writer.delete_term(Term::from_field_text(text_field, "b"));
// we need enough data to trigger the bug (at least 32 documents)
for _ in 0..32 {
index_writer.add_document(doc!(text_field=>"c"));
}
index_writer.commit().unwrap();
index_writer.commit().unwrap();
}
}

View File

@@ -12,6 +12,19 @@ use std::io::Write;
#[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Hash, Serialize, Deserialize)]
pub struct Field(pub u32);
impl Field {
/// Create a new field object for the given FieldId.
pub fn from_field_id(field_id: u32) -> Field {
Field(field_id)
}
/// Returns a u32 identifying uniquely a field within a schema.
#[allow(clippy::trivially_copy_pass_by_ref)]
pub fn field_id(&self) -> u32 {
self.0
}
}
impl BinarySerializable for Field {
fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
self.0.serialize(writer)