mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-05-31 23:50:41 +00:00
remove searcher pool and make Searcher cloneable (#1411)
* remove searcher pool and make Searcher cloneable closes #1410 * use SearcherInner in InnerIndexReader
This commit is contained in:
@@ -3,7 +3,7 @@ Tantivy 0.19
|
|||||||
- Updated [Date Field Type](https://github.com/quickwit-oss/tantivy/pull/1396)
|
- Updated [Date Field Type](https://github.com/quickwit-oss/tantivy/pull/1396)
|
||||||
The `DateTime` type has been updated to hold timestamps with microseconds precision.
|
The `DateTime` type has been updated to hold timestamps with microseconds precision.
|
||||||
`DateOptions` and `DatePrecision` have been added to configure Date fields. The precision is used to hint on fast values compression. Otherwise, seconds precision is used everywhere else (i.e terms, indexing).
|
`DateOptions` and `DatePrecision` have been added to configure Date fields. The precision is used to hint on fast values compression. Otherwise, seconds precision is used everywhere else (i.e terms, indexing).
|
||||||
|
- Remove Searcher pool and make `Searcher` cloneable.
|
||||||
|
|
||||||
Tantivy 0.18
|
Tantivy 0.18
|
||||||
================================
|
================================
|
||||||
|
|||||||
@@ -59,6 +59,7 @@ measure_time = "0.8.2"
|
|||||||
pretty_assertions = "1.2.1"
|
pretty_assertions = "1.2.1"
|
||||||
serde_cbor = { version = "0.11.2", optional = true }
|
serde_cbor = { version = "0.11.2", optional = true }
|
||||||
async-trait = "0.1.53"
|
async-trait = "0.1.53"
|
||||||
|
arc-swap = "1.5.0"
|
||||||
|
|
||||||
[target.'cfg(windows)'.dependencies]
|
[target.'cfg(windows)'.dependencies]
|
||||||
winapi = "0.3.9"
|
winapi = "0.3.9"
|
||||||
|
|||||||
@@ -145,11 +145,7 @@ fn main() -> tantivy::Result<()> {
|
|||||||
let warmers: Vec<Weak<dyn Warmer>> = vec![Arc::downgrade(
|
let warmers: Vec<Weak<dyn Warmer>> = vec![Arc::downgrade(
|
||||||
&(price_dynamic_column.clone() as Arc<dyn Warmer>),
|
&(price_dynamic_column.clone() as Arc<dyn Warmer>),
|
||||||
)];
|
)];
|
||||||
let reader: IndexReader = index
|
let reader: IndexReader = index.reader_builder().warmers(warmers).try_into()?;
|
||||||
.reader_builder()
|
|
||||||
.warmers(warmers)
|
|
||||||
.num_searchers(1)
|
|
||||||
.try_into()?;
|
|
||||||
reader.reload()?;
|
reader.reload()?;
|
||||||
|
|
||||||
let query_parser = QueryParser::for_index(&index, vec![text]);
|
let query_parser = QueryParser::for_index(&index, vec![text]);
|
||||||
|
|||||||
@@ -263,7 +263,7 @@ impl SegmentCollector for BytesFastFieldSegmentCollector {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn make_test_searcher() -> crate::Result<crate::LeasedItem<Searcher>> {
|
fn make_test_searcher() -> crate::Result<Searcher> {
|
||||||
let schema = Schema::builder().build();
|
let schema = Schema::builder().build();
|
||||||
let index = Index::create_in_ram(schema);
|
let index = Index::create_in_ram(schema);
|
||||||
let mut index_writer = index.writer_for_tests()?;
|
let mut index_writer = index.writer_for_tests()?;
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
use std::collections::BTreeMap;
|
use std::collections::BTreeMap;
|
||||||
|
use std::sync::Arc;
|
||||||
use std::{fmt, io};
|
use std::{fmt, io};
|
||||||
|
|
||||||
use crate::collector::Collector;
|
use crate::collector::Collector;
|
||||||
@@ -62,45 +63,20 @@ impl SearcherGeneration {
|
|||||||
///
|
///
|
||||||
/// It guarantees that the `Segment` will not be removed before
|
/// It guarantees that the `Segment` will not be removed before
|
||||||
/// the destruction of the `Searcher`.
|
/// the destruction of the `Searcher`.
|
||||||
|
#[derive(Clone)]
|
||||||
pub struct Searcher {
|
pub struct Searcher {
|
||||||
schema: Schema,
|
inner: Arc<SearcherInner>,
|
||||||
index: Index,
|
|
||||||
segment_readers: Vec<SegmentReader>,
|
|
||||||
store_readers: Vec<StoreReader>,
|
|
||||||
generation: TrackedObject<SearcherGeneration>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Searcher {
|
impl Searcher {
|
||||||
/// Creates a new `Searcher`
|
|
||||||
pub(crate) fn new(
|
|
||||||
schema: Schema,
|
|
||||||
index: Index,
|
|
||||||
segment_readers: Vec<SegmentReader>,
|
|
||||||
generation: TrackedObject<SearcherGeneration>,
|
|
||||||
doc_store_cache_size: usize,
|
|
||||||
) -> io::Result<Searcher> {
|
|
||||||
let store_readers: Vec<StoreReader> = segment_readers
|
|
||||||
.iter()
|
|
||||||
.map(|segment_reader| segment_reader.get_store_reader(doc_store_cache_size))
|
|
||||||
.collect::<io::Result<Vec<_>>>()?;
|
|
||||||
|
|
||||||
Ok(Searcher {
|
|
||||||
schema,
|
|
||||||
index,
|
|
||||||
segment_readers,
|
|
||||||
store_readers,
|
|
||||||
generation,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns the `Index` associated to the `Searcher`
|
/// Returns the `Index` associated to the `Searcher`
|
||||||
pub fn index(&self) -> &Index {
|
pub fn index(&self) -> &Index {
|
||||||
&self.index
|
&self.inner.index
|
||||||
}
|
}
|
||||||
|
|
||||||
/// [SearcherGeneration] which identifies the version of the snapshot held by this `Searcher`.
|
/// [SearcherGeneration] which identifies the version of the snapshot held by this `Searcher`.
|
||||||
pub fn generation(&self) -> &SearcherGeneration {
|
pub fn generation(&self) -> &SearcherGeneration {
|
||||||
self.generation.as_ref()
|
self.inner.generation.as_ref()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Fetches a document from tantivy's store given a `DocAddress`.
|
/// Fetches a document from tantivy's store given a `DocAddress`.
|
||||||
@@ -108,7 +84,7 @@ impl Searcher {
|
|||||||
/// The searcher uses the segment ordinal to route the
|
/// The searcher uses the segment ordinal to route the
|
||||||
/// the request to the right `Segment`.
|
/// the request to the right `Segment`.
|
||||||
pub fn doc(&self, doc_address: DocAddress) -> crate::Result<Document> {
|
pub fn doc(&self, doc_address: DocAddress) -> crate::Result<Document> {
|
||||||
let store_reader = &self.store_readers[doc_address.segment_ord as usize];
|
let store_reader = &self.inner.store_readers[doc_address.segment_ord as usize];
|
||||||
store_reader.get(doc_address.doc_id)
|
store_reader.get(doc_address.doc_id)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -117,6 +93,7 @@ impl Searcher {
|
|||||||
/// Aggregates the sum for each segment store reader.
|
/// Aggregates the sum for each segment store reader.
|
||||||
pub fn doc_store_cache_stats(&self) -> CacheStats {
|
pub fn doc_store_cache_stats(&self) -> CacheStats {
|
||||||
let cache_stats: CacheStats = self
|
let cache_stats: CacheStats = self
|
||||||
|
.inner
|
||||||
.store_readers
|
.store_readers
|
||||||
.iter()
|
.iter()
|
||||||
.map(|reader| reader.cache_stats())
|
.map(|reader| reader.cache_stats())
|
||||||
@@ -127,18 +104,19 @@ impl Searcher {
|
|||||||
/// Fetches a document in an asynchronous manner.
|
/// Fetches a document in an asynchronous manner.
|
||||||
#[cfg(feature = "quickwit")]
|
#[cfg(feature = "quickwit")]
|
||||||
pub async fn doc_async(&self, doc_address: DocAddress) -> crate::Result<Document> {
|
pub async fn doc_async(&self, doc_address: DocAddress) -> crate::Result<Document> {
|
||||||
let store_reader = &self.store_readers[doc_address.segment_ord as usize];
|
let store_reader = &self.inner.store_readers[doc_address.segment_ord as usize];
|
||||||
store_reader.get_async(doc_address.doc_id).await
|
store_reader.get_async(doc_address.doc_id).await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Access the schema associated to the index of this searcher.
|
/// Access the schema associated to the index of this searcher.
|
||||||
pub fn schema(&self) -> &Schema {
|
pub fn schema(&self) -> &Schema {
|
||||||
&self.schema
|
&self.inner.schema
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the overall number of documents in the index.
|
/// Returns the overall number of documents in the index.
|
||||||
pub fn num_docs(&self) -> u64 {
|
pub fn num_docs(&self) -> u64 {
|
||||||
self.segment_readers
|
self.inner
|
||||||
|
.segment_readers
|
||||||
.iter()
|
.iter()
|
||||||
.map(|segment_reader| u64::from(segment_reader.num_docs()))
|
.map(|segment_reader| u64::from(segment_reader.num_docs()))
|
||||||
.sum::<u64>()
|
.sum::<u64>()
|
||||||
@@ -148,7 +126,7 @@ impl Searcher {
|
|||||||
/// the given term.
|
/// the given term.
|
||||||
pub fn doc_freq(&self, term: &Term) -> crate::Result<u64> {
|
pub fn doc_freq(&self, term: &Term) -> crate::Result<u64> {
|
||||||
let mut total_doc_freq = 0;
|
let mut total_doc_freq = 0;
|
||||||
for segment_reader in &self.segment_readers {
|
for segment_reader in &self.inner.segment_readers {
|
||||||
let inverted_index = segment_reader.inverted_index(term.field())?;
|
let inverted_index = segment_reader.inverted_index(term.field())?;
|
||||||
let doc_freq = inverted_index.doc_freq(term)?;
|
let doc_freq = inverted_index.doc_freq(term)?;
|
||||||
total_doc_freq += u64::from(doc_freq);
|
total_doc_freq += u64::from(doc_freq);
|
||||||
@@ -158,12 +136,12 @@ impl Searcher {
|
|||||||
|
|
||||||
/// Return the list of segment readers
|
/// Return the list of segment readers
|
||||||
pub fn segment_readers(&self) -> &[SegmentReader] {
|
pub fn segment_readers(&self) -> &[SegmentReader] {
|
||||||
&self.segment_readers
|
&self.inner.segment_readers
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the segment_reader associated with the given segment_ord
|
/// Returns the segment_reader associated with the given segment_ord
|
||||||
pub fn segment_reader(&self, segment_ord: u32) -> &SegmentReader {
|
pub fn segment_reader(&self, segment_ord: u32) -> &SegmentReader {
|
||||||
&self.segment_readers[segment_ord as usize]
|
&self.inner.segment_readers[segment_ord as usize]
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Runs a query on the segment readers wrapped by the searcher.
|
/// Runs a query on the segment readers wrapped by the searcher.
|
||||||
@@ -185,7 +163,7 @@ impl Searcher {
|
|||||||
query: &dyn Query,
|
query: &dyn Query,
|
||||||
collector: &C,
|
collector: &C,
|
||||||
) -> crate::Result<C::Fruit> {
|
) -> crate::Result<C::Fruit> {
|
||||||
let executor = self.index.search_executor();
|
let executor = self.inner.index.search_executor();
|
||||||
self.search_with_executor(query, collector, executor)
|
self.search_with_executor(query, collector, executor)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -222,17 +200,59 @@ impl Searcher {
|
|||||||
/// Summarize total space usage of this searcher.
|
/// Summarize total space usage of this searcher.
|
||||||
pub fn space_usage(&self) -> io::Result<SearcherSpaceUsage> {
|
pub fn space_usage(&self) -> io::Result<SearcherSpaceUsage> {
|
||||||
let mut space_usage = SearcherSpaceUsage::new();
|
let mut space_usage = SearcherSpaceUsage::new();
|
||||||
for segment_reader in &self.segment_readers {
|
for segment_reader in self.segment_readers() {
|
||||||
space_usage.add_segment(segment_reader.space_usage()?);
|
space_usage.add_segment(segment_reader.space_usage()?);
|
||||||
}
|
}
|
||||||
Ok(space_usage)
|
Ok(space_usage)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl From<Arc<SearcherInner>> for Searcher {
|
||||||
|
fn from(inner: Arc<SearcherInner>) -> Self {
|
||||||
|
Searcher { inner }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Holds a list of `SegmentReader`s ready for search.
|
||||||
|
///
|
||||||
|
/// It guarantees that the `Segment` will not be removed before
|
||||||
|
/// the destruction of the `Searcher`.
|
||||||
|
pub(crate) struct SearcherInner {
|
||||||
|
schema: Schema,
|
||||||
|
index: Index,
|
||||||
|
segment_readers: Vec<SegmentReader>,
|
||||||
|
store_readers: Vec<StoreReader>,
|
||||||
|
generation: TrackedObject<SearcherGeneration>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SearcherInner {
|
||||||
|
/// Creates a new `Searcher`
|
||||||
|
pub(crate) fn new(
|
||||||
|
schema: Schema,
|
||||||
|
index: Index,
|
||||||
|
segment_readers: Vec<SegmentReader>,
|
||||||
|
generation: TrackedObject<SearcherGeneration>,
|
||||||
|
doc_store_cache_size: usize,
|
||||||
|
) -> io::Result<SearcherInner> {
|
||||||
|
let store_readers: Vec<StoreReader> = segment_readers
|
||||||
|
.iter()
|
||||||
|
.map(|segment_reader| segment_reader.get_store_reader(doc_store_cache_size))
|
||||||
|
.collect::<io::Result<Vec<_>>>()?;
|
||||||
|
|
||||||
|
Ok(SearcherInner {
|
||||||
|
schema,
|
||||||
|
index,
|
||||||
|
segment_readers,
|
||||||
|
store_readers,
|
||||||
|
generation,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl fmt::Debug for Searcher {
|
impl fmt::Debug for Searcher {
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
let segment_ids = self
|
let segment_ids = self
|
||||||
.segment_readers
|
.segment_readers()
|
||||||
.iter()
|
.iter()
|
||||||
.map(SegmentReader::segment_id)
|
.map(SegmentReader::segment_id)
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
|
|||||||
@@ -6,8 +6,6 @@ pub use self::writer::BytesFastFieldWriter;
|
|||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use std::ops::Deref;
|
|
||||||
|
|
||||||
use crate::query::TermQuery;
|
use crate::query::TermQuery;
|
||||||
use crate::schema::{BytesOptions, IndexRecordOption, Schema, Value, FAST, INDEXED, STORED};
|
use crate::schema::{BytesOptions, IndexRecordOption, Schema, Value, FAST, INDEXED, STORED};
|
||||||
use crate::{DocAddress, DocSet, Index, Searcher, Term};
|
use crate::{DocAddress, DocSet, Index, Searcher, Term};
|
||||||
@@ -37,9 +35,7 @@ mod tests {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn create_index_for_test<T: Into<BytesOptions>>(
|
fn create_index_for_test<T: Into<BytesOptions>>(byte_options: T) -> crate::Result<Searcher> {
|
||||||
byte_options: T,
|
|
||||||
) -> crate::Result<impl Deref<Target = Searcher>> {
|
|
||||||
let mut schema_builder = Schema::builder();
|
let mut schema_builder = Schema::builder();
|
||||||
let field = schema_builder.add_bytes_field("string_bytes", byte_options.into());
|
let field = schema_builder.add_bytes_field("string_bytes", byte_options.into());
|
||||||
let schema = schema_builder.build();
|
let schema = schema_builder.build();
|
||||||
@@ -86,7 +82,7 @@ mod tests {
|
|||||||
let field = searcher.schema().get_field("string_bytes").unwrap();
|
let field = searcher.schema().get_field("string_bytes").unwrap();
|
||||||
let term = Term::from_field_bytes(field, b"lucene".as_ref());
|
let term = Term::from_field_bytes(field, b"lucene".as_ref());
|
||||||
let term_query = TermQuery::new(term, IndexRecordOption::Basic);
|
let term_query = TermQuery::new(term, IndexRecordOption::Basic);
|
||||||
let term_weight = term_query.specialized_weight(&*searcher, true)?;
|
let term_weight = term_query.specialized_weight(&searcher, true)?;
|
||||||
let term_scorer = term_weight.specialized_scorer(searcher.segment_reader(0), 1.0)?;
|
let term_scorer = term_weight.specialized_scorer(searcher.segment_reader(0), 1.0)?;
|
||||||
assert_eq!(term_scorer.doc(), 0u32);
|
assert_eq!(term_scorer.doc(), 0u32);
|
||||||
Ok(())
|
Ok(())
|
||||||
@@ -99,7 +95,7 @@ mod tests {
|
|||||||
let field = searcher.schema().get_field("string_bytes").unwrap();
|
let field = searcher.schema().get_field("string_bytes").unwrap();
|
||||||
let term = Term::from_field_bytes(field, b"lucene".as_ref());
|
let term = Term::from_field_bytes(field, b"lucene".as_ref());
|
||||||
let term_query = TermQuery::new(term, IndexRecordOption::Basic);
|
let term_query = TermQuery::new(term, IndexRecordOption::Basic);
|
||||||
let term_weight_err = term_query.specialized_weight(&*searcher, false);
|
let term_weight_err = term_query.specialized_weight(&searcher, false);
|
||||||
assert!(matches!(
|
assert!(matches!(
|
||||||
term_weight_err,
|
term_weight_err,
|
||||||
Err(crate::TantivyError::SchemaError(_))
|
Err(crate::TantivyError::SchemaError(_))
|
||||||
|
|||||||
@@ -112,7 +112,7 @@ mod tests {
|
|||||||
Term::from_field_text(text, "hello"),
|
Term::from_field_text(text, "hello"),
|
||||||
IndexRecordOption::WithFreqs,
|
IndexRecordOption::WithFreqs,
|
||||||
);
|
);
|
||||||
let weight = query.weight(&*searcher, true)?;
|
let weight = query.weight(&searcher, true)?;
|
||||||
let mut scorer = weight.scorer(searcher.segment_reader(0), 1.0f32)?;
|
let mut scorer = weight.scorer(searcher.segment_reader(0), 1.0f32)?;
|
||||||
assert_eq!(scorer.doc(), 0);
|
assert_eq!(scorer.doc(), 0);
|
||||||
assert!((scorer.score() - 0.22920431).abs() < 0.001f32);
|
assert!((scorer.score() - 0.22920431).abs() < 0.001f32);
|
||||||
@@ -141,7 +141,7 @@ mod tests {
|
|||||||
Term::from_field_text(text, "hello"),
|
Term::from_field_text(text, "hello"),
|
||||||
IndexRecordOption::WithFreqs,
|
IndexRecordOption::WithFreqs,
|
||||||
);
|
);
|
||||||
let weight = query.weight(&*searcher, true)?;
|
let weight = query.weight(&searcher, true)?;
|
||||||
let mut scorer = weight.scorer(searcher.segment_reader(0), 1.0f32)?;
|
let mut scorer = weight.scorer(searcher.segment_reader(0), 1.0f32)?;
|
||||||
assert_eq!(scorer.doc(), 0);
|
assert_eq!(scorer.doc(), 0);
|
||||||
assert!((scorer.score() - 0.22920431).abs() < 0.001f32);
|
assert!((scorer.score() - 0.22920431).abs() < 0.001f32);
|
||||||
|
|||||||
@@ -307,7 +307,6 @@ pub use crate::indexer::demuxer::*;
|
|||||||
pub use crate::indexer::operation::UserOperation;
|
pub use crate::indexer::operation::UserOperation;
|
||||||
pub use crate::indexer::{merge_filtered_segments, merge_indices, IndexWriter, PreparedCommit};
|
pub use crate::indexer::{merge_filtered_segments, merge_indices, IndexWriter, PreparedCommit};
|
||||||
pub use crate::postings::Postings;
|
pub use crate::postings::Postings;
|
||||||
pub use crate::reader::LeasedItem;
|
|
||||||
pub use crate::schema::{DateOptions, DatePrecision, Document, Term};
|
pub use crate::schema::{DateOptions, DatePrecision, Document, Term};
|
||||||
|
|
||||||
/// Index format version.
|
/// Index format version.
|
||||||
|
|||||||
@@ -359,7 +359,7 @@ pub mod tests {
|
|||||||
let matching_docs = |query: &str| {
|
let matching_docs = |query: &str| {
|
||||||
let query_parser = QueryParser::for_index(&index, vec![json_field]);
|
let query_parser = QueryParser::for_index(&index, vec![json_field]);
|
||||||
let phrase_query = query_parser.parse_query(query).unwrap();
|
let phrase_query = query_parser.parse_query(query).unwrap();
|
||||||
let phrase_weight = phrase_query.weight(&*searcher, false).unwrap();
|
let phrase_weight = phrase_query.weight(&searcher, false).unwrap();
|
||||||
let mut phrase_scorer = phrase_weight
|
let mut phrase_scorer = phrase_weight
|
||||||
.scorer(searcher.segment_reader(0), 1.0f32)
|
.scorer(searcher.segment_reader(0), 1.0f32)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|||||||
@@ -141,7 +141,7 @@ mod tests {
|
|||||||
let term_a = Term::from_field_text(text_field, "a");
|
let term_a = Term::from_field_text(text_field, "a");
|
||||||
let term_query = TermQuery::new(term_a, IndexRecordOption::Basic);
|
let term_query = TermQuery::new(term_a, IndexRecordOption::Basic);
|
||||||
let reader = index.reader()?;
|
let reader = index.reader()?;
|
||||||
assert_eq!(term_query.count(&*reader.searcher())?, 1);
|
assert_eq!(term_query.count(&reader.searcher())?, 1);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,17 +1,14 @@
|
|||||||
mod pool;
|
|
||||||
mod warming;
|
mod warming;
|
||||||
|
|
||||||
use std::convert::TryInto;
|
use std::convert::TryInto;
|
||||||
use std::io;
|
|
||||||
use std::sync::atomic::AtomicU64;
|
use std::sync::atomic::AtomicU64;
|
||||||
use std::sync::{atomic, Arc, Weak};
|
use std::sync::{atomic, Arc, Weak};
|
||||||
|
|
||||||
|
use arc_swap::ArcSwap;
|
||||||
pub use warming::Warmer;
|
pub use warming::Warmer;
|
||||||
|
|
||||||
pub use self::pool::LeasedItem;
|
|
||||||
use self::pool::Pool;
|
|
||||||
use self::warming::WarmingState;
|
use self::warming::WarmingState;
|
||||||
use crate::core::searcher::SearcherGeneration;
|
use crate::core::searcher::{SearcherGeneration, SearcherInner};
|
||||||
use crate::directory::{Directory, WatchCallback, WatchHandle, META_LOCK};
|
use crate::directory::{Directory, WatchCallback, WatchHandle, META_LOCK};
|
||||||
use crate::store::DOCSTORE_CACHE_CAPACITY;
|
use crate::store::DOCSTORE_CACHE_CAPACITY;
|
||||||
use crate::{Index, Inventory, Searcher, SegmentReader, TrackedObject};
|
use crate::{Index, Inventory, Searcher, SegmentReader, TrackedObject};
|
||||||
@@ -37,13 +34,12 @@ pub enum ReloadPolicy {
|
|||||||
/// [IndexReader] builder
|
/// [IndexReader] builder
|
||||||
///
|
///
|
||||||
/// It makes it possible to configure:
|
/// It makes it possible to configure:
|
||||||
/// - [Searcher] pool size
|
|
||||||
/// - [ReloadPolicy] defining when new index versions are detected
|
/// - [ReloadPolicy] defining when new index versions are detected
|
||||||
/// - [Warmer] implementations
|
/// - [Warmer] implementations
|
||||||
/// - number of warming threads, for parallelizing warming work
|
/// - number of warming threads, for parallelizing warming work
|
||||||
|
/// - The cache size of the underlying doc store readers.
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct IndexReaderBuilder {
|
pub struct IndexReaderBuilder {
|
||||||
num_searchers: usize,
|
|
||||||
reload_policy: ReloadPolicy,
|
reload_policy: ReloadPolicy,
|
||||||
index: Index,
|
index: Index,
|
||||||
warmers: Vec<Weak<dyn Warmer>>,
|
warmers: Vec<Weak<dyn Warmer>>,
|
||||||
@@ -55,7 +51,6 @@ impl IndexReaderBuilder {
|
|||||||
#[must_use]
|
#[must_use]
|
||||||
pub(crate) fn new(index: Index) -> IndexReaderBuilder {
|
pub(crate) fn new(index: Index) -> IndexReaderBuilder {
|
||||||
IndexReaderBuilder {
|
IndexReaderBuilder {
|
||||||
num_searchers: num_cpus::get(),
|
|
||||||
reload_policy: ReloadPolicy::OnCommit,
|
reload_policy: ReloadPolicy::OnCommit,
|
||||||
index,
|
index,
|
||||||
warmers: Vec::new(),
|
warmers: Vec::new(),
|
||||||
@@ -76,16 +71,12 @@ impl IndexReaderBuilder {
|
|||||||
self.warmers,
|
self.warmers,
|
||||||
searcher_generation_inventory.clone(),
|
searcher_generation_inventory.clone(),
|
||||||
)?;
|
)?;
|
||||||
let inner_reader = InnerIndexReader {
|
let inner_reader = InnerIndexReader::new(
|
||||||
index: self.index,
|
self.doc_store_cache_size,
|
||||||
num_searchers: self.num_searchers,
|
self.index,
|
||||||
doc_store_cache_size: self.doc_store_cache_size,
|
|
||||||
searcher_pool: Pool::new(),
|
|
||||||
warming_state,
|
warming_state,
|
||||||
searcher_generation_counter: Default::default(),
|
|
||||||
searcher_generation_inventory,
|
searcher_generation_inventory,
|
||||||
};
|
)?;
|
||||||
inner_reader.reload()?;
|
|
||||||
let inner_reader_arc = Arc::new(inner_reader);
|
let inner_reader_arc = Arc::new(inner_reader);
|
||||||
let watch_handle_opt: Option<WatchHandle> = match self.reload_policy {
|
let watch_handle_opt: Option<WatchHandle> = match self.reload_policy {
|
||||||
ReloadPolicy::Manual => {
|
ReloadPolicy::Manual => {
|
||||||
@@ -133,15 +124,6 @@ impl IndexReaderBuilder {
|
|||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Sets the number of [Searcher] to pool.
|
|
||||||
///
|
|
||||||
/// See [IndexReader::searcher()].
|
|
||||||
#[must_use]
|
|
||||||
pub fn num_searchers(mut self, num_searchers: usize) -> IndexReaderBuilder {
|
|
||||||
self.num_searchers = num_searchers;
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Set the [Warmer]s that are invoked when reloading searchable segments.
|
/// Set the [Warmer]s that are invoked when reloading searchable segments.
|
||||||
#[must_use]
|
#[must_use]
|
||||||
pub fn warmers(mut self, warmers: Vec<Weak<dyn Warmer>>) -> IndexReaderBuilder {
|
pub fn warmers(mut self, warmers: Vec<Weak<dyn Warmer>>) -> IndexReaderBuilder {
|
||||||
@@ -169,24 +151,52 @@ impl TryInto<IndexReader> for IndexReaderBuilder {
|
|||||||
}
|
}
|
||||||
|
|
||||||
struct InnerIndexReader {
|
struct InnerIndexReader {
|
||||||
num_searchers: usize,
|
|
||||||
doc_store_cache_size: usize,
|
doc_store_cache_size: usize,
|
||||||
index: Index,
|
index: Index,
|
||||||
warming_state: WarmingState,
|
warming_state: WarmingState,
|
||||||
searcher_pool: Pool<Searcher>,
|
searcher: arc_swap::ArcSwap<SearcherInner>,
|
||||||
searcher_generation_counter: Arc<AtomicU64>,
|
searcher_generation_counter: Arc<AtomicU64>,
|
||||||
searcher_generation_inventory: Inventory<SearcherGeneration>,
|
searcher_generation_inventory: Inventory<SearcherGeneration>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl InnerIndexReader {
|
impl InnerIndexReader {
|
||||||
|
fn new(
|
||||||
|
doc_store_cache_size: usize,
|
||||||
|
index: Index,
|
||||||
|
warming_state: WarmingState,
|
||||||
|
searcher_generation_inventory: Inventory<SearcherGeneration>,
|
||||||
|
) -> crate::Result<Self> {
|
||||||
|
let searcher_generation_counter: Arc<AtomicU64> = Default::default();
|
||||||
|
let segment_readers = Self::open_segment_readers(&index)?;
|
||||||
|
let searcher_generation = Self::create_new_searcher_generation(
|
||||||
|
&segment_readers,
|
||||||
|
&searcher_generation_counter,
|
||||||
|
&searcher_generation_inventory,
|
||||||
|
);
|
||||||
|
|
||||||
|
let searcher = Self::create_searcher(
|
||||||
|
&index,
|
||||||
|
doc_store_cache_size,
|
||||||
|
&warming_state,
|
||||||
|
searcher_generation,
|
||||||
|
)?;
|
||||||
|
Ok(InnerIndexReader {
|
||||||
|
doc_store_cache_size,
|
||||||
|
index,
|
||||||
|
warming_state,
|
||||||
|
searcher: ArcSwap::from(searcher),
|
||||||
|
searcher_generation_counter,
|
||||||
|
searcher_generation_inventory,
|
||||||
|
})
|
||||||
|
}
|
||||||
/// Opens the freshest segments `SegmentReader`.
|
/// Opens the freshest segments `SegmentReader`.
|
||||||
///
|
///
|
||||||
/// This function acquires a lot to prevent GC from removing files
|
/// This function acquires a lot to prevent GC from removing files
|
||||||
/// as we are opening our index.
|
/// as we are opening our index.
|
||||||
fn open_segment_readers(&self) -> crate::Result<Vec<SegmentReader>> {
|
fn open_segment_readers(index: &Index) -> crate::Result<Vec<SegmentReader>> {
|
||||||
// Prevents segment files from getting deleted while we are in the process of opening them
|
// Prevents segment files from getting deleted while we are in the process of opening them
|
||||||
let _meta_lock = self.index.directory().acquire_lock(&META_LOCK)?;
|
let _meta_lock = index.directory().acquire_lock(&META_LOCK)?;
|
||||||
let searchable_segments = self.index.searchable_segments()?;
|
let searchable_segments = index.searchable_segments()?;
|
||||||
let segment_readers = searchable_segments
|
let segment_readers = searchable_segments
|
||||||
.iter()
|
.iter()
|
||||||
.map(SegmentReader::open)
|
.map(SegmentReader::open)
|
||||||
@@ -195,41 +205,57 @@ impl InnerIndexReader {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn create_new_searcher_generation(
|
fn create_new_searcher_generation(
|
||||||
&self,
|
|
||||||
segment_readers: &[SegmentReader],
|
segment_readers: &[SegmentReader],
|
||||||
|
searcher_generation_counter: &Arc<AtomicU64>,
|
||||||
|
searcher_generation_inventory: &Inventory<SearcherGeneration>,
|
||||||
) -> TrackedObject<SearcherGeneration> {
|
) -> TrackedObject<SearcherGeneration> {
|
||||||
let generation_id = self
|
let generation_id = searcher_generation_counter.fetch_add(1, atomic::Ordering::Relaxed);
|
||||||
.searcher_generation_counter
|
|
||||||
.fetch_add(1, atomic::Ordering::Relaxed);
|
|
||||||
let searcher_generation =
|
let searcher_generation =
|
||||||
SearcherGeneration::from_segment_readers(segment_readers, generation_id);
|
SearcherGeneration::from_segment_readers(segment_readers, generation_id);
|
||||||
self.searcher_generation_inventory
|
searcher_generation_inventory.track(searcher_generation)
|
||||||
.track(searcher_generation)
|
}
|
||||||
|
|
||||||
|
fn create_searcher(
|
||||||
|
index: &Index,
|
||||||
|
doc_store_cache_size: usize,
|
||||||
|
warming_state: &WarmingState,
|
||||||
|
searcher_generation: TrackedObject<SearcherGeneration>,
|
||||||
|
) -> crate::Result<Arc<SearcherInner>> {
|
||||||
|
let segment_readers = Self::open_segment_readers(index)?;
|
||||||
|
let schema = index.schema();
|
||||||
|
let searcher = Arc::new(SearcherInner::new(
|
||||||
|
schema,
|
||||||
|
index.clone(),
|
||||||
|
segment_readers,
|
||||||
|
searcher_generation,
|
||||||
|
doc_store_cache_size,
|
||||||
|
)?);
|
||||||
|
|
||||||
|
warming_state.warm_new_searcher_generation(&searcher.clone().into())?;
|
||||||
|
Ok(searcher)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn reload(&self) -> crate::Result<()> {
|
fn reload(&self) -> crate::Result<()> {
|
||||||
let segment_readers = self.open_segment_readers()?;
|
let segment_readers = Self::open_segment_readers(&self.index)?;
|
||||||
let searcher_generation = self.create_new_searcher_generation(&segment_readers);
|
let searcher_generation = Self::create_new_searcher_generation(
|
||||||
let schema = self.index.schema();
|
&segment_readers,
|
||||||
let searchers: Vec<Searcher> = std::iter::repeat_with(|| {
|
&self.searcher_generation_counter,
|
||||||
Searcher::new(
|
&self.searcher_generation_inventory,
|
||||||
schema.clone(),
|
);
|
||||||
self.index.clone(),
|
let searcher = Self::create_searcher(
|
||||||
segment_readers.clone(),
|
&self.index,
|
||||||
searcher_generation.clone(),
|
self.doc_store_cache_size,
|
||||||
self.doc_store_cache_size,
|
&self.warming_state,
|
||||||
)
|
searcher_generation,
|
||||||
})
|
)?;
|
||||||
.take(self.num_searchers)
|
|
||||||
.collect::<io::Result<_>>()?;
|
self.searcher.store(searcher);
|
||||||
self.warming_state
|
|
||||||
.warm_new_searcher_generation(&searchers[0])?;
|
|
||||||
self.searcher_pool.publish_new_generation(searchers);
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn searcher(&self) -> LeasedItem<Searcher> {
|
fn searcher(&self) -> Searcher {
|
||||||
self.searcher_pool.acquire()
|
self.searcher.load().clone().into()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -275,7 +301,7 @@ impl IndexReader {
|
|||||||
///
|
///
|
||||||
/// The same searcher must be used for a given query, as it ensures
|
/// The same searcher must be used for a given query, as it ensures
|
||||||
/// the use of a consistent segment set.
|
/// the use of a consistent segment set.
|
||||||
pub fn searcher(&self) -> LeasedItem<Searcher> {
|
pub fn searcher(&self) -> Searcher {
|
||||||
self.inner.searcher()
|
self.inner.searcher()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,295 +0,0 @@
|
|||||||
use std::ops::{Deref, DerefMut};
|
|
||||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
|
||||||
use std::sync::Arc;
|
|
||||||
|
|
||||||
use crossbeam_channel::{unbounded, Receiver, RecvError, Sender};
|
|
||||||
|
|
||||||
pub struct GenerationItem<T> {
|
|
||||||
generation: usize,
|
|
||||||
item: T,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Queue implementation for the Object Pool below
|
|
||||||
/// Uses the unbounded Linked-List type queue from crossbeam-channel
|
|
||||||
/// Splits the Queue into sender and receiver
|
|
||||||
struct Queue<T> {
|
|
||||||
sender: Sender<T>,
|
|
||||||
receiver: Receiver<T>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T> Queue<T> {
|
|
||||||
fn new() -> Self {
|
|
||||||
let (s, r) = unbounded();
|
|
||||||
Queue {
|
|
||||||
sender: s,
|
|
||||||
receiver: r,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Sender trait returns a Result type, which is ignored.
|
|
||||||
/// The Result is not handled at the moment
|
|
||||||
fn push(&self, elem: T) {
|
|
||||||
self.sender
|
|
||||||
.send(elem)
|
|
||||||
.expect("Sending an item to crossbeam-queue shouldn't fail");
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Relies on the underlying crossbeam-channel Receiver
|
|
||||||
/// to block on empty queue
|
|
||||||
fn pop(&self) -> Result<T, RecvError> {
|
|
||||||
self.receiver.recv()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// An object pool
|
|
||||||
///
|
|
||||||
/// This is used in tantivy to create a pool of `Searcher`.
|
|
||||||
/// Object are wrapped in a `LeasedItem` wrapper and are
|
|
||||||
/// released automatically back into the pool on `Drop`.
|
|
||||||
pub struct Pool<T> {
|
|
||||||
queue: Arc<Queue<GenerationItem<T>>>,
|
|
||||||
freshest_generation: AtomicUsize,
|
|
||||||
next_generation: AtomicUsize,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T> Pool<T> {
|
|
||||||
pub fn new() -> Pool<T> {
|
|
||||||
let queue = Arc::new(Queue::new());
|
|
||||||
Pool {
|
|
||||||
queue,
|
|
||||||
freshest_generation: AtomicUsize::default(),
|
|
||||||
next_generation: AtomicUsize::default(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Publishes a new generation of `Searcher`.
|
|
||||||
///
|
|
||||||
/// After publish, all new `Searcher` acquired will be
|
|
||||||
/// of the new generation.
|
|
||||||
pub fn publish_new_generation(&self, items: Vec<T>) {
|
|
||||||
assert!(!items.is_empty());
|
|
||||||
let next_generation = self.next_generation.fetch_add(1, Ordering::SeqCst) + 1;
|
|
||||||
let num_items = items.len();
|
|
||||||
for item in items {
|
|
||||||
let gen_item = GenerationItem {
|
|
||||||
item,
|
|
||||||
generation: next_generation,
|
|
||||||
};
|
|
||||||
self.queue.push(gen_item);
|
|
||||||
}
|
|
||||||
self.advertise_generation(next_generation);
|
|
||||||
// Purge possible previous searchers.
|
|
||||||
//
|
|
||||||
// Assuming at this point no searcher is held more than duration T by the user,
|
|
||||||
// this guarantees that an obsolete searcher will not be uselessly held (and its associated
|
|
||||||
// mmap) for more than duration T.
|
|
||||||
//
|
|
||||||
// Proof: At this point, obsolete searcher that are held by the user will be held for less
|
|
||||||
// than T. When released, they will be dropped as their generation is detected obsolete.
|
|
||||||
//
|
|
||||||
// We still need to ensure that the searcher that are obsolete and in the pool get removed.
|
|
||||||
// The queue currently contains up to 2n searchers, in any random order.
|
|
||||||
//
|
|
||||||
// Half of them are obsoletes. By requesting `(n+1)` fresh searchers, we ensure that all
|
|
||||||
// searcher will be inspected.
|
|
||||||
for _ in 0..=num_items {
|
|
||||||
let _ = self.acquire();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// At the exit of this method,
|
|
||||||
/// - freshest_generation has a value greater or equal than generation
|
|
||||||
/// - freshest_generation has the last value that has been advertised
|
|
||||||
fn advertise_generation(&self, generation: usize) {
|
|
||||||
// not optimal at all but the easiest to read proof.
|
|
||||||
let mut former_generation = self.freshest_generation.load(Ordering::Acquire);
|
|
||||||
loop {
|
|
||||||
match self.freshest_generation.compare_exchange(
|
|
||||||
former_generation,
|
|
||||||
generation,
|
|
||||||
Ordering::SeqCst,
|
|
||||||
Ordering::SeqCst,
|
|
||||||
) {
|
|
||||||
Ok(_) => {
|
|
||||||
// We successfuly updated the value.
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
Err(current_generation) => {
|
|
||||||
// The value was updated after we did our load apparently.
|
|
||||||
// In theory, it is always a value greater than ours, but just to
|
|
||||||
// simplify the logic, we keep looping until we reach a
|
|
||||||
// value >= to our target value.
|
|
||||||
if current_generation >= generation {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
former_generation = current_generation;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn generation(&self) -> usize {
|
|
||||||
self.freshest_generation.load(Ordering::Acquire)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Acquires a new searcher.
|
|
||||||
///
|
|
||||||
/// If no searcher is available, this methods block until
|
|
||||||
/// a searcher is released.
|
|
||||||
pub fn acquire(&self) -> LeasedItem<T> {
|
|
||||||
let generation = self.generation();
|
|
||||||
loop {
|
|
||||||
let gen_item = self.queue.pop().unwrap();
|
|
||||||
if gen_item.generation >= generation {
|
|
||||||
return LeasedItem {
|
|
||||||
gen_item: Some(gen_item),
|
|
||||||
recycle_queue: Arc::clone(&self.queue),
|
|
||||||
};
|
|
||||||
} else {
|
|
||||||
// this searcher is obsolete,
|
|
||||||
// removing it from the pool.
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A LeasedItem holds an object borrowed from a Pool.
|
|
||||||
///
|
|
||||||
/// Upon drop, the object is automatically returned
|
|
||||||
/// into the pool.
|
|
||||||
pub struct LeasedItem<T> {
|
|
||||||
gen_item: Option<GenerationItem<T>>,
|
|
||||||
recycle_queue: Arc<Queue<GenerationItem<T>>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T> Deref for LeasedItem<T> {
|
|
||||||
type Target = T;
|
|
||||||
|
|
||||||
fn deref(&self) -> &T {
|
|
||||||
&self
|
|
||||||
.gen_item
|
|
||||||
.as_ref()
|
|
||||||
.expect("Unwrapping a leased item should never fail")
|
|
||||||
.item // unwrap is safe here
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T> DerefMut for LeasedItem<T> {
|
|
||||||
fn deref_mut(&mut self) -> &mut T {
|
|
||||||
&mut self
|
|
||||||
.gen_item
|
|
||||||
.as_mut()
|
|
||||||
.expect("Unwrapping a mut leased item should never fail")
|
|
||||||
.item // unwrap is safe here
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T> Drop for LeasedItem<T> {
|
|
||||||
fn drop(&mut self) {
|
|
||||||
if let Some(gen_item) = self.gen_item.take() {
|
|
||||||
self.recycle_queue.push(gen_item);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
|
|
||||||
use std::{iter, mem};
|
|
||||||
|
|
||||||
use crossbeam_channel as channel;
|
|
||||||
|
|
||||||
use super::{Pool, Queue};
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_pool() {
|
|
||||||
let items10: Vec<usize> = iter::repeat(10).take(10).collect();
|
|
||||||
let pool = Pool::new();
|
|
||||||
pool.publish_new_generation(items10);
|
|
||||||
for _ in 0..20 {
|
|
||||||
assert_eq!(*pool.acquire(), 10);
|
|
||||||
}
|
|
||||||
let items11: Vec<usize> = iter::repeat(11).take(10).collect();
|
|
||||||
pool.publish_new_generation(items11);
|
|
||||||
for _ in 0..20 {
|
|
||||||
assert_eq!(*pool.acquire(), 11);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_queue() {
|
|
||||||
let q = Queue::new();
|
|
||||||
let elem = 5;
|
|
||||||
q.push(elem);
|
|
||||||
let res = q.pop();
|
|
||||||
assert_eq!(res.unwrap(), elem);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_pool_dont_panic_on_empty_pop() {
|
|
||||||
// When the object pool is exhausted, it shouldn't panic on pop()
|
|
||||||
use std::sync::Arc;
|
|
||||||
use std::thread;
|
|
||||||
|
|
||||||
// Wrap the pool in an Arc, same way as its used in `core/index.rs`
|
|
||||||
let pool1 = Arc::new(Pool::new());
|
|
||||||
// clone pools outside the move scope of each new thread
|
|
||||||
let pool2 = Arc::clone(&pool1);
|
|
||||||
let pool3 = Arc::clone(&pool1);
|
|
||||||
|
|
||||||
let elements_for_pool = vec![1, 2];
|
|
||||||
pool1.publish_new_generation(elements_for_pool);
|
|
||||||
|
|
||||||
let mut threads = vec![];
|
|
||||||
// spawn one more thread than there are elements in the pool
|
|
||||||
|
|
||||||
let (start_1_send, start_1_recv) = channel::bounded(0);
|
|
||||||
let (start_2_send, start_2_recv) = channel::bounded(0);
|
|
||||||
let (start_3_send, start_3_recv) = channel::bounded(0);
|
|
||||||
|
|
||||||
let (event_send1, event_recv) = channel::unbounded();
|
|
||||||
let event_send2 = event_send1.clone();
|
|
||||||
let event_send3 = event_send1.clone();
|
|
||||||
|
|
||||||
threads.push(thread::spawn(move || {
|
|
||||||
assert_eq!(start_1_recv.recv(), Ok("start"));
|
|
||||||
let _leased_searcher = pool1.acquire();
|
|
||||||
assert!(event_send1.send("1 acquired").is_ok());
|
|
||||||
assert_eq!(start_1_recv.recv(), Ok("stop"));
|
|
||||||
assert!(event_send1.send("1 stopped").is_ok());
|
|
||||||
mem::drop(_leased_searcher);
|
|
||||||
}));
|
|
||||||
|
|
||||||
threads.push(thread::spawn(move || {
|
|
||||||
assert_eq!(start_2_recv.recv(), Ok("start"));
|
|
||||||
let _leased_searcher = pool2.acquire();
|
|
||||||
assert!(event_send2.send("2 acquired").is_ok());
|
|
||||||
assert_eq!(start_2_recv.recv(), Ok("stop"));
|
|
||||||
mem::drop(_leased_searcher);
|
|
||||||
assert!(event_send2.send("2 stopped").is_ok());
|
|
||||||
}));
|
|
||||||
|
|
||||||
threads.push(thread::spawn(move || {
|
|
||||||
assert_eq!(start_3_recv.recv(), Ok("start"));
|
|
||||||
let _leased_searcher = pool3.acquire();
|
|
||||||
assert!(event_send3.send("3 acquired").is_ok());
|
|
||||||
assert_eq!(start_3_recv.recv(), Ok("stop"));
|
|
||||||
mem::drop(_leased_searcher);
|
|
||||||
assert!(event_send3.send("3 stopped").is_ok());
|
|
||||||
}));
|
|
||||||
|
|
||||||
assert!(start_1_send.send("start").is_ok());
|
|
||||||
assert_eq!(event_recv.recv(), Ok("1 acquired"));
|
|
||||||
assert!(start_2_send.send("start").is_ok());
|
|
||||||
assert_eq!(event_recv.recv(), Ok("2 acquired"));
|
|
||||||
assert!(start_3_send.send("start").is_ok());
|
|
||||||
assert!(event_recv.try_recv().is_err());
|
|
||||||
assert!(start_1_send.send("stop").is_ok());
|
|
||||||
assert_eq!(event_recv.recv(), Ok("1 stopped"));
|
|
||||||
assert_eq!(event_recv.recv(), Ok("3 acquired"));
|
|
||||||
assert!(start_3_send.send("stop").is_ok());
|
|
||||||
assert_eq!(event_recv.recv(), Ok("3 stopped"));
|
|
||||||
assert!(start_2_send.send("stop").is_ok());
|
|
||||||
assert_eq!(event_recv.recv(), Ok("2 stopped"));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -273,7 +273,6 @@ mod tests {
|
|||||||
.reader_builder()
|
.reader_builder()
|
||||||
.reload_policy(ReloadPolicy::Manual)
|
.reload_policy(ReloadPolicy::Manual)
|
||||||
.num_warming_threads(num_warming_threads)
|
.num_warming_threads(num_warming_threads)
|
||||||
.num_searchers(num_searchers)
|
|
||||||
.warmers(vec![
|
.warmers(vec![
|
||||||
Arc::downgrade(&warmer1) as Weak<dyn Warmer>,
|
Arc::downgrade(&warmer1) as Weak<dyn Warmer>,
|
||||||
Arc::downgrade(&warmer2) as Weak<dyn Warmer>,
|
Arc::downgrade(&warmer2) as Weak<dyn Warmer>,
|
||||||
|
|||||||
Reference in New Issue
Block a user