remove searcher pool and make Searcher cloneable (#1411)

* remove searcher pool and make Searcher cloneable

closes #1410

* use SearcherInner in InnerIndexReader
This commit is contained in:
PSeitz
2022-07-12 02:07:48 -07:00
committed by GitHub
parent a4be239d38
commit 23fe73a6c0
13 changed files with 152 additions and 410 deletions

View File

@@ -3,7 +3,7 @@ Tantivy 0.19
- Updated [Date Field Type](https://github.com/quickwit-oss/tantivy/pull/1396) - Updated [Date Field Type](https://github.com/quickwit-oss/tantivy/pull/1396)
The `DateTime` type has been updated to hold timestamps with microseconds precision. The `DateTime` type has been updated to hold timestamps with microseconds precision.
`DateOptions` and `DatePrecision` have been added to configure Date fields. The precision is used to hint on fast values compression. Otherwise, seconds precision is used everywhere else (i.e terms, indexing). `DateOptions` and `DatePrecision` have been added to configure Date fields. The precision is used to hint on fast values compression. Otherwise, seconds precision is used everywhere else (i.e terms, indexing).
- Remove Searcher pool and make `Searcher` cloneable.
Tantivy 0.18 Tantivy 0.18
================================ ================================

View File

@@ -59,6 +59,7 @@ measure_time = "0.8.2"
pretty_assertions = "1.2.1" pretty_assertions = "1.2.1"
serde_cbor = { version = "0.11.2", optional = true } serde_cbor = { version = "0.11.2", optional = true }
async-trait = "0.1.53" async-trait = "0.1.53"
arc-swap = "1.5.0"
[target.'cfg(windows)'.dependencies] [target.'cfg(windows)'.dependencies]
winapi = "0.3.9" winapi = "0.3.9"

View File

@@ -145,11 +145,7 @@ fn main() -> tantivy::Result<()> {
let warmers: Vec<Weak<dyn Warmer>> = vec![Arc::downgrade( let warmers: Vec<Weak<dyn Warmer>> = vec![Arc::downgrade(
&(price_dynamic_column.clone() as Arc<dyn Warmer>), &(price_dynamic_column.clone() as Arc<dyn Warmer>),
)]; )];
let reader: IndexReader = index let reader: IndexReader = index.reader_builder().warmers(warmers).try_into()?;
.reader_builder()
.warmers(warmers)
.num_searchers(1)
.try_into()?;
reader.reload()?; reader.reload()?;
let query_parser = QueryParser::for_index(&index, vec![text]); let query_parser = QueryParser::for_index(&index, vec![text]);

View File

@@ -263,7 +263,7 @@ impl SegmentCollector for BytesFastFieldSegmentCollector {
} }
} }
fn make_test_searcher() -> crate::Result<crate::LeasedItem<Searcher>> { fn make_test_searcher() -> crate::Result<Searcher> {
let schema = Schema::builder().build(); let schema = Schema::builder().build();
let index = Index::create_in_ram(schema); let index = Index::create_in_ram(schema);
let mut index_writer = index.writer_for_tests()?; let mut index_writer = index.writer_for_tests()?;

View File

@@ -1,4 +1,5 @@
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::sync::Arc;
use std::{fmt, io}; use std::{fmt, io};
use crate::collector::Collector; use crate::collector::Collector;
@@ -62,45 +63,20 @@ impl SearcherGeneration {
/// ///
/// It guarantees that the `Segment` will not be removed before /// It guarantees that the `Segment` will not be removed before
/// the destruction of the `Searcher`. /// the destruction of the `Searcher`.
#[derive(Clone)]
pub struct Searcher { pub struct Searcher {
schema: Schema, inner: Arc<SearcherInner>,
index: Index,
segment_readers: Vec<SegmentReader>,
store_readers: Vec<StoreReader>,
generation: TrackedObject<SearcherGeneration>,
} }
impl Searcher { impl Searcher {
/// Creates a new `Searcher`
pub(crate) fn new(
schema: Schema,
index: Index,
segment_readers: Vec<SegmentReader>,
generation: TrackedObject<SearcherGeneration>,
doc_store_cache_size: usize,
) -> io::Result<Searcher> {
let store_readers: Vec<StoreReader> = segment_readers
.iter()
.map(|segment_reader| segment_reader.get_store_reader(doc_store_cache_size))
.collect::<io::Result<Vec<_>>>()?;
Ok(Searcher {
schema,
index,
segment_readers,
store_readers,
generation,
})
}
/// Returns the `Index` associated to the `Searcher` /// Returns the `Index` associated to the `Searcher`
pub fn index(&self) -> &Index { pub fn index(&self) -> &Index {
&self.index &self.inner.index
} }
/// [SearcherGeneration] which identifies the version of the snapshot held by this `Searcher`. /// [SearcherGeneration] which identifies the version of the snapshot held by this `Searcher`.
pub fn generation(&self) -> &SearcherGeneration { pub fn generation(&self) -> &SearcherGeneration {
self.generation.as_ref() self.inner.generation.as_ref()
} }
/// Fetches a document from tantivy's store given a `DocAddress`. /// Fetches a document from tantivy's store given a `DocAddress`.
@@ -108,7 +84,7 @@ impl Searcher {
/// The searcher uses the segment ordinal to route the /// The searcher uses the segment ordinal to route the
/// the request to the right `Segment`. /// the request to the right `Segment`.
pub fn doc(&self, doc_address: DocAddress) -> crate::Result<Document> { pub fn doc(&self, doc_address: DocAddress) -> crate::Result<Document> {
let store_reader = &self.store_readers[doc_address.segment_ord as usize]; let store_reader = &self.inner.store_readers[doc_address.segment_ord as usize];
store_reader.get(doc_address.doc_id) store_reader.get(doc_address.doc_id)
} }
@@ -117,6 +93,7 @@ impl Searcher {
/// Aggregates the sum for each segment store reader. /// Aggregates the sum for each segment store reader.
pub fn doc_store_cache_stats(&self) -> CacheStats { pub fn doc_store_cache_stats(&self) -> CacheStats {
let cache_stats: CacheStats = self let cache_stats: CacheStats = self
.inner
.store_readers .store_readers
.iter() .iter()
.map(|reader| reader.cache_stats()) .map(|reader| reader.cache_stats())
@@ -127,18 +104,19 @@ impl Searcher {
/// Fetches a document in an asynchronous manner. /// Fetches a document in an asynchronous manner.
#[cfg(feature = "quickwit")] #[cfg(feature = "quickwit")]
pub async fn doc_async(&self, doc_address: DocAddress) -> crate::Result<Document> { pub async fn doc_async(&self, doc_address: DocAddress) -> crate::Result<Document> {
let store_reader = &self.store_readers[doc_address.segment_ord as usize]; let store_reader = &self.inner.store_readers[doc_address.segment_ord as usize];
store_reader.get_async(doc_address.doc_id).await store_reader.get_async(doc_address.doc_id).await
} }
/// Access the schema associated to the index of this searcher. /// Access the schema associated to the index of this searcher.
pub fn schema(&self) -> &Schema { pub fn schema(&self) -> &Schema {
&self.schema &self.inner.schema
} }
/// Returns the overall number of documents in the index. /// Returns the overall number of documents in the index.
pub fn num_docs(&self) -> u64 { pub fn num_docs(&self) -> u64 {
self.segment_readers self.inner
.segment_readers
.iter() .iter()
.map(|segment_reader| u64::from(segment_reader.num_docs())) .map(|segment_reader| u64::from(segment_reader.num_docs()))
.sum::<u64>() .sum::<u64>()
@@ -148,7 +126,7 @@ impl Searcher {
/// the given term. /// the given term.
pub fn doc_freq(&self, term: &Term) -> crate::Result<u64> { pub fn doc_freq(&self, term: &Term) -> crate::Result<u64> {
let mut total_doc_freq = 0; let mut total_doc_freq = 0;
for segment_reader in &self.segment_readers { for segment_reader in &self.inner.segment_readers {
let inverted_index = segment_reader.inverted_index(term.field())?; let inverted_index = segment_reader.inverted_index(term.field())?;
let doc_freq = inverted_index.doc_freq(term)?; let doc_freq = inverted_index.doc_freq(term)?;
total_doc_freq += u64::from(doc_freq); total_doc_freq += u64::from(doc_freq);
@@ -158,12 +136,12 @@ impl Searcher {
/// Return the list of segment readers /// Return the list of segment readers
pub fn segment_readers(&self) -> &[SegmentReader] { pub fn segment_readers(&self) -> &[SegmentReader] {
&self.segment_readers &self.inner.segment_readers
} }
/// Returns the segment_reader associated with the given segment_ord /// Returns the segment_reader associated with the given segment_ord
pub fn segment_reader(&self, segment_ord: u32) -> &SegmentReader { pub fn segment_reader(&self, segment_ord: u32) -> &SegmentReader {
&self.segment_readers[segment_ord as usize] &self.inner.segment_readers[segment_ord as usize]
} }
/// Runs a query on the segment readers wrapped by the searcher. /// Runs a query on the segment readers wrapped by the searcher.
@@ -185,7 +163,7 @@ impl Searcher {
query: &dyn Query, query: &dyn Query,
collector: &C, collector: &C,
) -> crate::Result<C::Fruit> { ) -> crate::Result<C::Fruit> {
let executor = self.index.search_executor(); let executor = self.inner.index.search_executor();
self.search_with_executor(query, collector, executor) self.search_with_executor(query, collector, executor)
} }
@@ -222,17 +200,59 @@ impl Searcher {
/// Summarize total space usage of this searcher. /// Summarize total space usage of this searcher.
pub fn space_usage(&self) -> io::Result<SearcherSpaceUsage> { pub fn space_usage(&self) -> io::Result<SearcherSpaceUsage> {
let mut space_usage = SearcherSpaceUsage::new(); let mut space_usage = SearcherSpaceUsage::new();
for segment_reader in &self.segment_readers { for segment_reader in self.segment_readers() {
space_usage.add_segment(segment_reader.space_usage()?); space_usage.add_segment(segment_reader.space_usage()?);
} }
Ok(space_usage) Ok(space_usage)
} }
} }
impl From<Arc<SearcherInner>> for Searcher {
fn from(inner: Arc<SearcherInner>) -> Self {
Searcher { inner }
}
}
/// Holds a list of `SegmentReader`s ready for search.
///
/// It guarantees that the `Segment` will not be removed before
/// the destruction of the `Searcher`.
pub(crate) struct SearcherInner {
schema: Schema,
index: Index,
segment_readers: Vec<SegmentReader>,
store_readers: Vec<StoreReader>,
generation: TrackedObject<SearcherGeneration>,
}
impl SearcherInner {
/// Creates a new `Searcher`
pub(crate) fn new(
schema: Schema,
index: Index,
segment_readers: Vec<SegmentReader>,
generation: TrackedObject<SearcherGeneration>,
doc_store_cache_size: usize,
) -> io::Result<SearcherInner> {
let store_readers: Vec<StoreReader> = segment_readers
.iter()
.map(|segment_reader| segment_reader.get_store_reader(doc_store_cache_size))
.collect::<io::Result<Vec<_>>>()?;
Ok(SearcherInner {
schema,
index,
segment_readers,
store_readers,
generation,
})
}
}
impl fmt::Debug for Searcher { impl fmt::Debug for Searcher {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let segment_ids = self let segment_ids = self
.segment_readers .segment_readers()
.iter() .iter()
.map(SegmentReader::segment_id) .map(SegmentReader::segment_id)
.collect::<Vec<_>>(); .collect::<Vec<_>>();

View File

@@ -6,8 +6,6 @@ pub use self::writer::BytesFastFieldWriter;
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::ops::Deref;
use crate::query::TermQuery; use crate::query::TermQuery;
use crate::schema::{BytesOptions, IndexRecordOption, Schema, Value, FAST, INDEXED, STORED}; use crate::schema::{BytesOptions, IndexRecordOption, Schema, Value, FAST, INDEXED, STORED};
use crate::{DocAddress, DocSet, Index, Searcher, Term}; use crate::{DocAddress, DocSet, Index, Searcher, Term};
@@ -37,9 +35,7 @@ mod tests {
Ok(()) Ok(())
} }
fn create_index_for_test<T: Into<BytesOptions>>( fn create_index_for_test<T: Into<BytesOptions>>(byte_options: T) -> crate::Result<Searcher> {
byte_options: T,
) -> crate::Result<impl Deref<Target = Searcher>> {
let mut schema_builder = Schema::builder(); let mut schema_builder = Schema::builder();
let field = schema_builder.add_bytes_field("string_bytes", byte_options.into()); let field = schema_builder.add_bytes_field("string_bytes", byte_options.into());
let schema = schema_builder.build(); let schema = schema_builder.build();
@@ -86,7 +82,7 @@ mod tests {
let field = searcher.schema().get_field("string_bytes").unwrap(); let field = searcher.schema().get_field("string_bytes").unwrap();
let term = Term::from_field_bytes(field, b"lucene".as_ref()); let term = Term::from_field_bytes(field, b"lucene".as_ref());
let term_query = TermQuery::new(term, IndexRecordOption::Basic); let term_query = TermQuery::new(term, IndexRecordOption::Basic);
let term_weight = term_query.specialized_weight(&*searcher, true)?; let term_weight = term_query.specialized_weight(&searcher, true)?;
let term_scorer = term_weight.specialized_scorer(searcher.segment_reader(0), 1.0)?; let term_scorer = term_weight.specialized_scorer(searcher.segment_reader(0), 1.0)?;
assert_eq!(term_scorer.doc(), 0u32); assert_eq!(term_scorer.doc(), 0u32);
Ok(()) Ok(())
@@ -99,7 +95,7 @@ mod tests {
let field = searcher.schema().get_field("string_bytes").unwrap(); let field = searcher.schema().get_field("string_bytes").unwrap();
let term = Term::from_field_bytes(field, b"lucene".as_ref()); let term = Term::from_field_bytes(field, b"lucene".as_ref());
let term_query = TermQuery::new(term, IndexRecordOption::Basic); let term_query = TermQuery::new(term, IndexRecordOption::Basic);
let term_weight_err = term_query.specialized_weight(&*searcher, false); let term_weight_err = term_query.specialized_weight(&searcher, false);
assert!(matches!( assert!(matches!(
term_weight_err, term_weight_err,
Err(crate::TantivyError::SchemaError(_)) Err(crate::TantivyError::SchemaError(_))

View File

@@ -112,7 +112,7 @@ mod tests {
Term::from_field_text(text, "hello"), Term::from_field_text(text, "hello"),
IndexRecordOption::WithFreqs, IndexRecordOption::WithFreqs,
); );
let weight = query.weight(&*searcher, true)?; let weight = query.weight(&searcher, true)?;
let mut scorer = weight.scorer(searcher.segment_reader(0), 1.0f32)?; let mut scorer = weight.scorer(searcher.segment_reader(0), 1.0f32)?;
assert_eq!(scorer.doc(), 0); assert_eq!(scorer.doc(), 0);
assert!((scorer.score() - 0.22920431).abs() < 0.001f32); assert!((scorer.score() - 0.22920431).abs() < 0.001f32);
@@ -141,7 +141,7 @@ mod tests {
Term::from_field_text(text, "hello"), Term::from_field_text(text, "hello"),
IndexRecordOption::WithFreqs, IndexRecordOption::WithFreqs,
); );
let weight = query.weight(&*searcher, true)?; let weight = query.weight(&searcher, true)?;
let mut scorer = weight.scorer(searcher.segment_reader(0), 1.0f32)?; let mut scorer = weight.scorer(searcher.segment_reader(0), 1.0f32)?;
assert_eq!(scorer.doc(), 0); assert_eq!(scorer.doc(), 0);
assert!((scorer.score() - 0.22920431).abs() < 0.001f32); assert!((scorer.score() - 0.22920431).abs() < 0.001f32);

View File

@@ -307,7 +307,6 @@ pub use crate::indexer::demuxer::*;
pub use crate::indexer::operation::UserOperation; pub use crate::indexer::operation::UserOperation;
pub use crate::indexer::{merge_filtered_segments, merge_indices, IndexWriter, PreparedCommit}; pub use crate::indexer::{merge_filtered_segments, merge_indices, IndexWriter, PreparedCommit};
pub use crate::postings::Postings; pub use crate::postings::Postings;
pub use crate::reader::LeasedItem;
pub use crate::schema::{DateOptions, DatePrecision, Document, Term}; pub use crate::schema::{DateOptions, DatePrecision, Document, Term};
/// Index format version. /// Index format version.

View File

@@ -359,7 +359,7 @@ pub mod tests {
let matching_docs = |query: &str| { let matching_docs = |query: &str| {
let query_parser = QueryParser::for_index(&index, vec![json_field]); let query_parser = QueryParser::for_index(&index, vec![json_field]);
let phrase_query = query_parser.parse_query(query).unwrap(); let phrase_query = query_parser.parse_query(query).unwrap();
let phrase_weight = phrase_query.weight(&*searcher, false).unwrap(); let phrase_weight = phrase_query.weight(&searcher, false).unwrap();
let mut phrase_scorer = phrase_weight let mut phrase_scorer = phrase_weight
.scorer(searcher.segment_reader(0), 1.0f32) .scorer(searcher.segment_reader(0), 1.0f32)
.unwrap(); .unwrap();

View File

@@ -141,7 +141,7 @@ mod tests {
let term_a = Term::from_field_text(text_field, "a"); let term_a = Term::from_field_text(text_field, "a");
let term_query = TermQuery::new(term_a, IndexRecordOption::Basic); let term_query = TermQuery::new(term_a, IndexRecordOption::Basic);
let reader = index.reader()?; let reader = index.reader()?;
assert_eq!(term_query.count(&*reader.searcher())?, 1); assert_eq!(term_query.count(&reader.searcher())?, 1);
Ok(()) Ok(())
} }

View File

@@ -1,17 +1,14 @@
mod pool;
mod warming; mod warming;
use std::convert::TryInto; use std::convert::TryInto;
use std::io;
use std::sync::atomic::AtomicU64; use std::sync::atomic::AtomicU64;
use std::sync::{atomic, Arc, Weak}; use std::sync::{atomic, Arc, Weak};
use arc_swap::ArcSwap;
pub use warming::Warmer; pub use warming::Warmer;
pub use self::pool::LeasedItem;
use self::pool::Pool;
use self::warming::WarmingState; use self::warming::WarmingState;
use crate::core::searcher::SearcherGeneration; use crate::core::searcher::{SearcherGeneration, SearcherInner};
use crate::directory::{Directory, WatchCallback, WatchHandle, META_LOCK}; use crate::directory::{Directory, WatchCallback, WatchHandle, META_LOCK};
use crate::store::DOCSTORE_CACHE_CAPACITY; use crate::store::DOCSTORE_CACHE_CAPACITY;
use crate::{Index, Inventory, Searcher, SegmentReader, TrackedObject}; use crate::{Index, Inventory, Searcher, SegmentReader, TrackedObject};
@@ -37,13 +34,12 @@ pub enum ReloadPolicy {
/// [IndexReader] builder /// [IndexReader] builder
/// ///
/// It makes it possible to configure: /// It makes it possible to configure:
/// - [Searcher] pool size
/// - [ReloadPolicy] defining when new index versions are detected /// - [ReloadPolicy] defining when new index versions are detected
/// - [Warmer] implementations /// - [Warmer] implementations
/// - number of warming threads, for parallelizing warming work /// - number of warming threads, for parallelizing warming work
/// - The cache size of the underlying doc store readers.
#[derive(Clone)] #[derive(Clone)]
pub struct IndexReaderBuilder { pub struct IndexReaderBuilder {
num_searchers: usize,
reload_policy: ReloadPolicy, reload_policy: ReloadPolicy,
index: Index, index: Index,
warmers: Vec<Weak<dyn Warmer>>, warmers: Vec<Weak<dyn Warmer>>,
@@ -55,7 +51,6 @@ impl IndexReaderBuilder {
#[must_use] #[must_use]
pub(crate) fn new(index: Index) -> IndexReaderBuilder { pub(crate) fn new(index: Index) -> IndexReaderBuilder {
IndexReaderBuilder { IndexReaderBuilder {
num_searchers: num_cpus::get(),
reload_policy: ReloadPolicy::OnCommit, reload_policy: ReloadPolicy::OnCommit,
index, index,
warmers: Vec::new(), warmers: Vec::new(),
@@ -76,16 +71,12 @@ impl IndexReaderBuilder {
self.warmers, self.warmers,
searcher_generation_inventory.clone(), searcher_generation_inventory.clone(),
)?; )?;
let inner_reader = InnerIndexReader { let inner_reader = InnerIndexReader::new(
index: self.index, self.doc_store_cache_size,
num_searchers: self.num_searchers, self.index,
doc_store_cache_size: self.doc_store_cache_size,
searcher_pool: Pool::new(),
warming_state, warming_state,
searcher_generation_counter: Default::default(),
searcher_generation_inventory, searcher_generation_inventory,
}; )?;
inner_reader.reload()?;
let inner_reader_arc = Arc::new(inner_reader); let inner_reader_arc = Arc::new(inner_reader);
let watch_handle_opt: Option<WatchHandle> = match self.reload_policy { let watch_handle_opt: Option<WatchHandle> = match self.reload_policy {
ReloadPolicy::Manual => { ReloadPolicy::Manual => {
@@ -133,15 +124,6 @@ impl IndexReaderBuilder {
self self
} }
/// Sets the number of [Searcher] to pool.
///
/// See [IndexReader::searcher()].
#[must_use]
pub fn num_searchers(mut self, num_searchers: usize) -> IndexReaderBuilder {
self.num_searchers = num_searchers;
self
}
/// Set the [Warmer]s that are invoked when reloading searchable segments. /// Set the [Warmer]s that are invoked when reloading searchable segments.
#[must_use] #[must_use]
pub fn warmers(mut self, warmers: Vec<Weak<dyn Warmer>>) -> IndexReaderBuilder { pub fn warmers(mut self, warmers: Vec<Weak<dyn Warmer>>) -> IndexReaderBuilder {
@@ -169,24 +151,52 @@ impl TryInto<IndexReader> for IndexReaderBuilder {
} }
struct InnerIndexReader { struct InnerIndexReader {
num_searchers: usize,
doc_store_cache_size: usize, doc_store_cache_size: usize,
index: Index, index: Index,
warming_state: WarmingState, warming_state: WarmingState,
searcher_pool: Pool<Searcher>, searcher: arc_swap::ArcSwap<SearcherInner>,
searcher_generation_counter: Arc<AtomicU64>, searcher_generation_counter: Arc<AtomicU64>,
searcher_generation_inventory: Inventory<SearcherGeneration>, searcher_generation_inventory: Inventory<SearcherGeneration>,
} }
impl InnerIndexReader { impl InnerIndexReader {
fn new(
doc_store_cache_size: usize,
index: Index,
warming_state: WarmingState,
searcher_generation_inventory: Inventory<SearcherGeneration>,
) -> crate::Result<Self> {
let searcher_generation_counter: Arc<AtomicU64> = Default::default();
let segment_readers = Self::open_segment_readers(&index)?;
let searcher_generation = Self::create_new_searcher_generation(
&segment_readers,
&searcher_generation_counter,
&searcher_generation_inventory,
);
let searcher = Self::create_searcher(
&index,
doc_store_cache_size,
&warming_state,
searcher_generation,
)?;
Ok(InnerIndexReader {
doc_store_cache_size,
index,
warming_state,
searcher: ArcSwap::from(searcher),
searcher_generation_counter,
searcher_generation_inventory,
})
}
/// Opens the freshest segments `SegmentReader`. /// Opens the freshest segments `SegmentReader`.
/// ///
/// This function acquires a lot to prevent GC from removing files /// This function acquires a lot to prevent GC from removing files
/// as we are opening our index. /// as we are opening our index.
fn open_segment_readers(&self) -> crate::Result<Vec<SegmentReader>> { fn open_segment_readers(index: &Index) -> crate::Result<Vec<SegmentReader>> {
// Prevents segment files from getting deleted while we are in the process of opening them // Prevents segment files from getting deleted while we are in the process of opening them
let _meta_lock = self.index.directory().acquire_lock(&META_LOCK)?; let _meta_lock = index.directory().acquire_lock(&META_LOCK)?;
let searchable_segments = self.index.searchable_segments()?; let searchable_segments = index.searchable_segments()?;
let segment_readers = searchable_segments let segment_readers = searchable_segments
.iter() .iter()
.map(SegmentReader::open) .map(SegmentReader::open)
@@ -195,41 +205,57 @@ impl InnerIndexReader {
} }
fn create_new_searcher_generation( fn create_new_searcher_generation(
&self,
segment_readers: &[SegmentReader], segment_readers: &[SegmentReader],
searcher_generation_counter: &Arc<AtomicU64>,
searcher_generation_inventory: &Inventory<SearcherGeneration>,
) -> TrackedObject<SearcherGeneration> { ) -> TrackedObject<SearcherGeneration> {
let generation_id = self let generation_id = searcher_generation_counter.fetch_add(1, atomic::Ordering::Relaxed);
.searcher_generation_counter
.fetch_add(1, atomic::Ordering::Relaxed);
let searcher_generation = let searcher_generation =
SearcherGeneration::from_segment_readers(segment_readers, generation_id); SearcherGeneration::from_segment_readers(segment_readers, generation_id);
self.searcher_generation_inventory searcher_generation_inventory.track(searcher_generation)
.track(searcher_generation) }
fn create_searcher(
index: &Index,
doc_store_cache_size: usize,
warming_state: &WarmingState,
searcher_generation: TrackedObject<SearcherGeneration>,
) -> crate::Result<Arc<SearcherInner>> {
let segment_readers = Self::open_segment_readers(index)?;
let schema = index.schema();
let searcher = Arc::new(SearcherInner::new(
schema,
index.clone(),
segment_readers,
searcher_generation,
doc_store_cache_size,
)?);
warming_state.warm_new_searcher_generation(&searcher.clone().into())?;
Ok(searcher)
} }
fn reload(&self) -> crate::Result<()> { fn reload(&self) -> crate::Result<()> {
let segment_readers = self.open_segment_readers()?; let segment_readers = Self::open_segment_readers(&self.index)?;
let searcher_generation = self.create_new_searcher_generation(&segment_readers); let searcher_generation = Self::create_new_searcher_generation(
let schema = self.index.schema(); &segment_readers,
let searchers: Vec<Searcher> = std::iter::repeat_with(|| { &self.searcher_generation_counter,
Searcher::new( &self.searcher_generation_inventory,
schema.clone(), );
self.index.clone(), let searcher = Self::create_searcher(
segment_readers.clone(), &self.index,
searcher_generation.clone(), self.doc_store_cache_size,
self.doc_store_cache_size, &self.warming_state,
) searcher_generation,
}) )?;
.take(self.num_searchers)
.collect::<io::Result<_>>()?; self.searcher.store(searcher);
self.warming_state
.warm_new_searcher_generation(&searchers[0])?;
self.searcher_pool.publish_new_generation(searchers);
Ok(()) Ok(())
} }
fn searcher(&self) -> LeasedItem<Searcher> { fn searcher(&self) -> Searcher {
self.searcher_pool.acquire() self.searcher.load().clone().into()
} }
} }
@@ -275,7 +301,7 @@ impl IndexReader {
/// ///
/// The same searcher must be used for a given query, as it ensures /// The same searcher must be used for a given query, as it ensures
/// the use of a consistent segment set. /// the use of a consistent segment set.
pub fn searcher(&self) -> LeasedItem<Searcher> { pub fn searcher(&self) -> Searcher {
self.inner.searcher() self.inner.searcher()
} }
} }

View File

@@ -1,295 +0,0 @@
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use crossbeam_channel::{unbounded, Receiver, RecvError, Sender};
pub struct GenerationItem<T> {
generation: usize,
item: T,
}
/// Queue implementation for the Object Pool below
/// Uses the unbounded Linked-List type queue from crossbeam-channel
/// Splits the Queue into sender and receiver
struct Queue<T> {
sender: Sender<T>,
receiver: Receiver<T>,
}
impl<T> Queue<T> {
fn new() -> Self {
let (s, r) = unbounded();
Queue {
sender: s,
receiver: r,
}
}
/// Sender trait returns a Result type, which is ignored.
/// The Result is not handled at the moment
fn push(&self, elem: T) {
self.sender
.send(elem)
.expect("Sending an item to crossbeam-queue shouldn't fail");
}
/// Relies on the underlying crossbeam-channel Receiver
/// to block on empty queue
fn pop(&self) -> Result<T, RecvError> {
self.receiver.recv()
}
}
/// An object pool
///
/// This is used in tantivy to create a pool of `Searcher`.
/// Object are wrapped in a `LeasedItem` wrapper and are
/// released automatically back into the pool on `Drop`.
pub struct Pool<T> {
queue: Arc<Queue<GenerationItem<T>>>,
freshest_generation: AtomicUsize,
next_generation: AtomicUsize,
}
impl<T> Pool<T> {
pub fn new() -> Pool<T> {
let queue = Arc::new(Queue::new());
Pool {
queue,
freshest_generation: AtomicUsize::default(),
next_generation: AtomicUsize::default(),
}
}
/// Publishes a new generation of `Searcher`.
///
/// After publish, all new `Searcher` acquired will be
/// of the new generation.
pub fn publish_new_generation(&self, items: Vec<T>) {
assert!(!items.is_empty());
let next_generation = self.next_generation.fetch_add(1, Ordering::SeqCst) + 1;
let num_items = items.len();
for item in items {
let gen_item = GenerationItem {
item,
generation: next_generation,
};
self.queue.push(gen_item);
}
self.advertise_generation(next_generation);
// Purge possible previous searchers.
//
// Assuming at this point no searcher is held more than duration T by the user,
// this guarantees that an obsolete searcher will not be uselessly held (and its associated
// mmap) for more than duration T.
//
// Proof: At this point, obsolete searcher that are held by the user will be held for less
// than T. When released, they will be dropped as their generation is detected obsolete.
//
// We still need to ensure that the searcher that are obsolete and in the pool get removed.
// The queue currently contains up to 2n searchers, in any random order.
//
// Half of them are obsoletes. By requesting `(n+1)` fresh searchers, we ensure that all
// searcher will be inspected.
for _ in 0..=num_items {
let _ = self.acquire();
}
}
/// At the exit of this method,
/// - freshest_generation has a value greater or equal than generation
/// - freshest_generation has the last value that has been advertised
fn advertise_generation(&self, generation: usize) {
// not optimal at all but the easiest to read proof.
let mut former_generation = self.freshest_generation.load(Ordering::Acquire);
loop {
match self.freshest_generation.compare_exchange(
former_generation,
generation,
Ordering::SeqCst,
Ordering::SeqCst,
) {
Ok(_) => {
// We successfuly updated the value.
return;
}
Err(current_generation) => {
// The value was updated after we did our load apparently.
// In theory, it is always a value greater than ours, but just to
// simplify the logic, we keep looping until we reach a
// value >= to our target value.
if current_generation >= generation {
return;
}
former_generation = current_generation;
}
}
}
}
fn generation(&self) -> usize {
self.freshest_generation.load(Ordering::Acquire)
}
/// Acquires a new searcher.
///
/// If no searcher is available, this methods block until
/// a searcher is released.
pub fn acquire(&self) -> LeasedItem<T> {
let generation = self.generation();
loop {
let gen_item = self.queue.pop().unwrap();
if gen_item.generation >= generation {
return LeasedItem {
gen_item: Some(gen_item),
recycle_queue: Arc::clone(&self.queue),
};
} else {
// this searcher is obsolete,
// removing it from the pool.
}
}
}
}
/// A LeasedItem holds an object borrowed from a Pool.
///
/// Upon drop, the object is automatically returned
/// into the pool.
pub struct LeasedItem<T> {
gen_item: Option<GenerationItem<T>>,
recycle_queue: Arc<Queue<GenerationItem<T>>>,
}
impl<T> Deref for LeasedItem<T> {
type Target = T;
fn deref(&self) -> &T {
&self
.gen_item
.as_ref()
.expect("Unwrapping a leased item should never fail")
.item // unwrap is safe here
}
}
impl<T> DerefMut for LeasedItem<T> {
fn deref_mut(&mut self) -> &mut T {
&mut self
.gen_item
.as_mut()
.expect("Unwrapping a mut leased item should never fail")
.item // unwrap is safe here
}
}
impl<T> Drop for LeasedItem<T> {
fn drop(&mut self) {
if let Some(gen_item) = self.gen_item.take() {
self.recycle_queue.push(gen_item);
}
}
}
#[cfg(test)]
mod tests {
use std::{iter, mem};
use crossbeam_channel as channel;
use super::{Pool, Queue};
#[test]
fn test_pool() {
let items10: Vec<usize> = iter::repeat(10).take(10).collect();
let pool = Pool::new();
pool.publish_new_generation(items10);
for _ in 0..20 {
assert_eq!(*pool.acquire(), 10);
}
let items11: Vec<usize> = iter::repeat(11).take(10).collect();
pool.publish_new_generation(items11);
for _ in 0..20 {
assert_eq!(*pool.acquire(), 11);
}
}
#[test]
fn test_queue() {
let q = Queue::new();
let elem = 5;
q.push(elem);
let res = q.pop();
assert_eq!(res.unwrap(), elem);
}
#[test]
fn test_pool_dont_panic_on_empty_pop() {
// When the object pool is exhausted, it shouldn't panic on pop()
use std::sync::Arc;
use std::thread;
// Wrap the pool in an Arc, same way as its used in `core/index.rs`
let pool1 = Arc::new(Pool::new());
// clone pools outside the move scope of each new thread
let pool2 = Arc::clone(&pool1);
let pool3 = Arc::clone(&pool1);
let elements_for_pool = vec![1, 2];
pool1.publish_new_generation(elements_for_pool);
let mut threads = vec![];
// spawn one more thread than there are elements in the pool
let (start_1_send, start_1_recv) = channel::bounded(0);
let (start_2_send, start_2_recv) = channel::bounded(0);
let (start_3_send, start_3_recv) = channel::bounded(0);
let (event_send1, event_recv) = channel::unbounded();
let event_send2 = event_send1.clone();
let event_send3 = event_send1.clone();
threads.push(thread::spawn(move || {
assert_eq!(start_1_recv.recv(), Ok("start"));
let _leased_searcher = pool1.acquire();
assert!(event_send1.send("1 acquired").is_ok());
assert_eq!(start_1_recv.recv(), Ok("stop"));
assert!(event_send1.send("1 stopped").is_ok());
mem::drop(_leased_searcher);
}));
threads.push(thread::spawn(move || {
assert_eq!(start_2_recv.recv(), Ok("start"));
let _leased_searcher = pool2.acquire();
assert!(event_send2.send("2 acquired").is_ok());
assert_eq!(start_2_recv.recv(), Ok("stop"));
mem::drop(_leased_searcher);
assert!(event_send2.send("2 stopped").is_ok());
}));
threads.push(thread::spawn(move || {
assert_eq!(start_3_recv.recv(), Ok("start"));
let _leased_searcher = pool3.acquire();
assert!(event_send3.send("3 acquired").is_ok());
assert_eq!(start_3_recv.recv(), Ok("stop"));
mem::drop(_leased_searcher);
assert!(event_send3.send("3 stopped").is_ok());
}));
assert!(start_1_send.send("start").is_ok());
assert_eq!(event_recv.recv(), Ok("1 acquired"));
assert!(start_2_send.send("start").is_ok());
assert_eq!(event_recv.recv(), Ok("2 acquired"));
assert!(start_3_send.send("start").is_ok());
assert!(event_recv.try_recv().is_err());
assert!(start_1_send.send("stop").is_ok());
assert_eq!(event_recv.recv(), Ok("1 stopped"));
assert_eq!(event_recv.recv(), Ok("3 acquired"));
assert!(start_3_send.send("stop").is_ok());
assert_eq!(event_recv.recv(), Ok("3 stopped"));
assert!(start_2_send.send("stop").is_ok());
assert_eq!(event_recv.recv(), Ok("2 stopped"));
}
}

View File

@@ -273,7 +273,6 @@ mod tests {
.reader_builder() .reader_builder()
.reload_policy(ReloadPolicy::Manual) .reload_policy(ReloadPolicy::Manual)
.num_warming_threads(num_warming_threads) .num_warming_threads(num_warming_threads)
.num_searchers(num_searchers)
.warmers(vec![ .warmers(vec![
Arc::downgrade(&warmer1) as Weak<dyn Warmer>, Arc::downgrade(&warmer1) as Weak<dyn Warmer>,
Arc::downgrade(&warmer2) as Weak<dyn Warmer>, Arc::downgrade(&warmer2) as Weak<dyn Warmer>,