Compare commits


1 commit

Author: Pascal Seitz
SHA1: 1b09dda479
Date: 2023-01-24 00:01:01 +08:00

improve bucket size aggregation limit

postpone the bucket size aggregation limit check for terms aggregations
improve the min_doc_count == 0 special case, which loaded more terms from the dict than the segment_size limit

Note that we now effectively check that segment_size does not exceed the bucket limit. Since segment_size is 10 * size, a request with size 6500 would fail against a bucket limit of 65000.

closes #1822
15 changed files with 218 additions and 118 deletions
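
To make the arithmetic in the commit message concrete, here is a minimal, hypothetical sketch of the kind of limit check it describes. BucketLimit and TermsRequest are illustrative stand-ins, not tantivy's actual types; the factor of 10 between size and segment_size and the example numbers are taken from the commit message and the test added in the diff below.

// Hypothetical sketch of the bucket-limit check described in the commit
// message above. `BucketLimit` and `TermsRequest` are stand-ins, not
// tantivy's API.

struct BucketLimit {
    max_buckets: u64,
}

impl BucketLimit {
    /// Rejects a request once the reported bucket count exceeds the limit.
    fn validate(&self, num_buckets: u64) -> Result<(), String> {
        if num_buckets > self.max_buckets {
            return Err(format!(
                "would create {num_buckets} buckets, limit is {}",
                self.max_buckets
            ));
        }
        Ok(())
    }
}

struct TermsRequest {
    /// The `size` requested by the user; per the commit message,
    /// `segment_size` defaults to 10 * size.
    size: u64,
}

impl TermsRequest {
    fn segment_size(&self) -> u64 {
        self.size * 10
    }
}

fn main() {
    let limit = BucketLimit { max_buckets: 65_000 };

    // The test added in the diff below uses size = 70_000; its segment_size
    // of 700_000 clearly exceeds a 65_000 bucket limit, so the request is
    // rejected. Per the commit message, size 6500 (segment_size 65_000) is
    // already the borderline case for that limit.
    let req = TermsRequest { size: 70_000 };
    assert!(limit.validate(req.segment_size()).is_err());
    println!("size {} -> segment_size {}", req.size, req.segment_size());
}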

View File

@@ -1,6 +1,6 @@
 [package]
 name = "tantivy"
-version = "0.19.1-quickwit"
+version = "0.19.0"
 authors = ["Paul Masurel <paul.masurel@gmail.com>"]
 license = "MIT"
 categories = ["database-implementations", "data-structures"]
@@ -23,7 +23,7 @@ regex = { version = "1.5.5", default-features = false, features = ["std", "unico
 aho-corasick = "0.7"
 tantivy-fst = "0.4.0"
 memmap2 = { version = "0.5.3", optional = true }
-lz4_flex = { version = "0.10", default-features = false, features = ["checked-decode"], optional = true }
+lz4_flex = { version = "0.9.2", default-features = false, features = ["checked-decode"], optional = true }
 brotli = { version = "3.3.4", optional = true }
 zstd = { version = "0.12", optional = true, default-features = false }
 snap = { version = "1.0.5", optional = true }
@@ -55,7 +55,7 @@ measure_time = "0.8.2"
 async-trait = "0.1.53"
 arc-swap = "1.5.0"
-#columnar = { version="0.1", path="./columnar", package ="tantivy-columnar" }
+columnar = { version="0.1", path="./columnar", package ="tantivy-columnar" }
 sstable = { version="0.1", path="./sstable", package ="tantivy-sstable", optional = true }
 stacker = { version="0.1", path="./stacker", package ="tantivy-stacker" }
 tantivy-query-grammar = { version= "0.19.0", path="./query-grammar" }

View File

@@ -11,7 +11,7 @@ use crate::aggregation::agg_req_with_accessor::{
 use crate::aggregation::intermediate_agg_result::{
     IntermediateBucketResult, IntermediateTermBucketEntry, IntermediateTermBucketResult,
 };
-use crate::aggregation::segment_agg_result::{BucketCount, SegmentAggregationResultsCollector};
+use crate::aggregation::segment_agg_result::SegmentAggregationResultsCollector;
 use crate::error::DataCorruption;
 use crate::fastfield::MultiValuedFastFieldReader;
 use crate::schema::Type;
@@ -268,21 +268,18 @@ impl TermBuckets {
         term_ids: &[u64],
         doc: DocId,
         sub_aggregation: &AggregationsWithAccessor,
-        bucket_count: &BucketCount,
         blueprint: &Option<SegmentAggregationResultsCollector>,
     ) -> crate::Result<()> {
         for &term_id in term_ids {
-            let entry = self.entries.entry(term_id as u32).or_insert_with(|| {
-                bucket_count.add_count(1);
-                TermBucketEntry::from_blueprint(blueprint)
-            });
+            let entry = self
+                .entries
+                .entry(term_id as u32)
+                .or_insert_with(|| TermBucketEntry::from_blueprint(blueprint));
             entry.doc_count += 1;
             if let Some(sub_aggregations) = entry.sub_aggregations.as_mut() {
                 sub_aggregations.collect(doc, sub_aggregation)?;
             }
         }
-        bucket_count.validate_bucket_count()?;
         Ok(())
     }
@@ -362,23 +359,17 @@ impl SegmentTermCollector {
         let mut entries: Vec<(u32, TermBucketEntry)> =
             self.term_buckets.entries.into_iter().collect();
+        let order_by_key = self.req.order.target == OrderTarget::Key;
         let order_by_sub_aggregation =
             matches!(self.req.order.target, OrderTarget::SubAggregation(_));
         match self.req.order.target {
             OrderTarget::Key => {
-                // We rely on the fact, that term ordinals match the order of the strings
-                // TODO: We could have a special collector, that keeps only TOP n results at any
-                // time.
-                if self.req.order.order == Order::Desc {
-                    entries.sort_unstable_by_key(|bucket| std::cmp::Reverse(bucket.0));
-                } else {
-                    entries.sort_unstable_by_key(|bucket| bucket.0);
-                }
+                // defer order and cut_off after loading the texts from the dictionary
             }
             OrderTarget::SubAggregation(_name) => {
                 // don't sort and cut off since it's hard to make assumptions on the quality of the
-                // results when cutting off du to unknown nature of the sub_aggregation (possible
+                // results when cutting off due to unknown nature of the sub_aggregation (possible
                 // to check).
             }
             OrderTarget::Count => {
@@ -390,11 +381,12 @@ impl SegmentTermCollector {
             }
         }
-        let (term_doc_count_before_cutoff, sum_other_doc_count) = if order_by_sub_aggregation {
-            (0, 0)
-        } else {
-            cut_off_buckets(&mut entries, self.req.segment_size as usize)
-        };
+        let (term_doc_count_before_cutoff, mut sum_other_doc_count) =
+            if order_by_key || order_by_sub_aggregation {
+                (0, 0)
+            } else {
+                cut_off_buckets(&mut entries, self.req.segment_size as usize)
+            };
         let inverted_index = agg_with_accessor
             .inverted_index
@@ -429,6 +421,22 @@ impl SegmentTermCollector {
             }
         }
+        if order_by_key {
+            let mut dict_entries = dict.into_iter().collect_vec();
+            if self.req.order.order == Order::Desc {
+                dict_entries.sort_unstable_by(|(key1, _), (key2, _)| key1.cmp(key2));
+            } else {
+                dict_entries.sort_unstable_by(|(key1, _), (key2, _)| key2.cmp(key1));
+            }
+            let (_, sum_other_docs) =
+                cut_off_buckets(&mut dict_entries, self.req.segment_size as usize);
+            sum_other_doc_count += sum_other_docs;
+            dict = dict_entries.into_iter().collect();
+        }
+        agg_with_accessor.bucket_count.add_count(dict.len() as u32);
+        agg_with_accessor.bucket_count.validate_bucket_count()?;
         Ok(IntermediateBucketResult::Terms(
             IntermediateTermBucketResult {
                 entries: dict,
@@ -464,28 +472,24 @@ impl SegmentTermCollector {
                 &vals1,
                 docs[0],
                 &bucket_with_accessor.sub_aggregation,
-                &bucket_with_accessor.bucket_count,
                 &self.blueprint,
             )?;
             self.term_buckets.increment_bucket(
                 &vals2,
                 docs[1],
                 &bucket_with_accessor.sub_aggregation,
-                &bucket_with_accessor.bucket_count,
                 &self.blueprint,
             )?;
             self.term_buckets.increment_bucket(
                 &vals3,
                 docs[2],
                 &bucket_with_accessor.sub_aggregation,
-                &bucket_with_accessor.bucket_count,
                 &self.blueprint,
             )?;
             self.term_buckets.increment_bucket(
                 &vals4,
                 docs[3],
                 &bucket_with_accessor.sub_aggregation,
-                &bucket_with_accessor.bucket_count,
                 &self.blueprint,
             )?;
         }
@@ -496,7 +500,6 @@ impl SegmentTermCollector {
                 &vals1,
                 doc,
                 &bucket_with_accessor.sub_aggregation,
-                &bucket_with_accessor.bucket_count,
                 &self.blueprint,
             )?;
         }
@@ -918,14 +921,14 @@ mod tests {
         ];
         let index = get_test_index_from_values_and_terms(merge_segments, &segment_and_terms)?;
-        // key asc
+        // key desc
         let agg_req: Aggregations = vec![(
             "my_texts".to_string(),
             Aggregation::Bucket(BucketAggregation {
                 bucket_agg: BucketAggregationType::Terms(TermsAggregation {
                     field: "string_id".to_string(),
                     order: Some(CustomOrder {
-                        order: Order::Asc,
+                        order: Order::Desc,
                         target: OrderTarget::Key,
                     }),
                     ..Default::default()
@@ -952,7 +955,7 @@ mod tests {
                 bucket_agg: BucketAggregationType::Terms(TermsAggregation {
                     field: "string_id".to_string(),
                     order: Some(CustomOrder {
-                        order: Order::Asc,
+                        order: Order::Desc,
                         target: OrderTarget::Key,
                     }),
                     size: Some(2),
@@ -976,14 +979,14 @@ mod tests {
         assert_eq!(res["my_texts"]["sum_other_doc_count"], 3);
-        // key asc and segment_size cut_off
+        // key desc and segment_size cut_off
         let agg_req: Aggregations = vec![(
             "my_texts".to_string(),
             Aggregation::Bucket(BucketAggregation {
                 bucket_agg: BucketAggregationType::Terms(TermsAggregation {
                     field: "string_id".to_string(),
                     order: Some(CustomOrder {
-                        order: Order::Asc,
+                        order: Order::Desc,
                         target: OrderTarget::Key,
                     }),
                     size: Some(2),
@@ -1006,14 +1009,14 @@ mod tests {
             serde_json::Value::Null
         );
-        // key desc
+        // key asc
         let agg_req: Aggregations = vec![(
             "my_texts".to_string(),
             Aggregation::Bucket(BucketAggregation {
                 bucket_agg: BucketAggregationType::Terms(TermsAggregation {
                     field: "string_id".to_string(),
                     order: Some(CustomOrder {
-                        order: Order::Desc,
+                        order: Order::Asc,
                         target: OrderTarget::Key,
                     }),
                     ..Default::default()
@@ -1033,14 +1036,14 @@ mod tests {
         assert_eq!(res["my_texts"]["buckets"][2]["doc_count"], 5);
         assert_eq!(res["my_texts"]["sum_other_doc_count"], 0);
-        // key desc, size cut_off
+        // key asc, size cut_off
         let agg_req: Aggregations = vec![(
             "my_texts".to_string(),
             Aggregation::Bucket(BucketAggregation {
                 bucket_agg: BucketAggregationType::Terms(TermsAggregation {
                     field: "string_id".to_string(),
                     order: Some(CustomOrder {
-                        order: Order::Desc,
+                        order: Order::Asc,
                         target: OrderTarget::Key,
                     }),
                     size: Some(2),
@@ -1063,14 +1066,14 @@ mod tests {
         );
         assert_eq!(res["my_texts"]["sum_other_doc_count"], 5);
-        // key desc, segment_size cut_off
+        // key asc, segment_size cut_off
         let agg_req: Aggregations = vec![(
             "my_texts".to_string(),
             Aggregation::Bucket(BucketAggregation {
                 bucket_agg: BucketAggregationType::Terms(TermsAggregation {
                     field: "string_id".to_string(),
                     order: Some(CustomOrder {
-                        order: Order::Desc,
+                        order: Order::Asc,
                         target: OrderTarget::Key,
                     }),
                     size: Some(2),
@@ -1131,6 +1134,33 @@ mod tests {
         assert_eq!(res["my_texts"]["sum_other_doc_count"], 0);
         assert_eq!(res["my_texts"]["doc_count_error_upper_bound"], 0);
+        let agg_req: Aggregations = vec![(
+            "my_texts".to_string(),
+            Aggregation::Bucket(BucketAggregation {
+                bucket_agg: BucketAggregationType::Terms(TermsAggregation {
+                    field: "string_id".to_string(),
+                    min_doc_count: Some(0),
+                    size: Some(1),
+                    ..Default::default()
+                }),
+                sub_aggregation: Default::default(),
+            }),
+        )]
+        .into_iter()
+        .collect();
+        // searching for terma, but min_doc_count will return all terms
+        let res = exec_request_with_query(agg_req, &index, Some(("string_id", "terma")))?;
+        assert_eq!(res["my_texts"]["buckets"][0]["key"], "terma");
+        assert_eq!(res["my_texts"]["buckets"][0]["doc_count"], 4);
+        assert_eq!(
+            res["my_texts"]["buckets"][1]["key"],
+            serde_json::Value::Null
+        );
+        assert_eq!(res["my_texts"]["sum_other_doc_count"], 0);
+        assert_eq!(res["my_texts"]["doc_count_error_upper_bound"], 0);
         Ok(())
     }
@@ -1209,6 +1239,27 @@ mod tests {
         let index = get_test_index_from_terms(true, &terms_per_segment)?;
+        let agg_req: Aggregations = vec![(
+            "my_texts".to_string(),
+            Aggregation::Bucket(BucketAggregation {
+                bucket_agg: BucketAggregationType::Terms(TermsAggregation {
+                    field: "string_id".to_string(),
+                    // min_doc_count: Some(0),
+                    ..Default::default()
+                }),
+                sub_aggregation: Default::default(),
+            }),
+        )]
+        .into_iter()
+        .collect();
+        let res = exec_request_with_query(agg_req, &index, None);
+        assert!(res.is_ok());
+        // This request has min_doc_count set to 0
+        // That means we load potentially the whole dict
+        // Make sure the bucket count is still fine
         let agg_req: Aggregations = vec![(
             "my_texts".to_string(),
             Aggregation::Bucket(BucketAggregation {
@@ -1223,6 +1274,24 @@ mod tests {
         .into_iter()
         .collect();
+        let res = exec_request_with_query(agg_req, &index, None);
+        assert!(res.is_ok());
+        let agg_req: Aggregations = vec![(
+            "my_texts".to_string(),
+            Aggregation::Bucket(BucketAggregation {
+                bucket_agg: BucketAggregationType::Terms(TermsAggregation {
+                    field: "string_id".to_string(),
+                    // min_doc_count: Some(0),
+                    size: Some(70_000),
+                    ..Default::default()
+                }),
+                sub_aggregation: Default::default(),
+            }),
+        )]
+        .into_iter()
+        .collect();
         let res = exec_request_with_query(agg_req, &index, None);
         assert!(res.is_err());
@@ -1347,3 +1416,64 @@ mod tests {
         Ok(())
     }
 }
+#[cfg(all(test, feature = "unstable"))]
+mod bench {
+    use itertools::Itertools;
+    use rand::seq::SliceRandom;
+    use rand::thread_rng;
+    use super::*;
+    fn get_collector_with_buckets(num_docs: u64) -> TermBuckets {
+        TermBuckets::from_req_and_validate(&Default::default(), num_docs as usize).unwrap()
+    }
+    fn get_rand_terms(total_terms: u64, num_terms_returned: u64) -> Vec<u64> {
+        let mut rng = thread_rng();
+        let all_terms = (0..total_terms - 1).collect_vec();
+        let mut vals = vec![];
+        for _ in 0..num_terms_returned {
+            let val = all_terms.as_slice().choose(&mut rng).unwrap();
+            vals.push(*val);
+        }
+        vals
+    }
+    fn bench_term_buckets(b: &mut test::Bencher, num_terms: u64, total_terms: u64) {
+        let mut collector = get_collector_with_buckets(total_terms);
+        let vals = get_rand_terms(total_terms, num_terms);
+        let aggregations_with_accessor: AggregationsWithAccessor = Default::default();
+        b.iter(|| {
+            for &val in &vals {
+                collector
+                    .increment_bucket(&[val], 0, &aggregations_with_accessor, &None)
+                    .unwrap();
+            }
+        })
+    }
+    #[bench]
+    fn bench_term_buckets_500_of_1_000_000(b: &mut test::Bencher) {
+        bench_term_buckets(b, 500u64, 1_000_000u64)
+    }
+    #[bench]
+    fn bench_term_buckets_1_000_000_of_50_000(b: &mut test::Bencher) {
+        bench_term_buckets(b, 1_000_000u64, 50_000u64)
+    }
+    #[bench]
+    fn bench_term_buckets_1_000_000_of_50(b: &mut test::Bencher) {
+        bench_term_buckets(b, 1_000_000u64, 50u64)
+    }
+    #[bench]
+    fn bench_term_buckets_1_000_000_of_1_000_000(b: &mut test::Bencher) {
+        bench_term_buckets(b, 1_000_000u64, 1_000_000u64)
+    }
+}
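
The hunks above move the bucket-limit accounting for terms aggregations out of increment_bucket and into the per-segment flush: buckets are only counted and validated after the term dictionary has been loaded and, when ordering by key, cut off to segment_size. A condensed, hypothetical sketch of that flow follows; Entry, cut_off_buckets, finalize_segment, and the error type are simplified stand-ins, not tantivy's own code.

// Condensed sketch of the flow in the new code above; types are simplified
// stand-ins, not tantivy's own.

type Entry = (String, u64); // (term, doc_count)

/// Keeps the first `limit` entries and returns the summed doc counts of the rest.
fn cut_off_buckets(entries: &mut Vec<Entry>, limit: usize) -> u64 {
    let cut: u64 = entries.iter().skip(limit).map(|(_, count)| count).sum();
    entries.truncate(limit);
    cut
}

fn finalize_segment(
    mut dict: Vec<Entry>,
    order_by_key: bool,
    segment_size: usize,
    bucket_limit: u64,
) -> Result<(Vec<Entry>, u64), String> {
    let mut sum_other_doc_count = 0;
    if order_by_key {
        // Ordering and cut-off are deferred until after the term texts have
        // been loaded from the dictionary (direction follows the request).
        dict.sort_unstable_by(|(key1, _), (key2, _)| key1.cmp(key2));
        sum_other_doc_count += cut_off_buckets(&mut dict, segment_size);
    }
    // The limit is now validated once per segment, against the buckets that
    // actually survive the cut-off, instead of on every inserted term.
    if dict.len() as u64 > bucket_limit {
        return Err(format!(
            "too many buckets: {} > {bucket_limit}",
            dict.len()
        ));
    }
    Ok((dict, sum_other_doc_count))
}

fn main() {
    let dict = vec![("terma".to_string(), 4), ("termb".to_string(), 2)];
    let res = finalize_segment(dict, true, 1, 65_000);
    println!("{res:?}"); // Ok(([("terma", 4)], 2))
}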

View File

@@ -499,7 +499,7 @@ impl IntermediateTermBucketResult {
         match req.order.target {
             OrderTarget::Key => {
                 buckets.sort_by(|left, right| {
-                    if req.order.order == Order::Asc {
+                    if req.order.order == Order::Desc {
                         left.key.partial_cmp(&right.key)
                     } else {
                         right.key.partial_cmp(&left.key)

View File

@@ -1156,6 +1156,12 @@ mod tests {
             r#"FieldNotFound("not_exist_field")"#
         );
+        let agg_res = avg_on_field("scores_i64");
+        assert_eq!(
+            format!("{:?}", agg_res),
+            r#"InvalidArgument("Invalid field cardinality on field scores_i64 expected SingleValue, but got MultiValues")"#
+        );
         Ok(())
     }

View File

@@ -135,8 +135,6 @@ impl InvertedIndexReader {
         term_info: &TermInfo,
         option: IndexRecordOption,
     ) -> io::Result<SegmentPostings> {
-        let option = option.downgrade(self.record_option);
         let block_postings = self.read_block_postings_from_terminfo(term_info, option)?;
         let position_reader = {
             if option.has_positions() {

View File

@@ -249,7 +249,7 @@ impl SearcherInner {
         index: Index,
         segment_readers: Vec<SegmentReader>,
         generation: TrackedObject<SearcherGeneration>,
-        doc_store_cache_num_blocks: usize,
+        doc_store_cache_size: usize,
     ) -> io::Result<SearcherInner> {
         assert_eq!(
             &segment_readers
@@ -261,7 +261,7 @@ impl SearcherInner {
         );
         let store_readers: Vec<StoreReader> = segment_readers
             .iter()
-            .map(|segment_reader| segment_reader.get_store_reader(doc_store_cache_num_blocks))
+            .map(|segment_reader| segment_reader.get_store_reader(doc_store_cache_size))
             .collect::<io::Result<Vec<_>>>()?;
         Ok(SearcherInner {

View File

@@ -134,12 +134,9 @@ impl SegmentReader {
         &self.fieldnorm_readers
     }
-    /// Accessor to the segment's [`StoreReader`](crate::store::StoreReader).
-    ///
-    /// `cache_num_blocks` sets the number of decompressed blocks to be cached in an LRU.
-    /// The size of blocks is configurable, this should be reflexted in the
-    pub fn get_store_reader(&self, cache_num_blocks: usize) -> io::Result<StoreReader> {
-        StoreReader::open(self.store_file.clone(), cache_num_blocks)
+    /// Accessor to the segment's `StoreReader`.
+    pub fn get_store_reader(&self, cache_size: usize) -> io::Result<StoreReader> {
+        StoreReader::open(self.store_file.clone(), cache_size)
     }
     /// Open a new segment for reading.

View File

@@ -834,23 +834,20 @@ mod tests {
         // This is a bit of a contrived example.
         let tokens = PreTokenizedString {
             text: "contrived-example".to_string(), //< I can't think of a use case where this corner case happens in real life.
-            tokens: vec![
-                Token {
-                    // Not the last token, yet ends after the last token.
-                    offset_from: 0,
-                    offset_to: 14,
-                    position: 0,
-                    text: "long_token".to_string(),
-                    position_length: 3,
-                },
-                Token {
-                    offset_from: 0,
-                    offset_to: 14,
-                    position: 1,
-                    text: "short".to_string(),
-                    position_length: 1,
-                },
-            ],
+            tokens: vec![Token { // Not the last token, yet ends after the last token.
+                offset_from: 0,
+                offset_to: 14,
+                position: 0,
+                text: "long_token".to_string(),
+                position_length: 3,
+            },
+            Token {
+                offset_from: 0,
+                offset_to: 14,
+                position: 1,
+                text: "short".to_string(),
+                position_length: 1,
+            }],
         };
         doc.add_pre_tokenized_text(text, tokens);
         doc.add_text(text, "hello");

View File

@@ -109,7 +109,6 @@ impl TermQuery {
         } else {
             IndexRecordOption::Basic
         };
         Ok(TermWeight::new(
             self.term.clone(),
             index_record_option,

View File

@@ -44,7 +44,7 @@ pub struct IndexReaderBuilder {
     index: Index,
     warmers: Vec<Weak<dyn Warmer>>,
     num_warming_threads: usize,
-    doc_store_cache_num_blocks: usize,
+    doc_store_cache_size: usize,
 }
 impl IndexReaderBuilder {
@@ -55,7 +55,7 @@ impl IndexReaderBuilder {
             index,
             warmers: Vec::new(),
             num_warming_threads: 1,
-            doc_store_cache_num_blocks: DOCSTORE_CACHE_CAPACITY,
+            doc_store_cache_size: DOCSTORE_CACHE_CAPACITY,
         }
     }
@@ -72,7 +72,7 @@ impl IndexReaderBuilder {
             searcher_generation_inventory.clone(),
         )?;
         let inner_reader = InnerIndexReader::new(
-            self.doc_store_cache_num_blocks,
+            self.doc_store_cache_size,
             self.index,
             warming_state,
             searcher_generation_inventory,
@@ -119,11 +119,8 @@ impl IndexReaderBuilder {
     ///
     /// The doc store readers cache by default DOCSTORE_CACHE_CAPACITY(100) decompressed blocks.
     #[must_use]
-    pub fn doc_store_cache_num_blocks(
-        mut self,
-        doc_store_cache_num_blocks: usize,
-    ) -> IndexReaderBuilder {
-        self.doc_store_cache_num_blocks = doc_store_cache_num_blocks;
+    pub fn doc_store_cache_size(mut self, doc_store_cache_size: usize) -> IndexReaderBuilder {
+        self.doc_store_cache_size = doc_store_cache_size;
         self
     }
@@ -154,7 +151,7 @@ impl TryInto<IndexReader> for IndexReaderBuilder {
     }
 }
 struct InnerIndexReader {
-    doc_store_cache_num_blocks: usize,
+    doc_store_cache_size: usize,
     index: Index,
     warming_state: WarmingState,
     searcher: arc_swap::ArcSwap<SearcherInner>,
@@ -164,7 +161,7 @@ struct InnerIndexReader {
 impl InnerIndexReader {
     fn new(
-        doc_store_cache_num_blocks: usize,
+        doc_store_cache_size: usize,
         index: Index,
         warming_state: WarmingState,
         // The searcher_generation_inventory is not used as source, but as target to track the
@@ -175,13 +172,13 @@ impl InnerIndexReader {
         let searcher = Self::create_searcher(
             &index,
-            doc_store_cache_num_blocks,
+            doc_store_cache_size,
             &warming_state,
             &searcher_generation_counter,
             &searcher_generation_inventory,
         )?;
         Ok(InnerIndexReader {
-            doc_store_cache_num_blocks,
+            doc_store_cache_size,
             index,
             warming_state,
             searcher: ArcSwap::from(searcher),
@@ -217,7 +214,7 @@ impl InnerIndexReader {
     fn create_searcher(
         index: &Index,
-        doc_store_cache_num_blocks: usize,
+        doc_store_cache_size: usize,
         warming_state: &WarmingState,
         searcher_generation_counter: &Arc<AtomicU64>,
         searcher_generation_inventory: &Inventory<SearcherGeneration>,
@@ -235,7 +232,7 @@ impl InnerIndexReader {
             index.clone(),
             segment_readers,
             searcher_generation,
-            doc_store_cache_num_blocks,
+            doc_store_cache_size,
         )?);
         warming_state.warm_new_searcher_generation(&searcher.clone().into())?;
@@ -245,7 +242,7 @@ impl InnerIndexReader {
     fn reload(&self) -> crate::Result<()> {
         let searcher = Self::create_searcher(
             &self.index,
-            self.doc_store_cache_num_blocks,
+            self.doc_store_cache_size,
             &self.warming_state,
             &self.searcher_generation_counter,
             &self.searcher_generation_inventory,

View File

@@ -49,17 +49,4 @@ impl IndexRecordOption {
             IndexRecordOption::WithFreqsAndPositions => true,
         }
     }
-    /// Downgrades to the next level if provided `IndexRecordOption` is unavailable.
-    pub fn downgrade(&self, other: IndexRecordOption) -> IndexRecordOption {
-        use IndexRecordOption::*;
-        match (other, self) {
-            (WithFreqsAndPositions, WithFreqsAndPositions) => WithFreqsAndPositions,
-            (WithFreqs, WithFreqs) => WithFreqs,
-            (WithFreqsAndPositions, WithFreqs) => WithFreqs,
-            (WithFreqs, WithFreqsAndPositions) => WithFreqs,
-            _ => Basic,
-        }
-    }
 }

View File

@@ -375,8 +375,7 @@ where B: AsRef<[u8]>
     ///
     /// Do NOT rely on this byte representation in the index.
     /// This value is likely to change in the future.
-    #[inline(always)]
-    pub fn as_slice(&self) -> &[u8] {
+    pub(crate) fn as_slice(&self) -> &[u8] {
         self.0.as_ref()
     }
 }

View File

@@ -90,7 +90,7 @@ impl CheckpointBlock {
             return Ok(());
         }
         let mut doc = read_u32_vint(data);
-        let mut start_offset = VInt::deserialize_u64(data)? as usize;
+        let mut start_offset = read_u32_vint(data) as usize;
         for _ in 0..len {
             let num_docs = read_u32_vint(data);
             let block_num_bytes = read_u32_vint(data) as usize;
@@ -147,15 +147,6 @@ mod tests {
         test_aux_ser_deser(&checkpoints)
     }
-    #[test]
-    fn test_block_serialize_large_byte_range() -> io::Result<()> {
-        let checkpoints = vec![Checkpoint {
-            doc_range: 10..12,
-            byte_range: 8_000_000_000..9_000_000_000,
-        }];
-        test_aux_ser_deser(&checkpoints)
-    }
     #[test]
     fn test_block_serialize() -> io::Result<()> {
         let offsets: Vec<usize> = (0..11).map(|i| i * i * i).collect();

View File

@@ -4,8 +4,8 @@
 //! order to be handled in the `Store`.
 //!
 //! Internally, documents (or rather their stored fields) are serialized to a buffer.
-//! When the buffer exceeds `block_size` (defaults to 16K), the buffer is compressed using `brotli`,
-//! `LZ4` or `snappy` and the resulting block is written to disk.
+//! When the buffer exceeds 16K, the buffer is compressed using `brotli`, `LZ4` or `snappy`
+//! and the resulting block is written to disk.
 //!
 //! One can then request for a specific `DocId`.
 //! A skip list helps navigating to the right block,
@@ -28,6 +28,8 @@
 //! - at the segment level, the
 //!   [`SegmentReader`'s `doc` method](../struct.SegmentReader.html#method.doc)
 //! - at the index level, the [`Searcher::doc()`](crate::Searcher::doc) method
+//!
+//! !
 mod compressors;
 mod decompressors;

View File

@@ -114,10 +114,7 @@ impl Sum for CacheStats {
 impl StoreReader {
     /// Opens a store reader
-    ///
-    /// `cache_num_blocks` sets the number of decompressed blocks to be cached in an LRU.
-    /// The size of blocks is configurable, this should be reflexted in the
-    pub fn open(store_file: FileSlice, cache_num_blocks: usize) -> io::Result<StoreReader> {
+    pub fn open(store_file: FileSlice, cache_size: usize) -> io::Result<StoreReader> {
         let (footer, data_and_offset) = DocStoreFooter::extract_footer(store_file)?;
         let (data_file, offset_index_file) = data_and_offset.split(footer.offset as usize);
@@ -128,8 +125,8 @@ impl StoreReader {
             decompressor: footer.decompressor,
             data: data_file,
             cache: BlockCache {
-                cache: NonZeroUsize::new(cache_num_blocks)
-                    .map(|cache_num_blocks| Mutex::new(LruCache::new(cache_num_blocks))),
+                cache: NonZeroUsize::new(cache_size)
+                    .map(|cache_size| Mutex::new(LruCache::new(cache_size))),
                 cache_hits: Default::default(),
                 cache_misses: Default::default(),
             },