mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2025-12-27 20:42:54 +00:00
Compare commits
10 Commits
issue/896
...
segment_wr
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7428f6ccd3 | ||
|
|
8a488b8315 | ||
|
|
4448854f73 | ||
|
|
b449749d63 | ||
|
|
2ba6a12ddc | ||
|
|
bbbf95018b | ||
|
|
6d76a82fea | ||
|
|
444048d225 | ||
|
|
4c6bbca661 | ||
|
|
c3d3b3113b |
@@ -1,3 +1,7 @@
|
||||
Tantivy 0.12.1
|
||||
=====================
|
||||
- By default IndexReader are in `Manual` mode.
|
||||
|
||||
Tantivy 0.12.0
|
||||
======================
|
||||
- Removing static dispatch in tokenizers for simplicity. (#762)
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
use super::segment::create_segment;
|
||||
use super::segment::Segment;
|
||||
use crate::core::Executor;
|
||||
use crate::core::IndexMeta;
|
||||
@@ -283,7 +282,7 @@ impl Index {
|
||||
TantivyError::LockFailure(
|
||||
err,
|
||||
Some(
|
||||
"Failed to acquire index lock. If you are using\
|
||||
"Failed to acquire index lock. If you are using \
|
||||
a regular directory, this means there is already an \
|
||||
`IndexWriter` working on this `Directory`, in this process \
|
||||
or in a different process."
|
||||
@@ -335,9 +334,8 @@ impl Index {
|
||||
.collect())
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
pub fn segment(&self, segment_meta: SegmentMeta) -> Segment {
|
||||
create_segment(self.clone(), segment_meta)
|
||||
pub(crate) fn segment(&self, segment_meta: SegmentMeta) -> Segment {
|
||||
Segment::for_index(self.clone(), segment_meta)
|
||||
}
|
||||
|
||||
/// Creates a new segment.
|
||||
@@ -348,6 +346,13 @@ impl Index {
|
||||
self.segment(segment_meta)
|
||||
}
|
||||
|
||||
/// Creates a new segment.
|
||||
pub(crate) fn new_segment_unpersisted(&self) -> Segment {
|
||||
let meta = self
|
||||
.inventory
|
||||
.new_segment_meta(SegmentId::generate_random(), 0);
|
||||
Segment::new_volatile(meta, self.schema())
|
||||
}
|
||||
/// Return a reference to the index directory.
|
||||
pub fn directory(&self) -> &ManagedDirectory {
|
||||
&self.directory
|
||||
@@ -433,7 +438,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn create_should_wipeoff_existing() {
|
||||
fn create_should_wipe_off_existing() {
|
||||
let directory = RAMDirectory::create();
|
||||
assert!(Index::create(directory.clone(), throw_away_schema()).is_ok());
|
||||
assert!(Index::exists(&directory));
|
||||
@@ -470,7 +475,7 @@ mod tests {
|
||||
.try_into()
|
||||
.unwrap();
|
||||
assert_eq!(reader.searcher().num_docs(), 0);
|
||||
test_index_on_commit_reload_policy_aux(field, &index, &reader);
|
||||
test_index_on_commit_reload_policy_aux(field, index.clone(), &index, &reader);
|
||||
}
|
||||
|
||||
#[cfg(feature = "mmap")]
|
||||
@@ -494,7 +499,7 @@ mod tests {
|
||||
.try_into()
|
||||
.unwrap();
|
||||
assert_eq!(reader.searcher().num_docs(), 0);
|
||||
test_index_on_commit_reload_policy_aux(field, &index, &reader);
|
||||
test_index_on_commit_reload_policy_aux(field, index.clone(), &index, &reader);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -536,12 +541,16 @@ mod tests {
|
||||
.try_into()
|
||||
.unwrap();
|
||||
assert_eq!(reader.searcher().num_docs(), 0);
|
||||
test_index_on_commit_reload_policy_aux(field, &write_index, &reader);
|
||||
test_index_on_commit_reload_policy_aux(field, read_index, &write_index, &reader);
|
||||
}
|
||||
}
|
||||
|
||||
fn test_index_on_commit_reload_policy_aux(field: Field, index: &Index, reader: &IndexReader) {
|
||||
let mut reader_index = reader.index();
|
||||
fn test_index_on_commit_reload_policy_aux(
|
||||
field: Field,
|
||||
mut reader_index: Index,
|
||||
index: &Index,
|
||||
reader: &IndexReader,
|
||||
) {
|
||||
let (sender, receiver) = crossbeam::channel::unbounded();
|
||||
let _watch_handle = reader_index.directory_mut().watch(Box::new(move || {
|
||||
let _ = sender.send(());
|
||||
|
||||
@@ -3,19 +3,59 @@ use crate::core::Index;
|
||||
use crate::core::SegmentId;
|
||||
use crate::core::SegmentMeta;
|
||||
use crate::directory::error::{OpenReadError, OpenWriteError};
|
||||
use crate::directory::Directory;
|
||||
use crate::directory::{Directory, ManagedDirectory, RAMDirectory};
|
||||
use crate::directory::{ReadOnlySource, WritePtr};
|
||||
use crate::indexer::segment_serializer::SegmentSerializer;
|
||||
use crate::schema::Schema;
|
||||
use crate::Opstamp;
|
||||
use std::fmt;
|
||||
use std::ops::{Deref, DerefMut};
|
||||
use std::path::PathBuf;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) enum SegmentDirectory {
|
||||
Persisted(ManagedDirectory),
|
||||
Volatile(RAMDirectory),
|
||||
}
|
||||
|
||||
impl SegmentDirectory {
|
||||
pub fn new_volatile() -> SegmentDirectory {
|
||||
SegmentDirectory::Volatile(RAMDirectory::default())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<ManagedDirectory> for SegmentDirectory {
|
||||
fn from(directory: ManagedDirectory) -> Self {
|
||||
SegmentDirectory::Persisted(directory)
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for SegmentDirectory {
|
||||
type Target = dyn Directory;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
match self {
|
||||
SegmentDirectory::Volatile(dir) => dir,
|
||||
SegmentDirectory::Persisted(dir) => dir,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl DerefMut for SegmentDirectory {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
match self {
|
||||
SegmentDirectory::Volatile(dir) => dir,
|
||||
SegmentDirectory::Persisted(dir) => dir,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A segment is a piece of the index.
|
||||
#[derive(Clone)]
|
||||
pub struct Segment {
|
||||
index: Index,
|
||||
schema: Schema,
|
||||
meta: SegmentMeta,
|
||||
directory: SegmentDirectory,
|
||||
}
|
||||
|
||||
impl fmt::Debug for Segment {
|
||||
@@ -24,23 +64,56 @@ impl fmt::Debug for Segment {
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a new segment given an `Index` and a `SegmentId`
|
||||
///
|
||||
/// The function is here to make it private outside `tantivy`.
|
||||
/// #[doc(hidden)]
|
||||
pub fn create_segment(index: Index, meta: SegmentMeta) -> Segment {
|
||||
Segment { index, meta }
|
||||
}
|
||||
|
||||
impl Segment {
|
||||
/// Returns the index the segment belongs to.
|
||||
pub fn index(&self) -> &Index {
|
||||
&self.index
|
||||
/// Returns our index's schema.
|
||||
// TODO return a ref.
|
||||
pub fn schema(&self) -> Schema {
|
||||
self.schema.clone()
|
||||
}
|
||||
|
||||
/// Returns our index's schema.
|
||||
pub fn schema(&self) -> Schema {
|
||||
self.index.schema()
|
||||
pub(crate) fn new_persisted(
|
||||
meta: SegmentMeta,
|
||||
directory: ManagedDirectory,
|
||||
schema: Schema,
|
||||
) -> Segment {
|
||||
Segment {
|
||||
meta,
|
||||
schema,
|
||||
directory: SegmentDirectory::from(directory),
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a new segment that embeds its own `RAMDirectory`.
|
||||
///
|
||||
/// That segment is entirely dissociated from the index directory.
|
||||
/// It will be persisted by a background thread in charge of IO.
|
||||
pub fn new_volatile(meta: SegmentMeta, schema: Schema) -> Segment {
|
||||
Segment {
|
||||
schema,
|
||||
meta,
|
||||
directory: SegmentDirectory::new_volatile(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a new segment given an `Index` and a `SegmentId`
|
||||
pub(crate) fn for_index(index: Index, meta: SegmentMeta) -> Segment {
|
||||
Segment {
|
||||
directory: SegmentDirectory::Persisted(index.directory().clone()),
|
||||
schema: index.schema(),
|
||||
meta,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn persist(&mut self, mut dest_directory: ManagedDirectory) -> crate::Result<()> {
|
||||
if let SegmentDirectory::Persisted(_) = self.directory {
|
||||
// this segment is already persisted.
|
||||
return Ok(());
|
||||
}
|
||||
if let SegmentDirectory::Volatile(ram_directory) = &self.directory {
|
||||
ram_directory.persist(&mut dest_directory)?;
|
||||
}
|
||||
self.directory = SegmentDirectory::Persisted(dest_directory);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns the segment meta-information
|
||||
@@ -54,7 +127,8 @@ impl Segment {
|
||||
/// as we finalize a fresh new segment.
|
||||
pub(crate) fn with_max_doc(self, max_doc: u32) -> Segment {
|
||||
Segment {
|
||||
index: self.index,
|
||||
directory: self.directory,
|
||||
schema: self.schema,
|
||||
meta: self.meta.with_max_doc(max_doc),
|
||||
}
|
||||
}
|
||||
@@ -62,7 +136,8 @@ impl Segment {
|
||||
#[doc(hidden)]
|
||||
pub fn with_delete_meta(self, num_deleted_docs: u32, opstamp: Opstamp) -> Segment {
|
||||
Segment {
|
||||
index: self.index,
|
||||
directory: self.directory,
|
||||
schema: self.schema,
|
||||
meta: self.meta.with_delete_meta(num_deleted_docs, opstamp),
|
||||
}
|
||||
}
|
||||
@@ -83,14 +158,14 @@ impl Segment {
|
||||
/// Open one of the component file for a *regular* read.
|
||||
pub fn open_read(&self, component: SegmentComponent) -> Result<ReadOnlySource, OpenReadError> {
|
||||
let path = self.relative_path(component);
|
||||
let source = self.index.directory().open_read(&path)?;
|
||||
let source = self.directory.open_read(&path)?;
|
||||
Ok(source)
|
||||
}
|
||||
|
||||
/// Open one of the component file for *regular* write.
|
||||
pub fn open_write(&mut self, component: SegmentComponent) -> Result<WritePtr, OpenWriteError> {
|
||||
let path = self.relative_path(component);
|
||||
let write = self.index.directory_mut().open_write(&path)?;
|
||||
let write = self.directory.open_write(&path)?;
|
||||
Ok(write)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -56,6 +56,68 @@ pub struct SegmentReader {
|
||||
}
|
||||
|
||||
impl SegmentReader {
|
||||
/// Open a new segment for reading.
|
||||
pub fn open(segment: &Segment) -> crate::Result<SegmentReader> {
|
||||
let termdict_source = segment.open_read(SegmentComponent::TERMS)?;
|
||||
let termdict_composite = CompositeFile::open(&termdict_source)?;
|
||||
|
||||
let store_source = segment.open_read(SegmentComponent::STORE)?;
|
||||
|
||||
fail_point!("SegmentReader::open#middle");
|
||||
|
||||
let postings_source = segment.open_read(SegmentComponent::POSTINGS)?;
|
||||
let postings_composite = CompositeFile::open(&postings_source)?;
|
||||
|
||||
let positions_composite = {
|
||||
if let Ok(source) = segment.open_read(SegmentComponent::POSITIONS) {
|
||||
CompositeFile::open(&source)?
|
||||
} else {
|
||||
CompositeFile::empty()
|
||||
}
|
||||
};
|
||||
|
||||
let positions_idx_composite = {
|
||||
if let Ok(source) = segment.open_read(SegmentComponent::POSITIONSSKIP) {
|
||||
CompositeFile::open(&source)?
|
||||
} else {
|
||||
CompositeFile::empty()
|
||||
}
|
||||
};
|
||||
|
||||
let schema = segment.schema();
|
||||
|
||||
let fast_fields_data = segment.open_read(SegmentComponent::FASTFIELDS)?;
|
||||
let fast_fields_composite = CompositeFile::open(&fast_fields_data)?;
|
||||
let fast_field_readers =
|
||||
Arc::new(FastFieldReaders::load_all(&schema, &fast_fields_composite)?);
|
||||
|
||||
let fieldnorms_data = segment.open_read(SegmentComponent::FIELDNORMS)?;
|
||||
let fieldnorms_composite = CompositeFile::open(&fieldnorms_data)?;
|
||||
|
||||
let delete_bitset_opt = if segment.meta().has_deletes() {
|
||||
let delete_data = segment.open_read(SegmentComponent::DELETE)?;
|
||||
Some(DeleteBitSet::open(delete_data))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
Ok(SegmentReader {
|
||||
inv_idx_reader_cache: Arc::new(RwLock::new(HashMap::new())),
|
||||
max_doc: segment.meta().max_doc(),
|
||||
num_docs: segment.meta().num_docs(),
|
||||
termdict_composite,
|
||||
postings_composite,
|
||||
fast_fields_readers: fast_field_readers,
|
||||
fieldnorms_composite,
|
||||
segment_id: segment.id(),
|
||||
store_source,
|
||||
delete_bitset_opt,
|
||||
positions_composite,
|
||||
positions_idx_composite,
|
||||
schema,
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns the highest document id ever attributed in
|
||||
/// this segment + 1.
|
||||
/// Today, `tantivy` does not handle deletes, so it happens
|
||||
@@ -142,68 +204,68 @@ impl SegmentReader {
|
||||
pub fn get_store_reader(&self) -> StoreReader {
|
||||
StoreReader::from_source(self.store_source.clone())
|
||||
}
|
||||
|
||||
/// Open a new segment for reading.
|
||||
pub fn open(segment: &Segment) -> crate::Result<SegmentReader> {
|
||||
let termdict_source = segment.open_read(SegmentComponent::TERMS)?;
|
||||
let termdict_composite = CompositeFile::open(&termdict_source)?;
|
||||
|
||||
let store_source = segment.open_read(SegmentComponent::STORE)?;
|
||||
|
||||
fail_point!("SegmentReader::open#middle");
|
||||
|
||||
let postings_source = segment.open_read(SegmentComponent::POSTINGS)?;
|
||||
let postings_composite = CompositeFile::open(&postings_source)?;
|
||||
|
||||
let positions_composite = {
|
||||
if let Ok(source) = segment.open_read(SegmentComponent::POSITIONS) {
|
||||
CompositeFile::open(&source)?
|
||||
} else {
|
||||
CompositeFile::empty()
|
||||
}
|
||||
};
|
||||
|
||||
let positions_idx_composite = {
|
||||
if let Ok(source) = segment.open_read(SegmentComponent::POSITIONSSKIP) {
|
||||
CompositeFile::open(&source)?
|
||||
} else {
|
||||
CompositeFile::empty()
|
||||
}
|
||||
};
|
||||
|
||||
let schema = segment.schema();
|
||||
|
||||
let fast_fields_data = segment.open_read(SegmentComponent::FASTFIELDS)?;
|
||||
let fast_fields_composite = CompositeFile::open(&fast_fields_data)?;
|
||||
let fast_field_readers =
|
||||
Arc::new(FastFieldReaders::load_all(&schema, &fast_fields_composite)?);
|
||||
|
||||
let fieldnorms_data = segment.open_read(SegmentComponent::FIELDNORMS)?;
|
||||
let fieldnorms_composite = CompositeFile::open(&fieldnorms_data)?;
|
||||
|
||||
let delete_bitset_opt = if segment.meta().has_deletes() {
|
||||
let delete_data = segment.open_read(SegmentComponent::DELETE)?;
|
||||
Some(DeleteBitSet::open(delete_data))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
Ok(SegmentReader {
|
||||
inv_idx_reader_cache: Arc::new(RwLock::new(HashMap::new())),
|
||||
max_doc: segment.meta().max_doc(),
|
||||
num_docs: segment.meta().num_docs(),
|
||||
termdict_composite,
|
||||
postings_composite,
|
||||
fast_fields_readers: fast_field_readers,
|
||||
fieldnorms_composite,
|
||||
segment_id: segment.id(),
|
||||
store_source,
|
||||
delete_bitset_opt,
|
||||
positions_composite,
|
||||
positions_idx_composite,
|
||||
schema,
|
||||
})
|
||||
}
|
||||
//
|
||||
// /// Open a new segment for reading.
|
||||
// pub fn open(segment: &Segment) -> crate::Result<SegmentReader> {
|
||||
// let termdict_source = segment.open_read(SegmentComponent::TERMS)?;
|
||||
// let termdict_composite = CompositeFile::open(&termdict_source)?;
|
||||
//
|
||||
// let store_source = segment.open_read(SegmentComponent::STORE)?;
|
||||
//
|
||||
// fail_point!("SegmentReader::open#middle");
|
||||
//
|
||||
// let postings_source = segment.open_read(SegmentComponent::POSTINGS)?;
|
||||
// let postings_composite = CompositeFile::open(&postings_source)?;
|
||||
//
|
||||
// let positions_composite = {
|
||||
// if let Ok(source) = segment.open_read(SegmentComponent::POSITIONS) {
|
||||
// CompositeFile::open(&source)?
|
||||
// } else {
|
||||
// CompositeFile::empty()
|
||||
// }
|
||||
// };
|
||||
//
|
||||
// let positions_idx_composite = {
|
||||
// if let Ok(source) = segment.open_read(SegmentComponent::POSITIONSSKIP) {
|
||||
// CompositeFile::open(&source)?
|
||||
// } else {
|
||||
// CompositeFile::empty()
|
||||
// }
|
||||
// };
|
||||
//
|
||||
// let schema = segment.schema();
|
||||
//
|
||||
// let fast_fields_data = segment.open_read(SegmentComponent::FASTFIELDS)?;
|
||||
// let fast_fields_composite = CompositeFile::open(&fast_fields_data)?;
|
||||
// let fast_field_readers =
|
||||
// Arc::new(FastFieldReaders::load_all(&schema, &fast_fields_composite)?);
|
||||
//
|
||||
// let fieldnorms_data = segment.open_read(SegmentComponent::FIELDNORMS)?;
|
||||
// let fieldnorms_composite = CompositeFile::open(&fieldnorms_data)?;
|
||||
//
|
||||
// let delete_bitset_opt = if segment.meta().has_deletes() {
|
||||
// let delete_data = segment.open_read(SegmentComponent::DELETE)?;
|
||||
// Some(DeleteBitSet::open(delete_data))
|
||||
// } else {
|
||||
// None
|
||||
// };
|
||||
//
|
||||
// Ok(SegmentReader {
|
||||
// inv_idx_reader_cache: Arc::new(RwLock::new(HashMap::new())),
|
||||
// max_doc: segment.meta().max_doc(),
|
||||
// num_docs: segment.meta().num_docs(),
|
||||
// termdict_composite,
|
||||
// postings_composite,
|
||||
// fast_fields_readers: fast_field_readers,
|
||||
// fieldnorms_composite,
|
||||
// segment_id: segment.id(),
|
||||
// store_source,
|
||||
// delete_bitset_opt,
|
||||
// positions_composite,
|
||||
// positions_idx_composite,
|
||||
// schema,
|
||||
// })
|
||||
// }
|
||||
|
||||
/// Returns a field reader associated to the field given in argument.
|
||||
/// If the field was not present in the index during indexing time,
|
||||
|
||||
@@ -144,6 +144,16 @@ impl RAMDirectory {
|
||||
pub fn total_mem_usage(&self) -> usize {
|
||||
self.fs.read().unwrap().total_mem_usage()
|
||||
}
|
||||
|
||||
pub fn persist(&self, dest: &mut dyn Directory) -> crate::Result<()> {
|
||||
let wlock = self.fs.write().unwrap();
|
||||
for (path, source) in wlock.fs.iter() {
|
||||
let mut dest_wrt = dest.open_write(path)?;
|
||||
dest_wrt.write_all(source.as_slice())?;
|
||||
dest_wrt.terminate()?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Directory for RAMDirectory {
|
||||
|
||||
@@ -8,30 +8,33 @@ use crate::core::SegmentComponent;
|
||||
use crate::core::SegmentId;
|
||||
use crate::core::SegmentMeta;
|
||||
use crate::core::SegmentReader;
|
||||
use crate::directory::TerminatingWrite;
|
||||
use crate::directory::{DirectoryLock, GarbageCollectionResult};
|
||||
use crate::directory::{TerminatingWrite, WatchCallbackList};
|
||||
use crate::docset::DocSet;
|
||||
use crate::error::TantivyError;
|
||||
use crate::fastfield::write_delete_bitset;
|
||||
use crate::indexer::delete_queue::{DeleteCursor, DeleteQueue};
|
||||
use crate::indexer::doc_opstamp_mapping::DocToOpstampMapping;
|
||||
use crate::indexer::operation::DeleteOperation;
|
||||
use crate::indexer::segment_manager::SegmentRegisters;
|
||||
use crate::indexer::segment_register::SegmentRegister;
|
||||
use crate::indexer::stamper::Stamper;
|
||||
use crate::indexer::MergePolicy;
|
||||
use crate::indexer::SegmentEntry;
|
||||
use crate::indexer::SegmentWriter;
|
||||
use crate::reader::NRTReader;
|
||||
use crate::schema::Document;
|
||||
use crate::schema::IndexRecordOption;
|
||||
use crate::schema::Term;
|
||||
use crate::Opstamp;
|
||||
use crate::tokenizer::TokenizerManager;
|
||||
use crate::{IndexReader, Opstamp};
|
||||
use crossbeam::channel;
|
||||
use futures::executor::block_on;
|
||||
use futures::future::Future;
|
||||
use smallvec::smallvec;
|
||||
use smallvec::SmallVec;
|
||||
use smallvec::{smallvec, SmallVec};
|
||||
use std::mem;
|
||||
use std::ops::Range;
|
||||
use std::sync::Arc;
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::thread;
|
||||
use std::thread::JoinHandle;
|
||||
|
||||
@@ -68,6 +71,8 @@ pub struct IndexWriter {
|
||||
// lifetime of the lock with that of the IndexWriter.
|
||||
_directory_lock: Option<DirectoryLock>,
|
||||
|
||||
segment_registers: Arc<RwLock<SegmentRegisters>>,
|
||||
|
||||
index: Index,
|
||||
|
||||
heap_size_in_bytes_per_thread: usize,
|
||||
@@ -87,6 +92,8 @@ pub struct IndexWriter {
|
||||
|
||||
stamper: Stamper,
|
||||
committed_opstamp: Opstamp,
|
||||
|
||||
on_commit: WatchCallbackList,
|
||||
}
|
||||
|
||||
fn compute_deleted_bitset(
|
||||
@@ -133,7 +140,6 @@ fn compute_deleted_bitset(
|
||||
/// For instance, there was no delete operation between the state of the `segment_entry` and
|
||||
/// the `target_opstamp`, `segment_entry` is not updated.
|
||||
pub(crate) fn advance_deletes(
|
||||
mut segment: Segment,
|
||||
segment_entry: &mut SegmentEntry,
|
||||
target_opstamp: Opstamp,
|
||||
) -> crate::Result<()> {
|
||||
@@ -142,28 +148,38 @@ pub(crate) fn advance_deletes(
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if segment_entry.delete_bitset().is_none() && segment_entry.delete_cursor().get().is_none() {
|
||||
let delete_bitset_opt = segment_entry.take_delete_bitset();
|
||||
|
||||
// We avoid directly advancing the `SegmentEntry` delete cursor, because
|
||||
// we do not want to end up in an invalid state if the delete bitset
|
||||
// serialization fails.
|
||||
let mut delete_cursor = segment_entry.delete_cursor();
|
||||
|
||||
if delete_bitset_opt.is_none() && delete_cursor.get().is_none() {
|
||||
// There has been no `DeleteOperation` between the segment status and `target_opstamp`.
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// We open our current serialized segment to compute the new deleted bitset.
|
||||
let segment = segment_entry.segment().clone();
|
||||
let segment_reader = SegmentReader::open(&segment)?;
|
||||
|
||||
let max_doc = segment_reader.max_doc();
|
||||
let mut delete_bitset: BitSet = match segment_entry.delete_bitset() {
|
||||
Some(previous_delete_bitset) => (*previous_delete_bitset).clone(),
|
||||
None => BitSet::with_max_value(max_doc),
|
||||
};
|
||||
let mut delete_bitset: BitSet =
|
||||
delete_bitset_opt.unwrap_or_else(|| BitSet::with_max_value(max_doc));
|
||||
|
||||
let num_deleted_docs_before = segment.meta().num_deleted_docs();
|
||||
|
||||
compute_deleted_bitset(
|
||||
&mut delete_bitset,
|
||||
&segment_reader,
|
||||
segment_entry.delete_cursor(),
|
||||
&mut delete_cursor,
|
||||
&DocToOpstampMapping::None,
|
||||
target_opstamp,
|
||||
)?;
|
||||
|
||||
// TODO optimize
|
||||
// TODO optimize... We are simply manipulating bitsets here.
|
||||
// We should be able to compute the union much faster.
|
||||
if let Some(seg_delete_bitset) = segment_reader.delete_bitset() {
|
||||
for doc in 0u32..max_doc {
|
||||
if seg_delete_bitset.is_deleted(doc) {
|
||||
@@ -172,15 +188,23 @@ pub(crate) fn advance_deletes(
|
||||
}
|
||||
}
|
||||
|
||||
let num_deleted_docs = delete_bitset.len();
|
||||
if num_deleted_docs > 0 {
|
||||
segment = segment.with_delete_meta(num_deleted_docs as u32, target_opstamp);
|
||||
let mut delete_file = segment.open_write(SegmentComponent::DELETE)?;
|
||||
let num_deleted_docs = delete_bitset.len() as u32;
|
||||
|
||||
if num_deleted_docs > num_deleted_docs_before {
|
||||
// We need to write a new delete file.
|
||||
let mut delete_file = segment
|
||||
.with_delete_meta(num_deleted_docs, target_opstamp)
|
||||
.open_write(SegmentComponent::DELETE)?;
|
||||
write_delete_bitset(&delete_bitset, max_doc, &mut delete_file)?;
|
||||
delete_file.terminate()?;
|
||||
segment_entry.reset_delete_meta(num_deleted_docs as u32, target_opstamp);
|
||||
}
|
||||
|
||||
segment_entry.set_meta(segment.meta().clone());
|
||||
// Regardless of whether we did end up having to write a new file or not
|
||||
// we advance the `delete_cursor`. This is an optimisation. We want to ensure we do not
|
||||
// check that a given deleted term does not match any of our docs more than once.
|
||||
segment_entry.set_delete_cursor(delete_cursor);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -189,11 +213,12 @@ fn index_documents(
|
||||
segment: Segment,
|
||||
grouped_document_iterator: &mut dyn Iterator<Item = OperationGroup>,
|
||||
segment_updater: &mut SegmentUpdater,
|
||||
tokenizers: &TokenizerManager,
|
||||
mut delete_cursor: DeleteCursor,
|
||||
) -> crate::Result<bool> {
|
||||
let schema = segment.schema();
|
||||
|
||||
let mut segment_writer = SegmentWriter::for_segment(memory_budget, segment.clone(), &schema)?;
|
||||
let mut segment_writer =
|
||||
SegmentWriter::for_segment(memory_budget, segment.clone(), &schema, tokenizers)?;
|
||||
for document_group in grouped_document_iterator {
|
||||
for doc in document_group {
|
||||
segment_writer.add_document(doc, &schema)?;
|
||||
@@ -231,11 +256,7 @@ fn index_documents(
|
||||
last_docstamp,
|
||||
)?;
|
||||
|
||||
let segment_entry = SegmentEntry::new(
|
||||
segment_with_max_doc.meta().clone(),
|
||||
delete_cursor,
|
||||
delete_bitset_opt,
|
||||
);
|
||||
let segment_entry = SegmentEntry::new(segment_with_max_doc, delete_cursor, delete_bitset_opt);
|
||||
block_on(segment_updater.schedule_add_segment(segment_entry))?;
|
||||
Ok(true)
|
||||
}
|
||||
@@ -307,16 +328,24 @@ impl IndexWriter {
|
||||
|
||||
let delete_queue = DeleteQueue::new();
|
||||
|
||||
let current_opstamp = index.load_metas()?.opstamp;
|
||||
let meta = index.load_metas()?;
|
||||
|
||||
let stamper = Stamper::new(current_opstamp);
|
||||
let stamper = Stamper::new(meta.opstamp);
|
||||
|
||||
let commited_segments = SegmentRegister::new(
|
||||
index.directory(),
|
||||
&index.schema(),
|
||||
meta.segments,
|
||||
&delete_queue.cursor(),
|
||||
);
|
||||
let segment_registers = Arc::new(RwLock::new(SegmentRegisters::new(commited_segments)));
|
||||
|
||||
let segment_updater =
|
||||
SegmentUpdater::create(index.clone(), stamper.clone(), &delete_queue.cursor())?;
|
||||
SegmentUpdater::create(segment_registers.clone(), index.clone(), stamper.clone())?;
|
||||
|
||||
let mut index_writer = IndexWriter {
|
||||
_directory_lock: Some(directory_lock),
|
||||
|
||||
segment_registers,
|
||||
heap_size_in_bytes_per_thread,
|
||||
index: index.clone(),
|
||||
|
||||
@@ -330,10 +359,12 @@ impl IndexWriter {
|
||||
|
||||
delete_queue,
|
||||
|
||||
committed_opstamp: current_opstamp,
|
||||
committed_opstamp: meta.opstamp,
|
||||
stamper,
|
||||
|
||||
worker_id: 0,
|
||||
|
||||
on_commit: Default::default(),
|
||||
};
|
||||
index_writer.start_workers()?;
|
||||
Ok(index_writer)
|
||||
@@ -373,13 +404,6 @@ impl IndexWriter {
|
||||
result
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
pub fn add_segment(&self, segment_meta: SegmentMeta) -> crate::Result<()> {
|
||||
let delete_cursor = self.delete_queue.cursor();
|
||||
let segment_entry = SegmentEntry::new(segment_meta, delete_cursor, None);
|
||||
block_on(self.segment_updater.schedule_add_segment(segment_entry))
|
||||
}
|
||||
|
||||
/// Creates a new segment.
|
||||
///
|
||||
/// This method is useful only for users trying to do complex
|
||||
@@ -428,12 +452,13 @@ impl IndexWriter {
|
||||
// was dropped.
|
||||
return Ok(());
|
||||
}
|
||||
let segment = index.new_segment();
|
||||
let segment = index.new_segment_unpersisted();
|
||||
index_documents(
|
||||
mem_budget,
|
||||
segment,
|
||||
&mut document_iterator,
|
||||
&mut segment_updater,
|
||||
index.tokenizers(),
|
||||
delete_cursor.clone(),
|
||||
)?;
|
||||
}
|
||||
@@ -460,6 +485,21 @@ impl IndexWriter {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// TODO move me
|
||||
pub(crate) fn trigger_commit(&self) -> impl Future<Output = ()> {
|
||||
self.on_commit.broadcast()
|
||||
}
|
||||
|
||||
pub fn reader(&self, num_searchers: usize) -> crate::Result<IndexReader> {
|
||||
let nrt_reader = NRTReader::create(
|
||||
num_searchers,
|
||||
self.index.clone(),
|
||||
self.segment_registers.clone(),
|
||||
&self.on_commit,
|
||||
)?;
|
||||
Ok(IndexReader::NRT(nrt_reader))
|
||||
}
|
||||
|
||||
/// Detects and removes the files that are not used by the index anymore.
|
||||
pub fn garbage_collect_files(
|
||||
&self,
|
||||
@@ -603,7 +643,7 @@ impl IndexWriter {
|
||||
/// It is also possible to add a payload to the `commit`
|
||||
/// using this API.
|
||||
/// See [`PreparedCommit::set_payload()`](PreparedCommit.html)
|
||||
pub fn prepare_commit(&mut self) -> crate::Result<PreparedCommit> {
|
||||
pub fn prepare_commit(&mut self, soft_commit: bool) -> crate::Result<PreparedCommit> {
|
||||
// Here, because we join all of the worker threads,
|
||||
// all of the segment update for this commit have been
|
||||
// sent.
|
||||
@@ -631,7 +671,7 @@ impl IndexWriter {
|
||||
}
|
||||
|
||||
let commit_opstamp = self.stamper.stamp();
|
||||
let prepared_commit = PreparedCommit::new(self, commit_opstamp);
|
||||
let prepared_commit = PreparedCommit::new(self, commit_opstamp, soft_commit);
|
||||
info!("Prepared commit {}", commit_opstamp);
|
||||
Ok(prepared_commit)
|
||||
}
|
||||
@@ -651,7 +691,11 @@ impl IndexWriter {
|
||||
/// that made it in the commit.
|
||||
///
|
||||
pub fn commit(&mut self) -> crate::Result<Opstamp> {
|
||||
self.prepare_commit()?.commit()
|
||||
self.prepare_commit(false)?.commit()
|
||||
}
|
||||
|
||||
pub fn soft_commit(&mut self) -> crate::Result<Opstamp> {
|
||||
self.prepare_commit(true)?.commit()
|
||||
}
|
||||
|
||||
pub(crate) fn segment_updater(&self) -> &SegmentUpdater {
|
||||
@@ -776,7 +820,6 @@ impl Drop for IndexWriter {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
use super::super::operation::UserOperation;
|
||||
use crate::collector::TopDocs;
|
||||
use crate::directory::error::LockError;
|
||||
@@ -1009,7 +1052,8 @@ mod tests {
|
||||
index_writer.add_document(doc!(text_field => "a"));
|
||||
}
|
||||
{
|
||||
let mut prepared_commit = index_writer.prepare_commit().expect("commit failed");
|
||||
let mut prepared_commit =
|
||||
index_writer.prepare_commit(false).expect("commit failed");
|
||||
prepared_commit.set_payload("first commit");
|
||||
prepared_commit.commit().expect("commit failed");
|
||||
}
|
||||
@@ -1042,7 +1086,8 @@ mod tests {
|
||||
index_writer.add_document(doc!(text_field => "a"));
|
||||
}
|
||||
{
|
||||
let mut prepared_commit = index_writer.prepare_commit().expect("commit failed");
|
||||
let mut prepared_commit =
|
||||
index_writer.prepare_commit(false).expect("commit failed");
|
||||
prepared_commit.set_payload("first commit");
|
||||
prepared_commit.abort().expect("commit failed");
|
||||
}
|
||||
@@ -1217,7 +1262,43 @@ mod tests {
|
||||
let index = Index::create_in_ram(schema_builder.build());
|
||||
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
|
||||
index_writer.add_document(doc!(idfield=>"myid"));
|
||||
let commit = index_writer.commit();
|
||||
assert!(commit.is_ok());
|
||||
assert!(index_writer.commit().is_ok());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_index_writer_reader() {
|
||||
let mut schema_builder = schema::Schema::builder();
|
||||
let idfield = schema_builder.add_text_field("id", STRING);
|
||||
schema_builder.add_text_field("optfield", STRING);
|
||||
let index = Index::create_in_ram(schema_builder.build());
|
||||
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
|
||||
index_writer.add_document(doc!(idfield=>"myid"));
|
||||
assert!(index_writer.commit().is_ok());
|
||||
let reader = index_writer.reader(2).unwrap();
|
||||
let searcher = reader.searcher();
|
||||
assert_eq!(searcher.num_docs(), 1u64);
|
||||
index_writer.add_document(doc!(idfield=>"myid"));
|
||||
assert!(index_writer.commit().is_ok());
|
||||
assert_eq!(reader.searcher().num_docs(), 2u64);
|
||||
assert_eq!(searcher.num_docs(), 1u64);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_index_writer_reader_soft_commit() {
|
||||
let mut schema_builder = schema::Schema::builder();
|
||||
let idfield = schema_builder.add_text_field("id", STRING);
|
||||
schema_builder.add_text_field("optfield", STRING);
|
||||
let index = Index::create_in_ram(schema_builder.build());
|
||||
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
|
||||
index_writer.add_document(doc!(idfield=>"myid"));
|
||||
assert!(index_writer.soft_commit().is_ok());
|
||||
let nrt_reader = index_writer.reader(2).unwrap();
|
||||
let normal_reader = index.reader_builder().try_into().unwrap();
|
||||
assert_eq!(nrt_reader.searcher().num_docs(), 1u64);
|
||||
assert_eq!(normal_reader.searcher().num_docs(), 0u64);
|
||||
assert!(index_writer.commit().is_ok());
|
||||
assert!(normal_reader.reload().is_ok());
|
||||
assert_eq!(nrt_reader.searcher().num_docs(), 1u64);
|
||||
assert_eq!(normal_reader.searcher().num_docs(), 1u64);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -23,6 +23,7 @@ pub use self::merge_policy::{MergeCandidate, MergePolicy, NoMergePolicy};
|
||||
pub use self::prepared_commit::PreparedCommit;
|
||||
pub use self::segment_entry::SegmentEntry;
|
||||
pub use self::segment_manager::SegmentManager;
|
||||
pub(crate) use self::segment_manager::SegmentRegisters;
|
||||
pub use self::segment_serializer::SegmentSerializer;
|
||||
pub use self::segment_writer::SegmentWriter;
|
||||
|
||||
|
||||
@@ -7,14 +7,20 @@ pub struct PreparedCommit<'a> {
|
||||
index_writer: &'a mut IndexWriter,
|
||||
payload: Option<String>,
|
||||
opstamp: Opstamp,
|
||||
soft_commit: bool,
|
||||
}
|
||||
|
||||
impl<'a> PreparedCommit<'a> {
|
||||
pub(crate) fn new(index_writer: &'a mut IndexWriter, opstamp: Opstamp) -> PreparedCommit<'_> {
|
||||
pub(crate) fn new(
|
||||
index_writer: &'a mut IndexWriter,
|
||||
opstamp: Opstamp,
|
||||
soft_commit: bool,
|
||||
) -> PreparedCommit {
|
||||
PreparedCommit {
|
||||
index_writer,
|
||||
payload: None,
|
||||
opstamp,
|
||||
soft_commit,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -32,11 +38,12 @@ impl<'a> PreparedCommit<'a> {
|
||||
|
||||
pub fn commit(self) -> crate::Result<Opstamp> {
|
||||
info!("committing {}", self.opstamp);
|
||||
let _ = block_on(
|
||||
self.index_writer
|
||||
.segment_updater()
|
||||
.schedule_commit(self.opstamp, self.payload),
|
||||
);
|
||||
block_on(self.index_writer.segment_updater().schedule_commit(
|
||||
self.opstamp,
|
||||
self.payload,
|
||||
self.soft_commit,
|
||||
))?;
|
||||
block_on(self.index_writer.trigger_commit());
|
||||
Ok(self.opstamp)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
use crate::common::BitSet;
|
||||
use crate::core::SegmentId;
|
||||
use crate::core::SegmentMeta;
|
||||
use crate::directory::ManagedDirectory;
|
||||
use crate::indexer::delete_queue::DeleteCursor;
|
||||
use crate::{Opstamp, Segment};
|
||||
use std::fmt;
|
||||
|
||||
/// A segment entry describes the state of
|
||||
@@ -19,55 +21,81 @@ use std::fmt;
|
||||
/// in the .del file or in the `delete_bitset`.
|
||||
#[derive(Clone)]
|
||||
pub struct SegmentEntry {
|
||||
meta: SegmentMeta,
|
||||
segment: Segment,
|
||||
delete_bitset: Option<BitSet>,
|
||||
delete_cursor: DeleteCursor,
|
||||
}
|
||||
|
||||
impl SegmentEntry {
|
||||
/// Create a new `SegmentEntry`
|
||||
pub fn new(
|
||||
segment_meta: SegmentMeta,
|
||||
pub(crate) fn new(
|
||||
segment: Segment,
|
||||
delete_cursor: DeleteCursor,
|
||||
delete_bitset: Option<BitSet>,
|
||||
) -> SegmentEntry {
|
||||
SegmentEntry {
|
||||
meta: segment_meta,
|
||||
segment,
|
||||
delete_bitset,
|
||||
delete_cursor,
|
||||
}
|
||||
}
|
||||
|
||||
/// Return a reference to the segment entry deleted bitset.
|
||||
pub fn persist(&mut self, dest_directory: ManagedDirectory) -> crate::Result<()> {
|
||||
// TODO take in account delete bitset?
|
||||
self.segment.persist(dest_directory)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn segment(&self) -> &Segment {
|
||||
&self.segment
|
||||
}
|
||||
|
||||
/// `Takes` (as in Option::take) the delete bitset of
|
||||
/// a segment entry.
|
||||
///
|
||||
/// `DocId` in this bitset are flagged as deleted.
|
||||
pub fn delete_bitset(&self) -> Option<&BitSet> {
|
||||
self.delete_bitset.as_ref()
|
||||
pub fn take_delete_bitset(&mut self) -> Option<BitSet> {
|
||||
self.delete_bitset.take()
|
||||
}
|
||||
|
||||
/// Set the `SegmentMeta` for this segment.
|
||||
pub fn set_meta(&mut self, segment_meta: SegmentMeta) {
|
||||
self.meta = segment_meta;
|
||||
/// Reset the delete informmationo in this segment.
|
||||
///
|
||||
/// The `SegmentEntry` segment's `SegmentMeta` gets updated, and
|
||||
/// any delete bitset is drop and set to None.
|
||||
pub fn reset_delete_meta(&mut self, num_deleted_docs: u32, target_opstamp: Opstamp) {
|
||||
self.segment = self
|
||||
.segment
|
||||
.clone()
|
||||
.with_delete_meta(num_deleted_docs, target_opstamp);
|
||||
self.delete_bitset = None;
|
||||
}
|
||||
|
||||
pub fn set_delete_cursor(&mut self, delete_cursor: DeleteCursor) {
|
||||
self.delete_cursor = delete_cursor;
|
||||
}
|
||||
/// Return a reference to the segment_entry's delete cursor
|
||||
pub fn delete_cursor(&mut self) -> &mut DeleteCursor {
|
||||
&mut self.delete_cursor
|
||||
pub fn delete_cursor(&mut self) -> DeleteCursor {
|
||||
self.delete_cursor.clone()
|
||||
}
|
||||
|
||||
/// Returns the segment id.
|
||||
pub fn segment_id(&self) -> SegmentId {
|
||||
self.meta.id()
|
||||
self.segment.id()
|
||||
}
|
||||
|
||||
/// Accessor to the `SegmentMeta`
|
||||
pub fn meta(&self) -> &SegmentMeta {
|
||||
&self.meta
|
||||
self.segment.meta()
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for SegmentEntry {
|
||||
fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(formatter, "SegmentEntry({:?})", self.meta)
|
||||
let num_deletes = self.delete_bitset.as_ref().map(|bitset| bitset.len());
|
||||
write!(
|
||||
formatter,
|
||||
"SegmentEntry(seg={:?}, ndel={:?})",
|
||||
self.segment, num_deletes
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,16 +1,14 @@
|
||||
use super::segment_register::SegmentRegister;
|
||||
use crate::core::SegmentId;
|
||||
use crate::core::SegmentMeta;
|
||||
use crate::error::TantivyError;
|
||||
use crate::indexer::delete_queue::DeleteCursor;
|
||||
use crate::indexer::SegmentEntry;
|
||||
use crate::Segment;
|
||||
use std::collections::hash_set::HashSet;
|
||||
use std::fmt::{self, Debug, Formatter};
|
||||
use std::sync::RwLock;
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::sync::{RwLockReadGuard, RwLockWriteGuard};
|
||||
|
||||
#[derive(Default)]
|
||||
struct SegmentRegisters {
|
||||
pub(crate) struct SegmentRegisters {
|
||||
uncommitted: SegmentRegister,
|
||||
committed: SegmentRegister,
|
||||
}
|
||||
@@ -22,6 +20,17 @@ pub(crate) enum SegmentsStatus {
|
||||
}
|
||||
|
||||
impl SegmentRegisters {
|
||||
pub fn new(committed: SegmentRegister) -> SegmentRegisters {
|
||||
SegmentRegisters {
|
||||
uncommitted: Default::default(),
|
||||
committed,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn committed_segment(&self) -> Vec<Segment> {
|
||||
self.committed.segments()
|
||||
}
|
||||
|
||||
/// Check if all the segments are committed or uncommited.
|
||||
///
|
||||
/// If some segment is missing or segments are in a different state (this should not happen
|
||||
@@ -44,18 +53,7 @@ impl SegmentRegisters {
|
||||
/// changes (merges especially)
|
||||
#[derive(Default)]
|
||||
pub struct SegmentManager {
|
||||
registers: RwLock<SegmentRegisters>,
|
||||
}
|
||||
|
||||
impl Debug for SegmentManager {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
|
||||
let lock = self.read();
|
||||
write!(
|
||||
f,
|
||||
"{{ uncommitted: {:?}, committed: {:?} }}",
|
||||
lock.uncommitted, lock.committed
|
||||
)
|
||||
}
|
||||
registers: Arc<RwLock<SegmentRegisters>>,
|
||||
}
|
||||
|
||||
pub fn get_mergeable_segments(
|
||||
@@ -74,16 +72,8 @@ pub fn get_mergeable_segments(
|
||||
}
|
||||
|
||||
impl SegmentManager {
|
||||
pub fn from_segments(
|
||||
segment_metas: Vec<SegmentMeta>,
|
||||
delete_cursor: &DeleteCursor,
|
||||
) -> SegmentManager {
|
||||
SegmentManager {
|
||||
registers: RwLock::new(SegmentRegisters {
|
||||
uncommitted: SegmentRegister::default(),
|
||||
committed: SegmentRegister::new(segment_metas, delete_cursor),
|
||||
}),
|
||||
}
|
||||
pub(crate) fn new(registers: Arc<RwLock<SegmentRegisters>>) -> SegmentManager {
|
||||
SegmentManager { registers }
|
||||
}
|
||||
|
||||
/// Returns all of the segment entries (committed or uncommitted)
|
||||
@@ -165,7 +155,7 @@ impl SegmentManager {
|
||||
let error_msg = "Merge operation sent for segments that are not \
|
||||
all uncommited or commited."
|
||||
.to_string();
|
||||
return Err(TantivyError::InvalidArgument(error_msg));
|
||||
return Err(crate::Error::InvalidArgument(error_msg));
|
||||
}
|
||||
Ok(segment_entries)
|
||||
}
|
||||
|
||||
@@ -1,7 +1,10 @@
|
||||
use crate::core::SegmentId;
|
||||
use crate::core::SegmentMeta;
|
||||
use crate::directory::ManagedDirectory;
|
||||
use crate::indexer::delete_queue::DeleteCursor;
|
||||
use crate::indexer::segment_entry::SegmentEntry;
|
||||
use crate::schema::Schema;
|
||||
use crate::Segment;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
use std::fmt::{self, Debug, Formatter};
|
||||
@@ -46,6 +49,13 @@ impl SegmentRegister {
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub fn segments(&self) -> Vec<Segment> {
|
||||
self.segment_states
|
||||
.values()
|
||||
.map(|segment_entry| segment_entry.segment().clone())
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub fn segment_entries(&self) -> Vec<SegmentEntry> {
|
||||
self.segment_states.values().cloned().collect()
|
||||
}
|
||||
@@ -79,11 +89,17 @@ impl SegmentRegister {
|
||||
self.segment_states.get(segment_id).cloned()
|
||||
}
|
||||
|
||||
pub fn new(segment_metas: Vec<SegmentMeta>, delete_cursor: &DeleteCursor) -> SegmentRegister {
|
||||
pub fn new(
|
||||
directory: &ManagedDirectory,
|
||||
schema: &Schema,
|
||||
segment_metas: Vec<SegmentMeta>,
|
||||
delete_cursor: &DeleteCursor,
|
||||
) -> SegmentRegister {
|
||||
let mut segment_states = HashMap::new();
|
||||
for segment_meta in segment_metas {
|
||||
let segment_id = segment_meta.id();
|
||||
let segment_entry = SegmentEntry::new(segment_meta, delete_cursor.clone(), None);
|
||||
for meta in segment_metas {
|
||||
let segment_id = meta.id();
|
||||
let segment = Segment::new_persisted(meta, directory.clone(), schema.clone());
|
||||
let segment_entry = SegmentEntry::new(segment, delete_cursor.clone(), None);
|
||||
segment_states.insert(segment_id, segment_entry);
|
||||
}
|
||||
SegmentRegister { segment_states }
|
||||
@@ -95,6 +111,7 @@ mod tests {
|
||||
use super::*;
|
||||
use crate::core::{SegmentId, SegmentMetaInventory};
|
||||
use crate::indexer::delete_queue::*;
|
||||
use crate::schema::Schema;
|
||||
|
||||
fn segment_ids(segment_register: &SegmentRegister) -> Vec<SegmentId> {
|
||||
segment_register
|
||||
@@ -108,6 +125,7 @@ mod tests {
|
||||
fn test_segment_register() {
|
||||
let inventory = SegmentMetaInventory::default();
|
||||
let delete_queue = DeleteQueue::new();
|
||||
let schema = Schema::builder().build();
|
||||
|
||||
let mut segment_register = SegmentRegister::default();
|
||||
let segment_id_a = SegmentId::generate_random();
|
||||
@@ -115,21 +133,24 @@ mod tests {
|
||||
let segment_id_merged = SegmentId::generate_random();
|
||||
|
||||
{
|
||||
let segment_meta = inventory.new_segment_meta(segment_id_a, 0u32);
|
||||
let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor(), None);
|
||||
let meta = inventory.new_segment_meta(segment_id_a, 0u32);
|
||||
let segment = Segment::new_volatile(meta, schema.clone());
|
||||
let segment_entry = SegmentEntry::new(segment, delete_queue.cursor(), None);
|
||||
segment_register.add_segment_entry(segment_entry);
|
||||
}
|
||||
assert_eq!(segment_ids(&segment_register), vec![segment_id_a]);
|
||||
{
|
||||
let segment_meta = inventory.new_segment_meta(segment_id_b, 0u32);
|
||||
let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor(), None);
|
||||
let segment = Segment::new_volatile(segment_meta, schema.clone());
|
||||
let segment_entry = SegmentEntry::new(segment, delete_queue.cursor(), None);
|
||||
segment_register.add_segment_entry(segment_entry);
|
||||
}
|
||||
segment_register.remove_segment(&segment_id_a);
|
||||
segment_register.remove_segment(&segment_id_b);
|
||||
{
|
||||
let segment_meta_merged = inventory.new_segment_meta(segment_id_merged, 0u32);
|
||||
let segment_entry = SegmentEntry::new(segment_meta_merged, delete_queue.cursor(), None);
|
||||
let segment = Segment::new_volatile(segment_meta_merged, schema);
|
||||
let segment_entry = SegmentEntry::new(segment, delete_queue.cursor(), None);
|
||||
segment_register.add_segment_entry(segment_entry);
|
||||
}
|
||||
assert_eq!(segment_ids(&segment_register), vec![segment_id_merged]);
|
||||
|
||||
@@ -7,11 +7,10 @@ use crate::core::SegmentMeta;
|
||||
use crate::core::SerializableSegment;
|
||||
use crate::core::META_FILEPATH;
|
||||
use crate::directory::{Directory, DirectoryClone, GarbageCollectionResult};
|
||||
use crate::indexer::delete_queue::DeleteCursor;
|
||||
use crate::indexer::index_writer::advance_deletes;
|
||||
use crate::indexer::merge_operation::MergeOperationInventory;
|
||||
use crate::indexer::merger::IndexMerger;
|
||||
use crate::indexer::segment_manager::SegmentsStatus;
|
||||
use crate::indexer::segment_manager::{SegmentRegisters, SegmentsStatus};
|
||||
use crate::indexer::stamper::Stamper;
|
||||
use crate::indexer::SegmentEntry;
|
||||
use crate::indexer::SegmentSerializer;
|
||||
@@ -117,8 +116,7 @@ fn merge(
|
||||
|
||||
// First we apply all of the delet to the merged segment, up to the target opstamp.
|
||||
for segment_entry in &mut segment_entries {
|
||||
let segment = index.segment(segment_entry.meta().clone());
|
||||
advance_deletes(segment, segment_entry, target_opstamp)?;
|
||||
advance_deletes(segment_entry, target_opstamp)?;
|
||||
}
|
||||
|
||||
let delete_cursor = segment_entries[0].delete_cursor().clone();
|
||||
@@ -134,11 +132,13 @@ fn merge(
|
||||
// ... we just serialize this index merger in our new segment to merge the two segments.
|
||||
let segment_serializer = SegmentSerializer::for_segment(&mut merged_segment)?;
|
||||
|
||||
let num_docs = merger.write(segment_serializer)?;
|
||||
let max_doc = merger.write(segment_serializer)?;
|
||||
|
||||
let segment_meta = index.new_segment_meta(merged_segment.id(), num_docs);
|
||||
|
||||
Ok(SegmentEntry::new(segment_meta, delete_cursor, None))
|
||||
Ok(SegmentEntry::new(
|
||||
merged_segment.with_max_doc(max_doc),
|
||||
delete_cursor,
|
||||
None,
|
||||
))
|
||||
}
|
||||
|
||||
pub(crate) struct InnerSegmentUpdater {
|
||||
@@ -162,12 +162,11 @@ pub(crate) struct InnerSegmentUpdater {
|
||||
|
||||
impl SegmentUpdater {
|
||||
pub fn create(
|
||||
segment_registers: Arc<RwLock<SegmentRegisters>>,
|
||||
index: Index,
|
||||
stamper: Stamper,
|
||||
delete_cursor: &DeleteCursor,
|
||||
) -> crate::Result<SegmentUpdater> {
|
||||
let segments = index.searchable_segment_metas()?;
|
||||
let segment_manager = SegmentManager::from_segments(segments, delete_cursor);
|
||||
let segment_manager = SegmentManager::new(segment_registers);
|
||||
let pool = ThreadPoolBuilder::new()
|
||||
.name_prefix("segment_updater")
|
||||
.pool_size(1)
|
||||
@@ -234,6 +233,7 @@ impl SegmentUpdater {
|
||||
&self,
|
||||
segment_entry: SegmentEntry,
|
||||
) -> impl Future<Output = crate::Result<()>> {
|
||||
// TODO temporary: serializing the segment at this point.
|
||||
let segment_updater = self.clone();
|
||||
self.schedule_future(async move {
|
||||
segment_updater.segment_manager.add_segment(segment_entry);
|
||||
@@ -262,8 +262,7 @@ impl SegmentUpdater {
|
||||
fn purge_deletes(&self, target_opstamp: Opstamp) -> crate::Result<Vec<SegmentEntry>> {
|
||||
let mut segment_entries = self.segment_manager.segment_entries();
|
||||
for segment_entry in &mut segment_entries {
|
||||
let segment = self.index.segment(segment_entry.meta().clone());
|
||||
advance_deletes(segment, segment_entry, target_opstamp)?;
|
||||
advance_deletes(segment_entry, target_opstamp)?;
|
||||
}
|
||||
Ok(segment_entries)
|
||||
}
|
||||
@@ -331,12 +330,21 @@ impl SegmentUpdater {
|
||||
&self,
|
||||
opstamp: Opstamp,
|
||||
payload: Option<String>,
|
||||
soft_commit: bool,
|
||||
) -> impl Future<Output = crate::Result<()>> {
|
||||
let segment_updater: SegmentUpdater = self.clone();
|
||||
let directory = self.index.directory().clone();
|
||||
self.schedule_future(async move {
|
||||
let segment_entries = segment_updater.purge_deletes(opstamp)?;
|
||||
let mut segment_entries = segment_updater.purge_deletes(opstamp)?;
|
||||
if !soft_commit {
|
||||
for segment_entry in &mut segment_entries {
|
||||
segment_entry.persist(directory.clone())?;
|
||||
}
|
||||
}
|
||||
segment_updater.segment_manager.commit(segment_entries);
|
||||
segment_updater.save_metas(opstamp, payload)?;
|
||||
if !soft_commit {
|
||||
segment_updater.save_metas(opstamp, payload)?;
|
||||
}
|
||||
let _ = garbage_collect_files(segment_updater.clone()).await;
|
||||
segment_updater.consider_merge_options().await;
|
||||
Ok(())
|
||||
@@ -474,17 +482,14 @@ impl SegmentUpdater {
|
||||
let end_merge_future = self.schedule_future(async move {
|
||||
info!("End merge {:?}", after_merge_segment_entry.meta());
|
||||
{
|
||||
let mut delete_cursor = after_merge_segment_entry.delete_cursor().clone();
|
||||
let mut delete_cursor = after_merge_segment_entry.delete_cursor();
|
||||
if let Some(delete_operation) = delete_cursor.get() {
|
||||
let committed_opstamp = segment_updater.load_metas().opstamp;
|
||||
if delete_operation.opstamp < committed_opstamp {
|
||||
let index = &segment_updater.index;
|
||||
let segment = index.segment(after_merge_segment_entry.meta().clone());
|
||||
if let Err(e) = advance_deletes(
|
||||
segment,
|
||||
&mut after_merge_segment_entry,
|
||||
committed_opstamp,
|
||||
) {
|
||||
let _index = &segment_updater.index;
|
||||
if let Err(e) =
|
||||
advance_deletes(&mut after_merge_segment_entry, committed_opstamp)
|
||||
{
|
||||
error!(
|
||||
"Merge of {:?} was cancelled (advancing deletes failed): {:?}",
|
||||
merge_operation.segment_ids(),
|
||||
|
||||
@@ -11,8 +11,9 @@ use crate::schema::Schema;
|
||||
use crate::schema::Term;
|
||||
use crate::schema::Value;
|
||||
use crate::schema::{Field, FieldEntry};
|
||||
use crate::tokenizer::{BoxTokenStream, PreTokenizedStream};
|
||||
use crate::tokenizer::{FacetTokenizer, TextAnalyzer};
|
||||
use crate::tokenizer::TokenizerManager;
|
||||
use crate::tokenizer::{BoxTokenStream, FacetTokenizer};
|
||||
use crate::tokenizer::{PreTokenizedStream, TextAnalyzer};
|
||||
use crate::tokenizer::{TokenStreamChain, Tokenizer};
|
||||
use crate::DocId;
|
||||
use crate::Opstamp;
|
||||
@@ -64,10 +65,12 @@ impl SegmentWriter {
|
||||
memory_budget: usize,
|
||||
mut segment: Segment,
|
||||
schema: &Schema,
|
||||
tokenizer_manager: &TokenizerManager,
|
||||
) -> crate::Result<SegmentWriter> {
|
||||
let table_num_bits = initial_table_size(memory_budget)?;
|
||||
let segment_serializer = SegmentSerializer::for_segment(&mut segment)?;
|
||||
let multifield_postings = MultiFieldPostingsWriter::new(schema, table_num_bits);
|
||||
let multifield_postings = MultiFieldPostingsWriter::new(&schema, table_num_bits);
|
||||
|
||||
let tokenizers = schema
|
||||
.fields()
|
||||
.map(
|
||||
@@ -76,7 +79,7 @@ impl SegmentWriter {
|
||||
.get_indexing_options()
|
||||
.and_then(|text_index_option| {
|
||||
let tokenizer_name = &text_index_option.tokenizer();
|
||||
segment.index().tokenizers().get(tokenizer_name)
|
||||
tokenizer_manager.get(tokenizer_name)
|
||||
}),
|
||||
_ => None,
|
||||
},
|
||||
@@ -85,9 +88,9 @@ impl SegmentWriter {
|
||||
Ok(SegmentWriter {
|
||||
max_doc: 0,
|
||||
multifield_postings,
|
||||
fieldnorms_writer: FieldNormsWriter::for_schema(schema),
|
||||
fieldnorms_writer: FieldNormsWriter::for_schema(&schema),
|
||||
segment_serializer,
|
||||
fast_field_writers: FastFieldsWriter::from_schema(schema),
|
||||
fast_field_writers: FastFieldsWriter::from_schema(&schema),
|
||||
doc_opstamps: Vec::with_capacity(1_000),
|
||||
tokenizers,
|
||||
})
|
||||
|
||||
@@ -121,6 +121,7 @@ mod functional_test;
|
||||
mod macros;
|
||||
|
||||
pub use crate::error::TantivyError;
|
||||
pub use crate::error::TantivyError as Error;
|
||||
pub use chrono;
|
||||
|
||||
/// Tantivy result.
|
||||
|
||||
@@ -220,7 +220,8 @@ pub mod tests {
|
||||
|
||||
{
|
||||
let mut segment_writer =
|
||||
SegmentWriter::for_segment(3_000_000, segment.clone(), &schema).unwrap();
|
||||
SegmentWriter::for_segment(3_000_000, segment.clone(), &schema, index.tokenizers())
|
||||
.unwrap();
|
||||
{
|
||||
let mut doc = Document::default();
|
||||
// checking that position works if the field has two values
|
||||
|
||||
@@ -112,6 +112,7 @@ mod tests {
|
||||
let term_a = Term::from_field_text(text_field, "a");
|
||||
let term_query = TermQuery::new(term_a, IndexRecordOption::Basic);
|
||||
let reader = index.reader().unwrap();
|
||||
assert_eq!(reader.searcher().segment_readers().len(), 1);
|
||||
assert_eq!(term_query.count(&*reader.searcher()).unwrap(), 1);
|
||||
}
|
||||
|
||||
|
||||
83
src/reader/index_writer_reader.rs
Normal file
83
src/reader/index_writer_reader.rs
Normal file
@@ -0,0 +1,83 @@
|
||||
use crate::directory::{WatchCallbackList, WatchHandle};
|
||||
use crate::indexer::SegmentRegisters;
|
||||
use crate::reader::pool::Pool;
|
||||
use crate::{Index, LeasedItem, Searcher, Segment, SegmentReader};
|
||||
use std::iter::repeat_with;
|
||||
use std::sync::{Arc, RwLock, Weak};
|
||||
|
||||
struct InnerNRTReader {
|
||||
num_searchers: usize,
|
||||
index: Index,
|
||||
searcher_pool: Pool<Searcher>,
|
||||
segment_registers: Arc<RwLock<SegmentRegisters>>,
|
||||
}
|
||||
|
||||
impl InnerNRTReader {
|
||||
fn load_segment_readers(&self) -> crate::Result<Vec<SegmentReader>> {
|
||||
let segments: Vec<Segment> = {
|
||||
let rlock = self.segment_registers.read().unwrap();
|
||||
rlock.committed_segment()
|
||||
};
|
||||
segments
|
||||
.iter()
|
||||
.map(SegmentReader::open)
|
||||
.collect::<crate::Result<Vec<SegmentReader>>>()
|
||||
}
|
||||
|
||||
pub fn reload(&self) -> crate::Result<()> {
|
||||
let segment_readers: Vec<SegmentReader> = self.load_segment_readers()?;
|
||||
let schema = self.index.schema();
|
||||
let searchers = repeat_with(|| {
|
||||
Searcher::new(schema.clone(), self.index.clone(), segment_readers.clone())
|
||||
})
|
||||
.take(self.num_searchers)
|
||||
.collect();
|
||||
self.searcher_pool.publish_new_generation(searchers);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn searcher(&self) -> LeasedItem<Searcher> {
|
||||
self.searcher_pool.acquire()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct NRTReader {
|
||||
inner: Arc<InnerNRTReader>,
|
||||
watch_handle: WatchHandle,
|
||||
}
|
||||
|
||||
impl NRTReader {
|
||||
pub fn reload(&self) -> crate::Result<()> {
|
||||
self.inner.reload()
|
||||
}
|
||||
|
||||
pub fn searcher(&self) -> LeasedItem<Searcher> {
|
||||
self.inner.searcher()
|
||||
}
|
||||
|
||||
pub(crate) fn create(
|
||||
num_searchers: usize,
|
||||
index: Index,
|
||||
segment_registers: Arc<RwLock<SegmentRegisters>>,
|
||||
watch_callback_list: &WatchCallbackList,
|
||||
) -> crate::Result<Self> {
|
||||
let inner_reader: Arc<InnerNRTReader> = Arc::new(InnerNRTReader {
|
||||
num_searchers,
|
||||
index,
|
||||
searcher_pool: Pool::new(),
|
||||
segment_registers,
|
||||
});
|
||||
let inner_reader_weak: Weak<InnerNRTReader> = Arc::downgrade(&inner_reader);
|
||||
let watch_handle = watch_callback_list.subscribe(Box::new(move || {
|
||||
if let Some(nrt_reader_arc) = inner_reader_weak.upgrade() {
|
||||
let _ = nrt_reader_arc.reload();
|
||||
}
|
||||
}));
|
||||
inner_reader.reload()?;
|
||||
Ok(NRTReader {
|
||||
inner: inner_reader,
|
||||
watch_handle,
|
||||
})
|
||||
}
|
||||
}
|
||||
180
src/reader/meta_file_reader.rs
Normal file
180
src/reader/meta_file_reader.rs
Normal file
@@ -0,0 +1,180 @@
|
||||
use super::pool::Pool;
|
||||
use crate::core::Segment;
|
||||
use crate::directory::Directory;
|
||||
use crate::directory::WatchHandle;
|
||||
use crate::directory::META_LOCK;
|
||||
use crate::Searcher;
|
||||
use crate::SegmentReader;
|
||||
use crate::{Index, LeasedItem};
|
||||
use crate::{IndexReader, Result};
|
||||
use std::iter::repeat_with;
|
||||
use std::sync::Arc;
|
||||
|
||||
/// Defines when a new version of the index should be reloaded.
|
||||
///
|
||||
/// Regardless of whether you search and index in the same process, tantivy does not necessarily
|
||||
/// reflects the change that are commited to your index. `ReloadPolicy` precisely helps you define
|
||||
/// when you want your index to be reloaded.
|
||||
#[derive(Clone, Copy)]
|
||||
pub enum ReloadPolicy {
|
||||
/// The index is entirely reloaded manually.
|
||||
/// All updates of the index should be manual.
|
||||
///
|
||||
/// No change is reflected automatically. You are required to call `.load_seacher()` manually.
|
||||
Manual,
|
||||
/// The index is reloaded within milliseconds after a new commit is available.
|
||||
/// This is made possible by watching changes in the `meta.json` file.
|
||||
OnCommit, // TODO add NEAR_REAL_TIME(target_ms)
|
||||
}
|
||||
|
||||
/// `IndexReader` builder
|
||||
///
|
||||
/// It makes it possible to set the following values.
|
||||
///
|
||||
/// - `num_searchers` (by default, the number of detected CPU threads):
|
||||
///
|
||||
/// When `num_searchers` queries are requested at the same time, the `num_searchers` will block
|
||||
/// until the one of the searcher in-use gets released.
|
||||
/// - `reload_policy` (by default `ReloadPolicy::OnCommit`):
|
||||
///
|
||||
/// See [`ReloadPolicy`](./enum.ReloadPolicy.html) for more details.
|
||||
#[derive(Clone)]
|
||||
pub struct IndexReaderBuilder {
|
||||
num_searchers: usize,
|
||||
reload_policy: ReloadPolicy,
|
||||
index: Index,
|
||||
}
|
||||
|
||||
impl IndexReaderBuilder {
|
||||
pub(crate) fn new(index: Index) -> IndexReaderBuilder {
|
||||
IndexReaderBuilder {
|
||||
num_searchers: num_cpus::get(),
|
||||
reload_policy: ReloadPolicy::Manual,
|
||||
index,
|
||||
}
|
||||
}
|
||||
|
||||
/// Builds the reader.
|
||||
///
|
||||
/// Building the reader is a non-trivial operation that requires
|
||||
/// to open different segment readers. It may take hundreds of milliseconds
|
||||
/// of time and it may return an error.
|
||||
/// TODO(pmasurel) Use the `TryInto` trait once it is available in stable.
|
||||
pub fn try_into(self) -> Result<IndexReader> {
|
||||
let inner_reader = MetaFileIndexReaderInner {
|
||||
index: self.index,
|
||||
num_searchers: self.num_searchers,
|
||||
searcher_pool: Pool::new(),
|
||||
};
|
||||
inner_reader.reload()?;
|
||||
let inner_reader_arc = Arc::new(inner_reader);
|
||||
let watch_handle_opt: Option<WatchHandle>;
|
||||
match self.reload_policy {
|
||||
ReloadPolicy::Manual => {
|
||||
// No need to set anything...
|
||||
watch_handle_opt = None;
|
||||
}
|
||||
ReloadPolicy::OnCommit => {
|
||||
let inner_reader_arc_clone = inner_reader_arc.clone();
|
||||
let callback = move || {
|
||||
if let Err(err) = inner_reader_arc_clone.reload() {
|
||||
error!(
|
||||
"Error while loading searcher after commit was detected. {:?}",
|
||||
err
|
||||
);
|
||||
}
|
||||
};
|
||||
let watch_handle = inner_reader_arc
|
||||
.index
|
||||
.directory()
|
||||
.watch(Box::new(callback))?;
|
||||
watch_handle_opt = Some(watch_handle);
|
||||
}
|
||||
}
|
||||
Ok(IndexReader::from(MetaFileIndexReader {
|
||||
inner: inner_reader_arc,
|
||||
watch_handle_opt,
|
||||
}))
|
||||
}
|
||||
|
||||
/// Sets the reload_policy.
|
||||
///
|
||||
/// See [`ReloadPolicy`](./enum.ReloadPolicy.html) for more details.
|
||||
pub fn reload_policy(mut self, reload_policy: ReloadPolicy) -> IndexReaderBuilder {
|
||||
self.reload_policy = reload_policy;
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets the number of `Searcher` in the searcher pool.
|
||||
pub fn num_searchers(mut self, num_searchers: usize) -> IndexReaderBuilder {
|
||||
self.num_searchers = num_searchers;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
struct MetaFileIndexReaderInner {
|
||||
num_searchers: usize,
|
||||
searcher_pool: Pool<Searcher>,
|
||||
index: Index,
|
||||
}
|
||||
|
||||
impl MetaFileIndexReaderInner {
|
||||
fn load_segment_readers(&self) -> crate::Result<Vec<SegmentReader>> {
|
||||
// We keep the lock until we have effectively finished opening the
|
||||
// the `SegmentReader` because it prevents a diffferent process
|
||||
// to garbage collect these file while we open them.
|
||||
//
|
||||
// Once opened, on linux & mac, the mmap will remain valid after
|
||||
// the file has been deleted
|
||||
// On windows, the file deletion will fail.
|
||||
let _meta_lock = self.index.directory().acquire_lock(&META_LOCK)?;
|
||||
let searchable_segments = self.searchable_segments()?;
|
||||
searchable_segments
|
||||
.iter()
|
||||
.map(SegmentReader::open)
|
||||
.collect::<Result<_>>()
|
||||
}
|
||||
|
||||
fn reload(&self) -> crate::Result<()> {
|
||||
let segment_readers: Vec<SegmentReader> = self.load_segment_readers()?;
|
||||
let schema = self.index.schema();
|
||||
let searchers = repeat_with(|| {
|
||||
Searcher::new(schema.clone(), self.index.clone(), segment_readers.clone())
|
||||
})
|
||||
.take(self.num_searchers)
|
||||
.collect();
|
||||
self.searcher_pool.publish_new_generation(searchers);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns the list of segments that are searchable
|
||||
fn searchable_segments(&self) -> crate::Result<Vec<Segment>> {
|
||||
self.index.searchable_segments()
|
||||
}
|
||||
|
||||
fn searcher(&self) -> LeasedItem<Searcher> {
|
||||
self.searcher_pool.acquire()
|
||||
}
|
||||
}
|
||||
|
||||
/// `IndexReader` is your entry point to read and search the index.
|
||||
///
|
||||
/// It controls when a new version of the index should be loaded and lends
|
||||
/// you instances of `Searcher` for the last loaded version.
|
||||
///
|
||||
/// `Clone` does not clone the different pool of searcher. `IndexReader`
|
||||
/// just wraps and `Arc`.
|
||||
#[derive(Clone)]
|
||||
pub struct MetaFileIndexReader {
|
||||
inner: Arc<MetaFileIndexReaderInner>,
|
||||
watch_handle_opt: Option<WatchHandle>,
|
||||
}
|
||||
|
||||
impl MetaFileIndexReader {
|
||||
pub fn reload(&self) -> crate::Result<()> {
|
||||
self.inner.reload()
|
||||
}
|
||||
pub fn searcher(&self) -> LeasedItem<Searcher> {
|
||||
self.inner.searcher()
|
||||
}
|
||||
}
|
||||
@@ -1,15 +1,32 @@
|
||||
mod index_writer_reader;
|
||||
mod meta_file_reader;
|
||||
mod pool;
|
||||
|
||||
use self::meta_file_reader::MetaFileIndexReader;
|
||||
pub use self::meta_file_reader::{IndexReaderBuilder, ReloadPolicy};
|
||||
pub use self::pool::LeasedItem;
|
||||
use self::pool::Pool;
|
||||
use crate::core::Segment;
|
||||
use crate::directory::Directory;
|
||||
use crate::directory::WatchHandle;
|
||||
use crate::directory::META_LOCK;
|
||||
use crate::Index;
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
pub(crate) use crate::reader::index_writer_reader::NRTReader;
|
||||
|
||||
use crate::Searcher;
|
||||
use crate::SegmentReader;
|
||||
use std::sync::Arc;
|
||||
|
||||
/*
|
||||
//
|
||||
//enum SegmentSource {
|
||||
// FromMetaFile,
|
||||
// FromWriter(Arc<RwLock<SegmentRegisters>>),
|
||||
//}
|
||||
//
|
||||
//impl SegmentSource {
|
||||
// fn from_meta_file() -> SegmentSource {
|
||||
//
|
||||
// }
|
||||
//
|
||||
//}
|
||||
|
||||
/// Defines when a new version of the index should be reloaded.
|
||||
///
|
||||
@@ -120,15 +137,24 @@ struct InnerIndexReader {
|
||||
}
|
||||
|
||||
impl InnerIndexReader {
|
||||
fn load_segment_readers(&self) -> crate::Result<Vec<SegmentReader>> {
|
||||
// We keep the lock until we have effectively finished opening the
|
||||
// the `SegmentReader` because it prevents a diffferent process
|
||||
// to garbage collect these file while we open them.
|
||||
//
|
||||
// Once opened, on linux & mac, the mmap will remain valid after
|
||||
// the file has been deleted
|
||||
// On windows, the file deletion will fail.
|
||||
let _meta_lock = self.index.directory().acquire_lock(&META_LOCK)?;
|
||||
let searchable_segments = self.searchable_segments()?;
|
||||
searchable_segments
|
||||
.iter()
|
||||
.map(SegmentReader::open)
|
||||
.collect::<crate::Result<_>>()
|
||||
}
|
||||
|
||||
fn reload(&self) -> crate::Result<()> {
|
||||
let segment_readers: Vec<SegmentReader> = {
|
||||
let _meta_lock = self.index.directory().acquire_lock(&META_LOCK)?;
|
||||
let searchable_segments = self.searchable_segments()?;
|
||||
searchable_segments
|
||||
.iter()
|
||||
.map(SegmentReader::open)
|
||||
.collect::<crate::Result<_>>()?
|
||||
};
|
||||
let segment_readers: Vec<SegmentReader> = self.load_segment_readers()?;
|
||||
let schema = self.index.schema();
|
||||
let searchers = (0..self.num_searchers)
|
||||
.map(|_| Searcher::new(schema.clone(), self.index.clone(), segment_readers.clone()))
|
||||
@@ -154,18 +180,16 @@ impl InnerIndexReader {
|
||||
///
|
||||
/// `Clone` does not clone the different pool of searcher. `IndexReader`
|
||||
/// just wraps and `Arc`.
|
||||
=======
|
||||
>>>>>>> Added NRTReader
|
||||
*/
|
||||
#[derive(Clone)]
|
||||
pub struct IndexReader {
|
||||
inner: Arc<InnerIndexReader>,
|
||||
watch_handle_opt: Option<WatchHandle>,
|
||||
pub enum IndexReader {
|
||||
FromMetaFile(MetaFileIndexReader),
|
||||
NRT(NRTReader),
|
||||
}
|
||||
|
||||
impl IndexReader {
|
||||
#[cfg(test)]
|
||||
pub(crate) fn index(&self) -> Index {
|
||||
self.inner.index.clone()
|
||||
}
|
||||
|
||||
/// Update searchers so that they reflect the state of the last
|
||||
/// `.commit()`.
|
||||
///
|
||||
@@ -176,7 +200,10 @@ impl IndexReader {
|
||||
/// This automatic reload can take 10s of milliseconds to kick in however, and in unit tests
|
||||
/// it can be nice to deterministically force the reload of searchers.
|
||||
pub fn reload(&self) -> crate::Result<()> {
|
||||
self.inner.reload()
|
||||
match self {
|
||||
IndexReader::FromMetaFile(meta_file_reader) => meta_file_reader.reload(),
|
||||
IndexReader::NRT(nrt_reader) => nrt_reader.reload(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a searcher
|
||||
@@ -190,6 +217,21 @@ impl IndexReader {
|
||||
/// The same searcher must be used for a given query, as it ensures
|
||||
/// the use of a consistent segment set.
|
||||
pub fn searcher(&self) -> LeasedItem<Searcher> {
|
||||
self.inner.searcher()
|
||||
match self {
|
||||
IndexReader::FromMetaFile(meta_file_reader) => meta_file_reader.searcher(),
|
||||
IndexReader::NRT(nrt_reader) => nrt_reader.searcher(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<MetaFileIndexReader> for IndexReader {
|
||||
fn from(meta_file_reader: MetaFileIndexReader) -> Self {
|
||||
IndexReader::FromMetaFile(meta_file_reader)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<NRTReader> for IndexReader {
|
||||
fn from(nrt_reader: NRTReader) -> Self {
|
||||
IndexReader::NRT(nrt_reader)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user