Compare commits

...

30 Commits

Author SHA1 Message Date
Paul Masurel
89f91b1b58 first stab 2021-10-06 12:10:16 +09:00
Paul Masurel
19965c46bc Added wasm-mt 2021-10-06 10:45:17 +09:00
dependabot[bot]
4d05b26e7a Update lru requirement from 0.6.5 to 0.7.0 (#1165)
Updates the requirements on [lru](https://github.com/jeromefroe/lru-rs) to permit the latest version.
- [Release notes](https://github.com/jeromefroe/lru-rs/releases)
- [Changelog](https://github.com/jeromefroe/lru-rs/blob/master/CHANGELOG.md)
- [Commits](https://github.com/jeromefroe/lru-rs/compare/0.6.5...0.7.0)

---
updated-dependencies:
- dependency-name: lru
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>

Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2021-10-06 05:50:24 +09:00
Paul Masurel
0855649986 Leaning more on the alive (vs delete) semantics. (#1164) 2021-10-05 18:53:29 +09:00
PSeitz
d828e58903 Merge pull request #1163 from PSeitz/reduce_mem_usage
reduce mem usage
2021-10-01 08:03:41 +02:00
Pascal Seitz
aa0396fe27 fix variable names 2021-10-01 13:48:51 +08:00
Pascal Seitz
8d8315f8d0 prealloc vec in postinglist 2021-09-29 09:02:38 +08:00
Pascal Seitz
078c0a2e2e reserve vec 2021-09-29 08:45:04 +08:00
Pascal Seitz
f21e8dd875 use only segment ordinal in docidmapping 2021-09-29 08:44:56 +08:00
Tomoko Uchida
74e36c7e97 Add unit tests for tokenizers and filters (#1156)
* add unit test for SimpleTokenizer
* add unit tests for tokenizers and filters.
2021-09-27 10:22:01 +09:00
PSeitz
f27ae04282 fix slope calculation in multilinear interpol (#1161)
add test to check for compression
2021-09-27 10:14:03 +09:00
PSeitz
0ce49c9dd4 use lz4_flex 0.9.0 (#1160) 2021-09-27 10:12:20 +09:00
PSeitz
fe8e58e078 Merge pull request #1154 from PSeitz/delete_bitset
add DeleteBitSet iterator
2021-09-24 09:37:39 +02:00
Pascal Seitz
efc0d8341b fix comment 2021-09-24 15:09:21 +08:00
Pascal Seitz
22bcc83d10 fix padding in initialization 2021-09-24 14:43:04 +08:00
Pascal Seitz
5ee5037934 create and use ReadSerializedBitSet 2021-09-24 12:53:33 +08:00
Pascal Seitz
c217bfed1e cargo fmt 2021-09-23 21:02:19 +08:00
Pascal Seitz
c27ccd3e24 improve naming 2021-09-23 21:02:09 +08:00
Paul Masurel
367f5da782 Fixed comment to the index accessor 2021-09-23 21:53:48 +09:00
Mestery
b256df6599 add index accessor for index writer (#1159)
* add index accessor for index writer

* Update src/indexer/index_writer.rs

Co-authored-by: Paul Masurel <paul@quickwit.io>
2021-09-23 21:49:20 +09:00
Pascal Seitz
d7a6a409a1 renames 2021-09-23 20:33:11 +08:00
Pascal Seitz
a1f5cead96 AliveBitSet instead of DeleteBitSet 2021-09-23 20:03:57 +08:00
dependabot[bot]
37c5fe3c86 Update memmap2 requirement from 0.4 to 0.5 (#1157)
Updates the requirements on [memmap2](https://github.com/RazrFalcon/memmap2-rs) to permit the latest version.
- [Release notes](https://github.com/RazrFalcon/memmap2-rs/releases)
- [Changelog](https://github.com/RazrFalcon/memmap2-rs/blob/master/CHANGELOG.md)
- [Commits](https://github.com/RazrFalcon/memmap2-rs/compare/v0.4.0...v0.5.0)

---
updated-dependencies:
- dependency-name: memmap2
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>

Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2021-09-23 20:18:27 +09:00
Pascal Seitz
4583fa270b fixes 2021-09-23 10:39:53 +08:00
Pascal Seitz
beb3a5bd73 fix len 2021-09-18 17:58:15 +08:00
Pascal Seitz
93cbd52bf0 move code to biset, add inline, add benchmark 2021-09-18 17:35:22 +08:00
Pascal Seitz
c22177a005 add iterator 2021-09-17 15:29:27 +08:00
Pascal Seitz
4da71273e1 add de/serialization for bitset
remove len footgun
2021-09-17 10:28:12 +08:00
dependabot[bot]
2c78b31aab Update memmap2 requirement from 0.3 to 0.4 (#1155)
Updates the requirements on [memmap2](https://github.com/RazrFalcon/memmap2-rs) to permit the latest version.
- [Release notes](https://github.com/RazrFalcon/memmap2-rs/releases)
- [Changelog](https://github.com/RazrFalcon/memmap2-rs/blob/master/CHANGELOG.md)
- [Commits](https://github.com/RazrFalcon/memmap2-rs/compare/v.0.3.0...v0.4.0)
2021-09-17 08:52:52 +09:00
Pascal Seitz
4ae1d87632 add DeleteBitSet iterator 2021-09-15 23:10:04 +08:00
36 changed files with 962 additions and 490 deletions

View File

@@ -19,13 +19,14 @@ crc32fast = "1.2.1"
once_cell = "1.7.2"
regex ={ version = "1.5.4", default-features = false, features = ["std"] }
tantivy-fst = "0.3"
memmap2 = {version = "0.3", optional=true}
lz4_flex = { version = "0.8.0", default-features = false, features = ["checked-decode"], optional = true }
memmap2 = {version = "0.5", optional=true}
lz4_flex = { version = "0.9.0", default-features = false, features = ["checked-decode"], optional = true }
brotli = { version = "3.3", optional = true }
snap = { version = "1.0.5", optional = true }
tempfile = { version = "3.2", optional = true }
log = "0.4.14"
serde = { version = "1.0.126", features = ["derive"] }
serde_closure = "0.3"
serde_json = "1.0.64"
num_cpus = "1.13"
fs2={ version = "0.4.3", optional = true }
@@ -50,11 +51,12 @@ fail = "0.4"
murmurhash32 = "0.2"
chrono = "0.4.19"
smallvec = "1.6.1"
rayon = "1.5"
lru = "0.6.5"
lru = "0.7.0"
fastdivide = "0.3"
itertools = "0.10.0"
measure_time = "0.7.0"
wasm-mt = "0.1"
wasm-mt-pool = "0.1"
[target.'cfg(windows)'.dependencies]
winapi = "0.3.9"

View File

@@ -10,6 +10,7 @@ description = "common traits and utility functions used by multiple tantivy subc
[dependencies]
byteorder = "1.4.3"
ownedbytes = { version="0.1", path="../ownedbytes" }
[dev-dependencies]
proptest = "1.0.0"

View File

@@ -1,5 +1,8 @@
use std::fmt;
use ownedbytes::OwnedBytes;
use std::convert::TryInto;
use std::io::Write;
use std::u64;
use std::{fmt, io};
#[derive(Clone, Copy, Eq, PartialEq)]
pub struct TinySet(u64);
@@ -14,6 +17,7 @@ pub struct TinySetIterator(TinySet);
impl Iterator for TinySetIterator {
type Item = u32;
#[inline]
fn next(&mut self) -> Option<Self::Item> {
self.0.pop_lowest()
}
@@ -28,30 +32,54 @@ impl IntoIterator for TinySet {
}
impl TinySet {
pub fn serialize<T: Write>(&self, writer: &mut T) -> io::Result<()> {
writer.write_all(self.0.to_le_bytes().as_ref())
}
#[inline]
pub fn deserialize(data: [u8; 8]) -> io::Result<Self> {
let val: u64 = u64::from_le_bytes(data);
Ok(TinySet(val))
}
/// Returns an empty `TinySet`.
#[inline]
pub fn empty() -> TinySet {
TinySet(0u64)
}
/// Returns a full `TinySet`.
#[inline]
pub fn full() -> TinySet {
TinySet::empty().complement()
}
pub fn clear(&mut self) {
self.0 = 0u64;
}
#[inline]
/// Returns the complement of the set in `[0, 64[`.
///
/// Careful on making this function public, as it will break the padding handling in the last
/// bucket.
fn complement(self) -> TinySet {
TinySet(!self.0)
}
#[inline]
/// Returns true iff the `TinySet` contains the element `el`.
pub fn contains(self, el: u32) -> bool {
!self.intersect(TinySet::singleton(el)).is_empty()
}
#[inline]
/// Returns the number of elements in the TinySet.
pub fn len(self) -> u32 {
self.0.count_ones()
}
#[inline]
/// Returns the intersection of `self` and `other`
pub fn intersect(self, other: TinySet) -> TinySet {
TinySet(self.0 & other.0)
@@ -64,13 +92,21 @@ impl TinySet {
TinySet(1u64 << u64::from(el))
}
/// Insert a new element within [0..64[
/// Insert a new element within [0..64)
#[inline]
pub fn insert(self, el: u32) -> TinySet {
self.union(TinySet::singleton(el))
}
/// Insert a new element within [0..64[
/// Removes an element within [0..64)
#[inline]
pub fn remove(self, el: u32) -> TinySet {
self.intersect(TinySet::singleton(el).complement())
}
/// Insert a new element within [0..64)
///
/// returns true if the set changed
#[inline]
pub fn insert_mut(&mut self, el: u32) -> bool {
let old = *self;
@@ -78,6 +114,16 @@ impl TinySet {
old != *self
}
/// Remove a element within [0..64)
///
/// returns true if the set changed
#[inline]
pub fn remove_mut(&mut self, el: u32) -> bool {
let old = *self;
*self = old.remove(el);
old != *self
}
/// Returns the union of two tinysets
#[inline]
pub fn union(self, other: TinySet) -> TinySet {
@@ -123,7 +169,7 @@ impl TinySet {
#[derive(Clone)]
pub struct BitSet {
tinysets: Box<[TinySet]>,
len: usize,
len: u64,
max_value: u32,
}
@@ -132,8 +178,41 @@ fn num_buckets(max_val: u32) -> u32 {
}
impl BitSet {
/// serialize a `BitSet`.
///
pub fn serialize<T: Write>(&self, writer: &mut T) -> io::Result<()> {
writer.write_all(self.max_value.to_le_bytes().as_ref())?;
for tinyset in self.tinysets.iter() {
tinyset.serialize(writer)?;
}
writer.flush()?;
Ok(())
}
/// Deserialize a `BitSet`.
///
#[cfg(test)]
pub fn deserialize(mut data: &[u8]) -> io::Result<Self> {
let max_value: u32 = u32::from_le_bytes(data[..4].try_into().unwrap());
data = &data[4..];
let mut len: u64 = 0;
let mut tinysets = vec![];
for chunk in data.chunks_exact(8) {
let tinyset = TinySet::deserialize(chunk.try_into().unwrap())?;
len += tinyset.len() as u64;
tinysets.push(tinyset);
}
Ok(BitSet {
tinysets: tinysets.into_boxed_slice(),
len,
max_value,
})
}
/// Create a new `BitSet` that may contain elements
/// within `[0, max_val[`.
/// within `[0, max_val)`.
pub fn with_max_value(max_value: u32) -> BitSet {
let num_buckets = num_buckets(max_value);
let tinybisets = vec![TinySet::empty(); num_buckets as usize].into_boxed_slice();
@@ -144,6 +223,23 @@ impl BitSet {
}
}
/// Create a new `BitSet` that may contain elements. Initially all values will be set.
/// within `[0, max_val)`.
pub fn with_max_value_and_full(max_value: u32) -> BitSet {
let num_buckets = num_buckets(max_value);
let mut tinybisets = vec![TinySet::full(); num_buckets as usize].into_boxed_slice();
// Fix padding
let lower = max_value % 64u32;
tinybisets[tinybisets.len() - 1] = TinySet::range_lower(lower);
BitSet {
tinysets: tinybisets,
len: max_value as u64,
max_value,
}
}
/// Removes all elements from the `BitSet`.
pub fn clear(&mut self) {
for tinyset in self.tinysets.iter_mut() {
@@ -153,10 +249,11 @@ impl BitSet {
/// Returns the number of elements in the `BitSet`.
pub fn len(&self) -> usize {
self.len
self.len as usize
}
/// Inserts an element in the `BitSet`
#[inline]
pub fn insert(&mut self, el: u32) {
// we do not check saturated els.
let higher = el / 64u32;
@@ -168,7 +265,21 @@ impl BitSet {
};
}
/// Inserts an element in the `BitSet`
#[inline]
pub fn remove(&mut self, el: u32) {
// we do not check saturated els.
let higher = el / 64u32;
let lower = el % 64u32;
self.len -= if self.tinysets[higher as usize].remove_mut(lower) {
1
} else {
0
};
}
/// Returns true iff the elements is in the `BitSet`.
#[inline]
pub fn contains(&self, el: u32) -> bool {
self.tinyset(el / 64u32).contains(el % 64)
}
@@ -198,16 +309,144 @@ impl BitSet {
}
}
/// Serialized BitSet.
#[derive(Clone)]
pub struct ReadSerializedBitSet {
data: OwnedBytes,
max_value: u32,
}
impl ReadSerializedBitSet {
pub fn open(data: OwnedBytes) -> Self {
let (max_value_data, data) = data.split(4);
let max_value: u32 = u32::from_le_bytes(max_value_data.as_ref().try_into().unwrap());
ReadSerializedBitSet { data, max_value }
}
/// Number of elements in the bitset.
#[inline]
pub fn len(&self) -> usize {
self.iter_tinysets()
.map(|tinyset| tinyset.len() as usize)
.sum()
}
/// Iterate the tinyset on the fly from serialized data.
///
#[inline]
fn iter_tinysets<'a>(&'a self) -> impl Iterator<Item = TinySet> + 'a {
assert!((self.data.len()) % 8 == 0);
self.data.chunks_exact(8).map(move |chunk| {
let tinyset: TinySet = TinySet::deserialize(chunk.try_into().unwrap()).unwrap();
tinyset
})
}
/// Iterate over the positions of the elements.
///
#[inline]
pub fn iter<'a>(&'a self) -> impl Iterator<Item = u32> + 'a {
self.iter_tinysets()
.enumerate()
.flat_map(move |(chunk_num, tinyset)| {
let chunk_base_val = chunk_num as u32 * 64;
tinyset
.into_iter()
.map(move |val| val + chunk_base_val)
.take_while(move |doc| *doc < self.max_value)
})
}
/// Returns true iff the elements is in the `BitSet`.
#[inline]
pub fn contains(&self, el: u32) -> bool {
let byte_offset = el / 8u32;
let b: u8 = self.data[byte_offset as usize];
let shift = (el % 8) as u8;
b & (1u8 << shift) != 0
}
/// Maximum value the bitset may contain.
/// (Note this is not the maximum value contained in the set.)
///
/// A bitset has an intrinsic capacity.
/// It only stores elements within [0..max_value).
#[inline]
pub fn max_value(&self) -> u32 {
self.max_value
}
}
#[cfg(test)]
mod tests {
use super::BitSet;
use super::ReadSerializedBitSet;
use super::TinySet;
use ownedbytes::OwnedBytes;
use rand::distributions::Bernoulli;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
use std::collections::HashSet;
use std::convert::TryInto;
#[test]
fn test_read_serialized_bitset_full() {
let mut bitset = BitSet::with_max_value_and_full(5);
bitset.remove(3);
let mut out = vec![];
bitset.serialize(&mut out).unwrap();
let bitset = ReadSerializedBitSet::open(OwnedBytes::new(out));
assert_eq!(bitset.len(), 4);
}
#[test]
fn test_read_serialized_bitset_empty() {
let mut bitset = BitSet::with_max_value(5);
bitset.insert(3);
let mut out = vec![];
bitset.serialize(&mut out).unwrap();
let bitset = ReadSerializedBitSet::open(OwnedBytes::new(out));
assert_eq!(bitset.len(), 1);
{
let bitset = BitSet::with_max_value(5);
let mut out = vec![];
bitset.serialize(&mut out).unwrap();
let bitset = ReadSerializedBitSet::open(OwnedBytes::new(out));
assert_eq!(bitset.len(), 0);
}
}
#[test]
fn test_tiny_set_remove() {
{
let mut u = TinySet::empty().insert(63u32).insert(5).remove(63u32);
assert_eq!(u.pop_lowest(), Some(5u32));
assert!(u.pop_lowest().is_none());
}
{
let mut u = TinySet::empty()
.insert(63u32)
.insert(1)
.insert(5)
.remove(63u32);
assert_eq!(u.pop_lowest(), Some(1u32));
assert_eq!(u.pop_lowest(), Some(5u32));
assert!(u.pop_lowest().is_none());
}
{
let mut u = TinySet::empty().insert(1).remove(63u32);
assert_eq!(u.pop_lowest(), Some(1u32));
assert!(u.pop_lowest().is_none());
}
{
let mut u = TinySet::empty().insert(1).remove(1u32);
assert!(u.pop_lowest().is_none());
}
}
#[test]
fn test_tiny_set() {
assert!(TinySet::empty().is_empty());
@@ -233,6 +472,21 @@ mod tests {
assert_eq!(u.pop_lowest(), Some(63u32));
assert!(u.pop_lowest().is_none());
}
{
let mut u = TinySet::empty().insert(63u32).insert(5);
assert_eq!(u.pop_lowest(), Some(5u32));
assert_eq!(u.pop_lowest(), Some(63u32));
assert!(u.pop_lowest().is_none());
}
{
let u = TinySet::empty().insert(63u32).insert(5);
let mut data = vec![];
u.serialize(&mut data).unwrap();
let mut u = TinySet::deserialize(data[..8].try_into().unwrap()).unwrap();
assert_eq!(u.pop_lowest(), Some(5u32));
assert_eq!(u.pop_lowest(), Some(63u32));
assert!(u.pop_lowest().is_none());
}
}
#[test]
@@ -249,6 +503,16 @@ mod tests {
assert_eq!(hashset.contains(&el), bitset.contains(el));
}
assert_eq!(bitset.max_value(), max_value);
// test deser
let mut data = vec![];
bitset.serialize(&mut data).unwrap();
let bitset = BitSet::deserialize(&data).unwrap();
for el in 0..max_value {
assert_eq!(hashset.contains(&el), bitset.contains(el));
}
assert_eq!(bitset.max_value(), max_value);
assert_eq!(bitset.len(), els.len());
};
test_against_hashset(&[], 0);
@@ -313,6 +577,14 @@ mod tests {
assert_eq!(bitset.len(), 2);
bitset.insert(104u32);
assert_eq!(bitset.len(), 3);
bitset.remove(105u32);
assert_eq!(bitset.len(), 3);
bitset.remove(104u32);
assert_eq!(bitset.len(), 2);
bitset.remove(3u32);
assert_eq!(bitset.len(), 1);
bitset.remove(103u32);
assert_eq!(bitset.len(), 0);
}
pub fn sample_with_seed(n: u32, ratio: f64, seed_val: u8) -> Vec<u32> {

View File

@@ -118,7 +118,7 @@ mod tests {
);
}
}
let actual_compression = data.len() as f32 / out.len() as f32;
let actual_compression = out.len() as f32 / (data.len() as f32 * 8.0);
(estimation, actual_compression)
}
pub fn get_codec_test_data_sets() -> Vec<(Vec<u64>, &'static str)> {

View File

@@ -239,11 +239,21 @@ mod tests {
use super::*;
use crate::tests::get_codec_test_data_sets;
fn create_and_validate(data: &[u64], name: &str) {
fn create_and_validate(data: &[u64], name: &str) -> (f32, f32) {
crate::tests::create_and_validate::<
LinearInterpolFastFieldSerializer,
LinearInterpolFastFieldReader,
>(data, name);
>(data, name)
}
#[test]
fn test_compression() {
let data = (10..=6_000_u64).collect::<Vec<_>>();
let (estimate, actual_compression) =
create_and_validate(&data, "simple monotonically large");
assert!(actual_compression < 0.01);
assert!(estimate < 0.01);
}
#[test]

View File

@@ -57,7 +57,7 @@ struct Function {
impl Function {
fn calc_slope(&mut self) {
let num_vals = self.end_pos - self.start_pos;
get_slope(self.value_start_pos, self.value_end_pos, num_vals);
self.slope = get_slope(self.value_start_pos, self.value_end_pos, num_vals);
}
// split the interpolation into two function, change self and return the second split
fn split(&mut self, split_pos: u64, split_pos_value: u64) -> Function {
@@ -378,11 +378,22 @@ mod tests {
use super::*;
use crate::tests::get_codec_test_data_sets;
fn create_and_validate(data: &[u64], name: &str) {
fn create_and_validate(data: &[u64], name: &str) -> (f32, f32) {
crate::tests::create_and_validate::<
MultiLinearInterpolFastFieldSerializer,
MultiLinearInterpolFastFieldReader,
>(data, name);
>(data, name)
}
#[test]
fn test_compression() {
let data = (10..=6_000_u64).collect::<Vec<_>>();
let (estimate, actual_compression) =
create_and_validate(&data, "simple monotonically large");
assert!(actual_compression < 0.2);
assert!(estimate < 0.20);
assert!(estimate > 0.15);
assert!(actual_compression > 0.01);
}
#[test]
@@ -414,9 +425,11 @@ mod tests {
fn rand() {
for _ in 0..10 {
let mut data = (5_000..20_000)
.map(|_| rand::random::<u64>() as u64)
.map(|_| rand::random::<u32>() as u64)
.collect::<Vec<_>>();
create_and_validate(&data, "random");
let (estimate, actual_compression) = create_and_validate(&data, "random");
dbg!(estimate);
dbg!(actual_compression);
data.reverse();
create_and_validate(&data, "random");

View File

@@ -178,9 +178,9 @@ pub trait Collector: Sync + Send {
) -> crate::Result<<Self::Child as SegmentCollector>::Fruit> {
let mut segment_collector = self.for_segment(segment_ord as u32, reader)?;
if let Some(delete_bitset) = reader.delete_bitset() {
if let Some(alive_bitset) = reader.alive_bitset() {
weight.for_each(reader, &mut |doc, score| {
if delete_bitset.is_alive(doc) {
if alive_bitset.is_alive(doc) {
segment_collector.collect(doc, score);
}
})?;

View File

@@ -629,10 +629,10 @@ impl Collector for TopDocs {
let heap_len = self.0.limit + self.0.offset;
let mut heap: BinaryHeap<ComparableDoc<Score, DocId>> = BinaryHeap::with_capacity(heap_len);
if let Some(delete_bitset) = reader.delete_bitset() {
if let Some(alive_bitset) = reader.alive_bitset() {
let mut threshold = Score::MIN;
weight.for_each_pruning(threshold, reader, &mut |doc, score| {
if delete_bitset.is_deleted(doc) {
if alive_bitset.is_deleted(doc) {
return threshold;
}
let heap_item = ComparableDoc {

View File

@@ -1,5 +1,4 @@
use crossbeam::channel;
use rayon::{ThreadPool, ThreadPoolBuilder};
/// Search executor whether search request are single thread or multithread.
///
@@ -11,8 +10,6 @@ use rayon::{ThreadPool, ThreadPoolBuilder};
pub enum Executor {
/// Single thread variant of an Executor
SingleThread,
/// Thread pool variant of an Executor
ThreadPool(ThreadPool),
}
impl Executor {
@@ -21,15 +18,6 @@ impl Executor {
Executor::SingleThread
}
/// Creates an Executor that dispatches the tasks in a thread pool.
pub fn multi_thread(num_threads: usize, prefix: &'static str) -> crate::Result<Executor> {
let pool = ThreadPoolBuilder::new()
.num_threads(num_threads)
.thread_name(move |num| format!("{}{}", prefix, num))
.build()?;
Ok(Executor::ThreadPool(pool))
}
/// Perform a map in the thread pool.
///
/// Regardless of the executor (`SingleThread` or `ThreadPool`), panics in the task
@@ -46,40 +34,6 @@ impl Executor {
) -> crate::Result<Vec<R>> {
match self {
Executor::SingleThread => args.map(f).collect::<crate::Result<_>>(),
Executor::ThreadPool(pool) => {
let args_with_indices: Vec<(usize, A)> = args.enumerate().collect();
let num_fruits = args_with_indices.len();
let fruit_receiver = {
let (fruit_sender, fruit_receiver) = channel::unbounded();
pool.scope(|scope| {
for arg_with_idx in args_with_indices {
scope.spawn(|_| {
let (idx, arg) = arg_with_idx;
let fruit = f(arg);
if let Err(err) = fruit_sender.send((idx, fruit)) {
error!("Failed to send search task. It probably means all search threads have panicked. {:?}", err);
}
});
}
});
fruit_receiver
// This ends the scope of fruit_sender.
// This is important as it makes it possible for the fruit_receiver iteration to
// terminate.
};
// This is lame, but safe.
let mut results_with_position = Vec::with_capacity(num_fruits);
for (pos, fruit_res) in fruit_receiver {
let fruit = fruit_res?;
results_with_position.push((pos, fruit));
}
results_with_position.sort_by_key(|(pos, _)| *pos);
assert_eq!(results_with_position.len(), num_fruits);
Ok(results_with_position
.into_iter()
.map(|(_, fruit)| fruit)
.collect::<Vec<_>>())
}
}
}
}

View File

@@ -5,7 +5,7 @@ use crate::core::SegmentId;
use crate::directory::CompositeFile;
use crate::directory::FileSlice;
use crate::error::DataCorruption;
use crate::fastfield::DeleteBitSet;
use crate::fastfield::AliveBitSet;
use crate::fastfield::FacetReader;
use crate::fastfield::FastFieldReaders;
use crate::fieldnorm::{FieldNormReader, FieldNormReaders};
@@ -47,7 +47,7 @@ pub struct SegmentReader {
fieldnorm_readers: FieldNormReaders,
store_file: FileSlice,
delete_bitset_opt: Option<DeleteBitSet>,
alive_bitset_opt: Option<AliveBitSet>,
schema: Schema,
}
@@ -72,14 +72,12 @@ impl SegmentReader {
/// Return the number of documents that have been
/// deleted in the segment.
pub fn num_deleted_docs(&self) -> DocId {
self.delete_bitset()
.map(|delete_set| delete_set.num_deleted() as DocId)
.unwrap_or(0u32)
self.max_doc - self.num_docs
}
/// Returns true iff some of the documents of the segment have been deleted.
pub fn has_deletes(&self) -> bool {
self.delete_bitset().is_some()
self.num_deleted_docs() > 0
}
/// Accessor to a segment's fast field reader given a field.
@@ -170,10 +168,10 @@ impl SegmentReader {
let fieldnorm_data = segment.open_read(SegmentComponent::FieldNorms)?;
let fieldnorm_readers = FieldNormReaders::open(fieldnorm_data)?;
let delete_bitset_opt = if segment.meta().has_deletes() {
let delete_data = segment.open_read(SegmentComponent::Delete)?;
let delete_bitset = DeleteBitSet::open(delete_data)?;
Some(delete_bitset)
let alive_bitset_opt = if segment.meta().has_deletes() {
let alive_bitset_bytes = segment.open_read(SegmentComponent::Delete)?.read_bytes()?;
let alive_bitset = AliveBitSet::open(alive_bitset_bytes);
Some(alive_bitset)
} else {
None
};
@@ -188,7 +186,7 @@ impl SegmentReader {
fieldnorm_readers,
segment_id: segment.id(),
store_file,
delete_bitset_opt,
alive_bitset_opt,
positions_composite,
schema,
})
@@ -274,21 +272,25 @@ impl SegmentReader {
/// Returns the bitset representing
/// the documents that have been deleted.
pub fn delete_bitset(&self) -> Option<&DeleteBitSet> {
self.delete_bitset_opt.as_ref()
pub fn alive_bitset(&self) -> Option<&AliveBitSet> {
self.alive_bitset_opt.as_ref()
}
/// Returns true iff the `doc` is marked
/// as deleted.
pub fn is_deleted(&self, doc: DocId) -> bool {
self.delete_bitset()
self.alive_bitset()
.map(|delete_set| delete_set.is_deleted(doc))
.unwrap_or(false)
}
/// Returns an iterator that will iterate over the alive document ids
pub fn doc_ids_alive(&self) -> impl Iterator<Item = DocId> + '_ {
(0u32..self.max_doc).filter(move |doc| !self.is_deleted(*doc))
pub fn doc_ids_alive(&self) -> Box<dyn Iterator<Item = DocId> + '_> {
if let Some(alive_bitset) = &self.alive_bitset_opt {
Box::new(alive_bitset.iter_alive())
} else {
Box::new(0u32..self.max_doc)
}
}
/// Summarize total space usage of this segment.
@@ -301,9 +303,9 @@ impl SegmentReader {
self.fast_fields_readers.space_usage(),
self.fieldnorm_readers.space_usage(),
self.get_store_reader()?.space_usage(),
self.delete_bitset_opt
self.alive_bitset_opt
.as_ref()
.map(DeleteBitSet::space_usage)
.map(AliveBitSet::space_usage)
.unwrap_or(0),
))
}

View File

@@ -1,4 +1,4 @@
use crate::fastfield::DeleteBitSet;
use crate::fastfield::AliveBitSet;
use crate::DocId;
use std::borrow::Borrow;
use std::borrow::BorrowMut;
@@ -85,11 +85,11 @@ pub trait DocSet: Send {
/// Returns the number documents matching.
/// Calling this method consumes the `DocSet`.
fn count(&mut self, delete_bitset: &DeleteBitSet) -> u32 {
fn count(&mut self, alive_bitset: &AliveBitSet) -> u32 {
let mut count = 0u32;
let mut doc = self.doc();
while doc != TERMINATED {
if !delete_bitset.is_deleted(doc) {
if alive_bitset.is_alive(doc) {
count += 1u32;
}
doc = self.advance();
@@ -130,8 +130,8 @@ impl<'a> DocSet for &'a mut dyn DocSet {
(**self).size_hint()
}
fn count(&mut self, delete_bitset: &DeleteBitSet) -> u32 {
(**self).count(delete_bitset)
fn count(&mut self, alive_bitset: &AliveBitSet) -> u32 {
(**self).count(alive_bitset)
}
fn count_including_deleted(&mut self) -> u32 {
@@ -160,9 +160,9 @@ impl<TDocSet: DocSet + ?Sized> DocSet for Box<TDocSet> {
unboxed.size_hint()
}
fn count(&mut self, delete_bitset: &DeleteBitSet) -> u32 {
fn count(&mut self, alive_bitset: &AliveBitSet) -> u32 {
let unboxed: &mut TDocSet = self.borrow_mut();
unboxed.count(delete_bitset)
unboxed.count(alive_bitset)
}
fn count_including_deleted(&mut self) -> u32 {

View File

@@ -0,0 +1,202 @@
use crate::space_usage::ByteCount;
use crate::DocId;
use common::BitSet;
use common::ReadSerializedBitSet;
use ownedbytes::OwnedBytes;
use std::io;
use std::io::Write;
/// Write a alive `BitSet`
///
/// where `alive_bitset` is the set of alive `DocId`.
/// Warning: this function does not call terminate. The caller is in charge of
/// closing the writer properly.
pub fn write_alive_bitset<T: Write>(alive_bitset: &BitSet, writer: &mut T) -> io::Result<()> {
alive_bitset.serialize(writer)?;
Ok(())
}
/// Set of alive `DocId`s.
#[derive(Clone)]
pub struct AliveBitSet {
num_alive_docs: usize,
bitset: ReadSerializedBitSet,
num_bytes: ByteCount,
}
impl AliveBitSet {
#[cfg(test)]
pub(crate) fn for_test_from_deleted_docs(deleted_docs: &[DocId], max_doc: u32) -> AliveBitSet {
assert!(deleted_docs.iter().all(|&doc| doc < max_doc));
let mut bitset = BitSet::with_max_value_and_full(max_doc);
for &doc in deleted_docs {
bitset.remove(doc);
}
let mut alive_bitset_buffer = Vec::new();
write_alive_bitset(&bitset, &mut alive_bitset_buffer).unwrap();
let alive_bitset_bytes = OwnedBytes::new(alive_bitset_buffer);
Self::open(alive_bitset_bytes)
}
/// Opens a delete bitset given its file.
pub fn open(bytes: OwnedBytes) -> AliveBitSet {
let num_bytes = bytes.len();
let bitset = ReadSerializedBitSet::open(bytes);
AliveBitSet {
num_alive_docs: bitset.len(),
bitset,
num_bytes,
}
}
/// Returns true iff the document is still "alive". In other words, if it has not been deleted.
#[inline]
pub fn is_alive(&self, doc: DocId) -> bool {
self.bitset.contains(doc)
}
/// Returns true iff the document has been marked as deleted.
#[inline]
pub fn is_deleted(&self, doc: DocId) -> bool {
!self.is_alive(doc)
}
/// Iterate over the alive docids.
#[inline]
pub fn iter_alive(&self) -> impl Iterator<Item = DocId> + '_ {
self.bitset.iter()
}
/// Get underlying bitset
#[inline]
pub fn bitset(&self) -> &ReadSerializedBitSet {
&self.bitset
}
/// The number of deleted docs
pub fn num_alive_docs(&self) -> usize {
self.num_alive_docs
}
/// Summarize total space usage of this bitset.
pub fn space_usage(&self) -> ByteCount {
self.num_bytes
}
}
#[cfg(test)]
mod tests {
use super::AliveBitSet;
#[test]
fn test_alive_bitset_empty() {
let alive_bitset = AliveBitSet::for_test_from_deleted_docs(&[], 10);
for doc in 0..10 {
assert_eq!(alive_bitset.is_deleted(doc), !alive_bitset.is_alive(doc));
assert!(!alive_bitset.is_deleted(doc));
}
assert_eq!(alive_bitset.num_alive_docs(), 10);
}
#[test]
fn test_alive_bitset() {
let alive_bitset = AliveBitSet::for_test_from_deleted_docs(&[1, 9], 10);
assert!(alive_bitset.is_alive(0));
assert!(alive_bitset.is_deleted(1));
assert!(alive_bitset.is_alive(2));
assert!(alive_bitset.is_alive(3));
assert!(alive_bitset.is_alive(4));
assert!(alive_bitset.is_alive(5));
assert!(alive_bitset.is_alive(6));
assert!(alive_bitset.is_alive(6));
assert!(alive_bitset.is_alive(7));
assert!(alive_bitset.is_alive(8));
assert!(alive_bitset.is_deleted(9));
for doc in 0..10 {
assert_eq!(alive_bitset.is_deleted(doc), !alive_bitset.is_alive(doc));
}
assert_eq!(alive_bitset.num_alive_docs(), 8);
}
#[test]
fn test_alive_bitset_iter_minimal() {
let alive_bitset = AliveBitSet::for_test_from_deleted_docs(&[7], 8);
let data: Vec<_> = alive_bitset.iter_alive().collect();
assert_eq!(data, vec![0, 1, 2, 3, 4, 5, 6]);
}
#[test]
fn test_alive_bitset_iter_small() {
let alive_bitset = AliveBitSet::for_test_from_deleted_docs(&[0, 2, 3, 6], 7);
let data: Vec<_> = alive_bitset.iter_alive().collect();
assert_eq!(data, vec![1, 4, 5]);
}
#[test]
fn test_alive_bitset_iter() {
let alive_bitset = AliveBitSet::for_test_from_deleted_docs(&[0, 1, 1000], 1001);
let data: Vec<_> = alive_bitset.iter_alive().collect();
assert_eq!(data, (2..=999).collect::<Vec<_>>());
}
}
#[cfg(all(test, feature = "unstable"))]
mod bench {
use super::AliveBitSet;
use rand::prelude::IteratorRandom;
use rand::thread_rng;
use test::Bencher;
fn get_alive() -> Vec<u32> {
let mut data = (0..1_000_000_u32).collect::<Vec<u32>>();
for _ in 0..(1_000_000) * 1 / 8 {
remove_rand(&mut data);
}
data
}
fn remove_rand(raw: &mut Vec<u32>) {
let i = (0..raw.len()).choose(&mut thread_rng()).unwrap();
raw.remove(i);
}
#[bench]
fn bench_deletebitset_iter_deser_on_fly(bench: &mut Bencher) {
let alive_bitset = AliveBitSet::for_test_from_deleted_docs(&[0, 1, 1000, 10000], 1_000_000);
bench.iter(|| alive_bitset.iter_alive().collect::<Vec<_>>());
}
#[bench]
fn bench_deletebitset_access(bench: &mut Bencher) {
let alive_bitset = AliveBitSet::for_test_from_deleted_docs(&[0, 1, 1000, 10000], 1_000_000);
bench.iter(|| {
(0..1_000_000_u32)
.filter(|doc| alive_bitset.is_alive(*doc))
.collect::<Vec<_>>()
});
}
#[bench]
fn bench_deletebitset_iter_deser_on_fly_1_8_alive(bench: &mut Bencher) {
let alive_bitset = AliveBitSet::for_test_from_deleted_docs(&get_alive(), 1_000_000);
bench.iter(|| alive_bitset.iter_alive().collect::<Vec<_>>());
}
#[bench]
fn bench_deletebitset_access_1_8_alive(bench: &mut Bencher) {
let alive_bitset = AliveBitSet::for_test_from_deleted_docs(&get_alive(), 1_000_000);
bench.iter(|| {
(0..1_000_000_u32)
.filter(|doc| alive_bitset.is_alive(*doc))
.collect::<Vec<_>>()
});
}
}

View File

@@ -1,144 +0,0 @@
use crate::directory::FileSlice;
use crate::directory::OwnedBytes;
use crate::directory::WritePtr;
use crate::space_usage::ByteCount;
use crate::DocId;
use common::BitSet;
use common::HasLen;
use std::io;
use std::io::Write;
/// Write a delete `BitSet`
///
/// where `delete_bitset` is the set of deleted `DocId`.
/// Warning: this function does not call terminate. The caller is in charge of
/// closing the writer properly.
pub fn write_delete_bitset(
delete_bitset: &BitSet,
max_doc: u32,
writer: &mut WritePtr,
) -> io::Result<()> {
let mut byte = 0u8;
let mut shift = 0u8;
for doc in 0..max_doc {
if delete_bitset.contains(doc) {
byte |= 1 << shift;
}
if shift == 7 {
writer.write_all(&[byte])?;
shift = 0;
byte = 0;
} else {
shift += 1;
}
}
if max_doc % 8 > 0 {
writer.write_all(&[byte])?;
}
Ok(())
}
/// Set of deleted `DocId`s.
#[derive(Clone)]
pub struct DeleteBitSet {
data: OwnedBytes,
num_deleted: usize,
}
impl DeleteBitSet {
#[cfg(test)]
pub(crate) fn for_test(docs: &[DocId], max_doc: u32) -> DeleteBitSet {
use crate::directory::{Directory, RamDirectory, TerminatingWrite};
use std::path::Path;
assert!(docs.iter().all(|&doc| doc < max_doc));
let mut bitset = BitSet::with_max_value(max_doc);
for &doc in docs {
bitset.insert(doc);
}
let directory = RamDirectory::create();
let path = Path::new("dummydeletebitset");
let mut wrt = directory.open_write(path).unwrap();
write_delete_bitset(&bitset, max_doc, &mut wrt).unwrap();
wrt.terminate().unwrap();
let file = directory.open_read(path).unwrap();
Self::open(file).unwrap()
}
/// Opens a delete bitset given its file.
pub fn open(file: FileSlice) -> crate::Result<DeleteBitSet> {
let bytes = file.read_bytes()?;
let num_deleted: usize = bytes
.as_slice()
.iter()
.map(|b| b.count_ones() as usize)
.sum();
Ok(DeleteBitSet {
data: bytes,
num_deleted,
})
}
/// Returns true iff the document is still "alive". In other words, if it has not been deleted.
pub fn is_alive(&self, doc: DocId) -> bool {
!self.is_deleted(doc)
}
/// Returns true iff the document has been marked as deleted.
#[inline]
pub fn is_deleted(&self, doc: DocId) -> bool {
let byte_offset = doc / 8u32;
let b: u8 = self.data.as_slice()[byte_offset as usize];
let shift = (doc & 7u32) as u8;
b & (1u8 << shift) != 0
}
/// The number of deleted docs
pub fn num_deleted(&self) -> usize {
self.num_deleted
}
/// Summarize total space usage of this bitset.
pub fn space_usage(&self) -> ByteCount {
self.data.len()
}
}
impl HasLen for DeleteBitSet {
fn len(&self) -> usize {
self.num_deleted
}
}
#[cfg(test)]
mod tests {
use super::DeleteBitSet;
use common::HasLen;
#[test]
fn test_delete_bitset_empty() {
let delete_bitset = DeleteBitSet::for_test(&[], 10);
for doc in 0..10 {
assert_eq!(delete_bitset.is_deleted(doc), !delete_bitset.is_alive(doc));
}
assert_eq!(delete_bitset.len(), 0);
}
#[test]
fn test_delete_bitset() {
let delete_bitset = DeleteBitSet::for_test(&[1, 9], 10);
assert!(delete_bitset.is_alive(0));
assert!(delete_bitset.is_deleted(1));
assert!(delete_bitset.is_alive(2));
assert!(delete_bitset.is_alive(3));
assert!(delete_bitset.is_alive(4));
assert!(delete_bitset.is_alive(5));
assert!(delete_bitset.is_alive(6));
assert!(delete_bitset.is_alive(6));
assert!(delete_bitset.is_alive(7));
assert!(delete_bitset.is_alive(8));
assert!(delete_bitset.is_deleted(9));
for doc in 0..10 {
assert_eq!(delete_bitset.is_deleted(doc), !delete_bitset.is_alive(doc));
}
assert_eq!(delete_bitset.len(), 2);
}
}

View File

@@ -23,9 +23,9 @@ values stored.
Read access performance is comparable to that of an array lookup.
*/
pub use self::alive_bitset::write_alive_bitset;
pub use self::alive_bitset::AliveBitSet;
pub use self::bytes::{BytesFastFieldReader, BytesFastFieldWriter};
pub use self::delete::write_delete_bitset;
pub use self::delete::DeleteBitSet;
pub use self::error::{FastFieldNotAvailableError, Result};
pub use self::facet_reader::FacetReader;
pub use self::multivalued::{MultiValuedFastFieldReader, MultiValuedFastFieldWriter};
@@ -46,8 +46,8 @@ use crate::{
schema::Type,
};
mod alive_bitset;
mod bytes;
mod delete;
mod error;
mod facet_reader;
mod multivalued;

View File

@@ -2,23 +2,23 @@
//! to get mappings from old doc_id to new doc_id and vice versa, after sorting
//!
use super::{merger::SegmentReaderWithOrdinal, SegmentWriter};
use super::SegmentWriter;
use crate::{
schema::{Field, Schema},
DocId, IndexSortByField, Order, TantivyError,
DocId, IndexSortByField, Order, SegmentOrdinal, TantivyError,
};
use std::{cmp::Reverse, ops::Index};
/// Struct to provide mapping from new doc_id to old doc_id and segment.
#[derive(Clone)]
pub(crate) struct SegmentDocidMapping<'a> {
new_doc_id_to_old_and_segment: Vec<(DocId, SegmentReaderWithOrdinal<'a>)>,
pub(crate) struct SegmentDocidMapping {
new_doc_id_to_old_and_segment: Vec<(DocId, SegmentOrdinal)>,
is_trivial: bool,
}
impl<'a> SegmentDocidMapping<'a> {
impl SegmentDocidMapping {
pub(crate) fn new(
new_doc_id_to_old_and_segment: Vec<(DocId, SegmentReaderWithOrdinal<'a>)>,
new_doc_id_to_old_and_segment: Vec<(DocId, SegmentOrdinal)>,
is_trivial: bool,
) -> Self {
Self {
@@ -26,7 +26,7 @@ impl<'a> SegmentDocidMapping<'a> {
is_trivial,
}
}
pub(crate) fn iter(&self) -> impl Iterator<Item = &(DocId, SegmentReaderWithOrdinal)> {
pub(crate) fn iter(&self) -> impl Iterator<Item = &(DocId, SegmentOrdinal)> {
self.new_doc_id_to_old_and_segment.iter()
}
pub(crate) fn len(&self) -> usize {
@@ -40,15 +40,15 @@ impl<'a> SegmentDocidMapping<'a> {
self.is_trivial
}
}
impl<'a> Index<usize> for SegmentDocidMapping<'a> {
type Output = (DocId, SegmentReaderWithOrdinal<'a>);
impl Index<usize> for SegmentDocidMapping {
type Output = (DocId, SegmentOrdinal);
fn index(&self, idx: usize) -> &Self::Output {
&self.new_doc_id_to_old_and_segment[idx]
}
}
impl<'a> IntoIterator for SegmentDocidMapping<'a> {
type Item = (DocId, SegmentReaderWithOrdinal<'a>);
impl IntoIterator for SegmentDocidMapping {
type Item = (DocId, SegmentOrdinal);
type IntoIter = std::vec::IntoIter<Self::Item>;
fn into_iter(self) -> Self::IntoIter {

View File

@@ -11,7 +11,7 @@ use crate::directory::TerminatingWrite;
use crate::directory::{DirectoryLock, GarbageCollectionResult};
use crate::docset::{DocSet, TERMINATED};
use crate::error::TantivyError;
use crate::fastfield::write_delete_bitset;
use crate::fastfield::write_alive_bitset;
use crate::indexer::delete_queue::{DeleteCursor, DeleteQueue};
use crate::indexer::doc_opstamp_mapping::DocToOpstampMapping;
use crate::indexer::operation::DeleteOperation;
@@ -29,9 +29,12 @@ use futures::executor::block_on;
use futures::future::Future;
use smallvec::smallvec;
use smallvec::SmallVec;
use wasm_mt_pool::pool_exec;
use wasm_mt::prelude::*;
use std::mem;
use std::ops::Range;
use std::sync::Arc;
use wasm_mt_pool::prelude::*;
use std::thread;
use std::thread::JoinHandle;
@@ -75,7 +78,7 @@ pub struct IndexWriter {
heap_size_in_bytes_per_thread: usize,
workers_join_handle: Vec<JoinHandle<crate::Result<()>>>,
workers_join_handle: Vec<JoinHandle<Result<JsValue, JsValue>>>,
operation_receiver: OperationReceiver,
operation_sender: OperationSender,
@@ -90,10 +93,12 @@ pub struct IndexWriter {
stamper: Stamper,
committed_opstamp: Opstamp,
worker_pool: wasm_mt_pool::ThreadPool,
}
fn compute_deleted_bitset(
delete_bitset: &mut BitSet,
alive_bitset: &mut BitSet,
segment_reader: &SegmentReader,
delete_cursor: &mut DeleteCursor,
doc_opstamps: &DocToOpstampMapping,
@@ -114,7 +119,7 @@ fn compute_deleted_bitset(
let mut doc_matching_deleted_term = docset.doc();
while doc_matching_deleted_term != TERMINATED {
if doc_opstamps.is_deleted(doc_matching_deleted_term, delete_op.opstamp) {
delete_bitset.insert(doc_matching_deleted_term);
alive_bitset.remove(doc_matching_deleted_term);
might_have_changed = true;
}
doc_matching_deleted_term = docset.advance();
@@ -141,7 +146,7 @@ pub(crate) fn advance_deletes(
return Ok(());
}
if segment_entry.delete_bitset().is_none() && segment_entry.delete_cursor().get().is_none() {
if segment_entry.alive_bitset().is_none() && segment_entry.delete_cursor().get().is_none() {
// There has been no `DeleteOperation` between the segment status and `target_opstamp`.
return Ok(());
}
@@ -149,15 +154,15 @@ pub(crate) fn advance_deletes(
let segment_reader = SegmentReader::open(&segment)?;
let max_doc = segment_reader.max_doc();
let mut delete_bitset: BitSet = match segment_entry.delete_bitset() {
Some(previous_delete_bitset) => (*previous_delete_bitset).clone(),
None => BitSet::with_max_value(max_doc),
let mut alive_bitset: BitSet = match segment_entry.alive_bitset() {
Some(previous_alive_bitset) => (*previous_alive_bitset).clone(),
None => BitSet::with_max_value_and_full(max_doc),
};
let num_deleted_docs_before = segment.meta().num_deleted_docs();
compute_deleted_bitset(
&mut delete_bitset,
&mut alive_bitset,
&segment_reader,
segment_entry.delete_cursor(),
&DocToOpstampMapping::None,
@@ -167,20 +172,21 @@ pub(crate) fn advance_deletes(
// TODO optimize
// It should be possible to do something smarter by manipulation bitsets directly
// to compute this union.
if let Some(seg_delete_bitset) = segment_reader.delete_bitset() {
if let Some(seg_alive_bitset) = segment_reader.alive_bitset() {
for doc in 0u32..max_doc {
if seg_delete_bitset.is_deleted(doc) {
delete_bitset.insert(doc);
if seg_alive_bitset.is_deleted(doc) {
alive_bitset.remove(doc);
}
}
}
let num_deleted_docs: u32 = delete_bitset.len() as u32;
let num_alive_docs: u32 = alive_bitset.len() as u32;
let num_deleted_docs = max_doc - num_alive_docs;
if num_deleted_docs > num_deleted_docs_before {
// There are new deletes. We need to write a new delete file.
segment = segment.with_delete_meta(num_deleted_docs as u32, target_opstamp);
let mut delete_file = segment.open_write(SegmentComponent::Delete)?;
write_delete_bitset(&delete_bitset, max_doc, &mut delete_file)?;
write_alive_bitset(&alive_bitset, &mut delete_file)?;
delete_file.terminate()?;
}
@@ -226,13 +232,12 @@ fn index_documents(
let segment_with_max_doc = segment.with_max_doc(max_doc);
let delete_bitset_opt =
apply_deletes(&segment_with_max_doc, &mut delete_cursor, &doc_opstamps)?;
let alive_bitset_opt = apply_deletes(&segment_with_max_doc, &mut delete_cursor, &doc_opstamps)?;
let meta = segment_with_max_doc.meta().clone();
meta.untrack_temp_docstore();
// update segment_updater inventory to remove tempstore
let segment_entry = SegmentEntry::new(meta, delete_cursor, delete_bitset_opt);
let segment_entry = SegmentEntry::new(meta, delete_cursor, alive_bitset_opt);
block_on(segment_updater.schedule_add_segment(segment_entry))?;
Ok(true)
}
@@ -259,7 +264,7 @@ fn apply_deletes(
let doc_to_opstamps = DocToOpstampMapping::WithMap(doc_opstamps);
let max_doc = segment.meta().max_doc();
let mut deleted_bitset = BitSet::with_max_value(max_doc);
let mut deleted_bitset = BitSet::with_max_value_and_full(max_doc);
let may_have_deletes = compute_deleted_bitset(
&mut deleted_bitset,
&segment_reader,
@@ -318,6 +323,7 @@ impl IndexWriter {
let segment_updater =
SegmentUpdater::create(index.clone(), stamper.clone(), &delete_queue.cursor())?;
let worker_pool = block_on(wasm_mt_pool::ThreadPool::new(num_threads, crate::PKG_JS).and_init()).unwrap();
let mut index_writer = IndexWriter {
_directory_lock: Some(directory_lock),
@@ -338,6 +344,7 @@ impl IndexWriter {
stamper,
worker_id: 0,
worker_pool,
};
index_writer.start_workers()?;
Ok(index_writer)
@@ -348,6 +355,11 @@ impl IndexWriter {
self.operation_sender = sender;
}
/// Accessor to the index.
pub fn index(&self) -> &Index {
&self.index
}
/// If there are some merging threads, blocks until they all finish their work and
/// then drop the `IndexWriter`.
pub fn wait_merging_threads(mut self) -> crate::Result<()> {
@@ -406,9 +418,8 @@ impl IndexWriter {
let mem_budget = self.heap_size_in_bytes_per_thread;
let index = self.index.clone();
let join_handle: JoinHandle<crate::Result<()>> = thread::Builder::new()
.name(format!("thrd-tantivy-index{}", self.worker_id))
.spawn(move || {
let join_handle: JoinHandle<crate::Result<_>> = pool_exec!(self.worker_pool,
move || {
loop {
let mut document_iterator =
document_receiver_clone.clone().into_iter().peekable();
@@ -849,7 +860,7 @@ mod tests {
let reader = index.reader().unwrap();
let searcher = reader.searcher();
assert_eq!(searcher.segment_readers().len(), 1);
assert_eq!(searcher.segment_reader(0u32).num_deleted_docs(), 0);
assert_eq!(searcher.segment_reader(0u32).num_docs(), 2);
index_writer.delete_term(Term::from_field_text(text_field, "hello1"));
assert!(index_writer.commit().is_ok());
@@ -857,7 +868,7 @@ mod tests {
assert!(reader.reload().is_ok());
let searcher = reader.searcher();
assert_eq!(searcher.segment_readers().len(), 1);
assert_eq!(searcher.segment_reader(0u32).num_deleted_docs(), 1);
assert_eq!(searcher.segment_reader(0u32).num_docs(), 1);
let previous_delete_opstamp = index.load_metas().unwrap().segments[0].delete_opstamp();
@@ -869,7 +880,7 @@ mod tests {
assert!(reader.reload().is_ok());
let searcher = reader.searcher();
assert_eq!(searcher.segment_readers().len(), 1);
assert_eq!(searcher.segment_reader(0u32).num_deleted_docs(), 1);
assert_eq!(searcher.segment_reader(0u32).num_docs(), 1);
let after_delete_opstamp = index.load_metas().unwrap().segments[0].delete_opstamp();
assert_eq!(after_delete_opstamp, previous_delete_opstamp);
@@ -1513,7 +1524,7 @@ mod tests {
for segment_reader in searcher.segment_readers().iter() {
let store_reader = segment_reader.get_store_reader().unwrap();
// test store iterator
for doc in store_reader.iter(segment_reader.delete_bitset()) {
for doc in store_reader.iter(segment_reader.alive_bitset()) {
let id = doc
.unwrap()
.get_first(id_field)
@@ -1639,7 +1650,7 @@ mod tests {
let segment_reader = searcher.segment_reader(0);
assert_eq!(segment_reader.max_doc(), 2);
assert_eq!(segment_reader.num_deleted_docs(), 1);
assert_eq!(segment_reader.num_docs(), 1);
Ok(())
}

View File

@@ -1,6 +1,5 @@
use crate::error::DataCorruption;
use crate::fastfield::CompositeFastFieldSerializer;
use crate::fastfield::DeleteBitSet;
use crate::fastfield::DynamicFastFieldReader;
use crate::fastfield::FastFieldDataAccess;
use crate::fastfield::FastFieldReader;
@@ -29,7 +28,6 @@ use crate::{
SegmentOrdinal,
};
use crate::{DocId, InvertedIndexReader, SegmentComponent};
use common::HasLen;
use itertools::Itertools;
use measure_time::debug_time;
use std::cmp;
@@ -69,58 +67,33 @@ fn compute_total_num_tokens(readers: &[SegmentReader], field: Field) -> crate::R
.sum::<u64>())
}
/// `ReaderWithOrdinal` is used to be able to easier associate
/// data with a `SegmentReader`. The ordinal is supposed to be
/// used as an index access.
///
/// The ordinal identifies the position within `Merger` readers.
#[derive(Clone, Copy)]
pub(crate) struct SegmentReaderWithOrdinal<'a> {
pub reader: &'a SegmentReader,
pub ordinal: SegmentOrdinal,
}
impl<'a> From<(usize, &'a SegmentReader)> for SegmentReaderWithOrdinal<'a> {
fn from(data: (usize, &'a SegmentReader)) -> Self {
SegmentReaderWithOrdinal {
reader: data.1,
ordinal: data.0 as u32,
}
}
}
pub struct IndexMerger {
index_settings: IndexSettings,
schema: Schema,
readers: Vec<SegmentReader>,
pub(crate) readers: Vec<SegmentReader>,
max_doc: u32,
}
fn compute_min_max_val(
u64_reader: &impl FastFieldReader<u64>,
max_doc: DocId,
delete_bitset_opt: Option<&DeleteBitSet>,
segment_reader: &SegmentReader,
) -> Option<(u64, u64)> {
if max_doc == 0 {
None
} else {
match delete_bitset_opt {
Some(delete_bitset) => {
// some deleted documents,
// we need to recompute the max / min
minmax(
(0..max_doc)
.filter(|doc_id| delete_bitset.is_alive(*doc_id))
.map(|doc_id| u64_reader.get(doc_id)),
)
}
None => {
// no deleted documents,
// we can use the previous min_val, max_val.
Some((u64_reader.min_value(), u64_reader.max_value()))
}
}
if segment_reader.max_doc() == 0 {
return None;
}
if segment_reader.alive_bitset().is_none() {
// no deleted documents,
// we can use the previous min_val, max_val.
return Some((u64_reader.min_value(), u64_reader.max_value()));
}
// some deleted documents,
// we need to recompute the max / min
minmax(
segment_reader
.doc_ids_alive()
.map(|doc_id| u64_reader.get(doc_id)),
)
}
struct TermOrdinalMapping {
@@ -252,8 +225,8 @@ impl IndexMerger {
.iter()
.map(|reader| reader.get_fieldnorms_reader(field))
.collect::<Result<_, _>>()?;
for (doc_id, reader_with_ordinal) in doc_id_mapping.iter() {
let fieldnorms_reader = &fieldnorms_readers[reader_with_ordinal.ordinal as usize];
for (doc_id, reader_ordinal) in doc_id_mapping.iter() {
let fieldnorms_reader = &fieldnorms_readers[*reader_ordinal as usize];
let fieldnorm_id = fieldnorms_reader.fieldnorm_id(*doc_id);
fieldnorms_data.push(fieldnorm_id);
}
@@ -326,7 +299,7 @@ impl IndexMerger {
.fast_fields()
.typed_fast_field_reader(field)
.expect("Failed to find a reader for single fast field. This is a tantivy bug and it should never happen.");
compute_min_max_val(&u64_reader, reader.max_doc(), reader.delete_bitset())
compute_min_max_val(&u64_reader, reader)
})
.flatten()
.reduce(|a, b| {
@@ -352,25 +325,25 @@ impl IndexMerger {
};
#[derive(Clone)]
struct SortedDocidFieldAccessProvider<'a> {
doc_id_mapping: &'a SegmentDocidMapping<'a>,
doc_id_mapping: &'a SegmentDocidMapping,
fast_field_readers: &'a Vec<DynamicFastFieldReader<u64>>,
}
impl<'a> FastFieldDataAccess for SortedDocidFieldAccessProvider<'a> {
fn get_val(&self, doc: u64) -> u64 {
let (doc_id, reader_with_ordinal) = self.doc_id_mapping[doc as usize];
self.fast_field_readers[reader_with_ordinal.ordinal as usize].get(doc_id)
let (doc_id, reader_ordinal) = self.doc_id_mapping[doc as usize];
self.fast_field_readers[reader_ordinal as usize].get(doc_id)
}
}
let fastfield_accessor = SortedDocidFieldAccessProvider {
doc_id_mapping,
fast_field_readers: &fast_field_readers,
};
let iter1 = doc_id_mapping.iter().map(|(doc_id, reader_with_ordinal)| {
let fast_field_reader = &fast_field_readers[reader_with_ordinal.ordinal as usize];
let iter1 = doc_id_mapping.iter().map(|(doc_id, reader_ordinal)| {
let fast_field_reader = &fast_field_readers[*reader_ordinal as usize];
fast_field_reader.get(*doc_id)
});
let iter2 = doc_id_mapping.iter().map(|(doc_id, reader_with_ordinal)| {
let fast_field_reader = &fast_field_readers[reader_with_ordinal.ordinal as usize];
let iter2 = doc_id_mapping.iter().map(|(doc_id, reader_ordinal)| {
let fast_field_reader = &fast_field_readers[*reader_ordinal as usize];
fast_field_reader.get(*doc_id)
});
fast_field_serializer.create_auto_detect_u64_fast_field(
@@ -390,9 +363,10 @@ impl IndexMerger {
&self,
sort_by_field: &IndexSortByField,
) -> crate::Result<bool> {
let reader_and_field_accessors = self.get_reader_with_sort_field_accessor(sort_by_field)?;
let reader_ordinal_and_field_accessors =
self.get_reader_with_sort_field_accessor(sort_by_field)?;
let everything_is_in_order = reader_and_field_accessors
let everything_is_in_order = reader_ordinal_and_field_accessors
.into_iter()
.map(|reader| reader.1)
.tuple_windows()
@@ -418,24 +392,21 @@ impl IndexMerger {
pub(crate) fn get_reader_with_sort_field_accessor<'a, 'b>(
&'a self,
sort_by_field: &'b IndexSortByField,
) -> crate::Result<
Vec<(
SegmentReaderWithOrdinal<'a>,
impl FastFieldReader<u64> + Clone,
)>,
> {
let reader_and_field_accessors = self
) -> crate::Result<Vec<(SegmentOrdinal, impl FastFieldReader<u64> + Clone)>> {
let reader_ordinal_and_field_accessors = self
.readers
.iter()
.enumerate()
.map(Into::into)
.map(|reader_with_ordinal: SegmentReaderWithOrdinal| {
let value_accessor =
Self::get_sort_field_accessor(reader_with_ordinal.reader, sort_by_field)?;
Ok((reader_with_ordinal, value_accessor))
.map(|(reader_ordinal, _)| reader_ordinal as SegmentOrdinal)
.map(|reader_ordinal: SegmentOrdinal| {
let value_accessor = Self::get_sort_field_accessor(
&self.readers[reader_ordinal as usize],
sort_by_field,
)?;
Ok((reader_ordinal, value_accessor))
})
.collect::<crate::Result<Vec<_>>>()?;
Ok(reader_and_field_accessors)
Ok(reader_ordinal_and_field_accessors)
}
/// Generates the doc_id mapping where position in the vec=new
@@ -446,50 +417,54 @@ impl IndexMerger {
&self,
sort_by_field: &IndexSortByField,
) -> crate::Result<SegmentDocidMapping> {
let reader_and_field_accessors = self.get_reader_with_sort_field_accessor(sort_by_field)?;
let reader_ordinal_and_field_accessors =
self.get_reader_with_sort_field_accessor(sort_by_field)?;
// Loading the field accessor on demand causes a 15x regression
// create iterators over segment/sort_accessor/doc_id tuple
let doc_id_reader_pair =
reader_and_field_accessors
reader_ordinal_and_field_accessors
.iter()
.map(|reader_and_field_accessor| {
reader_and_field_accessor
.0
.reader
.doc_ids_alive()
.map(move |doc_id| {
(
doc_id,
reader_and_field_accessor.0,
&reader_and_field_accessor.1,
)
})
let reader = &self.readers[reader_and_field_accessor.0 as usize];
reader.doc_ids_alive().map(move |doc_id| {
(
doc_id,
reader_and_field_accessor.0,
&reader_and_field_accessor.1,
)
})
});
let total_num_new_docs = self
.readers
.iter()
.map(|reader| reader.num_docs() as usize)
.sum();
let mut sorted_doc_ids = Vec::with_capacity(total_num_new_docs);
// create iterator tuple of (old doc_id, reader) in order of the new doc_ids
let sorted_doc_ids: Vec<(DocId, SegmentReaderWithOrdinal)> = doc_id_reader_pair
.into_iter()
.kmerge_by(|a, b| {
let val1 = a.2.get(a.0);
let val2 = b.2.get(b.0);
if sort_by_field.order == Order::Asc {
val1 < val2
} else {
val1 > val2
}
})
.map(|(doc_id, reader_with_id, _)| (doc_id, reader_with_id))
.collect::<Vec<_>>();
sorted_doc_ids.extend(
doc_id_reader_pair
.into_iter()
.kmerge_by(|a, b| {
let val1 = a.2.get(a.0);
let val2 = b.2.get(b.0);
if sort_by_field.order == Order::Asc {
val1 < val2
} else {
val1 > val2
}
})
.map(|(doc_id, reader_with_id, _)| (doc_id, reader_with_id)),
);
Ok(SegmentDocidMapping::new(sorted_doc_ids, false))
}
// Creating the index file to point into the data, generic over `BytesFastFieldReader` and
// `MultiValuedFastFieldReader`
//
// Important: reader_and_field_accessor needs
// to have the same order as self.readers since ReaderWithOrdinal
// is used to index the reader_and_field_accessors vec.
fn write_1_n_fast_field_idx_generic<T: MultiValueLength>(
field: Field,
fast_field_serializer: &mut CompositeFastFieldSerializer,
@@ -503,13 +478,11 @@ impl IndexMerger {
// what should be the bit length use for bitpacking.
let mut num_docs = 0;
for (reader, u64s_reader) in reader_and_field_accessors.iter() {
if let Some(delete_bitset) = reader.delete_bitset() {
num_docs += reader.max_doc() as u64 - delete_bitset.len() as u64;
for doc in 0u32..reader.max_doc() {
if delete_bitset.is_alive(doc) {
let num_vals = u64s_reader.get_len(doc) as u64;
total_num_vals += num_vals;
}
if let Some(alive_bitset) = reader.alive_bitset() {
num_docs += alive_bitset.num_alive_docs() as u64;
for doc in reader.doc_ids_alive() {
let num_vals = u64s_reader.get_len(doc) as u64;
total_num_vals += num_vals;
}
} else {
num_docs += reader.max_doc() as u64;
@@ -531,10 +504,10 @@ impl IndexMerger {
// acccess on the fly or 2. change the codec api to make random access optional, but
// they both have also major drawbacks.
let mut offsets = vec![];
let mut offsets = Vec::with_capacity(doc_id_mapping.len());
let mut offset = 0;
for (doc_id, reader) in doc_id_mapping.iter() {
let reader = &reader_and_field_accessors[reader.ordinal as usize].1;
let reader = &reader_and_field_accessors[*reader as usize].1;
offsets.push(offset);
offset += reader.get_len(*doc_id) as u64;
}
@@ -556,7 +529,7 @@ impl IndexMerger {
fast_field_serializer: &mut CompositeFastFieldSerializer,
doc_id_mapping: &SegmentDocidMapping,
) -> crate::Result<Vec<u64>> {
let reader_and_field_accessors = self.readers.iter().map(|reader|{
let reader_ordinal_and_field_accessors = self.readers.iter().map(|reader|{
let u64s_reader: MultiValuedFastFieldReader<u64> = reader.fast_fields()
.typed_fast_field_multi_reader(field)
.expect("Failed to find index for multivalued field. This is a bug in tantivy, please report.");
@@ -567,7 +540,7 @@ impl IndexMerger {
field,
fast_field_serializer,
doc_id_mapping,
&reader_and_field_accessors,
&reader_ordinal_and_field_accessors,
)
}
@@ -606,11 +579,11 @@ impl IndexMerger {
fast_field_serializer.new_u64_fast_field_with_idx(field, 0u64, max_term_ord, 1)?;
let mut vals = Vec::with_capacity(100);
for (old_doc_id, reader_with_ordinal) in doc_id_mapping.iter() {
for (old_doc_id, reader_ordinal) in doc_id_mapping.iter() {
let term_ordinal_mapping: &[TermOrdinal] =
term_ordinal_mappings.get_segment(reader_with_ordinal.ordinal as usize);
term_ordinal_mappings.get_segment(*reader_ordinal as usize);
let ff_reader = &fast_field_reader[reader_with_ordinal.ordinal as usize];
let ff_reader = &fast_field_reader[*reader_ordinal as usize];
ff_reader.get_vals(*old_doc_id, &mut vals);
for &prev_term_ord in &vals {
let new_term_ord = term_ordinal_mapping[prev_term_ord as usize];
@@ -626,21 +599,25 @@ impl IndexMerger {
/// Creates a mapping if the segments are stacked. this is helpful to merge codelines between index
/// sorting and the others
pub(crate) fn get_doc_id_from_concatenated_data(&self) -> crate::Result<SegmentDocidMapping> {
let mapping: Vec<_> = self
let total_num_new_docs = self
.readers
.iter()
.enumerate()
.map(|(ordinal, reader)| {
let reader_with_ordinal = SegmentReaderWithOrdinal {
ordinal: ordinal as u32,
reader,
};
reader
.doc_ids_alive()
.map(move |doc_id| (doc_id, reader_with_ordinal))
})
.flatten()
.collect();
.map(|reader| reader.num_docs() as usize)
.sum();
let mut mapping = Vec::with_capacity(total_num_new_docs);
mapping.extend(
self.readers
.iter()
.enumerate()
.map(|(reader_ordinal, reader)| {
reader
.doc_ids_alive()
.map(move |doc_id| (doc_id, reader_ordinal as SegmentOrdinal))
})
.flatten(),
);
Ok(SegmentDocidMapping::new(mapping, true))
}
fn write_multi_fast_field(
@@ -704,7 +681,7 @@ impl IndexMerger {
};
struct SortedDocidMultiValueAccessProvider<'a> {
doc_id_mapping: &'a SegmentDocidMapping<'a>,
doc_id_mapping: &'a SegmentDocidMapping,
fast_field_readers: &'a Vec<MultiValuedFastFieldReader<u64>>,
offsets: Vec<u64>,
}
@@ -723,13 +700,11 @@ impl IndexMerger {
let num_pos_covered_until_now = self.offsets[new_docid];
let pos_in_values = pos - num_pos_covered_until_now;
let (old_doc_id, reader_with_ordinal) = self.doc_id_mapping[new_docid as usize];
let num_vals = self.fast_field_readers[reader_with_ordinal.ordinal as usize]
.get_len(old_doc_id);
let (old_doc_id, reader_ordinal) = self.doc_id_mapping[new_docid as usize];
let num_vals = self.fast_field_readers[reader_ordinal as usize].get_len(old_doc_id);
assert!(num_vals >= pos_in_values);
let mut vals = vec![];
self.fast_field_readers[reader_with_ordinal.ordinal as usize]
.get_vals(old_doc_id, &mut vals);
self.fast_field_readers[reader_ordinal as usize].get_vals(old_doc_id, &mut vals);
vals[pos_in_values as usize]
}
@@ -741,8 +716,8 @@ impl IndexMerger {
};
let iter1 = doc_id_mapping
.iter()
.map(|(doc_id, reader_with_ordinal)| {
let ff_reader = &ff_readers[reader_with_ordinal.ordinal as usize];
.map(|(doc_id, reader_ordinal)| {
let ff_reader = &ff_readers[*reader_ordinal as usize];
let mut vals = vec![];
ff_reader.get_vals(*doc_id, &mut vals);
vals.into_iter()
@@ -750,8 +725,8 @@ impl IndexMerger {
.flatten();
let iter2 = doc_id_mapping
.iter()
.map(|(doc_id, reader_with_ordinal)| {
let ff_reader = &ff_readers[reader_with_ordinal.ordinal as usize];
.map(|(doc_id, reader_ordinal)| {
let ff_reader = &ff_readers[*reader_ordinal as usize];
let mut vals = vec![];
ff_reader.get_vals(*doc_id, &mut vals);
vals.into_iter()
@@ -793,8 +768,8 @@ impl IndexMerger {
)?;
let mut serialize_vals = fast_field_serializer.new_bytes_fast_field_with_idx(field, 1);
for (doc_id, reader_with_ordinal) in doc_id_mapping.iter() {
let bytes_reader = &reader_and_field_accessors[reader_with_ordinal.ordinal as usize].1;
for (doc_id, reader_ordinal) in doc_id_mapping.iter() {
let bytes_reader = &reader_and_field_accessors[*reader_ordinal as usize].1;
let val = bytes_reader.get_bytes(*doc_id);
serialize_vals.write_all(val)?;
}
@@ -848,8 +823,8 @@ impl IndexMerger {
segment_local_map
})
.collect();
for (new_doc_id, (old_doc_id, segment_and_ordinal)) in doc_id_mapping.iter().enumerate() {
let segment_map = &mut merged_doc_id_map[segment_and_ordinal.ordinal as usize];
for (new_doc_id, (old_doc_id, segment_ordinal)) in doc_id_mapping.iter().enumerate() {
let segment_map = &mut merged_doc_id_map[*segment_ordinal as usize];
segment_map[*old_doc_id as usize] = Some(new_doc_id as DocId);
}
@@ -896,9 +871,9 @@ impl IndexMerger {
let inverted_index: &InvertedIndexReader = &*field_readers[segment_ord];
let segment_postings = inverted_index
.read_postings_from_terminfo(&term_info, segment_postings_option)?;
let delete_bitset_opt = segment_reader.delete_bitset();
let doc_freq = if let Some(delete_bitset) = delete_bitset_opt {
segment_postings.doc_freq_given_deletes(delete_bitset)
let alive_bitset_opt = segment_reader.alive_bitset();
let doc_freq = if let Some(alive_bitset) = alive_bitset_opt {
segment_postings.doc_freq_given_deletes(alive_bitset)
} else {
segment_postings.doc_freq()
};
@@ -1018,11 +993,11 @@ impl IndexMerger {
let mut document_iterators: Vec<_> = store_readers
.iter()
.enumerate()
.map(|(i, store)| store.iter_raw(self.readers[i].delete_bitset()))
.map(|(i, store)| store.iter_raw(self.readers[i].alive_bitset()))
.collect();
if !doc_id_mapping.is_trivial() {
for (old_doc_id, reader_with_ordinal) in doc_id_mapping.iter() {
let doc_bytes_it = &mut document_iterators[reader_with_ordinal.ordinal as usize];
for (old_doc_id, reader_ordinal) in doc_id_mapping.iter() {
let doc_bytes_it = &mut document_iterators[*reader_ordinal as usize];
if let Some(doc_bytes_res) = doc_bytes_it.next() {
let doc_bytes = doc_bytes_res?;
store_writer.store_bytes(&doc_bytes)?;
@@ -1037,7 +1012,7 @@ impl IndexMerger {
} else {
for reader in &self.readers {
let store_reader = reader.get_store_reader()?;
if reader.num_deleted_docs() > 0
if reader.has_deletes()
// If there is not enough data in the store, we avoid stacking in order to
// avoid creating many small blocks in the doc store. Once we have 5 full blocks,
// we start stacking. In the worst case 2/7 of the blocks would be very small.
@@ -1054,7 +1029,7 @@ impl IndexMerger {
|| store_reader.block_checkpoints().take(7).count() < 6
|| store_reader.compressor() != store_writer.compressor()
{
for doc_bytes_res in store_reader.iter_raw(reader.delete_bitset()) {
for doc_bytes_res in store_reader.iter_raw(reader.alive_bitset()) {
let doc_bytes = doc_bytes_res?;
store_writer.store_bytes(&doc_bytes)?;
}

View File

@@ -1,6 +1,6 @@
#[cfg(test)]
mod tests {
use crate::fastfield::{DeleteBitSet, FastFieldReader};
use crate::fastfield::{AliveBitSet, FastFieldReader};
use crate::schema::IndexRecordOption;
use crate::{
collector::TopDocs,
@@ -257,10 +257,10 @@ mod tests {
.unwrap();
assert_eq!(postings.doc_freq(), 2);
let fallback_bitset = DeleteBitSet::for_test(&[0], 100);
let fallback_bitset = AliveBitSet::for_test_from_deleted_docs(&[0], 100);
assert_eq!(
postings.doc_freq_given_deletes(
segment_reader.delete_bitset().unwrap_or(&fallback_bitset)
segment_reader.alive_bitset().unwrap_or(&fallback_bitset)
),
2
);
@@ -336,10 +336,10 @@ mod tests {
.unwrap()
.unwrap();
assert_eq!(postings.doc_freq(), 2);
let fallback_bitset = DeleteBitSet::for_test(&[0], 100);
let fallback_bitset = AliveBitSet::for_test_from_deleted_docs(&[0], 100);
assert_eq!(
postings.doc_freq_given_deletes(
segment_reader.delete_bitset().unwrap_or(&fallback_bitset)
segment_reader.alive_bitset().unwrap_or(&fallback_bitset)
),
2
);
@@ -446,10 +446,10 @@ mod tests {
.unwrap();
assert_eq!(postings.doc_freq(), 2);
let fallback_bitset = DeleteBitSet::for_test(&[0], 100);
let fallback_bitset = AliveBitSet::for_test_from_deleted_docs(&[0], 100);
assert_eq!(
postings.doc_freq_given_deletes(
segment_reader.delete_bitset().unwrap_or(&fallback_bitset)
segment_reader.alive_bitset().unwrap_or(&fallback_bitset)
),
2
);
@@ -546,8 +546,9 @@ mod bench_sorted_index_merge {
let doc_id_mapping = merger.generate_doc_id_mapping(&sort_by_field).unwrap();
b.iter(|| {
let sorted_doc_ids = doc_id_mapping.iter().map(|(doc_id, reader)|{
let u64_reader: DynamicFastFieldReader<u64> = reader.reader
let sorted_doc_ids = doc_id_mapping.iter().map(|(doc_id, ordinal)|{
let reader = &merger.readers[*ordinal as usize];
let u64_reader: DynamicFastFieldReader<u64> = reader
.fast_fields()
.typed_fast_field_reader(field)
.expect("Failed to find a reader for single fast field. This is a tantivy bug and it should never happen.");

View File

@@ -9,18 +9,16 @@ use std::fmt;
///
/// In addition to segment `meta`,
/// it contains a few transient states
/// - `state` expresses whether the segment is already in the
/// middle of a merge
/// - `delete_bitset` is a bitset describing
/// documents that were deleted during the commit
/// - `alive_bitset` is a bitset describing
/// documents that were alive during the commit
/// itself.
/// - `delete_cursor` is the position in the delete queue.
/// Deletes happening before the cursor are reflected either
/// in the .del file or in the `delete_bitset`.
/// in the .del file or in the `alive_bitset`.
#[derive(Clone)]
pub struct SegmentEntry {
meta: SegmentMeta,
delete_bitset: Option<BitSet>,
alive_bitset: Option<BitSet>,
delete_cursor: DeleteCursor,
}
@@ -29,11 +27,11 @@ impl SegmentEntry {
pub fn new(
segment_meta: SegmentMeta,
delete_cursor: DeleteCursor,
delete_bitset: Option<BitSet>,
alive_bitset: Option<BitSet>,
) -> SegmentEntry {
SegmentEntry {
meta: segment_meta,
delete_bitset,
alive_bitset,
delete_cursor,
}
}
@@ -41,8 +39,8 @@ impl SegmentEntry {
/// Return a reference to the segment entry deleted bitset.
///
/// `DocId` in this bitset are flagged as deleted.
pub fn delete_bitset(&self) -> Option<&BitSet> {
self.delete_bitset.as_ref()
pub fn alive_bitset(&self) -> Option<&BitSet> {
self.alive_bitset.as_ref()
}
/// Set the `SegmentMeta` for this segment.

View File

@@ -11,6 +11,8 @@
#![doc(test(attr(allow(unused_variables), deny(warnings))))]
#![warn(missing_docs)]
#![feature(async_closure)]
//! # `tantivy`
//!
//! Tantivy is a search engine library.
@@ -126,6 +128,8 @@ mod macros;
pub use crate::error::TantivyError;
pub use chrono;
pub const PKG_JS: &'static str = "./pkg/pool_exec.js"; // path to `wasm-bindgen`'s JS binding
/// Tantivy result.
///
/// Within tantivy, please avoid importing `Result` using `use crate::Result`

View File

@@ -133,7 +133,8 @@ impl MultiFieldPostingsWriter {
doc_id_map: Option<&DocIdMapping>,
) -> crate::Result<HashMap<Field, FnvHashMap<UnorderedTermId, TermOrdinal>>> {
let mut term_offsets: Vec<(&[u8], Addr, UnorderedTermId)> =
self.term_index.iter().collect();
Vec::with_capacity(self.term_index.len());
term_offsets.extend(self.term_index.iter());
term_offsets.sort_unstable_by_key(|&(k, _, _)| k);
let mut unordered_term_mappings: HashMap<Field, FnvHashMap<UnorderedTermId, TermOrdinal>> =

View File

@@ -1,5 +1,5 @@
use crate::docset::DocSet;
use crate::fastfield::DeleteBitSet;
use crate::fastfield::AliveBitSet;
use crate::positions::PositionReader;
use crate::postings::branchless_binary_search;
use crate::postings::compression::COMPRESSION_BLOCK_SIZE;
@@ -34,7 +34,7 @@ impl SegmentPostings {
///
/// This method will clone and scan through the posting lists.
/// (this is a rather expensive operation).
pub fn doc_freq_given_deletes(&self, delete_bitset: &DeleteBitSet) -> u32 {
pub fn doc_freq_given_deletes(&self, alive_bitset: &AliveBitSet) -> u32 {
let mut docset = self.clone();
let mut doc_freq = 0;
loop {
@@ -42,7 +42,7 @@ impl SegmentPostings {
if doc == TERMINATED {
return doc_freq;
}
if delete_bitset.is_alive(doc) {
if alive_bitset.is_alive(doc) {
doc_freq += 1u32;
}
docset.advance();
@@ -268,7 +268,7 @@ mod tests {
use common::HasLen;
use crate::docset::{DocSet, TERMINATED};
use crate::fastfield::DeleteBitSet;
use crate::fastfield::AliveBitSet;
use crate::postings::postings::Postings;
#[test]
@@ -296,9 +296,10 @@ mod tests {
fn test_doc_freq() {
let docs = SegmentPostings::create_from_docs(&[0, 2, 10]);
assert_eq!(docs.doc_freq(), 3);
let delete_bitset = DeleteBitSet::for_test(&[2], 12);
assert_eq!(docs.doc_freq_given_deletes(&delete_bitset), 2);
let all_deleted = DeleteBitSet::for_test(&[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11], 12);
let alive_bitset = AliveBitSet::for_test_from_deleted_docs(&[2], 12);
assert_eq!(docs.doc_freq_given_deletes(&alive_bitset), 2);
let all_deleted =
AliveBitSet::for_test_from_deleted_docs(&[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11], 12);
assert_eq!(docs.doc_freq_given_deletes(&all_deleted), 0);
}
}

View File

@@ -148,6 +148,10 @@ impl TermHashMap {
unordered_term_id
}
pub fn len(&self) -> usize {
self.len
}
pub fn iter(&self) -> Iter<'_> {
Iter {
inner: self.occupied.iter(),

View File

@@ -1,4 +1,4 @@
use crate::fastfield::DeleteBitSet;
use crate::fastfield::AliveBitSet;
use crate::query::explanation::does_not_match;
use crate::query::{Explanation, Query, Scorer, Weight};
use crate::{DocId, DocSet, Score, Searcher, SegmentReader, Term};
@@ -118,8 +118,8 @@ impl<S: Scorer> DocSet for BoostScorer<S> {
self.underlying.size_hint()
}
fn count(&mut self, delete_bitset: &DeleteBitSet) -> u32 {
self.underlying.count(delete_bitset)
fn count(&mut self, alive_bitset: &AliveBitSet) -> u32 {
self.underlying.count(alive_bitset)
}
fn count_including_deleted(&mut self) -> u32 {

View File

@@ -40,8 +40,8 @@ impl Weight for TermWeight {
}
fn count(&self, reader: &SegmentReader) -> crate::Result<u32> {
if let Some(delete_bitset) = reader.delete_bitset() {
Ok(self.scorer(reader, 1.0)?.count(delete_bitset))
if let Some(alive_bitset) = reader.alive_bitset() {
Ok(self.scorer(reader, 1.0)?.count(alive_bitset))
} else {
let field = self.term.field();
let inv_index = reader.inverted_index(field)?;

View File

@@ -59,8 +59,8 @@ pub trait Weight: Send + Sync + 'static {
/// Returns the number documents within the given `SegmentReader`.
fn count(&self, reader: &SegmentReader) -> crate::Result<u32> {
let mut scorer = self.scorer(reader, 1.0)?;
if let Some(delete_bitset) = reader.delete_bitset() {
Ok(scorer.count(delete_bitset))
if let Some(alive_bitset) = reader.alive_bitset() {
Ok(scorer.count(alive_bitset))
} else {
Ok(scorer.count_including_deleted())
}

View File

@@ -12,7 +12,7 @@ pub fn compress(uncompressed: &[u8], compressed: &mut Vec<u8>) -> io::Result<()>
unsafe {
compressed.set_len(maximum_ouput_size + 4);
}
let bytes_written = compress_into(uncompressed, compressed, 4)
let bytes_written = compress_into(uncompressed, &mut compressed[4..])
.map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string()))?;
let num_bytes = uncompressed.len() as u32;
compressed[0..4].copy_from_slice(&num_bytes.to_le_bytes());
@@ -35,7 +35,7 @@ pub fn decompress(compressed: &[u8], decompressed: &mut Vec<u8>) -> io::Result<(
unsafe {
decompressed.set_len(uncompressed_size);
}
let bytes_written = decompress_into(&compressed[4..], decompressed, 0)
let bytes_written = decompress_into(&compressed[4..], decompressed)
.map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string()))?;
if bytes_written != uncompressed_size {
return Err(io::Error::new(

View File

@@ -57,7 +57,7 @@ pub mod tests {
use futures::executor::block_on;
use super::*;
use crate::fastfield::DeleteBitSet;
use crate::fastfield::AliveBitSet;
use crate::schema::{self, FieldValue, TextFieldIndexing, STORED, TEXT};
use crate::schema::{Document, TextOptions};
use crate::{
@@ -113,7 +113,8 @@ pub mod tests {
fn test_doc_store_iter_with_delete_bug_1077() -> crate::Result<()> {
// this will cover deletion of the first element in a checkpoint
let deleted_docids = (200..300).collect::<Vec<_>>();
let delete_bitset = DeleteBitSet::for_test(&deleted_docids, NUM_DOCS as u32);
let alive_bitset =
AliveBitSet::for_test_from_deleted_docs(&deleted_docids, NUM_DOCS as u32);
let path = Path::new("store");
let directory = RamDirectory::create();
@@ -134,7 +135,7 @@ pub mod tests {
);
}
for (_, doc) in store.iter(Some(&delete_bitset)).enumerate() {
for (_, doc) in store.iter(Some(&alive_bitset)).enumerate() {
let doc = doc?;
let title_content = doc.get_first(field_title).unwrap().text().unwrap();
if !title_content.starts_with("Doc ") {
@@ -146,7 +147,7 @@ pub mod tests {
.unwrap()
.parse::<u32>()
.unwrap();
if delete_bitset.is_deleted(id) {
if alive_bitset.is_deleted(id) {
panic!("unexpected deleted document {}", id);
}
}
@@ -230,7 +231,7 @@ pub mod tests {
let searcher = index.reader().unwrap().searcher();
let reader = searcher.segment_reader(0);
let store = reader.get_store_reader().unwrap();
for doc in store.iter(reader.delete_bitset()) {
for doc in store.iter(reader.alive_bitset()) {
assert_eq!(
*doc?.get_first(text_field).unwrap().text().unwrap(),
"deletemenot".to_string()
@@ -288,7 +289,7 @@ pub mod tests {
let reader = searcher.segment_readers().iter().last().unwrap();
let store = reader.get_store_reader().unwrap();
for doc in store.iter(reader.delete_bitset()).take(50) {
for doc in store.iter(reader.alive_bitset()).take(50) {
assert_eq!(
*doc?.get_first(text_field).unwrap().text().unwrap(),
LOREM.to_string()

View File

@@ -5,7 +5,7 @@ use crate::schema::Document;
use crate::space_usage::StoreSpaceUsage;
use crate::store::index::Checkpoint;
use crate::DocId;
use crate::{error::DataCorruption, fastfield::DeleteBitSet};
use crate::{error::DataCorruption, fastfield::AliveBitSet};
use common::{BinarySerializable, HasLen, VInt};
use lru::LruCache;
use std::io;
@@ -133,12 +133,12 @@ impl StoreReader {
/// Iterator over all Documents in their order as they are stored in the doc store.
/// Use this, if you want to extract all Documents from the doc store.
/// The delete_bitset has to be forwarded from the `SegmentReader` or the results maybe wrong.
/// The alive_bitset has to be forwarded from the `SegmentReader` or the results maybe wrong.
pub fn iter<'a: 'b, 'b>(
&'b self,
delete_bitset: Option<&'a DeleteBitSet>,
alive_bitset: Option<&'a AliveBitSet>,
) -> impl Iterator<Item = crate::Result<Document>> + 'b {
self.iter_raw(delete_bitset).map(|doc_bytes_res| {
self.iter_raw(alive_bitset).map(|doc_bytes_res| {
let mut doc_bytes = doc_bytes_res?;
Ok(Document::deserialize(&mut doc_bytes)?)
})
@@ -146,10 +146,10 @@ impl StoreReader {
/// Iterator over all RawDocuments in their order as they are stored in the doc store.
/// Use this, if you want to extract all Documents from the doc store.
/// The delete_bitset has to be forwarded from the `SegmentReader` or the results maybe wrong.
/// The alive_bitset has to be forwarded from the `SegmentReader` or the results maybe wrong.
pub(crate) fn iter_raw<'a: 'b, 'b>(
&'b self,
delete_bitset: Option<&'a DeleteBitSet>,
alive_bitset: Option<&'a AliveBitSet>,
) -> impl Iterator<Item = crate::Result<OwnedBytes>> + 'b {
let last_docid = self
.block_checkpoints()
@@ -179,7 +179,7 @@ impl StoreReader {
num_skipped = 0;
}
let alive = delete_bitset.map_or(true, |bitset| bitset.is_alive(doc_id));
let alive = alive_bitset.map_or(true, |bitset| bitset.is_alive(doc_id));
if alive {
let ret = Some((curr_block.clone(), num_skipped, reset_block_pos));
// the map block will move over the num_skipped, so we reset to 0

View File

@@ -61,3 +61,31 @@ impl<'a> TokenStream for AlphaNumOnlyFilterStream<'a> {
self.tail.token_mut()
}
}
#[cfg(test)]
mod tests {
use crate::tokenizer::tests::assert_token;
use crate::tokenizer::{AlphaNumOnlyFilter, SimpleTokenizer, TextAnalyzer, Token};
#[test]
fn test_alphanum_only() {
let tokens = token_stream_helper("I am a cat. 我輩は猫である。(1906)");
assert_eq!(tokens.len(), 5);
assert_token(&tokens[0], 0, "I", 0, 1);
assert_token(&tokens[1], 1, "am", 2, 4);
assert_token(&tokens[2], 2, "a", 5, 6);
assert_token(&tokens[3], 3, "cat", 7, 10);
assert_token(&tokens[4], 5, "1906", 37, 41);
}
fn token_stream_helper(text: &str) -> Vec<Token> {
let a = TextAnalyzer::from(SimpleTokenizer).filter(AlphaNumOnlyFilter);
let mut token_stream = a.token_stream(text);
let mut tokens: Vec<Token> = vec![];
let mut add_token = |token: &Token| {
tokens.push(token.clone());
};
token_stream.process(&mut add_token);
tokens
}
}

View File

@@ -56,31 +56,30 @@ impl<'a> TokenStream for LowerCaserTokenStream<'a> {
#[cfg(test)]
mod tests {
use crate::tokenizer::{LowerCaser, SimpleTokenizer, TextAnalyzer};
use crate::tokenizer::tests::assert_token;
use crate::tokenizer::{LowerCaser, SimpleTokenizer, TextAnalyzer, Token};
#[test]
fn test_to_lower_case() {
assert_eq!(
lowercase_helper("Русский текст"),
vec!["русский".to_string(), "текст".to_string()]
);
let tokens = token_stream_helper("Tree");
assert_eq!(tokens.len(), 1);
assert_token(&tokens[0], 0, "tree", 0, 4);
let tokens = token_stream_helper("Русский текст");
assert_eq!(tokens.len(), 2);
assert_token(&tokens[0], 0, "русский", 0, 14);
assert_token(&tokens[1], 1, "текст", 15, 25);
}
fn lowercase_helper(text: &str) -> Vec<String> {
let mut tokens = vec![];
fn token_stream_helper(text: &str) -> Vec<Token> {
let mut token_stream = TextAnalyzer::from(SimpleTokenizer)
.filter(LowerCaser)
.token_stream(text);
while token_stream.advance() {
let token_text = token_stream.token().text.clone();
tokens.push(token_text);
}
let mut tokens = vec![];
let mut add_token = |token: &Token| {
tokens.push(token.clone());
};
token_stream.process(&mut add_token);
tokens
}
#[test]
fn test_lowercaser() {
assert_eq!(lowercase_helper("Tree"), vec!["tree".to_string()]);
assert_eq!(lowercase_helper("Русский"), vec!["русский".to_string()]);
}
}

View File

@@ -42,3 +42,27 @@ impl TokenStream for RawTokenStream {
&mut self.token
}
}
#[cfg(test)]
mod tests {
use crate::tokenizer::tests::assert_token;
use crate::tokenizer::{RawTokenizer, TextAnalyzer, Token};
#[test]
fn test_raw_tokenizer() {
let tokens = token_stream_helper("Hello, happy tax payer!");
assert_eq!(tokens.len(), 1);
assert_token(&tokens[0], 0, "Hello, happy tax payer!", 0, 23);
}
fn token_stream_helper(text: &str) -> Vec<Token> {
let a = TextAnalyzer::from(RawTokenizer);
let mut token_stream = a.token_stream(text);
let mut tokens: Vec<Token> = vec![];
let mut add_token = |token: &Token| {
tokens.push(token.clone());
};
token_stream.process(&mut add_token);
tokens
}
}

View File

@@ -70,3 +70,28 @@ impl<'a> TokenStream for RemoveLongFilterStream<'a> {
self.tail.token_mut()
}
}
#[cfg(test)]
mod tests {
use crate::tokenizer::tests::assert_token;
use crate::tokenizer::{RemoveLongFilter, SimpleTokenizer, TextAnalyzer, Token};
#[test]
fn test_remove_long() {
let tokens = token_stream_helper("hello tantivy, happy searching!");
assert_eq!(tokens.len(), 2);
assert_token(&tokens[0], 0, "hello", 0, 5);
assert_token(&tokens[1], 2, "happy", 15, 20);
}
fn token_stream_helper(text: &str) -> Vec<Token> {
let a = TextAnalyzer::from(SimpleTokenizer).filter(RemoveLongFilter::limit(6));
let mut token_stream = a.token_stream(text);
let mut tokens: Vec<Token> = vec![];
let mut add_token = |token: &Token| {
tokens.push(token.clone());
};
token_stream.process(&mut add_token);
tokens
}
}

View File

@@ -57,3 +57,30 @@ impl<'a> TokenStream for SimpleTokenStream<'a> {
&mut self.token
}
}
#[cfg(test)]
mod tests {
use crate::tokenizer::tests::assert_token;
use crate::tokenizer::{SimpleTokenizer, TextAnalyzer, Token};
#[test]
fn test_simple_tokenizer() {
let tokens = token_stream_helper("Hello, happy tax payer!");
assert_eq!(tokens.len(), 4);
assert_token(&tokens[0], 0, "Hello", 0, 5);
assert_token(&tokens[1], 1, "happy", 7, 12);
assert_token(&tokens[2], 2, "tax", 13, 16);
assert_token(&tokens[3], 3, "payer", 17, 22);
}
fn token_stream_helper(text: &str) -> Vec<Token> {
let a = TextAnalyzer::from(SimpleTokenizer);
let mut token_stream = a.token_stream(text);
let mut tokens: Vec<Token> = vec![];
let mut add_token = |token: &Token| {
tokens.push(token.clone());
};
token_stream.process(&mut add_token);
tokens
}
}

View File

@@ -93,3 +93,37 @@ impl Default for StopWordFilter {
StopWordFilter::english()
}
}
#[cfg(test)]
mod tests {
use crate::tokenizer::tests::assert_token;
use crate::tokenizer::{SimpleTokenizer, StopWordFilter, TextAnalyzer, Token};
#[test]
fn test_stop_word() {
let tokens = token_stream_helper("i am a cat. as yet i have no name.");
assert_eq!(tokens.len(), 5);
assert_token(&tokens[0], 3, "cat", 7, 10);
assert_token(&tokens[1], 5, "yet", 15, 18);
assert_token(&tokens[2], 7, "have", 21, 25);
assert_token(&tokens[3], 8, "no", 26, 28);
assert_token(&tokens[4], 9, "name", 29, 33);
}
fn token_stream_helper(text: &str) -> Vec<Token> {
let stops = vec![
"a".to_string(),
"as".to_string(),
"am".to_string(),
"i".to_string(),
];
let a = TextAnalyzer::from(SimpleTokenizer).filter(StopWordFilter::remove(stops));
let mut token_stream = a.token_stream(text);
let mut tokens: Vec<Token> = vec![];
let mut add_token = |token: &Token| {
tokens.push(token.clone());
};
token_stream.process(&mut add_token);
tokens
}
}

View File

@@ -57,3 +57,30 @@ impl<'a> TokenStream for WhitespaceTokenStream<'a> {
&mut self.token
}
}
#[cfg(test)]
mod tests {
use crate::tokenizer::tests::assert_token;
use crate::tokenizer::{TextAnalyzer, Token, WhitespaceTokenizer};
#[test]
fn test_whitespace_tokenizer() {
let tokens = token_stream_helper("Hello, happy tax payer!");
assert_eq!(tokens.len(), 4);
assert_token(&tokens[0], 0, "Hello,", 0, 6);
assert_token(&tokens[1], 1, "happy", 7, 12);
assert_token(&tokens[2], 2, "tax", 13, 16);
assert_token(&tokens[3], 3, "payer!", 17, 23);
}
fn token_stream_helper(text: &str) -> Vec<Token> {
let a = TextAnalyzer::from(WhitespaceTokenizer);
let mut token_stream = a.token_stream(text);
let mut tokens: Vec<Token> = vec![];
let mut add_token = |token: &Token| {
tokens.push(token.clone());
};
token_stream.process(&mut add_token);
tokens
}
}