Compare commits

...

6 Commits

Author SHA1 Message Date
PSeitz
7ce8a65619 fix: doc store for files larger 4GB (#1856)
Fixes an issue in the skip list deserialization, which deserialized the byte start offset incorrectly as u32.
`get_doc` will fail for any docs that live in a block with start offset larger than u32::MAX (~4GB).
Causes index corruption, if a segment with a doc store larger 4GB is merged.

tantivy version 0.19 is affected
2023-03-13 15:07:55 +09:00
PSeitz
7bf0a14041 fix: auto downgrade index record option, instead of vint error (#1857)
Prev: thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: IoError(Custom { kind: InvalidData, error: "Reach end of buffer while reading VInt" })', src/main.rs:46:14
Now: Automatic downgrade to next available level
2023-03-13 15:07:28 +09:00
PSeitz
c91d4e4e65 fix sort order test for term aggregation (#1858)
fix sort order test for term aggregation
fix invalid request test
2023-03-13 13:49:10 +08:00
PSeitz
6f6f639170 fmt code, update lz4_flex (#1838)
formatting on nightly changed
2023-03-13 14:14:15 +09:00
Paul Masurel
a022e97dc2 Bumped tantivy version 2023-03-13 14:10:41 +09:00
Paul Masurel
6474a0f58e Created branch specifically for Quickwit 0.5 2023-03-11 12:27:20 +09:00
10 changed files with 77 additions and 124 deletions

View File

@@ -1,6 +1,6 @@
[package]
name = "tantivy"
version = "0.19.0"
version = "0.19.1-quickwit"
authors = ["Paul Masurel <paul.masurel@gmail.com>"]
license = "MIT"
categories = ["database-implementations", "data-structures"]
@@ -23,7 +23,7 @@ regex = { version = "1.5.5", default-features = false, features = ["std", "unico
aho-corasick = "0.7"
tantivy-fst = "0.4.0"
memmap2 = { version = "0.5.3", optional = true }
lz4_flex = { version = "0.9.2", default-features = false, features = ["checked-decode"], optional = true }
lz4_flex = { version = "0.10", default-features = false, features = ["checked-decode"], optional = true }
brotli = { version = "3.3.4", optional = true }
zstd = { version = "0.12", optional = true, default-features = false }
snap = { version = "1.0.5", optional = true }
@@ -55,7 +55,7 @@ measure_time = "0.8.2"
async-trait = "0.1.53"
arc-swap = "1.5.0"
columnar = { version="0.1", path="./columnar", package ="tantivy-columnar" }
#columnar = { version="0.1", path="./columnar", package ="tantivy-columnar" }
sstable = { version="0.1", path="./sstable", package ="tantivy-sstable", optional = true }
stacker = { version="0.1", path="./stacker", package ="tantivy-stacker" }
tantivy-query-grammar = { version= "0.19.0", path="./query-grammar" }

View File

@@ -362,13 +362,19 @@ impl SegmentTermCollector {
let mut entries: Vec<(u32, TermBucketEntry)> =
self.term_buckets.entries.into_iter().collect();
let order_by_key = self.req.order.target == OrderTarget::Key;
let order_by_sub_aggregation =
matches!(self.req.order.target, OrderTarget::SubAggregation(_));
match self.req.order.target {
OrderTarget::Key => {
// defer order and cut_off after loading the texts from the dictionary
// We rely on the fact, that term ordinals match the order of the strings
// TODO: We could have a special collector, that keeps only TOP n results at any
// time.
if self.req.order.order == Order::Desc {
entries.sort_unstable_by_key(|bucket| std::cmp::Reverse(bucket.0));
} else {
entries.sort_unstable_by_key(|bucket| bucket.0);
}
}
OrderTarget::SubAggregation(_name) => {
// don't sort and cut off since it's hard to make assumptions on the quality of the
@@ -384,12 +390,11 @@ impl SegmentTermCollector {
}
}
let (term_doc_count_before_cutoff, mut sum_other_doc_count) =
if order_by_key || order_by_sub_aggregation {
(0, 0)
} else {
cut_off_buckets(&mut entries, self.req.segment_size as usize)
};
let (term_doc_count_before_cutoff, sum_other_doc_count) = if order_by_sub_aggregation {
(0, 0)
} else {
cut_off_buckets(&mut entries, self.req.segment_size as usize)
};
let inverted_index = agg_with_accessor
.inverted_index
@@ -412,6 +417,10 @@ impl SegmentTermCollector {
if self.req.min_doc_count == 0 {
let mut stream = term_dict.stream()?;
while let Some((key, _ord)) = stream.next() {
if dict.len() >= self.req.segment_size as usize {
break;
}
let key = std::str::from_utf8(key)
.map_err(|utf8_err| DataCorruption::comment_only(utf8_err.to_string()))?;
if !dict.contains_key(key) {
@@ -420,20 +429,6 @@ impl SegmentTermCollector {
}
}
if order_by_key {
let mut dict_entries = dict.into_iter().collect_vec();
if self.req.order.order == Order::Desc {
dict_entries.sort_unstable_by(|(key1, _), (key2, _)| key1.cmp(key2));
} else {
dict_entries.sort_unstable_by(|(key1, _), (key2, _)| key2.cmp(key1));
}
let (_, sum_other_docs) =
cut_off_buckets(&mut dict_entries, self.req.segment_size as usize);
sum_other_doc_count += sum_other_docs;
dict = dict_entries.into_iter().collect();
}
Ok(IntermediateBucketResult::Terms(
IntermediateTermBucketResult {
entries: dict,
@@ -923,14 +918,14 @@ mod tests {
];
let index = get_test_index_from_values_and_terms(merge_segments, &segment_and_terms)?;
// key desc
// key asc
let agg_req: Aggregations = vec![(
"my_texts".to_string(),
Aggregation::Bucket(BucketAggregation {
bucket_agg: BucketAggregationType::Terms(TermsAggregation {
field: "string_id".to_string(),
order: Some(CustomOrder {
order: Order::Desc,
order: Order::Asc,
target: OrderTarget::Key,
}),
..Default::default()
@@ -957,7 +952,7 @@ mod tests {
bucket_agg: BucketAggregationType::Terms(TermsAggregation {
field: "string_id".to_string(),
order: Some(CustomOrder {
order: Order::Desc,
order: Order::Asc,
target: OrderTarget::Key,
}),
size: Some(2),
@@ -981,14 +976,14 @@ mod tests {
assert_eq!(res["my_texts"]["sum_other_doc_count"], 3);
// key desc and segment_size cut_off
// key asc and segment_size cut_off
let agg_req: Aggregations = vec![(
"my_texts".to_string(),
Aggregation::Bucket(BucketAggregation {
bucket_agg: BucketAggregationType::Terms(TermsAggregation {
field: "string_id".to_string(),
order: Some(CustomOrder {
order: Order::Desc,
order: Order::Asc,
target: OrderTarget::Key,
}),
size: Some(2),
@@ -1011,14 +1006,14 @@ mod tests {
serde_json::Value::Null
);
// key asc
// key desc
let agg_req: Aggregations = vec![(
"my_texts".to_string(),
Aggregation::Bucket(BucketAggregation {
bucket_agg: BucketAggregationType::Terms(TermsAggregation {
field: "string_id".to_string(),
order: Some(CustomOrder {
order: Order::Asc,
order: Order::Desc,
target: OrderTarget::Key,
}),
..Default::default()
@@ -1038,14 +1033,14 @@ mod tests {
assert_eq!(res["my_texts"]["buckets"][2]["doc_count"], 5);
assert_eq!(res["my_texts"]["sum_other_doc_count"], 0);
// key asc, size cut_off
// key desc, size cut_off
let agg_req: Aggregations = vec![(
"my_texts".to_string(),
Aggregation::Bucket(BucketAggregation {
bucket_agg: BucketAggregationType::Terms(TermsAggregation {
field: "string_id".to_string(),
order: Some(CustomOrder {
order: Order::Asc,
order: Order::Desc,
target: OrderTarget::Key,
}),
size: Some(2),
@@ -1068,14 +1063,14 @@ mod tests {
);
assert_eq!(res["my_texts"]["sum_other_doc_count"], 5);
// key asc, segment_size cut_off
// key desc, segment_size cut_off
let agg_req: Aggregations = vec![(
"my_texts".to_string(),
Aggregation::Bucket(BucketAggregation {
bucket_agg: BucketAggregationType::Terms(TermsAggregation {
field: "string_id".to_string(),
order: Some(CustomOrder {
order: Order::Asc,
order: Order::Desc,
target: OrderTarget::Key,
}),
size: Some(2),
@@ -1352,68 +1347,3 @@ mod tests {
Ok(())
}
}
#[cfg(all(test, feature = "unstable"))]
mod bench {
use itertools::Itertools;
use rand::seq::SliceRandom;
use rand::thread_rng;
use super::*;
fn get_collector_with_buckets(num_docs: u64) -> TermBuckets {
TermBuckets::from_req_and_validate(&Default::default(), num_docs as usize).unwrap()
}
fn get_rand_terms(total_terms: u64, num_terms_returned: u64) -> Vec<u64> {
let mut rng = thread_rng();
let all_terms = (0..total_terms - 1).collect_vec();
let mut vals = vec![];
for _ in 0..num_terms_returned {
let val = all_terms.as_slice().choose(&mut rng).unwrap();
vals.push(*val);
}
vals
}
fn bench_term_buckets(b: &mut test::Bencher, num_terms: u64, total_terms: u64) {
let mut collector = get_collector_with_buckets(total_terms);
let vals = get_rand_terms(total_terms, num_terms);
let aggregations_with_accessor: AggregationsWithAccessor = Default::default();
let bucket_count: BucketCount = BucketCount {
bucket_count: Default::default(),
max_bucket_count: 1_000_001u32,
};
b.iter(|| {
for &val in &vals {
collector
.increment_bucket(&[val], 0, &aggregations_with_accessor, &bucket_count, &None)
.unwrap();
}
})
}
#[bench]
fn bench_term_buckets_500_of_1_000_000(b: &mut test::Bencher) {
bench_term_buckets(b, 500u64, 1_000_000u64)
}
#[bench]
fn bench_term_buckets_1_000_000_of_50_000(b: &mut test::Bencher) {
bench_term_buckets(b, 1_000_000u64, 50_000u64)
}
#[bench]
fn bench_term_buckets_1_000_000_of_50(b: &mut test::Bencher) {
bench_term_buckets(b, 1_000_000u64, 50u64)
}
#[bench]
fn bench_term_buckets_1_000_000_of_1_000_000(b: &mut test::Bencher) {
bench_term_buckets(b, 1_000_000u64, 1_000_000u64)
}
}

View File

@@ -499,7 +499,7 @@ impl IntermediateTermBucketResult {
match req.order.target {
OrderTarget::Key => {
buckets.sort_by(|left, right| {
if req.order.order == Order::Desc {
if req.order.order == Order::Asc {
left.key.partial_cmp(&right.key)
} else {
right.key.partial_cmp(&left.key)

View File

@@ -1156,12 +1156,6 @@ mod tests {
r#"FieldNotFound("not_exist_field")"#
);
let agg_res = avg_on_field("scores_i64");
assert_eq!(
format!("{:?}", agg_res),
r#"InvalidArgument("Invalid field cardinality on field scores_i64 expected SingleValue, but got MultiValues")"#
);
Ok(())
}

View File

@@ -135,6 +135,8 @@ impl InvertedIndexReader {
term_info: &TermInfo,
option: IndexRecordOption,
) -> io::Result<SegmentPostings> {
let option = option.downgrade(self.record_option);
let block_postings = self.read_block_postings_from_terminfo(term_info, option)?;
let position_reader = {
if option.has_positions() {

View File

@@ -834,20 +834,23 @@ mod tests {
// This is a bit of a contrived example.
let tokens = PreTokenizedString {
text: "contrived-example".to_string(), //< I can't think of a use case where this corner case happens in real life.
tokens: vec![Token { // Not the last token, yet ends after the last token.
offset_from: 0,
offset_to: 14,
position: 0,
text: "long_token".to_string(),
position_length: 3,
},
Token {
offset_from: 0,
offset_to: 14,
position: 1,
text: "short".to_string(),
position_length: 1,
}],
tokens: vec![
Token {
// Not the last token, yet ends after the last token.
offset_from: 0,
offset_to: 14,
position: 0,
text: "long_token".to_string(),
position_length: 3,
},
Token {
offset_from: 0,
offset_to: 14,
position: 1,
text: "short".to_string(),
position_length: 1,
},
],
};
doc.add_pre_tokenized_text(text, tokens);
doc.add_text(text, "hello");

View File

@@ -109,6 +109,7 @@ impl TermQuery {
} else {
IndexRecordOption::Basic
};
Ok(TermWeight::new(
self.term.clone(),
index_record_option,

View File

@@ -49,4 +49,17 @@ impl IndexRecordOption {
IndexRecordOption::WithFreqsAndPositions => true,
}
}
/// Downgrades to the next level if provided `IndexRecordOption` is unavailable.
pub fn downgrade(&self, other: IndexRecordOption) -> IndexRecordOption {
use IndexRecordOption::*;
match (other, self) {
(WithFreqsAndPositions, WithFreqsAndPositions) => WithFreqsAndPositions,
(WithFreqs, WithFreqs) => WithFreqs,
(WithFreqsAndPositions, WithFreqs) => WithFreqs,
(WithFreqs, WithFreqsAndPositions) => WithFreqs,
_ => Basic,
}
}
}

View File

@@ -375,7 +375,8 @@ where B: AsRef<[u8]>
///
/// Do NOT rely on this byte representation in the index.
/// This value is likely to change in the future.
pub(crate) fn as_slice(&self) -> &[u8] {
#[inline(always)]
pub fn as_slice(&self) -> &[u8] {
self.0.as_ref()
}
}

View File

@@ -90,7 +90,7 @@ impl CheckpointBlock {
return Ok(());
}
let mut doc = read_u32_vint(data);
let mut start_offset = read_u32_vint(data) as usize;
let mut start_offset = VInt::deserialize_u64(data)? as usize;
for _ in 0..len {
let num_docs = read_u32_vint(data);
let block_num_bytes = read_u32_vint(data) as usize;
@@ -147,6 +147,15 @@ mod tests {
test_aux_ser_deser(&checkpoints)
}
#[test]
fn test_block_serialize_large_byte_range() -> io::Result<()> {
let checkpoints = vec![Checkpoint {
doc_range: 10..12,
byte_range: 8_000_000_000..9_000_000_000,
}];
test_aux_ser_deser(&checkpoints)
}
#[test]
fn test_block_serialize() -> io::Result<()> {
let offsets: Vec<usize> = (0..11).map(|i| i * i * i).collect();