Mirror of https://github.com/quickwit-oss/tantivy.git (synced 2026-01-06 01:02:55 +00:00)

Commit: Added NRTReader
@@ -57,6 +57,68 @@ pub struct SegmentReader {
}

impl SegmentReader {
    /// Open a new segment for reading.
    pub fn open(segment: &Segment) -> Result<SegmentReader> {
        let termdict_source = segment.open_read(SegmentComponent::TERMS)?;
        let termdict_composite = CompositeFile::open(&termdict_source)?;

        let store_source = segment.open_read(SegmentComponent::STORE)?;

        fail_point!("SegmentReader::open#middle");

        let postings_source = segment.open_read(SegmentComponent::POSTINGS)?;
        let postings_composite = CompositeFile::open(&postings_source)?;

        let positions_composite = {
            if let Ok(source) = segment.open_read(SegmentComponent::POSITIONS) {
                CompositeFile::open(&source)?
            } else {
                CompositeFile::empty()
            }
        };

        let positions_idx_composite = {
            if let Ok(source) = segment.open_read(SegmentComponent::POSITIONSSKIP) {
                CompositeFile::open(&source)?
            } else {
                CompositeFile::empty()
            }
        };

        let schema = segment.schema();

        let fast_fields_data = segment.open_read(SegmentComponent::FASTFIELDS)?;
        let fast_fields_composite = CompositeFile::open(&fast_fields_data)?;
        let fast_field_readers =
            Arc::new(FastFieldReaders::load_all(&schema, &fast_fields_composite)?);

        let fieldnorms_data = segment.open_read(SegmentComponent::FIELDNORMS)?;
        let fieldnorms_composite = CompositeFile::open(&fieldnorms_data)?;

        let delete_bitset_opt = if segment.meta().has_deletes() {
            let delete_data = segment.open_read(SegmentComponent::DELETE)?;
            Some(DeleteBitSet::open(delete_data))
        } else {
            None
        };

        Ok(SegmentReader {
            inv_idx_reader_cache: Arc::new(RwLock::new(HashMap::new())),
            max_doc: segment.meta().max_doc(),
            num_docs: segment.meta().num_docs(),
            termdict_composite,
            postings_composite,
            fast_fields_readers: fast_field_readers,
            fieldnorms_composite,
            segment_id: segment.id(),
            store_source,
            delete_bitset_opt,
            positions_composite,
            positions_idx_composite,
            schema,
        })
    }

    /// Returns the highest document id ever attributed in
    /// this segment + 1.
    /// Today, `tantivy` does not handle deletes, so it happens
@@ -144,68 +206,6 @@ impl SegmentReader {
        StoreReader::from_source(self.store_source.clone())
    }

    /// Open a new segment for reading.
    pub fn open(segment: &Segment) -> Result<SegmentReader> {
        let termdict_source = segment.open_read(SegmentComponent::TERMS)?;
        let termdict_composite = CompositeFile::open(&termdict_source)?;

        let store_source = segment.open_read(SegmentComponent::STORE)?;

        fail_point!("SegmentReader::open#middle");

        let postings_source = segment.open_read(SegmentComponent::POSTINGS)?;
        let postings_composite = CompositeFile::open(&postings_source)?;

        let positions_composite = {
            if let Ok(source) = segment.open_read(SegmentComponent::POSITIONS) {
                CompositeFile::open(&source)?
            } else {
                CompositeFile::empty()
            }
        };

        let positions_idx_composite = {
            if let Ok(source) = segment.open_read(SegmentComponent::POSITIONSSKIP) {
                CompositeFile::open(&source)?
            } else {
                CompositeFile::empty()
            }
        };

        let schema = segment.schema();

        let fast_fields_data = segment.open_read(SegmentComponent::FASTFIELDS)?;
        let fast_fields_composite = CompositeFile::open(&fast_fields_data)?;
        let fast_field_readers =
            Arc::new(FastFieldReaders::load_all(&schema, &fast_fields_composite)?);

        let fieldnorms_data = segment.open_read(SegmentComponent::FIELDNORMS)?;
        let fieldnorms_composite = CompositeFile::open(&fieldnorms_data)?;

        let delete_bitset_opt = if segment.meta().has_deletes() {
            let delete_data = segment.open_read(SegmentComponent::DELETE)?;
            Some(DeleteBitSet::open(delete_data))
        } else {
            None
        };

        Ok(SegmentReader {
            inv_idx_reader_cache: Arc::new(RwLock::new(HashMap::new())),
            max_doc: segment.meta().max_doc(),
            num_docs: segment.meta().num_docs(),
            termdict_composite,
            postings_composite,
            fast_fields_readers: fast_field_readers,
            fieldnorms_composite,
            segment_id: segment.id(),
            store_source,
            delete_bitset_opt,
            positions_composite,
            positions_idx_composite,
            schema,
        })
    }

    /// Returns a field reader associated to the field given in argument.
    /// If the field was not present in the index during indexing time,
    /// the InvertedIndexReader is empty.
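Worth noting in `open` above: the positions files are optional components, handled with an open-or-empty fallback rather than a hard error. A minimal, self-contained sketch of that idiom (the types below are hypothetical stand-ins, not tantivy's API):

```rust
// A sketch of the open-or-empty idiom used for the optional
// POSITIONS / POSITIONSSKIP components; `Composite` is hypothetical.
#[derive(Debug, Default)]
struct Composite(Vec<u8>);

fn open_component(data: Option<&[u8]>) -> Composite {
    match data {
        // Component present: parse it (here we just copy the bytes).
        Some(bytes) => Composite(bytes.to_vec()),
        // Component absent: fall back to an empty composite, like
        // `CompositeFile::empty()`, so the whole open still succeeds.
        None => Composite::default(),
    }
}

fn main() {
    assert!(open_component(None).0.is_empty());
    assert_eq!(open_component(Some(&[1, 2, 3])).0.len(), 3);
}
```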
@@ -8,8 +8,8 @@ use crate::core::SegmentComponent;
use crate::core::SegmentId;
use crate::core::SegmentMeta;
use crate::core::SegmentReader;
use crate::directory::TerminatingWrite;
use crate::directory::{DirectoryLock, GarbageCollectionResult};
use crate::directory::{TerminatingWrite, WatchCallbackList};
use crate::docset::DocSet;
use crate::error::TantivyError;
use crate::fastfield::write_delete_bitset;

@@ -22,16 +22,16 @@ use crate::indexer::stamper::Stamper;
use crate::indexer::MergePolicy;
use crate::indexer::SegmentEntry;
use crate::indexer::SegmentWriter;
use crate::reader::NRTReader;
use crate::schema::Document;
use crate::schema::IndexRecordOption;
use crate::schema::Term;
use crate::tokenizer::TokenizerManager;
use crate::Opstamp;
use crate::{IndexReader, Opstamp};
use crossbeam::channel;
use futures::executor::block_on;
use futures::future::Future;
use smallvec::smallvec;
use smallvec::SmallVec;
use smallvec::{smallvec, SmallVec};
use std::mem;
use std::ops::Range;
use std::sync::{Arc, RwLock};

@@ -92,6 +92,8 @@ pub struct IndexWriter {

    stamper: Stamper,
    committed_opstamp: Opstamp,

    on_commit: WatchCallbackList,
}

fn compute_deleted_bitset(

@@ -344,6 +346,8 @@ impl IndexWriter {
            stamper,

            worker_id: 0,

            on_commit: Default::default(),
        };
        index_writer.start_workers()?;
        Ok(index_writer)

@@ -464,6 +468,21 @@ impl IndexWriter {
        Ok(())
    }

    // TODO move me
    pub(crate) fn trigger_commit(&self) -> impl Future<Output = ()> {
        self.on_commit.broadcast()
    }

    pub fn reader(&self, num_searchers: usize) -> crate::Result<IndexReader> {
        let nrt_reader = NRTReader::create(
            num_searchers,
            self.index.clone(),
            self.segment_registers.clone(),
            &self.on_commit,
        )?;
        Ok(IndexReader::NRT(nrt_reader))
    }

    /// Detects and removes the files that are not used by the index anymore.
    pub fn garbage_collect_files(
        &self,

@@ -780,7 +799,6 @@ impl Drop for IndexWriter {

#[cfg(test)]
mod tests {

    use super::super::operation::UserOperation;
    use crate::collector::TopDocs;
    use crate::directory::error::LockError;

@@ -1221,7 +1239,24 @@ mod tests {
        let index = Index::create_in_ram(schema_builder.build());
        let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
        index_writer.add_document(doc!(idfield=>"myid"));
        let commit = index_writer.commit();
        assert!(commit.is_ok());
        assert!(index_writer.commit().is_ok());
    }

    #[test]
    fn test_index_writer_reader() {
        let mut schema_builder = schema::Schema::builder();
        let idfield = schema_builder.add_text_field("id", STRING);
        schema_builder.add_text_field("optfield", STRING);
        let index = Index::create_in_ram(schema_builder.build());
        let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
        index_writer.add_document(doc!(idfield=>"myid"));
        assert!(index_writer.commit().is_ok());
        let reader = index_writer.reader(2).unwrap();
        let searcher = reader.searcher();
        assert_eq!(searcher.num_docs(), 1u64);
        index_writer.add_document(doc!(idfield=>"myid"));
        assert!(index_writer.commit().is_ok());
        assert_eq!(reader.searcher().num_docs(), 2u64);
        assert_eq!(searcher.num_docs(), 1u64);
    }
}
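The `test_index_writer_reader` test above exercises the whole near-real-time loop. A standalone sketch of the same flow, assuming this commit's `IndexWriter::reader` API (not the released tantivy API, where readers are built from the `Index`):

```rust
use tantivy::schema::{Schema, STRING};
use tantivy::{doc, Index};

fn main() -> tantivy::Result<()> {
    let mut schema_builder = Schema::builder();
    let id = schema_builder.add_text_field("id", STRING);
    let index = Index::create_in_ram(schema_builder.build());
    let mut writer = index.writer_with_num_threads(1, 3_000_000)?;

    // Per this commit, `reader` wires an NRTReader to the writer's segment
    // registers and subscribes it to the `on_commit` callback list.
    let reader = writer.reader(2)?;
    let searcher = reader.searcher(); // lease from the current generation

    writer.add_document(doc!(id => "hello"));
    writer.commit()?; // schedule_commit(), then the trigger_commit() broadcast

    // The broadcast reloads the pool: new leases see the commit, while the
    // searcher acquired earlier keeps its consistent (older) segment set.
    assert_eq!(reader.searcher().num_docs(), 1);
    assert_eq!(searcher.num_docs(), 0);
    Ok(())
}
```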
@@ -23,6 +23,7 @@ pub use self::merge_policy::{MergeCandidate, MergePolicy, NoMergePolicy};
pub use self::prepared_commit::PreparedCommit;
pub use self::segment_entry::SegmentEntry;
pub use self::segment_manager::SegmentManager;
pub(crate) use self::segment_manager::SegmentRegisters;
pub use self::segment_serializer::SegmentSerializer;
pub use self::segment_writer::SegmentWriter;

@@ -38,6 +38,7 @@ impl<'a> PreparedCommit<'a> {
                .segment_updater()
                .schedule_commit(self.opstamp, self.payload),
        );
        let _ = block_on(self.index_writer.trigger_commit());
        Ok(self.opstamp)
    }
}

@@ -46,6 +46,10 @@ impl SegmentEntry {
        Ok(())
    }

    pub fn segment(&self) -> &Segment {
        &self.segment
    }

    /// Return a reference to the segment entry's delete bitset.
    ///
    /// `DocId`s in this bitset are flagged as deleted.
@@ -2,6 +2,7 @@ use super::segment_register::SegmentRegister;
use crate::core::SegmentId;
use crate::core::SegmentMeta;
use crate::indexer::SegmentEntry;
use crate::Segment;
use std::collections::hash_set::HashSet;
use std::sync::{Arc, RwLock};
use std::sync::{RwLockReadGuard, RwLockWriteGuard};

@@ -12,15 +13,6 @@ pub(crate) struct SegmentRegisters {
    committed: SegmentRegister,
}

impl SegmentRegisters {
    pub fn new(committed: SegmentRegister) -> SegmentRegisters {
        SegmentRegisters {
            uncommitted: Default::default(),
            committed,
        }
    }
}

#[derive(PartialEq, Eq)]
pub(crate) enum SegmentsStatus {
    Committed,

@@ -28,6 +20,17 @@ pub(crate) enum SegmentsStatus {
}

impl SegmentRegisters {
    pub fn new(committed: SegmentRegister) -> SegmentRegisters {
        SegmentRegisters {
            uncommitted: Default::default(),
            committed,
        }
    }

    pub fn committed_segment(&self) -> Vec<Segment> {
        self.committed.segments()
    }

    /// Check if all the segments are committed or uncommitted.
    ///
    /// If some segment is missing or segments are in a different state (this should not happen

@@ -49,6 +49,13 @@ impl SegmentRegister {
            .collect()
    }

    pub fn segments(&self) -> Vec<Segment> {
        self.segment_states
            .values()
            .map(|segment_entry| segment_entry.segment().clone())
            .collect()
    }

    pub fn segment_entries(&self) -> Vec<SegmentEntry> {
        self.segment_states.values().cloned().collect()
    }

@@ -104,6 +111,7 @@ mod tests {
    use super::*;
    use crate::core::{SegmentId, SegmentMetaInventory};
    use crate::indexer::delete_queue::*;
    use crate::schema::Schema;

    fn segment_ids(segment_register: &SegmentRegister) -> Vec<SegmentId> {
        segment_register
src/reader/index_writer_reader.rs (new file, 83 lines)
@@ -0,0 +1,83 @@
use crate::directory::{WatchCallbackList, WatchHandle};
use crate::indexer::SegmentRegisters;
use crate::reader::pool::Pool;
use crate::{Index, LeasedItem, Searcher, Segment, SegmentReader};
use std::iter::repeat_with;
use std::sync::{Arc, RwLock, Weak};

struct InnerNRTReader {
    num_searchers: usize,
    index: Index,
    searcher_pool: Pool<Searcher>,
    segment_registers: Arc<RwLock<SegmentRegisters>>,
}

impl InnerNRTReader {
    fn load_segment_readers(&self) -> crate::Result<Vec<SegmentReader>> {
        let segments: Vec<Segment> = {
            let rlock = self.segment_registers.read().unwrap();
            rlock.committed_segment()
        };
        segments
            .iter()
            .map(SegmentReader::open)
            .collect::<crate::Result<Vec<SegmentReader>>>()
    }

    pub fn reload(&self) -> crate::Result<()> {
        let segment_readers: Vec<SegmentReader> = self.load_segment_readers()?;
        let schema = self.index.schema();
        let searchers = repeat_with(|| {
            Searcher::new(schema.clone(), self.index.clone(), segment_readers.clone())
        })
        .take(self.num_searchers)
        .collect();
        self.searcher_pool.publish_new_generation(searchers);
        Ok(())
    }

    pub fn searcher(&self) -> LeasedItem<Searcher> {
        self.searcher_pool.acquire()
    }
}

#[derive(Clone)]
pub struct NRTReader {
    inner: Arc<InnerNRTReader>,
    watch_handle: WatchHandle,
}

impl NRTReader {
    pub fn reload(&self) -> crate::Result<()> {
        self.inner.reload()
    }

    pub fn searcher(&self) -> LeasedItem<Searcher> {
        self.inner.searcher()
    }

    pub(crate) fn create(
        num_searchers: usize,
        index: Index,
        segment_registers: Arc<RwLock<SegmentRegisters>>,
        watch_callback_list: &WatchCallbackList,
    ) -> crate::Result<Self> {
        let inner_reader: Arc<InnerNRTReader> = Arc::new(InnerNRTReader {
            num_searchers,
            index,
            searcher_pool: Pool::new(),
            segment_registers,
        });
        let inner_reader_weak: Weak<InnerNRTReader> = Arc::downgrade(&inner_reader);
        let watch_handle = watch_callback_list.subscribe(Box::new(move || {
            if let Some(nrt_reader_arc) = inner_reader_weak.upgrade() {
                let _ = nrt_reader_arc.reload();
            }
        }));
        inner_reader.reload()?;
        Ok(NRTReader {
            inner: inner_reader,
            watch_handle,
        })
    }
}
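`create` subscribes to the commit broadcast through a `Weak` reference, so the `on_commit` callback list does not keep the reader alive: once the last `NRTReader` clone is dropped, `upgrade()` returns `None` and the callback degrades to a no-op. A self-contained sketch of that idiom, with no tantivy types:

```rust
use std::sync::{Arc, Weak};

struct Inner {
    name: &'static str,
}

impl Inner {
    fn reload(&self) {
        println!("reloading {}", self.name);
    }
}

fn main() {
    let inner: Arc<Inner> = Arc::new(Inner { name: "nrt" });
    let weak: Weak<Inner> = Arc::downgrade(&inner);

    // The callback holds only a Weak, so it does not extend `inner`'s lifetime.
    let callback = move || {
        if let Some(strong) = weak.upgrade() {
            strong.reload(); // reader still alive: reload it
        } // otherwise the reader was dropped: silently do nothing
    };

    callback();  // prints "reloading nrt"
    drop(inner); // last strong reference gone
    callback();  // no-op: upgrade() returns None
}
```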

src/reader/meta_file_reader.rs (new file, 180 lines)
@@ -0,0 +1,180 @@
use super::pool::Pool;
use crate::core::Segment;
use crate::directory::Directory;
use crate::directory::WatchHandle;
use crate::directory::META_LOCK;
use crate::Searcher;
use crate::SegmentReader;
use crate::{Index, LeasedItem};
use crate::{IndexReader, Result};
use std::iter::repeat_with;
use std::sync::Arc;

/// Defines when a new version of the index should be reloaded.
///
/// Regardless of whether you search and index in the same process, tantivy does not necessarily
/// reflect the changes that are committed to your index. `ReloadPolicy` precisely helps you define
/// when you want your index to be reloaded.
#[derive(Clone, Copy)]
pub enum ReloadPolicy {
    /// The index is entirely reloaded manually.
    /// All updates of the index should be manual.
    ///
    /// No change is reflected automatically. You are required to call `.reload()` manually.
    Manual,
    /// The index is reloaded within milliseconds after a new commit is available.
    /// This is made possible by watching changes in the `meta.json` file.
    OnCommit, // TODO add NEAR_REAL_TIME(target_ms)
}

/// `IndexReader` builder
///
/// It makes it possible to set the following values.
///
/// - `num_searchers` (by default, the number of detected CPU threads):
///
///   When `num_searchers` searches are running at the same time, the next search
///   will block until one of the searchers in use is released.
/// - `reload_policy` (by default `ReloadPolicy::OnCommit`):
///
///   See [`ReloadPolicy`](./enum.ReloadPolicy.html) for more details.
#[derive(Clone)]
pub struct IndexReaderBuilder {
    num_searchers: usize,
    reload_policy: ReloadPolicy,
    index: Index,
}

impl IndexReaderBuilder {
    pub(crate) fn new(index: Index) -> IndexReaderBuilder {
        IndexReaderBuilder {
            num_searchers: num_cpus::get(),
            reload_policy: ReloadPolicy::OnCommit,
            index,
        }
    }

    /// Builds the reader.
    ///
    /// Building the reader is a non-trivial operation that requires
    /// opening the different segment readers. It may take hundreds of milliseconds
    /// and it may return an error.
    /// TODO(pmasurel) Use the `TryInto` trait once it is available in stable.
    pub fn try_into(self) -> Result<IndexReader> {
        let inner_reader = MetaFileIndexReaderInner {
            index: self.index,
            num_searchers: self.num_searchers,
            searcher_pool: Pool::new(),
        };
        inner_reader.reload()?;
        let inner_reader_arc = Arc::new(inner_reader);
        let watch_handle_opt: Option<WatchHandle>;
        match self.reload_policy {
            ReloadPolicy::Manual => {
                // No need to set anything...
                watch_handle_opt = None;
            }
            ReloadPolicy::OnCommit => {
                let inner_reader_arc_clone = inner_reader_arc.clone();
                let callback = move || {
                    if let Err(err) = inner_reader_arc_clone.reload() {
                        error!(
                            "Error while loading searcher after commit was detected. {:?}",
                            err
                        );
                    }
                };
                let watch_handle = inner_reader_arc
                    .index
                    .directory()
                    .watch(Box::new(callback))?;
                watch_handle_opt = Some(watch_handle);
            }
        }
        Ok(IndexReader::from(MetaFileIndexReader {
            inner: inner_reader_arc,
            watch_handle_opt,
        }))
    }

    /// Sets the reload_policy.
    ///
    /// See [`ReloadPolicy`](./enum.ReloadPolicy.html) for more details.
    pub fn reload_policy(mut self, reload_policy: ReloadPolicy) -> IndexReaderBuilder {
        self.reload_policy = reload_policy;
        self
    }

    /// Sets the number of `Searcher` in the searcher pool.
    pub fn num_searchers(mut self, num_searchers: usize) -> IndexReaderBuilder {
        self.num_searchers = num_searchers;
        self
    }
}

struct MetaFileIndexReaderInner {
    num_searchers: usize,
    searcher_pool: Pool<Searcher>,
    index: Index,
}

impl MetaFileIndexReaderInner {
    fn load_segment_readers(&self) -> crate::Result<Vec<SegmentReader>> {
        // We keep the lock until we have effectively finished opening the
        // `SegmentReader`s, because it prevents a different process from
        // garbage-collecting these files while we open them.
        //
        // Once opened, on Linux & macOS, the mmap will remain valid even after
        // the file has been deleted.
        // On Windows, the file deletion will fail.
        let _meta_lock = self.index.directory().acquire_lock(&META_LOCK)?;
        let searchable_segments = self.searchable_segments()?;
        searchable_segments
            .iter()
            .map(SegmentReader::open)
            .collect::<Result<_>>()
    }

    fn reload(&self) -> crate::Result<()> {
        let segment_readers: Vec<SegmentReader> = self.load_segment_readers()?;
        let schema = self.index.schema();
        let searchers = repeat_with(|| {
            Searcher::new(schema.clone(), self.index.clone(), segment_readers.clone())
        })
        .take(self.num_searchers)
        .collect();
        self.searcher_pool.publish_new_generation(searchers);
        Ok(())
    }

    /// Returns the list of segments that are searchable.
    fn searchable_segments(&self) -> crate::Result<Vec<Segment>> {
        self.index.searchable_segments()
    }

    fn searcher(&self) -> LeasedItem<Searcher> {
        self.searcher_pool.acquire()
    }
}

/// `IndexReader` is your entry point to read and search the index.
///
/// It controls when a new version of the index should be loaded and lends
/// you instances of `Searcher` for the last loaded version.
///
/// `Clone` does not clone the underlying pool of searchers; `IndexReader`
/// just wraps an `Arc`.
#[derive(Clone)]
pub struct MetaFileIndexReader {
    inner: Arc<MetaFileIndexReaderInner>,
    watch_handle_opt: Option<WatchHandle>,
}

impl MetaFileIndexReader {
    pub fn reload(&self) -> crate::Result<()> {
        self.inner.reload()
    }
    pub fn searcher(&self) -> LeasedItem<Searcher> {
        self.inner.searcher()
    }
}
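For the meta-file-backed path, construction goes through `IndexReaderBuilder`; a usage sketch, assuming the builder is reached via `Index::reader_builder()` as in tantivy of this era:

```rust
use tantivy::schema::{Schema, TEXT};
use tantivy::{doc, Index, ReloadPolicy};

fn main() -> tantivy::Result<()> {
    let mut schema_builder = Schema::builder();
    let body = schema_builder.add_text_field("body", TEXT);
    let index = Index::create_in_ram(schema_builder.build());

    // With ReloadPolicy::Manual no meta.json watch is installed
    // (`watch_handle_opt` stays None); reloads happen only on `reload()`.
    let reader = index
        .reader_builder()
        .reload_policy(ReloadPolicy::Manual)
        .num_searchers(1)
        .try_into()?;

    let mut writer = index.writer_with_num_threads(1, 3_000_000)?;
    writer.add_document(doc!(body => "hello"));
    writer.commit()?;

    assert_eq!(reader.searcher().num_docs(), 0); // not reloaded yet
    reader.reload()?;                            // explicit reload
    assert_eq!(reader.searcher().num_docs(), 1);
    Ok(())
}
```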
@@ -1,187 +1,17 @@
mod index_writer_reader;
mod meta_file_reader;
mod pool;

use self::meta_file_reader::MetaFileIndexReader;
pub use self::meta_file_reader::{IndexReaderBuilder, ReloadPolicy};
pub use self::pool::LeasedItem;
use self::pool::Pool;
use crate::core::Segment;
use crate::directory::Directory;
use crate::directory::WatchHandle;
use crate::directory::META_LOCK;
use crate::Index;
use crate::Result;
pub(crate) use crate::reader::index_writer_reader::{NRTReader};
use crate::Searcher;
use crate::SegmentReader;
use std::sync::{Arc, RwLock};

//
//enum SegmentSource {
//    FromMetaFile,
//    FromWriter(Arc<RwLock<SegmentRegisters>>),
//}
//
//impl SegmentSource {
//    fn from_meta_file() -> SegmentSource {
//
//    }
//
//}

/// Defines when a new version of the index should be reloaded.
///
/// Regardless of whether you search and index in the same process, tantivy does not necessarily
/// reflect the changes that are committed to your index. `ReloadPolicy` precisely helps you define
/// when you want your index to be reloaded.
#[derive(Clone, Copy)]
pub enum ReloadPolicy {
    /// The index is entirely reloaded manually.
    /// All updates of the index should be manual.
    ///
    /// No change is reflected automatically. You are required to call `.reload()` manually.
    Manual,
    /// The index is reloaded within milliseconds after a new commit is available.
    /// This is made possible by watching changes in the `meta.json` file.
    OnCommit, // TODO add NEAR_REAL_TIME(target_ms)
}

/// `IndexReader` builder
///
/// It makes it possible to set the following values.
///
/// - `num_searchers` (by default, the number of detected CPU threads):
///
///   When `num_searchers` searches are running at the same time, the next search
///   will block until one of the searchers in use is released.
/// - `reload_policy` (by default `ReloadPolicy::OnCommit`):
///
///   See [`ReloadPolicy`](./enum.ReloadPolicy.html) for more details.
#[derive(Clone)]
pub struct IndexReaderBuilder {
    num_searchers: usize,
    reload_policy: ReloadPolicy,
    index: Index,
}

impl IndexReaderBuilder {
    pub(crate) fn new(index: Index) -> IndexReaderBuilder {
        IndexReaderBuilder {
            num_searchers: num_cpus::get(),
            reload_policy: ReloadPolicy::OnCommit,
            index,
        }
    }

    /// Builds the reader.
    ///
    /// Building the reader is a non-trivial operation that requires
    /// opening the different segment readers. It may take hundreds of milliseconds
    /// and it may return an error.
    /// TODO(pmasurel) Use the `TryInto` trait once it is available in stable.
    pub fn try_into(self) -> Result<IndexReader> {
        let inner_reader = InnerIndexReader {
            index: self.index,
            num_searchers: self.num_searchers,
            searcher_pool: Pool::new(),
        };
        inner_reader.reload()?;
        let inner_reader_arc = Arc::new(inner_reader);
        let watch_handle_opt: Option<WatchHandle>;
        match self.reload_policy {
            ReloadPolicy::Manual => {
                // No need to set anything...
                watch_handle_opt = None;
            }
            ReloadPolicy::OnCommit => {
                let inner_reader_arc_clone = inner_reader_arc.clone();
                let callback = move || {
                    if let Err(err) = inner_reader_arc_clone.reload() {
                        error!(
                            "Error while loading searcher after commit was detected. {:?}",
                            err
                        );
                    }
                };
                let watch_handle = inner_reader_arc
                    .index
                    .directory()
                    .watch(Box::new(callback))?;
                watch_handle_opt = Some(watch_handle);
            }
        }
        Ok(IndexReader {
            inner: inner_reader_arc,
            watch_handle_opt,
        })
    }

    /// Sets the reload_policy.
    ///
    /// See [`ReloadPolicy`](./enum.ReloadPolicy.html) for more details.
    pub fn reload_policy(mut self, reload_policy: ReloadPolicy) -> IndexReaderBuilder {
        self.reload_policy = reload_policy;
        self
    }

    /// Sets the number of `Searcher` in the searcher pool.
    pub fn num_searchers(mut self, num_searchers: usize) -> IndexReaderBuilder {
        self.num_searchers = num_searchers;
        self
    }
}

struct InnerIndexReader {
    num_searchers: usize,
    searcher_pool: Pool<Searcher>,
    index: Index,
}

impl InnerIndexReader {
    fn load_segment_readers(&self) -> Result<Vec<SegmentReader>> {
        // We keep the lock until we have effectively finished opening the
        // `SegmentReader`s, because it prevents a different process from
        // garbage-collecting these files while we open them.
        //
        // Once opened, on Linux & macOS, the mmap will remain valid even after
        // the file has been deleted.
        // On Windows, the file deletion will fail.
        let _meta_lock = self.index.directory().acquire_lock(&META_LOCK)?;
        let searchable_segments = self.searchable_segments()?;
        searchable_segments
            .iter()
            .map(SegmentReader::open)
            .collect::<Result<_>>()
    }

    fn reload(&self) -> Result<()> {
        let segment_readers: Vec<SegmentReader> = self.load_segment_readers()?;
        let schema = self.index.schema();
        let searchers = (0..self.num_searchers)
            .map(|_| Searcher::new(schema.clone(), self.index.clone(), segment_readers.clone()))
            .collect();
        self.searcher_pool.publish_new_generation(searchers);
        Ok(())
    }

    /// Returns the list of segments that are searchable.
    fn searchable_segments(&self) -> Result<Vec<Segment>> {
        self.index.searchable_segments()
    }

    fn searcher(&self) -> LeasedItem<Searcher> {
        self.searcher_pool.acquire()
    }
}

/// `IndexReader` is your entry point to read and search the index.
///
/// It controls when a new version of the index should be loaded and lends
/// you instances of `Searcher` for the last loaded version.
///
/// `Clone` does not clone the underlying pool of searchers; `IndexReader`
/// just wraps an `Arc`.
#[derive(Clone)]
pub struct IndexReader {
    inner: Arc<InnerIndexReader>,
    watch_handle_opt: Option<WatchHandle>,
pub enum IndexReader {
    FromMetaFile(MetaFileIndexReader),
    NRT(NRTReader),
}

impl IndexReader {

@@ -194,8 +24,11 @@ impl IndexReader {
    ///
    /// This automatic reload can take 10s of milliseconds to kick in however, and in unit tests
    /// it can be nice to deterministically force the reload of searchers.
    pub fn reload(&self) -> Result<()> {
        self.inner.reload()
    pub fn reload(&self) -> crate::Result<()> {
        match self {
            IndexReader::FromMetaFile(meta_file_reader) => meta_file_reader.reload(),
            IndexReader::NRT(nrt_reader) => nrt_reader.reload(),
        }
    }

    /// Returns a searcher

@@ -209,6 +42,21 @@ impl IndexReader {
    /// The same searcher must be used for a given query, as it ensures
    /// the use of a consistent segment set.
    pub fn searcher(&self) -> LeasedItem<Searcher> {
        self.inner.searcher()
        match self {
            IndexReader::FromMetaFile(meta_file_reader) => meta_file_reader.searcher(),
            IndexReader::NRT(nrt_reader) => nrt_reader.searcher(),
        }
    }
}

impl From<MetaFileIndexReader> for IndexReader {
    fn from(meta_file_reader: MetaFileIndexReader) -> Self {
        IndexReader::FromMetaFile(meta_file_reader)
    }
}

impl From<NRTReader> for IndexReader {
    fn from(nrt_reader: NRTReader) -> Self {
        IndexReader::NRT(nrt_reader)
    }
}
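Both reader flavors funnel into the `IndexReader` enum through the `From` impls above, so calling code can stay agnostic about where its segment list comes from. A hypothetical helper illustrating that dispatch:

```rust
use tantivy::schema::Schema;
use tantivy::{Index, IndexReader};

// Hypothetical helper: thanks to the `From` impls above it accepts either
// flavor (MetaFileIndexReader, NRTReader) or an `IndexReader` itself.
fn num_docs_after_reload(reader: impl Into<IndexReader>) -> tantivy::Result<u64> {
    let reader: IndexReader = reader.into();
    reader.reload()?; // dispatches on the FromMetaFile / NRT variant
    Ok(reader.searcher().num_docs())
}

fn main() -> tantivy::Result<()> {
    let index = Index::create_in_ram(Schema::builder().build());
    let reader = index.reader_builder().try_into()?;
    assert_eq!(num_docs_after_reload(reader)?, 0);
    Ok(())
}
```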