From 99d4b1a177dec06a31d6755eb37485b841efc508 Mon Sep 17 00:00:00 2001
From: Shikhar Bhushan
Date: Thu, 20 Jan 2022 09:40:25 -0500
Subject: [PATCH] Searcher Warming API (#1261)

Adds an API to register Warmers in the IndexReader.

Co-authored-by: Paul Masurel
---
 CHANGELOG.md                   |   5 +-
 examples/warmer.rs             | 223 +++++++++++++++++++++
 src/core/index.rs              |   2 +-
 src/core/index_meta.rs         |   2 +-
 src/core/mod.rs                |   2 +-
 src/core/searcher.rs           |  62 +++++-
 src/core/segment_reader.rs     |   9 +
 src/indexer/merge_operation.rs |   2 +-
 src/lib.rs                     |   7 +-
 src/reader/mod.rs              | 113 ++++++++---
 src/reader/warming.rs          | 343 +++++++++++++++++++++++++++++++++
 11 files changed, 733 insertions(+), 37 deletions(-)
 create mode 100644 examples/warmer.rs
 create mode 100644 src/reader/warming.rs

diff --git a/CHANGELOG.md b/CHANGELOG.md
index db85b14e1..472f9672b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,11 +1,12 @@
 Tantivy 0.17
 ================================
+- LogMergePolicy now triggers merges if the ratio of deleted documents reaches a threshold (@shikhar @fulmicoton) [#115](https://github.com/quickwit-inc/tantivy/issues/115)
+- Adds a searcher Warmer API (@shikhar @fulmicoton)
 - Change to a non-strict schema: fields in the data that are not defined in the schema are now ignored. Previously this returned an error. #1211
 - Facets are now always indexed. Existing indexes with indexed facets work out of the box. Indexes with facets marked `index: false` are broken (but they were already broken in a sense). (@fulmicoton) #1195
 - Bugfix that could, in theory, impact durability on some filesystems [#1224](https://github.com/quickwit-inc/tantivy/issues/1224)
-- Reduce the number of fsync calls [#1225](https://github.com/quickwit-inc/tantivy/issues/1225)
 - Schema now offers the option of not indexing fieldnorms (@lpouget) [#922](https://github.com/quickwit-inc/tantivy/issues/922)
-- LogMergePolicy now triggers merges if the ratio of deleted documents reaches a threshold (@shikhar) [#115](https://github.com/quickwit-inc/tantivy/issues/115)
+- Reduce the number of fsync calls [#1225](https://github.com/quickwit-inc/tantivy/issues/1225)
 
 Tantivy 0.16.2
 ================================
diff --git a/examples/warmer.rs b/examples/warmer.rs
new file mode 100644
index 000000000..158861e49
--- /dev/null
+++ b/examples/warmer.rs
@@ -0,0 +1,223 @@
+use std::cmp::Reverse;
+use std::collections::{HashMap, HashSet};
+use std::sync::{Arc, RwLock, Weak};
+
+use tantivy::collector::TopDocs;
+use tantivy::fastfield::FastFieldReader;
+use tantivy::query::QueryParser;
+use tantivy::schema::{Field, Schema, FAST, TEXT};
+use tantivy::{doc, DocAddress, DocId, Index, IndexReader, SegmentReader, TrackedObject};
+use tantivy::{Opstamp, Searcher, SearcherGeneration, SegmentId, Warmer};
+
+// This example shows how warmers can be used to
+// load values from an external source using the Warmer API.
+//
+// In this example, we assume an e-commerce search engine.
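+//
+// Product prices change frequently and live outside of the index, so
+// reindexing documents on every price change would be impractical.
+// Instead, the `DynamicPriceColumn` warmer below fetches the latest price
+// of every product in a segment each time a new searcher generation is
+// loaded, and caches the prices per segment.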
+
+type ProductId = u64;
+
+/// Price
+type Price = u32;
+
+pub trait PriceFetcher: Send + Sync + 'static {
+    fn fetch_prices(&self, product_ids: &[ProductId]) -> Vec<Price>;
+}
+
+struct DynamicPriceColumn {
+    field: Field,
+    price_cache: RwLock<HashMap<(SegmentId, Option<Opstamp>), Arc<Vec<Price>>>>,
+    price_fetcher: Box<dyn PriceFetcher>,
+}
+
+impl DynamicPriceColumn {
+    pub fn with_product_id_field<T: PriceFetcher>(field: Field, price_fetcher: T) -> Self {
+        DynamicPriceColumn {
+            field,
+            price_cache: Default::default(),
+            price_fetcher: Box::new(price_fetcher),
+        }
+    }
+
+    pub fn price_for_segment(&self, segment_reader: &SegmentReader) -> Option<Arc<Vec<Price>>> {
+        let segment_key = (segment_reader.segment_id(), segment_reader.delete_opstamp());
+        self.price_cache.read().unwrap().get(&segment_key).cloned()
+    }
+}
+
+impl Warmer for DynamicPriceColumn {
+    fn warm(&self, searcher: &Searcher) -> tantivy::Result<()> {
+        for segment in searcher.segment_readers() {
+            let key = (segment.segment_id(), segment.delete_opstamp());
+            let product_id_reader = segment.fast_fields().u64(self.field)?;
+            let product_ids: Vec<ProductId> = segment
+                .doc_ids_alive()
+                .map(|doc| product_id_reader.get(doc))
+                .collect();
+            let mut prices_it = self.price_fetcher.fetch_prices(&product_ids).into_iter();
+            let mut price_vals: Vec<Price> = Vec::new();
+            for doc in 0..segment.max_doc() {
+                if segment.is_deleted(doc) {
+                    // Deleted documents keep a placeholder price of 0.
+                    price_vals.push(0);
+                } else {
+                    price_vals.push(prices_it.next().unwrap())
+                }
+            }
+            self.price_cache
+                .write()
+                .unwrap()
+                .insert(key, Arc::new(price_vals));
+        }
+        Ok(())
+    }
+
+    fn garbage_collect(&self, live_generations: &[TrackedObject<SearcherGeneration>]) {
+        let live_segment_id_and_delete_ops: HashSet<(SegmentId, Option<Opstamp>)> =
+            live_generations
+                .iter()
+                .flat_map(|gen| gen.segments())
+                .map(|(&segment_id, &opstamp)| (segment_id, opstamp))
+                .collect();
+        let mut price_cache_wrt = self.price_cache.write().unwrap();
+        // Keep only the cache entries whose (segment id, delete opstamp) is still live.
+        // Drain would be nicer here.
+        *price_cache_wrt = std::mem::take(&mut *price_cache_wrt)
+            .into_iter()
+            .filter(|(seg_id_and_op, _)| live_segment_id_and_delete_ops.contains(seg_id_and_op))
+            .collect();
+    }
+}
+
+/// For the sake of this example, the price table is just an editable `HashMap`
+/// behind a `RwLock`, representing a `(ProductId -> Price)` mapping.
+///
+/// In practice, it could be fetching prices from an external service, like a SQL table.
+#[derive(Default, Clone)]
+pub struct ExternalPriceTable {
+    prices: Arc<RwLock<HashMap<ProductId, Price>>>,
+}
+
+impl ExternalPriceTable {
+    pub fn update_price(&self, product_id: ProductId, price: Price) {
+        let mut prices_wrt = self.prices.write().unwrap();
+        prices_wrt.insert(product_id, price);
+    }
+}
+
+impl PriceFetcher for ExternalPriceTable {
+    fn fetch_prices(&self, product_ids: &[ProductId]) -> Vec<Price> {
+        let prices_read = self.prices.read().unwrap();
+        product_ids
+            .iter()
+            .map(|product_id| prices_read.get(product_id).cloned().unwrap_or(0))
+            .collect()
+    }
+}
+
+fn main() -> tantivy::Result<()> {
+    // Declaring our schema.
+    let mut schema_builder = Schema::builder();
+    // The product id is assumed to be a primary id for our external price source.
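+    // Declaring it as a FAST field lets the warmer cheaply read the product id
+    // of every alive document when a segment needs to be warmed.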
+    let product_id = schema_builder.add_u64_field("product_id", FAST);
+    let text = schema_builder.add_text_field("text", TEXT);
+    let schema: Schema = schema_builder.build();
+
+    let price_table = ExternalPriceTable::default();
+    let price_dynamic_column = Arc::new(DynamicPriceColumn::with_product_id_field(
+        product_id,
+        price_table.clone(),
+    ));
+    price_table.update_price(OLIVE_OIL, 12);
+    price_table.update_price(GLOVES, 13);
+    price_table.update_price(SNEAKERS, 80);
+
+    const OLIVE_OIL: ProductId = 323423;
+    const GLOVES: ProductId = 3966623;
+    const SNEAKERS: ProductId = 23222;
+
+    let index = Index::create_in_ram(schema);
+    let mut writer = index.writer_with_num_threads(1, 10_000_000)?;
+    writer.add_document(doc!(product_id=>OLIVE_OIL, text=>"cooking olive oil from greece"))?;
+    writer.add_document(doc!(product_id=>GLOVES, text=>"kitchen gloves, perfect for cooking"))?;
+    writer.add_document(doc!(product_id=>SNEAKERS, text=>"uber sweet sneakers"))?;
+    writer.commit()?;
+
+    let warmers: Vec<Weak<dyn Warmer>> = vec![Arc::downgrade(
+        &(price_dynamic_column.clone() as Arc<dyn Warmer>),
+    )];
+    let reader: IndexReader = index
+        .reader_builder()
+        .warmers(warmers)
+        .num_searchers(1)
+        .try_into()?;
+    reader.reload()?;
+
+    let query_parser = QueryParser::for_index(&index, vec![text]);
+    let query = query_parser.parse_query("cooking")?;
+
+    let searcher = reader.searcher();
+    let score_by_price = move |segment_reader: &SegmentReader| {
+        let price = price_dynamic_column
+            .price_for_segment(segment_reader)
+            .unwrap();
+        move |doc_id: DocId| Reverse(price[doc_id as usize])
+    };
+
+    // `TopDocs` returns the highest scores first, so scoring by `Reverse(price)`
+    // surfaces the cheapest documents first.
+    let cheapest_first = TopDocs::with_limit(10).custom_score(score_by_price);
+
+    let hits = searcher.search(&query, &cheapest_first)?;
+    assert_eq!(
+        &hits,
+        &[
+            (
+                Reverse(12u32),
+                DocAddress {
+                    segment_ord: 0,
+                    doc_id: 0u32
+                }
+            ),
+            (
+                Reverse(13u32),
+                DocAddress {
+                    segment_ord: 0,
+                    doc_id: 1u32
+                }
+            ),
+        ]
+    );
+
+    // Olive oil just got more expensive!
+    price_table.update_price(OLIVE_OIL, 15);
+
+    // The price update is directly reflected on `reload`.
+    //
+    // Be careful here though!...
+    // You may have spotted that we are still using the same `Searcher`.
+    //
+    // It is up to the `Warmer` implementer to decide how
+    // to control this behavior.
+    reader.reload()?;
+
+    let hits_with_new_prices = searcher.search(&query, &cheapest_first)?;
+    assert_eq!(
+        &hits_with_new_prices,
+        &[
+            (
+                Reverse(13u32),
+                DocAddress {
+                    segment_ord: 0,
+                    doc_id: 1u32
+                }
+            ),
+            (
+                Reverse(15u32),
+                DocAddress {
+                    segment_ord: 0,
+                    doc_id: 0u32
+                }
+            ),
+        ]
+    );
+
+    Ok(())
+}
diff --git a/src/core/index.rs b/src/core/index.rs
index 06873047b..fd88fa708 100644
--- a/src/core/index.rs
+++ b/src/core/index.rs
@@ -217,7 +217,7 @@ impl Index {
     /// Replace the default single-threaded search executor pool
     /// by a thread pool with a given number of threads.
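+    ///
+    /// A minimal usage sketch (the pool size of 4 is an arbitrary choice):
+    ///
+    /// ```no_run
+    /// # use tantivy::{schema::Schema, Index};
+    /// # fn main() -> tantivy::Result<()> {
+    /// let mut index = Index::create_in_ram(Schema::builder().build());
+    /// // Collect search results using a pool of 4 threads.
+    /// index.set_multithread_executor(4)?;
+    /// # Ok(())
+    /// # }
+    /// ```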
     pub fn set_multithread_executor(&mut self, num_threads: usize) -> crate::Result<()> {
-        self.executor = Arc::new(Executor::multi_thread(num_threads, "thrd-tantivy-search-")?);
+        self.executor = Arc::new(Executor::multi_thread(num_threads, "tantivy-search-")?);
         Ok(())
     }
diff --git a/src/core/index_meta.rs b/src/core/index_meta.rs
index 6ae8b6702..796db6078 100644
--- a/src/core/index_meta.rs
+++ b/src/core/index_meta.rs
@@ -2,7 +2,7 @@ use super::SegmentComponent;
 use crate::schema::Schema;
 use crate::Opstamp;
 use crate::{core::SegmentId, store::Compressor};
-use census::{Inventory, TrackedObject};
+use crate::{Inventory, TrackedObject};
 use serde::{Deserialize, Serialize};
 use std::path::PathBuf;
 use std::{collections::HashSet, sync::atomic::AtomicBool};
diff --git a/src/core/mod.rs b/src/core/mod.rs
index 77587ab59..dc73d2cba 100644
--- a/src/core/mod.rs
+++ b/src/core/mod.rs
@@ -14,7 +14,7 @@ pub use self::index_meta::{
     IndexMeta, IndexSettings, IndexSortByField, Order, SegmentMeta, SegmentMetaInventory,
 };
 pub use self::inverted_index_reader::InvertedIndexReader;
-pub use self::searcher::Searcher;
+pub use self::searcher::{Searcher, SearcherGeneration};
 pub use self::segment::Segment;
 pub use self::segment_component::SegmentComponent;
 pub use self::segment_id::SegmentId;
diff --git a/src/core/searcher.rs b/src/core/searcher.rs
index f4ca148b9..55028d8c8 100644
--- a/src/core/searcher.rs
+++ b/src/core/searcher.rs
@@ -1,6 +1,5 @@
 use crate::collector::Collector;
 use crate::core::Executor;
-
 use crate::core::SegmentReader;
 use crate::query::Query;
 use crate::schema::Document;
@@ -10,9 +9,62 @@ use crate::space_usage::SearcherSpaceUsage;
 use crate::store::StoreReader;
 use crate::DocAddress;
 use crate::Index;
+use crate::Opstamp;
+use crate::SegmentId;
+use crate::TrackedObject;
+use std::collections::BTreeMap;
 use std::{fmt, io};
 
+/// Identifies the searcher generation accessed by a [Searcher].
+///
+/// While this might seem redundant, a [SearcherGeneration] contains
+/// both a `generation_id` AND a list of `(SegmentId, DeleteOpstamp)`.
+///
+/// This is on purpose. This object is used by the `Warmer` API.
+/// Having both pieces of information makes it possible to identify
+/// which artifacts should be refreshed or garbage collected.
+///
+/// Depending on the use case, `Warmer` implementers can decide to
+/// produce artifacts per:
+/// - `generation_id` (e.g. some searcher-level aggregates)
+/// - `(segment_id, delete_opstamp)` (e.g. segment-level aggregates)
+/// - `segment_id` (e.g. for immutable document-level information)
+/// - `(generation_id, segment_id)` (e.g. for a consistent dynamic column)
+/// - ...
+#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
+pub struct SearcherGeneration {
+    segments: BTreeMap<SegmentId, Option<Opstamp>>,
+    generation_id: u64,
+}
+
+impl SearcherGeneration {
+    pub(crate) fn from_segment_readers(
+        segment_readers: &[SegmentReader],
+        generation_id: u64,
+    ) -> Self {
+        let mut segment_id_to_del_opstamp = BTreeMap::new();
+        for segment_reader in segment_readers {
+            segment_id_to_del_opstamp
+                .insert(segment_reader.segment_id(), segment_reader.delete_opstamp());
+        }
+        Self {
+            segments: segment_id_to_del_opstamp,
+            generation_id,
+        }
+    }
+
+    /// Returns the searcher generation id.
+    pub fn generation_id(&self) -> u64 {
+        self.generation_id
+    }
+
+    /// Returns a `(SegmentId -> DeleteOpstamp)` mapping.
+    pub fn segments(&self) -> &BTreeMap<SegmentId, Option<Opstamp>> {
+        &self.segments
+    }
+}
+
 /// Holds a list of `SegmentReader`s ready for search.
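+/// Each `Searcher` is tied to a [SearcherGeneration] (see `Searcher::generation()`),
+/// which warmers can use to key the per-generation or per-segment artifacts they maintain.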
 ///
 /// It guarantees that the `Segment` will not be removed before
@@ -23,6 +75,7 @@ pub struct Searcher {
     index: Index,
     segment_readers: Vec<SegmentReader>,
     store_readers: Vec<StoreReader>,
+    generation: TrackedObject<SearcherGeneration>,
 }
 
 impl Searcher {
@@ -31,6 +84,7 @@
         schema: Schema,
         index: Index,
         segment_readers: Vec<SegmentReader>,
+        generation: TrackedObject<SearcherGeneration>,
     ) -> io::Result<Searcher> {
         let store_readers: Vec<StoreReader> = segment_readers
             .iter()
@@ -41,6 +95,7 @@
             index,
             segment_readers,
             store_readers,
+            generation,
         })
     }
 
@@ -49,6 +104,11 @@
         &self.index
     }
 
+    /// [SearcherGeneration] which identifies the version of the snapshot held by this `Searcher`.
+    pub fn generation(&self) -> &SearcherGeneration {
+        self.generation.as_ref()
+    }
+
     /// Fetches a document from tantivy's store given a `DocAddress`.
     ///
     /// The searcher uses the segment ordinal to route the
diff --git a/src/core/segment_reader.rs b/src/core/segment_reader.rs
index b8e828c9a..547232058 100644
--- a/src/core/segment_reader.rs
+++ b/src/core/segment_reader.rs
@@ -17,6 +17,7 @@ use crate::space_usage::SegmentSpaceUsage;
 use crate::store::StoreReader;
 use crate::termdict::TermDictionary;
 use crate::DocId;
+use crate::Opstamp;
 use fail::fail_point;
 use std::fmt;
 use std::sync::Arc;
@@ -38,6 +39,8 @@ pub struct SegmentReader {
     inv_idx_reader_cache: Arc<RwLock<HashMap<Field, Arc<InvertedIndexReader>>>>,
 
     segment_id: SegmentId,
+    delete_opstamp: Option<Opstamp>,
+
     max_doc: DocId,
     num_docs: DocId,
@@ -205,6 +208,7 @@ impl SegmentReader {
             fast_fields_readers: fast_field_readers,
             fieldnorm_readers,
             segment_id: segment.id(),
+            delete_opstamp: segment.meta().delete_opstamp(),
             store_file,
             alive_bitset_opt,
             positions_composite,
@@ -290,6 +294,11 @@ impl SegmentReader {
         self.segment_id
     }
 
+    /// Returns the delete opstamp.
+    pub fn delete_opstamp(&self) -> Option<Opstamp> {
+        self.delete_opstamp
+    }
+
     /// Returns the bitset representing
     /// the documents that are alive (i.e. not deleted).
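+    ///
+    /// A sketch of counting alive documents without the bitset (assuming a
+    /// `segment_reader: &SegmentReader` in scope):
+    ///
+    /// ```ignore
+    /// let num_alive = (0..segment_reader.max_doc())
+    ///     .filter(|&doc_id| !segment_reader.is_deleted(doc_id))
+    ///     .count();
+    /// ```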
     pub fn alive_bitset(&self) -> Option<&AliveBitSet> {
diff --git a/src/indexer/merge_operation.rs b/src/indexer/merge_operation.rs
index f7dc6bd0e..28a0e53ee 100644
--- a/src/indexer/merge_operation.rs
+++ b/src/indexer/merge_operation.rs
@@ -1,6 +1,6 @@
 use crate::Opstamp;
 use crate::SegmentId;
-use census::{Inventory, TrackedObject};
+use crate::{Inventory, TrackedObject};
 use std::collections::HashSet;
 use std::ops::Deref;
diff --git a/src/lib.rs b/src/lib.rs
index afbb6fe7e..5b2058015 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -158,7 +158,7 @@ pub mod termdict;
 
 mod reader;
 
-pub use self::reader::{IndexReader, IndexReaderBuilder, ReloadPolicy};
+pub use self::reader::{IndexReader, IndexReaderBuilder, ReloadPolicy, Warmer};
 
 mod snippet;
 pub use self::snippet::{Snippet, SnippetGenerator};
@@ -166,8 +166,8 @@ mod docset;
 pub use self::docset::{DocSet, TERMINATED};
 pub use crate::core::{Executor, SegmentComponent};
 pub use crate::core::{
-    Index, IndexBuilder, IndexMeta, IndexSettings, IndexSortByField, Order, Searcher, Segment,
-    SegmentId, SegmentMeta,
+    Index, IndexBuilder, IndexMeta, IndexSettings, IndexSortByField, Order, Searcher,
+    SearcherGeneration, Segment, SegmentId, SegmentMeta,
 };
 pub use crate::core::{InvertedIndexReader, SegmentReader};
 pub use crate::directory::Directory;
@@ -179,6 +179,7 @@ pub use crate::indexer::{IndexWriter, PreparedCommit};
 pub use crate::postings::Postings;
 pub use crate::reader::LeasedItem;
 pub use crate::schema::{Document, Term};
+pub use census::{Inventory, TrackedObject};
 pub use common::HasLen;
 pub use common::{f64_to_u64, i64_to_u64, u64_to_f64, u64_to_i64};
 use std::fmt;
diff --git a/src/reader/mod.rs b/src/reader/mod.rs
index 7313112ee..df0ee21ef 100644
--- a/src/reader/mod.rs
+++ b/src/reader/mod.rs
@@ -1,16 +1,23 @@
 mod pool;
+mod warming;
 
 pub use self::pool::LeasedItem;
 use self::pool::Pool;
-use crate::core::Segment;
+use self::warming::WarmingState;
+use crate::core::searcher::SearcherGeneration;
 use crate::directory::WatchHandle;
 use crate::directory::META_LOCK;
 use crate::directory::{Directory, WatchCallback};
 use crate::Index;
 use crate::Searcher;
 use crate::SegmentReader;
+use crate::{Inventory, TrackedObject};
+use std::sync::atomic;
+use std::sync::atomic::AtomicU64;
 use std::sync::Arc;
+use std::sync::Weak;
 use std::{convert::TryInto, io};
+pub use warming::Warmer;
 
 /// Defines when a new version of the index should be reloaded.
 ///
@@ -29,22 +36,20 @@ pub enum ReloadPolicy {
     OnCommit, // TODO add NEAR_REAL_TIME(target_ms)
 }
 
-/// `IndexReader` builder
+/// [IndexReader] builder
 ///
-/// It makes it possible to set the following values.
-///
-/// - `num_searchers` (by default, the number of detected CPU threads):
-///
-/// When `num_searchers` queries are requested at the same time, the `num_searchers` will block
-/// until the one of the searcher in-use gets released.
-/// - `reload_policy` (by default `ReloadPolicy::OnCommit`):
-///
-/// See [`ReloadPolicy`](./enum.ReloadPolicy.html) for more details.
+/// It makes it possible to configure:
+/// - [Searcher] pool size
+/// - [ReloadPolicy] defining when new index versions are detected
+/// - [Warmer] implementations
+/// - the number of warming threads, for parallelizing warming work
 #[derive(Clone)]
 pub struct IndexReaderBuilder {
     num_searchers: usize,
     reload_policy: ReloadPolicy,
     index: Index,
+    warmers: Vec<Weak<dyn Warmer>>,
+    num_warming_threads: usize,
 }
 
 impl IndexReaderBuilder {
@@ -53,6 +58,8 @@ impl IndexReaderBuilder {
             num_searchers: num_cpus::get(),
             reload_policy: ReloadPolicy::OnCommit,
             index,
+            warmers: Vec::new(),
+            num_warming_threads: 1,
         }
     }
 
@@ -63,10 +70,19 @@ impl IndexReaderBuilder {
     /// of time and it may return an error.
     #[allow(clippy::needless_late_init)]
     pub fn try_into(self) -> crate::Result<IndexReader> {
+        let searcher_generation_inventory = Inventory::default();
+        let warming_state = WarmingState::new(
+            self.num_warming_threads,
+            self.warmers,
+            searcher_generation_inventory.clone(),
+        )?;
         let inner_reader = InnerIndexReader {
             index: self.index,
             num_searchers: self.num_searchers,
             searcher_pool: Pool::new(),
+            warming_state,
+            searcher_generation_counter: Default::default(),
+            searcher_generation_inventory,
         };
         inner_reader.reload()?;
         let inner_reader_arc = Arc::new(inner_reader);
@@ -107,11 +123,27 @@ impl IndexReaderBuilder {
         self
     }
 
-    /// Sets the number of `Searcher` in the searcher pool.
+    /// Sets the number of [Searcher]s to pool.
+    ///
+    /// See [Self::searcher()].
     pub fn num_searchers(mut self, num_searchers: usize) -> IndexReaderBuilder {
         self.num_searchers = num_searchers;
         self
     }
+
+    /// Sets the [Warmer]s that are invoked when reloading searchable segments.
+    pub fn warmers(mut self, warmers: Vec<Weak<dyn Warmer>>) -> IndexReaderBuilder {
+        self.warmers = warmers;
+        self
+    }
+
+    /// Sets the number of warming threads.
+    ///
+    /// This allows parallelizing warming work when there are multiple [Warmer]s
+    /// registered with the [IndexReader].
+    pub fn num_warming_threads(mut self, num_warming_threads: usize) -> IndexReaderBuilder {
+        self.num_warming_threads = num_warming_threads;
+        self
+    }
 }
 
 impl TryInto<IndexReader> for IndexReaderBuilder {
@@ -124,35 +156,62 @@
 
 struct InnerIndexReader {
     num_searchers: usize,
-    searcher_pool: Pool<Searcher>,
     index: Index,
+    warming_state: WarmingState,
+    searcher_pool: Pool<Searcher>,
+    searcher_generation_counter: Arc<AtomicU64>,
+    searcher_generation_inventory: Inventory<SearcherGeneration>,
 }
 
 impl InnerIndexReader {
+    /// Opens the freshest segments' `SegmentReader`s.
+    ///
+    /// This function acquires a lock to prevent the GC from removing files
+    /// while we are opening our index.
+    fn open_segment_readers(&self) -> crate::Result<Vec<SegmentReader>> {
+        // Prevents segment files from getting deleted while we are in the process of opening them.
+        let _meta_lock = self.index.directory().acquire_lock(&META_LOCK)?;
+        let searchable_segments = self.index.searchable_segments()?;
+        let segment_readers = searchable_segments
+            .iter()
+            .map(SegmentReader::open)
+            .collect::<crate::Result<_>>()?;
+        Ok(segment_readers)
+    }
+
+    fn create_new_searcher_generation(
+        &self,
+        segment_readers: &[SegmentReader],
+    ) -> TrackedObject<SearcherGeneration> {
+        let generation_id = self
+            .searcher_generation_counter
+            .fetch_add(1, atomic::Ordering::Relaxed);
+        let searcher_generation =
+            SearcherGeneration::from_segment_readers(segment_readers, generation_id);
+        self.searcher_generation_inventory
+            .track(searcher_generation)
+    }
+
     fn reload(&self) -> crate::Result<()> {
-        let segment_readers: Vec<SegmentReader> = {
-            let _meta_lock = self.index.directory().acquire_lock(&META_LOCK)?;
-            let searchable_segments = self.searchable_segments()?;
-            searchable_segments
-                .iter()
-                .map(SegmentReader::open)
-                .collect::<crate::Result<_>>()?
-        };
+        let segment_readers = self.open_segment_readers()?;
+        let searcher_generation = self.create_new_searcher_generation(&segment_readers);
         let schema = self.index.schema();
         let searchers: Vec<Searcher> = std::iter::repeat_with(|| {
-            Searcher::new(schema.clone(), self.index.clone(), segment_readers.clone())
+            Searcher::new(
+                schema.clone(),
+                self.index.clone(),
+                segment_readers.clone(),
+                searcher_generation.clone(),
+            )
         })
         .take(self.num_searchers)
         .collect::<io::Result<_>>()?;
+        self.warming_state
+            .warm_new_searcher_generation(&searchers[0])?;
         self.searcher_pool.publish_new_generation(searchers);
         Ok(())
     }
 
-    /// Returns the list of segments that are searchable
-    fn searchable_segments(&self) -> crate::Result<Vec<Segment>> {
-        self.index.searchable_segments()
-    }
-
     fn searcher(&self) -> LeasedItem<Searcher> {
         self.searcher_pool.acquire()
     }
diff --git a/src/reader/warming.rs b/src/reader/warming.rs
new file mode 100644
index 000000000..b247bd8db
--- /dev/null
+++ b/src/reader/warming.rs
@@ -0,0 +1,343 @@
+use std::{
+    collections::HashSet,
+    sync::{Arc, Mutex, Weak},
+    thread::JoinHandle,
+    time::Duration,
+};
+
+use crate::{Executor, Searcher, SearcherGeneration, TantivyError};
+use crate::{Inventory, TrackedObject};
+
+pub const GC_INTERVAL: Duration = Duration::from_secs(1);
+
+/// A `Warmer` can be used to maintain segment-level state, e.g. caches.
+///
+/// Warmers must be registered with the [IndexReaderBuilder].
+pub trait Warmer: Sync + Send {
+    /// Perform any warming work using the provided [Searcher].
+    fn warm(&self, searcher: &Searcher) -> crate::Result<()>;
+
+    /// Discards internal state for any [SearcherGeneration] not provided.
+    fn garbage_collect(&self, live_generations: &[TrackedObject<SearcherGeneration>]);
+}
+
+/// Warming-related state with interior mutability.
+#[derive(Clone)]
+pub(crate) struct WarmingState(Arc<Mutex<WarmingStateInner>>);
+
+impl WarmingState {
+    pub fn new(
+        num_warming_threads: usize,
+        warmers: Vec<Weak<dyn Warmer>>,
+        searcher_generation_inventory: Inventory<SearcherGeneration>,
+    ) -> crate::Result<Self> {
+        Ok(Self(Arc::new(Mutex::new(WarmingStateInner {
+            num_warming_threads,
+            warmers,
+            gc_thread: None,
+            warmed_generation_ids: Default::default(),
+            searcher_generation_inventory,
+        }))))
+    }
+
+    /// Starts tracking a new generation of [Searcher], and [Warmer::warm]s it if there are active warmers.
+    ///
+    /// A background GC thread for [Warmer::garbage_collect] calls is created (only once) if there are active warmers.
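+    ///
+    /// Note: warming happens synchronously as part of a reload, so a reload
+    /// only completes once every registered warmer has finished its work.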
+    pub fn warm_new_searcher_generation(&self, searcher: &Searcher) -> crate::Result<()> {
+        self.0
+            .lock()
+            .unwrap()
+            .warm_new_searcher_generation(searcher, &self.0)
+    }
+
+    #[cfg(test)]
+    fn gc_maybe(&self) -> bool {
+        self.0.lock().unwrap().gc_maybe()
+    }
+}
+
+struct WarmingStateInner {
+    num_warming_threads: usize,
+    warmers: Vec<Weak<dyn Warmer>>,
+    gc_thread: Option<JoinHandle<()>>,
+    // Contains all generations that have been warmed up.
+    // This list is used to avoid triggering the individual Warmer GCs
+    // if no warmed generation needs to be collected.
+    warmed_generation_ids: HashSet<u64>,
+    searcher_generation_inventory: Inventory<SearcherGeneration>,
+}
+
+impl WarmingStateInner {
+    /// Starts tracking the provided searcher as an exemplar of a new generation.
+    /// If there are active warmers, warms them with the provided searcher, and
+    /// starts the background GC thread if it has not already been started.
+    fn warm_new_searcher_generation(
+        &mut self,
+        searcher: &Searcher,
+        this: &Arc<Mutex<WarmingStateInner>>,
+    ) -> crate::Result<()> {
+        let warmers = self.pruned_warmers();
+        // Avoid threads (for warming as well as background GC) if there are no warmers.
+        if warmers.is_empty() {
+            return Ok(());
+        }
+        self.start_gc_thread_maybe(this)?;
+        self.warmed_generation_ids
+            .insert(searcher.generation().generation_id());
+        warming_executor(self.num_warming_threads.min(warmers.len()))?
+            .map(|warmer| warmer.warm(searcher), warmers.into_iter())?;
+        Ok(())
+    }
+
+    /// Attempts to upgrade the weak `Warmer` references, pruning those which
+    /// cannot be upgraded. Returns the strong references.
+    fn pruned_warmers(&mut self) -> Vec<Arc<dyn Warmer>> {
+        let strong_warmers = self
+            .warmers
+            .iter()
+            .flat_map(|weak_warmer| weak_warmer.upgrade())
+            .collect::<Vec<_>>();
+        self.warmers = strong_warmers.iter().map(Arc::downgrade).collect();
+        strong_warmers
+    }
+
+    /// [Warmer::garbage_collect] the active warmers if some searcher generation
+    /// is observed to have been dropped.
+    fn gc_maybe(&mut self) -> bool {
+        let live_generations = self.searcher_generation_inventory.list();
+        let live_generation_ids: HashSet<u64> = live_generations
+            .iter()
+            .map(|searcher_generation| searcher_generation.generation_id())
+            .collect();
+        let gc_not_required = self
+            .warmed_generation_ids
+            .iter()
+            .all(|warmed_up_generation| live_generation_ids.contains(warmed_up_generation));
+        if gc_not_required {
+            return false;
+        }
+        for warmer in self.pruned_warmers() {
+            warmer.garbage_collect(&live_generations);
+        }
+        self.warmed_generation_ids = live_generation_ids;
+        true
+    }
+
+    /// Starts the GC thread if one has not already been started.
+    fn start_gc_thread_maybe(
+        &mut self,
+        this: &Arc<Mutex<WarmingStateInner>>,
+    ) -> crate::Result<bool> {
+        if self.gc_thread.is_some() {
+            return Ok(false);
+        }
+        let weak_inner = Arc::downgrade(this);
+        let handle = std::thread::Builder::new()
+            .name("tantivy-warm-gc".to_owned())
+            .spawn(|| Self::gc_loop(weak_inner))
+            .map_err(|_| {
+                TantivyError::SystemError("Failed to spawn warmer GC thread".to_owned())
+            })?;
+        self.gc_thread = Some(handle);
+        Ok(true)
+    }
+
+    /// Every [GC_INTERVAL], attempts a GC, with panics caught and logged using
+    /// [std::panic::catch_unwind].
+    fn gc_loop(inner: Weak<Mutex<WarmingStateInner>>) {
+        for _ in crossbeam::channel::tick(GC_INTERVAL) {
+            if let Some(inner) = inner.upgrade() {
+                // Rely on deterministic GC in tests.
+                #[cfg(not(test))]
+                if let Err(err) = std::panic::catch_unwind(|| inner.lock().unwrap().gc_maybe()) {
+                    error!("Panic in Warmer GC {:?}", err);
+                }
+                // Avoid an unused variable warning in tests.
+                #[cfg(test)]
+                drop(inner);
+            }
+        }
+    }
+}
+
+fn warming_executor(num_threads: usize) -> crate::Result<Executor> {
+    if num_threads <= 1 {
+        Ok(Executor::single_thread())
+    } else {
+        Executor::multi_thread(num_threads, "tantivy-warm-")
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::{
+        collections::HashSet,
+        sync::{
+            atomic::{self, AtomicUsize},
+            Arc, RwLock, Weak,
+        },
+    };
+
+    use crate::TrackedObject;
+    use crate::{
+        core::searcher::SearcherGeneration,
+        directory::RamDirectory,
+        schema::{Schema, INDEXED},
+        Index, IndexSettings, ReloadPolicy, Searcher, SegmentId,
+    };
+
+    use super::Warmer;
+
+    #[derive(Default)]
+    struct TestWarmer {
+        active_segment_ids: RwLock<HashSet<SegmentId>>,
+        warm_calls: AtomicUsize,
+        gc_calls: AtomicUsize,
+    }
+
+    impl TestWarmer {
+        fn live_segment_ids(&self) -> HashSet<SegmentId> {
+            self.active_segment_ids.read().unwrap().clone()
+        }
+
+        fn warm_calls(&self) -> usize {
+            self.warm_calls.load(atomic::Ordering::Acquire)
+        }
+
+        fn gc_calls(&self) -> usize {
+            self.gc_calls.load(atomic::Ordering::Acquire)
+        }
+
+        fn verify(
+            &self,
+            expected_warm_calls: usize,
+            expected_gc_calls: usize,
+            expected_segment_ids: HashSet<SegmentId>,
+        ) {
+            assert_eq!(self.warm_calls(), expected_warm_calls);
+            assert_eq!(self.gc_calls(), expected_gc_calls);
+            assert_eq!(self.live_segment_ids(), expected_segment_ids);
+        }
+    }
+
+    impl Warmer for TestWarmer {
+        fn warm(&self, searcher: &crate::Searcher) -> crate::Result<()> {
+            self.warm_calls.fetch_add(1, atomic::Ordering::SeqCst);
+            for reader in searcher.segment_readers() {
+                self.active_segment_ids
+                    .write()
+                    .unwrap()
+                    .insert(reader.segment_id());
+            }
+            Ok(())
+        }
+
+        fn garbage_collect(&self, live_generations: &[TrackedObject<SearcherGeneration>]) {
+            self.gc_calls
+                .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
+            let active_segment_ids = live_generations
+                .iter()
+                .flat_map(|searcher_generation| searcher_generation.segments().keys().copied())
+                .collect();
+            *self.active_segment_ids.write().unwrap() = active_segment_ids;
+        }
+    }
+
+    fn segment_ids(searcher: &Searcher) -> HashSet<SegmentId> {
+        searcher
+            .segment_readers()
+            .iter()
+            .map(|reader| reader.segment_id())
+            .collect()
+    }
+
+    fn test_warming(num_warming_threads: usize) -> crate::Result<()> {
+        let mut schema_builder = Schema::builder();
+        let field = schema_builder.add_u64_field("pk", INDEXED);
+        let schema = schema_builder.build();
+
+        let directory = RamDirectory::create();
+        let index = Index::create(directory.clone(), schema, IndexSettings::default())?;
+
+        let num_writer_threads = 4;
+        let mut writer = index
+            .writer_with_num_threads(num_writer_threads, 25_000_000)
+            .unwrap();
+
+        for i in 0u64..1000u64 {
+            writer.add_document(doc!(field => i))?;
+        }
+        writer.commit()?;
+
+        let warmer1 = Arc::new(TestWarmer::default());
+        let warmer2 = Arc::new(TestWarmer::default());
+        warmer1.verify(0, 0, HashSet::new());
+        warmer2.verify(0, 0, HashSet::new());
+
+        let num_searchers = 4;
+        let reader = index
+            .reader_builder()
+            .reload_policy(ReloadPolicy::Manual)
+            .num_warming_threads(num_warming_threads)
+            .num_searchers(num_searchers)
+            .warmers(vec![
+                Arc::downgrade(&warmer1) as Weak<dyn Warmer>,
+                Arc::downgrade(&warmer2) as Weak<dyn Warmer>,
+            ])
+            .try_into()?;
+
+        let warming_state = &reader.inner.warming_state;
+
+        let searcher = reader.searcher();
+        assert_eq!(searcher.segment_readers().len(), num_writer_threads);
+        assert!(
+            !warming_state.gc_maybe(),
+            "no GC after first searcher generation"
+        );
+        warmer1.verify(1, 0, segment_ids(&searcher));
+        warmer2.verify(1, 0, segment_ids(&searcher));
+        assert_eq!(searcher.num_docs(), 1000);
+
+        for i in 1000u64..2000u64 {
+            writer.add_document(doc!(field => i))?;
+        }
+        writer.commit()?;
+        writer.wait_merging_threads()?;
+
+        drop(warmer1);
+
+        let old_searcher = searcher;
+
+        reader.reload()?;
+
+        assert!(!warming_state.gc_maybe(), "old searcher still around");
+
+        let searcher = reader.searcher();
+        assert_eq!(searcher.num_docs(), 2000);
+
+        warmer2.verify(
+            2,
+            0,
+            segment_ids(&old_searcher)
+                .union(&segment_ids(&searcher))
+                .copied()
+                .collect(),
+        );
+
+        drop(old_searcher);
+        for _ in 0..num_searchers {
+            // Make sure the old searcher is dropped by the pool too.
+            let _ = reader.searcher();
+        }
+        assert!(warming_state.gc_maybe(), "old searcher dropped");
+
+        warmer2.verify(2, 1, segment_ids(&searcher));
+
+        Ok(())
+    }
+
+    #[test]
+    fn warming_single_thread() -> crate::Result<()> {
+        test_warming(1)
+    }
+
+    #[test]
+    fn warming_four_threads() -> crate::Result<()> {
+        test_warming(4)
+    }
+}