use std::collections::BTreeMap;
use std::sync::Arc;
use std::{fmt, io};

use crate::collector::Collector;
use crate::core::{Executor, SegmentReader};
use crate::query::Query;
use crate::schema::{Document, Schema, Term};
use crate::space_usage::SearcherSpaceUsage;
use crate::store::{CacheStats, StoreReader};
use crate::{DocAddress, Index, Opstamp, SegmentId, TrackedObject};

/// Identifies the searcher generation accessed by a [`Searcher`].
///
/// While this might seem redundant, a [`SearcherGeneration`] contains
/// both a `generation_id` AND a list of `(SegmentId, DeleteOpstamp)`.
///
/// This is on purpose. This object is used by the [`Warmer`](crate::reader::Warmer) API.
/// Having both information makes it possible to identify which
/// artifact should be refreshed or garbage collected.
///
/// Depending on the use case, `Warmer`'s implementers can decide to
/// produce artifacts per:
/// - `generation_id` (e.g. some searcher level aggregates)
/// - `(segment_id, delete_opstamp)` (e.g. segment level aggregates)
/// - `segment_id` (e.g. for immutable document level information)
/// - `(generation_id, segment_id)` (e.g. for consistent dynamic column)
/// - ...
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct SearcherGeneration {
    // Maps each segment in the snapshot to its delete opstamp
    // (`None` when the segment carries no deletes).
    segments: BTreeMap<SegmentId, Option<Opstamp>>,
    // Monotonically increasing id identifying this searcher snapshot.
    generation_id: u64,
}

impl SearcherGeneration {
    /// Builds a `SearcherGeneration` by recording, for each segment reader,
    /// its `(SegmentId, DeleteOpstamp)` pair under the given `generation_id`.
    pub(crate) fn from_segment_readers(
        segment_readers: &[SegmentReader],
        generation_id: u64,
    ) -> Self {
        let mut segment_id_to_del_opstamp = BTreeMap::new();
        for segment_reader in segment_readers {
            segment_id_to_del_opstamp
                .insert(segment_reader.segment_id(), segment_reader.delete_opstamp());
        }
        Self {
            segments: segment_id_to_del_opstamp,
            generation_id,
        }
    }

    /// Returns the searcher generation id.
    pub fn generation_id(&self) -> u64 {
        self.generation_id
    }

    /// Return a `(SegmentId -> DeleteOpstamp)` mapping.
    pub fn segments(&self) -> &BTreeMap<SegmentId, Option<Opstamp>> {
        &self.segments
    }
}

/// Holds a list of `SegmentReader`s ready for search.
/// /// It guarantees that the `Segment` will not be removed before /// the destruction of the `Searcher`. #[derive(Clone)] pub struct Searcher { inner: Arc, } impl Searcher { /// Returns the `Index` associated with the `Searcher` pub fn index(&self) -> &Index { &self.inner.index } /// [`SearcherGeneration`] which identifies the version of the snapshot held by this `Searcher`. pub fn generation(&self) -> &SearcherGeneration { self.inner.generation.as_ref() } /// Fetches a document from tantivy's store given a [`DocAddress`]. /// /// The searcher uses the segment ordinal to route the /// request to the right `Segment`. pub fn doc(&self, doc_address: DocAddress) -> crate::Result { let store_reader = &self.inner.store_readers[doc_address.segment_ord as usize]; store_reader.get(doc_address.doc_id) } /// The cache stats for the underlying store reader. /// /// Aggregates the sum for each segment store reader. pub fn doc_store_cache_stats(&self) -> CacheStats { let cache_stats: CacheStats = self .inner .store_readers .iter() .map(|reader| reader.cache_stats()) .sum(); cache_stats } /// Fetches a document in an asynchronous manner. #[cfg(feature = "quickwit")] pub async fn doc_async(&self, doc_address: DocAddress) -> crate::Result { let store_reader = &self.inner.store_readers[doc_address.segment_ord as usize]; store_reader.get_async(doc_address.doc_id).await } /// Access the schema associated with the index of this searcher. pub fn schema(&self) -> &Schema { &self.inner.schema } /// Returns the overall number of documents in the index. pub fn num_docs(&self) -> u64 { self.inner .segment_readers .iter() .map(|segment_reader| u64::from(segment_reader.num_docs())) .sum::() } /// Return the overall number of documents containing /// the given term. 
pub fn doc_freq(&self, term: &Term) -> crate::Result { let mut total_doc_freq = 0; for segment_reader in &self.inner.segment_readers { let inverted_index = segment_reader.inverted_index(term.field())?; let doc_freq = inverted_index.doc_freq(term)?; total_doc_freq += u64::from(doc_freq); } Ok(total_doc_freq) } /// Return the overall number of documents containing /// the given term in an asynchronous manner. #[cfg(feature = "quickwit")] pub async fn doc_freq_async(&self, term: &Term) -> crate::Result { let mut total_doc_freq = 0; for segment_reader in &self.inner.segment_readers { let inverted_index = segment_reader.inverted_index(term.field())?; let doc_freq = inverted_index.doc_freq_async(term).await?; total_doc_freq += u64::from(doc_freq); } Ok(total_doc_freq) } /// Return the list of segment readers pub fn segment_readers(&self) -> &[SegmentReader] { &self.inner.segment_readers } /// Returns the segment_reader associated with the given segment_ord pub fn segment_reader(&self, segment_ord: u32) -> &SegmentReader { &self.inner.segment_readers[segment_ord as usize] } /// Runs a query on the segment readers wrapped by the searcher. /// /// Search works as follows : /// /// First the weight object associated with the query is created. /// /// Then, the query loops over the segments and for each segment : /// - setup the collector and informs it that the segment being processed has changed. /// - creates a SegmentCollector for collecting documents associated with the segment /// - creates a `Scorer` object associated for this segment /// - iterate through the matched documents and push them to the segment collector. /// /// Finally, the Collector merges each of the child collectors into itself for result usability /// by the caller. 
pub fn search( &self, query: &dyn Query, collector: &C, ) -> crate::Result { let executor = self.inner.index.search_executor(); self.search_with_executor(query, collector, executor) } /// Same as [`search(...)`](Searcher::search) but multithreaded. /// /// The current implementation is rather naive : /// multithreading is by splitting search into as many task /// as there are segments. /// /// It is powerless at making search faster if your index consists in /// one large segment. /// /// Also, keep in my multithreading a single query on several /// threads will not improve your throughput. It can actually /// hurt it. It will however, decrease the average response time. pub fn search_with_executor( &self, query: &dyn Query, collector: &C, executor: &Executor, ) -> crate::Result { let scoring_enabled = collector.requires_scoring(); let weight = query.weight(self, scoring_enabled)?; let segment_readers = self.segment_readers(); let fruits = executor.map( |(segment_ord, segment_reader)| { collector.collect_segment(weight.as_ref(), segment_ord as u32, segment_reader) }, segment_readers.iter().enumerate(), )?; collector.merge_fruits(fruits) } /// Summarize total space usage of this searcher. pub fn space_usage(&self) -> io::Result { let mut space_usage = SearcherSpaceUsage::new(); for segment_reader in self.segment_readers() { space_usage.add_segment(segment_reader.space_usage()?); } Ok(space_usage) } } impl From> for Searcher { fn from(inner: Arc) -> Self { Searcher { inner } } } /// Holds a list of `SegmentReader`s ready for search. /// /// It guarantees that the `Segment` will not be removed before /// the destruction of the `Searcher`. 
pub(crate) struct SearcherInner { schema: Schema, index: Index, segment_readers: Vec, store_readers: Vec, generation: TrackedObject, } impl SearcherInner { /// Creates a new `Searcher` pub(crate) fn new( schema: Schema, index: Index, segment_readers: Vec, generation: TrackedObject, doc_store_cache_size: usize, ) -> io::Result { assert_eq!( &segment_readers .iter() .map(|reader| (reader.segment_id(), reader.delete_opstamp())) .collect::>(), generation.segments(), "Set of segments referenced by this Searcher and its SearcherGeneration must match" ); let store_readers: Vec = segment_readers .iter() .map(|segment_reader| segment_reader.get_store_reader(doc_store_cache_size)) .collect::>>()?; Ok(SearcherInner { schema, index, segment_readers, store_readers, generation, }) } } impl fmt::Debug for Searcher { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let segment_ids = self .segment_readers() .iter() .map(SegmentReader::segment_id) .collect::>(); write!(f, "Searcher({:?})", segment_ids) } }