Searcher Warming API (#1261)

Adds an API to register Warmers in the IndexReader.

Co-authored-by: Paul Masurel <paul@quickwit.io>
This commit is contained in:
Shikhar Bhushan
2022-01-20 09:40:25 -05:00
committed by GitHub
parent 732f6847c0
commit 99d4b1a177
11 changed files with 733 additions and 37 deletions

View File

@@ -1,11 +1,12 @@
Tantivy 0.17
================================
- LogMergePolicy now triggers merges if the ratio of deleted documents reaches a threshold (@shikhar @fulmicoton) [#115](https://github.com/quickwit-inc/tantivy/issues/115)
- Adds a searcher Warmer API (@shikhar @fulmicoton)
- Change to non-strict schema. Ignore fields in data which are not defined in schema. Previously this returned an error. #1211
- Facets are necessarily indexed. Existing index with indexed facets should work out of the box. Index without facets that are marked with index: false should be broken (but they were already broken in a sense). (@fulmicoton) #1195 .
- Bugfix that could in theory impact durability on some filesystems [#1224](https://github.com/quickwit-inc/tantivy/issues/1224)
- Reduce the number of fsync calls [#1225](https://github.com/quickwit-inc/tantivy/issues/1225)
- Schema now offers not indexing fieldnorms (@lpouget) [#922](https://github.com/quickwit-inc/tantivy/issues/922)
- LogMergePolicy now triggers merges if the ratio of deleted documents reaches a threshold (@shikhar) [#115](https://github.com/quickwit-inc/tantivy/issues/115)
- Reduce the number of fsync calls [#1225](https://github.com/quickwit-inc/tantivy/issues/1225)
Tantivy 0.16.2
================================

223
examples/warmer.rs Normal file
View File

@@ -0,0 +1,223 @@
use std::cmp::Reverse;
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, RwLock, Weak};
use tantivy::collector::TopDocs;
use tantivy::fastfield::FastFieldReader;
use tantivy::query::QueryParser;
use tantivy::schema::{Field, Schema, FAST, TEXT};
use tantivy::{doc, DocAddress, DocId, Index, IndexReader, SegmentReader, TrackedObject};
use tantivy::{Opstamp, Searcher, SearcherGeneration, SegmentId, Warmer};
// This example shows how warmers can be used to
// load values from an external source using the Warmer API.
//
// In this example, we assume an e-commerce search engine.
type ProductId = u64;
/// Price
type Price = u32;
/// Source of up-to-date prices for a batch of products.
///
/// `Send + Sync + 'static` is required so an implementation can be shared
/// with the reader's warming threads.
pub trait PriceFetcher: Send + Sync + 'static {
    /// Returns one price per entry of `product_ids`, in the same order.
    fn fetch_prices(&self, product_ids: &[ProductId]) -> Vec<Price>;
}
/// Warmer maintaining, per segment, a column of prices fetched from an
/// external `PriceFetcher`.
struct DynamicPriceColumn {
    // Fast field holding the product id of each document.
    field: Field,
    // Cache keyed by (segment id, delete opstamp): a segment that gains new
    // deletes gets a distinct key and therefore a freshly warmed column.
    price_cache: RwLock<HashMap<(SegmentId, Option<Opstamp>), Arc<Vec<Price>>>>,
    price_fetcher: Box<dyn PriceFetcher>,
}
impl DynamicPriceColumn {
pub fn with_product_id_field<T: PriceFetcher>(field: Field, price_fetcher: T) -> Self {
DynamicPriceColumn {
field,
price_cache: Default::default(),
price_fetcher: Box::new(price_fetcher),
}
}
pub fn price_for_segment(&self, segment_reader: &SegmentReader) -> Option<Arc<Vec<Price>>> {
let segment_key = (segment_reader.segment_id(), segment_reader.delete_opstamp());
self.price_cache.read().unwrap().get(&segment_key).cloned()
}
}
impl Warmer for DynamicPriceColumn {
    /// Builds (or rebuilds) the price column of every segment in `searcher`.
    ///
    /// Prices are fetched in bulk from the external source for all alive
    /// documents; deleted documents keep a placeholder price of 0 so the
    /// column stays addressable by `DocId`.
    fn warm(&self, searcher: &Searcher) -> tantivy::Result<()> {
        for segment in searcher.segment_readers() {
            // Cache key: a segment with new deletes gets a fresh column.
            let key = (segment.segment_id(), segment.delete_opstamp());
            let product_id_reader = segment.fast_fields().u64(self.field)?;
            let product_ids: Vec<ProductId> = segment
                .doc_ids_alive()
                .map(|doc| product_id_reader.get(doc))
                .collect();
            let mut prices_it = self.price_fetcher.fetch_prices(&product_ids).into_iter();
            // Preallocate: exactly one slot per doc (alive or deleted).
            let mut price_vals: Vec<Price> = Vec::with_capacity(segment.max_doc() as usize);
            for doc in 0..segment.max_doc() {
                if segment.is_deleted(doc) {
                    price_vals.push(0);
                } else {
                    price_vals.push(prices_it.next().unwrap())
                }
            }
            self.price_cache
                .write()
                .unwrap()
                .insert(key, Arc::new(price_vals));
        }
        Ok(())
    }

    /// Drops cached price columns for segments that no longer belong to any
    /// live searcher generation.
    fn garbage_collect(&self, live_generations: &[TrackedObject<SearcherGeneration>]) {
        let live_segment_id_and_delete_ops: HashSet<(SegmentId, Option<Opstamp>)> =
            live_generations
                .iter()
                .flat_map(|gen| gen.segments())
                .map(|(&segment_id, &opstamp)| (segment_id, opstamp))
                .collect();
        let mut price_cache_wrt = self.price_cache.write().unwrap();
        // Drain would be nicer here.
        // BUG FIX: retain only the *live* entries. The previous predicate was
        // negated, which kept dead entries and evicted live ones.
        *price_cache_wrt = std::mem::take(&mut *price_cache_wrt)
            .into_iter()
            .filter(|(seg_id_and_op, _)| live_segment_id_and_delete_ops.contains(seg_id_and_op))
            .collect();
    }
}
/// For the sake of this example, the table is just an editable HashMap behind a RwLock.
/// This map represents a map (ProductId -> Price)
///
/// In practice, it could be fetching things from an external service, like a SQL table.
///
#[derive(Default, Clone)]
pub struct ExternalPriceTable {
    // Wrapped in `Arc` so clones of the table share (and observe) the same prices.
    prices: Arc<RwLock<HashMap<ProductId, Price>>>,
}
impl ExternalPriceTable {
    /// Sets (or overwrites) the price of a product.
    pub fn update_price(&self, product_id: ProductId, price: Price) {
        self.prices
            .write()
            .unwrap()
            .insert(product_id, price);
    }
}
impl PriceFetcher for ExternalPriceTable {
    /// Looks up each product id in the table; unknown products price at 0.
    fn fetch_prices(&self, product_ids: &[ProductId]) -> Vec<Price> {
        let prices = self.prices.read().unwrap();
        let mut fetched = Vec::with_capacity(product_ids.len());
        for product_id in product_ids {
            fetched.push(prices.get(product_id).copied().unwrap_or(0));
        }
        fetched
    }
}
fn main() -> tantivy::Result<()> {
    // Declaring our schema.
    let mut schema_builder = Schema::builder();
    // The product id is assumed to be a primary id for our external price source.
    let product_id = schema_builder.add_u64_field("product_id", FAST);
    let text = schema_builder.add_text_field("text", TEXT);
    let schema: Schema = schema_builder.build();

    let price_table = ExternalPriceTable::default();
    let price_dynamic_column = Arc::new(DynamicPriceColumn::with_product_id_field(
        product_id,
        price_table.clone(),
    ));
    price_table.update_price(OLIVE_OIL, 12);
    price_table.update_price(GLOVES, 13);
    price_table.update_price(SNEAKERS, 80);
    // `const` items are hoisted in Rust, so using them above is valid.
    const OLIVE_OIL: ProductId = 323423;
    const GLOVES: ProductId = 3966623;
    const SNEAKERS: ProductId = 23222;

    let index = Index::create_in_ram(schema);
    let mut writer = index.writer_with_num_threads(1, 10_000_000)?;
    writer.add_document(doc!(product_id=>OLIVE_OIL, text=>"cooking olive oil from greece"))?;
    writer.add_document(doc!(product_id=>GLOVES, text=>"kitchen gloves, perfect for cooking"))?;
    writer.add_document(doc!(product_id=>SNEAKERS, text=>"uber sweet sneakers"))?;
    writer.commit()?;

    // The reader keeps only weak references to warmers: dropping every strong
    // reference deactivates a warmer.
    let warmers: Vec<Weak<dyn Warmer>> = vec![Arc::downgrade(
        &(price_dynamic_column.clone() as Arc<dyn Warmer>),
    )];
    let reader: IndexReader = index
        .reader_builder()
        .warmers(warmers)
        .num_searchers(1)
        .try_into()?;
    reader.reload()?;

    let query_parser = QueryParser::for_index(&index, vec![text]);
    let query = query_parser.parse_query("cooking")?;
    let searcher = reader.searcher();

    // Score each matching document by its warmed price.
    // NOTE(review): `Reverse(price)` makes `TopDocs` rank the *cheapest*
    // documents first (the assertions below expect 12 before 13), which
    // contradicts the variable name `most_expensive_first` — confirm intent.
    let score_by_price = move |segment_reader: &SegmentReader| {
        let price = price_dynamic_column
            .price_for_segment(segment_reader)
            .unwrap();
        move |doc_id: DocId| Reverse(price[doc_id as usize])
    };
    let most_expensive_first = TopDocs::with_limit(10).custom_score(score_by_price);
    let hits = searcher.search(&query, &most_expensive_first)?;
    assert_eq!(
        &hits,
        &[
            (
                Reverse(12u32),
                DocAddress {
                    segment_ord: 0,
                    doc_id: 0u32
                }
            ),
            (
                Reverse(13u32),
                DocAddress {
                    segment_ord: 0,
                    doc_id: 1u32
                }
            ),
        ]
    );

    // Olive oil just got more expensive!
    price_table.update_price(OLIVE_OIL, 15);
    // The price updates are directly reflected on `reload`.
    //
    // Be careful here though!...
    // You may have spotted that we are still using the same `Searcher`.
    //
    // It is up to the `Warmer` implementer to decide how
    // to control this behavior.
    reader.reload()?;
    let hits_with_new_prices = searcher.search(&query, &most_expensive_first)?;
    assert_eq!(
        &hits_with_new_prices,
        &[
            (
                Reverse(13u32),
                DocAddress {
                    segment_ord: 0,
                    doc_id: 1u32
                }
            ),
            (
                Reverse(15u32),
                DocAddress {
                    segment_ord: 0,
                    doc_id: 0u32
                }
            ),
        ]
    );
    Ok(())
}

View File

@@ -217,7 +217,7 @@ impl Index {
/// Replace the default single thread search executor pool
/// by a thread pool with a given number of threads.
pub fn set_multithread_executor(&mut self, num_threads: usize) -> crate::Result<()> {
self.executor = Arc::new(Executor::multi_thread(num_threads, "thrd-tantivy-search-")?);
self.executor = Arc::new(Executor::multi_thread(num_threads, "tantivy-search-")?);
Ok(())
}

View File

@@ -2,7 +2,7 @@ use super::SegmentComponent;
use crate::schema::Schema;
use crate::Opstamp;
use crate::{core::SegmentId, store::Compressor};
use census::{Inventory, TrackedObject};
use crate::{Inventory, TrackedObject};
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use std::{collections::HashSet, sync::atomic::AtomicBool};

View File

@@ -14,7 +14,7 @@ pub use self::index_meta::{
IndexMeta, IndexSettings, IndexSortByField, Order, SegmentMeta, SegmentMetaInventory,
};
pub use self::inverted_index_reader::InvertedIndexReader;
pub use self::searcher::Searcher;
pub use self::searcher::{Searcher, SearcherGeneration};
pub use self::segment::Segment;
pub use self::segment_component::SegmentComponent;
pub use self::segment_id::SegmentId;

View File

@@ -1,6 +1,5 @@
use crate::collector::Collector;
use crate::core::Executor;
use crate::core::SegmentReader;
use crate::query::Query;
use crate::schema::Document;
@@ -10,9 +9,62 @@ use crate::space_usage::SearcherSpaceUsage;
use crate::store::StoreReader;
use crate::DocAddress;
use crate::Index;
use crate::Opstamp;
use crate::SegmentId;
use crate::TrackedObject;
use std::collections::BTreeMap;
use std::{fmt, io};
/// Identifies the searcher generation accessed by a [Searcher].
///
/// While this might seem redundant, a [SearcherGeneration] contains
/// both a `generation_id` AND a list of `(SegmentId, DeleteOpstamp)`.
///
/// This is on purpose. This object is used by the `Warmer` API.
/// Having both information makes it possible to identify which
/// artifact should be refreshed or garbage collected.
///
/// Depending on the use case, `Warmer`'s implementers can decide to
/// produce artifacts per:
/// - `generation_id` (e.g. some searcher level aggregates)
/// - `(segment_id, delete_opstamp)` (e.g. segment level aggregates)
/// - `segment_id` (e.g. for immutable document level information)
/// - `(generation_id, segment_id)` (e.g. for consistent dynamic column)
/// - ...
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct SearcherGeneration {
    // Mapping (SegmentId -> delete Opstamp) of the segments in this snapshot.
    // Presumably `None` when the segment carries no deletes — TODO confirm.
    segments: BTreeMap<SegmentId, Option<Opstamp>>,
    // Id assigned from the IndexReader's atomic generation counter.
    generation_id: u64,
}
impl SearcherGeneration {
    /// Builds a generation snapshot from the given segment readers.
    pub(crate) fn from_segment_readers(
        segment_readers: &[SegmentReader],
        generation_id: u64,
    ) -> Self {
        let segments: BTreeMap<SegmentId, Option<Opstamp>> = segment_readers
            .iter()
            .map(|segment_reader| (segment_reader.segment_id(), segment_reader.delete_opstamp()))
            .collect();
        Self {
            segments,
            generation_id,
        }
    }

    /// Returns the searcher generation id.
    pub fn generation_id(&self) -> u64 {
        self.generation_id
    }

    /// Return a `(SegmentId -> DeleteOpstamp)` mapping.
    pub fn segments(&self) -> &BTreeMap<SegmentId, Option<Opstamp>> {
        &self.segments
    }
}
/// Holds a list of `SegmentReader`s ready for search.
///
/// It guarantees that the `Segment` will not be removed before
@@ -23,6 +75,7 @@ pub struct Searcher {
index: Index,
segment_readers: Vec<SegmentReader>,
store_readers: Vec<StoreReader>,
generation: TrackedObject<SearcherGeneration>,
}
impl Searcher {
@@ -31,6 +84,7 @@ impl Searcher {
schema: Schema,
index: Index,
segment_readers: Vec<SegmentReader>,
generation: TrackedObject<SearcherGeneration>,
) -> io::Result<Searcher> {
let store_readers: Vec<StoreReader> = segment_readers
.iter()
@@ -41,6 +95,7 @@ impl Searcher {
index,
segment_readers,
store_readers,
generation,
})
}
@@ -49,6 +104,11 @@ impl Searcher {
&self.index
}
/// [SearcherGeneration] which identifies the version of the snapshot held by this `Searcher`.
pub fn generation(&self) -> &SearcherGeneration {
self.generation.as_ref()
}
/// Fetches a document from tantivy's store given a `DocAddress`.
///
/// The searcher uses the segment ordinal to route the

View File

@@ -17,6 +17,7 @@ use crate::space_usage::SegmentSpaceUsage;
use crate::store::StoreReader;
use crate::termdict::TermDictionary;
use crate::DocId;
use crate::Opstamp;
use fail::fail_point;
use std::fmt;
use std::sync::Arc;
@@ -38,6 +39,8 @@ pub struct SegmentReader {
inv_idx_reader_cache: Arc<RwLock<HashMap<Field, Arc<InvertedIndexReader>>>>,
segment_id: SegmentId,
delete_opstamp: Option<Opstamp>,
max_doc: DocId,
num_docs: DocId,
@@ -205,6 +208,7 @@ impl SegmentReader {
fast_fields_readers: fast_field_readers,
fieldnorm_readers,
segment_id: segment.id(),
delete_opstamp: segment.meta().delete_opstamp(),
store_file,
alive_bitset_opt,
positions_composite,
@@ -290,6 +294,11 @@ impl SegmentReader {
self.segment_id
}
/// Returns the delete opstamp recorded in the segment's meta when this
/// reader was opened (presumably `None` when no delete operations have been
/// applied to the segment — confirm against `SegmentMeta::delete_opstamp`).
pub fn delete_opstamp(&self) -> Option<Opstamp> {
    self.delete_opstamp
}
/// Returns the bitset representing
/// the documents that have been deleted.
pub fn alive_bitset(&self) -> Option<&AliveBitSet> {

View File

@@ -1,6 +1,6 @@
use crate::Opstamp;
use crate::SegmentId;
use census::{Inventory, TrackedObject};
use crate::{Inventory, TrackedObject};
use std::collections::HashSet;
use std::ops::Deref;

View File

@@ -158,7 +158,7 @@ pub mod termdict;
mod reader;
pub use self::reader::{IndexReader, IndexReaderBuilder, ReloadPolicy};
pub use self::reader::{IndexReader, IndexReaderBuilder, ReloadPolicy, Warmer};
mod snippet;
pub use self::snippet::{Snippet, SnippetGenerator};
@@ -166,8 +166,8 @@ mod docset;
pub use self::docset::{DocSet, TERMINATED};
pub use crate::core::{Executor, SegmentComponent};
pub use crate::core::{
Index, IndexBuilder, IndexMeta, IndexSettings, IndexSortByField, Order, Searcher, Segment,
SegmentId, SegmentMeta,
Index, IndexBuilder, IndexMeta, IndexSettings, IndexSortByField, Order, Searcher,
SearcherGeneration, Segment, SegmentId, SegmentMeta,
};
pub use crate::core::{InvertedIndexReader, SegmentReader};
pub use crate::directory::Directory;
@@ -179,6 +179,7 @@ pub use crate::indexer::{IndexWriter, PreparedCommit};
pub use crate::postings::Postings;
pub use crate::reader::LeasedItem;
pub use crate::schema::{Document, Term};
pub use census::{Inventory, TrackedObject};
pub use common::HasLen;
pub use common::{f64_to_u64, i64_to_u64, u64_to_f64, u64_to_i64};
use std::fmt;

View File

@@ -1,16 +1,23 @@
mod pool;
mod warming;
pub use self::pool::LeasedItem;
use self::pool::Pool;
use crate::core::Segment;
use self::warming::WarmingState;
use crate::core::searcher::SearcherGeneration;
use crate::directory::WatchHandle;
use crate::directory::META_LOCK;
use crate::directory::{Directory, WatchCallback};
use crate::Index;
use crate::Searcher;
use crate::SegmentReader;
use crate::{Inventory, TrackedObject};
use std::sync::atomic;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::sync::Weak;
use std::{convert::TryInto, io};
pub use warming::Warmer;
/// Defines when a new version of the index should be reloaded.
///
@@ -29,22 +36,20 @@ pub enum ReloadPolicy {
OnCommit, // TODO add NEAR_REAL_TIME(target_ms)
}
/// `IndexReader` builder
/// [IndexReader] builder
///
/// It makes it possible to set the following values.
///
/// - `num_searchers` (by default, the number of detected CPU threads):
///
/// When `num_searchers` queries are requested at the same time, the `num_searchers` will block
/// until the one of the searcher in-use gets released.
/// - `reload_policy` (by default `ReloadPolicy::OnCommit`):
///
/// See [`ReloadPolicy`](./enum.ReloadPolicy.html) for more details.
/// It makes it possible to configure:
/// - [Searcher] pool size
/// - [ReloadPolicy] defining when new index versions are detected
/// - [Warmer] implementations
/// - number of warming threads, for parallelizing warming work
#[derive(Clone)]
pub struct IndexReaderBuilder {
num_searchers: usize,
reload_policy: ReloadPolicy,
index: Index,
warmers: Vec<Weak<dyn Warmer>>,
num_warming_threads: usize,
}
impl IndexReaderBuilder {
@@ -53,6 +58,8 @@ impl IndexReaderBuilder {
num_searchers: num_cpus::get(),
reload_policy: ReloadPolicy::OnCommit,
index,
warmers: Vec::new(),
num_warming_threads: 1,
}
}
@@ -63,10 +70,19 @@ impl IndexReaderBuilder {
/// of time and it may return an error.
#[allow(clippy::needless_late_init)]
pub fn try_into(self) -> crate::Result<IndexReader> {
let searcher_generation_inventory = Inventory::default();
let warming_state = WarmingState::new(
self.num_warming_threads,
self.warmers,
searcher_generation_inventory.clone(),
)?;
let inner_reader = InnerIndexReader {
index: self.index,
num_searchers: self.num_searchers,
searcher_pool: Pool::new(),
warming_state,
searcher_generation_counter: Default::default(),
searcher_generation_inventory,
};
inner_reader.reload()?;
let inner_reader_arc = Arc::new(inner_reader);
@@ -107,11 +123,27 @@ impl IndexReaderBuilder {
self
}
/// Sets the number of `Searcher` in the searcher pool.
/// Sets the number of [Searcher] to pool.
///
/// See [Self::searcher()].
pub fn num_searchers(mut self, num_searchers: usize) -> IndexReaderBuilder {
self.num_searchers = num_searchers;
self
}
/// Set the [Warmer]s that are invoked when reloading searchable segments.
pub fn warmers(mut self, warmers: Vec<Weak<dyn Warmer>>) -> IndexReaderBuilder {
self.warmers = warmers;
self
}
/// Sets the number of warming threads.
///
/// This allows parallelizing warming work when there are multiple [Warmer] registered with the [IndexReader].
pub fn num_warming_threads(mut self, num_warming_threads: usize) -> IndexReaderBuilder {
self.num_warming_threads = num_warming_threads;
self
}
}
impl TryInto<IndexReader> for IndexReaderBuilder {
@@ -124,35 +156,62 @@ impl TryInto<IndexReader> for IndexReaderBuilder {
struct InnerIndexReader {
num_searchers: usize,
searcher_pool: Pool<Searcher>,
index: Index,
warming_state: WarmingState,
searcher_pool: Pool<Searcher>,
searcher_generation_counter: Arc<AtomicU64>,
searcher_generation_inventory: Inventory<SearcherGeneration>,
}
impl InnerIndexReader {
/// Opens the freshest segments `SegmentReader`.
///
/// This function acquires a lock to prevent GC from removing files
/// as we are opening our index.
fn open_segment_readers(&self) -> crate::Result<Vec<SegmentReader>> {
// Prevents segment files from getting deleted while we are in the process of opening them
let _meta_lock = self.index.directory().acquire_lock(&META_LOCK)?;
let searchable_segments = self.index.searchable_segments()?;
let segment_readers = searchable_segments
.iter()
.map(SegmentReader::open)
.collect::<crate::Result<_>>()?;
Ok(segment_readers)
}
fn create_new_searcher_generation(
&self,
segment_readers: &[SegmentReader],
) -> TrackedObject<SearcherGeneration> {
let generation_id = self
.searcher_generation_counter
.fetch_add(1, atomic::Ordering::Relaxed);
let searcher_generation =
SearcherGeneration::from_segment_readers(segment_readers, generation_id);
self.searcher_generation_inventory
.track(searcher_generation)
}
fn reload(&self) -> crate::Result<()> {
let segment_readers: Vec<SegmentReader> = {
let _meta_lock = self.index.directory().acquire_lock(&META_LOCK)?;
let searchable_segments = self.searchable_segments()?;
searchable_segments
.iter()
.map(SegmentReader::open)
.collect::<crate::Result<_>>()?
};
let segment_readers = self.open_segment_readers()?;
let searcher_generation = self.create_new_searcher_generation(&segment_readers);
let schema = self.index.schema();
let searchers: Vec<Searcher> = std::iter::repeat_with(|| {
Searcher::new(schema.clone(), self.index.clone(), segment_readers.clone())
Searcher::new(
schema.clone(),
self.index.clone(),
segment_readers.clone(),
searcher_generation.clone(),
)
})
.take(self.num_searchers)
.collect::<io::Result<_>>()?;
self.warming_state
.warm_new_searcher_generation(&searchers[0])?;
self.searcher_pool.publish_new_generation(searchers);
Ok(())
}
/// Returns the list of segments that are searchable
fn searchable_segments(&self) -> crate::Result<Vec<Segment>> {
self.index.searchable_segments()
}
fn searcher(&self) -> LeasedItem<Searcher> {
self.searcher_pool.acquire()
}

343
src/reader/warming.rs Normal file
View File

@@ -0,0 +1,343 @@
use std::{
collections::HashSet,
sync::{Arc, Mutex, Weak},
thread::JoinHandle,
time::Duration,
};
use crate::{Executor, Searcher, SearcherGeneration, TantivyError};
use crate::{Inventory, TrackedObject};
pub const GC_INTERVAL: Duration = Duration::from_secs(1);
/// `Warmer` can be used to maintain segment-level state e.g. caches.
///
/// They must be registered with the [IndexReaderBuilder].
///
/// Warming runs before a new generation of searchers is published to the
/// pool (see `InnerIndexReader::reload`), so searches never observe a
/// cold state.
pub trait Warmer: Sync + Send {
    /// Perform any warming work using the provided [Searcher].
    fn warm(&self, searcher: &Searcher) -> crate::Result<()>;

    /// Discards internal state for any [SearcherGeneration] not provided.
    ///
    /// Invoked from a background GC thread once some previously warmed
    /// generation is no longer live.
    fn garbage_collect(&self, live_generations: &[TrackedObject<SearcherGeneration>]);
}
/// Warming-related state with interior mutability.
///
/// Cloning is cheap: clones share the same inner state through the `Arc`.
#[derive(Clone)]
pub(crate) struct WarmingState(Arc<Mutex<WarmingStateInner>>);
impl WarmingState {
    /// Builds the shared warming state. No thread is spawned here; the GC
    /// thread is created lazily when the first generation is warmed with
    /// active warmers.
    pub fn new(
        num_warming_threads: usize,
        warmers: Vec<Weak<dyn Warmer>>,
        searcher_generation_inventory: Inventory<SearcherGeneration>,
    ) -> crate::Result<Self> {
        Ok(Self(Arc::new(Mutex::new(WarmingStateInner {
            num_warming_threads,
            warmers,
            gc_thread: None,
            warmed_generation_ids: Default::default(),
            searcher_generation_inventory,
        }))))
    }

    /// Start tracking a new generation of [Searcher], and [Warmer::warm] it if there are active warmers.
    ///
    /// A background GC thread for [Warmer::garbage_collect] calls is uniquely created if there are active warmers.
    pub fn warm_new_searcher_generation(&self, searcher: &Searcher) -> crate::Result<()> {
        // The `Arc` around the inner state is passed along so the GC thread
        // can be handed a `Weak` reference to it (see `start_gc_thread_maybe`).
        self.0
            .lock()
            .unwrap()
            .warm_new_searcher_generation(searcher, &self.0)
    }

    // Runs a GC attempt synchronously; tests rely on this for determinism
    // instead of the periodic background thread.
    #[cfg(test)]
    fn gc_maybe(&self) -> bool {
        self.0.lock().unwrap().gc_maybe()
    }
}
struct WarmingStateInner {
    num_warming_threads: usize,
    // Weak references: warmers dropped by their owners are pruned lazily.
    warmers: Vec<Weak<dyn Warmer>>,
    // Background thread running periodic [Warmer::garbage_collect] calls;
    // spawned lazily on the first warmed generation.
    gc_thread: Option<JoinHandle<()>>,
    // Contains all generations that have been warmed up.
    // This list is used to avoid triggering the individual Warmer GCs
    // if no warmed generation needs to be collected.
    warmed_generation_ids: HashSet<u64>,
    searcher_generation_inventory: Inventory<SearcherGeneration>,
}
impl WarmingStateInner {
    /// Start tracking provided searcher as an exemplar of a new generation.
    /// If there are active warmers, warm them with the provided searcher, and kick background GC thread if it has not yet been kicked.
    /// Otherwise, prune state for dropped searcher generations inline.
    fn warm_new_searcher_generation(
        &mut self,
        searcher: &Searcher,
        this: &Arc<Mutex<Self>>,
    ) -> crate::Result<()> {
        let warmers = self.pruned_warmers();
        // Avoid threads (warming as well as background GC) if there are no warmers
        if warmers.is_empty() {
            return Ok(());
        }
        self.start_gc_thread_maybe(this)?;
        self.warmed_generation_ids
            .insert(searcher.generation().generation_id());
        // Fan warming out over at most `num_warming_threads` threads, but
        // never more threads than there are warmers to run.
        warming_executor(self.num_warming_threads.min(warmers.len()))?
            .map(|warmer| warmer.warm(searcher), warmers.into_iter())?;
        Ok(())
    }

    /// Attempt to upgrade the weak Warmer references, pruning those which cannot be upgraded.
    /// Return the strong references.
    fn pruned_warmers(&mut self) -> Vec<Arc<dyn Warmer>> {
        let strong_warmers = self
            .warmers
            .iter()
            .flat_map(|weak_warmer| weak_warmer.upgrade())
            .collect::<Vec<_>>();
        // Keep only downgraded copies of the still-alive warmers.
        self.warmers = strong_warmers.iter().map(Arc::downgrade).collect();
        strong_warmers
    }

    /// [Warmer::garbage_collect] active warmers if some searcher generation is observed to have been dropped.
    fn gc_maybe(&mut self) -> bool {
        let live_generations = self.searcher_generation_inventory.list();
        let live_generation_ids: HashSet<u64> = live_generations
            .iter()
            .map(|searcher_generation| searcher_generation.generation_id())
            .collect();
        // GC is needed only if some previously warmed generation is gone.
        let gc_not_required = self
            .warmed_generation_ids
            .iter()
            .all(|warmed_up_generation| live_generation_ids.contains(warmed_up_generation));
        if gc_not_required {
            return false;
        }
        for warmer in self.pruned_warmers() {
            warmer.garbage_collect(&live_generations);
        }
        self.warmed_generation_ids = live_generation_ids;
        true
    }

    /// Start GC thread if one has not already been started.
    fn start_gc_thread_maybe(&mut self, this: &Arc<Mutex<Self>>) -> crate::Result<bool> {
        if self.gc_thread.is_some() {
            return Ok(false);
        }
        // The GC thread only holds a weak reference, so it stops looping
        // once the reader (and thus the inner state) is dropped.
        let weak_inner = Arc::downgrade(this);
        let handle = std::thread::Builder::new()
            .name("tantivy-warm-gc".to_owned())
            .spawn(|| Self::gc_loop(weak_inner))
            .map_err(|_| {
                TantivyError::SystemError("Failed to spawn warmer GC thread".to_owned())
            })?;
        self.gc_thread = Some(handle);
        Ok(true)
    }

    /// Every [GC_INTERVAL] attempt to GC, with panics caught and logged using [std::panic::catch_unwind].
    fn gc_loop(inner: Weak<Mutex<WarmingStateInner>>) {
        for _ in crossbeam::channel::tick(GC_INTERVAL) {
            if let Some(inner) = inner.upgrade() {
                // rely on deterministic gc in tests
                #[cfg(not(test))]
                if let Err(err) = std::panic::catch_unwind(|| inner.lock().unwrap().gc_maybe()) {
                    error!("Panic in Warmer GC {:?}", err);
                }
                // avoid unused var warning in tests
                #[cfg(test)]
                drop(inner);
            }
        }
    }
}
/// Builds the executor used to run warmers: single-threaded when one (or
/// zero) warming threads are requested, a thread pool otherwise.
fn warming_executor(num_threads: usize) -> crate::Result<Executor> {
    match num_threads {
        0 | 1 => Ok(Executor::single_thread()),
        n => Executor::multi_thread(n, "tantivy-warm-"),
    }
}
#[cfg(test)]
mod tests {
    use std::{
        collections::HashSet,
        sync::{
            atomic::{self, AtomicUsize},
            Arc, RwLock, Weak,
        },
    };

    use crate::TrackedObject;
    use crate::{
        core::searcher::SearcherGeneration,
        directory::RamDirectory,
        schema::{Schema, INDEXED},
        Index, IndexSettings, ReloadPolicy, Searcher, SegmentId,
    };

    use super::Warmer;

    // Warmer test double: counts warm/GC calls and records which segment
    // ids it currently believes are live.
    #[derive(Default)]
    struct TestWarmer {
        active_segment_ids: RwLock<HashSet<SegmentId>>,
        warm_calls: AtomicUsize,
        gc_calls: AtomicUsize,
    }

    impl TestWarmer {
        fn live_segment_ids(&self) -> HashSet<SegmentId> {
            self.active_segment_ids.read().unwrap().clone()
        }

        fn warm_calls(&self) -> usize {
            self.warm_calls.load(atomic::Ordering::Acquire)
        }

        fn gc_calls(&self) -> usize {
            self.gc_calls.load(atomic::Ordering::Acquire)
        }

        // Asserts the exact number of warm/GC calls seen so far and the
        // exact set of segment ids considered live.
        fn verify(
            &self,
            expected_warm_calls: usize,
            expected_gc_calls: usize,
            expected_segment_ids: HashSet<SegmentId>,
        ) {
            assert_eq!(self.warm_calls(), expected_warm_calls);
            assert_eq!(self.gc_calls(), expected_gc_calls);
            assert_eq!(self.live_segment_ids(), expected_segment_ids);
        }
    }

    impl Warmer for TestWarmer {
        fn warm(&self, searcher: &crate::Searcher) -> crate::Result<()> {
            self.warm_calls.fetch_add(1, atomic::Ordering::SeqCst);
            for reader in searcher.segment_readers() {
                self.active_segment_ids
                    .write()
                    .unwrap()
                    .insert(reader.segment_id());
            }
            Ok(())
        }

        fn garbage_collect(&self, live_generations: &[TrackedObject<SearcherGeneration>]) {
            self.gc_calls
                .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
            // Replace the live set with the union of segments across all
            // still-live generations.
            let active_segment_ids = live_generations
                .iter()
                .flat_map(|searcher_generation| searcher_generation.segments().keys().copied())
                .collect();
            *self.active_segment_ids.write().unwrap() = active_segment_ids;
        }
    }

    // Collects the segment ids visible through a searcher.
    fn segment_ids(searcher: &Searcher) -> HashSet<SegmentId> {
        searcher
            .segment_readers()
            .iter()
            .map(|reader| reader.segment_id())
            .collect()
    }

    // End-to-end warming scenario: commit, warm, drop one warmer, reload,
    // then check GC fires only once the old searcher generation is fully
    // released (including copies held by the pool).
    fn test_warming(num_warming_threads: usize) -> crate::Result<()> {
        let mut schema_builder = Schema::builder();
        let field = schema_builder.add_u64_field("pk", INDEXED);
        let schema = schema_builder.build();
        let directory = RamDirectory::create();
        let index = Index::create(directory.clone(), schema, IndexSettings::default())?;

        // Several writer threads so the commit produces several segments.
        let num_writer_threads = 4;
        let mut writer = index
            .writer_with_num_threads(num_writer_threads, 25_000_000)
            .unwrap();
        for i in 0u64..1000u64 {
            writer.add_document(doc!(field => i))?;
        }
        writer.commit()?;

        let warmer1 = Arc::new(TestWarmer::default());
        let warmer2 = Arc::new(TestWarmer::default());
        warmer1.verify(0, 0, HashSet::new());
        warmer2.verify(0, 0, HashSet::new());

        let num_searchers = 4;
        let reader = index
            .reader_builder()
            .reload_policy(ReloadPolicy::Manual)
            .num_warming_threads(num_warming_threads)
            .num_searchers(num_searchers)
            .warmers(vec![
                Arc::downgrade(&warmer1) as Weak<dyn Warmer>,
                Arc::downgrade(&warmer2) as Weak<dyn Warmer>,
            ])
            .try_into()?;
        let warming_state = &reader.inner.warming_state;

        let searcher = reader.searcher();
        assert_eq!(searcher.segment_readers().len(), num_writer_threads);
        assert!(
            !warming_state.gc_maybe(),
            "no GC after first searcher generation"
        );
        warmer1.verify(1, 0, segment_ids(&searcher));
        warmer2.verify(1, 0, segment_ids(&searcher));
        assert_eq!(searcher.num_docs(), 1000);

        for i in 1000u64..2000u64 {
            writer.add_document(doc!(field => i))?;
        }
        writer.commit()?;
        writer.wait_merging_threads()?;

        // Dropping the last strong reference deactivates warmer1.
        drop(warmer1);
        let old_searcher = searcher;
        reader.reload()?;
        assert!(!warming_state.gc_maybe(), "old searcher still around");

        let searcher = reader.searcher();
        assert_eq!(searcher.num_docs(), 2000);
        warmer2.verify(
            2,
            0,
            segment_ids(&old_searcher)
                .union(&segment_ids(&searcher))
                .copied()
                .collect(),
        );

        drop(old_searcher);
        for _ in 0..num_searchers {
            // make sure the old searcher is dropped by the pool too
            let _ = reader.searcher();
        }
        assert!(warming_state.gc_maybe(), "old searcher dropped");
        warmer2.verify(2, 1, segment_ids(&searcher));

        Ok(())
    }

    #[test]
    fn warming_single_thread() -> crate::Result<()> {
        test_warming(1)
    }

    #[test]
    fn warming_four_threads() -> crate::Result<()> {
        test_warming(4)
    }
}