refactor multivalue fastfield, refactor range query (#1749)

Introduce MakeZero trait, remove make_zero from FastValue
Merge two multivalue fastfield implementations into one
prepare range query on fastfield for different types
This commit is contained in:
PSeitz
2023-01-05 12:09:50 +01:00
committed by GitHub
parent 2080c370c2
commit 07a51eb7c8
11 changed files with 287 additions and 356 deletions

View File

@@ -16,7 +16,6 @@ mod phrase_query;
mod query;
mod query_parser;
mod range_query;
mod range_query_ip_fastfield;
mod regex_query;
mod reqopt_scorer;
mod scorer;

View File

@@ -0,0 +1,209 @@
use std::ops::RangeInclusive;
use std::sync::Arc;
use fastfield_codecs::Column;
use crate::fastfield::{MakeZero, MultiValuedFastFieldReader};
use crate::{DocId, DocSet, TERMINATED};
/// Helper to have a cursor over a vec of docids
struct VecCursor {
docs: Vec<u32>,
current_pos: usize,
}
impl VecCursor {
fn new() -> Self {
Self {
docs: Vec::with_capacity(32),
current_pos: 0,
}
}
fn next(&mut self) -> Option<u32> {
self.current_pos += 1;
self.current()
}
#[inline]
fn current(&self) -> Option<u32> {
self.docs.get(self.current_pos).copied()
}
fn get_cleared_data(&mut self) -> &mut Vec<u32> {
self.docs.clear();
self.current_pos = 0;
&mut self.docs
}
fn last_value(&self) -> Option<u32> {
self.docs.iter().last().cloned()
}
fn is_empty(&self) -> bool {
self.current().is_none()
}
}
pub(crate) enum FastFieldCardinality<T: MakeZero> {
SingleValue(Arc<dyn Column<T>>),
MultiValue(MultiValuedFastFieldReader<T>),
}
impl<T: MakeZero + PartialOrd + Clone> FastFieldCardinality<T> {
fn num_docs(&self) -> u32 {
match self {
FastFieldCardinality::SingleValue(single_value) => single_value.num_vals(),
FastFieldCardinality::MultiValue(multi_value) => {
multi_value.get_index_reader().num_docs()
}
}
}
}
pub(crate) struct RangeDocSet<T: MakeZero> {
/// The range filter on the values.
value_range: RangeInclusive<T>,
fast_field: FastFieldCardinality<T>,
/// The next docid start range to fetch (inclusive).
next_fetch_start: u32,
/// Number of docs range checked in a batch.
///
/// There are two patterns.
/// - We do a full scan. => We can load large chunks. We don't know in advance if seek call
/// will come, so we start with small chunks
/// - We load docs, interspersed with seek calls. When there are big jumps in the seek, we
/// should load small chunks. When the seeks are small, we can employ the same strategy as on a
/// full scan.
fetch_horizon: u32,
/// Current batch of loaded docs.
loaded_docs: VecCursor,
last_seek_pos_opt: Option<u32>,
}
const DEFAULT_FETCH_HORIZON: u32 = 128;
impl<T: MakeZero + Send + PartialOrd + Clone> RangeDocSet<T> {
pub(crate) fn new(value_range: RangeInclusive<T>, fast_field: FastFieldCardinality<T>) -> Self {
let mut range_docset = Self {
value_range,
fast_field,
loaded_docs: VecCursor::new(),
next_fetch_start: 0,
fetch_horizon: DEFAULT_FETCH_HORIZON,
last_seek_pos_opt: None,
};
range_docset.reset_fetch_range();
range_docset.fetch_block();
range_docset
}
fn reset_fetch_range(&mut self) {
self.fetch_horizon = DEFAULT_FETCH_HORIZON;
}
/// Returns true if more data could be fetched
fn fetch_block(&mut self) {
const MAX_HORIZON: u32 = 100_000;
while self.loaded_docs.is_empty() {
let finished_to_end = self.fetch_horizon(self.fetch_horizon);
if finished_to_end {
break;
}
// Fetch more data, increase horizon. Horizon only gets reset when doing a seek.
self.fetch_horizon = (self.fetch_horizon * 2).min(MAX_HORIZON);
}
}
/// check if the distance between the seek calls is large
fn is_last_seek_distance_large(&self, new_seek: DocId) -> bool {
if let Some(last_seek_pos) = self.last_seek_pos_opt {
(new_seek - last_seek_pos) >= 128
} else {
true
}
}
/// Fetches a block for docid range [next_fetch_start .. next_fetch_start + HORIZON]
fn fetch_horizon(&mut self, horizon: u32) -> bool {
let mut finished_to_end = false;
let limit = self.fast_field.num_docs();
let mut end = self.next_fetch_start + horizon;
if end >= limit {
end = limit;
finished_to_end = true;
}
match &self.fast_field {
FastFieldCardinality::MultiValue(multi) => {
let last_value = self.loaded_docs.last_value();
multi.get_docids_for_value_range(
self.value_range.clone(),
self.next_fetch_start..end,
self.loaded_docs.get_cleared_data(),
);
// In case of multivalues, we may have an overlap of the same docid between fetching
// blocks
if let Some(last_value) = last_value {
while self.loaded_docs.current() == Some(last_value) {
self.loaded_docs.next();
}
}
}
FastFieldCardinality::SingleValue(single) => {
single.get_docids_for_value_range(
self.value_range.clone(),
self.next_fetch_start..end,
self.loaded_docs.get_cleared_data(),
);
}
}
self.next_fetch_start = end;
finished_to_end
}
}
impl<T: MakeZero + Send + PartialOrd + Clone> DocSet for RangeDocSet<T> {
#[inline]
fn advance(&mut self) -> DocId {
if let Some(docid) = self.loaded_docs.next() {
docid
} else {
if self.next_fetch_start >= self.fast_field.num_docs() {
return TERMINATED;
}
self.fetch_block();
self.loaded_docs.current().unwrap_or(TERMINATED)
}
}
#[inline]
fn doc(&self) -> DocId {
self.loaded_docs.current().unwrap_or(TERMINATED)
}
/// Advances the `DocSet` forward until reaching the target, or going to the
/// lowest [`DocId`] greater than the target.
///
/// If the end of the `DocSet` is reached, [`TERMINATED`] is returned.
///
/// Calling `.seek(target)` on a terminated `DocSet` is legal. Implementation
/// of `DocSet` should support it.
///
/// Calling `seek(TERMINATED)` is also legal and is the normal way to consume a `DocSet`.
fn seek(&mut self, target: DocId) -> DocId {
if self.is_last_seek_distance_large(target) {
self.reset_fetch_range();
}
if target > self.next_fetch_start {
self.next_fetch_start = target;
}
let mut doc = self.doc();
debug_assert!(doc <= target);
while doc < target {
doc = self.advance();
}
self.last_seek_pos_opt = Some(target);
doc
}
fn size_hint(&self) -> u32 {
0 // heuristic possible by checking number of hits when fetching a block
}
}

View File

@@ -0,0 +1,5 @@
mod fast_field_range_query;
mod range_query;
mod range_query_ip_fastfield;
pub use self::range_query::RangeQuery;

View File

@@ -6,7 +6,7 @@ use common::BitSet;
use crate::core::SegmentReader;
use crate::error::TantivyError;
use crate::query::explanation::does_not_match;
use crate::query::range_query_ip_fastfield::IPFastFieldRangeWeight;
use crate::query::range_query::range_query_ip_fastfield::IPFastFieldRangeWeight;
use crate::query::{BitSetDocSet, ConstScorer, EnableScoring, Explanation, Query, Scorer, Weight};
use crate::schema::{Field, IndexRecordOption, Term, Type};
use crate::termdict::{TermDictionary, TermStreamer};

View File

@@ -4,16 +4,15 @@
use std::net::Ipv6Addr;
use std::ops::{Bound, RangeInclusive};
use std::sync::Arc;
use common::BinarySerializable;
use fastfield_codecs::{Column, MonotonicallyMappableToU128};
use fastfield_codecs::MonotonicallyMappableToU128;
use super::fast_field_range_query::{FastFieldCardinality, RangeDocSet};
use super::range_query::map_bound;
use super::{ConstScorer, Explanation, Scorer, Weight};
use crate::fastfield::MultiValuedU128FastFieldReader;
use crate::query::{ConstScorer, Explanation, Scorer, Weight};
use crate::schema::{Cardinality, Field};
use crate::{DocId, DocSet, Score, SegmentReader, TantivyError, TERMINATED};
use crate::{DocId, DocSet, Score, SegmentReader, TantivyError};
/// `IPFastFieldRangeWeight` uses the ip address fast field to execute range queries.
pub struct IPFastFieldRangeWeight {
@@ -51,9 +50,9 @@ impl Weight for IPFastFieldRangeWeight {
ip_addr_fast_field.min_value(),
ip_addr_fast_field.max_value(),
);
let docset = IpRangeDocSet::new(
let docset = RangeDocSet::new(
value_range,
IpFastFieldCardinality::SingleValue(ip_addr_fast_field),
FastFieldCardinality::SingleValue(ip_addr_fast_field),
);
Ok(Box::new(ConstScorer::new(docset, boost)))
}
@@ -65,9 +64,9 @@ impl Weight for IPFastFieldRangeWeight {
ip_addr_fast_field.min_value(),
ip_addr_fast_field.max_value(),
);
let docset = IpRangeDocSet::new(
let docset = RangeDocSet::new(
value_range,
IpFastFieldCardinality::MultiValue(ip_addr_fast_field),
FastFieldCardinality::MultiValue(ip_addr_fast_field),
);
Ok(Box::new(ConstScorer::new(docset, boost)))
}
@@ -108,211 +107,6 @@ fn bound_to_value_range(
start_value..=end_value
}
/// Helper to have a cursor over a vec of docids
struct VecCursor {
docs: Vec<u32>,
current_pos: usize,
}
impl VecCursor {
fn new() -> Self {
Self {
docs: Vec::with_capacity(32),
current_pos: 0,
}
}
fn next(&mut self) -> Option<u32> {
self.current_pos += 1;
self.current()
}
#[inline]
fn current(&self) -> Option<u32> {
self.docs.get(self.current_pos).copied()
}
fn get_cleared_data(&mut self) -> &mut Vec<u32> {
self.docs.clear();
self.current_pos = 0;
&mut self.docs
}
fn last_value(&self) -> Option<u32> {
self.docs.iter().last().cloned()
}
fn is_empty(&self) -> bool {
self.current_pos >= self.docs.len()
}
}
pub(crate) enum IpFastFieldCardinality {
SingleValue(Arc<dyn Column<Ipv6Addr>>),
MultiValue(MultiValuedU128FastFieldReader<Ipv6Addr>),
}
impl IpFastFieldCardinality {
fn num_docs(&self) -> u32 {
match self {
IpFastFieldCardinality::SingleValue(single_value) => single_value.num_vals(),
IpFastFieldCardinality::MultiValue(multi_value) => {
multi_value.get_index_reader().num_docs()
}
}
}
}
struct IpRangeDocSet {
/// The range filter on the values.
value_range: RangeInclusive<Ipv6Addr>,
ip_addr_fast_field: IpFastFieldCardinality,
/// The next docid start range to fetch (inclusive).
next_fetch_start: u32,
/// Number of docs range checked in a batch.
///
/// There are two patterns.
/// - We do a full scan. => We can load large chunks. We don't know in advance if seek call
/// will come, so we start with small chunks
/// - We load docs, interspersed with seek calls. When there are big jumps in the seek, we
/// should load small chunks. When the seeks are small, we can employ the same strategy as on a
/// full scan.
fetch_horizon: u32,
/// Current batch of loaded docs.
loaded_docs: VecCursor,
last_seek_pos_opt: Option<u32>,
}
const DEFAULT_FETCH_HORIZON: u32 = 128;
impl IpRangeDocSet {
fn new(
value_range: RangeInclusive<Ipv6Addr>,
ip_addr_fast_field: IpFastFieldCardinality,
) -> Self {
let mut ip_range_docset = Self {
value_range,
ip_addr_fast_field,
loaded_docs: VecCursor::new(),
next_fetch_start: 0,
fetch_horizon: DEFAULT_FETCH_HORIZON,
last_seek_pos_opt: None,
};
ip_range_docset.reset_fetch_range();
ip_range_docset.fetch_block();
ip_range_docset
}
fn reset_fetch_range(&mut self) {
self.fetch_horizon = DEFAULT_FETCH_HORIZON;
}
/// Returns true if more data could be fetched
fn fetch_block(&mut self) {
const MAX_HORIZON: u32 = 100_000;
while self.loaded_docs.is_empty() {
let finished_to_end = self.fetch_horizon(self.fetch_horizon);
if finished_to_end {
break;
}
// Fetch more data, increase horizon. Horizon only gets reset when doing a seek.
self.fetch_horizon = (self.fetch_horizon * 2).min(MAX_HORIZON);
}
}
/// check if the distance between the seek calls is large
fn is_last_seek_distance_large(&self, new_seek: DocId) -> bool {
if let Some(last_seek_pos) = self.last_seek_pos_opt {
(new_seek - last_seek_pos) >= 128
} else {
true
}
}
/// Fetches a block for docid range [next_fetch_start .. next_fetch_start + HORIZON]
fn fetch_horizon(&mut self, horizon: u32) -> bool {
let mut finished_to_end = false;
let limit = self.ip_addr_fast_field.num_docs();
let mut end = self.next_fetch_start + horizon;
if end >= limit {
end = limit;
finished_to_end = true;
}
match &self.ip_addr_fast_field {
IpFastFieldCardinality::MultiValue(multi) => {
let last_value = self.loaded_docs.last_value();
multi.get_docids_for_value_range(
self.value_range.clone(),
self.next_fetch_start..end,
self.loaded_docs.get_cleared_data(),
);
// In case of multivalues, we may have an overlap of the same docid between fetching
// blocks
if let Some(last_value) = last_value {
while self.loaded_docs.current() == Some(last_value) {
self.loaded_docs.next();
}
}
}
IpFastFieldCardinality::SingleValue(single) => {
single.get_docids_for_value_range(
self.value_range.clone(),
self.next_fetch_start..end,
self.loaded_docs.get_cleared_data(),
);
}
}
self.next_fetch_start = end;
finished_to_end
}
}
impl DocSet for IpRangeDocSet {
#[inline]
fn advance(&mut self) -> DocId {
if let Some(docid) = self.loaded_docs.next() {
docid
} else {
if self.next_fetch_start >= self.ip_addr_fast_field.num_docs() {
return TERMINATED;
}
self.fetch_block();
self.loaded_docs.current().unwrap_or(TERMINATED)
}
}
#[inline]
fn doc(&self) -> DocId {
self.loaded_docs.current().unwrap_or(TERMINATED)
}
/// Advances the `DocSet` forward until reaching the target, or going to the
/// lowest [`DocId`] greater than the target.
///
/// If the end of the `DocSet` is reached, [`TERMINATED`] is returned.
///
/// Calling `.seek(target)` on a terminated `DocSet` is legal. Implementation
/// of `DocSet` should support it.
///
/// Calling `seek(TERMINATED)` is also legal and is the normal way to consume a `DocSet`.
fn seek(&mut self, target: DocId) -> DocId {
if self.is_last_seek_distance_large(target) {
self.reset_fetch_range();
}
if target > self.next_fetch_start {
self.next_fetch_start = target;
}
let mut doc = self.doc();
debug_assert!(doc <= target);
while doc < target {
doc = self.advance();
}
self.last_seek_pos_opt = Some(target);
doc
}
fn size_hint(&self) -> u32 {
0 // heuristic possible by checking number of hits when fetching a block
}
}
#[cfg(test)]
mod tests {
use proptest::prelude::ProptestConfig;
@@ -401,7 +195,7 @@ mod tests {
let index = Index::create_in_ram(schema);
{
let mut index_writer = index.writer(3_000_000).unwrap();
let mut index_writer = index.writer(10_000_000).unwrap();
for doc in docs.iter() {
index_writer
.add_document(doc!(