Compare commits

...

3 Commits

Author SHA1 Message Date
Pascal Seitz
722b6c5205 bump version 2023-10-25 20:41:07 +08:00
Pascal Seitz
0f2211ca44 increase min memory to 15MB for indexing
With tantivy 0.20 the minimum memory consumption per SegmentWriter increased to
12MB. 7MB are for the different fast field collectors types (they could be
lazily created). Increase the minimum memory from 3MB to 15MB.

Change memory variable naming from arena to budget.

closes #2156
2023-10-25 20:37:47 +08:00
PSeitz
21aabf961c Fix range query (#2226)
Fix range query end check in advance
Rename vars to reduce ambiguity
add tests

Fixes #2225
2023-10-25 20:37:36 +08:00
20 changed files with 228 additions and 98 deletions

View File

@@ -1,6 +1,6 @@
[package]
name = "tantivy"
version = "0.21.0"
version = "0.21.1"
authors = ["Paul Masurel <paul.masurel@gmail.com>"]
license = "MIT"
categories = ["database-implementations", "data-structures"]
@@ -128,7 +128,7 @@ members = ["query-grammar", "bitpacker", "common", "ownedbytes", "stacker", "sst
[[test]]
name = "failpoints"
path = "tests/failpoints/mod.rs"
required-features = ["fail/failpoints"]
required-features = ["failpoints"]
[[bench]]
name = "analyzer"

View File

@@ -16,7 +16,7 @@ use crate::{DocId, Score, SegmentOrdinal, SegmentReader};
/// let schema = schema_builder.build();
/// let index = Index::create_in_ram(schema);
///
/// let mut index_writer = index.writer(3_000_000).unwrap();
/// let mut index_writer = index.writer(15_000_000).unwrap();
/// index_writer.add_document(doc!(title => "The Name of the Wind")).unwrap();
/// index_writer.add_document(doc!(title => "The Diary of Muadib")).unwrap();
/// index_writer.add_document(doc!(title => "A Dairy Cow")).unwrap();

View File

@@ -89,7 +89,7 @@ fn facet_depth(facet_bytes: &[u8]) -> usize {
/// let schema = schema_builder.build();
/// let index = Index::create_in_ram(schema);
/// {
/// let mut index_writer = index.writer(3_000_000)?;
/// let mut index_writer = index.writer(15_000_000)?;
/// // a document can be associated with any number of facets
/// index_writer.add_document(doc!(
/// title => "The Name of the Wind",

View File

@@ -233,7 +233,7 @@ mod tests {
let val_field = schema_builder.add_i64_field("val_field", FAST);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut writer = index.writer_with_num_threads(1, 4_000_000)?;
let mut writer = index.writer_for_tests()?;
writer.add_document(doc!(val_field=>12i64))?;
writer.add_document(doc!(val_field=>-30i64))?;
writer.add_document(doc!(val_field=>-12i64))?;
@@ -255,7 +255,7 @@ mod tests {
let val_field = schema_builder.add_i64_field("val_field", FAST);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut writer = index.writer_with_num_threads(1, 4_000_000)?;
let mut writer = index.writer_for_tests()?;
writer.add_document(doc!(val_field=>12i64))?;
writer.commit()?;
writer.add_document(doc!(val_field=>-30i64))?;
@@ -280,7 +280,7 @@ mod tests {
let date_field = schema_builder.add_date_field("date_field", FAST);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut writer = index.writer_with_num_threads(1, 4_000_000)?;
let mut writer = index.writer_for_tests()?;
writer.add_document(doc!(date_field=>DateTime::from_primitive(Date::from_calendar_date(1982, Month::September, 17)?.with_hms(0, 0, 0)?)))?;
writer.add_document(
doc!(date_field=>DateTime::from_primitive(Date::from_calendar_date(1986, Month::March, 9)?.with_hms(0, 0, 0)?)),

View File

@@ -44,7 +44,7 @@
//! # let title = schema_builder.add_text_field("title", TEXT);
//! # let schema = schema_builder.build();
//! # let index = Index::create_in_ram(schema);
//! # let mut index_writer = index.writer(3_000_000)?;
//! # let mut index_writer = index.writer(15_000_000)?;
//! # index_writer.add_document(doc!(
//! # title => "The Name of the Wind",
//! # ))?;

View File

@@ -120,7 +120,7 @@ impl<TFruit: Fruit> FruitHandle<TFruit> {
/// let title = schema_builder.add_text_field("title", TEXT);
/// let schema = schema_builder.build();
/// let index = Index::create_in_ram(schema);
/// let mut index_writer = index.writer(3_000_000)?;
/// let mut index_writer = index.writer(15_000_000)?;
/// index_writer.add_document(doc!(title => "The Name of the Wind"))?;
/// index_writer.add_document(doc!(title => "The Diary of Muadib"))?;
/// index_writer.add_document(doc!(title => "A Dairy Cow"))?;

View File

@@ -16,7 +16,7 @@ use crate::directory::error::OpenReadError;
use crate::directory::MmapDirectory;
use crate::directory::{Directory, ManagedDirectory, RamDirectory, INDEX_WRITER_LOCK};
use crate::error::{DataCorruption, TantivyError};
use crate::indexer::index_writer::{MAX_NUM_THREAD, MEMORY_ARENA_NUM_BYTES_MIN};
use crate::indexer::index_writer::{MAX_NUM_THREAD, MEMORY_BUDGET_NUM_BYTES_MIN};
use crate::indexer::segment_updater::save_metas;
use crate::reader::{IndexReader, IndexReaderBuilder};
use crate::schema::{Field, FieldType, Schema};
@@ -523,9 +523,9 @@ impl Index {
/// - `num_threads` defines the number of indexing workers that
/// should work at the same time.
///
/// - `overall_memory_arena_in_bytes` sets the amount of memory
/// - `overall_memory_budget_in_bytes` sets the amount of memory
/// allocated for all indexing thread.
/// Each thread will receive a budget of `overall_memory_arena_in_bytes / num_threads`.
/// Each thread will receive a budget of `overall_memory_budget_in_bytes / num_threads`.
///
/// # Errors
/// If the lockfile already exists, returns `Error::DirectoryLockBusy` or an `Error::IoError`.
@@ -534,7 +534,7 @@ impl Index {
pub fn writer_with_num_threads(
&self,
num_threads: usize,
overall_memory_arena_in_bytes: usize,
overall_memory_budget_in_bytes: usize,
) -> crate::Result<IndexWriter> {
let directory_lock = self
.directory
@@ -550,7 +550,7 @@ impl Index {
),
)
})?;
let memory_arena_in_bytes_per_thread = overall_memory_arena_in_bytes / num_threads;
let memory_arena_in_bytes_per_thread = overall_memory_budget_in_bytes / num_threads;
IndexWriter::new(
self,
num_threads,
@@ -561,7 +561,7 @@ impl Index {
/// Helper to create an index writer for tests.
///
/// That index writer only simply has a single thread and a memory arena of 10 MB.
/// That index writer only simply has a single thread and a memory budget of 15 MB.
/// Using a single thread gives us a deterministic allocation of DocId.
#[cfg(test)]
pub fn writer_for_tests(&self) -> crate::Result<IndexWriter> {
@@ -579,13 +579,13 @@ impl Index {
/// If the lockfile already exists, returns `Error::FileAlreadyExists`.
/// If the memory arena per thread is too small or too big, returns
/// `TantivyError::InvalidArgument`
pub fn writer(&self, memory_arena_num_bytes: usize) -> crate::Result<IndexWriter> {
pub fn writer(&self, memory_budget_in_bytes: usize) -> crate::Result<IndexWriter> {
let mut num_threads = std::cmp::min(num_cpus::get(), MAX_NUM_THREAD);
let memory_arena_num_bytes_per_thread = memory_arena_num_bytes / num_threads;
if memory_arena_num_bytes_per_thread < MEMORY_ARENA_NUM_BYTES_MIN {
num_threads = (memory_arena_num_bytes / MEMORY_ARENA_NUM_BYTES_MIN).max(1);
let memory_budget_num_bytes_per_thread = memory_budget_in_bytes / num_threads;
if memory_budget_num_bytes_per_thread < MEMORY_BUDGET_NUM_BYTES_MIN {
num_threads = (memory_budget_in_bytes / MEMORY_BUDGET_NUM_BYTES_MIN).max(1);
}
self.writer_with_num_threads(num_threads, memory_arena_num_bytes)
self.writer_with_num_threads(num_threads, memory_budget_in_bytes)
}
/// Accessor to the index settings

View File

@@ -2,6 +2,7 @@ use std::collections::HashSet;
use rand::{thread_rng, Rng};
use crate::indexer::index_writer::MEMORY_BUDGET_NUM_BYTES_MIN;
use crate::schema::*;
use crate::{doc, schema, Index, IndexSettings, IndexSortByField, Order, Searcher};
@@ -30,7 +31,7 @@ fn test_functional_store() -> crate::Result<()> {
let mut rng = thread_rng();
let mut index_writer = index.writer_with_num_threads(3, 12_000_000)?;
let mut index_writer = index.writer_with_num_threads(3, MEMORY_BUDGET_NUM_BYTES_MIN)?;
let mut doc_set: Vec<u64> = Vec::new();

View File

@@ -27,9 +27,9 @@ use crate::{FutureResult, Opstamp};
// in the `memory_arena` goes below MARGIN_IN_BYTES.
pub const MARGIN_IN_BYTES: usize = 1_000_000;
// We impose the memory per thread to be at least 3 MB.
pub const MEMORY_ARENA_NUM_BYTES_MIN: usize = ((MARGIN_IN_BYTES as u32) * 3u32) as usize;
pub const MEMORY_ARENA_NUM_BYTES_MAX: usize = u32::MAX as usize - MARGIN_IN_BYTES;
// We impose the memory per thread to be at least 15 MB, as the baseline consumption is 12MB.
pub const MEMORY_BUDGET_NUM_BYTES_MIN: usize = ((MARGIN_IN_BYTES as u32) * 15u32) as usize;
pub const MEMORY_BUDGET_NUM_BYTES_MAX: usize = u32::MAX as usize - MARGIN_IN_BYTES;
// We impose the number of index writer threads to be at most this.
pub const MAX_NUM_THREAD: usize = 8;
@@ -57,7 +57,8 @@ pub struct IndexWriter {
index: Index,
memory_arena_in_bytes_per_thread: usize,
// The memory budget per thread, after which a commit is triggered.
memory_budget_in_bytes_per_thread: usize,
workers_join_handle: Vec<JoinHandle<crate::Result<()>>>,
@@ -264,19 +265,19 @@ impl IndexWriter {
pub(crate) fn new(
index: &Index,
num_threads: usize,
memory_arena_in_bytes_per_thread: usize,
memory_budget_in_bytes_per_thread: usize,
directory_lock: DirectoryLock,
) -> crate::Result<IndexWriter> {
if memory_arena_in_bytes_per_thread < MEMORY_ARENA_NUM_BYTES_MIN {
if memory_budget_in_bytes_per_thread < MEMORY_BUDGET_NUM_BYTES_MIN {
let err_msg = format!(
"The memory arena in bytes per thread needs to be at least \
{MEMORY_ARENA_NUM_BYTES_MIN}."
{MEMORY_BUDGET_NUM_BYTES_MIN}."
);
return Err(TantivyError::InvalidArgument(err_msg));
}
if memory_arena_in_bytes_per_thread >= MEMORY_ARENA_NUM_BYTES_MAX {
if memory_budget_in_bytes_per_thread >= MEMORY_BUDGET_NUM_BYTES_MAX {
let err_msg = format!(
"The memory arena in bytes per thread cannot exceed {MEMORY_ARENA_NUM_BYTES_MAX}"
"The memory arena in bytes per thread cannot exceed {MEMORY_BUDGET_NUM_BYTES_MAX}"
);
return Err(TantivyError::InvalidArgument(err_msg));
}
@@ -295,7 +296,7 @@ impl IndexWriter {
let mut index_writer = IndexWriter {
_directory_lock: Some(directory_lock),
memory_arena_in_bytes_per_thread,
memory_budget_in_bytes_per_thread,
index: index.clone(),
index_writer_status: IndexWriterStatus::from(document_receiver),
operation_sender: document_sender,
@@ -396,7 +397,7 @@ impl IndexWriter {
let mut delete_cursor = self.delete_queue.cursor();
let mem_budget = self.memory_arena_in_bytes_per_thread;
let mem_budget = self.memory_budget_in_bytes_per_thread;
let index = self.index.clone();
let join_handle: JoinHandle<crate::Result<()>> = thread::Builder::new()
.name(format!("thrd-tantivy-index{}", self.worker_id))
@@ -554,7 +555,7 @@ impl IndexWriter {
let new_index_writer: IndexWriter = IndexWriter::new(
&self.index,
self.num_threads,
self.memory_arena_in_bytes_per_thread,
self.memory_budget_in_bytes_per_thread,
directory_lock,
)?;
@@ -810,6 +811,7 @@ mod tests {
use crate::collector::TopDocs;
use crate::directory::error::LockError;
use crate::error::*;
use crate::indexer::index_writer::MEMORY_BUDGET_NUM_BYTES_MIN;
use crate::indexer::NoMergePolicy;
use crate::query::{BooleanQuery, Occur, Query, QueryParser, TermQuery};
use crate::schema::{
@@ -941,7 +943,7 @@ mod tests {
fn test_empty_operations_group() {
let schema_builder = schema::Schema::builder();
let index = Index::create_in_ram(schema_builder.build());
let index_writer = index.writer(3_000_000).unwrap();
let index_writer = index.writer_for_tests().unwrap();
let operations1 = vec![];
let batch_opstamp1 = index_writer.run(operations1).unwrap();
assert_eq!(batch_opstamp1, 0u64);
@@ -954,8 +956,8 @@ mod tests {
fn test_lockfile_stops_duplicates() {
let schema_builder = schema::Schema::builder();
let index = Index::create_in_ram(schema_builder.build());
let _index_writer = index.writer(3_000_000).unwrap();
match index.writer(3_000_000) {
let _index_writer = index.writer_for_tests().unwrap();
match index.writer_for_tests() {
Err(TantivyError::LockFailure(LockError::LockBusy, _)) => {}
_ => panic!("Expected a `LockFailure` error"),
}
@@ -979,7 +981,7 @@ mod tests {
fn test_set_merge_policy() {
let schema_builder = schema::Schema::builder();
let index = Index::create_in_ram(schema_builder.build());
let index_writer = index.writer(3_000_000).unwrap();
let index_writer = index.writer_for_tests().unwrap();
assert_eq!(
format!("{:?}", index_writer.get_merge_policy()),
"LogMergePolicy { min_num_segments: 8, max_docs_before_merge: 10000000, \
@@ -998,11 +1000,11 @@ mod tests {
let schema_builder = schema::Schema::builder();
let index = Index::create_in_ram(schema_builder.build());
{
let _index_writer = index.writer(3_000_000).unwrap();
let _index_writer = index.writer_for_tests().unwrap();
// the lock should be released when the
// index_writer leaves the scope.
}
let _index_writer_two = index.writer(3_000_000).unwrap();
let _index_writer_two = index.writer_for_tests().unwrap();
}
#[test]
@@ -1022,7 +1024,7 @@ mod tests {
{
// writing the segment
let mut index_writer = index.writer(3_000_000)?;
let mut index_writer = index.writer_for_tests()?;
index_writer.add_document(doc!(text_field=>"a"))?;
index_writer.rollback()?;
assert_eq!(index_writer.commit_opstamp(), 0u64);
@@ -1054,7 +1056,7 @@ mod tests {
reader.searcher().doc_freq(&term_a).unwrap()
};
// writing the segment
let mut index_writer = index.writer(12_000_000).unwrap();
let mut index_writer = index.writer_for_tests().unwrap();
index_writer.add_document(doc!(text_field=>"a"))?;
index_writer.commit()?;
// this should create 1 segment
@@ -1094,7 +1096,7 @@ mod tests {
reader.searcher().doc_freq(&term_a).unwrap()
};
// writing the segment
let mut index_writer = index.writer(12_000_000).unwrap();
let mut index_writer = index.writer_for_tests().unwrap();
index_writer.add_document(doc!(text_field=>"a"))?;
index_writer.commit()?;
index_writer.add_document(doc!(text_field=>"a"))?;
@@ -1140,7 +1142,7 @@ mod tests {
reader.searcher().doc_freq(&term_a).unwrap()
};
// writing the segment
let mut index_writer = index.writer(12_000_000).unwrap();
let mut index_writer = index.writer(MEMORY_BUDGET_NUM_BYTES_MIN).unwrap();
// create 8 segments with 100 tiny docs
for _doc in 0..100 {
index_writer.add_document(doc!(text_field=>"a"))?;
@@ -1196,7 +1198,8 @@ mod tests {
{
// writing the segment
let mut index_writer = index.writer_with_num_threads(4, 12_000_000)?;
let mut index_writer =
index.writer_with_num_threads(4, MEMORY_BUDGET_NUM_BYTES_MIN * 4)?;
// create 8 segments with 100 tiny docs
for _doc in 0..100 {
index_writer.add_document(doc!(text_field => "a"))?;
@@ -1245,7 +1248,9 @@ mod tests {
let term = Term::from_field_text(text_field, s);
searcher.doc_freq(&term).unwrap()
};
let mut index_writer = index.writer_with_num_threads(4, 12_000_000).unwrap();
let mut index_writer = index
.writer_with_num_threads(4, MEMORY_BUDGET_NUM_BYTES_MIN * 4)
.unwrap();
let add_tstamp = index_writer.add_document(doc!(text_field => "a")).unwrap();
let commit_tstamp = index_writer.commit().unwrap();
@@ -1262,7 +1267,9 @@ mod tests {
let mut schema_builder = schema::Schema::builder();
let text_field = schema_builder.add_text_field("text", TEXT);
let index = Index::create_in_ram(schema_builder.build());
let mut index_writer = index.writer_with_num_threads(4, 12_000_000).unwrap();
let mut index_writer = index
.writer_with_num_threads(4, MEMORY_BUDGET_NUM_BYTES_MIN * 4)
.unwrap();
let add_tstamp = index_writer.add_document(doc!(text_field => "a")).unwrap();
@@ -1311,7 +1318,9 @@ mod tests {
let text_field = schema_builder.add_text_field("text", TEXT);
let index = Index::create_in_ram(schema_builder.build());
// writing the segment
let mut index_writer = index.writer_with_num_threads(4, 12_000_000).unwrap();
let mut index_writer = index
.writer_with_num_threads(4, MEMORY_BUDGET_NUM_BYTES_MIN * 4)
.unwrap();
let res = index_writer.delete_all_documents();
assert!(res.is_ok());
@@ -1338,7 +1347,9 @@ mod tests {
let mut schema_builder = schema::Schema::builder();
let text_field = schema_builder.add_text_field("text", TEXT);
let index = Index::create_in_ram(schema_builder.build());
let mut index_writer = index.writer_with_num_threads(4, 12_000_000).unwrap();
let mut index_writer = index
.writer_with_num_threads(4, MEMORY_BUDGET_NUM_BYTES_MIN * 4)
.unwrap();
// add one simple doc
assert!(index_writer.add_document(doc!(text_field => "a")).is_ok());
@@ -1371,7 +1382,9 @@ mod tests {
fn test_delete_all_documents_empty_index() {
let schema_builder = schema::Schema::builder();
let index = Index::create_in_ram(schema_builder.build());
let mut index_writer = index.writer_with_num_threads(4, 12_000_000).unwrap();
let mut index_writer = index
.writer_with_num_threads(4, MEMORY_BUDGET_NUM_BYTES_MIN * 4)
.unwrap();
let clear = index_writer.delete_all_documents();
let commit = index_writer.commit();
assert!(clear.is_ok());
@@ -1382,7 +1395,9 @@ mod tests {
fn test_delete_all_documents_index_twice() {
let schema_builder = schema::Schema::builder();
let index = Index::create_in_ram(schema_builder.build());
let mut index_writer = index.writer_with_num_threads(4, 12_000_000).unwrap();
let mut index_writer = index
.writer_with_num_threads(4, MEMORY_BUDGET_NUM_BYTES_MIN * 4)
.unwrap();
let clear = index_writer.delete_all_documents();
let commit = index_writer.commit();
assert!(clear.is_ok());
@@ -1688,7 +1703,8 @@ mod tests {
let old_reader = index.reader()?;
let id_exists = |id| id % 3 != 0; // 0 does not exist
// Every 3rd doc has only id field
let id_is_full_doc = |id| id % 3 != 0;
let multi_text_field_text1 = "test1 test2 test3 test1 test2 test3";
// rotate left
@@ -1704,7 +1720,7 @@ mod tests {
let facet = Facet::from(&("/cola/".to_string() + &id.to_string()));
let ip = ip_from_id(id);
if !id_exists(id) {
if !id_is_full_doc(id) {
// every 3rd doc has no ip field
index_writer.add_document(doc!(
id_field=>id,
@@ -1824,7 +1840,7 @@ mod tests {
let num_docs_with_values = expected_ids_and_num_occurrences
.iter()
.filter(|(id, _id_occurrences)| id_exists(**id))
.filter(|(id, _id_occurrences)| id_is_full_doc(**id))
.map(|(_, id_occurrences)| *id_occurrences as usize)
.sum::<usize>();
@@ -1848,7 +1864,7 @@ mod tests {
if force_end_merge && num_segments_before_merge > 1 && num_segments_after_merge == 1 {
let mut expected_multi_ips: Vec<_> = id_list
.iter()
.filter(|id| id_exists(**id))
.filter(|id| id_is_full_doc(**id))
.flat_map(|id| vec![ip_from_id(*id), ip_from_id(*id)])
.collect();
assert_eq!(num_ips, expected_multi_ips.len() as u32);
@@ -1886,7 +1902,7 @@ mod tests {
let expected_ips = expected_ids_and_num_occurrences
.keys()
.flat_map(|id| {
if !id_exists(*id) {
if !id_is_full_doc(*id) {
None
} else {
Some(Ipv6Addr::from_u128(*id as u128))
@@ -1898,7 +1914,7 @@ mod tests {
let expected_ips = expected_ids_and_num_occurrences
.keys()
.filter_map(|id| {
if !id_exists(*id) {
if !id_is_full_doc(*id) {
None
} else {
Some(Ipv6Addr::from_u128(*id as u128))
@@ -1933,7 +1949,7 @@ mod tests {
let id = id_reader.first(doc).unwrap();
let vals: Vec<u64> = ff_reader.values_for_doc(doc).collect();
if id_exists(id) {
if id_is_full_doc(id) {
assert_eq!(vals.len(), 2);
assert_eq!(vals[0], vals[1]);
assert!(expected_ids_and_num_occurrences.contains_key(&vals[0]));
@@ -1943,7 +1959,7 @@ mod tests {
}
let bool_vals: Vec<bool> = bool_ff_reader.values_for_doc(doc).collect();
if id_exists(id) {
if id_is_full_doc(id) {
assert_eq!(bool_vals.len(), 2);
assert_ne!(bool_vals[0], bool_vals[1]);
} else {
@@ -1972,7 +1988,7 @@ mod tests {
.as_u64()
.unwrap();
assert!(expected_ids_and_num_occurrences.contains_key(&id));
if id_exists(id) {
if id_is_full_doc(id) {
let id2 = store_reader
.get(doc_id)
.unwrap()
@@ -2019,7 +2035,7 @@ mod tests {
let (existing_id, count) = (*id, *count);
let get_num_hits = |field| do_search(&existing_id.to_string(), field).len() as u64;
assert_eq!(get_num_hits(id_field), count);
if !id_exists(existing_id) {
if !id_is_full_doc(existing_id) {
continue;
}
assert_eq!(get_num_hits(text_field), count);
@@ -2069,7 +2085,7 @@ mod tests {
//
for (existing_id, count) in &expected_ids_and_num_occurrences {
let (existing_id, count) = (*existing_id, *count);
if !id_exists(existing_id) {
if !id_is_full_doc(existing_id) {
continue;
}
let do_search_ip_field = |term: &str| do_search(term, ip_field).len() as u64;
@@ -2086,34 +2102,84 @@ mod tests {
}
}
// assert data is like expected
// Range query
//
for (existing_id, count) in expected_ids_and_num_occurrences.iter().take(10) {
let (existing_id, count) = (*existing_id, *count);
if !id_exists(existing_id) {
continue;
}
let gen_query_inclusive = |field: &str, from: Ipv6Addr, to: Ipv6Addr| {
format!("{}:[{} TO {}]", field, &from.to_string(), &to.to_string())
// Take half as sample
let mut sample: Vec<_> = expected_ids_and_num_occurrences.iter().collect();
sample.sort_by_key(|(k, _num_occurences)| *k);
// sample.truncate(sample.len() / 2);
if !sample.is_empty() {
let (left_sample, right_sample) = sample.split_at(sample.len() / 2);
let expected_count = |sample: &[(&u64, &u64)]| {
sample
.iter()
.filter(|(id, _)| id_is_full_doc(**id))
.map(|(_id, num_occurences)| **num_occurences)
.sum::<u64>()
};
let ip = ip_from_id(existing_id);
fn gen_query_inclusive<T1: ToString, T2: ToString>(
field: &str,
from: T1,
to: T2,
) -> String {
format!("{}:[{} TO {}]", field, &from.to_string(), &to.to_string())
}
let do_search_ip_field = |term: &str| do_search(term, ip_field).len() as u64;
// Range query on single value field
let query = gen_query_inclusive("ip", ip, ip);
assert_eq!(do_search_ip_field(&query), count);
// Query first half
if !left_sample.is_empty() {
let expected_count = expected_count(left_sample);
// Range query on multi value field
let query = gen_query_inclusive("ips", ip, ip);
let start_range = *left_sample[0].0;
let end_range = *left_sample.last().unwrap().0;
let query = gen_query_inclusive("id_opt", start_range, end_range);
assert_eq!(do_search(&query, id_opt_field).len() as u64, expected_count);
assert_eq!(do_search_ip_field(&query), count);
// Range query on ip field
let ip1 = ip_from_id(start_range);
let ip2 = ip_from_id(end_range);
let do_search_ip_field = |term: &str| do_search(term, ip_field).len() as u64;
let query = gen_query_inclusive("ip", ip1, ip2);
assert_eq!(do_search_ip_field(&query), expected_count);
let query = gen_query_inclusive("ip", "*", ip2);
assert_eq!(do_search_ip_field(&query), expected_count);
// Range query on multi value field
let query = gen_query_inclusive("ips", ip1, ip2);
assert_eq!(do_search_ip_field(&query), expected_count);
let query = gen_query_inclusive("ips", "*", ip2);
assert_eq!(do_search_ip_field(&query), expected_count);
}
// Query second half
if !right_sample.is_empty() {
let expected_count = expected_count(right_sample);
let start_range = *right_sample[0].0;
let end_range = *right_sample.last().unwrap().0;
// Range query on id opt field
let query =
gen_query_inclusive("id_opt", start_range.to_string(), end_range.to_string());
assert_eq!(do_search(&query, id_opt_field).len() as u64, expected_count);
// Range query on ip field
let ip1 = ip_from_id(start_range);
let ip2 = ip_from_id(end_range);
let do_search_ip_field = |term: &str| do_search(term, ip_field).len() as u64;
let query = gen_query_inclusive("ip", ip1, ip2);
assert_eq!(do_search_ip_field(&query), expected_count);
let query = gen_query_inclusive("ip", ip1, "*");
assert_eq!(do_search_ip_field(&query), expected_count);
// Range query on multi value field
let query = gen_query_inclusive("ips", ip1, ip2);
assert_eq!(do_search_ip_field(&query), expected_count);
let query = gen_query_inclusive("ips", ip1, "*");
assert_eq!(do_search_ip_field(&query), expected_count);
}
}
// ip range query on fast field
//
for (existing_id, count) in expected_ids_and_num_occurrences.iter().take(10) {
let (existing_id, count) = (*existing_id, *count);
if !id_exists(existing_id) {
if !id_is_full_doc(existing_id) {
continue;
}
let gen_query_inclusive = |field: &str, from: Ipv6Addr, to: Ipv6Addr| {
@@ -2141,7 +2207,7 @@ mod tests {
.first_or_default_col(9999);
for doc_id in segment_reader.doc_ids_alive() {
let id = ff_reader.get_val(doc_id);
if !id_exists(id) {
if !id_is_full_doc(id) {
continue;
}
let facet_ords: Vec<u64> = facet_reader.facet_ords(doc_id).collect();
@@ -2179,6 +2245,12 @@ mod tests {
Ok(index)
}
#[test]
fn test_fast_field_range() {
let ops: Vec<_> = (0..1000).map(|id| IndexingOp::AddDoc { id }).collect();
assert!(test_operation_strategy(&ops, false, true).is_ok());
}
#[test]
fn test_sort_index_on_opt_field_regression() {
assert!(test_operation_strategy(

View File

@@ -26,6 +26,8 @@ use crate::{DocId, Document, Opstamp, SegmentComponent, TantivyError};
fn compute_initial_table_size(per_thread_memory_budget: usize) -> crate::Result<usize> {
let table_memory_upper_bound = per_thread_memory_budget / 3;
(10..20) // We cap it at 2^19 = 512K capacity.
// TODO: There are cases where this limit causes a
// reallocation in the hashmap. Check if this affects performance.
.map(|power| 1 << power)
.take_while(|capacity| compute_table_memory_size(*capacity) < table_memory_upper_bound)
.last()

View File

@@ -225,7 +225,7 @@ pub mod tests {
{
let mut segment_writer =
SegmentWriter::for_segment(3_000_000, segment.clone()).unwrap();
SegmentWriter::for_segment(15_000_000, segment.clone()).unwrap();
{
// checking that position works if the field has two values
let op = AddOperation {

View File

@@ -32,7 +32,7 @@ use crate::schema::{IndexRecordOption, Term};
/// let schema = schema_builder.build();
/// let index = Index::create_in_ram(schema);
/// {
/// let mut index_writer = index.writer(3_000_000)?;
/// let mut index_writer = index.writer(15_000_000)?;
/// index_writer.add_document(doc!(
/// title => "The Name of the Wind",
/// ))?;

View File

@@ -297,7 +297,7 @@ mod tests {
let text = schema_builder.add_text_field("text", STRING);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut index_writer = index.writer_with_num_threads(1, 5_000_000)?;
let mut index_writer = index.writer_for_tests()?;
index_writer.add_document(doc!(text=>"a"))?;
index_writer.add_document(doc!(text=>"b"))?;
index_writer.commit()?;

View File

@@ -23,7 +23,7 @@ use crate::{Score, Term};
/// let schema = schema_builder.build();
/// let index = Index::create_in_ram(schema);
/// {
/// let mut index_writer = index.writer(3_000_000)?;
/// let mut index_writer = index.writer(15_000_000)?;
/// index_writer.add_document(doc!(
/// title => "The Name of Girl",
/// ))?;

View File

@@ -46,7 +46,7 @@ impl Automaton for DfaWrapper {
/// let schema = schema_builder.build();
/// let index = Index::create_in_ram(schema);
/// {
/// let mut index_writer = index.writer(3_000_000)?;
/// let mut index_writer = index.writer(15_000_000)?;
/// index_writer.add_document(doc!(
/// title => "The Name of the Wind",
/// ))?;

View File

@@ -31,8 +31,8 @@ impl VecCursor {
self.current_pos = 0;
&mut self.docs
}
fn last_value(&self) -> Option<u32> {
self.docs.iter().last().cloned()
fn last_doc(&self) -> Option<u32> {
self.docs.last().cloned()
}
fn is_empty(&self) -> bool {
self.current().is_none()
@@ -112,15 +112,15 @@ impl<T: Send + Sync + PartialOrd + Copy + Debug + 'static> RangeDocSet<T> {
finished_to_end = true;
}
let last_value = self.loaded_docs.last_value();
let last_doc = self.loaded_docs.last_doc();
let doc_buffer: &mut Vec<DocId> = self.loaded_docs.get_cleared_data();
self.column.get_docids_for_value_range(
self.value_range.clone(),
self.next_fetch_start..end,
doc_buffer,
);
if let Some(last_value) = last_value {
while self.loaded_docs.current() == Some(last_value) {
if let Some(last_doc) = last_doc {
while self.loaded_docs.current() == Some(last_doc) {
self.loaded_docs.next();
}
}
@@ -136,7 +136,7 @@ impl<T: Send + Sync + PartialOrd + Copy + Debug + 'static> DocSet for RangeDocSe
if let Some(docid) = self.loaded_docs.next() {
return docid;
}
if self.next_fetch_start >= self.column.values.num_vals() {
if self.next_fetch_start >= self.column.num_docs() {
return TERMINATED;
}
self.fetch_block();
@@ -177,3 +177,54 @@ impl<T: Send + Sync + PartialOrd + Copy + Debug + 'static> DocSet for RangeDocSe
0 // heuristic possible by checking number of hits when fetching a block
}
}
#[cfg(test)]
mod tests {
use crate::collector::Count;
use crate::directory::RamDirectory;
use crate::query::RangeQuery;
use crate::{schema, Document, IndexBuilder};
#[test]
fn range_query_fast_optional_field_minimum() {
let mut schema_builder = schema::SchemaBuilder::new();
let id_field = schema_builder.add_text_field("id", schema::STRING);
let score_field = schema_builder.add_u64_field("score", schema::FAST | schema::INDEXED);
let dir = RamDirectory::default();
let index = IndexBuilder::new()
.schema(schema_builder.build())
.open_or_create(dir)
.unwrap();
{
let mut writer = index.writer(15_000_000).unwrap();
let count = 1000;
for i in 0..count {
let mut doc = Document::new();
doc.add_text(id_field, format!("doc{i}"));
let nb_scores = i % 2; // 0 or 1 scores
for _ in 0..nb_scores {
doc.add_u64(score_field, 80);
}
writer.add_document(doc).unwrap();
}
writer.commit().unwrap();
}
let reader = index.reader().unwrap();
let searcher = reader.searcher();
let query = RangeQuery::new_u64_bounds(
"score".to_string(),
std::ops::Bound::Included(70),
std::ops::Bound::Unbounded,
);
let count = searcher.search(&query, &Count).unwrap();
assert_eq!(count, 500);
}
}

View File

@@ -26,7 +26,7 @@ use crate::schema::Field;
/// let schema = schema_builder.build();
/// let index = Index::create_in_ram(schema);
/// {
/// let mut index_writer = index.writer(3_000_000)?;
/// let mut index_writer = index.writer(15_000_000)?;
/// index_writer.add_document(doc!(
/// title => "The Name of the Wind",
/// ))?;

View File

@@ -27,7 +27,7 @@ use crate::Term;
/// let schema = schema_builder.build();
/// let index = Index::create_in_ram(schema);
/// {
/// let mut index_writer = index.writer(3_000_000)?;
/// let mut index_writer = index.writer(15_000_000)?;
/// index_writer.add_document(doc!(
/// title => "The Name of the Wind",
/// ))?;
@@ -151,7 +151,7 @@ mod tests {
let ip_addr_2 = Ipv6Addr::from_u128(10);
{
let mut index_writer = index.writer(3_000_000).unwrap();
let mut index_writer = index.writer_for_tests().unwrap();
index_writer
.add_document(doc!(
ip_field => ip_addr_1

View File

@@ -179,6 +179,7 @@ mod tests {
use super::Warmer;
use crate::core::searcher::SearcherGeneration;
use crate::directory::RamDirectory;
use crate::indexer::index_writer::MEMORY_BUDGET_NUM_BYTES_MIN;
use crate::schema::{Schema, INDEXED};
use crate::{Index, IndexSettings, ReloadPolicy, Searcher, SegmentId};
@@ -255,7 +256,10 @@ mod tests {
let num_writer_threads = 4;
let mut writer = index
.writer_with_num_threads(num_writer_threads, 25_000_000)
.writer_with_num_threads(
num_writer_threads,
MEMORY_BUDGET_NUM_BYTES_MIN * num_writer_threads,
)
.unwrap();
for i in 0u64..1000u64 {

View File

@@ -45,7 +45,7 @@ fn test_write_commit_fails() -> tantivy::Result<()> {
let text_field = schema_builder.add_text_field("text", TEXT);
let index = Index::create_in_ram(schema_builder.build());
let mut index_writer = index.writer_with_num_threads(1, 3_000_000)?;
let mut index_writer = index.writer_with_num_threads(1, 15_000_000)?;
for _ in 0..100 {
index_writer.add_document(doc!(text_field => "a"))?;
}
@@ -75,7 +75,7 @@ fn test_fail_on_flush_segment() -> tantivy::Result<()> {
let mut schema_builder = Schema::builder();
let text_field = schema_builder.add_text_field("text", TEXT);
let index = Index::create_in_ram(schema_builder.build());
let index_writer = index.writer_with_num_threads(1, 3_000_000)?;
let index_writer = index.writer_with_num_threads(1, 15_000_000)?;
fail::cfg("FieldSerializer::close_term", "return(simulatederror)").unwrap();
for i in 0..100_000 {
if index_writer
@@ -94,7 +94,7 @@ fn test_fail_on_flush_segment_but_one_worker_remains() -> tantivy::Result<()> {
let mut schema_builder = Schema::builder();
let text_field = schema_builder.add_text_field("text", TEXT);
let index = Index::create_in_ram(schema_builder.build());
let index_writer = index.writer_with_num_threads(2, 6_000_000)?;
let index_writer = index.writer_with_num_threads(2, 30_000_000)?;
fail::cfg("FieldSerializer::close_term", "1*return(simulatederror)").unwrap();
for i in 0..100_000 {
if index_writer
@@ -113,7 +113,7 @@ fn test_fail_on_commit_segment() -> tantivy::Result<()> {
let mut schema_builder = Schema::builder();
let text_field = schema_builder.add_text_field("text", TEXT);
let index = Index::create_in_ram(schema_builder.build());
let mut index_writer = index.writer_with_num_threads(1, 3_000_000)?;
let mut index_writer = index.writer_with_num_threads(1, 15_000_000)?;
fail::cfg("FieldSerializer::close_term", "return(simulatederror)").unwrap();
for i in 0..10 {
index_writer