Mirror of https://github.com/quickwit-oss/tantivy.git (synced 2025-12-28 21:12:54 +00:00)
Compare commits
3 Commits
block-cach...check_bug
| Author | SHA1 | Date |
|---|---|---|
| | 6c53f9f37f | |
| | 53f2fe1fbe | |
| | 9c75942aaf | |
@@ -110,6 +110,9 @@ impl<T: PartialOrd + Copy + Debug + Send + Sync + 'static> Column<T> {
    }

    /// Get the docids of values which are in the provided value range.
    ///
    /// # Panic
    /// Panics if a value in the selected_docid_range range is larger than the number of documents.
    #[inline]
    pub fn get_docids_for_value_range(
        &self,

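The rest of the signature is cut off in this view. As a rough usage sketch of the documented behavior, assuming the remaining parameters are an inclusive value range, the doc id range to scan, and an output buffer (parameter names and order are assumptions, not taken from this diff):

```rust
// Hypothetical call: collect doc ids whose column value lies in 10..=20,
// restricted to the first 1_000 documents of the segment.
let mut doc_ids: Vec<u32> = Vec::new();
column.get_docids_for_value_range(10u64..=20u64, 0..1_000, &mut doc_ids);
// Per the doc comment above, a doc id beyond the number of documents panics.
```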
@@ -126,6 +126,8 @@ impl ColumnIndex {
        }
    }

    /// # Panic
    /// Panics if a value in the doc_id range is larger than the number of documents.
    pub fn docid_range_to_rowids(&self, doc_id: Range<DocId>) -> Range<RowId> {
        match self {
            ColumnIndex::Empty { .. } => 0..0,

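For an optional column, this maps a doc id range to the row ids (ranks) of the non-null documents inside it. A minimal, self-contained illustration of that mapping, independent of tantivy's internal representation:

```rust
use std::ops::Range;

// Conceptual model only: `non_null_docs` is the sorted list of doc ids that
// actually hold a value; a doc id range maps to the ranks of those docs.
fn docid_range_to_rowids(non_null_docs: &[u32], docs: Range<u32>) -> Range<u32> {
    let rank = |doc: u32| non_null_docs.partition_point(|&d| d < doc) as u32;
    rank(docs.start)..rank(docs.end)
}

fn main() {
    // Docs 1 and 3 hold values, so the doc range 0..4 covers rows 0..2.
    assert_eq!(docid_range_to_rowids(&[1, 3], 0..4), 0..2);
}
```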
@@ -21,8 +21,6 @@ const DENSE_BLOCK_THRESHOLD: u32 =

const ELEMENTS_PER_BLOCK: u32 = u16::MAX as u32 + 1;

const BLOCK_SIZE: RowId = 1 << 16;

#[derive(Copy, Clone, Debug)]
struct BlockMeta {
    non_null_rows_before_block: u32,

@@ -109,8 +107,8 @@ struct RowAddr {
#[inline(always)]
fn row_addr_from_row_id(row_id: RowId) -> RowAddr {
    RowAddr {
        block_id: (row_id / BLOCK_SIZE) as u16,
        in_block_row_id: (row_id % BLOCK_SIZE) as u16,
        block_id: (row_id / ELEMENTS_PER_BLOCK) as u16,
        in_block_row_id: (row_id % ELEMENTS_PER_BLOCK) as u16,
    }
}

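Both constants equal 65536 (BLOCK_SIZE is 1 << 16 and ELEMENTS_PER_BLOCK is u16::MAX + 1), so swapping the divisor does not change the mapping. A standalone check of the block arithmetic:

```rust
const ELEMENTS_PER_BLOCK: u32 = u16::MAX as u32 + 1; // 65_536

fn main() {
    let row_id: u32 = 70_000;
    // 70_000 / 65_536 = 1 and 70_000 % 65_536 = 4_464,
    // so this row sits at position 4_464 inside block 1.
    assert_eq!(row_id / ELEMENTS_PER_BLOCK, 1);
    assert_eq!(row_id % ELEMENTS_PER_BLOCK, 4_464);
}
```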
@@ -490,8 +488,9 @@ fn deserialize_optional_index_block_metadatas(
        start_byte_offset += block_variant.num_bytes_in_block();
        non_null_rows_before_block += num_non_null_rows;
    }
    let last_block = row_addr_from_row_id(num_rows).block_id;
    block_metas.resize(
        ((num_rows + BLOCK_SIZE - 1) / BLOCK_SIZE) as usize,
        last_block as usize + 1, // +1 since last block is an index
        BlockMeta {
            non_null_rows_before_block,
            start_byte_offset,

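The two first arguments to resize shown above are the old and new block counts. They disagree exactly when num_rows is a multiple of the block size, which matches the cases exercised by the regression test below; a standalone check of that arithmetic (assuming the ceiling-division expression is the one being replaced by last_block + 1):

```rust
const ELEMENTS_PER_BLOCK: u32 = u16::MAX as u32 + 1; // 65_536, same value as BLOCK_SIZE

fn main() {
    for num_rows in [ELEMENTS_PER_BLOCK - 1, ELEMENTS_PER_BLOCK, ELEMENTS_PER_BLOCK + 1] {
        // Ceiling division of num_rows by the block size.
        let ceil_div = (num_rows + ELEMENTS_PER_BLOCK - 1) / ELEMENTS_PER_BLOCK;
        // Block id of `num_rows` plus one trailing sentinel block.
        let last_block_plus_one = num_rows / ELEMENTS_PER_BLOCK + 1;
        // 65535 -> 1 vs 1, 65536 -> 1 vs 2, 65537 -> 2 vs 2.
        println!("num_rows={num_rows}: {ceil_div} vs {last_block_plus_one}");
    }
}
```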
@@ -3,6 +3,29 @@ use proptest::strategy::Strategy;
use proptest::{prop_oneof, proptest};

use super::*;
use crate::{ColumnarReader, ColumnarWriter, DynamicColumnHandle};

#[test]
fn test_optional_index_bug_2293() {
    test_optional_index_with_num_docs(ELEMENTS_PER_BLOCK - 1);
    test_optional_index_with_num_docs(ELEMENTS_PER_BLOCK);
    test_optional_index_with_num_docs(ELEMENTS_PER_BLOCK + 1);
}

fn test_optional_index_with_num_docs(num_docs: u32) {
    let mut dataframe_writer = ColumnarWriter::default();
    dataframe_writer.record_numerical(100, "score", 80i64);
    let mut buffer: Vec<u8> = Vec::new();
    dataframe_writer
        .serialize(num_docs, None, &mut buffer)
        .unwrap();
    let columnar = ColumnarReader::open(buffer).unwrap();
    assert_eq!(columnar.num_columns(), 1);
    let cols: Vec<DynamicColumnHandle> = columnar.read_columns("score").unwrap();
    assert_eq!(cols.len(), 1);

    let col = cols[0].open().unwrap();
    col.column_index().docid_range_to_rowids(0..num_docs);
}

#[test]
fn test_dense_block_threshold() {

@@ -35,7 +58,7 @@ proptest! {
#[test]
fn test_with_random_sets_simple() {
    let vals = 10..BLOCK_SIZE * 2;
    let vals = 10..ELEMENTS_PER_BLOCK * 2;
    let mut out: Vec<u8> = Vec::new();
    serialize_optional_index(&vals, 100, &mut out).unwrap();
    let null_index = open_optional_index(OwnedBytes::new(out)).unwrap();

@@ -171,7 +194,7 @@ fn test_optional_index_rank() {
    test_optional_index_rank_aux(&[0u32, 1u32]);
    let mut block = Vec::new();
    block.push(3u32);
    block.extend((0..BLOCK_SIZE).map(|i| i + BLOCK_SIZE + 1));
    block.extend((0..ELEMENTS_PER_BLOCK).map(|i| i + ELEMENTS_PER_BLOCK + 1));
    test_optional_index_rank_aux(&block);
}

@@ -185,8 +208,8 @@ fn test_optional_index_iter_empty_one() {
fn test_optional_index_iter_dense_block() {
    let mut block = Vec::new();
    block.push(3u32);
    block.extend((0..BLOCK_SIZE).map(|i| i + BLOCK_SIZE + 1));
    test_optional_index_iter_aux(&block, 3 * BLOCK_SIZE);
    block.extend((0..ELEMENTS_PER_BLOCK).map(|i| i + ELEMENTS_PER_BLOCK + 1));
    test_optional_index_iter_aux(&block, 3 * ELEMENTS_PER_BLOCK);
}

#[test]

@@ -1,6 +1,4 @@
use std::collections::BTreeMap;
#[cfg(feature = "quickwit")]
use std::future::Future;
use std::sync::Arc;
use std::{fmt, io};

@@ -114,108 +112,6 @@ impl Searcher {
        store_reader.get_async(doc_address.doc_id).await
    }

    /// Fetches multiple documents in an asynchronous manner.
    ///
    /// This method is more efficient than calling [`doc_async`](Self::doc_async) multiple times,
    /// as it groups overlapping requests to segments and blocks and avoids concurrent requests
    /// trashing the caches of each other. However, it does so using intermediate data structures
    /// and independent block caches so it will be slower if documents from very few blocks are
    /// fetched which would have fit into the global block cache.
    ///
    /// The caller is expected to poll these futures concurrently (e.g. using `FuturesUnordered`)
    /// or in parallel (e.g. using `JoinSet`) as fits best with the given use case, i.e. whether
    /// it is predominately I/O-bound or rather CPU-bound.
    ///
    /// Note that any blocks brought into any of the per-segment-and-block groups will not be pulled
    /// into the global block cache and hence not be available for subsequent calls.
    ///
    /// Note that there is no synchronous variant of this method as the same degree of efficiency
    /// can be had by accessing documents in address order.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use futures::executor::block_on;
    /// # use futures::stream::{FuturesUnordered, StreamExt};
    /// #
    /// # use tantivy::schema::Schema;
    /// # use tantivy::{DocAddress, Index, TantivyDocument, TantivyError};
    /// #
    /// # let index = Index::create_in_ram(Schema::builder().build());
    /// # let searcher = index.reader()?.searcher();
    /// #
    /// # let doc_addresses = (0..10).map(|_| DocAddress::new(0, 0));
    /// #
    /// let mut groups: FuturesUnordered<_> = searcher
    ///     .docs_async::<TantivyDocument>(doc_addresses)?
    ///     .collect();
    ///
    /// let mut docs = Vec::new();
    ///
    /// block_on(async {
    ///     while let Some(group) = groups.next().await {
    ///         docs.extend(group?);
    ///     }
    ///
    ///     Ok::<_, TantivyError>(())
    /// })?;
    /// #
    /// # Ok::<_, TantivyError>(())
    /// ```
    #[cfg(feature = "quickwit")]
    pub fn docs_async<D: DocumentDeserialize>(
        &self,
        doc_addresses: impl IntoIterator<Item = DocAddress>,
    ) -> crate::Result<
        impl Iterator<Item = impl Future<Output = crate::Result<Vec<(DocAddress, D)>>>> + '_,
    > {
        use rustc_hash::FxHashMap;

        use crate::store::CacheKey;
        use crate::{DocId, SegmentOrdinal};

        let mut groups: FxHashMap<(SegmentOrdinal, CacheKey), Vec<DocId>> = Default::default();

        for doc_address in doc_addresses {
            let store_reader = &self.inner.store_readers[doc_address.segment_ord as usize];
            let cache_key = store_reader.cache_key(doc_address.doc_id)?;

            groups
                .entry((doc_address.segment_ord, cache_key))
                .or_default()
                .push(doc_address.doc_id);
        }

        let futures = groups
            .into_iter()
            .map(|((segment_ord, cache_key), doc_ids)| {
                // Each group fetches documents from exactly one block and
                // therefore gets an independent block cache of size one.
                let store_reader =
                    self.inner.store_readers[segment_ord as usize].fork_cache(1, &[cache_key]);

                async move {
                    let mut docs = Vec::new();

                    for doc_id in doc_ids {
                        let doc = store_reader.get_async(doc_id).await?;

                        docs.push((
                            DocAddress {
                                segment_ord,
                                doc_id,
                            },
                            doc,
                        ));
                    }

                    Ok(docs)
                }
            });

        Ok(futures)
    }

    /// Access the schema associated with the index of this searcher.
    pub fn schema(&self) -> &Schema {
        &self.inner.schema

@@ -424,7 +424,7 @@ fn test_non_text_json_term_freq() {
    json_term_writer.set_fast_value(75u64);
    let postings = inv_idx
        .read_postings(
            json_term_writer.term(),
            &json_term_writer.term(),
            IndexRecordOption::WithFreqsAndPositions,
        )
        .unwrap()

@@ -462,7 +462,7 @@ fn test_non_text_json_term_freq_bitpacked() {
    json_term_writer.set_fast_value(75u64);
    let mut postings = inv_idx
        .read_postings(
            json_term_writer.term(),
            &json_term_writer.term(),
            IndexRecordOption::WithFreqsAndPositions,
        )
        .unwrap()

@@ -474,60 +474,3 @@ fn test_non_text_json_term_freq_bitpacked() {
        assert_eq!(postings.term_freq(), 1u32);
    }
}

#[cfg(feature = "quickwit")]
#[test]
fn test_get_many_docs() -> crate::Result<()> {
    use futures::executor::block_on;
    use futures::stream::{FuturesUnordered, StreamExt};

    use crate::schema::{OwnedValue, STORED};
    use crate::{DocAddress, TantivyError};

    let mut schema_builder = Schema::builder();
    let num_field = schema_builder.add_u64_field("num", STORED);
    let schema = schema_builder.build();
    let index = Index::create_in_ram(schema);
    let mut index_writer: IndexWriter = index.writer_for_tests()?;
    index_writer.set_merge_policy(Box::new(NoMergePolicy));
    for i in 0..10u64 {
        let doc = doc!(num_field=>i);
        index_writer.add_document(doc)?;
    }

    index_writer.commit()?;
    let segment_ids = index.searchable_segment_ids()?;
    index_writer.merge(&segment_ids).wait().unwrap();

    let searcher = index.reader()?.searcher();
    assert_eq!(searcher.num_docs(), 10);

    let doc_addresses = (0..10).map(|i| DocAddress::new(0, i));

    let mut groups: FuturesUnordered<_> = searcher
        .docs_async::<TantivyDocument>(doc_addresses)?
        .collect();

    let mut doc_nums = Vec::new();

    block_on(async {
        while let Some(group) = groups.next().await {
            for (_doc_address, doc) in group? {
                let num_value = doc.get_first(num_field).unwrap();

                if let OwnedValue::U64(num) = num_value {
                    doc_nums.push(*num);
                } else {
                    panic!("Expected u64 value");
                }
            }
        }

        Ok::<_, TantivyError>(())
    })?;

    doc_nums.sort();
    assert_eq!(doc_nums, (0..10).collect::<Vec<u64>>());

    Ok(())
}

@@ -1651,6 +1651,7 @@ mod tests {
        force_end_merge: bool,
    ) -> crate::Result<Index> {
        let mut schema_builder = schema::Schema::builder();
        let json_field = schema_builder.add_json_field("json", FAST | TEXT | STORED);
        let ip_field = schema_builder.add_ip_addr_field("ip", FAST | INDEXED | STORED);
        let ips_field = schema_builder
            .add_ip_addr_field("ips", IpAddrOptions::default().set_fast().set_indexed());

@@ -1729,7 +1730,9 @@ mod tests {
                id_field=>id,
            ))?;
        } else {
            let json = json!({"date1": format!("2022-{id}-01T00:00:01Z"), "date2": format!("{id}-05-01T00:00:01Z"), "id": id, "ip": ip.to_string()});
            index_writer.add_document(doc!(id_field=>id,
                json_field=>json,
                bytes_field => id.to_le_bytes().as_slice(),
                id_opt_field => id,
                ip_field => ip,

@@ -605,6 +605,10 @@ impl IndexMerger {
            segment_postings.positions(&mut positions_buffer);
            segment_postings.term_freq()
        } else {
            // The positions_buffer may contain positions from the previous term.
            // Existence of positions depends on the value type in JSON fields.
            // https://github.com/quickwit-oss/tantivy/issues/2283
            positions_buffer.clear();
            0u32
        };

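The added clear() guards a scratch buffer that is reused across terms: when the current term carries no positions (as can happen for non-text values in JSON fields), positions from the previous term would otherwise linger. A standalone illustration of that reuse hazard, not taken from the tantivy code:

```rust
// A scratch buffer reused across iterations must be cleared when the current
// item yields no data, or the previous item's data leaks into the output.
fn collect_positions(items: &[Option<Vec<u32>>]) -> Vec<Vec<u32>> {
    let mut scratch: Vec<u32> = Vec::new();
    let mut out = Vec::new();
    for item in items {
        match item {
            Some(positions) => {
                scratch.clear();
                scratch.extend_from_slice(positions);
            }
            // Skipping this clear would leave the previous item's positions
            // in `scratch`, the kind of stale state fixed for issue #2283.
            None => scratch.clear(),
        }
        out.push(scratch.clone());
    }
    out
}

fn main() {
    let items = vec![Some(vec![1, 2, 3]), None];
    assert_eq!(collect_positions(&items), vec![vec![1, 2, 3], vec![]]);
}
```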
@@ -879,6 +879,31 @@ mod tests {
        assert_eq!(searcher.search(&phrase_query, &Count).unwrap(), 0);
    }

    #[test]
    fn test_json_term_with_numeric_merge_panic_regression_bug_2283() {
        // https://github.com/quickwit-oss/tantivy/issues/2283
        let mut schema_builder = Schema::builder();
        let json = schema_builder.add_json_field("json", TEXT);
        let schema = schema_builder.build();
        let index = Index::create_in_ram(schema);
        let mut writer = index.writer_for_tests().unwrap();
        let doc = json!({"field": "a"});
        writer.add_document(doc!(json=>doc)).unwrap();
        writer.commit().unwrap();
        let doc = json!({"field": "a", "id": 1});
        writer.add_document(doc!(json=>doc.clone())).unwrap();
        writer.commit().unwrap();

        // Force Merge
        writer.wait_merging_threads().unwrap();
        let mut index_writer: IndexWriter = index.writer_for_tests().unwrap();
        let segment_ids = index
            .searchable_segment_ids()
            .expect("Searchable segments failed.");
        index_writer.merge(&segment_ids).wait().unwrap();
        assert!(index_writer.wait_merging_threads().is_ok());
    }

    #[test]
    fn test_bug_regression_1629_position_when_array_with_a_field_value_that_does_not_contain_any_token(
    ) {

@@ -63,7 +63,7 @@ impl RegexQuery {
    /// Creates a new RegexQuery from a given pattern
    pub fn from_pattern(regex_pattern: &str, field: Field) -> crate::Result<Self> {
        let regex = Regex::new(regex_pattern)
            .map_err(|_| TantivyError::InvalidArgument(regex_pattern.to_string()))?;
            .map_err(|err| TantivyError::InvalidArgument(format!("RegexQueryError: {err}")))?;
        Ok(RegexQuery::from_regex(regex, field))
    }

@@ -176,4 +176,16 @@ mod test {
        verify_regex_query(matching_one, matching_zero, reader);
        Ok(())
    }

    #[test]
    pub fn test_pattern_error() {
        let (_reader, field) = build_test_index().unwrap();

        match RegexQuery::from_pattern(r"(foo", field) {
            Err(crate::TantivyError::InvalidArgument(msg)) => {
                assert!(msg.contains("error: unclosed group"))
            }
            res => panic!("unexpected result: {:?}", res),
        }
    }
}

@@ -37,8 +37,6 @@ mod reader;
mod writer;

pub use self::compressors::{Compressor, ZstdCompressor};
pub use self::decompressors::Decompressor;
#[cfg(feature = "quickwit")]
pub(crate) use self::reader::CacheKey;
pub(crate) use self::reader::DOCSTORE_CACHE_CAPACITY;
pub use self::reader::{CacheStats, StoreReader};
pub use self::writer::StoreWriter;

@@ -40,15 +40,6 @@ struct BlockCache {
}

impl BlockCache {
    fn new(cache_num_blocks: usize) -> Self {
        Self {
            cache: NonZeroUsize::new(cache_num_blocks)
                .map(|cache_num_blocks| Mutex::new(LruCache::new(cache_num_blocks))),
            cache_hits: Default::default(),
            cache_misses: Default::default(),
        }
    }

    fn get_from_cache(&self, pos: usize) -> Option<Block> {
        if let Some(block) = self
            .cache

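BlockCache::new builds the inner LRU only for a non-zero capacity, so a capacity of zero disables caching entirely. A small standalone check of that NonZeroUsize pattern:

```rust
use std::num::NonZeroUsize;

fn main() {
    // NonZeroUsize::new(0) is None, so the `.map(...)` above never builds a cache;
    // any positive capacity produces Some.
    assert!(NonZeroUsize::new(0).is_none());
    assert_eq!(NonZeroUsize::new(8).map(NonZeroUsize::get), Some(8));
}
```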
@@ -90,10 +81,6 @@ impl BlockCache {
        }
    }
}

/// Opaque cache key which indicates which documents are cached together.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub(crate) struct CacheKey(usize);

#[derive(Debug, Default)]
/// CacheStats for the `StoreReader`.
pub struct CacheStats {

@@ -141,35 +128,17 @@ impl StoreReader {
        Ok(StoreReader {
            decompressor: footer.decompressor,
            data: data_file,
            cache: BlockCache::new(cache_num_blocks),
            cache: BlockCache {
                cache: NonZeroUsize::new(cache_num_blocks)
                    .map(|cache_num_blocks| Mutex::new(LruCache::new(cache_num_blocks))),
                cache_hits: Default::default(),
                cache_misses: Default::default(),
            },
            skip_index: Arc::new(skip_index),
            space_usage,
        })
    }

    /// Clones the given store reader with an independent block cache of the given size.
    ///
    /// `cache_keys` is used to seed the forked cache from the current cache
    /// if some blocks are already available.
    #[cfg(feature = "quickwit")]
    pub(crate) fn fork_cache(&self, cache_num_blocks: usize, cache_keys: &[CacheKey]) -> Self {
        let forked = Self {
            decompressor: self.decompressor,
            data: self.data.clone(),
            cache: BlockCache::new(cache_num_blocks),
            skip_index: Arc::clone(&self.skip_index),
            space_usage: self.space_usage.clone(),
        };

        for &CacheKey(pos) in cache_keys {
            if let Some(block) = self.cache.get_from_cache(pos) {
                forked.cache.put_into_cache(pos, block);
            }
        }

        forked
    }

    pub(crate) fn block_checkpoints(&self) -> impl Iterator<Item = Checkpoint> + '_ {
        self.skip_index.checkpoints()
    }

@@ -183,21 +152,6 @@ impl StoreReader {
        self.cache.stats()
    }

    /// Returns the cache key for a given document.
    ///
    /// These keys are opaque and are not used with the public API,
    /// but having the same cache key means that the documents
    /// will only require one I/O and decompression operation
    /// when retrieved from the same store reader consecutively.
    ///
    /// Note that looking up the cache key of a document
    /// will not yet pull anything into the block cache.
    #[cfg(feature = "quickwit")]
    pub(crate) fn cache_key(&self, doc_id: DocId) -> crate::Result<CacheKey> {
        let checkpoint = self.block_checkpoint(doc_id)?;
        Ok(CacheKey(checkpoint.byte_range.start))
    }

    /// Get checkpoint for `DocId`. The checkpoint can be used to load a block containing the
    /// document.
    ///