mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-03-25 22:50:42 +00:00
Compare commits
4 Commits
composite-
...
optim
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d53bad6cac | ||
|
|
d45602fb9a | ||
|
|
545169c0d8 | ||
|
|
68a9066d13 |
@@ -11,7 +11,7 @@ repository = "https://github.com/quickwit-oss/tantivy"
|
||||
readme = "README.md"
|
||||
keywords = ["search", "information", "retrieval"]
|
||||
edition = "2021"
|
||||
rust-version = "1.85"
|
||||
rust-version = "1.86"
|
||||
exclude = ["benches/*.json", "benches/*.txt"]
|
||||
|
||||
[dependencies]
|
||||
|
||||
@@ -141,6 +141,13 @@ fn main() {
|
||||
0.01,
|
||||
0.15,
|
||||
),
|
||||
(
|
||||
"N=1M, p(a)=1%, p(b)=30%, p(c)=30%".to_string(),
|
||||
1_000_000,
|
||||
0.01,
|
||||
0.30,
|
||||
0.30,
|
||||
),
|
||||
];
|
||||
|
||||
let queries = &["a", "+a +b", "+a +b +c", "a OR b", "a OR b OR c"];
|
||||
|
||||
@@ -31,7 +31,7 @@ pub use u64_based::{
|
||||
serialize_and_load_u64_based_column_values, serialize_u64_based_column_values,
|
||||
};
|
||||
pub use u128_based::{
|
||||
CompactSpaceU64Accessor, open_u128_as_compact_u64, open_u128_mapped,
|
||||
CompactHit, CompactSpaceU64Accessor, open_u128_as_compact_u64, open_u128_mapped,
|
||||
serialize_column_values_u128,
|
||||
};
|
||||
pub use vec_column::VecColumn;
|
||||
|
||||
@@ -292,6 +292,19 @@ impl BinarySerializable for IPCodecParams {
|
||||
}
|
||||
}
|
||||
|
||||
/// Represents the result of looking up a u128 value in the compact space.
|
||||
///
|
||||
/// If a value is outside the compact space, the next compact value is returned.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum CompactHit {
|
||||
/// The value exists in the compact space
|
||||
Exact(u32),
|
||||
/// The value does not exist in the compact space, but the next higher value does
|
||||
Next(u32),
|
||||
/// The value is greater than the maximum compact value
|
||||
AfterLast,
|
||||
}
|
||||
|
||||
/// Exposes the compact space compressed values as u64.
|
||||
///
|
||||
/// This allows faster access to the values, as u64 is faster to work with than u128.
|
||||
@@ -309,6 +322,11 @@ impl CompactSpaceU64Accessor {
|
||||
pub fn compact_to_u128(&self, compact: u32) -> u128 {
|
||||
self.0.compact_to_u128(compact)
|
||||
}
|
||||
|
||||
/// Finds the next compact space value for a given u128 value.
|
||||
pub fn u128_to_next_compact(&self, value: u128) -> CompactHit {
|
||||
self.0.u128_to_next_compact(value)
|
||||
}
|
||||
}
|
||||
|
||||
impl ColumnValues<u64> for CompactSpaceU64Accessor {
|
||||
@@ -441,6 +459,21 @@ impl CompactSpaceDecompressor {
|
||||
self.params.compact_space.u128_to_compact(value)
|
||||
}
|
||||
|
||||
/// Finds the next compact space value for a given u128 value.
|
||||
pub fn u128_to_next_compact(&self, value: u128) -> CompactHit {
|
||||
match self.u128_to_compact(value) {
|
||||
Ok(compact) => CompactHit::Exact(compact),
|
||||
Err(pos) => {
|
||||
if pos >= self.params.compact_space.ranges_mapping.len() {
|
||||
CompactHit::AfterLast
|
||||
} else {
|
||||
let next_range = &self.params.compact_space.ranges_mapping[pos];
|
||||
CompactHit::Next(next_range.compact_start)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn compact_to_u128(&self, compact: u32) -> u128 {
|
||||
self.params.compact_space.compact_to_u128(compact)
|
||||
}
|
||||
@@ -823,6 +856,41 @@ mod tests {
|
||||
let _data = test_aux_vals(vals);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_u128_to_next_compact() {
|
||||
let vals = &[100u128, 200u128, 1_000_000_000u128, 1_000_000_100u128];
|
||||
let mut data = test_aux_vals(vals);
|
||||
|
||||
let _header = U128Header::deserialize(&mut data);
|
||||
let decomp = CompactSpaceDecompressor::open(data).unwrap();
|
||||
|
||||
// Test value that's already in a range
|
||||
let compact_100 = decomp.u128_to_compact(100).unwrap();
|
||||
assert_eq!(
|
||||
decomp.u128_to_next_compact(100),
|
||||
CompactHit::Exact(compact_100)
|
||||
);
|
||||
|
||||
// Test value between two ranges
|
||||
let compact_million = decomp.u128_to_compact(1_000_000_000).unwrap();
|
||||
assert_eq!(
|
||||
decomp.u128_to_next_compact(250),
|
||||
CompactHit::Next(compact_million)
|
||||
);
|
||||
|
||||
// Test value before the first range
|
||||
assert_eq!(
|
||||
decomp.u128_to_next_compact(50),
|
||||
CompactHit::Next(compact_100)
|
||||
);
|
||||
|
||||
// Test value after the last range
|
||||
assert_eq!(
|
||||
decomp.u128_to_next_compact(10_000_000_000),
|
||||
CompactHit::AfterLast
|
||||
);
|
||||
}
|
||||
|
||||
use proptest::prelude::*;
|
||||
|
||||
fn num_strategy() -> impl Strategy<Value = u128> {
|
||||
|
||||
@@ -7,7 +7,7 @@ mod compact_space;
|
||||
|
||||
use common::{BinarySerializable, OwnedBytes, VInt};
|
||||
pub use compact_space::{
|
||||
CompactSpaceCompressor, CompactSpaceDecompressor, CompactSpaceU64Accessor,
|
||||
CompactHit, CompactSpaceCompressor, CompactSpaceDecompressor, CompactSpaceU64Accessor,
|
||||
};
|
||||
|
||||
use crate::column_values::monotonic_map_column;
|
||||
|
||||
@@ -47,6 +47,9 @@ impl TinySet {
|
||||
TinySet(val)
|
||||
}
|
||||
|
||||
/// An empty `TinySet` constant.
|
||||
pub const EMPTY: TinySet = TinySet(0u64);
|
||||
|
||||
/// Returns an empty `TinySet`.
|
||||
#[inline]
|
||||
pub fn empty() -> TinySet {
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use std::net::Ipv6Addr;
|
||||
|
||||
use columnar::column_values::CompactSpaceU64Accessor;
|
||||
use columnar::column_values::{CompactHit, CompactSpaceU64Accessor};
|
||||
use columnar::{Column, ColumnType, MonotonicallyMappableToU64, StrColumn, TermOrdHit};
|
||||
|
||||
use crate::aggregation::accessor_helpers::get_numeric_or_date_column_types;
|
||||
@@ -150,7 +150,7 @@ impl CompositeSourceAccessors {
|
||||
{
|
||||
match source_after_key_opt {
|
||||
Some(after_key) => PrecomputedAfterKey::precompute(
|
||||
&first_col,
|
||||
first_col,
|
||||
after_key,
|
||||
&source.field,
|
||||
source.missing_order,
|
||||
@@ -342,7 +342,7 @@ impl PrecomputedDateInterval {
|
||||
.to_string(),
|
||||
)),
|
||||
(Some(fixed_interval), None) => {
|
||||
let fixed_interval_ms = parse_into_milliseconds(&fixed_interval)?;
|
||||
let fixed_interval_ms = parse_into_milliseconds(fixed_interval)?;
|
||||
Ok(PrecomputedDateInterval::FixedNanoseconds(
|
||||
fixed_interval_ms * 1_000_000,
|
||||
))
|
||||
@@ -370,6 +370,16 @@ pub enum PrecomputedAfterKey {
|
||||
AfterLast,
|
||||
}
|
||||
|
||||
impl From<CompactHit> for PrecomputedAfterKey {
|
||||
fn from(hit: CompactHit) -> Self {
|
||||
match hit {
|
||||
CompactHit::Exact(ord) => PrecomputedAfterKey::Exact(ord as u64),
|
||||
CompactHit::Next(ord) => PrecomputedAfterKey::Next(ord as u64),
|
||||
CompactHit::AfterLast => PrecomputedAfterKey::AfterLast,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<TermOrdHit> for PrecomputedAfterKey {
|
||||
fn from(hit: TermOrdHit) -> Self {
|
||||
match hit {
|
||||
@@ -418,58 +428,18 @@ impl PrecomputedAfterKey {
|
||||
}
|
||||
|
||||
fn precompute_ip_addr(column: &Column<u64>, key: &Ipv6Addr) -> crate::Result<Self> {
|
||||
// For IP addresses we need to find the compact space value.
|
||||
// We try to convert via the column's min/max range scan.
|
||||
// Since CompactSpaceU64Accessor::u128_to_compact is not public,
|
||||
// we search linearly for the exact u64 value by scanning column values.
|
||||
let compact_space_accessor = column
|
||||
.values
|
||||
.clone()
|
||||
.downcast_arc::<CompactSpaceU64Accessor>()
|
||||
.map_err(|_| {
|
||||
TantivyError::AggregationError(crate::aggregation::AggregationError::InternalError(
|
||||
"type mismatch: could not downcast to CompactSpaceU64Accessor".to_string(),
|
||||
))
|
||||
})?;
|
||||
let ip_u128 = key.to_bits();
|
||||
|
||||
// Scan for matching value - IP columns are typically small
|
||||
let num_vals = column.values.num_vals();
|
||||
let mut found_exact = false;
|
||||
let mut exact_compact = 0u64;
|
||||
let mut best_next: Option<u64> = None;
|
||||
|
||||
for doc_id in 0..num_vals {
|
||||
let val = column.values.get_val(doc_id);
|
||||
// We need the CompactSpaceU64Accessor to convert compact to u128
|
||||
let compact_accessor = column
|
||||
.values
|
||||
.clone()
|
||||
.downcast_arc::<CompactSpaceU64Accessor>()
|
||||
.map_err(|_| {
|
||||
TantivyError::AggregationError(
|
||||
crate::aggregation::AggregationError::InternalError(
|
||||
"type mismatch: could not downcast to CompactSpaceU64Accessor"
|
||||
.to_string(),
|
||||
),
|
||||
)
|
||||
})?;
|
||||
let val_u128 = compact_accessor.compact_to_u128(val as u32);
|
||||
if val_u128 == ip_u128 {
|
||||
found_exact = true;
|
||||
exact_compact = val;
|
||||
break;
|
||||
} else if val_u128 > ip_u128 {
|
||||
match best_next {
|
||||
None => best_next = Some(val),
|
||||
Some(current_best) => {
|
||||
let current_u128 = compact_accessor.compact_to_u128(current_best as u32);
|
||||
if val_u128 < current_u128 {
|
||||
best_next = Some(val);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if found_exact {
|
||||
Ok(PrecomputedAfterKey::Exact(exact_compact))
|
||||
} else if let Some(next) = best_next {
|
||||
Ok(PrecomputedAfterKey::Next(next))
|
||||
} else {
|
||||
Ok(PrecomputedAfterKey::AfterLast)
|
||||
}
|
||||
let ip_next_compact = compact_space_accessor.u128_to_next_compact(ip_u128);
|
||||
Ok(ip_next_compact.into())
|
||||
}
|
||||
|
||||
fn precompute_term_ord(
|
||||
|
||||
@@ -8,9 +8,8 @@ const NS_IN_DAY: i64 = Nanosecond::per_t::<i128>(Day) as i64;
|
||||
pub(super) fn try_year_bucket(timestamp_ns: i64) -> crate::Result<i64> {
|
||||
year_bucket_using_time_crate(timestamp_ns).map_err(|e| {
|
||||
crate::TantivyError::InvalidArgument(format!(
|
||||
"Failed to compute year bucket for timestamp {}: {}",
|
||||
timestamp_ns,
|
||||
e.to_string()
|
||||
"Failed to compute year bucket for timestamp {}: {e}",
|
||||
timestamp_ns
|
||||
))
|
||||
})
|
||||
}
|
||||
@@ -20,9 +19,8 @@ pub(super) fn try_year_bucket(timestamp_ns: i64) -> crate::Result<i64> {
|
||||
pub(super) fn try_month_bucket(timestamp_ns: i64) -> crate::Result<i64> {
|
||||
month_bucket_using_time_crate(timestamp_ns).map_err(|e| {
|
||||
crate::TantivyError::InvalidArgument(format!(
|
||||
"Failed to compute month bucket for timestamp {}: {}",
|
||||
timestamp_ns,
|
||||
e.to_string()
|
||||
"Failed to compute month bucket for timestamp {}: {e}",
|
||||
timestamp_ns
|
||||
))
|
||||
})
|
||||
}
|
||||
|
||||
@@ -137,7 +137,7 @@ impl SegmentAggregationCollector for SegmentCompositeCollector {
|
||||
.name
|
||||
.clone();
|
||||
|
||||
let buckets = self.into_intermediate_bucket_result(agg_data, parent_bucket_id)?;
|
||||
let buckets = self.add_intermediate_bucket_result(agg_data, parent_bucket_id)?;
|
||||
results.push(
|
||||
name,
|
||||
IntermediateAggregationResult::Bucket(IntermediateBucketResult::Composite { buckets }),
|
||||
@@ -156,17 +156,15 @@ impl SegmentAggregationCollector for SegmentCompositeCollector {
|
||||
let composite_agg_data = agg_data.take_composite_req_data(self.accessor_idx);
|
||||
|
||||
for doc in docs {
|
||||
let mut sub_level_values = SmallVec::new();
|
||||
recursive_key_visitor(
|
||||
*doc,
|
||||
&composite_agg_data,
|
||||
0,
|
||||
&mut sub_level_values,
|
||||
&mut self.parent_buckets[parent_bucket_id as usize],
|
||||
true,
|
||||
&mut self.sub_agg,
|
||||
&mut self.bucket_id_provider,
|
||||
)?;
|
||||
let mut visitor = CompositeKeyVisitor {
|
||||
doc_id: *doc,
|
||||
composite_agg_data: &composite_agg_data,
|
||||
buckets: &mut self.parent_buckets[parent_bucket_id as usize],
|
||||
sub_agg: &mut self.sub_agg,
|
||||
bucket_id_provider: &mut self.bucket_id_provider,
|
||||
sub_level_values: SmallVec::new(),
|
||||
};
|
||||
visitor.visit(0, true)?;
|
||||
}
|
||||
agg_data.put_back_composite_req_data(self.accessor_idx, composite_agg_data);
|
||||
|
||||
@@ -238,7 +236,7 @@ impl SegmentCompositeCollector {
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn into_intermediate_bucket_result(
|
||||
fn add_intermediate_bucket_result(
|
||||
&mut self,
|
||||
agg_data: &AggregationsSegmentCtx,
|
||||
parent_bucket_id: BucketId,
|
||||
@@ -305,6 +303,13 @@ fn validate_req(req_data: &mut AggregationsSegmentCtx, accessor_idx: usize) -> c
|
||||
"composite aggregation 'size' must be > 0".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
if composite_data.composite_accessors.len() > MAX_DYN_ARRAY_SIZE {
|
||||
return Err(TantivyError::InvalidArgument(format!(
|
||||
"composite aggregation source supports maximum {MAX_DYN_ARRAY_SIZE} sources",
|
||||
)));
|
||||
}
|
||||
|
||||
let column_types_for_sources = composite_data.composite_accessors.iter().map(|item| {
|
||||
item.accessors
|
||||
.iter()
|
||||
@@ -313,11 +318,6 @@ fn validate_req(req_data: &mut AggregationsSegmentCtx, accessor_idx: usize) -> c
|
||||
});
|
||||
|
||||
for column_types in column_types_for_sources {
|
||||
if column_types.len() > MAX_DYN_ARRAY_SIZE {
|
||||
return Err(TantivyError::InvalidArgument(format!(
|
||||
"composite aggregation source supports maximum {MAX_DYN_ARRAY_SIZE} sources",
|
||||
)));
|
||||
}
|
||||
if column_types.contains(&ColumnType::Bytes) {
|
||||
return Err(TantivyError::InvalidArgument(
|
||||
"composite aggregation does not support 'bytes' field type".to_string(),
|
||||
@@ -480,195 +480,173 @@ fn resolve_term(
|
||||
Ok(key)
|
||||
}
|
||||
|
||||
/// Depth-first walk of the accessors to build the composite key combinations
|
||||
/// and update the buckets.
|
||||
fn recursive_key_visitor(
|
||||
/// Browse through the cardinal product obtained by the different values of the doc composite key
|
||||
/// sources.
|
||||
///
|
||||
/// For each of those tuple-key, that are after the limit key, we call collect_bucket_with_limit.
|
||||
struct CompositeKeyVisitor<'a> {
|
||||
doc_id: crate::DocId,
|
||||
composite_agg_data: &CompositeAggReqData,
|
||||
source_idx_for_recursion: usize,
|
||||
sub_level_values: &mut SmallVec<[InternalValueRepr; MAX_DYN_ARRAY_SIZE]>,
|
||||
buckets: &mut DynArrayHeapMap<InternalValueRepr, CompositeBucketCollector>,
|
||||
// whether we need to consider the after_key in the following levels
|
||||
is_on_after_key: bool,
|
||||
sub_agg: &mut Option<CachedSubAggs<HighCardSubAggCache>>,
|
||||
bucket_id_provider: &mut BucketIdProvider,
|
||||
) -> crate::Result<()> {
|
||||
if source_idx_for_recursion == composite_agg_data.req.sources.len() {
|
||||
if !is_on_after_key {
|
||||
collect_bucket_with_limit(
|
||||
doc_id,
|
||||
composite_agg_data.req.size as usize,
|
||||
buckets,
|
||||
sub_level_values,
|
||||
sub_agg,
|
||||
bucket_id_provider,
|
||||
);
|
||||
}
|
||||
return Ok(());
|
||||
}
|
||||
composite_agg_data: &'a CompositeAggReqData,
|
||||
buckets: &'a mut DynArrayHeapMap<InternalValueRepr, CompositeBucketCollector>,
|
||||
sub_agg: &'a mut Option<CachedSubAggs<HighCardSubAggCache>>,
|
||||
bucket_id_provider: &'a mut BucketIdProvider,
|
||||
sub_level_values: SmallVec<[InternalValueRepr; MAX_DYN_ARRAY_SIZE]>,
|
||||
}
|
||||
|
||||
let current_level_accessors = &composite_agg_data.composite_accessors[source_idx_for_recursion];
|
||||
let current_level_source = &composite_agg_data.req.sources[source_idx_for_recursion];
|
||||
let mut missing = true;
|
||||
for (accessor_idx, accessor) in current_level_accessors.accessors.iter().enumerate() {
|
||||
let values = accessor.column.values_for_doc(doc_id);
|
||||
for value in values {
|
||||
missing = false;
|
||||
match current_level_source {
|
||||
CompositeAggregationSource::Terms(_) => {
|
||||
let preceeds_after_key_type =
|
||||
accessor_idx < current_level_accessors.after_key_accessor_idx;
|
||||
if is_on_after_key && preceeds_after_key_type {
|
||||
break;
|
||||
}
|
||||
let matches_after_key_type =
|
||||
accessor_idx == current_level_accessors.after_key_accessor_idx;
|
||||
|
||||
if matches_after_key_type && is_on_after_key {
|
||||
let should_skip = match current_level_source.order() {
|
||||
Order::Asc => current_level_accessors.after_key.gt(value),
|
||||
Order::Desc => current_level_accessors.after_key.lt(value),
|
||||
};
|
||||
if should_skip {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
sub_level_values.push(InternalValueRepr::new_term(
|
||||
value,
|
||||
accessor_idx as u8,
|
||||
current_level_source.order(),
|
||||
));
|
||||
let still_on_after_key =
|
||||
matches_after_key_type && current_level_accessors.after_key.equals(value);
|
||||
recursive_key_visitor(
|
||||
doc_id,
|
||||
composite_agg_data,
|
||||
source_idx_for_recursion + 1,
|
||||
sub_level_values,
|
||||
buckets,
|
||||
is_on_after_key && still_on_after_key,
|
||||
sub_agg,
|
||||
bucket_id_provider,
|
||||
)?;
|
||||
sub_level_values.pop();
|
||||
}
|
||||
CompositeAggregationSource::Histogram(source) => {
|
||||
let float_value = match accessor.column_type {
|
||||
ColumnType::U64 => value as f64,
|
||||
ColumnType::I64 => i64::from_u64(value) as f64,
|
||||
ColumnType::DateTime => i64::from_u64(value) as f64 / 1_000_000.,
|
||||
ColumnType::F64 => f64::from_u64(value),
|
||||
_ => {
|
||||
panic!(
|
||||
"unexpected type {:?}. This should not happen",
|
||||
accessor.column_type
|
||||
)
|
||||
}
|
||||
};
|
||||
let bucket_index = (float_value / source.interval).floor() as i64;
|
||||
let bucket_value = i64::to_u64(bucket_index);
|
||||
if is_on_after_key {
|
||||
let should_skip = match current_level_source.order() {
|
||||
Order::Asc => current_level_accessors.after_key.gt(bucket_value),
|
||||
Order::Desc => current_level_accessors.after_key.lt(bucket_value),
|
||||
};
|
||||
if should_skip {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
sub_level_values.push(InternalValueRepr::new_histogram(
|
||||
bucket_value,
|
||||
current_level_source.order(),
|
||||
));
|
||||
let still_on_after_key = current_level_accessors.after_key.equals(bucket_value);
|
||||
recursive_key_visitor(
|
||||
doc_id,
|
||||
composite_agg_data,
|
||||
source_idx_for_recursion + 1,
|
||||
sub_level_values,
|
||||
buckets,
|
||||
is_on_after_key && still_on_after_key,
|
||||
sub_agg,
|
||||
bucket_id_provider,
|
||||
)?;
|
||||
sub_level_values.pop();
|
||||
}
|
||||
CompositeAggregationSource::DateHistogram(_) => {
|
||||
let value_ns = match accessor.column_type {
|
||||
ColumnType::DateTime => i64::from_u64(value),
|
||||
_ => {
|
||||
panic!(
|
||||
"unexpected type {:?}. This should not happen",
|
||||
accessor.column_type
|
||||
)
|
||||
}
|
||||
};
|
||||
let bucket_index = match accessor.date_histogram_interval {
|
||||
PrecomputedDateInterval::FixedNanoseconds(fixed_interval_ns) => {
|
||||
(value_ns / fixed_interval_ns) * fixed_interval_ns
|
||||
}
|
||||
PrecomputedDateInterval::Calendar(CalendarInterval::Year) => {
|
||||
calendar_interval::try_year_bucket(value_ns)?
|
||||
}
|
||||
PrecomputedDateInterval::Calendar(CalendarInterval::Month) => {
|
||||
calendar_interval::try_month_bucket(value_ns)?
|
||||
}
|
||||
PrecomputedDateInterval::Calendar(CalendarInterval::Week) => {
|
||||
calendar_interval::week_bucket(value_ns)
|
||||
}
|
||||
PrecomputedDateInterval::NotApplicable => {
|
||||
panic!("interval not precomputed for date histogram source")
|
||||
}
|
||||
};
|
||||
let bucket_value = i64::to_u64(bucket_index);
|
||||
if is_on_after_key {
|
||||
let should_skip = match current_level_source.order() {
|
||||
Order::Asc => current_level_accessors.after_key.gt(bucket_value),
|
||||
Order::Desc => current_level_accessors.after_key.lt(bucket_value),
|
||||
};
|
||||
if should_skip {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
sub_level_values.push(InternalValueRepr::new_histogram(
|
||||
bucket_value,
|
||||
current_level_source.order(),
|
||||
));
|
||||
let still_on_after_key = current_level_accessors.after_key.equals(bucket_value);
|
||||
recursive_key_visitor(
|
||||
doc_id,
|
||||
composite_agg_data,
|
||||
source_idx_for_recursion + 1,
|
||||
sub_level_values,
|
||||
buckets,
|
||||
is_on_after_key && still_on_after_key,
|
||||
sub_agg,
|
||||
bucket_id_provider,
|
||||
)?;
|
||||
sub_level_values.pop();
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
if missing && current_level_source.missing_bucket() {
|
||||
if is_on_after_key && current_level_accessors.skip_missing {
|
||||
impl CompositeKeyVisitor<'_> {
|
||||
/// Depth-first walk of the accessors to build the composite key combinations
|
||||
/// and update the buckets.
|
||||
///
|
||||
/// `source_idx` is the current source index in the recursion.
|
||||
/// `is_on_after_key` tracks whether we still need to consider the after_key
|
||||
/// for pruning at this level and below.
|
||||
fn visit(&mut self, source_idx: usize, is_on_after_key: bool) -> crate::Result<()> {
|
||||
if source_idx == self.composite_agg_data.req.sources.len() {
|
||||
if !is_on_after_key {
|
||||
collect_bucket_with_limit(
|
||||
self.doc_id,
|
||||
self.composite_agg_data.req.size as usize,
|
||||
self.buckets,
|
||||
&self.sub_level_values,
|
||||
self.sub_agg,
|
||||
self.bucket_id_provider,
|
||||
);
|
||||
}
|
||||
return Ok(());
|
||||
}
|
||||
sub_level_values.push(InternalValueRepr::new_missing(
|
||||
current_level_source.order(),
|
||||
current_level_source.missing_order(),
|
||||
));
|
||||
recursive_key_visitor(
|
||||
doc_id,
|
||||
composite_agg_data,
|
||||
source_idx_for_recursion + 1,
|
||||
sub_level_values,
|
||||
buckets,
|
||||
is_on_after_key && current_level_accessors.is_after_key_explicit_missing,
|
||||
sub_agg,
|
||||
bucket_id_provider,
|
||||
)?;
|
||||
sub_level_values.pop();
|
||||
|
||||
let current_level_accessors = &self.composite_agg_data.composite_accessors[source_idx];
|
||||
let current_level_source = &self.composite_agg_data.req.sources[source_idx];
|
||||
let mut missing = true;
|
||||
for (accessor_idx, accessor) in current_level_accessors.accessors.iter().enumerate() {
|
||||
let values = accessor.column.values_for_doc(self.doc_id);
|
||||
for value in values {
|
||||
missing = false;
|
||||
match current_level_source {
|
||||
CompositeAggregationSource::Terms(_) => {
|
||||
let preceeds_after_key_type =
|
||||
accessor_idx < current_level_accessors.after_key_accessor_idx;
|
||||
if is_on_after_key && preceeds_after_key_type {
|
||||
break;
|
||||
}
|
||||
let matches_after_key_type =
|
||||
accessor_idx == current_level_accessors.after_key_accessor_idx;
|
||||
|
||||
if matches_after_key_type && is_on_after_key {
|
||||
let should_skip = match current_level_source.order() {
|
||||
Order::Asc => current_level_accessors.after_key.gt(value),
|
||||
Order::Desc => current_level_accessors.after_key.lt(value),
|
||||
};
|
||||
if should_skip {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
self.sub_level_values.push(InternalValueRepr::new_term(
|
||||
value,
|
||||
accessor_idx as u8,
|
||||
current_level_source.order(),
|
||||
));
|
||||
let still_on_after_key = matches_after_key_type
|
||||
&& current_level_accessors.after_key.equals(value);
|
||||
self.visit(source_idx + 1, is_on_after_key && still_on_after_key)?;
|
||||
self.sub_level_values.pop();
|
||||
}
|
||||
CompositeAggregationSource::Histogram(source) => {
|
||||
let float_value = match accessor.column_type {
|
||||
ColumnType::U64 => value as f64,
|
||||
ColumnType::I64 => i64::from_u64(value) as f64,
|
||||
ColumnType::DateTime => i64::from_u64(value) as f64 / 1_000_000.,
|
||||
ColumnType::F64 => f64::from_u64(value),
|
||||
_ => {
|
||||
panic!(
|
||||
"unexpected type {:?}. This should not happen",
|
||||
accessor.column_type
|
||||
)
|
||||
}
|
||||
};
|
||||
let bucket_index = (float_value / source.interval).floor() as i64;
|
||||
let bucket_value = i64::to_u64(bucket_index);
|
||||
if is_on_after_key {
|
||||
let should_skip = match current_level_source.order() {
|
||||
Order::Asc => current_level_accessors.after_key.gt(bucket_value),
|
||||
Order::Desc => current_level_accessors.after_key.lt(bucket_value),
|
||||
};
|
||||
if should_skip {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
self.sub_level_values.push(InternalValueRepr::new_histogram(
|
||||
bucket_value,
|
||||
current_level_source.order(),
|
||||
));
|
||||
let still_on_after_key =
|
||||
current_level_accessors.after_key.equals(bucket_value);
|
||||
self.visit(source_idx + 1, is_on_after_key && still_on_after_key)?;
|
||||
self.sub_level_values.pop();
|
||||
}
|
||||
CompositeAggregationSource::DateHistogram(_) => {
|
||||
let value_ns = match accessor.column_type {
|
||||
ColumnType::DateTime => i64::from_u64(value),
|
||||
_ => {
|
||||
panic!(
|
||||
"unexpected type {:?}. This should not happen",
|
||||
accessor.column_type
|
||||
)
|
||||
}
|
||||
};
|
||||
let bucket_index = match accessor.date_histogram_interval {
|
||||
PrecomputedDateInterval::FixedNanoseconds(fixed_interval_ns) => {
|
||||
(value_ns / fixed_interval_ns) * fixed_interval_ns
|
||||
}
|
||||
PrecomputedDateInterval::Calendar(CalendarInterval::Year) => {
|
||||
calendar_interval::try_year_bucket(value_ns)?
|
||||
}
|
||||
PrecomputedDateInterval::Calendar(CalendarInterval::Month) => {
|
||||
calendar_interval::try_month_bucket(value_ns)?
|
||||
}
|
||||
PrecomputedDateInterval::Calendar(CalendarInterval::Week) => {
|
||||
calendar_interval::week_bucket(value_ns)
|
||||
}
|
||||
PrecomputedDateInterval::NotApplicable => {
|
||||
panic!("interval not precomputed for date histogram source")
|
||||
}
|
||||
};
|
||||
let bucket_value = i64::to_u64(bucket_index);
|
||||
if is_on_after_key {
|
||||
let should_skip = match current_level_source.order() {
|
||||
Order::Asc => current_level_accessors.after_key.gt(bucket_value),
|
||||
Order::Desc => current_level_accessors.after_key.lt(bucket_value),
|
||||
};
|
||||
if should_skip {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
self.sub_level_values.push(InternalValueRepr::new_histogram(
|
||||
bucket_value,
|
||||
current_level_source.order(),
|
||||
));
|
||||
let still_on_after_key =
|
||||
current_level_accessors.after_key.equals(bucket_value);
|
||||
self.visit(source_idx + 1, is_on_after_key && still_on_after_key)?;
|
||||
self.sub_level_values.pop();
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
if missing && current_level_source.missing_bucket() {
|
||||
if is_on_after_key && current_level_accessors.skip_missing {
|
||||
return Ok(());
|
||||
}
|
||||
self.sub_level_values.push(InternalValueRepr::new_missing(
|
||||
current_level_source.order(),
|
||||
current_level_source.missing_order(),
|
||||
));
|
||||
self.visit(
|
||||
source_idx + 1,
|
||||
is_on_after_key && current_level_accessors.is_after_key_explicit_missing,
|
||||
)?;
|
||||
self.sub_level_values.pop();
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -323,7 +323,7 @@ mod tests {
|
||||
let mut iter = map.into_iter();
|
||||
let (k, v) = iter.next().unwrap();
|
||||
assert_eq!(k.as_slice(), &key1);
|
||||
assert_eq!(v, "c");
|
||||
assert_eq!(v, "a");
|
||||
assert_eq!(iter.next(), None);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -820,7 +820,7 @@ mod tests {
|
||||
{"key": {"myterm": "apple"}, "doc_count": 1}
|
||||
])
|
||||
);
|
||||
assert!(res["my_composite"].get("after_key").is_none());
|
||||
assert!(res["fruity_aggreg"].get("after_key").is_none());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
/// This modules helps comparing numerical values of different types (i64, u64
|
||||
/// This module helps comparing numerical values of different types (i64, u64
|
||||
/// and f64).
|
||||
pub(super) mod num_cmp {
|
||||
use std::cmp::Ordering;
|
||||
@@ -93,7 +93,7 @@ pub(super) mod num_cmp {
|
||||
}
|
||||
}
|
||||
|
||||
/// This modules helps projecting numerical values to other numerical types.
|
||||
/// This module helps projecting numerical values to other numerical types.
|
||||
/// When the target value space cannot exactly represent the source value, the
|
||||
/// next representable value is returned (or AfterLast if the source value is
|
||||
/// larger than the largest representable value).
|
||||
@@ -138,9 +138,9 @@ pub(super) mod num_proj {
|
||||
|
||||
pub fn f64_to_i64(value: f64) -> ProjectedNumber<i64> {
|
||||
if value < (i64::MIN as f64) {
|
||||
return ProjectedNumber::Next(i64::MIN);
|
||||
ProjectedNumber::Next(i64::MIN)
|
||||
} else if value >= (i64::MAX as f64) {
|
||||
return ProjectedNumber::AfterLast;
|
||||
ProjectedNumber::AfterLast
|
||||
} else if value.fract() == 0.0 {
|
||||
ProjectedNumber::Exact(value as i64)
|
||||
} else if value > 0.0 {
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use super::Collector;
|
||||
use crate::collector::SegmentCollector;
|
||||
use crate::query::Weight;
|
||||
use crate::{DocId, Score, SegmentOrdinal, SegmentReader};
|
||||
|
||||
/// `CountCollector` collector only counts how many
|
||||
@@ -55,6 +56,15 @@ impl Collector for Count {
|
||||
fn merge_fruits(&self, segment_counts: Vec<usize>) -> crate::Result<usize> {
|
||||
Ok(segment_counts.into_iter().sum())
|
||||
}
|
||||
|
||||
fn collect_segment(
|
||||
&self,
|
||||
weight: &dyn Weight,
|
||||
_segment_ord: u32,
|
||||
reader: &SegmentReader,
|
||||
) -> crate::Result<usize> {
|
||||
Ok(weight.count(reader)? as usize)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
|
||||
@@ -167,6 +167,7 @@ impl CompositeFile {
|
||||
.map(|byte_range| self.data.slice(byte_range.clone()))
|
||||
}
|
||||
|
||||
/// Returns the space usage per field in this composite file.
|
||||
pub fn space_usage(&self, schema: &Schema) -> PerFieldSpaceUsage {
|
||||
let mut fields = Vec::new();
|
||||
for (&field_addr, byte_range) in &self.offsets_index {
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
use std::borrow::{Borrow, BorrowMut};
|
||||
|
||||
use common::TinySet;
|
||||
|
||||
use crate::fastfield::AliveBitSet;
|
||||
use crate::DocId;
|
||||
|
||||
@@ -14,6 +16,12 @@ pub const TERMINATED: DocId = i32::MAX as u32;
|
||||
/// exactly this size as long as we can fill the buffer.
|
||||
pub const COLLECT_BLOCK_BUFFER_LEN: usize = 64;
|
||||
|
||||
/// Number of `TinySet` (64-bit) buckets in a block used by [`DocSet::fill_bitset_block`].
|
||||
pub const BLOCK_NUM_TINYBITSETS: usize = 16;
|
||||
|
||||
/// Number of doc IDs covered by one block: `BLOCK_NUM_TINYBITSETS * 64 = 1024`.
|
||||
pub const BLOCK_WINDOW: u32 = BLOCK_NUM_TINYBITSETS as u32 * 64;
|
||||
|
||||
/// Represents an iterable set of sorted doc ids.
|
||||
pub trait DocSet: Send {
|
||||
/// Goes to the next element.
|
||||
@@ -160,6 +168,31 @@ pub trait DocSet: Send {
|
||||
self.size_hint() as u64
|
||||
}
|
||||
|
||||
/// Fills a bitmask representing which documents in `[min_doc, min_doc + BLOCK_WINDOW)` are
|
||||
/// present in this docset.
|
||||
///
|
||||
/// The window is divided into `BLOCK_NUM_TINYBITSETS` buckets of 64 docs each.
|
||||
/// Returns the next doc `>= min_doc + BLOCK_WINDOW`, or `TERMINATED` if exhausted.
|
||||
fn fill_bitset_block(
|
||||
&mut self,
|
||||
min_doc: DocId,
|
||||
mask: &mut [TinySet; BLOCK_NUM_TINYBITSETS],
|
||||
) -> DocId {
|
||||
self.seek(min_doc);
|
||||
let horizon = min_doc + BLOCK_WINDOW;
|
||||
loop {
|
||||
let doc = self.doc();
|
||||
if doc >= horizon {
|
||||
return doc;
|
||||
}
|
||||
let delta = doc - min_doc;
|
||||
mask[(delta / 64) as usize].insert_mut(delta % 64);
|
||||
if self.advance() == TERMINATED {
|
||||
return TERMINATED;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the number documents matching.
|
||||
/// Calling this method consumes the `DocSet`.
|
||||
fn count(&mut self, alive_bitset: &AliveBitSet) -> u32 {
|
||||
@@ -214,6 +247,18 @@ impl DocSet for &mut dyn DocSet {
|
||||
(**self).seek_danger(target)
|
||||
}
|
||||
|
||||
fn fill_buffer(&mut self, buffer: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN]) -> usize {
|
||||
(**self).fill_buffer(buffer)
|
||||
}
|
||||
|
||||
fn fill_bitset_block(
|
||||
&mut self,
|
||||
min_doc: DocId,
|
||||
mask: &mut [TinySet; BLOCK_NUM_TINYBITSETS],
|
||||
) -> DocId {
|
||||
(**self).fill_bitset_block(min_doc, mask)
|
||||
}
|
||||
|
||||
fn doc(&self) -> u32 {
|
||||
(**self).doc()
|
||||
}
|
||||
@@ -256,6 +301,15 @@ impl<TDocSet: DocSet + ?Sized> DocSet for Box<TDocSet> {
|
||||
unboxed.fill_buffer(buffer)
|
||||
}
|
||||
|
||||
fn fill_bitset_block(
|
||||
&mut self,
|
||||
min_doc: DocId,
|
||||
mask: &mut [TinySet; BLOCK_NUM_TINYBITSETS],
|
||||
) -> DocId {
|
||||
let unboxed: &mut TDocSet = self.borrow_mut();
|
||||
unboxed.fill_bitset_block(min_doc, mask)
|
||||
}
|
||||
|
||||
fn doc(&self) -> DocId {
|
||||
let unboxed: &TDocSet = self.borrow();
|
||||
unboxed.doc()
|
||||
|
||||
@@ -649,9 +649,6 @@ impl SegmentUpdater {
|
||||
merge_operation.segment_ids(),
|
||||
advance_deletes_err
|
||||
);
|
||||
assert!(!cfg!(test), "Merge failed.");
|
||||
|
||||
// ... cancel merge
|
||||
// `merge_operations` are tracked. As it is dropped, the
|
||||
// the segment_ids will be available again for merge.
|
||||
return Err(advance_deletes_err);
|
||||
@@ -719,7 +716,7 @@ mod tests {
|
||||
// Regression test: -(max_doc as i32) overflows for max_doc >= 2^31.
|
||||
// Using std::cmp::Reverse avoids this.
|
||||
let inventory = SegmentMetaInventory::default();
|
||||
let mut metas = vec![
|
||||
let mut metas = [
|
||||
inventory.new_segment_meta(SegmentId::generate_random(), 100),
|
||||
inventory.new_segment_meta(SegmentId::generate_random(), (1u32 << 31) - 1),
|
||||
inventory.new_segment_meta(SegmentId::generate_random(), 50_000),
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
use common::TinySet;
|
||||
|
||||
use super::size_hint::estimate_intersection;
|
||||
use crate::docset::{DocSet, SeekDangerResult, TERMINATED};
|
||||
use crate::docset::{DocSet, SeekDangerResult, BLOCK_NUM_TINYBITSETS, TERMINATED};
|
||||
use crate::query::term_query::TermScorer;
|
||||
use crate::query::{EmptyScorer, Scorer};
|
||||
use crate::{DocId, Score};
|
||||
@@ -17,7 +19,7 @@ use crate::{DocId, Score};
|
||||
/// `size_hint` of the intersection.
|
||||
pub fn intersect_scorers(
|
||||
mut scorers: Vec<Box<dyn Scorer>>,
|
||||
num_docs_segment: u32,
|
||||
segment_num_docs: u32,
|
||||
) -> Box<dyn Scorer> {
|
||||
if scorers.is_empty() {
|
||||
return Box::new(EmptyScorer);
|
||||
@@ -42,14 +44,14 @@ pub fn intersect_scorers(
|
||||
left: *(left.downcast::<TermScorer>().map_err(|_| ()).unwrap()),
|
||||
right: *(right.downcast::<TermScorer>().map_err(|_| ()).unwrap()),
|
||||
others: scorers,
|
||||
num_docs: num_docs_segment,
|
||||
segment_num_docs,
|
||||
});
|
||||
}
|
||||
Box::new(Intersection {
|
||||
left,
|
||||
right,
|
||||
others: scorers,
|
||||
num_docs: num_docs_segment,
|
||||
segment_num_docs,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -58,7 +60,7 @@ pub struct Intersection<TDocSet: DocSet, TOtherDocSet: DocSet = Box<dyn Scorer>>
|
||||
left: TDocSet,
|
||||
right: TDocSet,
|
||||
others: Vec<TOtherDocSet>,
|
||||
num_docs: u32,
|
||||
segment_num_docs: u32,
|
||||
}
|
||||
|
||||
fn go_to_first_doc<TDocSet: DocSet>(docsets: &mut [TDocSet]) -> DocId {
|
||||
@@ -78,7 +80,10 @@ fn go_to_first_doc<TDocSet: DocSet>(docsets: &mut [TDocSet]) -> DocId {
|
||||
|
||||
impl<TDocSet: DocSet> Intersection<TDocSet, TDocSet> {
|
||||
/// num_docs is the number of documents in the segment.
|
||||
pub(crate) fn new(mut docsets: Vec<TDocSet>, num_docs: u32) -> Intersection<TDocSet, TDocSet> {
|
||||
pub(crate) fn new(
|
||||
mut docsets: Vec<TDocSet>,
|
||||
segment_num_docs: u32,
|
||||
) -> Intersection<TDocSet, TDocSet> {
|
||||
let num_docsets = docsets.len();
|
||||
assert!(num_docsets >= 2);
|
||||
docsets.sort_by_key(|docset| docset.cost());
|
||||
@@ -97,7 +102,7 @@ impl<TDocSet: DocSet> Intersection<TDocSet, TDocSet> {
|
||||
left,
|
||||
right,
|
||||
others: docsets,
|
||||
num_docs,
|
||||
segment_num_docs,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -214,7 +219,7 @@ impl<TDocSet: DocSet, TOtherDocSet: DocSet> DocSet for Intersection<TDocSet, TOt
|
||||
[self.left.size_hint(), self.right.size_hint()]
|
||||
.into_iter()
|
||||
.chain(self.others.iter().map(DocSet::size_hint)),
|
||||
self.num_docs,
|
||||
self.segment_num_docs,
|
||||
)
|
||||
}
|
||||
|
||||
@@ -224,6 +229,91 @@ impl<TDocSet: DocSet, TOtherDocSet: DocSet> DocSet for Intersection<TDocSet, TOt
|
||||
// If there are docsets that are bad at skipping, they should also influence the cost.
|
||||
self.left.cost()
|
||||
}
|
||||
|
||||
fn count_including_deleted(&mut self) -> u32 {
|
||||
const DENSITY_THRESHOLD_INVERSE: u32 = 32;
|
||||
if self
|
||||
.left
|
||||
.size_hint()
|
||||
.saturating_mul(DENSITY_THRESHOLD_INVERSE)
|
||||
< self.segment_num_docs
|
||||
{
|
||||
// Sparse path: if the lead iterator covers less than ~3% of docs,
|
||||
// the block approach wastes time on mostly-empty blocks.
|
||||
self.count_including_deleted_sparse()
|
||||
} else {
|
||||
// Dense approach. We push documents into a block bitset to then
|
||||
// perform count using popcount.
|
||||
self.count_including_deleted_dense()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const EMPTY_BLOCK: [TinySet; BLOCK_NUM_TINYBITSETS] = [TinySet::EMPTY; BLOCK_NUM_TINYBITSETS];
|
||||
|
||||
/// ANDs `other` into `mask` in-place. Returns `true` if the result is all zeros.
|
||||
#[inline]
|
||||
fn and_is_empty(
|
||||
mask: &mut [TinySet; BLOCK_NUM_TINYBITSETS],
|
||||
other: &[TinySet; BLOCK_NUM_TINYBITSETS],
|
||||
) -> bool {
|
||||
let mut all_empty = true;
|
||||
for (m, t) in mask.iter_mut().zip(other.iter()) {
|
||||
*m = m.intersect(*t);
|
||||
all_empty &= m.is_empty();
|
||||
}
|
||||
all_empty
|
||||
}
|
||||
|
||||
impl<TDocSet: DocSet, TOtherDocSet: DocSet> Intersection<TDocSet, TOtherDocSet> {
|
||||
fn count_including_deleted_sparse(&mut self) -> u32 {
|
||||
let mut count = 0u32;
|
||||
let mut doc = self.doc();
|
||||
while doc != TERMINATED {
|
||||
count += 1;
|
||||
doc = self.advance();
|
||||
}
|
||||
count
|
||||
}
|
||||
|
||||
/// Dense block-wise bitmask intersection count.
|
||||
///
|
||||
/// Fills a 1024-doc window from each iterator, ANDs the bitmasks together,
|
||||
/// and popcounts the result. `fill_bitset_block` handles seeking tails forward
|
||||
/// when they lag behind the current block.
|
||||
fn count_including_deleted_dense(&mut self) -> u32 {
|
||||
let mut count = 0u32;
|
||||
let mut next_base = self.left.doc();
|
||||
|
||||
while next_base < TERMINATED {
|
||||
let base = next_base;
|
||||
|
||||
// Fill lead bitmask.
|
||||
let mut mask = EMPTY_BLOCK;
|
||||
next_base = next_base.max(self.left.fill_bitset_block(base, &mut mask));
|
||||
|
||||
let mut tail_mask = EMPTY_BLOCK;
|
||||
next_base = next_base.max(self.right.fill_bitset_block(base, &mut tail_mask));
|
||||
|
||||
if and_is_empty(&mut mask, &tail_mask) {
|
||||
continue;
|
||||
}
|
||||
// AND with each additional tail.
|
||||
for other in &mut self.others {
|
||||
let mut other_mask = EMPTY_BLOCK;
|
||||
next_base = next_base.max(other.fill_bitset_block(base, &mut other_mask));
|
||||
if and_is_empty(&mut mask, &other_mask) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
for tinyset in &mask {
|
||||
count += tinyset.len();
|
||||
}
|
||||
}
|
||||
|
||||
count
|
||||
}
|
||||
}
|
||||
|
||||
impl<TScorer, TOtherScorer> Scorer for Intersection<TScorer, TOtherScorer>
|
||||
@@ -421,6 +511,82 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
proptest! {
|
||||
#[test]
|
||||
fn prop_test_count_including_deleted_matches_default(
|
||||
a in sorted_deduped_vec(1200, 400),
|
||||
b in sorted_deduped_vec(1200, 400),
|
||||
c in sorted_deduped_vec(1200, 400),
|
||||
num_docs in 1200u32..2000u32,
|
||||
) {
|
||||
// Compute expected count via set intersection.
|
||||
let expected: u32 = a.iter()
|
||||
.filter(|doc| b.contains(doc) && c.contains(doc))
|
||||
.count() as u32;
|
||||
|
||||
// Test count_including_deleted (dense path).
|
||||
let make_intersection = || {
|
||||
Intersection::new(
|
||||
vec![
|
||||
VecDocSet::from(a.clone()),
|
||||
VecDocSet::from(b.clone()),
|
||||
VecDocSet::from(c.clone()),
|
||||
],
|
||||
num_docs,
|
||||
)
|
||||
};
|
||||
|
||||
let mut intersection = make_intersection();
|
||||
let count = intersection.count_including_deleted();
|
||||
prop_assert_eq!(count, expected,
|
||||
"count_including_deleted mismatch: a={:?}, b={:?}, c={:?}", a, b, c);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_count_including_deleted_two_way() {
|
||||
let left = VecDocSet::from(vec![1, 3, 9]);
|
||||
let right = VecDocSet::from(vec![3, 4, 9, 18]);
|
||||
let mut intersection = Intersection::new(vec![left, right], 100);
|
||||
assert_eq!(intersection.count_including_deleted(), 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_count_including_deleted_empty() {
|
||||
let a = VecDocSet::from(vec![1, 3]);
|
||||
let b = VecDocSet::from(vec![1, 4]);
|
||||
let c = VecDocSet::from(vec![3, 9]);
|
||||
let mut intersection = Intersection::new(vec![a, b, c], 100);
|
||||
assert_eq!(intersection.count_including_deleted(), 0);
|
||||
}
|
||||
|
||||
/// Test with enough documents to exercise the dense path (>= num_docs/32).
|
||||
#[test]
|
||||
fn test_count_including_deleted_dense_path() {
|
||||
// Create dense docsets: many docs relative to segment size.
|
||||
let docs_a: Vec<u32> = (0..2000).step_by(2).collect(); // even numbers 0..2000
|
||||
let docs_b: Vec<u32> = (0..2000).step_by(3).collect(); // multiples of 3
|
||||
let expected = docs_a.iter().filter(|d| *d % 3 == 0).count() as u32;
|
||||
|
||||
let a = VecDocSet::from(docs_a);
|
||||
let b = VecDocSet::from(docs_b);
|
||||
let mut intersection = Intersection::new(vec![a, b], 2000);
|
||||
assert_eq!(intersection.count_including_deleted(), expected);
|
||||
}
|
||||
|
||||
/// Test that spans multiple blocks (>1024 docs).
|
||||
#[test]
|
||||
fn test_count_including_deleted_multi_block() {
|
||||
let docs_a: Vec<u32> = (0..5000).collect();
|
||||
let docs_b: Vec<u32> = (0..5000).step_by(7).collect();
|
||||
let expected = docs_b.len() as u32; // all of b is in a
|
||||
|
||||
let a = VecDocSet::from(docs_a);
|
||||
let b = VecDocSet::from(docs_b);
|
||||
let mut intersection = Intersection::new(vec![a, b], 5000);
|
||||
assert_eq!(intersection.count_including_deleted(), expected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_bug_2811_intersection_candidate_should_increase() {
|
||||
let mut schema_builder = Schema::builder();
|
||||
|
||||
@@ -117,6 +117,12 @@ impl DocSet for TermScorer {
|
||||
fn size_hint(&self) -> u32 {
|
||||
self.postings.size_hint()
|
||||
}
|
||||
|
||||
// TODO
|
||||
// It is probably possible to optimize fill_bitset_block for TermScorer,
|
||||
// working directly with the blocks, enabling vectorization.
|
||||
// I did not manage to get a performance improvement on Mac ARM,
|
||||
// and do not have access to x86 to investigate.
|
||||
}
|
||||
|
||||
impl Scorer for TermScorer {
|
||||
|
||||
@@ -48,8 +48,7 @@ impl BinarySerializable for TermInfoBlockMeta {
|
||||
}
|
||||
|
||||
impl FixedSize for TermInfoBlockMeta {
|
||||
const SIZE_IN_BYTES: usize =
|
||||
u64::SIZE_IN_BYTES + TermInfo::SIZE_IN_BYTES + 3 * u8::SIZE_IN_BYTES;
|
||||
const SIZE_IN_BYTES: usize = u64::SIZE_IN_BYTES + TermInfo::SIZE_IN_BYTES + 3;
|
||||
}
|
||||
|
||||
impl TermInfoBlockMeta {
|
||||
|
||||
@@ -553,7 +553,7 @@ impl FixedSize for BlockAddrBlockMetadata {
|
||||
const SIZE_IN_BYTES: usize = u64::SIZE_IN_BYTES
|
||||
+ BlockStartAddr::SIZE_IN_BYTES
|
||||
+ 2 * u32::SIZE_IN_BYTES
|
||||
+ 2 * u8::SIZE_IN_BYTES
|
||||
+ 2
|
||||
+ u16::SIZE_IN_BYTES;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user