Mirror of https://github.com/quickwit-oss/tantivy.git, synced 2025-12-28 13:02:55 +00:00
Compare commits
5 Commits
Comparing test_parse...block-cach
| Author | SHA1 | Date |
|---|---|---|
| | d1177fe22f | |
| | 0361a1edaa | |
| | 3d48ce80c5 | |
| | 49a913f6f8 | |
| | 19a773da47 | |
```diff
@@ -1,4 +1,6 @@
 use std::collections::BTreeMap;
+#[cfg(feature = "quickwit")]
+use std::future::Future;
 use std::sync::Arc;
 use std::{fmt, io};
 
```
```diff
@@ -112,6 +114,108 @@ impl Searcher {
         store_reader.get_async(doc_address.doc_id).await
     }
 
+    /// Fetches multiple documents in an asynchronous manner.
+    ///
+    /// This method is more efficient than calling [`doc_async`](Self::doc_async) multiple times,
+    /// as it groups overlapping requests to segments and blocks and avoids concurrent requests
+    /// thrashing each other's caches. However, it does so using intermediate data structures
+    /// and independent block caches, so it will be slower if the fetched documents span only a
+    /// few blocks which would have fit into the global block cache.
+    ///
+    /// The caller is expected to poll these futures concurrently (e.g. using `FuturesUnordered`)
+    /// or in parallel (e.g. using `JoinSet`), whichever fits the given use case best, i.e. whether
+    /// it is predominantly I/O-bound or rather CPU-bound.
+    ///
+    /// Note that any blocks brought into any of the per-segment-and-block groups will not be
+    /// pulled into the global block cache and hence will not be available to subsequent calls.
+    ///
+    /// Note that there is no synchronous variant of this method, as the same degree of efficiency
+    /// can be had by accessing documents in address order.
+    ///
+    /// # Example
+    ///
+    /// ```rust,no_run
+    /// # use futures::executor::block_on;
+    /// # use futures::stream::{FuturesUnordered, StreamExt};
+    /// #
+    /// # use tantivy::schema::Schema;
+    /// # use tantivy::{DocAddress, Index, TantivyDocument, TantivyError};
+    /// #
+    /// # let index = Index::create_in_ram(Schema::builder().build());
+    /// # let searcher = index.reader()?.searcher();
+    /// #
+    /// # let doc_addresses = (0..10).map(|_| DocAddress::new(0, 0));
+    /// #
+    /// let mut groups: FuturesUnordered<_> = searcher
+    ///     .docs_async::<TantivyDocument>(doc_addresses)?
+    ///     .collect();
+    ///
+    /// let mut docs = Vec::new();
+    ///
+    /// block_on(async {
+    ///     while let Some(group) = groups.next().await {
+    ///         docs.extend(group?);
+    ///     }
+    ///
+    ///     Ok::<_, TantivyError>(())
+    /// })?;
+    /// #
+    /// # Ok::<_, TantivyError>(())
+    /// ```
+    #[cfg(feature = "quickwit")]
+    pub fn docs_async<D: DocumentDeserialize>(
+        &self,
+        doc_addresses: impl IntoIterator<Item = DocAddress>,
+    ) -> crate::Result<
+        impl Iterator<Item = impl Future<Output = crate::Result<Vec<(DocAddress, D)>>>> + '_,
+    > {
+        use rustc_hash::FxHashMap;
+
+        use crate::store::CacheKey;
+        use crate::{DocId, SegmentOrdinal};
+
+        let mut groups: FxHashMap<(SegmentOrdinal, CacheKey), Vec<DocId>> = Default::default();
+
+        for doc_address in doc_addresses {
+            let store_reader = &self.inner.store_readers[doc_address.segment_ord as usize];
+            let cache_key = store_reader.cache_key(doc_address.doc_id)?;
+
+            groups
+                .entry((doc_address.segment_ord, cache_key))
+                .or_default()
+                .push(doc_address.doc_id);
+        }
+
+        let futures = groups
+            .into_iter()
+            .map(|((segment_ord, cache_key), doc_ids)| {
+                // Each group fetches documents from exactly one block and
+                // therefore gets an independent block cache of size one.
+                let store_reader =
+                    self.inner.store_readers[segment_ord as usize].fork_cache(1, &[cache_key]);
+
+                async move {
+                    let mut docs = Vec::new();
+
+                    for doc_id in doc_ids {
+                        let doc = store_reader.get_async(doc_id).await?;
+
+                        docs.push((
+                            DocAddress {
+                                segment_ord,
+                                doc_id,
+                            },
+                            doc,
+                        ));
+                    }
+
+                    Ok(docs)
+                }
+            });
+
+        Ok(futures)
+    }
+
     /// Access the schema associated with the index of this searcher.
     pub fn schema(&self) -> &Schema {
         &self.inner.schema
```
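The doc comment above names `JoinSet` as the parallel alternative but only demonstrates `FuturesUnordered`. Below is a minimal sketch of that parallel variant (not part of the patch), assuming a tokio runtime and that the group futures are `Send`; the `fetch_parallel` helper name and signature are illustrative only:

```rust
use tantivy::{DocAddress, Searcher, TantivyDocument};
use tokio::task::JoinSet;

// Spawn one task per segment-and-block group so that distinct blocks are
// fetched and decompressed in parallel rather than merely concurrently.
async fn fetch_parallel(
    searcher: &Searcher,
    doc_addresses: Vec<DocAddress>,
) -> tantivy::Result<Vec<(DocAddress, TantivyDocument)>> {
    let mut tasks = JoinSet::new();
    for group in searcher.docs_async::<TantivyDocument>(doc_addresses)? {
        tasks.spawn(group);
    }

    let mut docs = Vec::new();
    while let Some(joined) = tasks.join_next().await {
        docs.extend(joined.expect("fetch task panicked")?);
    }
    Ok(docs)
}
```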
```diff
@@ -424,7 +424,7 @@ fn test_non_text_json_term_freq() {
     json_term_writer.set_fast_value(75u64);
     let postings = inv_idx
         .read_postings(
-            &json_term_writer.term(),
+            json_term_writer.term(),
             IndexRecordOption::WithFreqsAndPositions,
         )
         .unwrap()
```
```diff
@@ -462,7 +462,7 @@ fn test_non_text_json_term_freq_bitpacked() {
     json_term_writer.set_fast_value(75u64);
     let mut postings = inv_idx
         .read_postings(
-            &json_term_writer.term(),
+            json_term_writer.term(),
             IndexRecordOption::WithFreqsAndPositions,
         )
         .unwrap()
```
```diff
@@ -474,3 +474,60 @@ fn test_non_text_json_term_freq_bitpacked() {
         assert_eq!(postings.term_freq(), 1u32);
     }
+
+    #[cfg(feature = "quickwit")]
+    #[test]
+    fn test_get_many_docs() -> crate::Result<()> {
+        use futures::executor::block_on;
+        use futures::stream::{FuturesUnordered, StreamExt};
+
+        use crate::schema::{OwnedValue, STORED};
+        use crate::{DocAddress, TantivyError};
+
+        let mut schema_builder = Schema::builder();
+        let num_field = schema_builder.add_u64_field("num", STORED);
+        let schema = schema_builder.build();
+        let index = Index::create_in_ram(schema);
+        let mut index_writer: IndexWriter = index.writer_for_tests()?;
+        index_writer.set_merge_policy(Box::new(NoMergePolicy));
+        for i in 0..10u64 {
+            let doc = doc!(num_field=>i);
+            index_writer.add_document(doc)?;
+        }
+
+        index_writer.commit()?;
+        let segment_ids = index.searchable_segment_ids()?;
+        index_writer.merge(&segment_ids).wait().unwrap();
+
+        let searcher = index.reader()?.searcher();
+        assert_eq!(searcher.num_docs(), 10);
+
+        let doc_addresses = (0..10).map(|i| DocAddress::new(0, i));
+
+        let mut groups: FuturesUnordered<_> = searcher
+            .docs_async::<TantivyDocument>(doc_addresses)?
+            .collect();
+
+        let mut doc_nums = Vec::new();
+
+        block_on(async {
+            while let Some(group) = groups.next().await {
+                for (_doc_address, doc) in group? {
+                    let num_value = doc.get_first(num_field).unwrap();
+
+                    if let OwnedValue::U64(num) = num_value {
+                        doc_nums.push(*num);
+                    } else {
+                        panic!("Expected u64 value");
+                    }
+                }
+            }
+
+            Ok::<_, TantivyError>(())
+        })?;
+
+        doc_nums.sort();
+        assert_eq!(doc_nums, (0..10).collect::<Vec<u64>>());
+
+        Ok(())
+    }
 }
```
```diff
@@ -37,6 +37,8 @@ mod reader;
 mod writer;
 pub use self::compressors::{Compressor, ZstdCompressor};
 pub use self::decompressors::Decompressor;
+#[cfg(feature = "quickwit")]
+pub(crate) use self::reader::CacheKey;
 pub(crate) use self::reader::DOCSTORE_CACHE_CAPACITY;
 pub use self::reader::{CacheStats, StoreReader};
 pub use self::writer::StoreWriter;
```
```diff
@@ -40,6 +40,15 @@ struct BlockCache {
 }
 
 impl BlockCache {
+    fn new(cache_num_blocks: usize) -> Self {
+        Self {
+            cache: NonZeroUsize::new(cache_num_blocks)
+                .map(|cache_num_blocks| Mutex::new(LruCache::new(cache_num_blocks))),
+            cache_hits: Default::default(),
+            cache_misses: Default::default(),
+        }
+    }
+
     fn get_from_cache(&self, pos: usize) -> Option<Block> {
         if let Some(block) = self
             .cache
```
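A side note on `BlockCache::new`: it leans on `NonZeroUsize` so that a capacity of zero disables caching altogether, since `NonZeroUsize::new(0)` returns `None` and the `cache` field then stays `None`. A standalone sketch of that idiom (the `cache_capacity` helper is hypothetical):

```rust
use std::num::NonZeroUsize;

// A capacity of 0 maps to None, i.e. "no cache at all"; any positive
// capacity yields a usable LRU bound.
fn cache_capacity(cache_num_blocks: usize) -> Option<NonZeroUsize> {
    NonZeroUsize::new(cache_num_blocks)
}

fn main() {
    assert_eq!(cache_capacity(0), None); // caching disabled
    // Size one is exactly what docs_async requests via fork_cache.
    assert_eq!(cache_capacity(1), NonZeroUsize::new(1));
}
```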
```diff
@@ -81,6 +90,10 @@ impl BlockCache {
     }
 }
 
+/// Opaque cache key which indicates which documents are cached together.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
+pub(crate) struct CacheKey(usize);
+
 #[derive(Debug, Default)]
 /// CacheStats for the `StoreReader`.
 pub struct CacheStats {
```
```diff
@@ -128,17 +141,35 @@ impl StoreReader {
         Ok(StoreReader {
             decompressor: footer.decompressor,
             data: data_file,
-            cache: BlockCache {
-                cache: NonZeroUsize::new(cache_num_blocks)
-                    .map(|cache_num_blocks| Mutex::new(LruCache::new(cache_num_blocks))),
-                cache_hits: Default::default(),
-                cache_misses: Default::default(),
-            },
+            cache: BlockCache::new(cache_num_blocks),
             skip_index: Arc::new(skip_index),
             space_usage,
         })
     }
 
+    /// Clones the given store reader with an independent block cache of the given size.
+    ///
+    /// `cache_keys` is used to seed the forked cache from the current cache
+    /// if some blocks are already available.
+    #[cfg(feature = "quickwit")]
+    pub(crate) fn fork_cache(&self, cache_num_blocks: usize, cache_keys: &[CacheKey]) -> Self {
+        let forked = Self {
+            decompressor: self.decompressor,
+            data: self.data.clone(),
+            cache: BlockCache::new(cache_num_blocks),
+            skip_index: Arc::clone(&self.skip_index),
+            space_usage: self.space_usage.clone(),
+        };
+
+        for &CacheKey(pos) in cache_keys {
+            if let Some(block) = self.cache.get_from_cache(pos) {
+                forked.cache.put_into_cache(pos, block);
+            }
+        }
+
+        forked
+    }
+
     pub(crate) fn block_checkpoints(&self) -> impl Iterator<Item = Checkpoint> + '_ {
         self.skip_index.checkpoints()
     }
```
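To make the seeding behaviour of `fork_cache` concrete, here is a hypothetical crate-internal sketch (both `fork_cache` and `cache_key` are `pub(crate)`, so this only compiles inside tantivy itself): if the parent reader already holds the relevant block, the fork starts out warm and its first access needs no I/O or decompression.

```rust
use crate::store::StoreReader;
use crate::{DocId, TantivyDocument};

// Hypothetical helper: fork a size-one cache seeded with the block that
// holds `doc_id`, then read the document through the fork.
fn forked_get(store_reader: &StoreReader, doc_id: DocId) -> crate::Result<TantivyDocument> {
    // Looking up the cache key does not pull anything into the cache yet.
    let cache_key = store_reader.cache_key(doc_id)?;
    // The fork copies the block out of the parent cache if it is present.
    let forked = store_reader.fork_cache(1, &[cache_key]);
    // Cache hit whenever the parent already had the block.
    forked.get(doc_id)
}
```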
```diff
@@ -152,6 +183,21 @@ impl StoreReader {
         self.cache.stats()
     }
 
+    /// Returns the cache key for a given document.
+    ///
+    /// These keys are opaque and are not exposed in the public API,
+    /// but having the same cache key means that the documents
+    /// will require only one I/O and decompression operation
+    /// when retrieved from the same store reader consecutively.
+    ///
+    /// Note that looking up the cache key of a document
+    /// will not yet pull anything into the block cache.
+    #[cfg(feature = "quickwit")]
+    pub(crate) fn cache_key(&self, doc_id: DocId) -> crate::Result<CacheKey> {
+        let checkpoint = self.block_checkpoint(doc_id)?;
+        Ok(CacheKey(checkpoint.byte_range.start))
+    }
+
     /// Get checkpoint for `DocId`. The checkpoint can be used to load a block containing the
     /// document.
     ///
```
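Finally, the claim in `docs_async` that no synchronous variant is needed rests on address order: consecutive `DocId`s within a segment tend to live in the same store block, so sorting addresses lets the regular block cache absorb repeated reads. A minimal sketch using only the public API (the `fetch_in_address_order` helper is illustrative):

```rust
use tantivy::{DocAddress, Searcher, TantivyDocument};

// Sorting by (segment, doc id) reads documents of the same store block back
// to back, so each block is fetched and decompressed only once thanks to
// the global block cache.
fn fetch_in_address_order(
    searcher: &Searcher,
    mut doc_addresses: Vec<DocAddress>,
) -> tantivy::Result<Vec<(DocAddress, TantivyDocument)>> {
    doc_addresses.sort_by_key(|addr| (addr.segment_ord, addr.doc_id));

    doc_addresses
        .into_iter()
        .map(|addr| Ok((addr, searcher.doc::<TantivyDocument>(addr)?)))
        .collect()
}
```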