diff --git a/src/core/index.rs b/src/core/index.rs
index 338aebbb2..5db404ca0 100644
--- a/src/core/index.rs
+++ b/src/core/index.rs
@@ -1,19 +1,14 @@
-use super::pool::LeasedItem;
-use super::pool::Pool;
 use super::segment::create_segment;
 use super::segment::Segment;
-use core::searcher::Searcher;
 use core::Executor;
 use core::IndexMeta;
 use core::SegmentId;
 use core::SegmentMeta;
-use core::SegmentReader;
 use core::META_FILEPATH;
 use directory::ManagedDirectory;
 #[cfg(feature = "mmap")]
 use directory::MmapDirectory;
 use directory::INDEX_WRITER_LOCK;
-use directory::META_LOCK;
 use directory::{Directory, RAMDirectory};
 use error::DataCorruption;
 use error::TantivyError;
@@ -21,6 +16,8 @@ use indexer::index_writer::open_index_writer;
 use indexer::index_writer::HEAP_SIZE_MIN;
 use indexer::segment_updater::save_new_metas;
 use num_cpus;
+use reader::IndexReaderBuilder;
+use reader::{IndexReader, ReloadPolicy};
 use schema::Field;
 use schema::FieldType;
 use schema::Schema;
@@ -28,7 +25,6 @@ use serde_json;
 use std::borrow::BorrowMut;
 use std::fmt;
 use std::path::Path;
-use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
 use tokenizer::BoxedTokenizer;
 use tokenizer::TokenizerManager;
@@ -52,8 +48,6 @@ fn load_metas(directory: &Directory) -> Result<IndexMeta> {
 pub struct Index {
     directory: ManagedDirectory,
     schema: Schema,
-    num_searchers: Arc<AtomicUsize>,
-    searcher_pool: Arc<Pool<Searcher>>,
     executor: Arc<Executor>,
     tokenizers: TokenizerManager,
 }
@@ -158,16 +152,12 @@ impl Index {
     /// Creates a new index given a directory and an `IndexMeta`.
     fn create_from_metas(directory: ManagedDirectory, metas: &IndexMeta) -> Result<Index> {
         let schema = metas.schema.clone();
-        let n_cpus = num_cpus::get();
         let index = Index {
             directory,
             schema,
-            num_searchers: Arc::new(AtomicUsize::new(n_cpus)),
-            searcher_pool: Arc::new(Pool::new()),
             tokenizers: TokenizerManager::default(),
             executor: Arc::new(Executor::single_thread()),
         };
-        index.load_searchers()?;
         Ok(index)
     }
 
@@ -197,6 +187,21 @@
         }
     }
 
+    /// Creates an `IndexReader` for this index with the given
+    /// `ReloadPolicy`.
+    ///
+    /// This is a shortcut for building the reader through
+    /// `reader_builder()`.
+    pub fn reader(&self, reload_policy: ReloadPolicy) -> IndexReader {
+        self.reader_builder().reload_policy(reload_policy).into()
+    }
+
+    /// Creates an `IndexReaderBuilder` that can be used to configure
+    /// and build an `IndexReader` for this index.
+    pub fn reader_builder(&self) -> IndexReaderBuilder {
+        IndexReaderBuilder::new(self.clone())
+    }
+
     /// Opens a new directory from an index path.
     #[cfg(feature = "mmap")]
     pub fn open_in_dir<P: AsRef<Path>>(directory_path: P) -> Result<Index> {
@@ -335,53 +340,6 @@ impl Index {
             .map(|segment_meta| segment_meta.id())
             .collect())
     }
-
-    /// Sets the number of searchers to use
-    ///
-    /// Only works after the next call to `load_searchers`
-    pub fn set_num_searchers(&mut self, num_searchers: usize) {
-        self.num_searchers.store(num_searchers, Ordering::Release);
-    }
-
-    /// Update searchers so that they reflect the state of the last
-    /// `.commit()`.
-    ///
-    /// If indexing happens in the same process as searching,
-    /// you most likely want to call `.load_searchers()` right after each
-    /// successful call to `.commit()`.
-    ///
-    /// If indexing and searching happen in different processes, the way to
-    /// get the freshest `index` at all time, is to watch `meta.json` and
-    /// call `load_searchers` whenever a changes happen.
-    pub fn load_searchers(&self) -> Result<()> {
-        let _meta_lock = self.directory().acquire_lock(&META_LOCK)?;
-        let searchable_segments = self.searchable_segments()?;
-        let segment_readers: Vec<SegmentReader> = searchable_segments
-            .iter()
-            .map(SegmentReader::open)
-            .collect::<Result<_>>()?;
-        let schema = self.schema();
-        let num_searchers: usize = self.num_searchers.load(Ordering::Acquire);
-        let searchers = (0..num_searchers)
-            .map(|_| Searcher::new(schema.clone(), self.clone(), segment_readers.clone()))
-            .collect();
-        self.searcher_pool.publish_new_generation(searchers);
-        Ok(())
-    }
-
-    /// Returns a searcher
-    ///
-    /// This method should be called every single time a search
-    /// query is performed.
-    /// The searchers are taken from a pool of `num_searchers` searchers.
-    /// If no searcher is available
-    /// this may block.
-    ///
-    /// The same searcher must be used for a given query, as it ensures
-    /// the use of a consistent segment set.
-    pub fn searcher(&self) -> LeasedItem<Searcher> {
-        self.searcher_pool.acquire()
-    }
 }
 
 impl fmt::Debug for Index {
@@ -395,8 +353,6 @@ impl Clone for Index {
         Index {
             directory: self.directory.clone(),
             schema: self.schema.clone(),
-            num_searchers: Arc::clone(&self.num_searchers),
-            searcher_pool: Arc::clone(&self.searcher_pool),
             tokenizers: self.tokenizers.clone(),
             executor: self.executor.clone(),
         }
diff --git a/src/core/mod.rs b/src/core/mod.rs
index 1aae75f8b..2ead28c0b 100644
--- a/src/core/mod.rs
+++ b/src/core/mod.rs
@@ -2,7 +2,6 @@ mod executor;
 pub mod index;
 mod index_meta;
 mod inverted_index_reader;
-mod pool;
 pub mod searcher;
 mod segment;
 mod segment_component;
diff --git a/src/lib.rs b/src/lib.rs
index 6b86ccd76..cee255652 100755
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -210,6 +210,9 @@ pub mod space_usage;
 pub mod store;
 pub mod termdict;
 
+mod reader;
+
+pub use self::reader::{IndexReader, IndexReaderBuilder, ReloadPolicy};
 mod snippet;
 pub use self::snippet::{Snippet, SnippetGenerator};
 
@@ -298,6 +301,7 @@ mod tests {
     use Index;
     use IndexWriter;
     use Postings;
+    use ReloadPolicy;
 
     pub fn assert_nearly_equals(expected: f32, val: f32) {
         assert!(
@@ -386,7 +390,8 @@
             index_writer.commit().unwrap();
         }
         {
-            index.load_searchers().unwrap();
-            let searcher = index.searcher();
+            let index_reader = index.reader(ReloadPolicy::MANUAL);
+            index_reader.load_searchers().unwrap();
+            let searcher = index_reader.searcher();
             let term_a = Term::from_field_text(text_field, "a");
             assert_eq!(searcher.doc_freq(&term_a), 3);
diff --git a/src/positions/reader.rs b/src/positions/reader.rs
index 750d44cb1..cd8b5f950 100644
--- a/src/positions/reader.rs
+++ b/src/positions/reader.rs
@@ -151,7 +151,8 @@
         if self.ahead != Some(0) {
             // the block currently available is not the block
             // for the current position
-            self.bit_packer.decompress(position_data, self.buffer.as_mut(), num_bits);
+            self.bit_packer
+                .decompress(position_data, self.buffer.as_mut(), num_bits);
             self.ahead = Some(0);
         }
         let block_len = compressed_block_size(num_bits);
diff --git a/src/positions/serializer.rs b/src/positions/serializer.rs
index 79660ff05..773be5e14 100644
--- a/src/positions/serializer.rs
+++ b/src/positions/serializer.rs
@@ -1,9 +1,9 @@
 use bitpacking::BitPacker;
+use bitpacking::BitPacker4x;
 use common::BinarySerializable;
 use common::CountingWriter;
 use positions::{COMPRESSION_BLOCK_SIZE, LONG_SKIP_INTERVAL};
 use std::io::{self, Write};
-use bitpacking::BitPacker4x;
 
 pub struct PositionSerializer {
     bit_packer: BitPacker4x,
@@ -53,7 +53,9 @@ impl PositionSerializer {
     fn flush_block(&mut self) -> io::Result<()> {
         let num_bits = self.bit_packer.num_bits(&self.block[..]);
         self.write_skiplist.write_all(&[num_bits])?;
-        let written_len = self.bit_packer.compress(&self.block[..], &mut self.buffer, num_bits);
+        let written_len = self
+            .bit_packer
+            .compress(&self.block[..], &mut self.buffer, num_bits);
         self.write_stream.write_all(&self.buffer[..written_len])?;
         self.block.clear();
         if (self.num_ints % LONG_SKIP_INTERVAL) == 0u64 {
diff --git a/src/reader/mod.rs b/src/reader/mod.rs
new file mode 100644
index 000000000..d1ad22b41
--- /dev/null
+++ b/src/reader/mod.rs
@@ -0,0 +1,137 @@
+mod pool;
+
+use self::pool::{LeasedItem, Pool};
+use core::Segment;
+use directory::Directory;
+use directory::META_LOCK;
+use num_cpus;
+use std::sync::Arc;
+use Index;
+use Result;
+use Searcher;
+use SegmentReader;
+
+/// Defines when a new version of the index should be loaded.
+#[derive(Clone, Copy)]
+pub enum ReloadPolicy {
+    /// The index is reloaded manually, by calling
+    /// `IndexReader::load_searchers()`.
+    MANUAL,
+    // NEAR_REAL_TIME(target_ms),
+    /// The index should be reloaded after each commit.
+    ON_COMMIT,
+}
+
+/// Builder used to configure and create an `IndexReader`.
+#[derive(Clone)]
+pub struct IndexReaderBuilder {
+    num_searchers: usize,
+    reload_policy: ReloadPolicy,
+    index: Index,
+}
+
+impl IndexReaderBuilder {
+    pub(crate) fn new(index: Index) -> IndexReaderBuilder {
+        IndexReaderBuilder {
+            num_searchers: num_cpus::get(),
+            reload_policy: ReloadPolicy::MANUAL,
+            index,
+        }
+    }
+
+    /// Sets the reload policy of the resulting `IndexReader`.
+    pub fn reload_policy(mut self, reload_policy: ReloadPolicy) -> IndexReaderBuilder {
+        self.reload_policy = reload_policy;
+        self
+    }
+}
+
+impl Into<IndexReader> for IndexReaderBuilder {
+    fn into(self) -> IndexReader {
+        IndexReader::new(self.index, self.num_searchers, self.reload_policy)
+    }
+}
+
+struct InnerIndexReader {
+    num_searchers: usize,
+    searcher_pool: Pool<Searcher>,
+    // Only stored for now: automatic `ON_COMMIT` reloading is not wired up yet.
+    reload_policy: ReloadPolicy,
+    index: Index,
+}
+
+impl InnerIndexReader {
+    fn load_searchers(&self) -> Result<()> {
+        let _meta_lock = self.index.directory().acquire_lock(&META_LOCK)?;
+        let searchable_segments = self.searchable_segments()?;
+        let segment_readers: Vec<SegmentReader> = searchable_segments
+            .iter()
+            .map(SegmentReader::open)
+            .collect::<Result<_>>()?;
+        let schema = self.index.schema();
+        let searchers = (0..self.num_searchers)
+            .map(|_| Searcher::new(schema.clone(), self.index.clone(), segment_readers.clone()))
+            .collect();
+        self.searcher_pool.publish_new_generation(searchers);
+        Ok(())
+    }
+
+    /// Returns the list of segments that are searchable
+    fn searchable_segments(&self) -> Result<Vec<Segment>> {
+        self.index.searchable_segments()
+    }
+
+    fn searcher(&self) -> LeasedItem<Searcher> {
+        self.searcher_pool.acquire()
+    }
+}
+
+/// `IndexReader` is the entry point to acquire a `Searcher`.
+pub struct IndexReader {
+    inner: Arc<InnerIndexReader>,
+}
+
+impl IndexReader {
+    /// Update searchers so that they reflect the state of the last
+    /// `.commit()`.
+    ///
+    /// If indexing happens in the same process as searching,
+    /// you most likely want to call `.load_searchers()` right after each
+    /// successful call to `.commit()`.
+    ///
+    /// If indexing and searching happen in different processes, the way to
+    /// get the freshest `index` at all times is to watch `meta.json` and
+    /// call `load_searchers` whenever a change happens.
+    pub fn load_searchers(&self) -> Result<()> {
+        self.inner.load_searchers()
+    }
+
+    /// Returns a searcher.
+    ///
+    /// This method should be called every single time a search
+    /// query is performed.
+    /// The searchers are taken from a pool of `num_searchers` searchers.
+    /// If no searcher is available,
+    /// this may block.
+    ///
+    /// The same searcher must be used for a given query, as it ensures
+    /// the use of a consistent segment set.
+    pub fn searcher(&self) -> LeasedItem<Searcher> {
+        self.inner.searcher()
+    }
+
+    pub(crate) fn new(
+        index: Index,
+        num_searchers: usize,
+        reload_policy: ReloadPolicy,
+    ) -> IndexReader {
+        IndexReader {
+            inner: Arc::new(InnerIndexReader {
+                index,
+                num_searchers,
+                searcher_pool: Pool::new(),
+                reload_policy,
+            }),
+        }
+    }
+}
diff --git a/src/core/pool.rs b/src/reader/pool.rs
similarity index 100%
rename from src/core/pool.rs
rename to src/reader/pool.rs
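
The patch above moves the searcher pool off `Index` (dropping `Index::load_searchers()` and `Index::searcher()`) and into the new `IndexReader`. A minimal usage sketch of the new entry point, mirroring the updated test in src/lib.rs: the schema, field name, and documents are illustrative only, and the surrounding calls (`SchemaBuilder::default`, `create_in_ram`, `writer_with_num_threads`) are assumed from the crate's existing API rather than introduced by this patch.

    extern crate tantivy;

    use tantivy::schema::{SchemaBuilder, TEXT};
    use tantivy::{Index, ReloadPolicy, Result, Term};

    fn reader_example() -> Result<()> {
        let mut schema_builder = SchemaBuilder::default();
        let text_field = schema_builder.add_text_field("text", TEXT);
        let index = Index::create_in_ram(schema_builder.build());

        // Index three documents containing "a", so that its doc frequency is 3.
        let mut index_writer = index.writer_with_num_threads(1, 50_000_000)?;
        for text in &["a b", "a c", "a"] {
            let mut doc = tantivy::Document::default();
            doc.add_text(text_field, text);
            index_writer.add_document(doc);
        }
        index_writer.commit()?;

        // New entry point: the reader owns the searcher pool that used to
        // live on `Index` itself.
        let index_reader = index.reader(ReloadPolicy::MANUAL);
        // With `ReloadPolicy::MANUAL`, reloading the view of the index is
        // always explicit, exactly like the old `Index::load_searchers()`.
        index_reader.load_searchers()?;
        let searcher = index_reader.searcher();
        assert_eq!(searcher.doc_freq(&Term::from_field_text(text_field, "a")), 3);
        Ok(())
    }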