mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-01-06 01:02:55 +00:00
Merge pull request #1154 from PSeitz/delete_bitset
add DeleteBitSet iterator
This commit is contained in:
@@ -10,6 +10,7 @@ description = "common traits and utility functions used by multiple tantivy subc
|
||||
|
||||
[dependencies]
|
||||
byteorder = "1.4.3"
|
||||
ownedbytes = { version="0.1", path="../ownedbytes" }
|
||||
|
||||
[dev-dependencies]
|
||||
proptest = "1.0.0"
|
||||
|
||||
@@ -1,5 +1,8 @@
|
||||
use std::fmt;
|
||||
use ownedbytes::OwnedBytes;
|
||||
use std::convert::TryInto;
|
||||
use std::io::Write;
|
||||
use std::u64;
|
||||
use std::{fmt, io};
|
||||
|
||||
#[derive(Clone, Copy, Eq, PartialEq)]
|
||||
pub struct TinySet(u64);
|
||||
@@ -14,6 +17,7 @@ pub struct TinySetIterator(TinySet);
|
||||
impl Iterator for TinySetIterator {
|
||||
type Item = u32;
|
||||
|
||||
#[inline]
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
self.0.pop_lowest()
|
||||
}
|
||||
@@ -28,30 +32,54 @@ impl IntoIterator for TinySet {
|
||||
}
|
||||
|
||||
impl TinySet {
|
||||
pub fn serialize<T: Write>(&self, writer: &mut T) -> io::Result<()> {
|
||||
writer.write_all(self.0.to_le_bytes().as_ref())
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn deserialize(data: [u8; 8]) -> io::Result<Self> {
|
||||
let val: u64 = u64::from_le_bytes(data);
|
||||
Ok(TinySet(val))
|
||||
}
|
||||
|
||||
/// Returns an empty `TinySet`.
|
||||
#[inline]
|
||||
pub fn empty() -> TinySet {
|
||||
TinySet(0u64)
|
||||
}
|
||||
|
||||
/// Returns a full `TinySet`.
|
||||
#[inline]
|
||||
pub fn full() -> TinySet {
|
||||
TinySet::empty().complement()
|
||||
}
|
||||
|
||||
pub fn clear(&mut self) {
|
||||
self.0 = 0u64;
|
||||
}
|
||||
|
||||
#[inline]
|
||||
/// Returns the complement of the set in `[0, 64[`.
|
||||
///
|
||||
/// Careful on making this function public, as it will break the padding handling in the last
|
||||
/// bucket.
|
||||
fn complement(self) -> TinySet {
|
||||
TinySet(!self.0)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
/// Returns true iff the `TinySet` contains the element `el`.
|
||||
pub fn contains(self, el: u32) -> bool {
|
||||
!self.intersect(TinySet::singleton(el)).is_empty()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
/// Returns the number of elements in the TinySet.
|
||||
pub fn len(self) -> u32 {
|
||||
self.0.count_ones()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
/// Returns the intersection of `self` and `other`
|
||||
pub fn intersect(self, other: TinySet) -> TinySet {
|
||||
TinySet(self.0 & other.0)
|
||||
@@ -64,13 +92,21 @@ impl TinySet {
|
||||
TinySet(1u64 << u64::from(el))
|
||||
}
|
||||
|
||||
/// Insert a new element within [0..64[
|
||||
/// Insert a new element within [0..64)
|
||||
#[inline]
|
||||
pub fn insert(self, el: u32) -> TinySet {
|
||||
self.union(TinySet::singleton(el))
|
||||
}
|
||||
|
||||
/// Insert a new element within [0..64[
|
||||
/// Removes an element within [0..64)
|
||||
#[inline]
|
||||
pub fn remove(self, el: u32) -> TinySet {
|
||||
self.intersect(TinySet::singleton(el).complement())
|
||||
}
|
||||
|
||||
/// Insert a new element within [0..64)
|
||||
///
|
||||
/// returns true if the set changed
|
||||
#[inline]
|
||||
pub fn insert_mut(&mut self, el: u32) -> bool {
|
||||
let old = *self;
|
||||
@@ -78,6 +114,16 @@ impl TinySet {
|
||||
old != *self
|
||||
}
|
||||
|
||||
/// Remove a element within [0..64)
|
||||
///
|
||||
/// returns true if the set changed
|
||||
#[inline]
|
||||
pub fn remove_mut(&mut self, el: u32) -> bool {
|
||||
let old = *self;
|
||||
*self = old.remove(el);
|
||||
old != *self
|
||||
}
|
||||
|
||||
/// Returns the union of two tinysets
|
||||
#[inline]
|
||||
pub fn union(self, other: TinySet) -> TinySet {
|
||||
@@ -123,7 +169,7 @@ impl TinySet {
|
||||
#[derive(Clone)]
|
||||
pub struct BitSet {
|
||||
tinysets: Box<[TinySet]>,
|
||||
len: usize,
|
||||
len: u64,
|
||||
max_value: u32,
|
||||
}
|
||||
|
||||
@@ -132,8 +178,41 @@ fn num_buckets(max_val: u32) -> u32 {
|
||||
}
|
||||
|
||||
impl BitSet {
|
||||
/// serialize a `BitSet`.
|
||||
///
|
||||
pub fn serialize<T: Write>(&self, writer: &mut T) -> io::Result<()> {
|
||||
writer.write_all(self.max_value.to_le_bytes().as_ref())?;
|
||||
|
||||
for tinyset in self.tinysets.iter() {
|
||||
tinyset.serialize(writer)?;
|
||||
}
|
||||
writer.flush()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Deserialize a `BitSet`.
|
||||
///
|
||||
#[cfg(test)]
|
||||
pub fn deserialize(mut data: &[u8]) -> io::Result<Self> {
|
||||
let max_value: u32 = u32::from_le_bytes(data[..4].try_into().unwrap());
|
||||
data = &data[4..];
|
||||
|
||||
let mut len: u64 = 0;
|
||||
let mut tinysets = vec![];
|
||||
for chunk in data.chunks_exact(8) {
|
||||
let tinyset = TinySet::deserialize(chunk.try_into().unwrap())?;
|
||||
len += tinyset.len() as u64;
|
||||
tinysets.push(tinyset);
|
||||
}
|
||||
Ok(BitSet {
|
||||
tinysets: tinysets.into_boxed_slice(),
|
||||
len,
|
||||
max_value,
|
||||
})
|
||||
}
|
||||
|
||||
/// Create a new `BitSet` that may contain elements
|
||||
/// within `[0, max_val[`.
|
||||
/// within `[0, max_val)`.
|
||||
pub fn with_max_value(max_value: u32) -> BitSet {
|
||||
let num_buckets = num_buckets(max_value);
|
||||
let tinybisets = vec![TinySet::empty(); num_buckets as usize].into_boxed_slice();
|
||||
@@ -144,6 +223,23 @@ impl BitSet {
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a new `BitSet` that may contain elements. Initially all values will be set.
|
||||
/// within `[0, max_val)`.
|
||||
pub fn with_max_value_and_full(max_value: u32) -> BitSet {
|
||||
let num_buckets = num_buckets(max_value);
|
||||
let mut tinybisets = vec![TinySet::full(); num_buckets as usize].into_boxed_slice();
|
||||
|
||||
// Fix padding
|
||||
let lower = max_value % 64u32;
|
||||
tinybisets[tinybisets.len() - 1] = TinySet::range_lower(lower);
|
||||
|
||||
BitSet {
|
||||
tinysets: tinybisets,
|
||||
len: max_value as u64,
|
||||
max_value,
|
||||
}
|
||||
}
|
||||
|
||||
/// Removes all elements from the `BitSet`.
|
||||
pub fn clear(&mut self) {
|
||||
for tinyset in self.tinysets.iter_mut() {
|
||||
@@ -153,10 +249,11 @@ impl BitSet {
|
||||
|
||||
/// Returns the number of elements in the `BitSet`.
|
||||
pub fn len(&self) -> usize {
|
||||
self.len
|
||||
self.len as usize
|
||||
}
|
||||
|
||||
/// Inserts an element in the `BitSet`
|
||||
#[inline]
|
||||
pub fn insert(&mut self, el: u32) {
|
||||
// we do not check saturated els.
|
||||
let higher = el / 64u32;
|
||||
@@ -168,7 +265,21 @@ impl BitSet {
|
||||
};
|
||||
}
|
||||
|
||||
/// Inserts an element in the `BitSet`
|
||||
#[inline]
|
||||
pub fn remove(&mut self, el: u32) {
|
||||
// we do not check saturated els.
|
||||
let higher = el / 64u32;
|
||||
let lower = el % 64u32;
|
||||
self.len -= if self.tinysets[higher as usize].remove_mut(lower) {
|
||||
1
|
||||
} else {
|
||||
0
|
||||
};
|
||||
}
|
||||
|
||||
/// Returns true iff the elements is in the `BitSet`.
|
||||
#[inline]
|
||||
pub fn contains(&self, el: u32) -> bool {
|
||||
self.tinyset(el / 64u32).contains(el % 64)
|
||||
}
|
||||
@@ -198,16 +309,144 @@ impl BitSet {
|
||||
}
|
||||
}
|
||||
|
||||
/// Lazy Read a serialized BitSet.
|
||||
#[derive(Clone)]
|
||||
pub struct ReadSerializedBitSet {
|
||||
data: OwnedBytes,
|
||||
max_value: u32,
|
||||
}
|
||||
|
||||
impl ReadSerializedBitSet {
|
||||
pub fn open(data: OwnedBytes) -> Self {
|
||||
let (max_value_data, data) = data.split(4);
|
||||
let max_value: u32 = u32::from_le_bytes(max_value_data.as_ref().try_into().unwrap());
|
||||
ReadSerializedBitSet { data, max_value }
|
||||
}
|
||||
|
||||
/// Count the number of unset bits from serialized data.
|
||||
///
|
||||
#[inline]
|
||||
pub fn count_unset(&self) -> usize {
|
||||
let num_set: usize = self
|
||||
.iter_tinysets()
|
||||
.map(|tinyset| tinyset.len() as usize)
|
||||
.sum();
|
||||
self.max_value as usize - num_set
|
||||
}
|
||||
|
||||
/// Iterate the tinyset on the fly from serialized data.
|
||||
///
|
||||
#[inline]
|
||||
fn iter_tinysets<'a>(&'a self) -> impl Iterator<Item = TinySet> + 'a {
|
||||
assert!((self.data.len()) % 8 == 0);
|
||||
self.data.chunks_exact(8).map(move |chunk| {
|
||||
let tinyset: TinySet = TinySet::deserialize(chunk.try_into().unwrap()).unwrap();
|
||||
tinyset
|
||||
})
|
||||
}
|
||||
|
||||
/// Iterate over the positions of the elements.
|
||||
///
|
||||
#[inline]
|
||||
pub fn iter<'a>(&'a self) -> impl Iterator<Item = u32> + 'a {
|
||||
self.iter_tinysets()
|
||||
.enumerate()
|
||||
.flat_map(move |(chunk_num, tinyset)| {
|
||||
let chunk_base_val = chunk_num as u32 * 64;
|
||||
tinyset
|
||||
.into_iter()
|
||||
.map(move |val| val + chunk_base_val)
|
||||
.take_while(move |doc| *doc < self.max_value)
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns true iff the elements is in the `BitSet`.
|
||||
#[inline]
|
||||
pub fn contains(&self, el: u32) -> bool {
|
||||
let byte_offset = el / 8u32;
|
||||
let b: u8 = self.data[byte_offset as usize];
|
||||
let shift = (el % 8) as u8;
|
||||
b & (1u8 << shift) != 0
|
||||
}
|
||||
|
||||
/// Returns the max_value.
|
||||
#[inline]
|
||||
pub fn max_value(&self) -> u32 {
|
||||
self.max_value
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
use super::BitSet;
|
||||
use super::ReadSerializedBitSet;
|
||||
use super::TinySet;
|
||||
use ownedbytes::OwnedBytes;
|
||||
use rand::distributions::Bernoulli;
|
||||
use rand::rngs::StdRng;
|
||||
use rand::{Rng, SeedableRng};
|
||||
use std::collections::HashSet;
|
||||
use std::convert::TryInto;
|
||||
|
||||
#[test]
|
||||
fn test_read_serialized_bitset_full() {
|
||||
let mut bitset = BitSet::with_max_value_and_full(5);
|
||||
bitset.remove(3);
|
||||
let mut out = vec![];
|
||||
bitset.serialize(&mut out).unwrap();
|
||||
|
||||
let bitset = ReadSerializedBitSet::open(OwnedBytes::new(out));
|
||||
assert_eq!(bitset.count_unset(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_read_serialized_bitset_empty() {
|
||||
let mut bitset = BitSet::with_max_value(5);
|
||||
bitset.insert(3);
|
||||
let mut out = vec![];
|
||||
bitset.serialize(&mut out).unwrap();
|
||||
|
||||
let bitset = ReadSerializedBitSet::open(OwnedBytes::new(out));
|
||||
assert_eq!(bitset.count_unset(), 4);
|
||||
|
||||
{
|
||||
let bitset = BitSet::with_max_value(5);
|
||||
let mut out = vec![];
|
||||
bitset.serialize(&mut out).unwrap();
|
||||
|
||||
let bitset = ReadSerializedBitSet::open(OwnedBytes::new(out));
|
||||
assert_eq!(bitset.count_unset(), 5);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_tiny_set_remove() {
|
||||
{
|
||||
let mut u = TinySet::empty().insert(63u32).insert(5).remove(63u32);
|
||||
assert_eq!(u.pop_lowest(), Some(5u32));
|
||||
assert!(u.pop_lowest().is_none());
|
||||
}
|
||||
{
|
||||
let mut u = TinySet::empty()
|
||||
.insert(63u32)
|
||||
.insert(1)
|
||||
.insert(5)
|
||||
.remove(63u32);
|
||||
assert_eq!(u.pop_lowest(), Some(1u32));
|
||||
assert_eq!(u.pop_lowest(), Some(5u32));
|
||||
assert!(u.pop_lowest().is_none());
|
||||
}
|
||||
{
|
||||
let mut u = TinySet::empty().insert(1).remove(63u32);
|
||||
assert_eq!(u.pop_lowest(), Some(1u32));
|
||||
assert!(u.pop_lowest().is_none());
|
||||
}
|
||||
{
|
||||
let mut u = TinySet::empty().insert(1).remove(1u32);
|
||||
assert!(u.pop_lowest().is_none());
|
||||
}
|
||||
}
|
||||
#[test]
|
||||
fn test_tiny_set() {
|
||||
assert!(TinySet::empty().is_empty());
|
||||
@@ -233,6 +472,21 @@ mod tests {
|
||||
assert_eq!(u.pop_lowest(), Some(63u32));
|
||||
assert!(u.pop_lowest().is_none());
|
||||
}
|
||||
{
|
||||
let mut u = TinySet::empty().insert(63u32).insert(5);
|
||||
assert_eq!(u.pop_lowest(), Some(5u32));
|
||||
assert_eq!(u.pop_lowest(), Some(63u32));
|
||||
assert!(u.pop_lowest().is_none());
|
||||
}
|
||||
{
|
||||
let u = TinySet::empty().insert(63u32).insert(5);
|
||||
let mut data = vec![];
|
||||
u.serialize(&mut data).unwrap();
|
||||
let mut u = TinySet::deserialize(data[..8].try_into().unwrap()).unwrap();
|
||||
assert_eq!(u.pop_lowest(), Some(5u32));
|
||||
assert_eq!(u.pop_lowest(), Some(63u32));
|
||||
assert!(u.pop_lowest().is_none());
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -249,6 +503,16 @@ mod tests {
|
||||
assert_eq!(hashset.contains(&el), bitset.contains(el));
|
||||
}
|
||||
assert_eq!(bitset.max_value(), max_value);
|
||||
|
||||
// test deser
|
||||
let mut data = vec![];
|
||||
bitset.serialize(&mut data).unwrap();
|
||||
let bitset = BitSet::deserialize(&data).unwrap();
|
||||
for el in 0..max_value {
|
||||
assert_eq!(hashset.contains(&el), bitset.contains(el));
|
||||
}
|
||||
assert_eq!(bitset.max_value(), max_value);
|
||||
assert_eq!(bitset.len(), els.len());
|
||||
};
|
||||
|
||||
test_against_hashset(&[], 0);
|
||||
@@ -313,6 +577,14 @@ mod tests {
|
||||
assert_eq!(bitset.len(), 2);
|
||||
bitset.insert(104u32);
|
||||
assert_eq!(bitset.len(), 3);
|
||||
bitset.remove(105u32);
|
||||
assert_eq!(bitset.len(), 3);
|
||||
bitset.remove(104u32);
|
||||
assert_eq!(bitset.len(), 2);
|
||||
bitset.remove(3u32);
|
||||
assert_eq!(bitset.len(), 1);
|
||||
bitset.remove(103u32);
|
||||
assert_eq!(bitset.len(), 0);
|
||||
}
|
||||
|
||||
pub fn sample_with_seed(n: u32, ratio: f64, seed_val: u8) -> Vec<u32> {
|
||||
|
||||
@@ -178,9 +178,9 @@ pub trait Collector: Sync + Send {
|
||||
) -> crate::Result<<Self::Child as SegmentCollector>::Fruit> {
|
||||
let mut segment_collector = self.for_segment(segment_ord as u32, reader)?;
|
||||
|
||||
if let Some(delete_bitset) = reader.delete_bitset() {
|
||||
if let Some(alive_bitset) = reader.alive_bitset() {
|
||||
weight.for_each(reader, &mut |doc, score| {
|
||||
if delete_bitset.is_alive(doc) {
|
||||
if alive_bitset.is_alive(doc) {
|
||||
segment_collector.collect(doc, score);
|
||||
}
|
||||
})?;
|
||||
|
||||
@@ -629,10 +629,10 @@ impl Collector for TopDocs {
|
||||
let heap_len = self.0.limit + self.0.offset;
|
||||
let mut heap: BinaryHeap<ComparableDoc<Score, DocId>> = BinaryHeap::with_capacity(heap_len);
|
||||
|
||||
if let Some(delete_bitset) = reader.delete_bitset() {
|
||||
if let Some(alive_bitset) = reader.alive_bitset() {
|
||||
let mut threshold = Score::MIN;
|
||||
weight.for_each_pruning(threshold, reader, &mut |doc, score| {
|
||||
if delete_bitset.is_deleted(doc) {
|
||||
if alive_bitset.is_deleted(doc) {
|
||||
return threshold;
|
||||
}
|
||||
let heap_item = ComparableDoc {
|
||||
|
||||
@@ -5,7 +5,7 @@ use crate::core::SegmentId;
|
||||
use crate::directory::CompositeFile;
|
||||
use crate::directory::FileSlice;
|
||||
use crate::error::DataCorruption;
|
||||
use crate::fastfield::DeleteBitSet;
|
||||
use crate::fastfield::AliveBitSet;
|
||||
use crate::fastfield::FacetReader;
|
||||
use crate::fastfield::FastFieldReaders;
|
||||
use crate::fieldnorm::{FieldNormReader, FieldNormReaders};
|
||||
@@ -47,7 +47,7 @@ pub struct SegmentReader {
|
||||
fieldnorm_readers: FieldNormReaders,
|
||||
|
||||
store_file: FileSlice,
|
||||
delete_bitset_opt: Option<DeleteBitSet>,
|
||||
alive_bitset_opt: Option<AliveBitSet>,
|
||||
schema: Schema,
|
||||
}
|
||||
|
||||
@@ -72,14 +72,14 @@ impl SegmentReader {
|
||||
/// Return the number of documents that have been
|
||||
/// deleted in the segment.
|
||||
pub fn num_deleted_docs(&self) -> DocId {
|
||||
self.delete_bitset()
|
||||
.map(|delete_set| delete_set.num_deleted() as DocId)
|
||||
self.alive_bitset()
|
||||
.map(|alive_set| alive_set.num_deleted() as DocId)
|
||||
.unwrap_or(0u32)
|
||||
}
|
||||
|
||||
/// Returns true iff some of the documents of the segment have been deleted.
|
||||
pub fn has_deletes(&self) -> bool {
|
||||
self.delete_bitset().is_some()
|
||||
self.alive_bitset().is_some()
|
||||
}
|
||||
|
||||
/// Accessor to a segment's fast field reader given a field.
|
||||
@@ -170,10 +170,10 @@ impl SegmentReader {
|
||||
let fieldnorm_data = segment.open_read(SegmentComponent::FieldNorms)?;
|
||||
let fieldnorm_readers = FieldNormReaders::open(fieldnorm_data)?;
|
||||
|
||||
let delete_bitset_opt = if segment.meta().has_deletes() {
|
||||
let alive_bitset_opt = if segment.meta().has_deletes() {
|
||||
let delete_data = segment.open_read(SegmentComponent::Delete)?;
|
||||
let delete_bitset = DeleteBitSet::open(delete_data)?;
|
||||
Some(delete_bitset)
|
||||
let alive_bitset = AliveBitSet::open(delete_data)?;
|
||||
Some(alive_bitset)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
@@ -188,7 +188,7 @@ impl SegmentReader {
|
||||
fieldnorm_readers,
|
||||
segment_id: segment.id(),
|
||||
store_file,
|
||||
delete_bitset_opt,
|
||||
alive_bitset_opt,
|
||||
positions_composite,
|
||||
schema,
|
||||
})
|
||||
@@ -274,21 +274,25 @@ impl SegmentReader {
|
||||
|
||||
/// Returns the bitset representing
|
||||
/// the documents that have been deleted.
|
||||
pub fn delete_bitset(&self) -> Option<&DeleteBitSet> {
|
||||
self.delete_bitset_opt.as_ref()
|
||||
pub fn alive_bitset(&self) -> Option<&AliveBitSet> {
|
||||
self.alive_bitset_opt.as_ref()
|
||||
}
|
||||
|
||||
/// Returns true iff the `doc` is marked
|
||||
/// as deleted.
|
||||
pub fn is_deleted(&self, doc: DocId) -> bool {
|
||||
self.delete_bitset()
|
||||
self.alive_bitset()
|
||||
.map(|delete_set| delete_set.is_deleted(doc))
|
||||
.unwrap_or(false)
|
||||
}
|
||||
|
||||
/// Returns an iterator that will iterate over the alive document ids
|
||||
pub fn doc_ids_alive(&self) -> impl Iterator<Item = DocId> + '_ {
|
||||
(0u32..self.max_doc).filter(move |doc| !self.is_deleted(*doc))
|
||||
pub fn doc_ids_alive(&self) -> Box<dyn Iterator<Item = DocId> + '_> {
|
||||
if let Some(alive_bitset) = &self.alive_bitset_opt {
|
||||
Box::new(alive_bitset.iter_alive())
|
||||
} else {
|
||||
Box::new(0u32..self.max_doc)
|
||||
}
|
||||
}
|
||||
|
||||
/// Summarize total space usage of this segment.
|
||||
@@ -301,9 +305,9 @@ impl SegmentReader {
|
||||
self.fast_fields_readers.space_usage(),
|
||||
self.fieldnorm_readers.space_usage(),
|
||||
self.get_store_reader()?.space_usage(),
|
||||
self.delete_bitset_opt
|
||||
self.alive_bitset_opt
|
||||
.as_ref()
|
||||
.map(DeleteBitSet::space_usage)
|
||||
.map(AliveBitSet::space_usage)
|
||||
.unwrap_or(0),
|
||||
))
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use crate::fastfield::DeleteBitSet;
|
||||
use crate::fastfield::AliveBitSet;
|
||||
use crate::DocId;
|
||||
use std::borrow::Borrow;
|
||||
use std::borrow::BorrowMut;
|
||||
@@ -85,11 +85,11 @@ pub trait DocSet: Send {
|
||||
|
||||
/// Returns the number documents matching.
|
||||
/// Calling this method consumes the `DocSet`.
|
||||
fn count(&mut self, delete_bitset: &DeleteBitSet) -> u32 {
|
||||
fn count(&mut self, alive_bitset: &AliveBitSet) -> u32 {
|
||||
let mut count = 0u32;
|
||||
let mut doc = self.doc();
|
||||
while doc != TERMINATED {
|
||||
if !delete_bitset.is_deleted(doc) {
|
||||
if alive_bitset.is_alive(doc) {
|
||||
count += 1u32;
|
||||
}
|
||||
doc = self.advance();
|
||||
@@ -130,8 +130,8 @@ impl<'a> DocSet for &'a mut dyn DocSet {
|
||||
(**self).size_hint()
|
||||
}
|
||||
|
||||
fn count(&mut self, delete_bitset: &DeleteBitSet) -> u32 {
|
||||
(**self).count(delete_bitset)
|
||||
fn count(&mut self, alive_bitset: &AliveBitSet) -> u32 {
|
||||
(**self).count(alive_bitset)
|
||||
}
|
||||
|
||||
fn count_including_deleted(&mut self) -> u32 {
|
||||
@@ -160,9 +160,9 @@ impl<TDocSet: DocSet + ?Sized> DocSet for Box<TDocSet> {
|
||||
unboxed.size_hint()
|
||||
}
|
||||
|
||||
fn count(&mut self, delete_bitset: &DeleteBitSet) -> u32 {
|
||||
fn count(&mut self, alive_bitset: &AliveBitSet) -> u32 {
|
||||
let unboxed: &mut TDocSet = self.borrow_mut();
|
||||
unboxed.count(delete_bitset)
|
||||
unboxed.count(alive_bitset)
|
||||
}
|
||||
|
||||
fn count_including_deleted(&mut self) -> u32 {
|
||||
|
||||
208
src/fastfield/alive_bitset.rs
Normal file
208
src/fastfield/alive_bitset.rs
Normal file
@@ -0,0 +1,208 @@
|
||||
use crate::directory::FileSlice;
|
||||
use crate::directory::OwnedBytes;
|
||||
use crate::space_usage::ByteCount;
|
||||
use crate::DocId;
|
||||
use common::BitSet;
|
||||
use common::ReadSerializedBitSet;
|
||||
use std::io;
|
||||
use std::io::Write;
|
||||
|
||||
/// Write a alive `BitSet`
|
||||
///
|
||||
/// where `alive_bitset` is the set of alive `DocId`.
|
||||
/// Warning: this function does not call terminate. The caller is in charge of
|
||||
/// closing the writer properly.
|
||||
pub fn write_alive_bitset<T: Write>(alive_bitset: &BitSet, writer: &mut T) -> io::Result<()> {
|
||||
alive_bitset.serialize(writer)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Set of alive `DocId`s.
|
||||
#[derive(Clone)]
|
||||
pub struct AliveBitSet {
|
||||
data: OwnedBytes,
|
||||
num_deleted: usize,
|
||||
bitset: ReadSerializedBitSet,
|
||||
}
|
||||
|
||||
impl AliveBitSet {
|
||||
#[cfg(test)]
|
||||
pub(crate) fn for_test(deleted_docs: &[DocId], max_doc: u32) -> AliveBitSet {
|
||||
use crate::directory::{Directory, RamDirectory, TerminatingWrite};
|
||||
use std::path::Path;
|
||||
assert!(deleted_docs.iter().all(|&doc| doc < max_doc));
|
||||
let mut bitset = BitSet::with_max_value_and_full(max_doc);
|
||||
for &doc in deleted_docs {
|
||||
bitset.remove(doc);
|
||||
}
|
||||
let directory = RamDirectory::create();
|
||||
let path = Path::new("dummydeletebitset");
|
||||
let mut wrt = directory.open_write(path).unwrap();
|
||||
write_alive_bitset(&bitset, &mut wrt).unwrap();
|
||||
wrt.terminate().unwrap();
|
||||
let file = directory.open_read(path).unwrap();
|
||||
Self::open(file).unwrap()
|
||||
}
|
||||
|
||||
/// Opens a delete bitset given its file.
|
||||
pub fn open(file: FileSlice) -> crate::Result<AliveBitSet> {
|
||||
let bytes = file.read_bytes()?;
|
||||
let bitset = ReadSerializedBitSet::open(bytes.clone());
|
||||
let num_deleted = bitset.count_unset();
|
||||
|
||||
Ok(AliveBitSet {
|
||||
data: bytes,
|
||||
num_deleted,
|
||||
bitset,
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns true iff the document is still "alive". In other words, if it has not been deleted.
|
||||
#[inline]
|
||||
pub fn is_alive(&self, doc: DocId) -> bool {
|
||||
self.bitset.contains(doc)
|
||||
}
|
||||
|
||||
/// Returns true iff the document has been marked as deleted.
|
||||
#[inline]
|
||||
pub fn is_deleted(&self, doc: DocId) -> bool {
|
||||
!self.is_alive(doc)
|
||||
}
|
||||
|
||||
/// Iterate over the alive docids.
|
||||
#[inline]
|
||||
pub fn iter_alive(&self) -> impl Iterator<Item = DocId> + '_ {
|
||||
self.bitset.iter()
|
||||
}
|
||||
|
||||
/// Get underlying bitset
|
||||
#[inline]
|
||||
pub fn bitset(&self) -> &ReadSerializedBitSet {
|
||||
&self.bitset
|
||||
}
|
||||
|
||||
/// The number of deleted docs
|
||||
pub fn num_deleted(&self) -> usize {
|
||||
self.num_deleted
|
||||
}
|
||||
/// Summarize total space usage of this bitset.
|
||||
pub fn space_usage(&self) -> ByteCount {
|
||||
self.data.len()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
use super::AliveBitSet;
|
||||
|
||||
#[test]
|
||||
fn test_alive_bitset_empty() {
|
||||
let alive_bitset = AliveBitSet::for_test(&[], 10);
|
||||
for doc in 0..10 {
|
||||
assert_eq!(alive_bitset.is_deleted(doc), !alive_bitset.is_alive(doc));
|
||||
}
|
||||
assert_eq!(alive_bitset.num_deleted(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_alive_bitset() {
|
||||
let alive_bitset = AliveBitSet::for_test(&[1, 9], 10);
|
||||
assert!(alive_bitset.is_alive(0));
|
||||
assert!(alive_bitset.is_deleted(1));
|
||||
assert!(alive_bitset.is_alive(2));
|
||||
assert!(alive_bitset.is_alive(3));
|
||||
assert!(alive_bitset.is_alive(4));
|
||||
assert!(alive_bitset.is_alive(5));
|
||||
assert!(alive_bitset.is_alive(6));
|
||||
assert!(alive_bitset.is_alive(6));
|
||||
assert!(alive_bitset.is_alive(7));
|
||||
assert!(alive_bitset.is_alive(8));
|
||||
assert!(alive_bitset.is_deleted(9));
|
||||
for doc in 0..10 {
|
||||
assert_eq!(alive_bitset.is_deleted(doc), !alive_bitset.is_alive(doc));
|
||||
}
|
||||
assert_eq!(alive_bitset.num_deleted(), 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_alive_bitset_iter_minimal() {
|
||||
let alive_bitset = AliveBitSet::for_test(&[7], 8);
|
||||
|
||||
let data: Vec<_> = alive_bitset.iter_alive().collect();
|
||||
assert_eq!(data, vec![0, 1, 2, 3, 4, 5, 6]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_alive_bitset_iter_small() {
|
||||
let alive_bitset = AliveBitSet::for_test(&[0, 2, 3, 6], 7);
|
||||
|
||||
let data: Vec<_> = alive_bitset.iter_alive().collect();
|
||||
assert_eq!(data, vec![1, 4, 5]);
|
||||
}
|
||||
#[test]
|
||||
fn test_alive_bitset_iter() {
|
||||
let alive_bitset = AliveBitSet::for_test(&[0, 1, 1000], 1001);
|
||||
|
||||
let data: Vec<_> = alive_bitset.iter_alive().collect();
|
||||
assert_eq!(data, (2..=999).collect::<Vec<_>>());
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(all(test, feature = "unstable"))]
|
||||
mod bench {
|
||||
|
||||
use super::AliveBitSet;
|
||||
use rand::prelude::IteratorRandom;
|
||||
use rand::thread_rng;
|
||||
use test::Bencher;
|
||||
|
||||
fn get_alive() -> Vec<u32> {
|
||||
let mut data = (0..1_000_000_u32).collect::<Vec<u32>>();
|
||||
for _ in 0..(1_000_000) * 1 / 8 {
|
||||
remove_rand(&mut data);
|
||||
}
|
||||
data
|
||||
}
|
||||
|
||||
fn remove_rand(raw: &mut Vec<u32>) {
|
||||
let i = (0..raw.len()).choose(&mut thread_rng()).unwrap();
|
||||
raw.remove(i);
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn bench_deletebitset_iter_deser_on_fly(bench: &mut Bencher) {
|
||||
let alive_bitset = AliveBitSet::for_test(&[0, 1, 1000, 10000], 1_000_000);
|
||||
|
||||
bench.iter(|| alive_bitset.iter_alive().collect::<Vec<_>>());
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn bench_deletebitset_access(bench: &mut Bencher) {
|
||||
let alive_bitset = AliveBitSet::for_test(&[0, 1, 1000, 10000], 1_000_000);
|
||||
|
||||
bench.iter(|| {
|
||||
(0..1_000_000_u32)
|
||||
.filter(|doc| alive_bitset.is_alive(*doc))
|
||||
.collect::<Vec<_>>()
|
||||
});
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn bench_deletebitset_iter_deser_on_fly_1_8_alive(bench: &mut Bencher) {
|
||||
let alive_bitset = AliveBitSet::for_test(&get_alive(), 1_000_000);
|
||||
|
||||
bench.iter(|| alive_bitset.iter_alive().collect::<Vec<_>>());
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn bench_deletebitset_access_1_8_alive(bench: &mut Bencher) {
|
||||
let alive_bitset = AliveBitSet::for_test(&get_alive(), 1_000_000);
|
||||
|
||||
bench.iter(|| {
|
||||
(0..1_000_000_u32)
|
||||
.filter(|doc| alive_bitset.is_alive(*doc))
|
||||
.collect::<Vec<_>>()
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -1,144 +0,0 @@
|
||||
use crate::directory::FileSlice;
|
||||
use crate::directory::OwnedBytes;
|
||||
use crate::directory::WritePtr;
|
||||
use crate::space_usage::ByteCount;
|
||||
use crate::DocId;
|
||||
use common::BitSet;
|
||||
use common::HasLen;
|
||||
use std::io;
|
||||
use std::io::Write;
|
||||
|
||||
/// Write a delete `BitSet`
|
||||
///
|
||||
/// where `delete_bitset` is the set of deleted `DocId`.
|
||||
/// Warning: this function does not call terminate. The caller is in charge of
|
||||
/// closing the writer properly.
|
||||
pub fn write_delete_bitset(
|
||||
delete_bitset: &BitSet,
|
||||
max_doc: u32,
|
||||
writer: &mut WritePtr,
|
||||
) -> io::Result<()> {
|
||||
let mut byte = 0u8;
|
||||
let mut shift = 0u8;
|
||||
for doc in 0..max_doc {
|
||||
if delete_bitset.contains(doc) {
|
||||
byte |= 1 << shift;
|
||||
}
|
||||
if shift == 7 {
|
||||
writer.write_all(&[byte])?;
|
||||
shift = 0;
|
||||
byte = 0;
|
||||
} else {
|
||||
shift += 1;
|
||||
}
|
||||
}
|
||||
if max_doc % 8 > 0 {
|
||||
writer.write_all(&[byte])?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Set of deleted `DocId`s.
|
||||
#[derive(Clone)]
|
||||
pub struct DeleteBitSet {
|
||||
data: OwnedBytes,
|
||||
num_deleted: usize,
|
||||
}
|
||||
|
||||
impl DeleteBitSet {
|
||||
#[cfg(test)]
|
||||
pub(crate) fn for_test(docs: &[DocId], max_doc: u32) -> DeleteBitSet {
|
||||
use crate::directory::{Directory, RamDirectory, TerminatingWrite};
|
||||
use std::path::Path;
|
||||
assert!(docs.iter().all(|&doc| doc < max_doc));
|
||||
let mut bitset = BitSet::with_max_value(max_doc);
|
||||
for &doc in docs {
|
||||
bitset.insert(doc);
|
||||
}
|
||||
let directory = RamDirectory::create();
|
||||
let path = Path::new("dummydeletebitset");
|
||||
let mut wrt = directory.open_write(path).unwrap();
|
||||
write_delete_bitset(&bitset, max_doc, &mut wrt).unwrap();
|
||||
wrt.terminate().unwrap();
|
||||
let file = directory.open_read(path).unwrap();
|
||||
Self::open(file).unwrap()
|
||||
}
|
||||
|
||||
/// Opens a delete bitset given its file.
|
||||
pub fn open(file: FileSlice) -> crate::Result<DeleteBitSet> {
|
||||
let bytes = file.read_bytes()?;
|
||||
let num_deleted: usize = bytes
|
||||
.as_slice()
|
||||
.iter()
|
||||
.map(|b| b.count_ones() as usize)
|
||||
.sum();
|
||||
Ok(DeleteBitSet {
|
||||
data: bytes,
|
||||
num_deleted,
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns true iff the document is still "alive". In other words, if it has not been deleted.
|
||||
pub fn is_alive(&self, doc: DocId) -> bool {
|
||||
!self.is_deleted(doc)
|
||||
}
|
||||
|
||||
/// Returns true iff the document has been marked as deleted.
|
||||
#[inline]
|
||||
pub fn is_deleted(&self, doc: DocId) -> bool {
|
||||
let byte_offset = doc / 8u32;
|
||||
let b: u8 = self.data.as_slice()[byte_offset as usize];
|
||||
let shift = (doc & 7u32) as u8;
|
||||
b & (1u8 << shift) != 0
|
||||
}
|
||||
|
||||
/// The number of deleted docs
|
||||
pub fn num_deleted(&self) -> usize {
|
||||
self.num_deleted
|
||||
}
|
||||
/// Summarize total space usage of this bitset.
|
||||
pub fn space_usage(&self) -> ByteCount {
|
||||
self.data.len()
|
||||
}
|
||||
}
|
||||
|
||||
impl HasLen for DeleteBitSet {
|
||||
fn len(&self) -> usize {
|
||||
self.num_deleted
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::DeleteBitSet;
|
||||
use common::HasLen;
|
||||
|
||||
#[test]
|
||||
fn test_delete_bitset_empty() {
|
||||
let delete_bitset = DeleteBitSet::for_test(&[], 10);
|
||||
for doc in 0..10 {
|
||||
assert_eq!(delete_bitset.is_deleted(doc), !delete_bitset.is_alive(doc));
|
||||
}
|
||||
assert_eq!(delete_bitset.len(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_delete_bitset() {
|
||||
let delete_bitset = DeleteBitSet::for_test(&[1, 9], 10);
|
||||
assert!(delete_bitset.is_alive(0));
|
||||
assert!(delete_bitset.is_deleted(1));
|
||||
assert!(delete_bitset.is_alive(2));
|
||||
assert!(delete_bitset.is_alive(3));
|
||||
assert!(delete_bitset.is_alive(4));
|
||||
assert!(delete_bitset.is_alive(5));
|
||||
assert!(delete_bitset.is_alive(6));
|
||||
assert!(delete_bitset.is_alive(6));
|
||||
assert!(delete_bitset.is_alive(7));
|
||||
assert!(delete_bitset.is_alive(8));
|
||||
assert!(delete_bitset.is_deleted(9));
|
||||
for doc in 0..10 {
|
||||
assert_eq!(delete_bitset.is_deleted(doc), !delete_bitset.is_alive(doc));
|
||||
}
|
||||
assert_eq!(delete_bitset.len(), 2);
|
||||
}
|
||||
}
|
||||
@@ -23,9 +23,9 @@ values stored.
|
||||
Read access performance is comparable to that of an array lookup.
|
||||
*/
|
||||
|
||||
pub use self::alive_bitset::write_alive_bitset;
|
||||
pub use self::alive_bitset::AliveBitSet;
|
||||
pub use self::bytes::{BytesFastFieldReader, BytesFastFieldWriter};
|
||||
pub use self::delete::write_delete_bitset;
|
||||
pub use self::delete::DeleteBitSet;
|
||||
pub use self::error::{FastFieldNotAvailableError, Result};
|
||||
pub use self::facet_reader::FacetReader;
|
||||
pub use self::multivalued::{MultiValuedFastFieldReader, MultiValuedFastFieldWriter};
|
||||
@@ -46,8 +46,8 @@ use crate::{
|
||||
schema::Type,
|
||||
};
|
||||
|
||||
mod alive_bitset;
|
||||
mod bytes;
|
||||
mod delete;
|
||||
mod error;
|
||||
mod facet_reader;
|
||||
mod multivalued;
|
||||
|
||||
@@ -11,7 +11,7 @@ use crate::directory::TerminatingWrite;
|
||||
use crate::directory::{DirectoryLock, GarbageCollectionResult};
|
||||
use crate::docset::{DocSet, TERMINATED};
|
||||
use crate::error::TantivyError;
|
||||
use crate::fastfield::write_delete_bitset;
|
||||
use crate::fastfield::write_alive_bitset;
|
||||
use crate::indexer::delete_queue::{DeleteCursor, DeleteQueue};
|
||||
use crate::indexer::doc_opstamp_mapping::DocToOpstampMapping;
|
||||
use crate::indexer::operation::DeleteOperation;
|
||||
@@ -93,7 +93,7 @@ pub struct IndexWriter {
|
||||
}
|
||||
|
||||
fn compute_deleted_bitset(
|
||||
delete_bitset: &mut BitSet,
|
||||
alive_bitset: &mut BitSet,
|
||||
segment_reader: &SegmentReader,
|
||||
delete_cursor: &mut DeleteCursor,
|
||||
doc_opstamps: &DocToOpstampMapping,
|
||||
@@ -114,7 +114,7 @@ fn compute_deleted_bitset(
|
||||
let mut doc_matching_deleted_term = docset.doc();
|
||||
while doc_matching_deleted_term != TERMINATED {
|
||||
if doc_opstamps.is_deleted(doc_matching_deleted_term, delete_op.opstamp) {
|
||||
delete_bitset.insert(doc_matching_deleted_term);
|
||||
alive_bitset.remove(doc_matching_deleted_term);
|
||||
might_have_changed = true;
|
||||
}
|
||||
doc_matching_deleted_term = docset.advance();
|
||||
@@ -141,7 +141,7 @@ pub(crate) fn advance_deletes(
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if segment_entry.delete_bitset().is_none() && segment_entry.delete_cursor().get().is_none() {
|
||||
if segment_entry.alive_bitset().is_none() && segment_entry.delete_cursor().get().is_none() {
|
||||
// There has been no `DeleteOperation` between the segment status and `target_opstamp`.
|
||||
return Ok(());
|
||||
}
|
||||
@@ -149,15 +149,15 @@ pub(crate) fn advance_deletes(
|
||||
let segment_reader = SegmentReader::open(&segment)?;
|
||||
|
||||
let max_doc = segment_reader.max_doc();
|
||||
let mut delete_bitset: BitSet = match segment_entry.delete_bitset() {
|
||||
Some(previous_delete_bitset) => (*previous_delete_bitset).clone(),
|
||||
None => BitSet::with_max_value(max_doc),
|
||||
let mut alive_bitset: BitSet = match segment_entry.alive_bitset() {
|
||||
Some(previous_alive_bitset) => (*previous_alive_bitset).clone(),
|
||||
None => BitSet::with_max_value_and_full(max_doc),
|
||||
};
|
||||
|
||||
let num_deleted_docs_before = segment.meta().num_deleted_docs();
|
||||
|
||||
compute_deleted_bitset(
|
||||
&mut delete_bitset,
|
||||
&mut alive_bitset,
|
||||
&segment_reader,
|
||||
segment_entry.delete_cursor(),
|
||||
&DocToOpstampMapping::None,
|
||||
@@ -167,20 +167,21 @@ pub(crate) fn advance_deletes(
|
||||
// TODO optimize
|
||||
// It should be possible to do something smarter by manipulation bitsets directly
|
||||
// to compute this union.
|
||||
if let Some(seg_delete_bitset) = segment_reader.delete_bitset() {
|
||||
if let Some(seg_alive_bitset) = segment_reader.alive_bitset() {
|
||||
for doc in 0u32..max_doc {
|
||||
if seg_delete_bitset.is_deleted(doc) {
|
||||
delete_bitset.insert(doc);
|
||||
if seg_alive_bitset.is_deleted(doc) {
|
||||
alive_bitset.remove(doc);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let num_deleted_docs: u32 = delete_bitset.len() as u32;
|
||||
let num_alive_docs: u32 = alive_bitset.len() as u32;
|
||||
let num_deleted_docs = max_doc - num_alive_docs;
|
||||
if num_deleted_docs > num_deleted_docs_before {
|
||||
// There are new deletes. We need to write a new delete file.
|
||||
segment = segment.with_delete_meta(num_deleted_docs as u32, target_opstamp);
|
||||
let mut delete_file = segment.open_write(SegmentComponent::Delete)?;
|
||||
write_delete_bitset(&delete_bitset, max_doc, &mut delete_file)?;
|
||||
write_alive_bitset(&alive_bitset, &mut delete_file)?;
|
||||
delete_file.terminate()?;
|
||||
}
|
||||
|
||||
@@ -226,13 +227,12 @@ fn index_documents(
|
||||
|
||||
let segment_with_max_doc = segment.with_max_doc(max_doc);
|
||||
|
||||
let delete_bitset_opt =
|
||||
apply_deletes(&segment_with_max_doc, &mut delete_cursor, &doc_opstamps)?;
|
||||
let alive_bitset_opt = apply_deletes(&segment_with_max_doc, &mut delete_cursor, &doc_opstamps)?;
|
||||
|
||||
let meta = segment_with_max_doc.meta().clone();
|
||||
meta.untrack_temp_docstore();
|
||||
// update segment_updater inventory to remove tempstore
|
||||
let segment_entry = SegmentEntry::new(meta, delete_cursor, delete_bitset_opt);
|
||||
let segment_entry = SegmentEntry::new(meta, delete_cursor, alive_bitset_opt);
|
||||
block_on(segment_updater.schedule_add_segment(segment_entry))?;
|
||||
Ok(true)
|
||||
}
|
||||
@@ -259,7 +259,7 @@ fn apply_deletes(
|
||||
let doc_to_opstamps = DocToOpstampMapping::WithMap(doc_opstamps);
|
||||
|
||||
let max_doc = segment.meta().max_doc();
|
||||
let mut deleted_bitset = BitSet::with_max_value(max_doc);
|
||||
let mut deleted_bitset = BitSet::with_max_value_and_full(max_doc);
|
||||
let may_have_deletes = compute_deleted_bitset(
|
||||
&mut deleted_bitset,
|
||||
&segment_reader,
|
||||
@@ -1518,7 +1518,7 @@ mod tests {
|
||||
for segment_reader in searcher.segment_readers().iter() {
|
||||
let store_reader = segment_reader.get_store_reader().unwrap();
|
||||
// test store iterator
|
||||
for doc in store_reader.iter(segment_reader.delete_bitset()) {
|
||||
for doc in store_reader.iter(segment_reader.alive_bitset()) {
|
||||
let id = doc
|
||||
.unwrap()
|
||||
.get_first(id_field)
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
use crate::error::DataCorruption;
|
||||
use crate::fastfield::CompositeFastFieldSerializer;
|
||||
use crate::fastfield::DeleteBitSet;
|
||||
use crate::fastfield::DynamicFastFieldReader;
|
||||
use crate::fastfield::FastFieldDataAccess;
|
||||
use crate::fastfield::FastFieldReader;
|
||||
@@ -29,7 +28,6 @@ use crate::{
|
||||
SegmentOrdinal,
|
||||
};
|
||||
use crate::{DocId, InvertedIndexReader, SegmentComponent};
|
||||
use common::HasLen;
|
||||
use itertools::Itertools;
|
||||
use measure_time::debug_time;
|
||||
use std::cmp;
|
||||
@@ -98,29 +96,24 @@ pub struct IndexMerger {
|
||||
|
||||
fn compute_min_max_val(
|
||||
u64_reader: &impl FastFieldReader<u64>,
|
||||
max_doc: DocId,
|
||||
delete_bitset_opt: Option<&DeleteBitSet>,
|
||||
segment_reader: &SegmentReader,
|
||||
) -> Option<(u64, u64)> {
|
||||
if max_doc == 0 {
|
||||
None
|
||||
} else {
|
||||
match delete_bitset_opt {
|
||||
Some(delete_bitset) => {
|
||||
// some deleted documents,
|
||||
// we need to recompute the max / min
|
||||
minmax(
|
||||
(0..max_doc)
|
||||
.filter(|doc_id| delete_bitset.is_alive(*doc_id))
|
||||
.map(|doc_id| u64_reader.get(doc_id)),
|
||||
)
|
||||
}
|
||||
None => {
|
||||
// no deleted documents,
|
||||
// we can use the previous min_val, max_val.
|
||||
Some((u64_reader.min_value(), u64_reader.max_value()))
|
||||
}
|
||||
}
|
||||
if segment_reader.max_doc() == 0 {
|
||||
return None;
|
||||
}
|
||||
|
||||
if segment_reader.alive_bitset().is_none() {
|
||||
// no deleted documents,
|
||||
// we can use the previous min_val, max_val.
|
||||
return Some((u64_reader.min_value(), u64_reader.max_value()));
|
||||
}
|
||||
// some deleted documents,
|
||||
// we need to recompute the max / min
|
||||
minmax(
|
||||
segment_reader
|
||||
.doc_ids_alive()
|
||||
.map(|doc_id| u64_reader.get(doc_id)),
|
||||
)
|
||||
}
|
||||
|
||||
struct TermOrdinalMapping {
|
||||
@@ -326,7 +319,7 @@ impl IndexMerger {
|
||||
.fast_fields()
|
||||
.typed_fast_field_reader(field)
|
||||
.expect("Failed to find a reader for single fast field. This is a tantivy bug and it should never happen.");
|
||||
compute_min_max_val(&u64_reader, reader.max_doc(), reader.delete_bitset())
|
||||
compute_min_max_val(&u64_reader, reader)
|
||||
})
|
||||
.flatten()
|
||||
.reduce(|a, b| {
|
||||
@@ -503,13 +496,11 @@ impl IndexMerger {
|
||||
// what should be the bit length use for bitpacking.
|
||||
let mut num_docs = 0;
|
||||
for (reader, u64s_reader) in reader_and_field_accessors.iter() {
|
||||
if let Some(delete_bitset) = reader.delete_bitset() {
|
||||
num_docs += reader.max_doc() as u64 - delete_bitset.len() as u64;
|
||||
for doc in 0u32..reader.max_doc() {
|
||||
if delete_bitset.is_alive(doc) {
|
||||
let num_vals = u64s_reader.get_len(doc) as u64;
|
||||
total_num_vals += num_vals;
|
||||
}
|
||||
if let Some(alive_bitset) = reader.alive_bitset() {
|
||||
num_docs += reader.max_doc() as u64 - alive_bitset.num_deleted() as u64;
|
||||
for doc in reader.doc_ids_alive() {
|
||||
let num_vals = u64s_reader.get_len(doc) as u64;
|
||||
total_num_vals += num_vals;
|
||||
}
|
||||
} else {
|
||||
num_docs += reader.max_doc() as u64;
|
||||
@@ -896,9 +887,9 @@ impl IndexMerger {
|
||||
let inverted_index: &InvertedIndexReader = &*field_readers[segment_ord];
|
||||
let segment_postings = inverted_index
|
||||
.read_postings_from_terminfo(&term_info, segment_postings_option)?;
|
||||
let delete_bitset_opt = segment_reader.delete_bitset();
|
||||
let doc_freq = if let Some(delete_bitset) = delete_bitset_opt {
|
||||
segment_postings.doc_freq_given_deletes(delete_bitset)
|
||||
let alive_bitset_opt = segment_reader.alive_bitset();
|
||||
let doc_freq = if let Some(alive_bitset) = alive_bitset_opt {
|
||||
segment_postings.doc_freq_given_deletes(alive_bitset)
|
||||
} else {
|
||||
segment_postings.doc_freq()
|
||||
};
|
||||
@@ -1018,7 +1009,7 @@ impl IndexMerger {
|
||||
let mut document_iterators: Vec<_> = store_readers
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(i, store)| store.iter_raw(self.readers[i].delete_bitset()))
|
||||
.map(|(i, store)| store.iter_raw(self.readers[i].alive_bitset()))
|
||||
.collect();
|
||||
if !doc_id_mapping.is_trivial() {
|
||||
for (old_doc_id, reader_with_ordinal) in doc_id_mapping.iter() {
|
||||
@@ -1054,7 +1045,7 @@ impl IndexMerger {
|
||||
|| store_reader.block_checkpoints().take(7).count() < 6
|
||||
|| store_reader.compressor() != store_writer.compressor()
|
||||
{
|
||||
for doc_bytes_res in store_reader.iter_raw(reader.delete_bitset()) {
|
||||
for doc_bytes_res in store_reader.iter_raw(reader.alive_bitset()) {
|
||||
let doc_bytes = doc_bytes_res?;
|
||||
store_writer.store_bytes(&doc_bytes)?;
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::fastfield::{DeleteBitSet, FastFieldReader};
|
||||
use crate::fastfield::{AliveBitSet, FastFieldReader};
|
||||
use crate::schema::IndexRecordOption;
|
||||
use crate::{
|
||||
collector::TopDocs,
|
||||
@@ -257,10 +257,10 @@ mod tests {
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(postings.doc_freq(), 2);
|
||||
let fallback_bitset = DeleteBitSet::for_test(&[0], 100);
|
||||
let fallback_bitset = AliveBitSet::for_test(&[0], 100);
|
||||
assert_eq!(
|
||||
postings.doc_freq_given_deletes(
|
||||
segment_reader.delete_bitset().unwrap_or(&fallback_bitset)
|
||||
segment_reader.alive_bitset().unwrap_or(&fallback_bitset)
|
||||
),
|
||||
2
|
||||
);
|
||||
@@ -336,10 +336,10 @@ mod tests {
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert_eq!(postings.doc_freq(), 2);
|
||||
let fallback_bitset = DeleteBitSet::for_test(&[0], 100);
|
||||
let fallback_bitset = AliveBitSet::for_test(&[0], 100);
|
||||
assert_eq!(
|
||||
postings.doc_freq_given_deletes(
|
||||
segment_reader.delete_bitset().unwrap_or(&fallback_bitset)
|
||||
segment_reader.alive_bitset().unwrap_or(&fallback_bitset)
|
||||
),
|
||||
2
|
||||
);
|
||||
@@ -446,10 +446,10 @@ mod tests {
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(postings.doc_freq(), 2);
|
||||
let fallback_bitset = DeleteBitSet::for_test(&[0], 100);
|
||||
let fallback_bitset = AliveBitSet::for_test(&[0], 100);
|
||||
assert_eq!(
|
||||
postings.doc_freq_given_deletes(
|
||||
segment_reader.delete_bitset().unwrap_or(&fallback_bitset)
|
||||
segment_reader.alive_bitset().unwrap_or(&fallback_bitset)
|
||||
),
|
||||
2
|
||||
);
|
||||
|
||||
@@ -9,18 +9,16 @@ use std::fmt;
|
||||
///
|
||||
/// In addition to segment `meta`,
|
||||
/// it contains a few transient states
|
||||
/// - `state` expresses whether the segment is already in the
|
||||
/// middle of a merge
|
||||
/// - `delete_bitset` is a bitset describing
|
||||
/// documents that were deleted during the commit
|
||||
/// - `alive_bitset` is a bitset describing
|
||||
/// documents that were alive during the commit
|
||||
/// itself.
|
||||
/// - `delete_cursor` is the position in the delete queue.
|
||||
/// Deletes happening before the cursor are reflected either
|
||||
/// in the .del file or in the `delete_bitset`.
|
||||
/// in the .del file or in the `alive_bitset`.
|
||||
#[derive(Clone)]
|
||||
pub struct SegmentEntry {
|
||||
meta: SegmentMeta,
|
||||
delete_bitset: Option<BitSet>,
|
||||
alive_bitset: Option<BitSet>,
|
||||
delete_cursor: DeleteCursor,
|
||||
}
|
||||
|
||||
@@ -29,11 +27,11 @@ impl SegmentEntry {
|
||||
pub fn new(
|
||||
segment_meta: SegmentMeta,
|
||||
delete_cursor: DeleteCursor,
|
||||
delete_bitset: Option<BitSet>,
|
||||
alive_bitset: Option<BitSet>,
|
||||
) -> SegmentEntry {
|
||||
SegmentEntry {
|
||||
meta: segment_meta,
|
||||
delete_bitset,
|
||||
alive_bitset,
|
||||
delete_cursor,
|
||||
}
|
||||
}
|
||||
@@ -41,8 +39,8 @@ impl SegmentEntry {
|
||||
/// Return a reference to the segment entry deleted bitset.
|
||||
///
|
||||
/// `DocId` in this bitset are flagged as deleted.
|
||||
pub fn delete_bitset(&self) -> Option<&BitSet> {
|
||||
self.delete_bitset.as_ref()
|
||||
pub fn alive_bitset(&self) -> Option<&BitSet> {
|
||||
self.alive_bitset.as_ref()
|
||||
}
|
||||
|
||||
/// Set the `SegmentMeta` for this segment.
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use crate::docset::DocSet;
|
||||
use crate::fastfield::DeleteBitSet;
|
||||
use crate::fastfield::AliveBitSet;
|
||||
use crate::positions::PositionReader;
|
||||
use crate::postings::branchless_binary_search;
|
||||
use crate::postings::compression::COMPRESSION_BLOCK_SIZE;
|
||||
@@ -34,7 +34,7 @@ impl SegmentPostings {
|
||||
///
|
||||
/// This method will clone and scan through the posting lists.
|
||||
/// (this is a rather expensive operation).
|
||||
pub fn doc_freq_given_deletes(&self, delete_bitset: &DeleteBitSet) -> u32 {
|
||||
pub fn doc_freq_given_deletes(&self, alive_bitset: &AliveBitSet) -> u32 {
|
||||
let mut docset = self.clone();
|
||||
let mut doc_freq = 0;
|
||||
loop {
|
||||
@@ -42,7 +42,7 @@ impl SegmentPostings {
|
||||
if doc == TERMINATED {
|
||||
return doc_freq;
|
||||
}
|
||||
if delete_bitset.is_alive(doc) {
|
||||
if alive_bitset.is_alive(doc) {
|
||||
doc_freq += 1u32;
|
||||
}
|
||||
docset.advance();
|
||||
@@ -268,7 +268,7 @@ mod tests {
|
||||
use common::HasLen;
|
||||
|
||||
use crate::docset::{DocSet, TERMINATED};
|
||||
use crate::fastfield::DeleteBitSet;
|
||||
use crate::fastfield::AliveBitSet;
|
||||
use crate::postings::postings::Postings;
|
||||
|
||||
#[test]
|
||||
@@ -296,9 +296,9 @@ mod tests {
|
||||
fn test_doc_freq() {
|
||||
let docs = SegmentPostings::create_from_docs(&[0, 2, 10]);
|
||||
assert_eq!(docs.doc_freq(), 3);
|
||||
let delete_bitset = DeleteBitSet::for_test(&[2], 12);
|
||||
assert_eq!(docs.doc_freq_given_deletes(&delete_bitset), 2);
|
||||
let all_deleted = DeleteBitSet::for_test(&[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11], 12);
|
||||
let alive_bitset = AliveBitSet::for_test(&[2], 12);
|
||||
assert_eq!(docs.doc_freq_given_deletes(&alive_bitset), 2);
|
||||
let all_deleted = AliveBitSet::for_test(&[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11], 12);
|
||||
assert_eq!(docs.doc_freq_given_deletes(&all_deleted), 0);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use crate::fastfield::DeleteBitSet;
|
||||
use crate::fastfield::AliveBitSet;
|
||||
use crate::query::explanation::does_not_match;
|
||||
use crate::query::{Explanation, Query, Scorer, Weight};
|
||||
use crate::{DocId, DocSet, Score, Searcher, SegmentReader, Term};
|
||||
@@ -118,8 +118,8 @@ impl<S: Scorer> DocSet for BoostScorer<S> {
|
||||
self.underlying.size_hint()
|
||||
}
|
||||
|
||||
fn count(&mut self, delete_bitset: &DeleteBitSet) -> u32 {
|
||||
self.underlying.count(delete_bitset)
|
||||
fn count(&mut self, alive_bitset: &AliveBitSet) -> u32 {
|
||||
self.underlying.count(alive_bitset)
|
||||
}
|
||||
|
||||
fn count_including_deleted(&mut self) -> u32 {
|
||||
|
||||
@@ -40,8 +40,8 @@ impl Weight for TermWeight {
|
||||
}
|
||||
|
||||
fn count(&self, reader: &SegmentReader) -> crate::Result<u32> {
|
||||
if let Some(delete_bitset) = reader.delete_bitset() {
|
||||
Ok(self.scorer(reader, 1.0)?.count(delete_bitset))
|
||||
if let Some(alive_bitset) = reader.alive_bitset() {
|
||||
Ok(self.scorer(reader, 1.0)?.count(alive_bitset))
|
||||
} else {
|
||||
let field = self.term.field();
|
||||
let inv_index = reader.inverted_index(field)?;
|
||||
|
||||
@@ -59,8 +59,8 @@ pub trait Weight: Send + Sync + 'static {
|
||||
/// Returns the number documents within the given `SegmentReader`.
|
||||
fn count(&self, reader: &SegmentReader) -> crate::Result<u32> {
|
||||
let mut scorer = self.scorer(reader, 1.0)?;
|
||||
if let Some(delete_bitset) = reader.delete_bitset() {
|
||||
Ok(scorer.count(delete_bitset))
|
||||
if let Some(alive_bitset) = reader.alive_bitset() {
|
||||
Ok(scorer.count(alive_bitset))
|
||||
} else {
|
||||
Ok(scorer.count_including_deleted())
|
||||
}
|
||||
|
||||
@@ -57,7 +57,7 @@ pub mod tests {
|
||||
use futures::executor::block_on;
|
||||
|
||||
use super::*;
|
||||
use crate::fastfield::DeleteBitSet;
|
||||
use crate::fastfield::AliveBitSet;
|
||||
use crate::schema::{self, FieldValue, TextFieldIndexing, STORED, TEXT};
|
||||
use crate::schema::{Document, TextOptions};
|
||||
use crate::{
|
||||
@@ -113,7 +113,7 @@ pub mod tests {
|
||||
fn test_doc_store_iter_with_delete_bug_1077() -> crate::Result<()> {
|
||||
// this will cover deletion of the first element in a checkpoint
|
||||
let deleted_docids = (200..300).collect::<Vec<_>>();
|
||||
let delete_bitset = DeleteBitSet::for_test(&deleted_docids, NUM_DOCS as u32);
|
||||
let alive_bitset = AliveBitSet::for_test(&deleted_docids, NUM_DOCS as u32);
|
||||
|
||||
let path = Path::new("store");
|
||||
let directory = RamDirectory::create();
|
||||
@@ -134,7 +134,7 @@ pub mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
for (_, doc) in store.iter(Some(&delete_bitset)).enumerate() {
|
||||
for (_, doc) in store.iter(Some(&alive_bitset)).enumerate() {
|
||||
let doc = doc?;
|
||||
let title_content = doc.get_first(field_title).unwrap().text().unwrap();
|
||||
if !title_content.starts_with("Doc ") {
|
||||
@@ -146,7 +146,7 @@ pub mod tests {
|
||||
.unwrap()
|
||||
.parse::<u32>()
|
||||
.unwrap();
|
||||
if delete_bitset.is_deleted(id) {
|
||||
if alive_bitset.is_deleted(id) {
|
||||
panic!("unexpected deleted document {}", id);
|
||||
}
|
||||
}
|
||||
@@ -230,7 +230,7 @@ pub mod tests {
|
||||
let searcher = index.reader().unwrap().searcher();
|
||||
let reader = searcher.segment_reader(0);
|
||||
let store = reader.get_store_reader().unwrap();
|
||||
for doc in store.iter(reader.delete_bitset()) {
|
||||
for doc in store.iter(reader.alive_bitset()) {
|
||||
assert_eq!(
|
||||
*doc?.get_first(text_field).unwrap().text().unwrap(),
|
||||
"deletemenot".to_string()
|
||||
@@ -288,7 +288,7 @@ pub mod tests {
|
||||
let reader = searcher.segment_readers().iter().last().unwrap();
|
||||
let store = reader.get_store_reader().unwrap();
|
||||
|
||||
for doc in store.iter(reader.delete_bitset()).take(50) {
|
||||
for doc in store.iter(reader.alive_bitset()).take(50) {
|
||||
assert_eq!(
|
||||
*doc?.get_first(text_field).unwrap().text().unwrap(),
|
||||
LOREM.to_string()
|
||||
|
||||
@@ -5,7 +5,7 @@ use crate::schema::Document;
|
||||
use crate::space_usage::StoreSpaceUsage;
|
||||
use crate::store::index::Checkpoint;
|
||||
use crate::DocId;
|
||||
use crate::{error::DataCorruption, fastfield::DeleteBitSet};
|
||||
use crate::{error::DataCorruption, fastfield::AliveBitSet};
|
||||
use common::{BinarySerializable, HasLen, VInt};
|
||||
use lru::LruCache;
|
||||
use std::io;
|
||||
@@ -133,12 +133,12 @@ impl StoreReader {
|
||||
|
||||
/// Iterator over all Documents in their order as they are stored in the doc store.
|
||||
/// Use this, if you want to extract all Documents from the doc store.
|
||||
/// The delete_bitset has to be forwarded from the `SegmentReader` or the results maybe wrong.
|
||||
/// The alive_bitset has to be forwarded from the `SegmentReader` or the results maybe wrong.
|
||||
pub fn iter<'a: 'b, 'b>(
|
||||
&'b self,
|
||||
delete_bitset: Option<&'a DeleteBitSet>,
|
||||
alive_bitset: Option<&'a AliveBitSet>,
|
||||
) -> impl Iterator<Item = crate::Result<Document>> + 'b {
|
||||
self.iter_raw(delete_bitset).map(|doc_bytes_res| {
|
||||
self.iter_raw(alive_bitset).map(|doc_bytes_res| {
|
||||
let mut doc_bytes = doc_bytes_res?;
|
||||
Ok(Document::deserialize(&mut doc_bytes)?)
|
||||
})
|
||||
@@ -146,10 +146,10 @@ impl StoreReader {
|
||||
|
||||
/// Iterator over all RawDocuments in their order as they are stored in the doc store.
|
||||
/// Use this, if you want to extract all Documents from the doc store.
|
||||
/// The delete_bitset has to be forwarded from the `SegmentReader` or the results maybe wrong.
|
||||
/// The alive_bitset has to be forwarded from the `SegmentReader` or the results maybe wrong.
|
||||
pub(crate) fn iter_raw<'a: 'b, 'b>(
|
||||
&'b self,
|
||||
delete_bitset: Option<&'a DeleteBitSet>,
|
||||
alive_bitset: Option<&'a AliveBitSet>,
|
||||
) -> impl Iterator<Item = crate::Result<OwnedBytes>> + 'b {
|
||||
let last_docid = self
|
||||
.block_checkpoints()
|
||||
@@ -179,7 +179,7 @@ impl StoreReader {
|
||||
num_skipped = 0;
|
||||
}
|
||||
|
||||
let alive = delete_bitset.map_or(true, |bitset| bitset.is_alive(doc_id));
|
||||
let alive = alive_bitset.map_or(true, |bitset| bitset.is_alive(doc_id));
|
||||
if alive {
|
||||
let ret = Some((curr_block.clone(), num_skipped, reset_block_pos));
|
||||
// the map block will move over the num_skipped, so we reset to 0
|
||||
|
||||
Reference in New Issue
Block a user