Mirror of https://github.com/quickwit-oss/tantivy.git, synced 2026-01-08 01:52:54 +00:00

Compare commits: block-cach...missing_te (1 commit, dd57b7fa3a)
````diff
@@ -1,6 +1,4 @@
 use std::collections::BTreeMap;
-#[cfg(feature = "quickwit")]
-use std::future::Future;
 use std::sync::Arc;
 use std::{fmt, io};
 
@@ -114,108 +112,6 @@ impl Searcher {
         store_reader.get_async(doc_address.doc_id).await
     }
 
-    /// Fetches multiple documents in an asynchronous manner.
-    ///
-    /// This method is more efficient than calling [`doc_async`](Self::doc_async) multiple times,
-    /// as it groups overlapping requests to segments and blocks and avoids concurrent requests
-    /// trashing the caches of each other. However, it does so using intermediate data structures
-    /// and independent block caches so it will be slower if documents from very few blocks are
-    /// fetched which would have fit into the global block cache.
-    ///
-    /// The caller is expected to poll these futures concurrently (e.g. using `FuturesUnordered`)
-    /// or in parallel (e.g. using `JoinSet`) as fits best with the given use case, i.e. whether
-    /// it is predominately I/O-bound or rather CPU-bound.
-    ///
-    /// Note that any blocks brought into any of the per-segment-and-block groups will not be pulled
-    /// into the global block cache and hence not be available for subsequent calls.
-    ///
-    /// Note that there is no synchronous variant of this method as the same degree of efficiency
-    /// can be had by accessing documents in address order.
-    ///
-    /// # Example
-    ///
-    /// ```rust,no_run
-    /// # use futures::executor::block_on;
-    /// # use futures::stream::{FuturesUnordered, StreamExt};
-    /// #
-    /// # use tantivy::schema::Schema;
-    /// # use tantivy::{DocAddress, Index, TantivyDocument, TantivyError};
-    /// #
-    /// # let index = Index::create_in_ram(Schema::builder().build());
-    /// # let searcher = index.reader()?.searcher();
-    /// #
-    /// # let doc_addresses = (0..10).map(|_| DocAddress::new(0, 0));
-    /// #
-    /// let mut groups: FuturesUnordered<_> = searcher
-    ///     .docs_async::<TantivyDocument>(doc_addresses)?
-    ///     .collect();
-    ///
-    /// let mut docs = Vec::new();
-    ///
-    /// block_on(async {
-    ///     while let Some(group) = groups.next().await {
-    ///         docs.extend(group?);
-    ///     }
-    ///
-    ///     Ok::<_, TantivyError>(())
-    /// })?;
-    /// #
-    /// # Ok::<_, TantivyError>(())
-    /// ```
-    #[cfg(feature = "quickwit")]
-    pub fn docs_async<D: DocumentDeserialize>(
-        &self,
-        doc_addresses: impl IntoIterator<Item = DocAddress>,
-    ) -> crate::Result<
-        impl Iterator<Item = impl Future<Output = crate::Result<Vec<(DocAddress, D)>>>> + '_,
-    > {
-        use rustc_hash::FxHashMap;
-
-        use crate::store::CacheKey;
-        use crate::{DocId, SegmentOrdinal};
-
-        let mut groups: FxHashMap<(SegmentOrdinal, CacheKey), Vec<DocId>> = Default::default();
-
-        for doc_address in doc_addresses {
-            let store_reader = &self.inner.store_readers[doc_address.segment_ord as usize];
-            let cache_key = store_reader.cache_key(doc_address.doc_id)?;
-
-            groups
-                .entry((doc_address.segment_ord, cache_key))
-                .or_default()
-                .push(doc_address.doc_id);
-        }
-
-        let futures = groups
-            .into_iter()
-            .map(|((segment_ord, cache_key), doc_ids)| {
-                // Each group fetches documents from exactly one block and
-                // therefore gets an independent block cache of size one.
-                let store_reader =
-                    self.inner.store_readers[segment_ord as usize].fork_cache(1, &[cache_key]);
-
-                async move {
-                    let mut docs = Vec::new();
-
-                    for doc_id in doc_ids {
-                        let doc = store_reader.get_async(doc_id).await?;
-
-                        docs.push((
-                            DocAddress {
-                                segment_ord,
-                                doc_id,
-                            },
-                            doc,
-                        ));
-                    }
-
-                    Ok(docs)
-                }
-            });
-
-        Ok(futures)
-    }
-
     /// Access the schema associated with the index of this searcher.
     pub fn schema(&self) -> &Schema {
         &self.inner.schema
````
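The removed doc comment above notes that no synchronous variant of `docs_async` is needed because the same efficiency is available by fetching documents in address order, so that documents from the same store block hit the reader's block cache. Below is a minimal sketch of that pattern against tantivy's public API; the helper name and the choice of `TantivyDocument` are illustrative, not part of the removed code.

```rust
use tantivy::{DocAddress, Searcher, TantivyDocument, TantivyError};

/// Fetch many stored documents synchronously, visiting them in address order
/// so that documents sharing a store block are decompressed only once.
fn fetch_in_address_order(
    searcher: &Searcher,
    mut addresses: Vec<DocAddress>,
) -> Result<Vec<(DocAddress, TantivyDocument)>, TantivyError> {
    // Sorting by (segment, doc id) makes requests to the same block consecutive,
    // so each block is fetched and decompressed once and then served from the
    // store reader's LRU block cache.
    addresses.sort_unstable_by_key(|addr| (addr.segment_ord, addr.doc_id));

    addresses
        .into_iter()
        .map(|addr| Ok((addr, searcher.doc::<TantivyDocument>(addr)?)))
        .collect()
}
```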
```diff
@@ -424,7 +424,7 @@ fn test_non_text_json_term_freq() {
     json_term_writer.set_fast_value(75u64);
     let postings = inv_idx
         .read_postings(
-            json_term_writer.term(),
+            &json_term_writer.term(),
             IndexRecordOption::WithFreqsAndPositions,
         )
         .unwrap()
@@ -462,7 +462,7 @@ fn test_non_text_json_term_freq_bitpacked() {
     json_term_writer.set_fast_value(75u64);
     let mut postings = inv_idx
         .read_postings(
-            json_term_writer.term(),
+            &json_term_writer.term(),
            IndexRecordOption::WithFreqsAndPositions,
         )
         .unwrap()
```
```diff
@@ -474,60 +474,3 @@ fn test_non_text_json_term_freq_bitpacked() {
         assert_eq!(postings.term_freq(), 1u32);
     }
 }
-
-#[cfg(feature = "quickwit")]
-#[test]
-fn test_get_many_docs() -> crate::Result<()> {
-    use futures::executor::block_on;
-    use futures::stream::{FuturesUnordered, StreamExt};
-
-    use crate::schema::{OwnedValue, STORED};
-    use crate::{DocAddress, TantivyError};
-
-    let mut schema_builder = Schema::builder();
-    let num_field = schema_builder.add_u64_field("num", STORED);
-    let schema = schema_builder.build();
-    let index = Index::create_in_ram(schema);
-    let mut index_writer: IndexWriter = index.writer_for_tests()?;
-    index_writer.set_merge_policy(Box::new(NoMergePolicy));
-    for i in 0..10u64 {
-        let doc = doc!(num_field=>i);
-        index_writer.add_document(doc)?;
-    }
-
-    index_writer.commit()?;
-    let segment_ids = index.searchable_segment_ids()?;
-    index_writer.merge(&segment_ids).wait().unwrap();
-
-    let searcher = index.reader()?.searcher();
-    assert_eq!(searcher.num_docs(), 10);
-
-    let doc_addresses = (0..10).map(|i| DocAddress::new(0, i));
-
-    let mut groups: FuturesUnordered<_> = searcher
-        .docs_async::<TantivyDocument>(doc_addresses)?
-        .collect();
-
-    let mut doc_nums = Vec::new();
-
-    block_on(async {
-        while let Some(group) = groups.next().await {
-            for (_doc_address, doc) in group? {
-                let num_value = doc.get_first(num_field).unwrap();
-
-                if let OwnedValue::U64(num) = num_value {
-                    doc_nums.push(*num);
-                } else {
-                    panic!("Expected u64 value");
-                }
-            }
-        }
-
-        Ok::<_, TantivyError>(())
-    })?;
-
-    doc_nums.sort();
-    assert_eq!(doc_nums, (0..10).collect::<Vec<u64>>());
-
-    Ok(())
-}
```
```diff
@@ -213,7 +213,7 @@ impl Recorder for TermFrequencyRecorder {
             doc_id_and_tf.sort_unstable_by_key(|&(doc_id, _)| doc_id);
 
             for (doc_id, tf) in doc_id_and_tf {
-                serializer.write_doc(doc_id, tf, &[][..]);
+                serializer.write_doc(doc_id, 0, &[][..]);
             }
         } else {
             let mut prev_doc = 0;
@@ -221,7 +221,7 @@ impl Recorder for TermFrequencyRecorder {
                 let doc_id = prev_doc + delta_doc_id;
                 prev_doc = doc_id;
                 let term_freq = u32_it.next().unwrap_or(self.current_tf);
-                serializer.write_doc(doc_id, term_freq, &[][..]);
+                serializer.write_doc(doc_id, 0, &[][..]);
             }
         }
     }
```
```diff
@@ -37,8 +37,6 @@ mod reader;
 mod writer;
 pub use self::compressors::{Compressor, ZstdCompressor};
 pub use self::decompressors::Decompressor;
-#[cfg(feature = "quickwit")]
-pub(crate) use self::reader::CacheKey;
 pub(crate) use self::reader::DOCSTORE_CACHE_CAPACITY;
 pub use self::reader::{CacheStats, StoreReader};
 pub use self::writer::StoreWriter;
```
```diff
@@ -40,15 +40,6 @@ struct BlockCache {
 }
 
 impl BlockCache {
-    fn new(cache_num_blocks: usize) -> Self {
-        Self {
-            cache: NonZeroUsize::new(cache_num_blocks)
-                .map(|cache_num_blocks| Mutex::new(LruCache::new(cache_num_blocks))),
-            cache_hits: Default::default(),
-            cache_misses: Default::default(),
-        }
-    }
-
     fn get_from_cache(&self, pos: usize) -> Option<Block> {
         if let Some(block) = self
             .cache
```
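In the removed `BlockCache::new`, the LRU is wrapped in an `Option` via `NonZeroUsize::new`, so a requested capacity of zero simply disables caching. Here is a standalone sketch of that construction using the `lru` crate directly; the `Vec<u8>` block type and the method set are placeholders rather than tantivy's internal `Block` and hit/miss counters.

```rust
use std::num::NonZeroUsize;
use std::sync::Mutex;

use lru::LruCache;

struct BlockCache {
    // `None` means a capacity of zero was requested: caching is disabled and
    // every lookup is a miss.
    cache: Option<Mutex<LruCache<usize, Vec<u8>>>>,
}

impl BlockCache {
    fn new(cache_num_blocks: usize) -> Self {
        Self {
            cache: NonZeroUsize::new(cache_num_blocks)
                .map(|capacity| Mutex::new(LruCache::new(capacity))),
        }
    }

    fn get_from_cache(&self, pos: usize) -> Option<Vec<u8>> {
        // `LruCache::get` takes `&mut self` because it updates recency,
        // hence the `Mutex` around the cache.
        self.cache
            .as_ref()
            .and_then(|cache| cache.lock().unwrap().get(&pos).cloned())
    }

    fn put_into_cache(&self, pos: usize, block: Vec<u8>) {
        if let Some(cache) = &self.cache {
            cache.lock().unwrap().put(pos, block);
        }
    }
}
```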
```diff
@@ -90,10 +81,6 @@ impl BlockCache {
     }
 }
 
-/// Opaque cache key which indicates which documents are cached together.
-#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
-pub(crate) struct CacheKey(usize);
-
 #[derive(Debug, Default)]
 /// CacheStats for the `StoreReader`.
 pub struct CacheStats {
```
```diff
@@ -141,35 +128,17 @@ impl StoreReader {
         Ok(StoreReader {
             decompressor: footer.decompressor,
             data: data_file,
-            cache: BlockCache::new(cache_num_blocks),
+            cache: BlockCache {
+                cache: NonZeroUsize::new(cache_num_blocks)
+                    .map(|cache_num_blocks| Mutex::new(LruCache::new(cache_num_blocks))),
+                cache_hits: Default::default(),
+                cache_misses: Default::default(),
+            },
             skip_index: Arc::new(skip_index),
             space_usage,
         })
     }
 
-    /// Clones the given store reader with an independent block cache of the given size.
-    ///
-    /// `cache_keys` is used to seed the forked cache from the current cache
-    /// if some blocks are already available.
-    #[cfg(feature = "quickwit")]
-    pub(crate) fn fork_cache(&self, cache_num_blocks: usize, cache_keys: &[CacheKey]) -> Self {
-        let forked = Self {
-            decompressor: self.decompressor,
-            data: self.data.clone(),
-            cache: BlockCache::new(cache_num_blocks),
-            skip_index: Arc::clone(&self.skip_index),
-            space_usage: self.space_usage.clone(),
-        };
-
-        for &CacheKey(pos) in cache_keys {
-            if let Some(block) = self.cache.get_from_cache(pos) {
-                forked.cache.put_into_cache(pos, block);
-            }
-        }
-
-        forked
-    }
-
     pub(crate) fn block_checkpoints(&self) -> impl Iterator<Item = Checkpoint> + '_ {
         self.skip_index.checkpoints()
     }
```
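The removed `fork_cache` above clones the reader with a fresh block cache and seeds it with any blocks already cached under the given keys, without reading or decompressing anything. A hypothetical free-standing version of just the fork-and-seed step, again with placeholder types (plain `usize` offsets and `Vec<u8>` blocks) instead of tantivy's `CacheKey` and `Block`:

```rust
use std::num::NonZeroUsize;
use std::sync::Mutex;

use lru::LruCache;

type Block = Vec<u8>;

/// Create an independent cache of `cache_num_blocks` blocks, seeded with any
/// of `cache_keys` (block start offsets) already present in `source`.
fn fork_cache(
    source: &Mutex<LruCache<usize, Block>>,
    cache_num_blocks: usize,
    cache_keys: &[usize],
) -> Mutex<LruCache<usize, Block>> {
    let capacity = NonZeroUsize::new(cache_num_blocks).expect("capacity must be non-zero");
    let forked = Mutex::new(LruCache::new(capacity));

    for &pos in cache_keys {
        // Copy a block over only if the source cache already holds it;
        // nothing is fetched from storage here.
        let block = source.lock().unwrap().get(&pos).cloned();
        if let Some(block) = block {
            forked.lock().unwrap().put(pos, block);
        }
    }

    forked
}
```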
```diff
@@ -183,21 +152,6 @@ impl StoreReader {
         self.cache.stats()
     }
 
-    /// Returns the cache key for a given document
-    ///
-    /// These keys are opaque and are not used with the public API,
-    /// but having the same cache key means that the documents
-    /// will only require one I/O and decompression operation
-    /// when retrieve from the same store reader consecutively.
-    ///
-    /// Note that looking up the cache key of a document
-    /// will not yet pull anything into the block cache.
-    #[cfg(feature = "quickwit")]
-    pub(crate) fn cache_key(&self, doc_id: DocId) -> crate::Result<CacheKey> {
-        let checkpoint = self.block_checkpoint(doc_id)?;
-        Ok(CacheKey(checkpoint.byte_range.start))
-    }
-
     /// Get checkpoint for `DocId`. The checkpoint can be used to load a block containing the
     /// document.
     ///
```
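The removed `cache_key` doc comment states that documents sharing a cache key cost only one read and one decompression when retrieved consecutively from the same store reader. A hypothetical sketch of exploiting that property by grouping document ids per key before fetching; `cache_key_of` stands in for the removed crate-internal `StoreReader::cache_key`.

```rust
use std::collections::BTreeMap;

/// Group doc ids by the (opaque) cache key of the block that stores them, so a
/// caller can fetch each group consecutively and decompress every block once.
fn group_by_cache_key<K, E>(
    doc_ids: impl IntoIterator<Item = u32>,
    cache_key_of: impl Fn(u32) -> Result<K, E>,
) -> Result<BTreeMap<K, Vec<u32>>, E>
where
    K: Ord,
{
    let mut groups: BTreeMap<K, Vec<u32>> = BTreeMap::new();
    for doc_id in doc_ids {
        groups.entry(cache_key_of(doc_id)?).or_default().push(doc_id);
    }
    Ok(groups)
}
```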