AllQuery handling deletes, better tests

This commit is contained in:
Paul Masurel
2018-05-01 21:12:17 -07:00
parent 0e68c4ac34
commit 967cf2cb02
6 changed files with 129 additions and 51 deletions

View File

@@ -200,7 +200,6 @@ pub fn advance_deletes(
target_opstamp: u64,
) -> Result<Option<FileProtection>> {
let mut file_protect: Option<FileProtection> = None;
{
if let Some(previous_opstamp) = segment_entry.meta().delete_opstamp() {
// We are already up-to-date here.
@@ -241,7 +240,6 @@ pub fn advance_deletes(
}
}
segment_entry.set_meta(segment.meta().clone());
Ok(file_protect)
}

View File

@@ -351,7 +351,7 @@ impl IndexMerger {
for doc in 0..segment_reader.max_doc() {
if !delete_bitset.is_deleted(doc) {
ff_reader.get_vals(doc, &mut vals);
for prev_term_ord in vals.iter().cloned() {
for &prev_term_ord in &vals {
let new_term_ord = term_ordinal_mapping[prev_term_ord as usize];
serialize_vals.add_val(new_term_ord)?;
}
@@ -399,7 +399,7 @@ impl IndexMerger {
for doc in 0..segment_reader.max_doc() {
if !delete_bitset.is_deleted(doc) {
ff_reader.get_vals(doc, &mut vals);
for val in vals {
for &val in &vals {
serialize_vals.add_val(val)?;
}
}
@@ -612,6 +612,9 @@ mod tests {
use schema::IndexRecordOption;
use schema::Cardinality;
use futures::Future;
use IndexWriter;
use query::AllQuery;
use collector::FacetCollector;
#[test]
fn test_index_merger_no_deletes() {
@@ -1052,37 +1055,35 @@ mod tests {
use schema::Facet;
{
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
{
let mut doc = Document::new();
doc.add_facet(facet_field, Facet::from("/top/tip"));
let index_doc = |index_writer: &mut IndexWriter, doc_facets: &[&str]| {
let mut doc = Document::default();
for facet in doc_facets {
doc.add_facet(facet_field, Facet::from(facet));
}
index_writer.add_document(doc);
}
{
let mut doc = Document::new();
doc.add_facet(facet_field, Facet::from("/top/tap"));
index_writer.add_document(doc);
}
{
let mut doc = Document::new();
doc.add_facet(facet_field, Facet::from("/tap/tip"));
index_writer.add_document(doc);
}
};
index_doc(&mut index_writer, &["/top/a/firstdoc", "/top/b"]);
index_doc(&mut index_writer, &["/top/a/firstdoc", "/top/b", "/top/c"]);
index_doc(&mut index_writer, &["/top/a", "/top/b"]);
index_doc(&mut index_writer, &["/top/a"]);
index_doc(&mut index_writer, &["/top/b", "/top/d"]);
index_doc(&mut index_writer, &["/top/d"]);
index_doc(&mut index_writer, &["/top/e"]);
index_writer.commit().expect("committed");
index_doc(&mut index_writer, &["/top/a"]);
index_doc(&mut index_writer, &["/top/b"]);
index_doc(&mut index_writer, &["/top/c"]);
index_writer.commit().expect("committed");
index_doc(&mut index_writer, &["/top/e", "/top/f"]);
index_writer.commit().expect("committed");
{
index_writer.add_document(doc!(
facet_field=>Facet::from("/top/tap/toup")
));
index_writer.add_document(doc!(
facet_field=>Facet::from("/top/tup")
));
index_writer.commit().expect("committed");
}
}
index.load_searchers().unwrap();
let test_searcher = || {
let test_searcher = |expected_num_docs: usize, expected: &[(&str, u64)]| {
let searcher = index.searcher();
use query::AllQuery;
use collector::FacetCollector;
let mut facet_collector = FacetCollector::for_field(facet_field);
facet_collector.add_facet(Facet::from("/top"));
use collector::{MultiCollector, CountCollector};
@@ -1091,23 +1092,29 @@ mod tests {
let mut multi_collectors = MultiCollector::from(vec![&mut count_collector, &mut facet_collector]);
searcher.search(&AllQuery, &mut multi_collectors).unwrap();
}
assert_eq!(count_collector.count(), 5);
assert_eq!(count_collector.count(), expected_num_docs);
let facet_counts = facet_collector.harvest();
let facets: Vec<(String, u64)> = facet_counts.get("/top")
.map(|(facet, count)| (facet.to_string(), count))
.collect();
assert_eq!(
facets,
[
("/top/tap", 2),
("/top/tip", 1),
("/top/tup", 1),
].iter()
expected
.iter()
.map(|&(facet_str, count)| (String::from(facet_str), count))
.collect::<Vec<_>>()
);
};
test_searcher();
test_searcher(11, &[
("/top/a", 5),
("/top/b", 5),
("/top/c", 2),
("/top/d", 2),
("/top/e", 2),
("/top/f", 1)
]);
// Merging the segments
{
let segment_ids = index
.searchable_segment_ids()
@@ -1118,9 +1125,36 @@ mod tests {
.wait()
.expect("Merging failed");
index_writer.wait_merging_threads().unwrap();
index.load_searchers().unwrap();
test_searcher(11, &[
("/top/a", 5),
("/top/b", 5),
("/top/c", 2),
("/top/d", 2),
("/top/e", 2),
("/top/f", 1)
]);
}
index.load_searchers().unwrap();
test_searcher();
// Deleting one term
{
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
let facet = Facet::from_path(vec!["top", "a", "firstdoc"]);
let facet_term = Term::from_facet(facet_field, &facet);
index_writer.delete_term(facet_term);
index_writer.commit().unwrap();
index.load_searchers().unwrap();
test_searcher(9, &[
("/top/a", 3),
("/top/b", 3),
("/top/c", 1),
("/top/d", 2),
("/top/e", 2),
("/top/f", 1)
]);
}
}
}

View File

@@ -7,6 +7,7 @@ use Result;
use Score;
use DocId;
use core::Searcher;
use fastfield::DeleteBitSet;
/// Query that matches all of the documents.
///
@@ -26,28 +27,52 @@ pub struct AllWeight;
impl Weight for AllWeight {
fn scorer(&self, reader: &SegmentReader) -> Result<Box<Scorer>> {
Ok(box AllScorer {
started: false,
state: State::NotStarted,
doc: 0u32,
max_doc: reader.max_doc(),
deleted_bitset: reader.delete_bitset().clone()
})
}
}
/// Iteration state of an `AllScorer`, driving what `advance` does next.
enum State {
/// `advance` has not been called yet. The first call switches to
/// `Started` and begins scanning from doc `0`.
NotStarted,
/// Iteration is in progress. Each `advance` step increments the
/// current doc before checking it against `max_doc`.
Started,
/// The current doc reached `max_doc`. Any further call to `advance`
/// returns `false`.
Finished
}
/// Scorer associated to the `AllQuery` query.
pub struct AllScorer {
started: bool,
state: State,
doc: DocId,
max_doc: DocId,
deleted_bitset: DeleteBitSet
}
impl DocSet for AllScorer {
fn advance(&mut self) -> bool {
if self.started {
self.doc += 1u32;
} else {
self.started = true;
loop {
match self.state {
State::NotStarted => {
self.state = State::Started;
self.doc = 0;
}
State::Started => {
self.doc += 1u32;
}
State::Finished => {
return false;
}
}
if self.doc < self.max_doc {
if !self.deleted_bitset.is_deleted(self.doc) {
return true;
}
} else {
self.state = State::Finished;
return false;
}
}
self.doc < self.max_doc
}
fn doc(&self) -> DocId {

View File

@@ -212,6 +212,14 @@ mod tests {
assert!(Facet::root().is_root());
}
#[test]
fn test_from_path() {
    // A facet built from its individual path segments must be equal to the
    // facet built from the equivalent slash-separated string.
    let from_segments = Facet::from_path(vec!["top", "a", "firstdoc"]);
    let from_string = Facet::from("/top/a/firstdoc");
    assert_eq!(from_segments, from_string);
}
#[test]
fn test_facet_display() {
{

View File

@@ -4,6 +4,7 @@ use common;
use byteorder::{BigEndian, ByteOrder};
use super::Field;
use std::str;
use schema::Facet;
/// Size (in bytes) of the buffer of a int field.
const INT_TERM_LEN: usize = 4 + 8;
@@ -29,6 +30,16 @@ impl Term {
Term::from_field_u64(field, val_u64)
}
/// Builds a `Term` for the given field from a facet.
///
/// The term payload is the facet's encoded bytes, as returned by
/// `Facet::encoded_bytes`.
pub fn from_facet(field: Field, facet: &Facet) -> Term {
    let facet_bytes = facet.encoded_bytes();
    // Reserve room up front for the 4 leading bytes plus the payload.
    // NOTE(review): the 4 leading bytes presumably hold the field id — confirm
    // against `set_field`.
    let mut facet_term = Term(Vec::with_capacity(4 + facet_bytes.len()));
    facet_term.set_field(field);
    facet_term.set_bytes(facet_bytes);
    facet_term
}
/// Builds a term given a field, and a string value
///
/// Assuming the term has a field id of 2, and a text value of "abc",
@@ -91,10 +102,14 @@ impl Term {
self.set_u64(common::i64_to_u64(val));
}
/// Replaces everything after the first 4 bytes of the term with `bytes`.
///
/// The buffer is first resized to exactly 4 bytes (truncating any previous
/// payload, or zero-padding if it was shorter), then `bytes` is appended.
fn set_bytes(&mut self, bytes: &[u8]) {
    let buffer = &mut self.0;
    buffer.resize(4, 0u8);
    buffer.extend_from_slice(bytes);
}
/// Sets the text only, keeping the field untouched.
pub fn set_text(&mut self, text: &str) {
self.0.resize(4, 0u8);
self.0.extend(text.as_bytes());
self.set_bytes(text.as_bytes());
}
}

View File

@@ -31,8 +31,6 @@ impl<'a> Ord for HeapItem<'a> {
}
/// Given a list of sorted term streams,
/// returns an iterator over sorted unique terms.
///
@@ -42,7 +40,7 @@ impl<'a> Ord for HeapItem<'a> {
/// the terms.
pub struct TermMerger<'a> {
heap: BinaryHeap<HeapItem<'a>>,
current_streamers: Vec<HeapItem<'a>>
current_streamers: Vec<HeapItem<'a>>,
}
impl<'a> TermMerger<'a> {