mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-01-04 16:22:55 +00:00
Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b3903e68b1 | ||
|
|
79894657df |
@@ -1,7 +1,12 @@
|
|||||||
Tantivy 0.11.0
|
Tantivy 0.10.3
|
||||||
|
==========================
|
||||||
|
|
||||||
|
- Fix crash when committing multiple times with deleted documents. #681 (@brainlock)
|
||||||
|
|
||||||
|
Tantivy 0.10.2
|
||||||
=====================
|
=====================
|
||||||
|
|
||||||
- Added f64 field. Internally reuse u64 code the same way i64 does (@fdb-hiroshima)
|
- Closes #656. Solving memory leak.
|
||||||
|
|
||||||
Tantivy 0.10.1
|
Tantivy 0.10.1
|
||||||
=====================
|
=====================
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "tantivy"
|
name = "tantivy"
|
||||||
version = "0.10.1"
|
version = "0.10.3"
|
||||||
authors = ["Paul Masurel <paul.masurel@gmail.com>"]
|
authors = ["Paul Masurel <paul.masurel@gmail.com>"]
|
||||||
license = "MIT"
|
license = "MIT"
|
||||||
categories = ["database-implementations", "data-structures"]
|
categories = ["database-implementations", "data-structures"]
|
||||||
@@ -98,4 +98,4 @@ features = ["failpoints"]
|
|||||||
[[test]]
|
[[test]]
|
||||||
name = "failpoints"
|
name = "failpoints"
|
||||||
path = "tests/failpoints/mod.rs"
|
path = "tests/failpoints/mod.rs"
|
||||||
required-features = ["fail/failpoints"]
|
required-features = ["fail/failpoints"]
|
||||||
|
|||||||
@@ -151,6 +151,21 @@ impl SegmentMeta {
|
|||||||
self.num_deleted_docs() > 0
|
self.num_deleted_docs() > 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Updates the max_doc value from the `SegmentMeta`.
|
||||||
|
///
|
||||||
|
/// This method is only used when updating `max_doc` from 0
|
||||||
|
/// as we finalize a fresh new segment.
|
||||||
|
pub(crate) fn with_max_doc(self, max_doc: u32) -> SegmentMeta {
|
||||||
|
assert_eq!(self.tracked.max_doc, 0);
|
||||||
|
assert!(self.tracked.deletes.is_none());
|
||||||
|
let tracked = self.tracked.map(move |inner_meta| InnerSegmentMeta {
|
||||||
|
segment_id: inner_meta.segment_id,
|
||||||
|
max_doc,
|
||||||
|
deletes: None,
|
||||||
|
});
|
||||||
|
SegmentMeta { tracked }
|
||||||
|
}
|
||||||
|
|
||||||
#[doc(hidden)]
|
#[doc(hidden)]
|
||||||
pub fn with_delete_meta(self, num_deleted_docs: u32, opstamp: Opstamp) -> SegmentMeta {
|
pub fn with_delete_meta(self, num_deleted_docs: u32, opstamp: Opstamp) -> SegmentMeta {
|
||||||
let delete_meta = DeleteMeta {
|
let delete_meta = DeleteMeta {
|
||||||
|
|||||||
@@ -50,6 +50,17 @@ impl Segment {
|
|||||||
&self.meta
|
&self.meta
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Updates the max_doc value from the `SegmentMeta`.
|
||||||
|
///
|
||||||
|
/// This method is only used when updating `max_doc` from 0
|
||||||
|
/// as we finalize a fresh new segment.
|
||||||
|
pub(crate) fn with_max_doc(self, max_doc: u32) -> Segment {
|
||||||
|
Segment {
|
||||||
|
index: self.index,
|
||||||
|
meta: self.meta.with_max_doc(max_doc),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[doc(hidden)]
|
#[doc(hidden)]
|
||||||
pub fn with_delete_meta(self, num_deleted_docs: u32, opstamp: Opstamp) -> Segment {
|
pub fn with_delete_meta(self, num_deleted_docs: u32, opstamp: Opstamp) -> Segment {
|
||||||
Segment {
|
Segment {
|
||||||
|
|||||||
@@ -141,42 +141,28 @@ impl MmapCache {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct InnerWatcherWrapper {
|
|
||||||
_watcher: Mutex<notify::RecommendedWatcher>,
|
|
||||||
watcher_router: WatchCallbackList,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl InnerWatcherWrapper {
|
|
||||||
pub fn new(path: &Path) -> Result<(Self, Receiver<notify::RawEvent>), notify::Error> {
|
|
||||||
let (tx, watcher_recv): (Sender<RawEvent>, Receiver<RawEvent>) = channel();
|
|
||||||
// We need to initialize the
|
|
||||||
let mut watcher = notify::raw_watcher(tx)?;
|
|
||||||
watcher.watch(path, RecursiveMode::Recursive)?;
|
|
||||||
let inner = InnerWatcherWrapper {
|
|
||||||
_watcher: Mutex::new(watcher),
|
|
||||||
watcher_router: Default::default(),
|
|
||||||
};
|
|
||||||
Ok((inner, watcher_recv))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
struct WatcherWrapper {
|
struct WatcherWrapper {
|
||||||
inner: Arc<InnerWatcherWrapper>,
|
_watcher: Mutex<notify::RecommendedWatcher>,
|
||||||
|
watcher_router: Arc<WatchCallbackList>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl WatcherWrapper {
|
impl WatcherWrapper {
|
||||||
pub fn new(path: &Path) -> Result<Self, OpenDirectoryError> {
|
pub fn new(path: &Path) -> Result<Self, OpenDirectoryError> {
|
||||||
let (inner, watcher_recv) = InnerWatcherWrapper::new(path).map_err(|err| match err {
|
let (tx, watcher_recv): (Sender<RawEvent>, Receiver<RawEvent>) = channel();
|
||||||
notify::Error::PathNotFound => OpenDirectoryError::DoesNotExist(path.to_owned()),
|
// We need to initialize the
|
||||||
_ => {
|
let watcher = notify::raw_watcher(tx)
|
||||||
panic!("Unknown error while starting watching directory {:?}", path);
|
.and_then(|mut watcher| {
|
||||||
}
|
watcher.watch(path, RecursiveMode::Recursive)?;
|
||||||
})?;
|
Ok(watcher)
|
||||||
let watcher_wrapper = WatcherWrapper {
|
})
|
||||||
inner: Arc::new(inner),
|
.map_err(|err| match err {
|
||||||
};
|
notify::Error::PathNotFound => OpenDirectoryError::DoesNotExist(path.to_owned()),
|
||||||
let watcher_wrapper_clone = watcher_wrapper.clone();
|
_ => {
|
||||||
|
panic!("Unknown error while starting watching directory {:?}", path);
|
||||||
|
}
|
||||||
|
})?;
|
||||||
|
let watcher_router: Arc<WatchCallbackList> = Default::default();
|
||||||
|
let watcher_router_clone = watcher_router.clone();
|
||||||
thread::Builder::new()
|
thread::Builder::new()
|
||||||
.name("meta-file-watch-thread".to_string())
|
.name("meta-file-watch-thread".to_string())
|
||||||
.spawn(move || {
|
.spawn(move || {
|
||||||
@@ -187,7 +173,7 @@ impl WatcherWrapper {
|
|||||||
// We might want to be more accurate than this at one point.
|
// We might want to be more accurate than this at one point.
|
||||||
if let Some(filename) = changed_path.file_name() {
|
if let Some(filename) = changed_path.file_name() {
|
||||||
if filename == *META_FILEPATH {
|
if filename == *META_FILEPATH {
|
||||||
watcher_wrapper_clone.inner.watcher_router.broadcast();
|
watcher_router_clone.broadcast();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -200,13 +186,15 @@ impl WatcherWrapper {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
})
|
})?;
|
||||||
.expect("Failed to spawn thread to watch meta.json");
|
Ok(WatcherWrapper {
|
||||||
Ok(watcher_wrapper)
|
_watcher: Mutex::new(watcher),
|
||||||
|
watcher_router,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn watch(&mut self, watch_callback: WatchCallback) -> WatchHandle {
|
pub fn watch(&mut self, watch_callback: WatchCallback) -> WatchHandle {
|
||||||
self.inner.watcher_router.subscribe(watch_callback)
|
self.watcher_router.subscribe(watch_callback)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -10,11 +10,14 @@ use std::io::Write;
|
|||||||
/// Write a delete `BitSet`
|
/// Write a delete `BitSet`
|
||||||
///
|
///
|
||||||
/// where `delete_bitset` is the set of deleted `DocId`.
|
/// where `delete_bitset` is the set of deleted `DocId`.
|
||||||
pub fn write_delete_bitset(delete_bitset: &BitSet, writer: &mut WritePtr) -> io::Result<()> {
|
pub fn write_delete_bitset(
|
||||||
let max_doc = delete_bitset.capacity();
|
delete_bitset: &BitSet,
|
||||||
|
max_doc: u32,
|
||||||
|
writer: &mut WritePtr,
|
||||||
|
) -> io::Result<()> {
|
||||||
let mut byte = 0u8;
|
let mut byte = 0u8;
|
||||||
let mut shift = 0u8;
|
let mut shift = 0u8;
|
||||||
for doc in 0..max_doc {
|
for doc in 0..(max_doc as usize) {
|
||||||
if delete_bitset.contains(doc) {
|
if delete_bitset.contains(doc) {
|
||||||
byte |= 1 << shift;
|
byte |= 1 << shift;
|
||||||
}
|
}
|
||||||
@@ -86,18 +89,17 @@ mod tests {
|
|||||||
use bit_set::BitSet;
|
use bit_set::BitSet;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
|
|
||||||
fn test_delete_bitset_helper(bitset: &BitSet) {
|
fn test_delete_bitset_helper(bitset: &BitSet, max_doc: u32) {
|
||||||
let test_path = PathBuf::from("test");
|
let test_path = PathBuf::from("test");
|
||||||
let mut directory = RAMDirectory::create();
|
let mut directory = RAMDirectory::create();
|
||||||
{
|
{
|
||||||
let mut writer = directory.open_write(&*test_path).unwrap();
|
let mut writer = directory.open_write(&*test_path).unwrap();
|
||||||
write_delete_bitset(bitset, &mut writer).unwrap();
|
write_delete_bitset(bitset, max_doc, &mut writer).unwrap();
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
let source = directory.open_read(&test_path).unwrap();
|
let source = directory.open_read(&test_path).unwrap();
|
||||||
let delete_bitset = DeleteBitSet::open(source);
|
let delete_bitset = DeleteBitSet::open(source);
|
||||||
let n = bitset.capacity();
|
for doc in 0..max_doc as usize {
|
||||||
for doc in 0..n {
|
|
||||||
assert_eq!(bitset.contains(doc), delete_bitset.is_deleted(doc as DocId));
|
assert_eq!(bitset.contains(doc), delete_bitset.is_deleted(doc as DocId));
|
||||||
}
|
}
|
||||||
assert_eq!(delete_bitset.len(), bitset.len());
|
assert_eq!(delete_bitset.len(), bitset.len());
|
||||||
@@ -110,7 +112,7 @@ mod tests {
|
|||||||
let mut bitset = BitSet::with_capacity(10);
|
let mut bitset = BitSet::with_capacity(10);
|
||||||
bitset.insert(1);
|
bitset.insert(1);
|
||||||
bitset.insert(9);
|
bitset.insert(9);
|
||||||
test_delete_bitset_helper(&bitset);
|
test_delete_bitset_helper(&bitset, 10);
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
let mut bitset = BitSet::with_capacity(8);
|
let mut bitset = BitSet::with_capacity(8);
|
||||||
@@ -119,7 +121,7 @@ mod tests {
|
|||||||
bitset.insert(3);
|
bitset.insert(3);
|
||||||
bitset.insert(5);
|
bitset.insert(5);
|
||||||
bitset.insert(7);
|
bitset.insert(7);
|
||||||
test_delete_bitset_helper(&bitset);
|
test_delete_bitset_helper(&bitset, 8);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -147,7 +147,6 @@ pub(crate) fn advance_deletes(
|
|||||||
};
|
};
|
||||||
|
|
||||||
let delete_cursor = segment_entry.delete_cursor();
|
let delete_cursor = segment_entry.delete_cursor();
|
||||||
|
|
||||||
compute_deleted_bitset(
|
compute_deleted_bitset(
|
||||||
&mut delete_bitset,
|
&mut delete_bitset,
|
||||||
&segment_reader,
|
&segment_reader,
|
||||||
@@ -167,8 +166,8 @@ pub(crate) fn advance_deletes(
|
|||||||
if num_deleted_docs > 0 {
|
if num_deleted_docs > 0 {
|
||||||
segment = segment.with_delete_meta(num_deleted_docs as u32, target_opstamp);
|
segment = segment.with_delete_meta(num_deleted_docs as u32, target_opstamp);
|
||||||
let mut delete_file = segment.open_write(SegmentComponent::DELETE)?;
|
let mut delete_file = segment.open_write(SegmentComponent::DELETE)?;
|
||||||
write_delete_bitset(&delete_bitset, &mut delete_file)?;
|
write_delete_bitset(&delete_bitset, max_doc, &mut delete_file)?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
segment_entry.set_meta(segment.meta().clone());
|
segment_entry.set_meta(segment.meta().clone());
|
||||||
Ok(())
|
Ok(())
|
||||||
@@ -176,13 +175,13 @@ pub(crate) fn advance_deletes(
|
|||||||
|
|
||||||
fn index_documents(
|
fn index_documents(
|
||||||
memory_budget: usize,
|
memory_budget: usize,
|
||||||
segment: &Segment,
|
segment: Segment,
|
||||||
grouped_document_iterator: &mut dyn Iterator<Item = OperationGroup>,
|
grouped_document_iterator: &mut dyn Iterator<Item = OperationGroup>,
|
||||||
segment_updater: &mut SegmentUpdater,
|
segment_updater: &mut SegmentUpdater,
|
||||||
mut delete_cursor: DeleteCursor,
|
mut delete_cursor: DeleteCursor,
|
||||||
) -> Result<bool> {
|
) -> Result<bool> {
|
||||||
let schema = segment.schema();
|
let schema = segment.schema();
|
||||||
let segment_id = segment.id();
|
|
||||||
let mut segment_writer = SegmentWriter::for_segment(memory_budget, segment.clone(), &schema)?;
|
let mut segment_writer = SegmentWriter::for_segment(memory_budget, segment.clone(), &schema)?;
|
||||||
for document_group in grouped_document_iterator {
|
for document_group in grouped_document_iterator {
|
||||||
for doc in document_group {
|
for doc in document_group {
|
||||||
@@ -202,24 +201,28 @@ fn index_documents(
|
|||||||
return Ok(false);
|
return Ok(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
let num_docs = segment_writer.max_doc();
|
let max_doc = segment_writer.max_doc();
|
||||||
|
|
||||||
// this is ensured by the call to peek before starting
|
// this is ensured by the call to peek before starting
|
||||||
// the worker thread.
|
// the worker thread.
|
||||||
assert!(num_docs > 0);
|
assert!(max_doc > 0);
|
||||||
|
|
||||||
let doc_opstamps: Vec<Opstamp> = segment_writer.finalize()?;
|
let doc_opstamps: Vec<Opstamp> = segment_writer.finalize()?;
|
||||||
let segment_meta = segment
|
let segment_with_max_doc = segment.with_max_doc(max_doc);
|
||||||
.index()
|
|
||||||
.inventory()
|
|
||||||
.new_segment_meta(segment_id, num_docs);
|
|
||||||
|
|
||||||
let last_docstamp: Opstamp = *(doc_opstamps.last().unwrap());
|
let last_docstamp: Opstamp = *(doc_opstamps.last().unwrap());
|
||||||
|
|
||||||
let delete_bitset_opt =
|
let delete_bitset_opt = apply_deletes(
|
||||||
apply_deletes(&segment, &mut delete_cursor, &doc_opstamps, last_docstamp)?;
|
&segment_with_max_doc,
|
||||||
|
&mut delete_cursor,
|
||||||
|
&doc_opstamps,
|
||||||
|
last_docstamp,
|
||||||
|
)?;
|
||||||
|
|
||||||
let segment_entry = SegmentEntry::new(segment_meta, delete_cursor, delete_bitset_opt);
|
let segment_entry = SegmentEntry::new(
|
||||||
|
segment_with_max_doc.meta().clone(),
|
||||||
|
delete_cursor,
|
||||||
|
delete_bitset_opt,
|
||||||
|
);
|
||||||
Ok(segment_updater.add_segment(segment_entry))
|
Ok(segment_updater.add_segment(segment_entry))
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -236,7 +239,9 @@ fn apply_deletes(
|
|||||||
}
|
}
|
||||||
let segment_reader = SegmentReader::open(segment)?;
|
let segment_reader = SegmentReader::open(segment)?;
|
||||||
let doc_to_opstamps = DocToOpstampMapping::from(doc_opstamps);
|
let doc_to_opstamps = DocToOpstampMapping::from(doc_opstamps);
|
||||||
let mut deleted_bitset = BitSet::with_capacity(segment_reader.max_doc() as usize);
|
|
||||||
|
let max_doc = segment.meta().max_doc();
|
||||||
|
let mut deleted_bitset = BitSet::with_capacity(max_doc as usize);
|
||||||
let may_have_deletes = compute_deleted_bitset(
|
let may_have_deletes = compute_deleted_bitset(
|
||||||
&mut deleted_bitset,
|
&mut deleted_bitset,
|
||||||
&segment_reader,
|
&segment_reader,
|
||||||
@@ -408,7 +413,7 @@ impl IndexWriter {
|
|||||||
let segment = index.new_segment();
|
let segment = index.new_segment();
|
||||||
index_documents(
|
index_documents(
|
||||||
mem_budget,
|
mem_budget,
|
||||||
&segment,
|
segment,
|
||||||
&mut document_iterator,
|
&mut document_iterator,
|
||||||
&mut segment_updater,
|
&mut segment_updater,
|
||||||
delete_cursor.clone(),
|
delete_cursor.clone(),
|
||||||
|
|||||||
@@ -28,3 +28,25 @@ pub use self::segment_writer::SegmentWriter;
|
|||||||
|
|
||||||
/// Alias for the default merge policy, which is the `LogMergePolicy`.
|
/// Alias for the default merge policy, which is the `LogMergePolicy`.
|
||||||
pub type DefaultMergePolicy = LogMergePolicy;
|
pub type DefaultMergePolicy = LogMergePolicy;
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use crate::schema::{self, Schema};
|
||||||
|
use crate::{Index, Term};
|
||||||
|
#[test]
|
||||||
|
fn test_advance_delete_bug() {
|
||||||
|
let mut schema_builder = Schema::builder();
|
||||||
|
let text_field = schema_builder.add_text_field("text", schema::TEXT);
|
||||||
|
let index = Index::create_from_tempdir(schema_builder.build()).unwrap();
|
||||||
|
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
|
||||||
|
// there must be one deleted document in the segment
|
||||||
|
index_writer.add_document(doc!(text_field=>"b"));
|
||||||
|
index_writer.delete_term(Term::from_field_text(text_field, "b"));
|
||||||
|
// we need enough data to trigger the bug (at least 32 documents)
|
||||||
|
for _ in 0..32 {
|
||||||
|
index_writer.add_document(doc!(text_field=>"c"));
|
||||||
|
}
|
||||||
|
index_writer.commit().unwrap();
|
||||||
|
index_writer.commit().unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -12,6 +12,19 @@ use std::io::Write;
|
|||||||
#[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Hash, Serialize, Deserialize)]
|
#[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Hash, Serialize, Deserialize)]
|
||||||
pub struct Field(pub u32);
|
pub struct Field(pub u32);
|
||||||
|
|
||||||
|
impl Field {
|
||||||
|
/// Create a new field object for the given FieldId.
|
||||||
|
pub fn from_field_id(field_id: u32) -> Field {
|
||||||
|
Field(field_id)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns a u32 identifying uniquely a field within a schema.
|
||||||
|
#[allow(clippy::trivially_copy_pass_by_ref)]
|
||||||
|
pub fn field_id(&self) -> u32 {
|
||||||
|
self.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl BinarySerializable for Field {
|
impl BinarySerializable for Field {
|
||||||
fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
|
fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
|
||||||
self.0.serialize(writer)
|
self.0.serialize(writer)
|
||||||
|
|||||||
Reference in New Issue
Block a user