Compare commits

..

3 Commits

Author SHA1 Message Date
Pascal Seitz
6c53f9f37f Fix off by one in optional index
Fixes #2293

Fixes an off by one error in the metadata resize of the optional index
when loading the index.

Merge variables with the same meaning but different names
2024-01-09 15:09:49 +08:00
Adam Reichold
53f2fe1fbe Forward regex parser errors to enable understandin their reason. (#2288) 2023-12-22 11:01:10 +01:00
PSeitz
9c75942aaf fix merge panic for JSON fields (#2284)
Root cause was the positions buffer had residue positions from the
previous term, when the terms were alternating between having and not
having positions in JSON (terms have positions, but not numerics).

Fixes #2283
2023-12-21 11:05:34 +01:00
12 changed files with 89 additions and 227 deletions

View File

@@ -110,6 +110,9 @@ impl<T: PartialOrd + Copy + Debug + Send + Sync + 'static> Column<T> {
}
/// Get the docids of values which are in the provided value range.
///
/// # Panic
/// Panics if a value in the selected_docid_range range is larger than the number of documents.
#[inline]
pub fn get_docids_for_value_range(
&self,

View File

@@ -126,6 +126,8 @@ impl ColumnIndex {
}
}
/// # Panic
/// Panics if a value in the doc_id range is larger than the number of documents.
pub fn docid_range_to_rowids(&self, doc_id: Range<DocId>) -> Range<RowId> {
match self {
ColumnIndex::Empty { .. } => 0..0,

View File

@@ -21,8 +21,6 @@ const DENSE_BLOCK_THRESHOLD: u32 =
const ELEMENTS_PER_BLOCK: u32 = u16::MAX as u32 + 1;
const BLOCK_SIZE: RowId = 1 << 16;
#[derive(Copy, Clone, Debug)]
struct BlockMeta {
non_null_rows_before_block: u32,
@@ -109,8 +107,8 @@ struct RowAddr {
#[inline(always)]
fn row_addr_from_row_id(row_id: RowId) -> RowAddr {
RowAddr {
block_id: (row_id / BLOCK_SIZE) as u16,
in_block_row_id: (row_id % BLOCK_SIZE) as u16,
block_id: (row_id / ELEMENTS_PER_BLOCK) as u16,
in_block_row_id: (row_id % ELEMENTS_PER_BLOCK) as u16,
}
}
@@ -490,8 +488,9 @@ fn deserialize_optional_index_block_metadatas(
start_byte_offset += block_variant.num_bytes_in_block();
non_null_rows_before_block += num_non_null_rows;
}
let last_block = row_addr_from_row_id(num_rows).block_id;
block_metas.resize(
((num_rows + BLOCK_SIZE - 1) / BLOCK_SIZE) as usize,
last_block as usize + 1, // +1 since last block is an index
BlockMeta {
non_null_rows_before_block,
start_byte_offset,

View File

@@ -3,6 +3,29 @@ use proptest::strategy::Strategy;
use proptest::{prop_oneof, proptest};
use super::*;
use crate::{ColumnarReader, ColumnarWriter, DynamicColumnHandle};
#[test]
fn test_optional_index_bug_2293() {
test_optional_index_with_num_docs(ELEMENTS_PER_BLOCK - 1);
test_optional_index_with_num_docs(ELEMENTS_PER_BLOCK);
test_optional_index_with_num_docs(ELEMENTS_PER_BLOCK + 1);
}
fn test_optional_index_with_num_docs(num_docs: u32) {
let mut dataframe_writer = ColumnarWriter::default();
dataframe_writer.record_numerical(100, "score", 80i64);
let mut buffer: Vec<u8> = Vec::new();
dataframe_writer
.serialize(num_docs, None, &mut buffer)
.unwrap();
let columnar = ColumnarReader::open(buffer).unwrap();
assert_eq!(columnar.num_columns(), 1);
let cols: Vec<DynamicColumnHandle> = columnar.read_columns("score").unwrap();
assert_eq!(cols.len(), 1);
let col = cols[0].open().unwrap();
col.column_index().docid_range_to_rowids(0..num_docs);
}
#[test]
fn test_dense_block_threshold() {
@@ -35,7 +58,7 @@ proptest! {
#[test]
fn test_with_random_sets_simple() {
let vals = 10..BLOCK_SIZE * 2;
let vals = 10..ELEMENTS_PER_BLOCK * 2;
let mut out: Vec<u8> = Vec::new();
serialize_optional_index(&vals, 100, &mut out).unwrap();
let null_index = open_optional_index(OwnedBytes::new(out)).unwrap();
@@ -171,7 +194,7 @@ fn test_optional_index_rank() {
test_optional_index_rank_aux(&[0u32, 1u32]);
let mut block = Vec::new();
block.push(3u32);
block.extend((0..BLOCK_SIZE).map(|i| i + BLOCK_SIZE + 1));
block.extend((0..ELEMENTS_PER_BLOCK).map(|i| i + ELEMENTS_PER_BLOCK + 1));
test_optional_index_rank_aux(&block);
}
@@ -185,8 +208,8 @@ fn test_optional_index_iter_empty_one() {
fn test_optional_index_iter_dense_block() {
let mut block = Vec::new();
block.push(3u32);
block.extend((0..BLOCK_SIZE).map(|i| i + BLOCK_SIZE + 1));
test_optional_index_iter_aux(&block, 3 * BLOCK_SIZE);
block.extend((0..ELEMENTS_PER_BLOCK).map(|i| i + ELEMENTS_PER_BLOCK + 1));
test_optional_index_iter_aux(&block, 3 * ELEMENTS_PER_BLOCK);
}
#[test]

View File

@@ -1,6 +1,4 @@
use std::collections::BTreeMap;
#[cfg(feature = "quickwit")]
use std::future::Future;
use std::sync::Arc;
use std::{fmt, io};
@@ -114,108 +112,6 @@ impl Searcher {
store_reader.get_async(doc_address.doc_id).await
}
/// Fetches multiple documents in an asynchronous manner.
///
/// This method is more efficient than calling [`doc_async`](Self::doc_async) multiple times,
/// as it groups overlapping requests to segments and blocks and avoids concurrent requests
/// trashing the caches of each other. However, it does so using intermediate data structures
/// and independent block caches so it will be slower if documents from very few blocks are
/// fetched which would have fit into the global block cache.
///
/// The caller is expected to poll these futures concurrently (e.g. using `FuturesUnordered`)
/// or in parallel (e.g. using `JoinSet`) as fits best with the given use case, i.e. whether
/// it is predominately I/O-bound or rather CPU-bound.
///
/// Note that any blocks brought into any of the per-segment-and-block groups will not be pulled
/// into the global block cache and hence not be available for subsequent calls.
///
/// Note that there is no synchronous variant of this method as the same degree of efficiency
/// can be had by accessing documents in address order.
///
/// # Example
///
/// ```rust,no_run
/// # use futures::executor::block_on;
/// # use futures::stream::{FuturesUnordered, StreamExt};
/// #
/// # use tantivy::schema::Schema;
/// # use tantivy::{DocAddress, Index, TantivyDocument, TantivyError};
/// #
/// # let index = Index::create_in_ram(Schema::builder().build());
/// # let searcher = index.reader()?.searcher();
/// #
/// # let doc_addresses = (0..10).map(|_| DocAddress::new(0, 0));
/// #
/// let mut groups: FuturesUnordered<_> = searcher
/// .docs_async::<TantivyDocument>(doc_addresses)?
/// .collect();
///
/// let mut docs = Vec::new();
///
/// block_on(async {
/// while let Some(group) = groups.next().await {
/// docs.extend(group?);
/// }
///
/// Ok::<_, TantivyError>(())
/// })?;
/// #
/// # Ok::<_, TantivyError>(())
/// ```
#[cfg(feature = "quickwit")]
pub fn docs_async<D: DocumentDeserialize>(
&self,
doc_addresses: impl IntoIterator<Item = DocAddress>,
) -> crate::Result<
impl Iterator<Item = impl Future<Output = crate::Result<Vec<(DocAddress, D)>>>> + '_,
> {
use rustc_hash::FxHashMap;
use crate::store::CacheKey;
use crate::{DocId, SegmentOrdinal};
let mut groups: FxHashMap<(SegmentOrdinal, CacheKey), Vec<DocId>> = Default::default();
for doc_address in doc_addresses {
let store_reader = &self.inner.store_readers[doc_address.segment_ord as usize];
let cache_key = store_reader.cache_key(doc_address.doc_id)?;
groups
.entry((doc_address.segment_ord, cache_key))
.or_default()
.push(doc_address.doc_id);
}
let futures = groups
.into_iter()
.map(|((segment_ord, cache_key), doc_ids)| {
// Each group fetches documents from exactly one block and
// therefore gets an independent block cache of size one.
let store_reader =
self.inner.store_readers[segment_ord as usize].fork_cache(1, &[cache_key]);
async move {
let mut docs = Vec::new();
for doc_id in doc_ids {
let doc = store_reader.get_async(doc_id).await?;
docs.push((
DocAddress {
segment_ord,
doc_id,
},
doc,
));
}
Ok(docs)
}
});
Ok(futures)
}
/// Access the schema associated with the index of this searcher.
pub fn schema(&self) -> &Schema {
&self.inner.schema

View File

@@ -424,7 +424,7 @@ fn test_non_text_json_term_freq() {
json_term_writer.set_fast_value(75u64);
let postings = inv_idx
.read_postings(
json_term_writer.term(),
&json_term_writer.term(),
IndexRecordOption::WithFreqsAndPositions,
)
.unwrap()
@@ -462,7 +462,7 @@ fn test_non_text_json_term_freq_bitpacked() {
json_term_writer.set_fast_value(75u64);
let mut postings = inv_idx
.read_postings(
json_term_writer.term(),
&json_term_writer.term(),
IndexRecordOption::WithFreqsAndPositions,
)
.unwrap()
@@ -474,60 +474,3 @@ fn test_non_text_json_term_freq_bitpacked() {
assert_eq!(postings.term_freq(), 1u32);
}
}
#[cfg(feature = "quickwit")]
#[test]
fn test_get_many_docs() -> crate::Result<()> {
use futures::executor::block_on;
use futures::stream::{FuturesUnordered, StreamExt};
use crate::schema::{OwnedValue, STORED};
use crate::{DocAddress, TantivyError};
let mut schema_builder = Schema::builder();
let num_field = schema_builder.add_u64_field("num", STORED);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut index_writer: IndexWriter = index.writer_for_tests()?;
index_writer.set_merge_policy(Box::new(NoMergePolicy));
for i in 0..10u64 {
let doc = doc!(num_field=>i);
index_writer.add_document(doc)?;
}
index_writer.commit()?;
let segment_ids = index.searchable_segment_ids()?;
index_writer.merge(&segment_ids).wait().unwrap();
let searcher = index.reader()?.searcher();
assert_eq!(searcher.num_docs(), 10);
let doc_addresses = (0..10).map(|i| DocAddress::new(0, i));
let mut groups: FuturesUnordered<_> = searcher
.docs_async::<TantivyDocument>(doc_addresses)?
.collect();
let mut doc_nums = Vec::new();
block_on(async {
while let Some(group) = groups.next().await {
for (_doc_address, doc) in group? {
let num_value = doc.get_first(num_field).unwrap();
if let OwnedValue::U64(num) = num_value {
doc_nums.push(*num);
} else {
panic!("Expected u64 value");
}
}
}
Ok::<_, TantivyError>(())
})?;
doc_nums.sort();
assert_eq!(doc_nums, (0..10).collect::<Vec<u64>>());
Ok(())
}

View File

@@ -1651,6 +1651,7 @@ mod tests {
force_end_merge: bool,
) -> crate::Result<Index> {
let mut schema_builder = schema::Schema::builder();
let json_field = schema_builder.add_json_field("json", FAST | TEXT | STORED);
let ip_field = schema_builder.add_ip_addr_field("ip", FAST | INDEXED | STORED);
let ips_field = schema_builder
.add_ip_addr_field("ips", IpAddrOptions::default().set_fast().set_indexed());
@@ -1729,7 +1730,9 @@ mod tests {
id_field=>id,
))?;
} else {
let json = json!({"date1": format!("2022-{id}-01T00:00:01Z"), "date2": format!("{id}-05-01T00:00:01Z"), "id": id, "ip": ip.to_string()});
index_writer.add_document(doc!(id_field=>id,
json_field=>json,
bytes_field => id.to_le_bytes().as_slice(),
id_opt_field => id,
ip_field => ip,

View File

@@ -605,6 +605,10 @@ impl IndexMerger {
segment_postings.positions(&mut positions_buffer);
segment_postings.term_freq()
} else {
// The positions_buffer may contain positions from the previous term
// Existence of positions depend on the value type in JSON fields.
// https://github.com/quickwit-oss/tantivy/issues/2283
positions_buffer.clear();
0u32
};

View File

@@ -879,6 +879,31 @@ mod tests {
assert_eq!(searcher.search(&phrase_query, &Count).unwrap(), 0);
}
#[test]
fn test_json_term_with_numeric_merge_panic_regression_bug_2283() {
// https://github.com/quickwit-oss/tantivy/issues/2283
let mut schema_builder = Schema::builder();
let json = schema_builder.add_json_field("json", TEXT);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut writer = index.writer_for_tests().unwrap();
let doc = json!({"field": "a"});
writer.add_document(doc!(json=>doc)).unwrap();
writer.commit().unwrap();
let doc = json!({"field": "a", "id": 1});
writer.add_document(doc!(json=>doc.clone())).unwrap();
writer.commit().unwrap();
// Force Merge
writer.wait_merging_threads().unwrap();
let mut index_writer: IndexWriter = index.writer_for_tests().unwrap();
let segment_ids = index
.searchable_segment_ids()
.expect("Searchable segments failed.");
index_writer.merge(&segment_ids).wait().unwrap();
assert!(index_writer.wait_merging_threads().is_ok());
}
#[test]
fn test_bug_regression_1629_position_when_array_with_a_field_value_that_does_not_contain_any_token(
) {

View File

@@ -63,7 +63,7 @@ impl RegexQuery {
/// Creates a new RegexQuery from a given pattern
pub fn from_pattern(regex_pattern: &str, field: Field) -> crate::Result<Self> {
let regex = Regex::new(regex_pattern)
.map_err(|_| TantivyError::InvalidArgument(regex_pattern.to_string()))?;
.map_err(|err| TantivyError::InvalidArgument(format!("RegexQueryError: {err}")))?;
Ok(RegexQuery::from_regex(regex, field))
}
@@ -176,4 +176,16 @@ mod test {
verify_regex_query(matching_one, matching_zero, reader);
Ok(())
}
#[test]
pub fn test_pattern_error() {
let (_reader, field) = build_test_index().unwrap();
match RegexQuery::from_pattern(r"(foo", field) {
Err(crate::TantivyError::InvalidArgument(msg)) => {
assert!(msg.contains("error: unclosed group"))
}
res => panic!("unexpected result: {:?}", res),
}
}
}

View File

@@ -37,8 +37,6 @@ mod reader;
mod writer;
pub use self::compressors::{Compressor, ZstdCompressor};
pub use self::decompressors::Decompressor;
#[cfg(feature = "quickwit")]
pub(crate) use self::reader::CacheKey;
pub(crate) use self::reader::DOCSTORE_CACHE_CAPACITY;
pub use self::reader::{CacheStats, StoreReader};
pub use self::writer::StoreWriter;

View File

@@ -40,15 +40,6 @@ struct BlockCache {
}
impl BlockCache {
fn new(cache_num_blocks: usize) -> Self {
Self {
cache: NonZeroUsize::new(cache_num_blocks)
.map(|cache_num_blocks| Mutex::new(LruCache::new(cache_num_blocks))),
cache_hits: Default::default(),
cache_misses: Default::default(),
}
}
fn get_from_cache(&self, pos: usize) -> Option<Block> {
if let Some(block) = self
.cache
@@ -90,10 +81,6 @@ impl BlockCache {
}
}
/// Opaque cache key which indicates which documents are cached together.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub(crate) struct CacheKey(usize);
#[derive(Debug, Default)]
/// CacheStats for the `StoreReader`.
pub struct CacheStats {
@@ -141,35 +128,17 @@ impl StoreReader {
Ok(StoreReader {
decompressor: footer.decompressor,
data: data_file,
cache: BlockCache::new(cache_num_blocks),
cache: BlockCache {
cache: NonZeroUsize::new(cache_num_blocks)
.map(|cache_num_blocks| Mutex::new(LruCache::new(cache_num_blocks))),
cache_hits: Default::default(),
cache_misses: Default::default(),
},
skip_index: Arc::new(skip_index),
space_usage,
})
}
/// Clones the given store reader with an independent block cache of the given size.
///
/// `cache_keys` is used to seed the forked cache from the current cache
/// if some blocks are already available.
#[cfg(feature = "quickwit")]
pub(crate) fn fork_cache(&self, cache_num_blocks: usize, cache_keys: &[CacheKey]) -> Self {
let forked = Self {
decompressor: self.decompressor,
data: self.data.clone(),
cache: BlockCache::new(cache_num_blocks),
skip_index: Arc::clone(&self.skip_index),
space_usage: self.space_usage.clone(),
};
for &CacheKey(pos) in cache_keys {
if let Some(block) = self.cache.get_from_cache(pos) {
forked.cache.put_into_cache(pos, block);
}
}
forked
}
pub(crate) fn block_checkpoints(&self) -> impl Iterator<Item = Checkpoint> + '_ {
self.skip_index.checkpoints()
}
@@ -183,21 +152,6 @@ impl StoreReader {
self.cache.stats()
}
/// Returns the cache key for a given document
///
/// These keys are opaque and are not used with the public API,
/// but having the same cache key means that the documents
/// will only require one I/O and decompression operation
/// when retrieve from the same store reader consecutively.
///
/// Note that looking up the cache key of a document
/// will not yet pull anything into the block cache.
#[cfg(feature = "quickwit")]
pub(crate) fn cache_key(&self, doc_id: DocId) -> crate::Result<CacheKey> {
let checkpoint = self.block_checkpoint(doc_id)?;
Ok(CacheKey(checkpoint.byte_range.start))
}
/// Get checkpoint for `DocId`. The checkpoint can be used to load a block containing the
/// document.
///