Mirror of https://github.com/quickwit-oss/tantivy.git, synced 2026-01-05 16:52:55 +00:00

Compare commits (5 commits): missing_te ... block-cach
| Author | SHA1 | Date |
|---|---|---|
| | d1177fe22f | |
| | 0361a1edaa | |
| | 3d48ce80c5 | |
| | 49a913f6f8 | |
| | 19a773da47 | |
@@ -1,4 +1,6 @@
 use std::collections::BTreeMap;
+#[cfg(feature = "quickwit")]
+use std::future::Future;
 use std::sync::Arc;
 use std::{fmt, io};

@@ -112,6 +114,108 @@ impl Searcher {
         store_reader.get_async(doc_address.doc_id).await
     }

+    /// Fetches multiple documents in an asynchronous manner.
+    ///
+    /// This method is more efficient than calling [`doc_async`](Self::doc_async) multiple times,
+    /// as it groups overlapping requests to segments and blocks and avoids concurrent requests
+    /// trashing each other's caches. However, it does so using intermediate data structures
+    /// and independent block caches, so it will be slower if documents from very few blocks are
+    /// fetched which would have fit into the global block cache.
+    ///
+    /// The caller is expected to poll these futures concurrently (e.g. using `FuturesUnordered`)
+    /// or in parallel (e.g. using `JoinSet`) as fits best with the given use case, i.e. whether
+    /// it is predominantly I/O-bound or rather CPU-bound.
+    ///
+    /// Note that any blocks brought into any of the per-segment-and-block groups will not be
+    /// pulled into the global block cache and hence will not be available for subsequent calls.
+    ///
+    /// Note that there is no synchronous variant of this method, as the same degree of
+    /// efficiency can be had by accessing documents in address order.
+    ///
+    /// # Example
+    ///
+    /// ```rust,no_run
+    /// # use futures::executor::block_on;
+    /// # use futures::stream::{FuturesUnordered, StreamExt};
+    /// #
+    /// # use tantivy::schema::Schema;
+    /// # use tantivy::{DocAddress, Index, TantivyDocument, TantivyError};
+    /// #
+    /// # let index = Index::create_in_ram(Schema::builder().build());
+    /// # let searcher = index.reader()?.searcher();
+    /// #
+    /// # let doc_addresses = (0..10).map(|_| DocAddress::new(0, 0));
+    /// #
+    /// let mut groups: FuturesUnordered<_> = searcher
+    ///     .docs_async::<TantivyDocument>(doc_addresses)?
+    ///     .collect();
+    ///
+    /// let mut docs = Vec::new();
+    ///
+    /// block_on(async {
+    ///     while let Some(group) = groups.next().await {
+    ///         docs.extend(group?);
+    ///     }
+    ///
+    ///     Ok::<_, TantivyError>(())
+    /// })?;
+    /// #
+    /// # Ok::<_, TantivyError>(())
+    /// ```
+    #[cfg(feature = "quickwit")]
+    pub fn docs_async<D: DocumentDeserialize>(
+        &self,
+        doc_addresses: impl IntoIterator<Item = DocAddress>,
+    ) -> crate::Result<
+        impl Iterator<Item = impl Future<Output = crate::Result<Vec<(DocAddress, D)>>>> + '_,
+    > {
+        use rustc_hash::FxHashMap;
+
+        use crate::store::CacheKey;
+        use crate::{DocId, SegmentOrdinal};
+
+        let mut groups: FxHashMap<(SegmentOrdinal, CacheKey), Vec<DocId>> = Default::default();
+
+        for doc_address in doc_addresses {
+            let store_reader = &self.inner.store_readers[doc_address.segment_ord as usize];
+            let cache_key = store_reader.cache_key(doc_address.doc_id)?;
+
+            groups
+                .entry((doc_address.segment_ord, cache_key))
+                .or_default()
+                .push(doc_address.doc_id);
+        }
+
+        let futures = groups
+            .into_iter()
+            .map(|((segment_ord, cache_key), doc_ids)| {
+                // Each group fetches documents from exactly one block and
+                // therefore gets an independent block cache of size one.
+                let store_reader =
+                    self.inner.store_readers[segment_ord as usize].fork_cache(1, &[cache_key]);
+
+                async move {
+                    let mut docs = Vec::new();
+
+                    for doc_id in doc_ids {
+                        let doc = store_reader.get_async(doc_id).await?;
+
+                        docs.push((
+                            DocAddress {
+                                segment_ord,
+                                doc_id,
+                            },
+                            doc,
+                        ));
+                    }
+
+                    Ok(docs)
+                }
+            });
+
+        Ok(futures)
+    }
+
     /// Access the schema associated with the index of this searcher.
     pub fn schema(&self) -> &Schema {
         &self.inner.schema
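An illustrative sketch (not from this changeset) of the synchronous alternative the new doc comment alludes to: sorting the addresses keeps hits from the same doc-store block adjacent, so the searcher's existing global block cache serves consecutive fetches without re-reading or re-decompressing a block. The sketch only assumes the public `Searcher::doc` API and the public `segment_ord`/`doc_id` fields of `DocAddress`; the helper name is hypothetical.

```rust
use tantivy::{DocAddress, Searcher, TantivyDocument};

/// Fetch stored documents synchronously, visiting them in address order so that
/// documents living in the same doc-store block are requested back to back.
fn fetch_in_address_order(
    searcher: &Searcher,
    mut doc_addresses: Vec<DocAddress>,
) -> tantivy::Result<Vec<(DocAddress, TantivyDocument)>> {
    // Address order == (segment ordinal, doc id) order == block order within a segment.
    doc_addresses.sort_by_key(|addr| (addr.segment_ord, addr.doc_id));

    doc_addresses
        .into_iter()
        .map(|addr| Ok((addr, searcher.doc::<TantivyDocument>(addr)?)))
        .collect()
}
```

`docs_async` covers the remaining case where the per-block groups should also overlap their I/O instead of running one after the other.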
@@ -424,7 +424,7 @@ fn test_non_text_json_term_freq() {
     json_term_writer.set_fast_value(75u64);
     let postings = inv_idx
         .read_postings(
-            &json_term_writer.term(),
+            json_term_writer.term(),
             IndexRecordOption::WithFreqsAndPositions,
         )
         .unwrap()
@@ -462,7 +462,7 @@ fn test_non_text_json_term_freq_bitpacked() {
     json_term_writer.set_fast_value(75u64);
     let mut postings = inv_idx
         .read_postings(
-            &json_term_writer.term(),
+            json_term_writer.term(),
             IndexRecordOption::WithFreqsAndPositions,
         )
         .unwrap()
@@ -474,3 +474,60 @@ fn test_non_text_json_term_freq_bitpacked() {
         assert_eq!(postings.term_freq(), 1u32);
     }
 }
+
+#[cfg(feature = "quickwit")]
+#[test]
+fn test_get_many_docs() -> crate::Result<()> {
+    use futures::executor::block_on;
+    use futures::stream::{FuturesUnordered, StreamExt};
+
+    use crate::schema::{OwnedValue, STORED};
+    use crate::{DocAddress, TantivyError};
+
+    let mut schema_builder = Schema::builder();
+    let num_field = schema_builder.add_u64_field("num", STORED);
+    let schema = schema_builder.build();
+    let index = Index::create_in_ram(schema);
+    let mut index_writer: IndexWriter = index.writer_for_tests()?;
+    index_writer.set_merge_policy(Box::new(NoMergePolicy));
+    for i in 0..10u64 {
+        let doc = doc!(num_field=>i);
+        index_writer.add_document(doc)?;
+    }
+
+    index_writer.commit()?;
+    let segment_ids = index.searchable_segment_ids()?;
+    index_writer.merge(&segment_ids).wait().unwrap();
+
+    let searcher = index.reader()?.searcher();
+    assert_eq!(searcher.num_docs(), 10);
+
+    let doc_addresses = (0..10).map(|i| DocAddress::new(0, i));
+
+    let mut groups: FuturesUnordered<_> = searcher
+        .docs_async::<TantivyDocument>(doc_addresses)?
+        .collect();
+
+    let mut doc_nums = Vec::new();
+
+    block_on(async {
+        while let Some(group) = groups.next().await {
+            for (_doc_address, doc) in group? {
+                let num_value = doc.get_first(num_field).unwrap();
+
+                if let OwnedValue::U64(num) = num_value {
+                    doc_nums.push(*num);
+                } else {
+                    panic!("Expected u64 value");
+                }
+            }
+        }
+
+        Ok::<_, TantivyError>(())
+    })?;
+
+    doc_nums.sort();
+    assert_eq!(doc_nums, (0..10).collect::<Vec<u64>>());
+
+    Ok(())
+}
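Like `docs_async` itself, the new test is gated behind the `quickwit` feature, so it presumably only runs with that feature enabled, e.g. `cargo test --features quickwit test_get_many_docs`.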
@@ -213,7 +213,7 @@ impl Recorder for TermFrequencyRecorder {
             doc_id_and_tf.sort_unstable_by_key(|&(doc_id, _)| doc_id);

             for (doc_id, tf) in doc_id_and_tf {
-                serializer.write_doc(doc_id, 0, &[][..]);
+                serializer.write_doc(doc_id, tf, &[][..]);
             }
         } else {
             let mut prev_doc = 0;
@@ -221,7 +221,7 @@ impl Recorder for TermFrequencyRecorder {
                 let doc_id = prev_doc + delta_doc_id;
                 prev_doc = doc_id;
                 let term_freq = u32_it.next().unwrap_or(self.current_tf);
-                serializer.write_doc(doc_id, 0, &[][..]);
+                serializer.write_doc(doc_id, term_freq, &[][..]);
             }
         }
     }
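In both branches above, the recorded term frequency (`tf` in the re-sorted path, `term_freq` in the delta-decoded path) now replaces the hard-coded `0` that was previously passed to `write_doc`; the `term_freq()` assertions in the JSON tests earlier in this compare exercise exactly this.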
@@ -37,6 +37,8 @@ mod reader;
 mod writer;
 pub use self::compressors::{Compressor, ZstdCompressor};
 pub use self::decompressors::Decompressor;
+#[cfg(feature = "quickwit")]
+pub(crate) use self::reader::CacheKey;
 pub(crate) use self::reader::DOCSTORE_CACHE_CAPACITY;
 pub use self::reader::{CacheStats, StoreReader};
 pub use self::writer::StoreWriter;
@@ -40,6 +40,15 @@ struct BlockCache {
 }

 impl BlockCache {
+    fn new(cache_num_blocks: usize) -> Self {
+        Self {
+            cache: NonZeroUsize::new(cache_num_blocks)
+                .map(|cache_num_blocks| Mutex::new(LruCache::new(cache_num_blocks))),
+            cache_hits: Default::default(),
+            cache_misses: Default::default(),
+        }
+    }
+
     fn get_from_cache(&self, pos: usize) -> Option<Block> {
         if let Some(block) = self
             .cache
@@ -81,6 +90,10 @@ impl BlockCache {
     }
 }

+/// Opaque cache key which indicates which documents are cached together.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
+pub(crate) struct CacheKey(usize);
+
 #[derive(Debug, Default)]
 /// CacheStats for the `StoreReader`.
 pub struct CacheStats {
@@ -128,17 +141,35 @@ impl StoreReader {
         Ok(StoreReader {
             decompressor: footer.decompressor,
             data: data_file,
-            cache: BlockCache {
-                cache: NonZeroUsize::new(cache_num_blocks)
-                    .map(|cache_num_blocks| Mutex::new(LruCache::new(cache_num_blocks))),
-                cache_hits: Default::default(),
-                cache_misses: Default::default(),
-            },
+            cache: BlockCache::new(cache_num_blocks),
             skip_index: Arc::new(skip_index),
             space_usage,
         })
     }

+    /// Clones the given store reader with an independent block cache of the given size.
+    ///
+    /// `cache_keys` is used to seed the forked cache from the current cache
+    /// if some blocks are already available.
+    #[cfg(feature = "quickwit")]
+    pub(crate) fn fork_cache(&self, cache_num_blocks: usize, cache_keys: &[CacheKey]) -> Self {
+        let forked = Self {
+            decompressor: self.decompressor,
+            data: self.data.clone(),
+            cache: BlockCache::new(cache_num_blocks),
+            skip_index: Arc::clone(&self.skip_index),
+            space_usage: self.space_usage.clone(),
+        };
+
+        for &CacheKey(pos) in cache_keys {
+            if let Some(block) = self.cache.get_from_cache(pos) {
+                forked.cache.put_into_cache(pos, block);
+            }
+        }
+
+        forked
+    }
+
     pub(crate) fn block_checkpoints(&self) -> impl Iterator<Item = Checkpoint> + '_ {
         self.skip_index.checkpoints()
     }
@@ -152,6 +183,21 @@ impl StoreReader {
         self.cache.stats()
     }

+    /// Returns the cache key for a given document.
+    ///
+    /// These keys are opaque and are not used with the public API,
+    /// but having the same cache key means that the documents
+    /// will only require one I/O and decompression operation
+    /// when retrieved from the same store reader consecutively.
+    ///
+    /// Note that looking up the cache key of a document
+    /// will not yet pull anything into the block cache.
+    #[cfg(feature = "quickwit")]
+    pub(crate) fn cache_key(&self, doc_id: DocId) -> crate::Result<CacheKey> {
+        let checkpoint = self.block_checkpoint(doc_id)?;
+        Ok(CacheKey(checkpoint.byte_range.start))
+    }
+
     /// Get checkpoint for `DocId`. The checkpoint can be used to load a block containing the
     /// document.
     ///
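An illustrative, standalone sketch (not from this changeset) of the fork-and-seed idea behind `fork_cache`: the forked reader gets its own LRU, optionally pre-populated with blocks the parent already decompressed, so no capacity or lock is shared between readers and seeding never triggers new I/O. The sketch assumes the `lru` crate's `LruCache`, which matches the `NonZeroUsize`-based constructor seen above, and omits the `Mutex` and hit/miss counters the real `BlockCache` carries; `fork_and_seed` and the key/value types are hypothetical.

```rust
use std::num::NonZeroUsize;

use lru::LruCache;

/// Build a new, independent LRU of `capacity` blocks, seeded with any of the
/// requested blocks the parent cache already holds.
fn fork_and_seed(
    parent: &LruCache<usize, Vec<u8>>,
    capacity: NonZeroUsize,
    seed_keys: &[usize],
) -> LruCache<usize, Vec<u8>> {
    let mut forked = LruCache::new(capacity);
    for &key in seed_keys {
        // `peek` leaves the parent's recency order untouched.
        if let Some(block) = parent.peek(&key) {
            forked.put(key, block.clone());
        }
    }
    forked
}

fn main() {
    let mut parent = LruCache::new(NonZeroUsize::new(8).unwrap());
    parent.put(0usize, vec![1u8, 2, 3]);

    // Fork a size-one cache for a group that reads from block 0.
    let forked = fork_and_seed(&parent, NonZeroUsize::new(1).unwrap(), &[0]);
    assert!(forked.contains(&0usize));
}
```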