mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-01-06 01:02:55 +00:00
Compare commits
1 Commits
block-cach
...
term_agg_b
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1b09dda479 |
@@ -11,7 +11,7 @@ use crate::aggregation::agg_req_with_accessor::{
|
|||||||
use crate::aggregation::intermediate_agg_result::{
|
use crate::aggregation::intermediate_agg_result::{
|
||||||
IntermediateBucketResult, IntermediateTermBucketEntry, IntermediateTermBucketResult,
|
IntermediateBucketResult, IntermediateTermBucketEntry, IntermediateTermBucketResult,
|
||||||
};
|
};
|
||||||
use crate::aggregation::segment_agg_result::{BucketCount, SegmentAggregationResultsCollector};
|
use crate::aggregation::segment_agg_result::SegmentAggregationResultsCollector;
|
||||||
use crate::error::DataCorruption;
|
use crate::error::DataCorruption;
|
||||||
use crate::fastfield::MultiValuedFastFieldReader;
|
use crate::fastfield::MultiValuedFastFieldReader;
|
||||||
use crate::schema::Type;
|
use crate::schema::Type;
|
||||||
@@ -268,21 +268,18 @@ impl TermBuckets {
|
|||||||
term_ids: &[u64],
|
term_ids: &[u64],
|
||||||
doc: DocId,
|
doc: DocId,
|
||||||
sub_aggregation: &AggregationsWithAccessor,
|
sub_aggregation: &AggregationsWithAccessor,
|
||||||
bucket_count: &BucketCount,
|
|
||||||
blueprint: &Option<SegmentAggregationResultsCollector>,
|
blueprint: &Option<SegmentAggregationResultsCollector>,
|
||||||
) -> crate::Result<()> {
|
) -> crate::Result<()> {
|
||||||
for &term_id in term_ids {
|
for &term_id in term_ids {
|
||||||
let entry = self.entries.entry(term_id as u32).or_insert_with(|| {
|
let entry = self
|
||||||
bucket_count.add_count(1);
|
.entries
|
||||||
|
.entry(term_id as u32)
|
||||||
TermBucketEntry::from_blueprint(blueprint)
|
.or_insert_with(|| TermBucketEntry::from_blueprint(blueprint));
|
||||||
});
|
|
||||||
entry.doc_count += 1;
|
entry.doc_count += 1;
|
||||||
if let Some(sub_aggregations) = entry.sub_aggregations.as_mut() {
|
if let Some(sub_aggregations) = entry.sub_aggregations.as_mut() {
|
||||||
sub_aggregations.collect(doc, sub_aggregation)?;
|
sub_aggregations.collect(doc, sub_aggregation)?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
bucket_count.validate_bucket_count()?;
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@@ -372,7 +369,7 @@ impl SegmentTermCollector {
|
|||||||
}
|
}
|
||||||
OrderTarget::SubAggregation(_name) => {
|
OrderTarget::SubAggregation(_name) => {
|
||||||
// don't sort and cut off since it's hard to make assumptions on the quality of the
|
// don't sort and cut off since it's hard to make assumptions on the quality of the
|
||||||
// results when cutting off du to unknown nature of the sub_aggregation (possible
|
// results when cutting off due to unknown nature of the sub_aggregation (possible
|
||||||
// to check).
|
// to check).
|
||||||
}
|
}
|
||||||
OrderTarget::Count => {
|
OrderTarget::Count => {
|
||||||
@@ -412,6 +409,10 @@ impl SegmentTermCollector {
|
|||||||
if self.req.min_doc_count == 0 {
|
if self.req.min_doc_count == 0 {
|
||||||
let mut stream = term_dict.stream()?;
|
let mut stream = term_dict.stream()?;
|
||||||
while let Some((key, _ord)) = stream.next() {
|
while let Some((key, _ord)) = stream.next() {
|
||||||
|
if dict.len() >= self.req.segment_size as usize {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
let key = std::str::from_utf8(key)
|
let key = std::str::from_utf8(key)
|
||||||
.map_err(|utf8_err| DataCorruption::comment_only(utf8_err.to_string()))?;
|
.map_err(|utf8_err| DataCorruption::comment_only(utf8_err.to_string()))?;
|
||||||
if !dict.contains_key(key) {
|
if !dict.contains_key(key) {
|
||||||
@@ -433,6 +434,8 @@ impl SegmentTermCollector {
|
|||||||
sum_other_doc_count += sum_other_docs;
|
sum_other_doc_count += sum_other_docs;
|
||||||
dict = dict_entries.into_iter().collect();
|
dict = dict_entries.into_iter().collect();
|
||||||
}
|
}
|
||||||
|
agg_with_accessor.bucket_count.add_count(dict.len() as u32);
|
||||||
|
agg_with_accessor.bucket_count.validate_bucket_count()?;
|
||||||
|
|
||||||
Ok(IntermediateBucketResult::Terms(
|
Ok(IntermediateBucketResult::Terms(
|
||||||
IntermediateTermBucketResult {
|
IntermediateTermBucketResult {
|
||||||
@@ -469,28 +472,24 @@ impl SegmentTermCollector {
|
|||||||
&vals1,
|
&vals1,
|
||||||
docs[0],
|
docs[0],
|
||||||
&bucket_with_accessor.sub_aggregation,
|
&bucket_with_accessor.sub_aggregation,
|
||||||
&bucket_with_accessor.bucket_count,
|
|
||||||
&self.blueprint,
|
&self.blueprint,
|
||||||
)?;
|
)?;
|
||||||
self.term_buckets.increment_bucket(
|
self.term_buckets.increment_bucket(
|
||||||
&vals2,
|
&vals2,
|
||||||
docs[1],
|
docs[1],
|
||||||
&bucket_with_accessor.sub_aggregation,
|
&bucket_with_accessor.sub_aggregation,
|
||||||
&bucket_with_accessor.bucket_count,
|
|
||||||
&self.blueprint,
|
&self.blueprint,
|
||||||
)?;
|
)?;
|
||||||
self.term_buckets.increment_bucket(
|
self.term_buckets.increment_bucket(
|
||||||
&vals3,
|
&vals3,
|
||||||
docs[2],
|
docs[2],
|
||||||
&bucket_with_accessor.sub_aggregation,
|
&bucket_with_accessor.sub_aggregation,
|
||||||
&bucket_with_accessor.bucket_count,
|
|
||||||
&self.blueprint,
|
&self.blueprint,
|
||||||
)?;
|
)?;
|
||||||
self.term_buckets.increment_bucket(
|
self.term_buckets.increment_bucket(
|
||||||
&vals4,
|
&vals4,
|
||||||
docs[3],
|
docs[3],
|
||||||
&bucket_with_accessor.sub_aggregation,
|
&bucket_with_accessor.sub_aggregation,
|
||||||
&bucket_with_accessor.bucket_count,
|
|
||||||
&self.blueprint,
|
&self.blueprint,
|
||||||
)?;
|
)?;
|
||||||
}
|
}
|
||||||
@@ -501,7 +500,6 @@ impl SegmentTermCollector {
|
|||||||
&vals1,
|
&vals1,
|
||||||
doc,
|
doc,
|
||||||
&bucket_with_accessor.sub_aggregation,
|
&bucket_with_accessor.sub_aggregation,
|
||||||
&bucket_with_accessor.bucket_count,
|
|
||||||
&self.blueprint,
|
&self.blueprint,
|
||||||
)?;
|
)?;
|
||||||
}
|
}
|
||||||
@@ -1136,6 +1134,33 @@ mod tests {
|
|||||||
assert_eq!(res["my_texts"]["sum_other_doc_count"], 0);
|
assert_eq!(res["my_texts"]["sum_other_doc_count"], 0);
|
||||||
assert_eq!(res["my_texts"]["doc_count_error_upper_bound"], 0);
|
assert_eq!(res["my_texts"]["doc_count_error_upper_bound"], 0);
|
||||||
|
|
||||||
|
let agg_req: Aggregations = vec![(
|
||||||
|
"my_texts".to_string(),
|
||||||
|
Aggregation::Bucket(BucketAggregation {
|
||||||
|
bucket_agg: BucketAggregationType::Terms(TermsAggregation {
|
||||||
|
field: "string_id".to_string(),
|
||||||
|
min_doc_count: Some(0),
|
||||||
|
size: Some(1),
|
||||||
|
..Default::default()
|
||||||
|
}),
|
||||||
|
sub_aggregation: Default::default(),
|
||||||
|
}),
|
||||||
|
)]
|
||||||
|
.into_iter()
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
// searching for terma, but min_doc_count will return all terms
|
||||||
|
let res = exec_request_with_query(agg_req, &index, Some(("string_id", "terma")))?;
|
||||||
|
|
||||||
|
assert_eq!(res["my_texts"]["buckets"][0]["key"], "terma");
|
||||||
|
assert_eq!(res["my_texts"]["buckets"][0]["doc_count"], 4);
|
||||||
|
assert_eq!(
|
||||||
|
res["my_texts"]["buckets"][1]["key"],
|
||||||
|
serde_json::Value::Null
|
||||||
|
);
|
||||||
|
assert_eq!(res["my_texts"]["sum_other_doc_count"], 0);
|
||||||
|
assert_eq!(res["my_texts"]["doc_count_error_upper_bound"], 0);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1214,6 +1239,27 @@ mod tests {
|
|||||||
|
|
||||||
let index = get_test_index_from_terms(true, &terms_per_segment)?;
|
let index = get_test_index_from_terms(true, &terms_per_segment)?;
|
||||||
|
|
||||||
|
let agg_req: Aggregations = vec![(
|
||||||
|
"my_texts".to_string(),
|
||||||
|
Aggregation::Bucket(BucketAggregation {
|
||||||
|
bucket_agg: BucketAggregationType::Terms(TermsAggregation {
|
||||||
|
field: "string_id".to_string(),
|
||||||
|
// min_doc_count: Some(0),
|
||||||
|
..Default::default()
|
||||||
|
}),
|
||||||
|
sub_aggregation: Default::default(),
|
||||||
|
}),
|
||||||
|
)]
|
||||||
|
.into_iter()
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let res = exec_request_with_query(agg_req, &index, None);
|
||||||
|
|
||||||
|
assert!(res.is_ok());
|
||||||
|
|
||||||
|
// This request has min_doc_count set to 0
|
||||||
|
// That means we load potentially the whole dict
|
||||||
|
// Make sure the bucket count is still fine
|
||||||
let agg_req: Aggregations = vec![(
|
let agg_req: Aggregations = vec![(
|
||||||
"my_texts".to_string(),
|
"my_texts".to_string(),
|
||||||
Aggregation::Bucket(BucketAggregation {
|
Aggregation::Bucket(BucketAggregation {
|
||||||
@@ -1228,6 +1274,24 @@ mod tests {
|
|||||||
.into_iter()
|
.into_iter()
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
|
let res = exec_request_with_query(agg_req, &index, None);
|
||||||
|
assert!(res.is_ok());
|
||||||
|
|
||||||
|
let agg_req: Aggregations = vec![(
|
||||||
|
"my_texts".to_string(),
|
||||||
|
Aggregation::Bucket(BucketAggregation {
|
||||||
|
bucket_agg: BucketAggregationType::Terms(TermsAggregation {
|
||||||
|
field: "string_id".to_string(),
|
||||||
|
// min_doc_count: Some(0),
|
||||||
|
size: Some(70_000),
|
||||||
|
..Default::default()
|
||||||
|
}),
|
||||||
|
sub_aggregation: Default::default(),
|
||||||
|
}),
|
||||||
|
)]
|
||||||
|
.into_iter()
|
||||||
|
.collect();
|
||||||
|
|
||||||
let res = exec_request_with_query(agg_req, &index, None);
|
let res = exec_request_with_query(agg_req, &index, None);
|
||||||
|
|
||||||
assert!(res.is_err());
|
assert!(res.is_err());
|
||||||
@@ -1384,14 +1448,10 @@ mod bench {
|
|||||||
let mut collector = get_collector_with_buckets(total_terms);
|
let mut collector = get_collector_with_buckets(total_terms);
|
||||||
let vals = get_rand_terms(total_terms, num_terms);
|
let vals = get_rand_terms(total_terms, num_terms);
|
||||||
let aggregations_with_accessor: AggregationsWithAccessor = Default::default();
|
let aggregations_with_accessor: AggregationsWithAccessor = Default::default();
|
||||||
let bucket_count: BucketCount = BucketCount {
|
|
||||||
bucket_count: Default::default(),
|
|
||||||
max_bucket_count: 1_000_001u32,
|
|
||||||
};
|
|
||||||
b.iter(|| {
|
b.iter(|| {
|
||||||
for &val in &vals {
|
for &val in &vals {
|
||||||
collector
|
collector
|
||||||
.increment_bucket(&[val], 0, &aggregations_with_accessor, &bucket_count, &None)
|
.increment_bucket(&[val], 0, &aggregations_with_accessor, &None)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|||||||
Reference in New Issue
Block a user