Mirror of https://github.com/quickwit-oss/tantivy.git
remove searcher pool and make Searcher cloneable (#1411)

* remove searcher pool and make Searcher cloneable (closes #1410)
* use SearcherInner in InnerIndexReader
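
At a glance: `IndexReader::searcher()` no longer hands out a lease from a fixed-size pool; it now returns an owned `Searcher` that is cheap to clone. A before/after sketch based on the test changes below (`reader` and `query` are placeholders):

```rust
// Before (0.18): searchers were leased from a pool and dereferenced.
//
//     let searcher: LeasedItem<Searcher> = reader.searcher();
//     let weight = query.weight(&*searcher, true)?;

// After this change: the searcher is an owned, cloneable snapshot.
let searcher: tantivy::Searcher = reader.searcher();
let searcher_clone = searcher.clone(); // cheap: clones an Arc internally
let weight = query.weight(&searcher, true)?;
```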
@@ -3,7 +3,7 @@ Tantivy 0.19
 - Updated [Date Field Type](https://github.com/quickwit-oss/tantivy/pull/1396)
   The `DateTime` type has been updated to hold timestamps with microsecond precision.
   `DateOptions` and `DatePrecision` have been added to configure date fields. The precision is used as a hint for fast-value compression; otherwise, second precision is used everywhere else (i.e. terms, indexing).
-
+- Remove the searcher pool and make `Searcher` cloneable.

 Tantivy 0.18
 ================================

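For illustration, a sketch of configuring the new date options; the exact builder method names (`set_stored`, `set_precision`) are assumptions based on tantivy's usual `*Options` pattern and are not shown in this diff:

```rust
use tantivy::schema::{DateOptions, DatePrecision, Schema};

fn build_schema() -> Schema {
    // Hypothetical: store the field and hint second precision for fast values.
    let date_opts = DateOptions::default()
        .set_stored()
        .set_precision(DatePrecision::Seconds);

    let mut schema_builder = Schema::builder();
    schema_builder.add_date_field("created_at", date_opts);
    schema_builder.build()
}
```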
@@ -59,6 +59,7 @@ measure_time = "0.8.2"
 pretty_assertions = "1.2.1"
 serde_cbor = { version = "0.11.2", optional = true }
 async-trait = "0.1.53"
+arc-swap = "1.5.0"

 [target.'cfg(windows)'.dependencies]
 winapi = "0.3.9"

@@ -145,11 +145,7 @@ fn main() -> tantivy::Result<()> {
     let warmers: Vec<Weak<dyn Warmer>> = vec![Arc::downgrade(
         &(price_dynamic_column.clone() as Arc<dyn Warmer>),
     )];
-    let reader: IndexReader = index
-        .reader_builder()
-        .warmers(warmers)
-        .num_searchers(1)
-        .try_into()?;
+    let reader: IndexReader = index.reader_builder().warmers(warmers).try_into()?;
     reader.reload()?;

     let query_parser = QueryParser::for_index(&index, vec![text]);

@@ -263,7 +263,7 @@ impl SegmentCollector for BytesFastFieldSegmentCollector {
     }
 }

-fn make_test_searcher() -> crate::Result<crate::LeasedItem<Searcher>> {
+fn make_test_searcher() -> crate::Result<Searcher> {
     let schema = Schema::builder().build();
     let index = Index::create_in_ram(schema);
     let mut index_writer = index.writer_for_tests()?;

@@ -1,4 +1,5 @@
 use std::collections::BTreeMap;
+use std::sync::Arc;
 use std::{fmt, io};

 use crate::collector::Collector;

@@ -62,45 +63,20 @@ impl SearcherGeneration {
 ///
 /// It guarantees that the `Segment` will not be removed before
 /// the destruction of the `Searcher`.
+#[derive(Clone)]
 pub struct Searcher {
-    schema: Schema,
-    index: Index,
-    segment_readers: Vec<SegmentReader>,
-    store_readers: Vec<StoreReader>,
-    generation: TrackedObject<SearcherGeneration>,
+    inner: Arc<SearcherInner>,
 }

 impl Searcher {
-    /// Creates a new `Searcher`
-    pub(crate) fn new(
-        schema: Schema,
-        index: Index,
-        segment_readers: Vec<SegmentReader>,
-        generation: TrackedObject<SearcherGeneration>,
-        doc_store_cache_size: usize,
-    ) -> io::Result<Searcher> {
-        let store_readers: Vec<StoreReader> = segment_readers
-            .iter()
-            .map(|segment_reader| segment_reader.get_store_reader(doc_store_cache_size))
-            .collect::<io::Result<Vec<_>>>()?;
-
-        Ok(Searcher {
-            schema,
-            index,
-            segment_readers,
-            store_readers,
-            generation,
-        })
-    }
-
     /// Returns the `Index` associated to the `Searcher`
     pub fn index(&self) -> &Index {
-        &self.index
+        &self.inner.index
     }

     /// [SearcherGeneration] which identifies the version of the snapshot held by this `Searcher`.
     pub fn generation(&self) -> &SearcherGeneration {
-        self.generation.as_ref()
+        self.inner.generation.as_ref()
     }

     /// Fetches a document from tantivy's store given a `DocAddress`.

@@ -108,7 +84,7 @@ impl Searcher {
     /// The searcher uses the segment ordinal to route
     /// the request to the right `Segment`.
     pub fn doc(&self, doc_address: DocAddress) -> crate::Result<Document> {
-        let store_reader = &self.store_readers[doc_address.segment_ord as usize];
+        let store_reader = &self.inner.store_readers[doc_address.segment_ord as usize];
         store_reader.get(doc_address.doc_id)
     }

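Usage note: as described above, a `DocAddress` pairs a segment ordinal with a doc id, and the searcher routes the fetch to the matching store reader. A small sketch (the address values are placeholders):

```rust
fn fetch_first_doc(searcher: &tantivy::Searcher) -> tantivy::Result<()> {
    use tantivy::DocAddress;

    // Placeholder indices: segment ordinal 0, doc id 0.
    let doc_address = DocAddress::new(0, 0);
    let doc = searcher.doc(doc_address)?;
    println!("{}", searcher.schema().to_json(&doc));
    Ok(())
}
```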
@@ -117,6 +93,7 @@ impl Searcher {
     /// Aggregates the sum for each segment store reader.
     pub fn doc_store_cache_stats(&self) -> CacheStats {
         let cache_stats: CacheStats = self
+            .inner
             .store_readers
             .iter()
             .map(|reader| reader.cache_stats())

@@ -127,18 +104,19 @@ impl Searcher {
     /// Fetches a document in an asynchronous manner.
     #[cfg(feature = "quickwit")]
     pub async fn doc_async(&self, doc_address: DocAddress) -> crate::Result<Document> {
-        let store_reader = &self.store_readers[doc_address.segment_ord as usize];
+        let store_reader = &self.inner.store_readers[doc_address.segment_ord as usize];
         store_reader.get_async(doc_address.doc_id).await
     }

     /// Access the schema associated to the index of this searcher.
     pub fn schema(&self) -> &Schema {
-        &self.schema
+        &self.inner.schema
     }

     /// Returns the overall number of documents in the index.
     pub fn num_docs(&self) -> u64 {
-        self.segment_readers
+        self.inner
+            .segment_readers
             .iter()
             .map(|segment_reader| u64::from(segment_reader.num_docs()))
             .sum::<u64>()

@@ -148,7 +126,7 @@ impl Searcher {
     /// the given term.
     pub fn doc_freq(&self, term: &Term) -> crate::Result<u64> {
         let mut total_doc_freq = 0;
-        for segment_reader in &self.segment_readers {
+        for segment_reader in &self.inner.segment_readers {
             let inverted_index = segment_reader.inverted_index(term.field())?;
             let doc_freq = inverted_index.doc_freq(term)?;
             total_doc_freq += u64::from(doc_freq);

@@ -158,12 +136,12 @@ impl Searcher {

     /// Return the list of segment readers
     pub fn segment_readers(&self) -> &[SegmentReader] {
-        &self.segment_readers
+        &self.inner.segment_readers
     }

     /// Returns the segment_reader associated with the given segment_ord
     pub fn segment_reader(&self, segment_ord: u32) -> &SegmentReader {
-        &self.segment_readers[segment_ord as usize]
+        &self.inner.segment_readers[segment_ord as usize]
     }

     /// Runs a query on the segment readers wrapped by the searcher.

@@ -185,7 +163,7 @@ impl Searcher {
         query: &dyn Query,
         collector: &C,
     ) -> crate::Result<C::Fruit> {
-        let executor = self.index.search_executor();
+        let executor = self.inner.index.search_executor();
         self.search_with_executor(query, collector, executor)
     }

@@ -222,17 +200,59 @@ impl Searcher {
     /// Summarize total space usage of this searcher.
     pub fn space_usage(&self) -> io::Result<SearcherSpaceUsage> {
         let mut space_usage = SearcherSpaceUsage::new();
-        for segment_reader in &self.segment_readers {
+        for segment_reader in self.segment_readers() {
             space_usage.add_segment(segment_reader.space_usage()?);
         }
         Ok(space_usage)
     }
 }

+impl From<Arc<SearcherInner>> for Searcher {
+    fn from(inner: Arc<SearcherInner>) -> Self {
+        Searcher { inner }
+    }
+}
+
+/// Holds a list of `SegmentReader`s ready for search.
+///
+/// It guarantees that the `Segment` will not be removed before
+/// the destruction of the `Searcher`.
+pub(crate) struct SearcherInner {
+    schema: Schema,
+    index: Index,
+    segment_readers: Vec<SegmentReader>,
+    store_readers: Vec<StoreReader>,
+    generation: TrackedObject<SearcherGeneration>,
+}
+
+impl SearcherInner {
+    /// Creates a new `Searcher`
+    pub(crate) fn new(
+        schema: Schema,
+        index: Index,
+        segment_readers: Vec<SegmentReader>,
+        generation: TrackedObject<SearcherGeneration>,
+        doc_store_cache_size: usize,
+    ) -> io::Result<SearcherInner> {
+        let store_readers: Vec<StoreReader> = segment_readers
+            .iter()
+            .map(|segment_reader| segment_reader.get_store_reader(doc_store_cache_size))
+            .collect::<io::Result<Vec<_>>>()?;
+
+        Ok(SearcherInner {
+            schema,
+            index,
+            segment_readers,
+            store_readers,
+            generation,
+        })
+    }
+}
+
 impl fmt::Debug for Searcher {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         let segment_ids = self
-            .segment_readers
+            .segment_readers()
             .iter()
             .map(SegmentReader::segment_id)
             .collect::<Vec<_>>();

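The struct change above is the heart of the patch: `Searcher` becomes a thin handle over `Arc<SearcherInner>`, so `#[derive(Clone)]` produces a refcount bump rather than a copy of segment and store readers. A minimal, self-contained sketch of the same pattern (the types here are illustrative, not tantivy's):

```rust
use std::sync::Arc;

struct Inner {
    data: Vec<u32>, // stands in for segment readers, store readers, etc.
}

// The public handle: all shared state sits behind an Arc, so cloning is cheap.
#[derive(Clone)]
struct Handle {
    inner: Arc<Inner>,
}

impl From<Arc<Inner>> for Handle {
    fn from(inner: Arc<Inner>) -> Self {
        Handle { inner }
    }
}

fn main() {
    let handle: Handle = Arc::new(Inner { data: vec![1, 2, 3] }).into();
    let clone = handle.clone(); // bumps the refcount; `data` is not copied
    assert_eq!(clone.inner.data, handle.inner.data);
}
```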
@@ -6,8 +6,6 @@ pub use self::writer::BytesFastFieldWriter;

 #[cfg(test)]
 mod tests {
-    use std::ops::Deref;
-
     use crate::query::TermQuery;
     use crate::schema::{BytesOptions, IndexRecordOption, Schema, Value, FAST, INDEXED, STORED};
     use crate::{DocAddress, DocSet, Index, Searcher, Term};

@@ -37,9 +35,7 @@ mod tests {
         Ok(())
     }

-    fn create_index_for_test<T: Into<BytesOptions>>(
-        byte_options: T,
-    ) -> crate::Result<impl Deref<Target = Searcher>> {
+    fn create_index_for_test<T: Into<BytesOptions>>(byte_options: T) -> crate::Result<Searcher> {
         let mut schema_builder = Schema::builder();
         let field = schema_builder.add_bytes_field("string_bytes", byte_options.into());
         let schema = schema_builder.build();

@@ -86,7 +82,7 @@ mod tests {
         let field = searcher.schema().get_field("string_bytes").unwrap();
         let term = Term::from_field_bytes(field, b"lucene".as_ref());
         let term_query = TermQuery::new(term, IndexRecordOption::Basic);
-        let term_weight = term_query.specialized_weight(&*searcher, true)?;
+        let term_weight = term_query.specialized_weight(&searcher, true)?;
         let term_scorer = term_weight.specialized_scorer(searcher.segment_reader(0), 1.0)?;
         assert_eq!(term_scorer.doc(), 0u32);
         Ok(())

@@ -99,7 +95,7 @@ mod tests {
         let field = searcher.schema().get_field("string_bytes").unwrap();
         let term = Term::from_field_bytes(field, b"lucene".as_ref());
         let term_query = TermQuery::new(term, IndexRecordOption::Basic);
-        let term_weight_err = term_query.specialized_weight(&*searcher, false);
+        let term_weight_err = term_query.specialized_weight(&searcher, false);
         assert!(matches!(
             term_weight_err,
             Err(crate::TantivyError::SchemaError(_))

@@ -112,7 +112,7 @@ mod tests {
             Term::from_field_text(text, "hello"),
             IndexRecordOption::WithFreqs,
         );
-        let weight = query.weight(&*searcher, true)?;
+        let weight = query.weight(&searcher, true)?;
         let mut scorer = weight.scorer(searcher.segment_reader(0), 1.0f32)?;
         assert_eq!(scorer.doc(), 0);
         assert!((scorer.score() - 0.22920431).abs() < 0.001f32);

@@ -141,7 +141,7 @@ mod tests {
             Term::from_field_text(text, "hello"),
             IndexRecordOption::WithFreqs,
         );
-        let weight = query.weight(&*searcher, true)?;
+        let weight = query.weight(&searcher, true)?;
         let mut scorer = weight.scorer(searcher.segment_reader(0), 1.0f32)?;
         assert_eq!(scorer.doc(), 0);
         assert!((scorer.score() - 0.22920431).abs() < 0.001f32);

@@ -307,7 +307,6 @@ pub use crate::indexer::demuxer::*;
 pub use crate::indexer::operation::UserOperation;
 pub use crate::indexer::{merge_filtered_segments, merge_indices, IndexWriter, PreparedCommit};
 pub use crate::postings::Postings;
-pub use crate::reader::LeasedItem;
 pub use crate::schema::{DateOptions, DatePrecision, Document, Term};

 /// Index format version.

@@ -359,7 +359,7 @@ pub mod tests {
         let matching_docs = |query: &str| {
             let query_parser = QueryParser::for_index(&index, vec![json_field]);
             let phrase_query = query_parser.parse_query(query).unwrap();
-            let phrase_weight = phrase_query.weight(&*searcher, false).unwrap();
+            let phrase_weight = phrase_query.weight(&searcher, false).unwrap();
             let mut phrase_scorer = phrase_weight
                 .scorer(searcher.segment_reader(0), 1.0f32)
                 .unwrap();

@@ -141,7 +141,7 @@ mod tests {
         let term_a = Term::from_field_text(text_field, "a");
         let term_query = TermQuery::new(term_a, IndexRecordOption::Basic);
         let reader = index.reader()?;
-        assert_eq!(term_query.count(&*reader.searcher())?, 1);
+        assert_eq!(term_query.count(&reader.searcher())?, 1);
         Ok(())
     }

@@ -1,17 +1,14 @@
-mod pool;
 mod warming;

 use std::convert::TryInto;
-use std::io;
 use std::sync::atomic::AtomicU64;
 use std::sync::{atomic, Arc, Weak};

+use arc_swap::ArcSwap;
 pub use warming::Warmer;

-pub use self::pool::LeasedItem;
-use self::pool::Pool;
 use self::warming::WarmingState;
-use crate::core::searcher::SearcherGeneration;
+use crate::core::searcher::{SearcherGeneration, SearcherInner};
 use crate::directory::{Directory, WatchCallback, WatchHandle, META_LOCK};
 use crate::store::DOCSTORE_CACHE_CAPACITY;
 use crate::{Index, Inventory, Searcher, SegmentReader, TrackedObject};

@@ -37,13 +34,12 @@ pub enum ReloadPolicy {
 /// [IndexReader] builder
 ///
 /// It makes it possible to configure:
-/// - [Searcher] pool size
 /// - [ReloadPolicy] defining when new index versions are detected
 /// - [Warmer] implementations
 /// - number of warming threads, for parallelizing warming work
 /// - The cache size of the underlying doc store readers.
 #[derive(Clone)]
 pub struct IndexReaderBuilder {
-    num_searchers: usize,
     reload_policy: ReloadPolicy,
     index: Index,
     warmers: Vec<Weak<dyn Warmer>>,

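With the pool-size knob gone, configuring a reader reduces to the remaining options. A sketch using only methods that appear elsewhere in this diff (`reload_policy`, `num_warming_threads`, `try_into`):

```rust
fn open_reader(index: &tantivy::Index) -> tantivy::Result<tantivy::IndexReader> {
    use tantivy::ReloadPolicy;

    let reader = index
        .reader_builder()
        .reload_policy(ReloadPolicy::Manual)
        .num_warming_threads(2) // warming work is still parallelized
        .try_into()?;
    Ok(reader)
}
```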
@@ -55,7 +51,6 @@ impl IndexReaderBuilder {
     #[must_use]
     pub(crate) fn new(index: Index) -> IndexReaderBuilder {
         IndexReaderBuilder {
-            num_searchers: num_cpus::get(),
             reload_policy: ReloadPolicy::OnCommit,
             index,
             warmers: Vec::new(),

@@ -76,16 +71,12 @@ impl IndexReaderBuilder {
             self.warmers,
             searcher_generation_inventory.clone(),
         )?;
-        let inner_reader = InnerIndexReader {
-            index: self.index,
-            num_searchers: self.num_searchers,
-            doc_store_cache_size: self.doc_store_cache_size,
-            searcher_pool: Pool::new(),
+        let inner_reader = InnerIndexReader::new(
+            self.doc_store_cache_size,
+            self.index,
             warming_state,
-            searcher_generation_counter: Default::default(),
-            searcher_generation_inventory,
-        };
-        inner_reader.reload()?;
+            searcher_generation_inventory,
+        )?;
         let inner_reader_arc = Arc::new(inner_reader);
         let watch_handle_opt: Option<WatchHandle> = match self.reload_policy {
             ReloadPolicy::Manual => {

@@ -133,15 +124,6 @@ impl IndexReaderBuilder {
         self
     }

-    /// Sets the number of [Searcher] to pool.
-    ///
-    /// See [IndexReader::searcher()].
-    #[must_use]
-    pub fn num_searchers(mut self, num_searchers: usize) -> IndexReaderBuilder {
-        self.num_searchers = num_searchers;
-        self
-    }
-
     /// Set the [Warmer]s that are invoked when reloading searchable segments.
     #[must_use]
     pub fn warmers(mut self, warmers: Vec<Weak<dyn Warmer>>) -> IndexReaderBuilder {

@@ -169,24 +151,52 @@ impl TryInto<IndexReader> for IndexReaderBuilder {
 }

 struct InnerIndexReader {
-    num_searchers: usize,
     doc_store_cache_size: usize,
     index: Index,
     warming_state: WarmingState,
-    searcher_pool: Pool<Searcher>,
+    searcher: arc_swap::ArcSwap<SearcherInner>,
     searcher_generation_counter: Arc<AtomicU64>,
     searcher_generation_inventory: Inventory<SearcherGeneration>,
 }

 impl InnerIndexReader {
+    fn new(
+        doc_store_cache_size: usize,
+        index: Index,
+        warming_state: WarmingState,
+        searcher_generation_inventory: Inventory<SearcherGeneration>,
+    ) -> crate::Result<Self> {
+        let searcher_generation_counter: Arc<AtomicU64> = Default::default();
+        let segment_readers = Self::open_segment_readers(&index)?;
+        let searcher_generation = Self::create_new_searcher_generation(
+            &segment_readers,
+            &searcher_generation_counter,
+            &searcher_generation_inventory,
+        );
+
+        let searcher = Self::create_searcher(
+            &index,
+            doc_store_cache_size,
+            &warming_state,
+            searcher_generation,
+        )?;
+        Ok(InnerIndexReader {
+            doc_store_cache_size,
+            index,
+            warming_state,
+            searcher: ArcSwap::from(searcher),
+            searcher_generation_counter,
+            searcher_generation_inventory,
+        })
+    }
     /// Opens the freshest segments `SegmentReader`.
     ///
     /// This function acquires a lock to prevent GC from removing files
     /// as we are opening our index.
-    fn open_segment_readers(&self) -> crate::Result<Vec<SegmentReader>> {
+    fn open_segment_readers(index: &Index) -> crate::Result<Vec<SegmentReader>> {
         // Prevents segment files from getting deleted while we are in the process of opening them
-        let _meta_lock = self.index.directory().acquire_lock(&META_LOCK)?;
-        let searchable_segments = self.index.searchable_segments()?;
+        let _meta_lock = index.directory().acquire_lock(&META_LOCK)?;
+        let searchable_segments = index.searchable_segments()?;
         let segment_readers = searchable_segments
             .iter()
             .map(SegmentReader::open)

@@ -195,41 +205,57 @@ impl InnerIndexReader {
     }

     fn create_new_searcher_generation(
-        &self,
         segment_readers: &[SegmentReader],
+        searcher_generation_counter: &Arc<AtomicU64>,
+        searcher_generation_inventory: &Inventory<SearcherGeneration>,
     ) -> TrackedObject<SearcherGeneration> {
-        let generation_id = self
-            .searcher_generation_counter
-            .fetch_add(1, atomic::Ordering::Relaxed);
+        let generation_id = searcher_generation_counter.fetch_add(1, atomic::Ordering::Relaxed);
         let searcher_generation =
             SearcherGeneration::from_segment_readers(segment_readers, generation_id);
-        self.searcher_generation_inventory
-            .track(searcher_generation)
+        searcher_generation_inventory.track(searcher_generation)
     }

+    fn create_searcher(
+        index: &Index,
+        doc_store_cache_size: usize,
+        warming_state: &WarmingState,
+        searcher_generation: TrackedObject<SearcherGeneration>,
+    ) -> crate::Result<Arc<SearcherInner>> {
+        let segment_readers = Self::open_segment_readers(index)?;
+        let schema = index.schema();
+        let searcher = Arc::new(SearcherInner::new(
+            schema,
+            index.clone(),
+            segment_readers,
+            searcher_generation,
+            doc_store_cache_size,
+        )?);
+
+        warming_state.warm_new_searcher_generation(&searcher.clone().into())?;
+        Ok(searcher)
+    }
+
     fn reload(&self) -> crate::Result<()> {
-        let segment_readers = self.open_segment_readers()?;
-        let searcher_generation = self.create_new_searcher_generation(&segment_readers);
-        let schema = self.index.schema();
-        let searchers: Vec<Searcher> = std::iter::repeat_with(|| {
-            Searcher::new(
-                schema.clone(),
-                self.index.clone(),
-                segment_readers.clone(),
-                searcher_generation.clone(),
-                self.doc_store_cache_size,
-            )
-        })
-        .take(self.num_searchers)
-        .collect::<io::Result<_>>()?;
-        self.warming_state
-            .warm_new_searcher_generation(&searchers[0])?;
-        self.searcher_pool.publish_new_generation(searchers);
+        let segment_readers = Self::open_segment_readers(&self.index)?;
+        let searcher_generation = Self::create_new_searcher_generation(
+            &segment_readers,
+            &self.searcher_generation_counter,
+            &self.searcher_generation_inventory,
+        );
+        let searcher = Self::create_searcher(
+            &self.index,
+            self.doc_store_cache_size,
+            &self.warming_state,
+            searcher_generation,
+        )?;
+
+        self.searcher.store(searcher);
+
         Ok(())
     }

-    fn searcher(&self) -> LeasedItem<Searcher> {
-        self.searcher_pool.acquire()
+    fn searcher(&self) -> Searcher {
+        self.searcher.load().clone().into()
     }
 }

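`reload` and `searcher` above replace the pool with `arc-swap`: a reload publishes a new `Arc<SearcherInner>` atomically, while readers keep whatever snapshot they already hold. A standalone sketch of those semantics using the `arc-swap` crate added in this diff:

```rust
use std::sync::Arc;

use arc_swap::ArcSwap;

fn main() {
    let current = ArcSwap::from(Arc::new("generation-1"));

    // A reader takes a cheap snapshot (equivalent to `.load().clone()` above).
    let snapshot: Arc<&str> = current.load_full();

    // A reload publishes a new generation without blocking readers.
    current.store(Arc::new("generation-2"));

    // The old snapshot stays valid; new loads observe the new generation.
    assert_eq!(*snapshot, "generation-1");
    assert_eq!(**current.load(), "generation-2");
}
```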
@@ -275,7 +301,7 @@ impl IndexReader {
     ///
     /// The same searcher must be used for a given query, as it ensures
     /// the use of a consistent segment set.
-    pub fn searcher(&self) -> LeasedItem<Searcher> {
+    pub fn searcher(&self) -> Searcher {
        self.inner.searcher()
     }
 }

The entire pool module is deleted (every line below is removed):

@@ -1,295 +0,0 @@
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use crossbeam_channel::{unbounded, Receiver, RecvError, Sender};

pub struct GenerationItem<T> {
    generation: usize,
    item: T,
}

/// Queue implementation for the object pool below.
/// Uses the unbounded linked-list queue from crossbeam-channel.
/// Splits the queue into a sender and a receiver.
struct Queue<T> {
    sender: Sender<T>,
    receiver: Receiver<T>,
}

impl<T> Queue<T> {
    fn new() -> Self {
        let (s, r) = unbounded();
        Queue {
            sender: s,
            receiver: r,
        }
    }

    /// The `Sender` returns a `Result`, which is ignored;
    /// it is not handled at the moment.
    fn push(&self, elem: T) {
        self.sender
            .send(elem)
            .expect("Sending an item to crossbeam-queue shouldn't fail");
    }

    /// Relies on the underlying crossbeam-channel `Receiver`
    /// to block on an empty queue.
    fn pop(&self) -> Result<T, RecvError> {
        self.receiver.recv()
    }
}

/// An object pool.
///
/// This is used in tantivy to create a pool of `Searcher`s.
/// Objects are wrapped in a `LeasedItem` wrapper and are
/// released automatically back into the pool on `Drop`.
pub struct Pool<T> {
    queue: Arc<Queue<GenerationItem<T>>>,
    freshest_generation: AtomicUsize,
    next_generation: AtomicUsize,
}

impl<T> Pool<T> {
    pub fn new() -> Pool<T> {
        let queue = Arc::new(Queue::new());
        Pool {
            queue,
            freshest_generation: AtomicUsize::default(),
            next_generation: AtomicUsize::default(),
        }
    }

    /// Publishes a new generation of `Searcher`.
    ///
    /// After publishing, all newly acquired `Searcher`s will be
    /// of the new generation.
    pub fn publish_new_generation(&self, items: Vec<T>) {
        assert!(!items.is_empty());
        let next_generation = self.next_generation.fetch_add(1, Ordering::SeqCst) + 1;
        let num_items = items.len();
        for item in items {
            let gen_item = GenerationItem {
                item,
                generation: next_generation,
            };
            self.queue.push(gen_item);
        }
        self.advertise_generation(next_generation);
        // Purge possible previous searchers.
        //
        // Assuming at this point no searcher is held more than duration T by the user,
        // this guarantees that an obsolete searcher will not be uselessly held (and its
        // associated mmap) for more than duration T.
        //
        // Proof: at this point, obsolete searchers that are held by the user will be held
        // for less than T. When released, they will be dropped, as their generation is
        // detected as obsolete.
        //
        // We still need to ensure that the searchers that are obsolete and in the pool get
        // removed. The queue currently contains up to 2n searchers, in any random order.
        //
        // Half of them are obsolete. By requesting `n + 1` fresh searchers, we ensure that
        // every searcher in the queue gets inspected.
        for _ in 0..=num_items {
            let _ = self.acquire();
        }
    }

    /// At the exit of this method:
    /// - `freshest_generation` has a value greater than or equal to `generation`;
    /// - `freshest_generation` has the last value that has been advertised.
    fn advertise_generation(&self, generation: usize) {
        // Not optimal at all, but the easiest-to-read proof.
        let mut former_generation = self.freshest_generation.load(Ordering::Acquire);
        loop {
            match self.freshest_generation.compare_exchange(
                former_generation,
                generation,
                Ordering::SeqCst,
                Ordering::SeqCst,
            ) {
                Ok(_) => {
                    // We successfully updated the value.
                    return;
                }
                Err(current_generation) => {
                    // The value was apparently updated after we did our load.
                    // In theory, it is always a value greater than ours, but just to
                    // simplify the logic, we keep looping until we reach a
                    // value >= our target value.
                    if current_generation >= generation {
                        return;
                    }
                    former_generation = current_generation;
                }
            }
        }
    }

    fn generation(&self) -> usize {
        self.freshest_generation.load(Ordering::Acquire)
    }

    /// Acquires a new searcher.
    ///
    /// If no searcher is available, this method blocks until
    /// a searcher is released.
    pub fn acquire(&self) -> LeasedItem<T> {
        let generation = self.generation();
        loop {
            let gen_item = self.queue.pop().unwrap();
            if gen_item.generation >= generation {
                return LeasedItem {
                    gen_item: Some(gen_item),
                    recycle_queue: Arc::clone(&self.queue),
                };
            } else {
                // This searcher is obsolete;
                // remove it from the pool.
            }
        }
    }
}

/// A `LeasedItem` holds an object borrowed from a `Pool`.
///
/// Upon drop, the object is automatically returned
/// to the pool.
pub struct LeasedItem<T> {
    gen_item: Option<GenerationItem<T>>,
    recycle_queue: Arc<Queue<GenerationItem<T>>>,
}

impl<T> Deref for LeasedItem<T> {
    type Target = T;

    fn deref(&self) -> &T {
        &self
            .gen_item
            .as_ref()
            .expect("Unwrapping a leased item should never fail")
            .item // unwrap is safe here
    }
}

impl<T> DerefMut for LeasedItem<T> {
    fn deref_mut(&mut self) -> &mut T {
        &mut self
            .gen_item
            .as_mut()
            .expect("Unwrapping a mut leased item should never fail")
            .item // unwrap is safe here
    }
}

impl<T> Drop for LeasedItem<T> {
    fn drop(&mut self) {
        if let Some(gen_item) = self.gen_item.take() {
            self.recycle_queue.push(gen_item);
        }
    }
}

#[cfg(test)]
mod tests {

    use std::{iter, mem};

    use crossbeam_channel as channel;

    use super::{Pool, Queue};

    #[test]
    fn test_pool() {
        let items10: Vec<usize> = iter::repeat(10).take(10).collect();
        let pool = Pool::new();
        pool.publish_new_generation(items10);
        for _ in 0..20 {
            assert_eq!(*pool.acquire(), 10);
        }
        let items11: Vec<usize> = iter::repeat(11).take(10).collect();
        pool.publish_new_generation(items11);
        for _ in 0..20 {
            assert_eq!(*pool.acquire(), 11);
        }
    }

    #[test]
    fn test_queue() {
        let q = Queue::new();
        let elem = 5;
        q.push(elem);
        let res = q.pop();
        assert_eq!(res.unwrap(), elem);
    }

    #[test]
    fn test_pool_dont_panic_on_empty_pop() {
        // When the object pool is exhausted, it shouldn't panic on pop().
        use std::sync::Arc;
        use std::thread;

        // Wrap the pool in an Arc, the same way it is used in `core/index.rs`.
        let pool1 = Arc::new(Pool::new());
        // Clone the pools outside the move scope of each new thread.
        let pool2 = Arc::clone(&pool1);
        let pool3 = Arc::clone(&pool1);

        let elements_for_pool = vec![1, 2];
        pool1.publish_new_generation(elements_for_pool);

        let mut threads = vec![];
        // Spawn one more thread than there are elements in the pool.

        let (start_1_send, start_1_recv) = channel::bounded(0);
        let (start_2_send, start_2_recv) = channel::bounded(0);
        let (start_3_send, start_3_recv) = channel::bounded(0);

        let (event_send1, event_recv) = channel::unbounded();
        let event_send2 = event_send1.clone();
        let event_send3 = event_send1.clone();

        threads.push(thread::spawn(move || {
            assert_eq!(start_1_recv.recv(), Ok("start"));
            let _leased_searcher = pool1.acquire();
            assert!(event_send1.send("1 acquired").is_ok());
            assert_eq!(start_1_recv.recv(), Ok("stop"));
            assert!(event_send1.send("1 stopped").is_ok());
            mem::drop(_leased_searcher);
        }));

        threads.push(thread::spawn(move || {
            assert_eq!(start_2_recv.recv(), Ok("start"));
            let _leased_searcher = pool2.acquire();
            assert!(event_send2.send("2 acquired").is_ok());
            assert_eq!(start_2_recv.recv(), Ok("stop"));
            mem::drop(_leased_searcher);
            assert!(event_send2.send("2 stopped").is_ok());
        }));

        threads.push(thread::spawn(move || {
            assert_eq!(start_3_recv.recv(), Ok("start"));
            let _leased_searcher = pool3.acquire();
            assert!(event_send3.send("3 acquired").is_ok());
            assert_eq!(start_3_recv.recv(), Ok("stop"));
            mem::drop(_leased_searcher);
            assert!(event_send3.send("3 stopped").is_ok());
        }));

        assert!(start_1_send.send("start").is_ok());
        assert_eq!(event_recv.recv(), Ok("1 acquired"));
        assert!(start_2_send.send("start").is_ok());
        assert_eq!(event_recv.recv(), Ok("2 acquired"));
        assert!(start_3_send.send("start").is_ok());
        assert!(event_recv.try_recv().is_err());
        assert!(start_1_send.send("stop").is_ok());
        assert_eq!(event_recv.recv(), Ok("1 stopped"));
        assert_eq!(event_recv.recv(), Ok("3 acquired"));
        assert!(start_3_send.send("stop").is_ok());
        assert_eq!(event_recv.recv(), Ok("3 stopped"));
        assert!(start_2_send.send("stop").is_ok());
        assert_eq!(event_recv.recv(), Ok("2 stopped"));
    }
}

@@ -273,7 +273,6 @@ mod tests {
             .reader_builder()
             .reload_policy(ReloadPolicy::Manual)
             .num_warming_threads(num_warming_threads)
-            .num_searchers(num_searchers)
             .warmers(vec![
                 Arc::downgrade(&warmer1) as Weak<dyn Warmer>,
                 Arc::downgrade(&warmer2) as Weak<dyn Warmer>,