mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2025-12-28 04:52:55 +00:00
Compare commits
1 Commits
store-read
...
term_agg_b
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1b09dda479 |
@@ -11,7 +11,7 @@ use crate::aggregation::agg_req_with_accessor::{
|
||||
use crate::aggregation::intermediate_agg_result::{
|
||||
IntermediateBucketResult, IntermediateTermBucketEntry, IntermediateTermBucketResult,
|
||||
};
|
||||
use crate::aggregation::segment_agg_result::{BucketCount, SegmentAggregationResultsCollector};
|
||||
use crate::aggregation::segment_agg_result::SegmentAggregationResultsCollector;
|
||||
use crate::error::DataCorruption;
|
||||
use crate::fastfield::MultiValuedFastFieldReader;
|
||||
use crate::schema::Type;
|
||||
@@ -268,21 +268,18 @@ impl TermBuckets {
|
||||
term_ids: &[u64],
|
||||
doc: DocId,
|
||||
sub_aggregation: &AggregationsWithAccessor,
|
||||
bucket_count: &BucketCount,
|
||||
blueprint: &Option<SegmentAggregationResultsCollector>,
|
||||
) -> crate::Result<()> {
|
||||
for &term_id in term_ids {
|
||||
let entry = self.entries.entry(term_id as u32).or_insert_with(|| {
|
||||
bucket_count.add_count(1);
|
||||
|
||||
TermBucketEntry::from_blueprint(blueprint)
|
||||
});
|
||||
let entry = self
|
||||
.entries
|
||||
.entry(term_id as u32)
|
||||
.or_insert_with(|| TermBucketEntry::from_blueprint(blueprint));
|
||||
entry.doc_count += 1;
|
||||
if let Some(sub_aggregations) = entry.sub_aggregations.as_mut() {
|
||||
sub_aggregations.collect(doc, sub_aggregation)?;
|
||||
}
|
||||
}
|
||||
bucket_count.validate_bucket_count()?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -372,7 +369,7 @@ impl SegmentTermCollector {
|
||||
}
|
||||
OrderTarget::SubAggregation(_name) => {
|
||||
// don't sort and cut off since it's hard to make assumptions on the quality of the
|
||||
// results when cutting off du to unknown nature of the sub_aggregation (possible
|
||||
// results when cutting off due to unknown nature of the sub_aggregation (possible
|
||||
// to check).
|
||||
}
|
||||
OrderTarget::Count => {
|
||||
@@ -412,6 +409,10 @@ impl SegmentTermCollector {
|
||||
if self.req.min_doc_count == 0 {
|
||||
let mut stream = term_dict.stream()?;
|
||||
while let Some((key, _ord)) = stream.next() {
|
||||
if dict.len() >= self.req.segment_size as usize {
|
||||
break;
|
||||
}
|
||||
|
||||
let key = std::str::from_utf8(key)
|
||||
.map_err(|utf8_err| DataCorruption::comment_only(utf8_err.to_string()))?;
|
||||
if !dict.contains_key(key) {
|
||||
@@ -433,6 +434,8 @@ impl SegmentTermCollector {
|
||||
sum_other_doc_count += sum_other_docs;
|
||||
dict = dict_entries.into_iter().collect();
|
||||
}
|
||||
agg_with_accessor.bucket_count.add_count(dict.len() as u32);
|
||||
agg_with_accessor.bucket_count.validate_bucket_count()?;
|
||||
|
||||
Ok(IntermediateBucketResult::Terms(
|
||||
IntermediateTermBucketResult {
|
||||
@@ -469,28 +472,24 @@ impl SegmentTermCollector {
|
||||
&vals1,
|
||||
docs[0],
|
||||
&bucket_with_accessor.sub_aggregation,
|
||||
&bucket_with_accessor.bucket_count,
|
||||
&self.blueprint,
|
||||
)?;
|
||||
self.term_buckets.increment_bucket(
|
||||
&vals2,
|
||||
docs[1],
|
||||
&bucket_with_accessor.sub_aggregation,
|
||||
&bucket_with_accessor.bucket_count,
|
||||
&self.blueprint,
|
||||
)?;
|
||||
self.term_buckets.increment_bucket(
|
||||
&vals3,
|
||||
docs[2],
|
||||
&bucket_with_accessor.sub_aggregation,
|
||||
&bucket_with_accessor.bucket_count,
|
||||
&self.blueprint,
|
||||
)?;
|
||||
self.term_buckets.increment_bucket(
|
||||
&vals4,
|
||||
docs[3],
|
||||
&bucket_with_accessor.sub_aggregation,
|
||||
&bucket_with_accessor.bucket_count,
|
||||
&self.blueprint,
|
||||
)?;
|
||||
}
|
||||
@@ -501,7 +500,6 @@ impl SegmentTermCollector {
|
||||
&vals1,
|
||||
doc,
|
||||
&bucket_with_accessor.sub_aggregation,
|
||||
&bucket_with_accessor.bucket_count,
|
||||
&self.blueprint,
|
||||
)?;
|
||||
}
|
||||
@@ -1136,6 +1134,33 @@ mod tests {
|
||||
assert_eq!(res["my_texts"]["sum_other_doc_count"], 0);
|
||||
assert_eq!(res["my_texts"]["doc_count_error_upper_bound"], 0);
|
||||
|
||||
let agg_req: Aggregations = vec![(
|
||||
"my_texts".to_string(),
|
||||
Aggregation::Bucket(BucketAggregation {
|
||||
bucket_agg: BucketAggregationType::Terms(TermsAggregation {
|
||||
field: "string_id".to_string(),
|
||||
min_doc_count: Some(0),
|
||||
size: Some(1),
|
||||
..Default::default()
|
||||
}),
|
||||
sub_aggregation: Default::default(),
|
||||
}),
|
||||
)]
|
||||
.into_iter()
|
||||
.collect();
|
||||
|
||||
// searching for terma, but min_doc_count will return all terms
|
||||
let res = exec_request_with_query(agg_req, &index, Some(("string_id", "terma")))?;
|
||||
|
||||
assert_eq!(res["my_texts"]["buckets"][0]["key"], "terma");
|
||||
assert_eq!(res["my_texts"]["buckets"][0]["doc_count"], 4);
|
||||
assert_eq!(
|
||||
res["my_texts"]["buckets"][1]["key"],
|
||||
serde_json::Value::Null
|
||||
);
|
||||
assert_eq!(res["my_texts"]["sum_other_doc_count"], 0);
|
||||
assert_eq!(res["my_texts"]["doc_count_error_upper_bound"], 0);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1214,6 +1239,27 @@ mod tests {
|
||||
|
||||
let index = get_test_index_from_terms(true, &terms_per_segment)?;
|
||||
|
||||
let agg_req: Aggregations = vec![(
|
||||
"my_texts".to_string(),
|
||||
Aggregation::Bucket(BucketAggregation {
|
||||
bucket_agg: BucketAggregationType::Terms(TermsAggregation {
|
||||
field: "string_id".to_string(),
|
||||
// min_doc_count: Some(0),
|
||||
..Default::default()
|
||||
}),
|
||||
sub_aggregation: Default::default(),
|
||||
}),
|
||||
)]
|
||||
.into_iter()
|
||||
.collect();
|
||||
|
||||
let res = exec_request_with_query(agg_req, &index, None);
|
||||
|
||||
assert!(res.is_ok());
|
||||
|
||||
// This request has min_doc_count set to 0
|
||||
// That means we load potentially the whole dict
|
||||
// Make sure the bucket count is still fine
|
||||
let agg_req: Aggregations = vec![(
|
||||
"my_texts".to_string(),
|
||||
Aggregation::Bucket(BucketAggregation {
|
||||
@@ -1228,6 +1274,24 @@ mod tests {
|
||||
.into_iter()
|
||||
.collect();
|
||||
|
||||
let res = exec_request_with_query(agg_req, &index, None);
|
||||
assert!(res.is_ok());
|
||||
|
||||
let agg_req: Aggregations = vec![(
|
||||
"my_texts".to_string(),
|
||||
Aggregation::Bucket(BucketAggregation {
|
||||
bucket_agg: BucketAggregationType::Terms(TermsAggregation {
|
||||
field: "string_id".to_string(),
|
||||
// min_doc_count: Some(0),
|
||||
size: Some(70_000),
|
||||
..Default::default()
|
||||
}),
|
||||
sub_aggregation: Default::default(),
|
||||
}),
|
||||
)]
|
||||
.into_iter()
|
||||
.collect();
|
||||
|
||||
let res = exec_request_with_query(agg_req, &index, None);
|
||||
|
||||
assert!(res.is_err());
|
||||
@@ -1384,14 +1448,10 @@ mod bench {
|
||||
let mut collector = get_collector_with_buckets(total_terms);
|
||||
let vals = get_rand_terms(total_terms, num_terms);
|
||||
let aggregations_with_accessor: AggregationsWithAccessor = Default::default();
|
||||
let bucket_count: BucketCount = BucketCount {
|
||||
bucket_count: Default::default(),
|
||||
max_bucket_count: 1_000_001u32,
|
||||
};
|
||||
b.iter(|| {
|
||||
for &val in &vals {
|
||||
collector
|
||||
.increment_bucket(&[val], 0, &aggregations_with_accessor, &bucket_count, &None)
|
||||
.increment_bucket(&[val], 0, &aggregations_with_accessor, &None)
|
||||
.unwrap();
|
||||
}
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user