Merge branch 'master' into blockwand

This commit is contained in:
Paul Masurel
2020-07-16 15:39:52 +09:00
18 changed files with 173 additions and 101 deletions

View File

@@ -117,11 +117,16 @@ fn main() -> tantivy::Result<()> {
if let Some(mut block_segment_postings) =
inverted_index.read_block_postings(&term_the, IndexRecordOption::Basic)
{
while block_segment_postings.advance() {
loop {
let docs = block_segment_postings.docs();
if docs.is_empty() {
break;
}
// Once again these docs MAY contains deleted documents as well.
let docs = block_segment_postings.docs();
// Prints `Docs [0, 2].`
println!("Docs {:?}", docs);
block_segment_postings.advance();
}
}
}

View File

@@ -213,7 +213,7 @@ pub struct IndexMeta {
#[serde(skip_serializing_if = "Option::is_none")]
/// Payload associated to the last commit.
///
/// Upon commit, clients can optionally add a small `Striing` payload to their commit
/// Upon commit, clients can optionally add a small `String` payload to their commit
/// to help identify this commit.
/// This payload is entirely unused by tantivy.
pub payload: Option<String>,

View File

@@ -38,6 +38,7 @@ pub trait DocSet {
/// Calling `seek(TERMINATED)` is also legal and is the normal way to consume a DocSet.
fn seek(&mut self, target: DocId) -> DocId {
let mut doc = self.doc();
debug_assert!(doc <= target);
while doc < target {
doc = self.advance();
}

View File

@@ -523,7 +523,7 @@ impl SegmentUpdater {
///
/// Upon termination of the current merging threads,
/// merge opportunity may appear.
//
///
/// We keep waiting until the merge policy judges that
/// no opportunity is available.
///

View File

@@ -58,7 +58,7 @@ fn decode_vint_block(
doc_offset: DocId,
num_vint_docs: usize,
) {
doc_decoder.clear();
doc_decoder.fill(TERMINATED);
let num_consumed_bytes = doc_decoder.uncompress_vint_sorted(data, doc_offset, num_vint_docs);
if let Some(freq_decoder) = freq_decoder_opt {
freq_decoder.uncompress_vint_unsorted(&data[num_consumed_bytes..], num_vint_docs);
@@ -165,13 +165,13 @@ impl BlockSegmentPostings {
let (skip_data_opt, postings_data) = split_into_skips_and_postings(doc_freq, postings_data);
self.data = ReadOnlySource::new(postings_data);
self.loaded_offset = std::usize::MAX;
self.loaded_offset = std::usize::MAX;
if let Some(skip_data) = skip_data_opt {
self.skip_reader.reset(skip_data, doc_freq);
} else {
self.skip_reader.reset(ReadOnlySource::empty(), doc_freq);
}
self.doc_freq = doc_freq;
self.load_block();
}
/// Returns the overall number of documents in the block postings.
@@ -237,6 +237,15 @@ impl BlockSegmentPostings {
self.doc_decoder.output_len
}
/// Position on a block that may contains `target_doc`.
///
/// If all docs are smaller than target, the block loaded may be empty,
/// or be the last an incomplete VInt block.
pub fn seek(&mut self, target_doc: DocId) {
self.skip_reader.seek(target_doc);
self.load_block();
}
pub(crate) fn position_offset(&self) -> u64 {
self.skip_reader.position_offset()
}
@@ -281,7 +290,14 @@ impl BlockSegmentPostings {
tf_num_bits,
);
}
BlockInfo::VInt { num_docs, .. } => {
BlockInfo::VInt { num_docs } => {
let data = {
if num_docs == 0 {
&[]
} else {
&self.data.as_slice()[offset..]
}
};
decode_vint_block(
&mut self.doc_decoder,
if let FreqReadingOption::ReadFreq = self.freq_reading_option {
@@ -289,7 +305,7 @@ impl BlockSegmentPostings {
} else {
None
},
&self.data.as_slice()[offset..],
data,
self.skip_reader.last_doc_in_previous_block,
num_docs as usize,
);
@@ -300,10 +316,9 @@ impl BlockSegmentPostings {
/// Advance to the next block.
///
/// Returns false iff there was no remaining blocks.
pub fn advance(&mut self) -> bool {
pub fn advance(&mut self) {
self.skip_reader.advance();
self.load_block();
self.docs().len() > 0
}
/// Returns an empty segment postings object
@@ -362,7 +377,10 @@ mod tests {
#[test]
fn test_empty_block_segment_postings() {
let mut postings = BlockSegmentPostings::empty();
assert!(!postings.advance());
assert!(postings.docs().is_empty());
assert_eq!(postings.doc_freq(), 0);
postings.advance();
assert!(postings.docs().is_empty());
assert_eq!(postings.doc_freq(), 0);
}
@@ -374,13 +392,14 @@ mod tests {
assert_eq!(block_segments.doc_freq(), 100_000);
loop {
let block = block_segments.docs();
if block.is_empty() {
break;
}
for (i, doc) in block.iter().cloned().enumerate() {
assert_eq!(offset + (i as u32), doc);
}
offset += block.len() as u32;
if block_segments.advance() {
break;
}
block_segments.advance();
}
}
@@ -491,7 +510,6 @@ mod tests {
let term_info = inverted_index.get_term_info(&term).unwrap();
inverted_index.reset_block_postings_from_terminfo(&term_info, &mut block_segments);
}
assert!(block_segments.advance());
assert_eq!(block_segments.docs(), &[1, 3, 5]);
}
}

View File

@@ -108,8 +108,8 @@ impl BlockDecoder {
self.output.0[idx]
}
pub fn clear(&mut self) {
self.output.0.iter_mut().for_each(|el| *el = TERMINATED);
pub fn fill(&mut self, val: u32) {
self.output.0.iter_mut().for_each(|el| *el = val);
}
}

View File

@@ -584,6 +584,9 @@ pub mod tests {
) {
for target in targets {
let mut postings_opt = postings_factory();
if target < postings_opt.doc() {
continue;
}
let mut postings_unopt = UnoptimizedDocSet::wrap(postings_factory());
let skip_result_opt = postings_opt.seek(target);
let skip_result_unopt = postings_unopt.seek(target);

View File

@@ -10,7 +10,7 @@ use crate::postings::BlockSearcher;
use crate::postings::Postings;
use crate::schema::IndexRecordOption;
use crate::{DocId, TERMINATED};
use crate::DocId;
use crate::directory::ReadOnlySource;
use crate::fieldnorm::FieldNormReader;
@@ -114,14 +114,33 @@ impl SegmentPostings {
block_searcher: BlockSearcher::default(),
}
}
}
impl DocSet for SegmentPostings {
// goes to the next element.
// next needs to be called a first time to point to the correct element.
#[inline]
fn advance(&mut self) -> DocId {
assert!(self.block_cursor.block_is_loaded());
if self.cur == COMPRESSION_BLOCK_SIZE - 1 {
self.cur = 0;
self.block_cursor.advance();
} else {
self.cur += 1;
}
self.doc()
}
pub(crate) fn seek_after_shallow(&mut self, target: DocId) -> DocId {
self.block_cursor.load_block();
fn seek(&mut self, target: DocId) -> DocId {
debug_assert!(self.doc() <= target);
if self.doc() >= target {
return self.doc();
}
self.block_cursor.seek(target);
// At this point we are on the block, that might contain our document.
let output = self.block_cursor.docs_aligned();
self.cur = self.block_searcher.search_in_block(&output, target);
// The last block is not full and padded with the value TERMINATED,
@@ -140,32 +159,6 @@ impl SegmentPostings {
debug_assert_eq!(doc, self.doc());
doc
}
}
impl DocSet for SegmentPostings {
// goes to the next element.
// next needs to be called a first time to point to the correct element.
#[inline]
fn advance(&mut self) -> DocId {
assert!(self.block_cursor.block_is_loaded());
if self.cur == COMPRESSION_BLOCK_SIZE - 1 {
self.cur = 0;
if !self.block_cursor.advance() {
return TERMINATED;
}
} else {
self.cur += 1;
}
self.doc()
}
fn seek(&mut self, target_doc: DocId) -> DocId {
if self.doc() == target_doc {
return target_doc;
}
self.block_cursor.shallow_seek(target_doc);
self.seek_after_shallow(target_doc)
}
/// Return the current document's `DocId`.
#[inline]

View File

@@ -92,26 +92,40 @@ impl Default for BlockInfo {
impl SkipReader {
pub fn new(data: ReadOnlySource, doc_freq: u32, skip_info: IndexRecordOption) -> SkipReader {
let mut skip_reader = SkipReader {
last_doc_in_block: 0u32,
last_doc_in_block: if doc_freq >= COMPRESSION_BLOCK_SIZE as u32 {
0
} else {
TERMINATED
},
last_doc_in_previous_block: 0u32,
owned_read: OwnedRead::new(data),
skip_info,
block_info: BlockInfo::default(),
block_info: BlockInfo::VInt { num_docs: doc_freq },
byte_offset: 0,
remaining_docs: doc_freq,
position_offset: 0u64,
};
skip_reader.advance();
if doc_freq >= COMPRESSION_BLOCK_SIZE as u32 {
skip_reader.read_block_info();
}
skip_reader
}
pub fn reset(&mut self, data: ReadOnlySource, doc_freq: u32) {
self.last_doc_in_block = 0u32;
self.last_doc_in_block = if doc_freq >= COMPRESSION_BLOCK_SIZE as u32 {
0
} else {
TERMINATED
};
self.last_doc_in_previous_block = 0u32;
self.owned_read = OwnedRead::new(data);
self.block_info = BlockInfo::default();
self.block_info = BlockInfo::VInt { num_docs: doc_freq };
self.byte_offset = 0;
self.remaining_docs = doc_freq;
self.position_offset = 0u64;
if doc_freq >= COMPRESSION_BLOCK_SIZE as u32 {
self.read_block_info();
}
}
pub fn block_max_score(&self, bm25_weight: &BM25Weight) -> Option<Score> {
@@ -211,8 +225,10 @@ impl SkipReader {
self.byte_offset += compressed_block_size(doc_num_bits + tf_num_bits);
self.position_offset += tf_sum as u64;
}
BlockInfo::VInt { num_docs, .. } => {
self.remaining_docs -= num_docs;
BlockInfo::VInt { num_docs} => {
debug_assert_eq!(num_docs, self.remaining_docs);
self.remaining_docs = 0;
self.byte_offset = std::usize::MAX;
}
}
self.last_doc_in_previous_block = self.last_doc_in_block;
@@ -279,6 +295,8 @@ mod tests {
assert!(matches!(skip_reader.block_info(), BlockInfo::VInt { num_docs: 3u32 }));
skip_reader.advance();
assert!(matches!(skip_reader.block_info(), BlockInfo::VInt { num_docs: 0u32 }));
skip_reader.advance();
assert!(matches!(skip_reader.block_info(), BlockInfo::VInt { num_docs: 0u32 }));
}
#[test]
@@ -322,6 +340,8 @@ mod tests {
assert!(matches!(skip_reader.block_info(), BlockInfo::VInt { num_docs: 3u32 }));
skip_reader.advance();
assert!(matches!(skip_reader.block_info(), BlockInfo::VInt { num_docs: 0u32 }));
skip_reader.advance();
assert!(matches!(skip_reader.block_info(), BlockInfo::VInt { num_docs: 0u32 }));
}
#[test]

View File

@@ -43,7 +43,6 @@ where
fn scorer(&self, reader: &SegmentReader, boost: f32) -> Result<Box<dyn Scorer>> {
let max_doc = reader.max_doc();
let mut doc_bitset = BitSet::with_max_value(max_doc);
let inverted_index = reader.inverted_index(self.field);
let term_dict = inverted_index.terms();
let mut term_stream = self.automaton_stream(term_dict);
@@ -52,12 +51,14 @@ where
let mut block_segment_postings = inverted_index
.read_block_postings_from_terminfo(term_info, IndexRecordOption::Basic);
loop {
for &doc in block_segment_postings.docs() {
doc_bitset.insert(doc);
}
if !block_segment_postings.advance() {
let docs = block_segment_postings.docs();
if docs.is_empty() {
break;
}
for &doc in docs {
doc_bitset.insert(doc);
}
block_segment_postings.advance();
}
}
let doc_bitset = BitSetDocSet::from(doc_bitset);

View File

@@ -143,7 +143,6 @@ mod tests {
.map(|doc| doc.1)
.collect::<Vec<DocId>>()
};
{
let boolean_query = BooleanQuery::from(vec![(Occur::Must, make_term_query("a"))]);
assert_eq!(matching_docs(&boolean_query), vec![0, 1, 3]);

View File

@@ -3,6 +3,11 @@ use crate::query::Scorer;
use crate::DocId;
use crate::Score;
#[inline(always)]
fn is_within<TDocSetExclude: DocSet>(docset: &mut TDocSetExclude, doc: DocId) -> bool {
docset.doc() <= doc && docset.seek(doc) == doc
}
/// Filters a given `DocSet` by removing the docs from a given `DocSet`.
///
/// The excluding docset has no impact on scoring.
@@ -23,8 +28,7 @@ where
) -> Exclude<TDocSet, TDocSetExclude> {
while underlying_docset.doc() != TERMINATED {
let target = underlying_docset.doc();
if excluding_docset.seek(target) != target {
// this document is not excluded.
if !is_within(&mut excluding_docset, target) {
break;
}
underlying_docset.advance();
@@ -36,42 +40,30 @@ where
}
}
impl<TDocSet, TDocSetExclude> Exclude<TDocSet, TDocSetExclude>
where
TDocSet: DocSet,
TDocSetExclude: DocSet,
{
/// Returns true iff the doc is not removed.
///
/// The method has to be called with non strictly
/// increasing `doc`.
fn accept(&mut self) -> bool {
let doc = self.underlying_docset.doc();
self.excluding_docset.seek(doc) != doc
}
}
impl<TDocSet, TDocSetExclude> DocSet for Exclude<TDocSet, TDocSetExclude>
where
TDocSet: DocSet,
TDocSetExclude: DocSet,
{
fn advance(&mut self) -> DocId {
while self.underlying_docset.advance() != TERMINATED {
if self.accept() {
return self.doc();
loop {
let candidate = self.underlying_docset.advance();
if candidate == TERMINATED {
return TERMINATED;
}
if !is_within(&mut self.excluding_docset, candidate) {
return candidate;
}
}
TERMINATED
}
fn seek(&mut self, target: DocId) -> DocId {
let underlying_seek_result = self.underlying_docset.seek(target);
if underlying_seek_result == TERMINATED {
let candidate = self.underlying_docset.seek(target);
if candidate == TERMINATED {
return TERMINATED;
}
if self.accept() {
return underlying_seek_result;
if !is_within(&mut self.excluding_docset, candidate) {
return candidate;
}
self.advance()
}
@@ -129,7 +121,7 @@ mod tests {
VecDocSet::from(vec![1, 2, 3, 10, 16, 24]),
))
},
vec![1, 2, 5, 8, 10, 15, 24],
vec![5, 8, 10, 15, 24],
);
}

View File

@@ -119,10 +119,9 @@ impl<TDocSet: DocSet, TOtherDocSet: DocSet> DocSet for Intersection<TDocSet, TOt
continue 'outer;
}
}
assert_eq!(candidate, self.left.doc());
assert_eq!(candidate, self.right.doc());
assert!(self.others.iter().all(|docset| docset.doc() == candidate));
debug_assert_eq!(candidate, self.left.doc());
debug_assert_eq!(candidate, self.right.doc());
debug_assert!(self.others.iter().all(|docset| docset.doc() == candidate));
return candidate;
}
}
@@ -134,9 +133,7 @@ impl<TDocSet: DocSet, TOtherDocSet: DocSet> DocSet for Intersection<TDocSet, TOt
docsets.push(docset);
}
let doc = go_to_first_doc(&mut docsets[..]);
for docset in docsets{
assert_eq!(docset.doc(), doc);
}
debug_assert!(docsets.iter().all(|docset| docset.doc() == doc));
debug_assert!(doc >= target);
doc
}

View File

@@ -13,9 +13,10 @@ pub mod tests {
use crate::assert_nearly_equals;
use crate::collector::tests::{TEST_COLLECTOR_WITHOUT_SCORE, TEST_COLLECTOR_WITH_SCORE};
use crate::core::Index;
use crate::query::Weight;
use crate::schema::{Schema, Term, TEXT};
use crate::DocAddress;
use crate::DocId;
use crate::{DocAddress, TERMINATED};
pub fn create_index(texts: &[&'static str]) -> Index {
let mut schema_builder = Schema::builder();
@@ -67,6 +68,23 @@ pub mod tests {
assert!(test_query(vec!["g", "a"]).is_empty());
}
#[test]
pub fn test_phrase_query_simple() -> crate::Result<()> {
let index = create_index(&["a b b d c g c", "a b a b c"]);
let text_field = index.schema().get_field("text").unwrap();
let searcher = index.reader()?.searcher();
let terms: Vec<Term> = vec!["a", "b", "c"]
.iter()
.map(|text| Term::from_field_text(text_field, text))
.collect();
let phrase_query = PhraseQuery::new(terms);
let phrase_weight = phrase_query.phrase_weight(&searcher, false)?;
let mut phrase_scorer = phrase_weight.scorer(searcher.segment_reader(0), 1.0f32)?;
assert_eq!(phrase_scorer.doc(), 1);
assert_eq!(phrase_scorer.advance(), TERMINATED);
Ok(())
}
#[test]
pub fn test_phrase_query_no_score() {
let index = create_index(&[

View File

@@ -301,12 +301,14 @@ impl Weight for RangeWeight {
let mut block_segment_postings = inverted_index
.read_block_postings_from_terminfo(term_info, IndexRecordOption::Basic);
loop {
let docs = block_segment_postings.docs();
if docs.is_empty() {
break;
}
for &doc in block_segment_postings.docs() {
doc_bitset.insert(doc);
}
if !block_segment_postings.advance() {
break;
}
block_segment_postings.advance();
}
}
let doc_bitset = BitSetDocSet::from(doc_bitset);

View File

@@ -72,7 +72,7 @@ where
let doc = self.doc();
let mut score_combiner = TScoreCombiner::default();
score_combiner.update(&mut self.req_scorer);
if self.opt_scorer.seek(doc) == doc {
if self.opt_scorer.doc() <= doc && self.opt_scorer.seek(doc) == doc {
score_combiner.update(&mut self.opt_scorer);
}
let score = score_combiner.score();

View File

@@ -12,11 +12,10 @@ mod tests {
use crate::assert_nearly_equals;
use crate::collector::TopDocs;
use crate::docset::DocSet;
use crate::postings::compression::COMPRESSION_BLOCK_SIZE;
use crate::query::{Query, QueryParser, Scorer, TermQuery};
use crate::schema::{Field, IndexRecordOption, Schema, STRING, TEXT};
use crate::{Index, TERMINATED};
use crate::Term;
use crate::postings::compression::COMPRESSION_BLOCK_SIZE;
use crate::{Term, Index, TERMINATED};
#[test]
pub fn test_term_query_no_freq() {
@@ -148,6 +147,27 @@ mod tests {
assert_eq!(term_query.count(&*reader.searcher()).unwrap(), 1);
}
#[test]
fn test_term_query_simple_seek() -> crate::Result<()> {
let mut schema_builder = Schema::builder();
let text_field = schema_builder.add_text_field("text", TEXT);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
index_writer.add_document(doc!(text_field=>"a"));
index_writer.add_document(doc!(text_field=>"a"));
index_writer.commit()?;
let term_a = Term::from_field_text(text_field, "a");
let term_query = TermQuery::new(term_a, IndexRecordOption::Basic);
let searcher = index.reader()?.searcher();
let term_weight = term_query.weight(&searcher, false)?;
let mut term_scorer = term_weight.scorer(searcher.segment_reader(0u32), 1.0f32)?;
assert_eq!(term_scorer.doc(), 0u32);
term_scorer.seek(1u32);
assert_eq!(term_scorer.doc(), 1u32);
Ok(())
}
#[test]
fn test_term_query_debug() {
let term_query = TermQuery::new(

View File

@@ -183,7 +183,10 @@ where
// advance all docsets to a doc >= to the target.
#[cfg_attr(feature = "cargo-clippy", allow(clippy::clippy::collapsible_if))]
unordered_drain_filter(&mut self.docsets, |docset| {
docset.seek(target) == TERMINATED
if docset.doc() < target {
docset.seek(target);
}
docset.doc() == TERMINATED
});
// at this point all of the docsets