diff --git a/src/core/segment_reader.rs b/src/core/segment_reader.rs
index d10c15fa0..028872b27 100644
--- a/src/core/segment_reader.rs
+++ b/src/core/segment_reader.rs
@@ -57,6 +57,68 @@ pub struct SegmentReader {
 }
 
 impl SegmentReader {
+    /// Open a new segment for reading.
+    pub fn open(segment: &Segment) -> Result<SegmentReader> {
+        let termdict_source = segment.open_read(SegmentComponent::TERMS)?;
+        let termdict_composite = CompositeFile::open(&termdict_source)?;
+
+        let store_source = segment.open_read(SegmentComponent::STORE)?;
+
+        fail_point!("SegmentReader::open#middle");
+
+        let postings_source = segment.open_read(SegmentComponent::POSTINGS)?;
+        let postings_composite = CompositeFile::open(&postings_source)?;
+
+        let positions_composite = {
+            if let Ok(source) = segment.open_read(SegmentComponent::POSITIONS) {
+                CompositeFile::open(&source)?
+            } else {
+                CompositeFile::empty()
+            }
+        };
+
+        let positions_idx_composite = {
+            if let Ok(source) = segment.open_read(SegmentComponent::POSITIONSSKIP) {
+                CompositeFile::open(&source)?
+            } else {
+                CompositeFile::empty()
+            }
+        };
+
+        let schema = segment.schema();
+
+        let fast_fields_data = segment.open_read(SegmentComponent::FASTFIELDS)?;
+        let fast_fields_composite = CompositeFile::open(&fast_fields_data)?;
+        let fast_field_readers =
+            Arc::new(FastFieldReaders::load_all(&schema, &fast_fields_composite)?);
+
+        let fieldnorms_data = segment.open_read(SegmentComponent::FIELDNORMS)?;
+        let fieldnorms_composite = CompositeFile::open(&fieldnorms_data)?;
+
+        let delete_bitset_opt = if segment.meta().has_deletes() {
+            let delete_data = segment.open_read(SegmentComponent::DELETE)?;
+            Some(DeleteBitSet::open(delete_data))
+        } else {
+            None
+        };
+
+        Ok(SegmentReader {
+            inv_idx_reader_cache: Arc::new(RwLock::new(HashMap::new())),
+            max_doc: segment.meta().max_doc(),
+            num_docs: segment.meta().num_docs(),
+            termdict_composite,
+            postings_composite,
+            fast_fields_readers: fast_field_readers,
+            fieldnorms_composite,
+            segment_id: segment.id(),
+            store_source,
+            delete_bitset_opt,
+            positions_composite,
+            positions_idx_composite,
+            schema,
+        })
+    }
+
     /// Returns the highest document id ever attributed in
     /// this segment + 1.
     /// Today, `tantivy` does not handle deletes, so it happens
@@ -144,68 +206,6 @@ impl SegmentReader {
         StoreReader::from_source(self.store_source.clone())
     }
 
-    /// Open a new segment for reading.
-    pub fn open(segment: &Segment) -> Result<SegmentReader> {
-        let termdict_source = segment.open_read(SegmentComponent::TERMS)?;
-        let termdict_composite = CompositeFile::open(&termdict_source)?;
-
-        let store_source = segment.open_read(SegmentComponent::STORE)?;
-
-        fail_point!("SegmentReader::open#middle");
-
-        let postings_source = segment.open_read(SegmentComponent::POSTINGS)?;
-        let postings_composite = CompositeFile::open(&postings_source)?;
-
-        let positions_composite = {
-            if let Ok(source) = segment.open_read(SegmentComponent::POSITIONS) {
-                CompositeFile::open(&source)?
-            } else {
-                CompositeFile::empty()
-            }
-        };
-
-        let positions_idx_composite = {
-            if let Ok(source) = segment.open_read(SegmentComponent::POSITIONSSKIP) {
-                CompositeFile::open(&source)?
-            } else {
-                CompositeFile::empty()
-            }
-        };
-
-        let schema = segment.schema();
-
-        let fast_fields_data = segment.open_read(SegmentComponent::FASTFIELDS)?;
-        let fast_fields_composite = CompositeFile::open(&fast_fields_data)?;
-        let fast_field_readers =
-            Arc::new(FastFieldReaders::load_all(&schema, &fast_fields_composite)?);
-
-        let fieldnorms_data = segment.open_read(SegmentComponent::FIELDNORMS)?;
-        let fieldnorms_composite = CompositeFile::open(&fieldnorms_data)?;
-
-        let delete_bitset_opt = if segment.meta().has_deletes() {
-            let delete_data = segment.open_read(SegmentComponent::DELETE)?;
-            Some(DeleteBitSet::open(delete_data))
-        } else {
-            None
-        };
-
-        Ok(SegmentReader {
-            inv_idx_reader_cache: Arc::new(RwLock::new(HashMap::new())),
-            max_doc: segment.meta().max_doc(),
-            num_docs: segment.meta().num_docs(),
-            termdict_composite,
-            postings_composite,
-            fast_fields_readers: fast_field_readers,
-            fieldnorms_composite,
-            segment_id: segment.id(),
-            store_source,
-            delete_bitset_opt,
-            positions_composite,
-            positions_idx_composite,
-            schema,
-        })
-    }
-
     /// Returns a field reader associated to the field given in argument.
     /// If the field was not present in the index during indexing time,
     /// the InvertedIndexReader is empty.
diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs
index 692c88ce3..d818eb6c5 100644
--- a/src/indexer/index_writer.rs
+++ b/src/indexer/index_writer.rs
@@ -8,8 +8,8 @@ use crate::core::SegmentComponent;
 use crate::core::SegmentId;
 use crate::core::SegmentMeta;
 use crate::core::SegmentReader;
-use crate::directory::TerminatingWrite;
 use crate::directory::{DirectoryLock, GarbageCollectionResult};
+use crate::directory::{TerminatingWrite, WatchCallbackList};
 use crate::docset::DocSet;
 use crate::error::TantivyError;
 use crate::fastfield::write_delete_bitset;
@@ -22,16 +22,16 @@ use crate::indexer::stamper::Stamper;
 use crate::indexer::MergePolicy;
 use crate::indexer::SegmentEntry;
 use crate::indexer::SegmentWriter;
+use crate::reader::NRTReader;
 use crate::schema::Document;
 use crate::schema::IndexRecordOption;
 use crate::schema::Term;
 use crate::tokenizer::TokenizerManager;
-use crate::Opstamp;
+use crate::{IndexReader, Opstamp};
 use crossbeam::channel;
 use futures::executor::block_on;
 use futures::future::Future;
-use smallvec::smallvec;
-use smallvec::SmallVec;
+use smallvec::{smallvec, SmallVec};
 use std::mem;
 use std::ops::Range;
 use std::sync::{Arc, RwLock};
@@ -92,6 +92,8 @@ pub struct IndexWriter {
 
     stamper: Stamper,
     committed_opstamp: Opstamp,
+
+    on_commit: WatchCallbackList,
 }
 
 fn compute_deleted_bitset(
@@ -344,6 +346,8 @@ impl IndexWriter {
             stamper,
 
             worker_id: 0,
+
+            on_commit: Default::default(),
         };
         index_writer.start_workers()?;
         Ok(index_writer)
@@ -464,6 +468,21 @@ impl IndexWriter {
         Ok(())
     }
 
+    // TODO move me
+    pub(crate) fn trigger_commit(&self) -> impl Future<Output = ()> {
+        self.on_commit.broadcast()
+    }
+
+    pub fn reader(&self, num_searchers: usize) -> crate::Result<IndexReader> {
+        let nrt_reader = NRTReader::create(
+            num_searchers,
+            self.index.clone(),
+            self.segment_registers.clone(),
+            &self.on_commit,
+        )?;
+        Ok(IndexReader::NRT(nrt_reader))
+    }
+
     /// Detects and removes the files that are not used by the index anymore.
     pub fn garbage_collect_files(
         &self,
@@ -780,7 +799,6 @@ impl Drop for IndexWriter {
 
 #[cfg(test)]
 mod tests {
-
     use super::super::operation::UserOperation;
     use crate::collector::TopDocs;
     use crate::directory::error::LockError;
@@ -1221,7 +1239,24 @@ mod tests {
         let index = Index::create_in_ram(schema_builder.build());
         let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
         index_writer.add_document(doc!(idfield=>"myid"));
-        let commit = index_writer.commit();
-        assert!(commit.is_ok());
+        assert!(index_writer.commit().is_ok());
+    }
+
+    #[test]
+    fn test_index_writer_reader() {
+        let mut schema_builder = schema::Schema::builder();
+        let idfield = schema_builder.add_text_field("id", STRING);
+        schema_builder.add_text_field("optfield", STRING);
+        let index = Index::create_in_ram(schema_builder.build());
+        let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
+        index_writer.add_document(doc!(idfield=>"myid"));
+        assert!(index_writer.commit().is_ok());
+        let reader = index_writer.reader(2).unwrap();
+        let searcher = reader.searcher();
+        assert_eq!(searcher.num_docs(), 1u64);
+        index_writer.add_document(doc!(idfield=>"myid"));
+        assert!(index_writer.commit().is_ok());
+        assert_eq!(reader.searcher().num_docs(), 2u64);
+        assert_eq!(searcher.num_docs(), 1u64);
     }
 }
diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs
index fc7be1e27..0dbb55120 100644
--- a/src/indexer/mod.rs
+++ b/src/indexer/mod.rs
@@ -23,6 +23,7 @@ pub use self::merge_policy::{MergeCandidate, MergePolicy, NoMergePolicy};
 pub use self::prepared_commit::PreparedCommit;
 pub use self::segment_entry::SegmentEntry;
 pub use self::segment_manager::SegmentManager;
+pub(crate) use self::segment_manager::SegmentRegisters;
 pub use self::segment_serializer::SegmentSerializer;
 pub use self::segment_writer::SegmentWriter;
 
diff --git a/src/indexer/prepared_commit.rs b/src/indexer/prepared_commit.rs
index e0888f02f..204af68e5 100644
--- a/src/indexer/prepared_commit.rs
+++ b/src/indexer/prepared_commit.rs
@@ -38,6 +38,7 @@ impl<'a> PreparedCommit<'a> {
                 .segment_updater()
                 .schedule_commit(self.opstamp, self.payload),
         );
+        let _ = block_on(self.index_writer.trigger_commit());
         Ok(self.opstamp)
     }
 }
diff --git a/src/indexer/segment_entry.rs b/src/indexer/segment_entry.rs
index c949255df..60db7ddb2 100644
--- a/src/indexer/segment_entry.rs
+++ b/src/indexer/segment_entry.rs
@@ -46,6 +46,10 @@ impl SegmentEntry {
         Ok(())
     }
 
+    pub fn segment(&self) -> &Segment {
+        &self.segment
+    }
+
     /// Return a reference to the segment entry deleted bitset.
     ///
     /// `DocId` in this bitset are flagged as deleted.
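Reviewer note, not part of the patch: the wiring above is commit → `trigger_commit()` → `on_commit.broadcast()`, and `NRTReader::create` (below) subscribes a callback that holds only a `Weak` to the inner reader, so the writer never keeps a dropped reader alive. A minimal std-only sketch of that weak-subscriber pattern — all names here are illustrative, not tantivy API:

use std::sync::{Arc, Mutex, Weak};

// Stand-in for InnerNRTReader: something that can be reloaded.
struct Reloadable {
    generation: Mutex<u64>,
}

impl Reloadable {
    fn reload(&self) {
        *self.generation.lock().unwrap() += 1;
    }
}

// Stand-in for WatchCallbackList: a list of boxed callbacks plus a broadcast.
struct CallbackList {
    callbacks: Mutex<Vec<Box<dyn Fn() + Send + Sync>>>,
}

impl CallbackList {
    fn subscribe(&self, cb: Box<dyn Fn() + Send + Sync>) {
        self.callbacks.lock().unwrap().push(cb);
    }
    fn broadcast(&self) {
        for cb in self.callbacks.lock().unwrap().iter() {
            cb();
        }
    }
}

fn main() {
    let list = CallbackList {
        callbacks: Mutex::new(Vec::new()),
    };
    let reader = Arc::new(Reloadable {
        generation: Mutex::new(0),
    });
    let weak: Weak<Reloadable> = Arc::downgrade(&reader);
    list.subscribe(Box::new(move || {
        // Upgrade fails once the reader is dropped; the callback then no-ops,
        // mirroring the `inner_reader_weak.upgrade()` guard in the patch.
        if let Some(alive) = weak.upgrade() {
            alive.reload();
        }
    }));
    list.broadcast(); // a "commit": the live reader reloads
    assert_eq!(*reader.generation.lock().unwrap(), 1);
    drop(reader);
    list.broadcast(); // reader gone: the callback silently does nothing
}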
diff --git a/src/indexer/segment_manager.rs b/src/indexer/segment_manager.rs
index 795526d27..4cc0a04d4 100644
--- a/src/indexer/segment_manager.rs
+++ b/src/indexer/segment_manager.rs
@@ -2,6 +2,7 @@ use super::segment_register::SegmentRegister;
 use crate::core::SegmentId;
 use crate::core::SegmentMeta;
 use crate::indexer::SegmentEntry;
+use crate::Segment;
 use std::collections::hash_set::HashSet;
 use std::sync::{Arc, RwLock};
 use std::sync::{RwLockReadGuard, RwLockWriteGuard};
@@ -12,15 +13,6 @@ pub(crate) struct SegmentRegisters {
     committed: SegmentRegister,
 }
 
-impl SegmentRegisters {
-    pub fn new(committed: SegmentRegister) -> SegmentRegisters {
-        SegmentRegisters {
-            uncommitted: Default::default(),
-            committed,
-        }
-    }
-}
-
 #[derive(PartialEq, Eq)]
 pub(crate) enum SegmentsStatus {
     Committed,
@@ -28,6 +20,17 @@ pub(crate) enum SegmentsStatus {
 }
 
 impl SegmentRegisters {
+    pub fn new(committed: SegmentRegister) -> SegmentRegisters {
+        SegmentRegisters {
+            uncommitted: Default::default(),
+            committed,
+        }
+    }
+
+    pub fn committed_segment(&self) -> Vec<Segment> {
+        self.committed.segments()
+    }
+
     /// Check if all the segments are committed or uncommited.
     ///
     /// If some segment is missing or segments are in a different state (this should not happen
diff --git a/src/indexer/segment_register.rs b/src/indexer/segment_register.rs
index 5fa025f0d..bb07100b3 100644
--- a/src/indexer/segment_register.rs
+++ b/src/indexer/segment_register.rs
@@ -49,6 +49,13 @@ impl SegmentRegister {
             .collect()
     }
 
+    pub fn segments(&self) -> Vec<Segment> {
+        self.segment_states
+            .values()
+            .map(|segment_entry| segment_entry.segment().clone())
+            .collect()
+    }
+
     pub fn segment_entries(&self) -> Vec<SegmentEntry> {
         self.segment_states.values().cloned().collect()
     }
@@ -104,6 +111,7 @@ mod tests {
     use super::*;
     use crate::core::{SegmentId, SegmentMetaInventory};
     use crate::indexer::delete_queue::*;
+    use crate::schema::Schema;
 
     fn segment_ids(segment_register: &SegmentRegister) -> Vec<SegmentId> {
         segment_register
diff --git a/src/reader/index_writer_reader.rs b/src/reader/index_writer_reader.rs
new file mode 100644
index 000000000..4e6829c77
--- /dev/null
+++ b/src/reader/index_writer_reader.rs
@@ -0,0 +1,83 @@
+use crate::directory::{WatchCallbackList, WatchHandle};
+use crate::indexer::SegmentRegisters;
+use crate::reader::pool::Pool;
+use crate::{Index, LeasedItem, Searcher, Segment, SegmentReader};
+use std::iter::repeat_with;
+use std::sync::{Arc, RwLock, Weak};
+
+struct InnerNRTReader {
+    num_searchers: usize,
+    index: Index,
+    searcher_pool: Pool<Searcher>,
+    segment_registers: Arc<RwLock<SegmentRegisters>>,
+}
+
+impl InnerNRTReader {
+    fn load_segment_readers(&self) -> crate::Result<Vec<SegmentReader>> {
+        let segments: Vec<Segment> = {
+            let rlock = self.segment_registers.read().unwrap();
+            rlock.committed_segment()
+        };
+        segments
+            .iter()
+            .map(SegmentReader::open)
+            .collect::<crate::Result<Vec<SegmentReader>>>()
+    }
+
+    pub fn reload(&self) -> crate::Result<()> {
+        let segment_readers: Vec<SegmentReader> = self.load_segment_readers()?;
+        let schema = self.index.schema();
+        let searchers = repeat_with(|| {
+            Searcher::new(schema.clone(), self.index.clone(), segment_readers.clone())
+        })
+        .take(self.num_searchers)
+        .collect();
+        self.searcher_pool.publish_new_generation(searchers);
+        Ok(())
+    }
+
+    pub fn searcher(&self) -> LeasedItem<Searcher> {
+        self.searcher_pool.acquire()
+    }
+}
+
+#[derive(Clone)]
+pub struct NRTReader {
+    inner: Arc<InnerNRTReader>,
+    watch_handle: WatchHandle,
+}
+
+impl NRTReader {
+    pub fn reload(&self) -> crate::Result<()> {
+        self.inner.reload()
+    }
+
+    pub fn searcher(&self) -> LeasedItem<Searcher> {
+        self.inner.searcher()
+    }
+
+    pub(crate) fn create(
+        num_searchers: usize,
+        index: Index,
+        segment_registers: Arc<RwLock<SegmentRegisters>>,
+        watch_callback_list: &WatchCallbackList,
+    ) -> crate::Result<NRTReader> {
+        let inner_reader: Arc<InnerNRTReader> = Arc::new(InnerNRTReader {
+            num_searchers,
+            index,
+            searcher_pool: Pool::new(),
+            segment_registers,
+        });
+        let inner_reader_weak: Weak<InnerNRTReader> = Arc::downgrade(&inner_reader);
+        let watch_handle = watch_callback_list.subscribe(Box::new(move || {
+            if let Some(nrt_reader_arc) = inner_reader_weak.upgrade() {
+                let _ = nrt_reader_arc.reload();
+            }
+        }));
+        inner_reader.reload()?;
+        Ok(NRTReader {
+            inner: inner_reader,
+            watch_handle,
+        })
+    }
+}
diff --git a/src/reader/meta_file_reader.rs b/src/reader/meta_file_reader.rs
new file mode 100644
index 000000000..4f30b6051
--- /dev/null
+++ b/src/reader/meta_file_reader.rs
@@ -0,0 +1,180 @@
+use super::pool::Pool;
+use crate::core::Segment;
+use crate::directory::Directory;
+use crate::directory::WatchHandle;
+use crate::directory::META_LOCK;
+use crate::Searcher;
+use crate::SegmentReader;
+use crate::{Index, LeasedItem};
+use crate::{IndexReader, Result};
+use std::iter::repeat_with;
+use std::sync::Arc;
+
+/// Defines when a new version of the index should be reloaded.
+///
+/// Regardless of whether you search and index in the same process, tantivy does not necessarily
+/// reflect the changes that are committed to your index. `ReloadPolicy` helps you define
+/// precisely when you want your index to be reloaded.
+#[derive(Clone, Copy)]
+pub enum ReloadPolicy {
+    /// The index is entirely reloaded manually.
+    /// All updates of the index should be manual.
+    ///
+    /// No change is reflected automatically. You are required to call `.reload()` manually.
+    Manual,
+    /// The index is reloaded within milliseconds after a new commit is available.
+    /// This is made possible by watching changes in the `meta.json` file.
+    OnCommit, // TODO add NEAR_REAL_TIME(target_ms)
+}
+
+/// `IndexReader` builder
+///
+/// It makes it possible to set the following values.
+///
+/// - `num_searchers` (by default, the number of detected CPU threads):
+///
+///   When more than `num_searchers` searches are requested at the same time, the extra
+///   requests will block until one of the searchers in use gets released.
+/// - `reload_policy` (by default `ReloadPolicy::OnCommit`):
+///
+///   See [`ReloadPolicy`](./enum.ReloadPolicy.html) for more details.
+#[derive(Clone)]
+pub struct IndexReaderBuilder {
+    num_searchers: usize,
+    reload_policy: ReloadPolicy,
+    index: Index,
+}
+
+impl IndexReaderBuilder {
+    pub(crate) fn new(index: Index) -> IndexReaderBuilder {
+        IndexReaderBuilder {
+            num_searchers: num_cpus::get(),
+            reload_policy: ReloadPolicy::OnCommit,
+            index,
+        }
+    }
+
+    /// Builds the reader.
+    ///
+    /// Building the reader is a non-trivial operation that requires
+    /// opening the different segment readers. It may take hundreds of
+    /// milliseconds and it may return an error.
+    /// TODO(pmasurel) Use the `TryInto` trait once it is available in stable.
+    pub fn try_into(self) -> Result<IndexReader> {
+        let inner_reader = MetaFileIndexReaderInner {
+            index: self.index,
+            num_searchers: self.num_searchers,
+            searcher_pool: Pool::new(),
+        };
+        inner_reader.reload()?;
+        let inner_reader_arc = Arc::new(inner_reader);
+        let watch_handle_opt: Option<WatchHandle>;
+        match self.reload_policy {
+            ReloadPolicy::Manual => {
+                // No need to set anything...
+                watch_handle_opt = None;
+            }
+            ReloadPolicy::OnCommit => {
+                let inner_reader_arc_clone = inner_reader_arc.clone();
+                let callback = move || {
+                    if let Err(err) = inner_reader_arc_clone.reload() {
+                        error!(
+                            "Error while loading searcher after commit was detected. {:?}",
+                            err
+                        );
+                    }
+                };
+                let watch_handle = inner_reader_arc
+                    .index
+                    .directory()
+                    .watch(Box::new(callback))?;
+                watch_handle_opt = Some(watch_handle);
+            }
+        }
+        Ok(IndexReader::from(MetaFileIndexReader {
+            inner: inner_reader_arc,
+            watch_handle_opt,
+        }))
+    }
+
+    /// Sets the reload_policy.
+    ///
+    /// See [`ReloadPolicy`](./enum.ReloadPolicy.html) for more details.
+    pub fn reload_policy(mut self, reload_policy: ReloadPolicy) -> IndexReaderBuilder {
+        self.reload_policy = reload_policy;
+        self
+    }
+
+    /// Sets the number of `Searcher`s in the searcher pool.
+    pub fn num_searchers(mut self, num_searchers: usize) -> IndexReaderBuilder {
+        self.num_searchers = num_searchers;
+        self
+    }
+}
+
+struct MetaFileIndexReaderInner {
+    num_searchers: usize,
+    searcher_pool: Pool<Searcher>,
+    index: Index,
+}
+
+impl MetaFileIndexReaderInner {
+    fn load_segment_readers(&self) -> crate::Result<Vec<SegmentReader>> {
+        // We keep the lock until we have effectively finished opening the
+        // `SegmentReader`s, because it prevents a different process
+        // from garbage collecting these files while we open them.
+        //
+        // Once opened, on Linux & macOS, the mmap will remain valid even after
+        // the file has been deleted.
+        // On Windows, the file deletion will fail.
+        let _meta_lock = self.index.directory().acquire_lock(&META_LOCK)?;
+        let searchable_segments = self.searchable_segments()?;
+        searchable_segments
+            .iter()
+            .map(SegmentReader::open)
+            .collect::<Result<_>>()
+    }
+
+    fn reload(&self) -> crate::Result<()> {
+        let segment_readers: Vec<SegmentReader> = self.load_segment_readers()?;
+        let schema = self.index.schema();
+        let searchers = repeat_with(|| {
+            Searcher::new(schema.clone(), self.index.clone(), segment_readers.clone())
+        })
+        .take(self.num_searchers)
+        .collect();
+        self.searcher_pool.publish_new_generation(searchers);
+        Ok(())
+    }
+
+    /// Returns the list of segments that are searchable
+    fn searchable_segments(&self) -> crate::Result<Vec<Segment>> {
+        self.index.searchable_segments()
+    }
+
+    fn searcher(&self) -> LeasedItem<Searcher> {
+        self.searcher_pool.acquire()
+    }
+}
+
+/// `MetaFileIndexReader` is your entry point to read and search the index.
+///
+/// It controls when a new version of the index should be loaded and lends
+/// you instances of `Searcher` for the last loaded version.
+///
+/// `Clone` does not clone the pool of searchers. `MetaFileIndexReader`
+/// just wraps an `Arc`.
+#[derive(Clone)]
+pub struct MetaFileIndexReader {
+    inner: Arc<MetaFileIndexReaderInner>,
+    watch_handle_opt: Option<WatchHandle>,
+}
+
+impl MetaFileIndexReader {
+    pub fn reload(&self) -> crate::Result<()> {
+        self.inner.reload()
+    }
+
+    pub fn searcher(&self) -> LeasedItem<Searcher> {
+        self.inner.searcher()
+    }
+}
diff --git a/src/reader/mod.rs b/src/reader/mod.rs
index 247154c87..783717825 100644
--- a/src/reader/mod.rs
+++ b/src/reader/mod.rs
@@ -1,187 +1,17 @@
+mod index_writer_reader;
+mod meta_file_reader;
 mod pool;
 
+use self::meta_file_reader::MetaFileIndexReader;
+pub use self::meta_file_reader::{IndexReaderBuilder, ReloadPolicy};
 pub use self::pool::LeasedItem;
-use self::pool::Pool;
-use crate::core::Segment;
-use crate::directory::Directory;
-use crate::directory::WatchHandle;
-use crate::directory::META_LOCK;
-use crate::Index;
-use crate::Result;
+pub(crate) use crate::reader::index_writer_reader::NRTReader;
 use crate::Searcher;
-use crate::SegmentReader;
-use std::sync::{Arc, RwLock};
-
-//
-//enum SegmentSource {
-//    FromMetaFile,
-//    FromWriter(Arc<RwLock<SegmentRegisters>>),
-//}
-//
-//impl SegmentSource {
-//    fn from_meta_file() -> SegmentSource {
-//
-//    }
-//
-//}
-
-/// Defines when a new version of the index should be reloaded.
-///
-/// Regardless of whether you search and index in the same process, tantivy does not necessarily
-/// reflects the change that are commited to your index. `ReloadPolicy` precisely helps you define
-/// when you want your index to be reloaded.
-#[derive(Clone, Copy)]
-pub enum ReloadPolicy {
-    /// The index is entirely reloaded manually.
-    /// All updates of the index should be manual.
-    ///
-    /// No change is reflected automatically. You are required to call `.load_seacher()` manually.
-    Manual,
-    /// The index is reloaded within milliseconds after a new commit is available.
-    /// This is made possible by watching changes in the `meta.json` file.
-    OnCommit, // TODO add NEAR_REAL_TIME(target_ms)
-}
-
-/// `IndexReader` builder
-///
-/// It makes it possible to set the following values.
-///
-/// - `num_searchers` (by default, the number of detected CPU threads):
-///
-/// When `num_searchers` queries are requested at the same time, the `num_searchers` will block
-/// until the one of the searcher in-use gets released.
-/// - `reload_policy` (by default `ReloadPolicy::OnCommit`):
-///
-/// See [`ReloadPolicy`](./enum.ReloadPolicy.html) for more details.
 #[derive(Clone)]
-pub struct IndexReaderBuilder {
-    num_searchers: usize,
-    reload_policy: ReloadPolicy,
-    index: Index,
-}
-
-impl IndexReaderBuilder {
-    pub(crate) fn new(index: Index) -> IndexReaderBuilder {
-        IndexReaderBuilder {
-            num_searchers: num_cpus::get(),
-            reload_policy: ReloadPolicy::OnCommit,
-            index,
-        }
-    }
-
-    /// Builds the reader.
-    ///
-    /// Building the reader is a non-trivial operation that requires
-    /// to open different segment readers. It may take hundreds of milliseconds
-    /// of time and it may return an error.
-    /// TODO(pmasurel) Use the `TryInto` trait once it is available in stable.
-    pub fn try_into(self) -> Result<IndexReader> {
-        let inner_reader = InnerIndexReader {
-            index: self.index,
-            num_searchers: self.num_searchers,
-            searcher_pool: Pool::new(),
-        };
-        inner_reader.reload()?;
-        let inner_reader_arc = Arc::new(inner_reader);
-        let watch_handle_opt: Option<WatchHandle>;
-        match self.reload_policy {
-            ReloadPolicy::Manual => {
-                // No need to set anything...
-                watch_handle_opt = None;
-            }
-            ReloadPolicy::OnCommit => {
-                let inner_reader_arc_clone = inner_reader_arc.clone();
-                let callback = move || {
-                    if let Err(err) = inner_reader_arc_clone.reload() {
-                        error!(
-                            "Error while loading searcher after commit was detected. {:?}",
-                            err
-                        );
-                    }
-                };
-                let watch_handle = inner_reader_arc
-                    .index
-                    .directory()
-                    .watch(Box::new(callback))?;
-                watch_handle_opt = Some(watch_handle);
-            }
-        }
-        Ok(IndexReader {
-            inner: inner_reader_arc,
-            watch_handle_opt,
-        })
-    }
-
-    /// Sets the reload_policy.
-    ///
-    /// See [`ReloadPolicy`](./enum.ReloadPolicy.html) for more details.
-    pub fn reload_policy(mut self, reload_policy: ReloadPolicy) -> IndexReaderBuilder {
-        self.reload_policy = reload_policy;
-        self
-    }
-
-    /// Sets the number of `Searcher` in the searcher pool.
-    pub fn num_searchers(mut self, num_searchers: usize) -> IndexReaderBuilder {
-        self.num_searchers = num_searchers;
-        self
-    }
-}
-
-struct InnerIndexReader {
-    num_searchers: usize,
-    searcher_pool: Pool<Searcher>,
-    index: Index,
-}
-
-impl InnerIndexReader {
-    fn load_segment_readers(&self) -> Result<Vec<SegmentReader>> {
-        // We keep the lock until we have effectively finished opening the
-        // the `SegmentReader` because it prevents a diffferent process
-        // to garbage collect these file while we open them.
-        //
-        // Once opened, on linux & mac, the mmap will remain valid after
-        // the file has been deleted
-        // On windows, the file deletion will fail.
-        let _meta_lock = self.index.directory().acquire_lock(&META_LOCK)?;
-        let searchable_segments = self.searchable_segments()?;
-        searchable_segments
-            .iter()
-            .map(SegmentReader::open)
-            .collect::<Result<_>>()
-    }
-
-    fn reload(&self) -> Result<()> {
-        let segment_readers: Vec<SegmentReader> = self.load_segment_readers()?;
-        let schema = self.index.schema();
-        let searchers = (0..self.num_searchers)
-            .map(|_| Searcher::new(schema.clone(), self.index.clone(), segment_readers.clone()))
-            .collect();
-        self.searcher_pool.publish_new_generation(searchers);
-        Ok(())
-    }
-
-    /// Returns the list of segments that are searchable
-    fn searchable_segments(&self) -> Result<Vec<Segment>> {
-        self.index.searchable_segments()
-    }
-
-    fn searcher(&self) -> LeasedItem<Searcher> {
-        self.searcher_pool.acquire()
-    }
-}
-
-/// `IndexReader` is your entry point to read and search the index.
-///
-/// It controls when a new version of the index should be loaded and lends
-/// you instances of `Searcher` for the last loaded version.
-///
-/// `Clone` does not clone the different pool of searcher. `IndexReader`
-/// just wraps and `Arc`.
-#[derive(Clone)]
-pub struct IndexReader {
-    inner: Arc<InnerIndexReader>,
-    watch_handle_opt: Option<WatchHandle>,
+pub enum IndexReader {
+    FromMetaFile(MetaFileIndexReader),
+    NRT(NRTReader),
 }
 
 impl IndexReader {
@@ -194,8 +24,11 @@ impl IndexReader {
     ///
     /// This automatic reload can take 10s of milliseconds to kick in however, and in unit tests
     /// it can be nice to deterministically force the reload of searchers.
-    pub fn reload(&self) -> Result<()> {
-        self.inner.reload()
+    pub fn reload(&self) -> crate::Result<()> {
+        match self {
+            IndexReader::FromMetaFile(meta_file_reader) => meta_file_reader.reload(),
+            IndexReader::NRT(nrt_reader) => nrt_reader.reload(),
+        }
     }
 
     /// Returns a searcher
@@ -209,6 +42,21 @@ impl IndexReader {
     /// The same searcher must be used for a given query, as it ensures
     /// the use of a consistent segment set.
     pub fn searcher(&self) -> LeasedItem<Searcher> {
-        self.inner.searcher()
+        match self {
+            IndexReader::FromMetaFile(meta_file_reader) => meta_file_reader.searcher(),
+            IndexReader::NRT(nrt_reader) => nrt_reader.searcher(),
+        }
+    }
+}
+
+impl From<MetaFileIndexReader> for IndexReader {
+    fn from(meta_file_reader: MetaFileIndexReader) -> Self {
+        IndexReader::FromMetaFile(meta_file_reader)
+    }
+}
+
+impl From<NRTReader> for IndexReader {
+    fn from(nrt_reader: NRTReader) -> Self {
+        IndexReader::NRT(nrt_reader)
     }
 }
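Reviewer note, not part of the patch: end-to-end, the two `IndexReader` flavors this change introduces are obtained as sketched below. This assumes the tantivy API as of this diff; the field name, the 3 MB writer heap, and the assumption that `Index::reader()` still yields the meta-file flavor via `IndexReaderBuilder` are illustrative:

use tantivy::schema::{Schema, STRING};
use tantivy::{doc, Index};

fn main() -> tantivy::Result<()> {
    let mut schema_builder = Schema::builder();
    let id = schema_builder.add_text_field("id", STRING);
    let index = Index::create_in_ram(schema_builder.build());
    let mut writer = index.writer_with_num_threads(1, 3_000_000)?;

    writer.add_document(doc!(id => "first"));
    writer.commit()?;

    // Meta-file flavor: reloads by watching meta.json (ReloadPolicy::OnCommit).
    let meta_reader = index.reader()?;
    // NRT flavor introduced by this patch: reloads via the writer's
    // on_commit broadcast, with no round trip through meta.json.
    let nrt_reader = writer.reader(2)?;
    assert_eq!(nrt_reader.searcher().num_docs(), 1);

    writer.add_document(doc!(id => "second"));
    writer.commit()?;
    // commit() awaits trigger_commit(), so the NRT reader has already
    // reloaded by the time commit() returns (this mirrors the
    // test_index_writer_reader assertion above).
    assert_eq!(nrt_reader.searcher().num_docs(), 2);
    let _ = meta_reader.searcher();
    Ok(())
}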