mirror of https://github.com/quickwit-oss/tantivy.git
synced 2026-01-07 09:32:54 +00:00

Compare commits: columnread...fix_estima

11 commits:
- ba3215b469
- dac7da780e
- 20c87903b2
- f9c3947803
- e9a384bb15
- d231671fe2
- fa3d786a2f
- 75aafeeb9b
- 6f066c7f65
- 22e56aaee3
- d641979127
@@ -259,11 +259,7 @@ impl BitSet {
         // we do not check saturated els.
         let higher = el / 64u32;
         let lower = el % 64u32;
-        self.len += if self.tinysets[higher as usize].insert_mut(lower) {
-            1
-        } else {
-            0
-        };
+        self.len += u64::from(self.tinysets[higher as usize].insert_mut(lower));
     }

     /// Inserts an element in the `BitSet`
@@ -272,11 +268,7 @@ impl BitSet {
         // we do not check saturated els.
         let higher = el / 64u32;
         let lower = el % 64u32;
-        self.len -= if self.tinysets[higher as usize].remove_mut(lower) {
-            1
-        } else {
-            0
-        };
+        self.len -= u64::from(self.tinysets[higher as usize].remove_mut(lower));
     }

     /// Returns true iff the elements is in the `BitSet`.
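Both BitSet hunks replace an explicit branch with the standard `From<bool>` integer conversion. A minimal standalone sketch of the idiom (the `inserted` flag is illustrative):

```rust
fn main() {
    // `u64::from(bool)` maps false -> 0 and true -> 1, so the counter
    // update needs no `if flag { 1 } else { 0 }` branch.
    let inserted = true;
    let mut len = 0u64;
    len += u64::from(inserted);
    assert_eq!(len, 1);
    assert_eq!(u64::from(false), 0);
}
```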
@@ -161,8 +161,7 @@ impl FixedSize for u8 {

 impl BinarySerializable for bool {
     fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
-        let val = if *self { 1 } else { 0 };
-        writer.write_u8(val)
+        writer.write_u8(u8::from(*self))
     }
     fn deserialize<R: Read>(reader: &mut R) -> io::Result<bool> {
         let val = reader.read_u8()?;
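The `deserialize` side is truncated in the hunk above. A self-contained sketch of the implied one-byte wire format; decoding any nonzero byte as true is an assumption, since the read path is cut off:

```rust
use std::io::{self, Cursor, Read, Write};

// One byte on the wire: 0 for false, 1 for true.
fn write_bool<W: Write>(writer: &mut W, b: bool) -> io::Result<()> {
    writer.write_all(&[u8::from(b)])
}

// Assumed decoding: any nonzero byte reads back as true.
fn read_bool<R: Read>(reader: &mut R) -> io::Result<bool> {
    let mut buf = [0u8; 1];
    reader.read_exact(&mut buf)?;
    Ok(buf[0] != 0)
}

fn main() -> io::Result<()> {
    let mut buf = Vec::new();
    write_bool(&mut buf, true)?;
    assert!(read_bool(&mut Cursor::new(buf))?);
    Ok(())
}
```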
@@ -3,6 +3,7 @@ use std::io::{self, Write};
 use ownedbytes::OwnedBytes;
 use tantivy_bitpacker::{compute_num_bits, BitPacker, BitUnpacker};

+use crate::column::EstimateColumn;
 use crate::serialize::NormalizedHeader;
 use crate::{Column, FastFieldCodec, FastFieldCodecType};

@@ -75,7 +76,7 @@ impl FastFieldCodec for BitpackedCodec {
         Ok(())
     }

-    fn estimate(column: &impl Column) -> Option<f32> {
+    fn estimate(column: &EstimateColumn) -> Option<f32> {
         let num_bits = compute_num_bits(column.max_value());
         let num_bits_uncompressed = 64;
         Some(num_bits as f32 / num_bits_uncompressed as f32)
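The bitpacked estimate is simply the bit width of the largest value divided by the 64 bits of an uncompressed `u64`. A standalone worked example; `compute_num_bits` below is a local stand-in for the `tantivy_bitpacker` helper:

```rust
// Local stand-in: number of bits needed to represent max_value.
fn compute_num_bits(max_value: u64) -> u8 {
    (64 - max_value.leading_zeros()) as u8
}

fn main() {
    let max_value = 20_000u64; // 2^14 <= 20_000 < 2^15, so 15 bits
    let num_bits = compute_num_bits(max_value);
    assert_eq!(num_bits, 15);
    let ratio = num_bits as f32 / 64.0; // estimated compression ratio
    assert!((ratio - 0.2344).abs() < 0.001);
}
```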
@@ -5,6 +5,7 @@ use common::{BinarySerializable, CountingWriter, DeserializeFrom};
 use ownedbytes::OwnedBytes;
 use tantivy_bitpacker::{compute_num_bits, BitPacker, BitUnpacker};

+use crate::column::EstimateColumn;
 use crate::line::Line;
 use crate::serialize::NormalizedHeader;
 use crate::{Column, FastFieldCodec, FastFieldCodecType, VecColumn};
@@ -71,7 +72,7 @@ impl FastFieldCodec for BlockwiseLinearCodec {
     }

     // Estimate first_chunk and extrapolate
-    fn estimate(column: &impl crate::Column) -> Option<f32> {
+    fn estimate(column: &EstimateColumn) -> Option<f32> {
         if column.num_vals() < 10 * CHUNK_SIZE as u64 {
             return None;
         }
@@ -100,7 +101,7 @@ impl FastFieldCodec for BlockwiseLinearCodec {
         Some(num_bits as f32 / num_bits_uncompressed as f32)
     }

-    fn serialize(column: &dyn crate::Column, wrt: &mut impl io::Write) -> io::Result<()> {
+    fn serialize(column: &dyn Column, wrt: &mut impl io::Write) -> io::Result<()> {
         // The BitpackedReader assumes a normalized vector.
         assert_eq!(column.min_value(), 0);
         let mut buffer = Vec::with_capacity(CHUNK_SIZE);

@@ -137,6 +137,57 @@ where V: AsRef<[T]> + ?Sized
     }
 }

+// Creates a view over a Column with a limited number of vals. Stats like min max are unchanged
+pub struct EstimateColumn<'a> {
+    column: &'a dyn Column,
+    num_vals: u64,
+}
+impl<'a> EstimateColumn<'a> {
+    pub(crate) fn new(column: &'a dyn Column) -> Self {
+        let limit_num_vals = column.num_vals().min(100_000);
+        Self {
+            column,
+            num_vals: limit_num_vals,
+        }
+    }
+}
+
+impl<'a> Column for EstimateColumn<'a> {
+    fn get_val(&self, idx: u64) -> u64 {
+        (*self.column).get_val(idx)
+    }
+
+    fn min_value(&self) -> u64 {
+        (*self.column).min_value()
+    }
+
+    fn max_value(&self) -> u64 {
+        (*self.column).max_value()
+    }
+
+    fn num_vals(&self) -> u64 {
+        self.num_vals
+    }
+
+    fn iter<'b>(&'b self) -> Box<dyn Iterator<Item = u64> + 'b> {
+        Box::new((*self.column).iter().take(self.num_vals as usize))
+    }
+
+    fn get_range(&self, start: u64, output: &mut [u64]) {
+        (*self.column).get_range(start, output)
+    }
+}
+
+impl<'a> From<&'a dyn Column> for EstimateColumn<'a> {
+    fn from(column: &'a dyn Column) -> Self {
+        let limit_num_vals = column.num_vals().min(100_000);
+        Self {
+            column,
+            num_vals: limit_num_vals,
+        }
+    }
+}
+
 struct MonotonicMappingColumn<C, T, Input> {
     from_column: C,
     monotonic_mapping: T,
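`EstimateColumn` caps how many values the codec estimators iterate over (100_000) while leaving the min/max statistics of the full column intact, so estimation cost stays bounded on huge columns. A self-contained sketch of the idea; the `Column` trait here is a deliberately simplified stand-in for the crate's:

```rust
// Simplified stand-in for the crate's Column trait.
trait Column {
    fn min_value(&self) -> u64;
    fn max_value(&self) -> u64;
    fn num_vals(&self) -> u64;
    fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = u64> + 'a>;
}

struct VecColumn(Vec<u64>);

impl Column for VecColumn {
    fn min_value(&self) -> u64 {
        self.0.iter().copied().min().unwrap_or(0)
    }
    fn max_value(&self) -> u64 {
        self.0.iter().copied().max().unwrap_or(0)
    }
    fn num_vals(&self) -> u64 {
        self.0.len() as u64
    }
    fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = u64> + 'a> {
        Box::new(self.0.iter().copied())
    }
}

// View that caps the number of values seen by estimators.
struct EstimateColumn<'a> {
    column: &'a dyn Column,
    num_vals: u64,
}

impl<'a> EstimateColumn<'a> {
    fn new(column: &'a dyn Column) -> Self {
        let num_vals = column.num_vals().min(100_000);
        Self { column, num_vals }
    }
}

fn main() {
    let big = VecColumn((0..200_000u64).collect());
    let view = EstimateColumn::new(&big);
    assert_eq!(view.num_vals, 100_000); // iteration is capped...
    assert_eq!(view.column.max_value(), 199_999); // ...but the stats are not
}
```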
@@ -11,6 +11,7 @@ use std::io;
 use std::io::Write;
 use std::sync::Arc;

+use column::EstimateColumn;
 use common::BinarySerializable;
 use compact_space::CompactSpaceDecompressor;
 use ownedbytes::OwnedBytes;
@@ -123,7 +124,7 @@ trait FastFieldCodec: 'static {
     ///
     /// The column iterator should be preferred over using column `get_val` method for
     /// performance reasons.
-    fn serialize(column: &dyn Column<u64>, write: &mut impl Write) -> io::Result<()>;
+    fn serialize(column: &dyn Column, write: &mut impl Write) -> io::Result<()>;

     /// Returns an estimate of the compression ratio.
     /// If the codec is not applicable, returns `None`.
@@ -132,7 +133,7 @@ trait FastFieldCodec: 'static {
     ///
     /// It could make sense to also return a value representing
     /// computational complexity.
-    fn estimate(column: &impl Column) -> Option<f32>;
+    fn estimate(column: &EstimateColumn) -> Option<f32>;
 }

 pub const ALL_CODEC_TYPES: [FastFieldCodecType; 3] = [
@@ -149,6 +150,7 @@ mod tests {

     use crate::bitpacked::BitpackedCodec;
     use crate::blockwise_linear::BlockwiseLinearCodec;
+    use crate::column::EstimateColumn;
     use crate::linear::LinearCodec;
     use crate::serialize::Header;

@@ -159,7 +161,9 @@ mod tests {
         let col = &VecColumn::from(data);
         let header = Header::compute_header(col, &[Codec::CODEC_TYPE])?;
         let normalized_col = header.normalize_column(col);
-        let estimation = Codec::estimate(&normalized_col)?;
+
+        let limited_column = EstimateColumn::new(&normalized_col);
+        let estimation = Codec::estimate(&limited_column)?;

         let mut out = Vec::new();
         let col = VecColumn::from(data);
@@ -280,14 +284,16 @@ mod tests {
         let data = (10..=20000_u64).collect::<Vec<_>>();
         let data: VecColumn = data.as_slice().into();

-        let linear_interpol_estimation = LinearCodec::estimate(&data).unwrap();
+        let linear_interpol_estimation =
+            LinearCodec::estimate(&EstimateColumn::new(&data)).unwrap();
         assert_le!(linear_interpol_estimation, 0.01);

-        let multi_linear_interpol_estimation = BlockwiseLinearCodec::estimate(&data).unwrap();
+        let multi_linear_interpol_estimation =
+            BlockwiseLinearCodec::estimate(&EstimateColumn::new(&data)).unwrap();
         assert_le!(multi_linear_interpol_estimation, 0.2);
         assert_lt!(linear_interpol_estimation, multi_linear_interpol_estimation);

-        let bitpacked_estimation = BitpackedCodec::estimate(&data).unwrap();
+        let bitpacked_estimation = BitpackedCodec::estimate(&EstimateColumn::new(&data)).unwrap();
         assert_lt!(linear_interpol_estimation, bitpacked_estimation);
     }
     #[test]
@@ -295,18 +301,20 @@ mod tests {
         let data: &[u64] = &[200, 10, 10, 10, 10, 1000, 20];

         let data: VecColumn = data.into();
-        let linear_interpol_estimation = LinearCodec::estimate(&data).unwrap();
+        let linear_interpol_estimation =
+            LinearCodec::estimate(&EstimateColumn::new(&data)).unwrap();
         assert_le!(linear_interpol_estimation, 0.34);

-        let bitpacked_estimation = BitpackedCodec::estimate(&data).unwrap();
+        let bitpacked_estimation = BitpackedCodec::estimate(&EstimateColumn::new(&data)).unwrap();
         assert_lt!(bitpacked_estimation, linear_interpol_estimation);
     }

     #[test]
     fn estimation_prefer_bitpacked() {
         let data = VecColumn::from(&[10, 10, 10, 10]);
-        let linear_interpol_estimation = LinearCodec::estimate(&data).unwrap();
-        let bitpacked_estimation = BitpackedCodec::estimate(&data).unwrap();
+        let linear_interpol_estimation =
+            LinearCodec::estimate(&EstimateColumn::new(&data)).unwrap();
+        let bitpacked_estimation = BitpackedCodec::estimate(&EstimateColumn::new(&data)).unwrap();
         assert_lt!(bitpacked_estimation, linear_interpol_estimation);
     }

@@ -318,10 +326,11 @@ mod tests {

         // in this case the linear interpolation can't in fact not be worse than bitpacking,
         // but the estimator adds some threshold, which leads to estimated worse behavior
-        let linear_interpol_estimation = LinearCodec::estimate(&data).unwrap();
+        let linear_interpol_estimation =
+            LinearCodec::estimate(&EstimateColumn::new(&data)).unwrap();
         assert_le!(linear_interpol_estimation, 0.35);

-        let bitpacked_estimation = BitpackedCodec::estimate(&data).unwrap();
+        let bitpacked_estimation = BitpackedCodec::estimate(&EstimateColumn::new(&data)).unwrap();
         assert_le!(bitpacked_estimation, 0.32);
         assert_le!(bitpacked_estimation, linear_interpol_estimation);
     }

@@ -67,13 +67,11 @@ impl Line {
         self.intercept.wrapping_add(linear_part)
     }

-    // Same as train, but the intercept is only estimated from provided sample positions
-    pub fn estimate(ys: &dyn Column, sample_positions: &[u64]) -> Self {
-        Self::train_from(ys, sample_positions.iter().cloned())
-    }
-
     // Intercept is only computed from provided positions
-    fn train_from(ys: &dyn Column, positions: impl Iterator<Item = u64>) -> Self {
+    pub fn train_from(
+        ys: &dyn Column,
+        positions_and_values: impl Iterator<Item = (u64, u64)>,
+    ) -> Self {
         let num_vals = if let Some(num_vals) = NonZeroU64::new(ys.num_vals() - 1) {
             num_vals
         } else {
@@ -114,11 +112,8 @@ impl Line {
             intercept: 0,
         };
         let heuristic_shift = y0.wrapping_sub(MID_POINT);
-        line.intercept = positions
-            .map(|pos| {
-                let y = ys.get_val(pos);
-                y.wrapping_sub(line.eval(pos))
-            })
+        line.intercept = positions_and_values
+            .map(|(pos, y)| y.wrapping_sub(line.eval(pos)))
             .min_by_key(|&val| val.wrapping_sub(heuristic_shift))
             .unwrap_or(0u64); //< Never happens.
         line
@@ -135,7 +130,10 @@ impl Line {
     /// This function is only invariable by translation if all of the
     /// `ys` are packaged into half of the space. (See heuristic below)
     pub fn train(ys: &dyn Column) -> Self {
-        Self::train_from(ys, 0..ys.num_vals())
+        Self::train_from(
+            ys,
+            ys.iter().enumerate().map(|(pos, val)| (pos as u64, val)),
+        )
     }
 }

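The intercept step in `train_from` works on wrapping residuals `y - eval(pos)`. A standalone sketch of that computation over the new `(position, value)` pairs; it omits the `MID_POINT` heuristic shift used in the real `min_by_key`:

```rust
fn main() {
    let slope = 3u64;
    let eval = |pos: u64| pos.wrapping_mul(slope);
    // (position, value) pairs, as in the new train_from signature.
    let pairs = [(0u64, 10u64), (1, 14), (2, 15), (3, 21)];
    // Pick the smallest residual as intercept (residuals: 10, 11, 9, 12).
    let intercept = pairs
        .iter()
        .map(|&(pos, y)| y.wrapping_sub(eval(pos)))
        .min()
        .unwrap();
    assert_eq!(intercept, 9);
    // Every sampled value now sits on or above the fitted line.
    assert!(pairs
        .iter()
        .all(|&(pos, y)| y >= eval(pos).wrapping_add(intercept)));
}
```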
@@ -4,6 +4,7 @@ use common::BinarySerializable;
 use ownedbytes::OwnedBytes;
 use tantivy_bitpacker::{compute_num_bits, BitPacker, BitUnpacker};

+use crate::column::EstimateColumn;
 use crate::line::Line;
 use crate::serialize::NormalizedHeader;
 use crate::{Column, FastFieldCodec, FastFieldCodecType};
@@ -121,23 +122,23 @@ impl FastFieldCodec for LinearCodec {
     /// where the local maxima for the deviation of the calculated value are and
     /// the offset to shift all values to >=0 is also unknown.
     #[allow(clippy::question_mark)]
-    fn estimate(column: &impl Column) -> Option<f32> {
+    fn estimate(column: &EstimateColumn) -> Option<f32> {
         if column.num_vals() < 3 {
             return None; // disable compressor for this case
         }

         // let's sample at 0%, 5%, 10% .. 95%, 100%
         let num_vals = column.num_vals() as f32 / 100.0;
-        let sample_positions = (0..20)
+        let sample_positions_and_values = (0..20)
             .map(|pos| (num_vals * pos as f32 * 5.0) as u64)
+            .map(|pos| (pos, column.get_val(pos)))
             .collect::<Vec<_>>();

-        let line = Line::estimate(column, &sample_positions);
+        let line = { Line::train_from(column, sample_positions_and_values.iter().cloned()) };

-        let estimated_bit_width = sample_positions
+        let estimated_bit_width = sample_positions_and_values
             .into_iter()
-            .map(|pos| {
-                let actual_value = column.get_val(pos);
+            .map(|(pos, actual_value)| {
                 let interpolated_val = line.eval(pos as u64);
                 actual_value.wrapping_sub(interpolated_val)
             })
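The linear estimate samples 20 positions across the column (0%, 5%, ..., 95%) and now pairs each position with its value, matching the `(u64, u64)` items `train_from` expects. A standalone sketch; `get_val` stands in for the column accessor:

```rust
fn main() {
    let num_vals = 1000u64;
    let step = num_vals as f32 / 100.0;
    let get_val = |pos: u64| pos * 2; // stand-in for column.get_val
    let samples: Vec<(u64, u64)> = (0..20)
        .map(|i| (step * i as f32 * 5.0) as u64)
        .map(|pos| (pos, get_val(pos)))
        .collect();
    assert_eq!(samples.len(), 20);
    assert_eq!(samples[0], (0, 0)); // 0% of the column
    assert_eq!(samples[19].0, 950); // 95% of the column
}
```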
@@ -36,11 +36,7 @@ impl MonotonicallyMappableToU64 for i64 {
 impl MonotonicallyMappableToU64 for bool {
     #[inline(always)]
     fn to_u64(self) -> u64 {
-        if self {
-            1
-        } else {
-            0
-        }
+        u64::from(self)
     }

     #[inline(always)]

@@ -28,6 +28,7 @@ use ownedbytes::OwnedBytes;

 use crate::bitpacked::BitpackedCodec;
 use crate::blockwise_linear::BlockwiseLinearCodec;
+use crate::column::EstimateColumn;
 use crate::compact_space::CompactSpaceCompressor;
 use crate::linear::LinearCodec;
 use crate::{
@@ -125,23 +126,6 @@ impl BinarySerializable for Header {
     }
 }

-pub fn estimate<T: MonotonicallyMappableToU64>(
-    typed_column: impl Column<T>,
-    codec_type: FastFieldCodecType,
-) -> Option<f32> {
-    let column = monotonic_map_column(typed_column, T::to_u64);
-    let min_value = column.min_value();
-    let gcd = crate::gcd::find_gcd(column.iter().map(|val| val - min_value))
-        .filter(|gcd| gcd.get() > 1u64);
-    let divider = DividerU64::divide_by(gcd.map(|gcd| gcd.get()).unwrap_or(1u64));
-    let normalized_column = monotonic_map_column(&column, |val| divider.divide(val - min_value));
-    match codec_type {
-        FastFieldCodecType::Bitpacked => BitpackedCodec::estimate(&normalized_column),
-        FastFieldCodecType::Linear => LinearCodec::estimate(&normalized_column),
-        FastFieldCodecType::BlockwiseLinear => BlockwiseLinearCodec::estimate(&normalized_column),
-    }
-}
-
 pub fn serialize_u128(
     typed_column: impl Column<u128>,
     output: &mut impl io::Write,
@@ -177,10 +161,29 @@ pub fn serialize<T: MonotonicallyMappableToU64>(
     Ok(())
 }

+pub fn estimate<T: MonotonicallyMappableToU64>(
+    typed_column: impl Column<T>,
+    codec_type: FastFieldCodecType,
+) -> Option<f32> {
+    let column = monotonic_map_column(typed_column, T::to_u64);
+    let min_value = column.min_value();
+    let gcd = crate::gcd::find_gcd(column.iter().map(|val| val - min_value))
+        .filter(|gcd| gcd.get() > 1u64);
+    let divider = DividerU64::divide_by(gcd.map(|gcd| gcd.get()).unwrap_or(1u64));
+    let normalized_column = monotonic_map_column(&column, |val| divider.divide(val - min_value));
+    let estimate_column = EstimateColumn::new(&normalized_column);
+    match codec_type {
+        FastFieldCodecType::Bitpacked => BitpackedCodec::estimate(&estimate_column),
+        FastFieldCodecType::Linear => LinearCodec::estimate(&estimate_column),
+        FastFieldCodecType::BlockwiseLinear => BlockwiseLinearCodec::estimate(&estimate_column),
+    }
+}
+
 fn detect_codec(
     column: impl Column<u64>,
     codecs: &[FastFieldCodecType],
 ) -> Option<FastFieldCodecType> {
+    let column: EstimateColumn = EstimateColumn::new(&column);
     let mut estimations = Vec::new();
     for &codec in codecs {
         let estimation_opt = match codec {
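The body of `detect_codec` is cut off above; the hunk only shows estimations being collected per codec over the capped `EstimateColumn`. A plausible sketch of the selection step, assuming codecs returning `None` are skipped and the smallest estimated compression ratio wins (names are simplified stand-ins, not the crate's types):

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
enum CodecType {
    Bitpacked,
    Linear,
    BlockwiseLinear,
}

fn main() {
    // (estimated compression ratio, codec); BlockwiseLinear returned None
    // (e.g. too few values), so it is absent from the candidates.
    let estimations: Vec<(f32, CodecType)> = vec![
        (0.23, CodecType::Bitpacked),
        (0.05, CodecType::Linear),
    ];
    let best = estimations
        .iter()
        .min_by(|a, b| a.0.partial_cmp(&b.0).unwrap())
        .map(|&(_, codec)| codec);
    assert_eq!(best, Some(CodecType::Linear));
}
```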
@@ -425,7 +425,7 @@ impl SegmentHistogramCollector {
         let bucket = &mut self.buckets[bucket_pos];
         bucket.doc_count += 1;
         if let Some(sub_aggregation) = self.sub_aggregations.as_mut() {
-            (&mut sub_aggregation[bucket_pos]).collect(doc, bucket_with_accessor)?;
+            sub_aggregation[bucket_pos].collect(doc, bucket_with_accessor)?;
         }
         Ok(())
     }

@@ -57,7 +57,7 @@ impl SegmentId {
     /// Picking the first 8 chars is ok to identify
     /// segments in a display message (e.g. a5c4dfcb).
     pub fn short_uuid_string(&self) -> String {
-        (&self.0.as_simple().to_string()[..8]).to_string()
+        self.0.as_simple().to_string()[..8].to_string()
     }

     /// Returns a segment uuid string.

@@ -472,6 +472,8 @@ mod tests {
     // There are more tests in directory/mod.rs
     // The following tests are specific to the MmapDirectory

+    use std::time::Duration;
+
     use common::HasLen;

     use super::*;
@@ -610,7 +612,14 @@ mod tests {
                 mmap_directory.get_cache_info().mmapped.len()
             );
         }
-        assert!(mmap_directory.get_cache_info().mmapped.is_empty());
-        Ok(())
+        // This test failed on CI. The last Mmap is dropped from the merging thread so there might
+        // be a race condition indeed.
+        for _ in 0..10 {
+            if mmap_directory.get_cache_info().mmapped.is_empty() {
+                return Ok(());
+            }
+            std::thread::sleep(Duration::from_millis(200));
+        }
+        panic!("The cache still contains information. One of the Mmap has not been dropped.");
     }
 }

@@ -136,6 +136,20 @@ impl RamDirectory {
         Self::default()
     }

+    /// Deep clones the directory.
+    ///
+    /// Ulterior writes on one of the copy
+    /// will not affect the other copy.
+    pub fn deep_clone(&self) -> RamDirectory {
+        let inner_clone = InnerDirectory {
+            fs: self.fs.read().unwrap().fs.clone(),
+            watch_router: Default::default(),
+        };
+        RamDirectory {
+            fs: Arc::new(RwLock::new(inner_clone)),
+        }
+    }
+
     /// Returns the sum of the size of the different files
     /// in the [`RamDirectory`].
     pub fn total_mem_usage(&self) -> usize {
@@ -256,4 +270,23 @@ mod tests {
         assert_eq!(directory_copy.atomic_read(path_atomic).unwrap(), msg_atomic);
         assert_eq!(directory_copy.atomic_read(path_seq).unwrap(), msg_seq);
     }
+
+    #[test]
+    fn test_ram_directory_deep_clone() {
+        let dir = RamDirectory::default();
+        let test = Path::new("test");
+        let test2 = Path::new("test2");
+        dir.atomic_write(test, b"firstwrite").unwrap();
+        let dir_clone = dir.deep_clone();
+        assert_eq!(
+            dir_clone.atomic_read(test).unwrap(),
+            dir.atomic_read(test).unwrap()
+        );
+        dir.atomic_write(test, b"original").unwrap();
+        dir_clone.atomic_write(test, b"clone").unwrap();
+        dir_clone.atomic_write(test2, b"clone2").unwrap();
+        assert_eq!(dir.atomic_read(test).unwrap(), b"original");
+        assert_eq!(&dir_clone.atomic_read(test).unwrap(), b"clone");
+        assert_eq!(&dir_clone.atomic_read(test2).unwrap(), b"clone2");
+    }
 }
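`deep_clone` is needed because cloning the `Arc<RwLock<InnerDirectory>>` inside `RamDirectory` would alias the same storage instead of copying it. A standalone sketch of that distinction:

```rust
use std::sync::{Arc, RwLock};

fn main() {
    let shared = Arc::new(RwLock::new(vec![1u8]));

    // Arc::clone shares the same underlying storage.
    let alias = Arc::clone(&shared);
    alias.write().unwrap().push(2);
    assert_eq!(shared.read().unwrap().len(), 2);

    // A deep clone copies the data into a fresh Arc; later writes diverge.
    let deep = Arc::new(RwLock::new(shared.read().unwrap().clone()));
    deep.write().unwrap().push(3);
    assert_eq!(shared.read().unwrap().len(), 2);
    assert_eq!(deep.read().unwrap().len(), 3);
}
```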
@@ -402,6 +402,74 @@ mod bench {
     use crate::schema::{Cardinality, NumericOptions, Schema};
     use crate::Document;

+    fn bench_multi_value_ff_merge_opt(
+        num_docs: usize,
+        segments_every_n_docs: usize,
+        merge_policy: impl crate::indexer::MergePolicy + 'static,
+    ) {
+        let mut builder = crate::schema::SchemaBuilder::new();
+
+        let fast_multi =
+            crate::schema::NumericOptions::default().set_fast(Cardinality::MultiValues);
+        let multi_field = builder.add_f64_field("f64s", fast_multi);
+
+        let index = crate::Index::create_in_ram(builder.build());
+
+        let mut writer = index.writer_for_tests().unwrap();
+        writer.set_merge_policy(Box::new(merge_policy));
+
+        for i in 0..num_docs {
+            let mut doc = crate::Document::new();
+            doc.add_f64(multi_field, 0.24);
+            doc.add_f64(multi_field, 0.27);
+            doc.add_f64(multi_field, 0.37);
+            if i % 3 == 0 {
+                doc.add_f64(multi_field, 0.44);
+            }
+
+            writer.add_document(doc).unwrap();
+            if i % segments_every_n_docs == 0 {
+                writer.commit().unwrap();
+            }
+        }
+
+        {
+            writer.wait_merging_threads().unwrap();
+            let mut writer = index.writer_for_tests().unwrap();
+            let segment_ids = index.searchable_segment_ids().unwrap();
+            writer.merge(&segment_ids).wait().unwrap();
+        }
+
+        // If a merging thread fails, we should end up with more
+        // than one segment here
+        assert_eq!(1, index.searchable_segments().unwrap().len());
+    }
+
+    #[bench]
+    fn bench_multi_value_ff_merge_many_segments(b: &mut Bencher) {
+        let num_docs = 100_000;
+        b.iter(|| {
+            bench_multi_value_ff_merge_opt(num_docs, 1_000, crate::indexer::NoMergePolicy);
+        });
+    }
+
+    #[bench]
+    fn bench_multi_value_ff_merge_many_segments_log_merge(b: &mut Bencher) {
+        let num_docs = 100_000;
+        b.iter(|| {
+            let merge_policy = crate::indexer::LogMergePolicy::default();
+            bench_multi_value_ff_merge_opt(num_docs, 1_000, merge_policy);
+        });
+    }
+
+    #[bench]
+    fn bench_multi_value_ff_merge_few_segments(b: &mut Bencher) {
+        let num_docs = 100_000;
+        b.iter(|| {
+            bench_multi_value_ff_merge_opt(num_docs, 33_000, crate::indexer::NoMergePolicy);
+        });
+    }
+
     fn multi_values(num_docs: usize, vals_per_doc: usize) -> Vec<Vec<u64>> {
         let mut vals = vec![];
         for _i in 0..num_docs {

@@ -246,18 +246,27 @@ impl DeleteCursor {
 mod tests {

     use super::{DeleteOperation, DeleteQueue};
-    use crate::schema::{Field, Term};
+    use crate::query::{Explanation, Scorer, Weight};
+    use crate::{DocId, Score, SegmentReader};
+
+    struct DummyWeight;
+    impl Weight for DummyWeight {
+        fn scorer(&self, _reader: &SegmentReader, _boost: Score) -> crate::Result<Box<dyn Scorer>> {
+            Err(crate::TantivyError::InternalError("dummy impl".to_owned()))
+        }
+
+        fn explain(&self, _reader: &SegmentReader, _doc: DocId) -> crate::Result<Explanation> {
+            Err(crate::TantivyError::InternalError("dummy impl".to_owned()))
+        }
+    }

     #[test]
     fn test_deletequeue() {
         let delete_queue = DeleteQueue::new();

-        let make_op = |i: usize| {
-            let field = Field::from_field_id(1u32);
-            DeleteOperation {
-                opstamp: i as u64,
-                term: Term::from_field_u64(field, i as u64),
-            }
-        };
+        let make_op = |i: usize| DeleteOperation {
+            opstamp: i as u64,
+            target: Box::new(DummyWeight),
+        };

         delete_queue.push(make_op(1));

@@ -11,7 +11,6 @@ use super::segment_updater::SegmentUpdater;
 use super::{AddBatch, AddBatchReceiver, AddBatchSender, PreparedCommit};
 use crate::core::{Index, Segment, SegmentComponent, SegmentId, SegmentMeta, SegmentReader};
 use crate::directory::{DirectoryLock, GarbageCollectionResult, TerminatingWrite};
-use crate::docset::{DocSet, TERMINATED};
 use crate::error::TantivyError;
 use crate::fastfield::write_alive_bitset;
 use crate::indexer::delete_queue::{DeleteCursor, DeleteQueue};
@@ -20,8 +19,9 @@ use crate::indexer::index_writer_status::IndexWriterStatus;
 use crate::indexer::operation::DeleteOperation;
 use crate::indexer::stamper::Stamper;
 use crate::indexer::{MergePolicy, SegmentEntry, SegmentWriter};
+use crate::query::{Query, TermQuery};
 use crate::schema::{Document, IndexRecordOption, Term};
-use crate::{FutureResult, Opstamp};
+use crate::{FutureResult, IndexReader, Opstamp};

 // Size of the margin for the `memory_arena`. A segment is closed when the remaining memory
 // in the `memory_arena` goes below MARGIN_IN_BYTES.
@@ -57,6 +57,7 @@ pub struct IndexWriter {
     _directory_lock: Option<DirectoryLock>,

     index: Index,
+    index_reader: IndexReader,

     memory_arena_in_bytes_per_thread: usize,

@@ -92,19 +93,14 @@ fn compute_deleted_bitset(

         // A delete operation should only affect
         // document that were inserted before it.
-        let inverted_index = segment_reader.inverted_index(delete_op.term.field())?;
-        if let Some(mut docset) =
-            inverted_index.read_postings(&delete_op.term, IndexRecordOption::Basic)?
-        {
-            let mut doc_matching_deleted_term = docset.doc();
-            while doc_matching_deleted_term != TERMINATED {
-                if doc_opstamps.is_deleted(doc_matching_deleted_term, delete_op.opstamp) {
-                    alive_bitset.remove(doc_matching_deleted_term);
+        delete_op
+            .target
+            .for_each(segment_reader, &mut |doc_matching_delete_query, _| {
+                if doc_opstamps.is_deleted(doc_matching_delete_query, delete_op.opstamp) {
+                    alive_bitset.remove(doc_matching_delete_query);
                     might_have_changed = true;
                 }
-                doc_matching_deleted_term = docset.advance();
-            }
-        }
+            })?;
         delete_cursor.advance();
     }
     Ok(might_have_changed)
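The rewrite replaces the manual postings loop with the `Weight::for_each` callback, which pushes every matching doc id through a closure. A simplified standalone sketch of the callback shape; the real method takes a `SegmentReader` and returns `crate::Result`:

```rust
type DocId = u32;
type Score = f32;

// Simplified stand-in for Weight::for_each: drive matching doc ids
// through a FnMut(DocId, Score) callback.
fn for_each(matching: &[DocId], callback: &mut dyn FnMut(DocId, Score)) {
    for &doc in matching {
        callback(doc, 1.0);
    }
}

fn main() {
    let matching = [0u32, 3, 7]; // docs matched by the delete query
    let mut alive = vec![true; 10];
    let delete_opstamp = 5u64;
    let doc_opstamps = [1u64; 10]; // every doc was added before the delete
    // A doc is affected only if it was inserted before the delete operation.
    let affected = |doc: DocId| doc_opstamps[doc as usize] < delete_opstamp;
    for_each(&matching, &mut |doc, _score| {
        if affected(doc) {
            alive[doc as usize] = false;
        }
    });
    assert_eq!(alive.iter().filter(|&&a| !a).count(), 3);
}
```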
@@ -302,6 +298,7 @@ impl IndexWriter {

             memory_arena_in_bytes_per_thread,
             index: index.clone(),
+            index_reader: index.reader()?,

             index_writer_status: IndexWriterStatus::from(document_receiver),
             operation_sender: document_sender,
@@ -666,10 +663,33 @@ impl IndexWriter {
     /// Like adds, the deletion itself will be visible
     /// only after calling `commit()`.
     pub fn delete_term(&self, term: Term) -> Opstamp {
+        let query = TermQuery::new(term, IndexRecordOption::Basic);
+        // For backward compatibility, if Term is invalid for the index, do nothing but return an
+        // Opstamp
+        self.delete_query(Box::new(query))
+            .unwrap_or_else(|_| self.stamper.stamp())
+    }
+
+    /// Delete all documents matching a given query.
+    /// Returns an `Err` if the query can't be executed.
+    ///
+    /// Delete operation only affects documents that
+    /// were added in previous commits, and documents
+    /// that were added previously in the same commit.
+    ///
+    /// Like adds, the deletion itself will be visible
+    /// only after calling `commit()`.
+    #[doc(hidden)]
+    pub fn delete_query(&self, query: Box<dyn Query>) -> crate::Result<Opstamp> {
+        let weight = query.weight(&self.index_reader.searcher(), false)?;
+
         let opstamp = self.stamper.stamp();
-        let delete_operation = DeleteOperation { opstamp, term };
+        let delete_operation = DeleteOperation {
+            opstamp,
+            target: weight,
+        };
         self.delete_queue.push(delete_operation);
-        opstamp
+        Ok(opstamp)
     }

     /// Returns the opstamp of the last successful commit.
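A hedged usage sketch of the new surface, modeled on the tests in this change and assuming the crate API of this era: `delete_term` keeps its signature (and now routes through a `TermQuery`), while the `#[doc(hidden)]` `delete_query` accepts any boxed query. The schema and heap budget below are illustrative:

```rust
use tantivy::query::TermQuery;
use tantivy::schema::{Schema, Term, INDEXED};
use tantivy::{doc, Index};

fn main() -> tantivy::Result<()> {
    let mut schema_builder = Schema::builder();
    let id_field = schema_builder.add_u64_field("id", INDEXED);
    let index = Index::create_in_ram(schema_builder.build());
    let mut index_writer = index.writer(50_000_000)?;

    for id in 0u64..10u64 {
        index_writer.add_document(doc!(id_field => id))?;
    }
    // Old entry point, still available: delete by exact term.
    index_writer.delete_term(Term::from_field_u64(id_field, 0));
    // New entry point: delete every document matching a query.
    let query = TermQuery::new(Term::from_field_u64(id_field, 1), Default::default());
    index_writer.delete_query(Box::new(query))?;
    index_writer.commit()?;
    Ok(())
}
```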
@@ -738,10 +758,17 @@ impl IndexWriter {
         let (batch_opstamp, stamps) = self.get_batch_opstamps(count);

         let mut adds = AddBatch::default();

         for (user_op, opstamp) in user_operations_it.zip(stamps) {
             match user_op {
                 UserOperation::Delete(term) => {
-                    let delete_operation = DeleteOperation { opstamp, term };
+                    let query = TermQuery::new(term, IndexRecordOption::Basic);
+                    let weight = query.weight(&self.index_reader.searcher(), false)?;
+
+                    let delete_operation = DeleteOperation {
+                        opstamp,
+                        target: weight,
+                    };
                     self.delete_queue.push(delete_operation);
                 }
                 UserOperation::Add(document) => {
@@ -786,7 +813,7 @@ mod tests {
     use crate::directory::error::LockError;
     use crate::error::*;
     use crate::indexer::NoMergePolicy;
-    use crate::query::{QueryParser, TermQuery};
+    use crate::query::{BooleanQuery, Occur, Query, QueryParser, TermQuery};
     use crate::schema::{
         self, Cardinality, Facet, FacetOptions, IndexRecordOption, NumericOptions,
         TextFieldIndexing, TextOptions, FAST, INDEXED, STORED, STRING, TEXT,
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_delete_query_with_sort_by_field() -> crate::Result<()> {
|
||||||
|
let mut schema_builder = schema::Schema::builder();
|
||||||
|
let id_field =
|
||||||
|
schema_builder.add_u64_field("id", schema::INDEXED | schema::STORED | schema::FAST);
|
||||||
|
let schema = schema_builder.build();
|
||||||
|
|
||||||
|
let settings = IndexSettings {
|
||||||
|
sort_by_field: Some(IndexSortByField {
|
||||||
|
field: "id".to_string(),
|
||||||
|
order: Order::Desc,
|
||||||
|
}),
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
|
||||||
|
let index = Index::builder()
|
||||||
|
.schema(schema)
|
||||||
|
.settings(settings)
|
||||||
|
.create_in_ram()?;
|
||||||
|
let index_reader = index.reader()?;
|
||||||
|
let mut index_writer = index.writer_for_tests()?;
|
||||||
|
|
||||||
|
// create and delete docs in same commit
|
||||||
|
for id in 0u64..5u64 {
|
||||||
|
index_writer.add_document(doc!(id_field => id))?;
|
||||||
|
}
|
||||||
|
for id in 1u64..4u64 {
|
||||||
|
let term = Term::from_field_u64(id_field, id);
|
||||||
|
let not_term = Term::from_field_u64(id_field, 2);
|
||||||
|
let term = Box::new(TermQuery::new(term, Default::default()));
|
||||||
|
let not_term = Box::new(TermQuery::new(not_term, Default::default()));
|
||||||
|
|
||||||
|
let query: BooleanQuery = vec![
|
||||||
|
(Occur::Must, term as Box<dyn Query>),
|
||||||
|
(Occur::MustNot, not_term as Box<dyn Query>),
|
||||||
|
]
|
||||||
|
.into();
|
||||||
|
|
||||||
|
index_writer.delete_query(Box::new(query))?;
|
||||||
|
}
|
||||||
|
for id in 5u64..10u64 {
|
||||||
|
index_writer.add_document(doc!(id_field => id))?;
|
||||||
|
}
|
||||||
|
index_writer.commit()?;
|
||||||
|
index_reader.reload()?;
|
||||||
|
|
||||||
|
let searcher = index_reader.searcher();
|
||||||
|
assert_eq!(searcher.segment_readers().len(), 1);
|
||||||
|
|
||||||
|
let segment_reader = searcher.segment_reader(0);
|
||||||
|
assert_eq!(segment_reader.num_docs(), 8);
|
||||||
|
assert_eq!(segment_reader.max_doc(), 10);
|
||||||
|
let fast_field_reader = segment_reader.fast_fields().u64(id_field)?;
|
||||||
|
let in_order_alive_ids: Vec<u64> = segment_reader
|
||||||
|
.doc_ids_alive()
|
||||||
|
.map(|doc| fast_field_reader.get_val(doc as u64))
|
||||||
|
.collect();
|
||||||
|
assert_eq!(&in_order_alive_ids[..], &[9, 8, 7, 6, 5, 4, 2, 0]);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Copy)]
|
#[derive(Debug, Clone, Copy)]
|
||||||
enum IndexingOp {
|
enum IndexingOp {
|
||||||
AddDoc { id: u64 },
|
AddDoc { id: u64 },
|
||||||
DeleteDoc { id: u64 },
|
DeleteDoc { id: u64 },
|
||||||
|
DeleteDocQuery { id: u64 },
|
||||||
Commit,
|
Commit,
|
||||||
Merge,
|
Merge,
|
||||||
}
|
}
|
||||||
@@ -1429,6 +1518,7 @@ mod tests {
     fn balanced_operation_strategy() -> impl Strategy<Value = IndexingOp> {
         prop_oneof![
             (0u64..20u64).prop_map(|id| IndexingOp::DeleteDoc { id }),
+            (0u64..20u64).prop_map(|id| IndexingOp::DeleteDocQuery { id }),
             (0u64..20u64).prop_map(|id| IndexingOp::AddDoc { id }),
             (0u64..1u64).prop_map(|_| IndexingOp::Commit),
             (0u64..1u64).prop_map(|_| IndexingOp::Merge),
@@ -1437,7 +1527,8 @@ mod tests {

     fn adding_operation_strategy() -> impl Strategy<Value = IndexingOp> {
         prop_oneof![
-            10 => (0u64..100u64).prop_map(|id| IndexingOp::DeleteDoc { id }),
+            5 => (0u64..100u64).prop_map(|id| IndexingOp::DeleteDoc { id }),
+            5 => (0u64..100u64).prop_map(|id| IndexingOp::DeleteDocQuery { id }),
             50 => (0u64..100u64).prop_map(|id| IndexingOp::AddDoc { id }),
             2 => (0u64..1u64).prop_map(|_| IndexingOp::Commit),
             1 => (0u64..1u64).prop_map(|_| IndexingOp::Merge),
@@ -1457,6 +1548,10 @@ mod tests {
                     existing_ids.remove(&id);
                     deleted_ids.insert(id);
                 }
+                IndexingOp::DeleteDocQuery { id } => {
+                    existing_ids.remove(&id);
+                    deleted_ids.insert(id);
+                }
                 _ => {}
             }
         }
@@ -1539,6 +1634,11 @@ mod tests {
                 IndexingOp::DeleteDoc { id } => {
                     index_writer.delete_term(Term::from_field_u64(id_field, id));
                 }
+                IndexingOp::DeleteDocQuery { id } => {
+                    let term = Term::from_field_u64(id_field, id);
+                    let query = TermQuery::new(term, Default::default());
+                    index_writer.delete_query(Box::new(query))?;
+                }
                 IndexingOp::Commit => {
                     index_writer.commit()?;
                 }

@@ -1,20 +1,11 @@
+use crate::query::Weight;
 use crate::schema::{Document, Term};
 use crate::Opstamp;

 /// Timestamped Delete operation.
-#[derive(Clone, Eq, PartialEq, Debug)]
 pub struct DeleteOperation {
     pub opstamp: Opstamp,
-    pub term: Term,
-}
-
-impl Default for DeleteOperation {
-    fn default() -> Self {
-        DeleteOperation {
-            opstamp: 0u64,
-            term: Term::new(),
-        }
-    }
+    pub target: Box<dyn Weight>,
 }

 /// Timestamped Add operation.