Mirror of https://github.com/quickwit-oss/tantivy.git (synced 2026-01-05 08:42:54 +00:00)

Compare commits: column-rea ... debug_time (12 commits)
| SHA1 |
|---|
| dbd3aed24a |
| 7707b8a6e1 |
| dac7da780e |
| 20c87903b2 |
| f9c3947803 |
| e9a384bb15 |
| d231671fe2 |
| fa3d786a2f |
| 75aafeeb9b |
| 6f066c7f65 |
| 22e56aaee3 |
| d641979127 |
@@ -259,11 +259,7 @@ impl BitSet {
         // we do not check saturated els.
         let higher = el / 64u32;
         let lower = el % 64u32;
-        self.len += if self.tinysets[higher as usize].insert_mut(lower) {
-            1
-        } else {
-            0
-        };
+        self.len += u64::from(self.tinysets[higher as usize].insert_mut(lower));
     }

     /// Inserts an element in the `BitSet`
@@ -272,11 +268,7 @@ impl BitSet {
         // we do not check saturated els.
         let higher = el / 64u32;
         let lower = el % 64u32;
-        self.len -= if self.tinysets[higher as usize].remove_mut(lower) {
-            1
-        } else {
-            0
-        };
+        self.len -= u64::from(self.tinysets[higher as usize].remove_mut(lower));
     }

     /// Returns true iff the elements is in the `BitSet`.
@@ -161,8 +161,7 @@ impl FixedSize for u8 {

 impl BinarySerializable for bool {
     fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
-        let val = if *self { 1 } else { 0 };
-        writer.write_u8(val)
+        writer.write_u8(u8::from(*self))
     }
     fn deserialize<R: Read>(reader: &mut R) -> io::Result<bool> {
         let val = reader.read_u8()?;
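Several hunks in this compare replace hand-rolled `if ... { 1 } else { 0 }` branches with the standard `From<bool>` conversions. A minimal standalone sketch (not part of the patch) showing that the conversion yields exactly the values the removed branches produced:

```rust
fn main() {
    // u8::from / u64::from map false -> 0 and true -> 1,
    // so `writer.write_u8(u8::from(*self))` and
    // `self.len += u64::from(tinyset.insert_mut(lower))` keep the old behavior.
    assert_eq!(u8::from(false), 0u8);
    assert_eq!(u8::from(true), 1u8);
    assert_eq!(u64::from(true), 1u64);
}
```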
@@ -17,7 +17,7 @@ rand = {version="0.8.3", optional= true}
 fastdivide = "0.4"
 log = "0.4"
 itertools = { version = "0.10.3" }
-measure_time = { version="0.8.2", optional=true}
+measure_time = { version="0.8.2" }

 [dev-dependencies]
 more-asserts = "0.3.0"
@@ -25,7 +25,7 @@ proptest = "1.0.0"
 rand = "0.8.3"

 [features]
-bin = ["prettytable-rs", "rand", "measure_time"]
+bin = ["prettytable-rs", "rand"]
 default = ["bin"]
 unstable = []
@@ -75,7 +75,7 @@ impl FastFieldCodec for BitpackedCodec {
         Ok(())
     }

-    fn estimate(column: &impl Column) -> Option<f32> {
+    fn estimate(column: &dyn Column) -> Option<f32> {
         let num_bits = compute_num_bits(column.max_value());
         let num_bits_uncompressed = 64;
         Some(num_bits as f32 / num_bits_uncompressed as f32)
@@ -71,7 +71,7 @@ impl FastFieldCodec for BlockwiseLinearCodec {
     }

     // Estimate first_chunk and extrapolate
-    fn estimate(column: &impl crate::Column) -> Option<f32> {
+    fn estimate(column: &dyn crate::Column) -> Option<f32> {
         if column.num_vals() < 10 * CHUNK_SIZE as u64 {
             return None;
         }
@@ -100,7 +100,7 @@ impl FastFieldCodec for BlockwiseLinearCodec {
         Some(num_bits as f32 / num_bits_uncompressed as f32)
     }

-    fn serialize(column: &dyn crate::Column, wrt: &mut impl io::Write) -> io::Result<()> {
+    fn serialize(column: &dyn Column, wrt: &mut impl io::Write) -> io::Result<()> {
         // The BitpackedReader assumes a normalized vector.
         assert_eq!(column.min_value(), 0);
         let mut buffer = Vec::with_capacity(CHUNK_SIZE);
@@ -123,7 +123,7 @@ trait FastFieldCodec: 'static {
     ///
     /// The column iterator should be preferred over using column `get_val` method for
     /// performance reasons.
-    fn serialize(column: &dyn Column<u64>, write: &mut impl Write) -> io::Result<()>;
+    fn serialize(column: &dyn Column, write: &mut impl Write) -> io::Result<()>;

     /// Returns an estimate of the compression ratio.
     /// If the codec is not applicable, returns `None`.
@@ -132,7 +132,7 @@ trait FastFieldCodec: 'static {
     ///
     /// It could make sense to also return a value representing
     /// computational complexity.
-    fn estimate(column: &impl Column) -> Option<f32>;
+    fn estimate(column: &dyn Column) -> Option<f32>;
 }

 pub const ALL_CODEC_TYPES: [FastFieldCodecType; 3] = [
@@ -69,11 +69,17 @@ impl Line {

     // Same as train, but the intercept is only estimated from provided sample positions
     pub fn estimate(ys: &dyn Column, sample_positions: &[u64]) -> Self {
-        Self::train_from(ys, sample_positions.iter().cloned())
+        Self::train_from(
+            ys,
+            sample_positions
+                .iter()
+                .cloned()
+                .map(|pos| (pos, ys.get_val(pos))),
+        )
     }

     // Intercept is only computed from provided positions
-    fn train_from(ys: &dyn Column, positions: impl Iterator<Item = u64>) -> Self {
+    fn train_from(ys: &dyn Column, positions_and_values: impl Iterator<Item = (u64, u64)>) -> Self {
         let num_vals = if let Some(num_vals) = NonZeroU64::new(ys.num_vals() - 1) {
             num_vals
         } else {
@@ -114,11 +120,8 @@ impl Line {
             intercept: 0,
         };
         let heuristic_shift = y0.wrapping_sub(MID_POINT);
-        line.intercept = positions
-            .map(|pos| {
-                let y = ys.get_val(pos);
-                y.wrapping_sub(line.eval(pos))
-            })
+        line.intercept = positions_and_values
+            .map(|(pos, y)| y.wrapping_sub(line.eval(pos)))
             .min_by_key(|&val| val.wrapping_sub(heuristic_shift))
             .unwrap_or(0u64); //< Never happens.
         line
@@ -135,7 +138,10 @@ impl Line {
     /// This function is only invariable by translation if all of the
     /// `ys` are packaged into half of the space. (See heuristic below)
     pub fn train(ys: &dyn Column) -> Self {
-        Self::train_from(ys, 0..ys.num_vals())
+        Self::train_from(
+            ys,
+            ys.iter().enumerate().map(|(pos, val)| (pos as u64, val)),
+        )
     }
 }
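The point of threading `(position, value)` pairs through `train_from` is that `Line::train` can now feed values from the column's sequential iterator instead of calling `get_val(pos)` per position, which the trait docs above describe as the slower access path. A tiny, self-contained illustration of the same reshaping on a plain vector (not tantivy code):

```rust
fn main() {
    let ys: Vec<u64> = vec![10, 13, 17, 19];

    // Random-access style: one indexed lookup per position.
    let pairs_random_access: Vec<(u64, u64)> =
        (0..ys.len() as u64).map(|pos| (pos, ys[pos as usize])).collect();

    // Sequential style used by the new `Line::train`: enumerate the iterator.
    let pairs_sequential: Vec<(u64, u64)> = ys
        .iter()
        .enumerate()
        .map(|(pos, &val)| (pos as u64, val))
        .collect();

    // Both produce the same (position, value) pairs.
    assert_eq!(pairs_random_access, pairs_sequential);
}
```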
@@ -121,7 +121,7 @@ impl FastFieldCodec for LinearCodec {
     /// where the local maxima for the deviation of the calculated value are and
     /// the offset to shift all values to >=0 is also unknown.
     #[allow(clippy::question_mark)]
-    fn estimate(column: &impl Column) -> Option<f32> {
+    fn estimate(column: &dyn Column) -> Option<f32> {
         if column.num_vals() < 3 {
             return None; // disable compressor for this case
         }
@@ -36,11 +36,7 @@ impl MonotonicallyMappableToU64 for i64 {
 impl MonotonicallyMappableToU64 for bool {
     #[inline(always)]
     fn to_u64(self) -> u64 {
-        if self {
-            1
-        } else {
-            0
-        }
+        u64::from(self)
     }

     #[inline(always)]
@@ -23,7 +23,8 @@ use std::sync::Arc;

 use common::{BinarySerializable, VInt};
 use fastdivide::DividerU64;
-use log::warn;
+use log::{trace, warn};
+use measure_time::trace_time;
 use ownedbytes::OwnedBytes;

 use crate::bitpacked::BitpackedCodec;
@@ -183,6 +184,7 @@ fn detect_codec(
 ) -> Option<FastFieldCodecType> {
     let mut estimations = Vec::new();
     for &codec in codecs {
+        trace_time!("estimate time for codec: {:?}", codec);
         let estimation_opt = match codec {
             FastFieldCodecType::Bitpacked => BitpackedCodec::estimate(&column),
             FastFieldCodecType::Linear => LinearCodec::estimate(&column),
@@ -202,6 +204,7 @@ fn detect_codec(
     // codecs
     estimations.retain(|estimation| !estimation.0.is_nan() && estimation.0 != f32::MAX);
     estimations.sort_by(|(score_left, _), (score_right, _)| score_left.total_cmp(score_right));
+    trace!("Chosen Codec {:?}", estimations.first()?.1);
     Some(estimations.first()?.1)
 }

@@ -210,6 +213,12 @@ fn serialize_given_codec(
     codec_type: FastFieldCodecType,
     output: &mut impl io::Write,
 ) -> io::Result<()> {
+    trace_time!(
+        "Serialize time for codec: {:?}, num_vals {}",
+        codec_type,
+        column.num_vals()
+    );
+
     match codec_type {
         FastFieldCodecType::Bitpacked => {
             BitpackedCodec::serialize(&column, output)?;
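Many hunks in this compare add `trace_time!` / `debug_time!` calls from the `measure_time` crate. A hedged, self-contained sketch of what these macros do (the label strings and the `env_logger` backend below are illustrative, not from the patch): each macro creates a guard that logs the elapsed wall-clock time of the enclosing scope, at the matching `log` level, when the scope ends.

```rust
use measure_time::{debug_time, trace_time};

fn main() {
    // Any `log` backend works; without one the timing messages are simply dropped.
    env_logger::init();

    debug_time!("outer-scope"); // logged at `debug` level when main() returns
    {
        trace_time!("inner step {}", 1); // logged at `trace` level when this block ends
        let _sum: u64 = (0..1_000_000u64).sum();
    }
}
```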
@@ -425,7 +425,7 @@ impl SegmentHistogramCollector {
         let bucket = &mut self.buckets[bucket_pos];
         bucket.doc_count += 1;
         if let Some(sub_aggregation) = self.sub_aggregations.as_mut() {
-            (&mut sub_aggregation[bucket_pos]).collect(doc, bucket_with_accessor)?;
+            sub_aggregation[bucket_pos].collect(doc, bucket_with_accessor)?;
         }
         Ok(())
     }
@@ -57,7 +57,7 @@ impl SegmentId {
     /// Picking the first 8 chars is ok to identify
     /// segments in a display message (e.g. a5c4dfcb).
     pub fn short_uuid_string(&self) -> String {
-        (&self.0.as_simple().to_string()[..8]).to_string()
+        self.0.as_simple().to_string()[..8].to_string()
     }

     /// Returns a segment uuid string.
@@ -472,6 +472,8 @@ mod tests {
     // There are more tests in directory/mod.rs
     // The following tests are specific to the MmapDirectory

+    use std::time::Duration;
+
     use common::HasLen;

     use super::*;
@@ -610,7 +612,14 @@ mod tests {
                 mmap_directory.get_cache_info().mmapped.len()
             );
         }
-        assert!(mmap_directory.get_cache_info().mmapped.is_empty());
-        Ok(())
+        // This test failed on CI. The last Mmap is dropped from the merging thread so there might
+        // be a race condition indeed.
+        for _ in 0..10 {
+            if mmap_directory.get_cache_info().mmapped.is_empty() {
+                return Ok(());
+            }
+            std::thread::sleep(Duration::from_millis(200));
+        }
+        panic!("The cache still contains information. One of the Mmap has not been dropped.");
     }
 }
@@ -136,6 +136,20 @@ impl RamDirectory {
         Self::default()
     }

+    /// Deep clones the directory.
+    ///
+    /// Ulterior writes on one of the copy
+    /// will not affect the other copy.
+    pub fn deep_clone(&self) -> RamDirectory {
+        let inner_clone = InnerDirectory {
+            fs: self.fs.read().unwrap().fs.clone(),
+            watch_router: Default::default(),
+        };
+        RamDirectory {
+            fs: Arc::new(RwLock::new(inner_clone)),
+        }
+    }
+
     /// Returns the sum of the size of the different files
     /// in the [`RamDirectory`].
     pub fn total_mem_usage(&self) -> usize {
@@ -256,4 +270,23 @@ mod tests {
         assert_eq!(directory_copy.atomic_read(path_atomic).unwrap(), msg_atomic);
         assert_eq!(directory_copy.atomic_read(path_seq).unwrap(), msg_seq);
     }
+
+    #[test]
+    fn test_ram_directory_deep_clone() {
+        let dir = RamDirectory::default();
+        let test = Path::new("test");
+        let test2 = Path::new("test2");
+        dir.atomic_write(test, b"firstwrite").unwrap();
+        let dir_clone = dir.deep_clone();
+        assert_eq!(
+            dir_clone.atomic_read(test).unwrap(),
+            dir.atomic_read(test).unwrap()
+        );
+        dir.atomic_write(test, b"original").unwrap();
+        dir_clone.atomic_write(test, b"clone").unwrap();
+        dir_clone.atomic_write(test2, b"clone2").unwrap();
+        assert_eq!(dir.atomic_read(test).unwrap(), b"original");
+        assert_eq!(&dir_clone.atomic_read(test).unwrap(), b"clone");
+        assert_eq!(&dir_clone.atomic_read(test2).unwrap(), b"clone2");
+    }
 }
@@ -402,6 +402,74 @@ mod bench {
     use crate::schema::{Cardinality, NumericOptions, Schema};
     use crate::Document;

+    fn bench_multi_value_ff_merge_opt(
+        num_docs: usize,
+        segments_every_n_docs: usize,
+        merge_policy: impl crate::indexer::MergePolicy + 'static,
+    ) {
+        let mut builder = crate::schema::SchemaBuilder::new();
+
+        let fast_multi =
+            crate::schema::NumericOptions::default().set_fast(Cardinality::MultiValues);
+        let multi_field = builder.add_f64_field("f64s", fast_multi);
+
+        let index = crate::Index::create_in_ram(builder.build());
+
+        let mut writer = index.writer_for_tests().unwrap();
+        writer.set_merge_policy(Box::new(merge_policy));
+
+        for i in 0..num_docs {
+            let mut doc = crate::Document::new();
+            doc.add_f64(multi_field, 0.24);
+            doc.add_f64(multi_field, 0.27);
+            doc.add_f64(multi_field, 0.37);
+            if i % 3 == 0 {
+                doc.add_f64(multi_field, 0.44);
+            }
+
+            writer.add_document(doc).unwrap();
+            if i % segments_every_n_docs == 0 {
+                writer.commit().unwrap();
+            }
+        }
+
+        {
+            writer.wait_merging_threads().unwrap();
+            let mut writer = index.writer_for_tests().unwrap();
+            let segment_ids = index.searchable_segment_ids().unwrap();
+            writer.merge(&segment_ids).wait().unwrap();
+        }
+
+        // If a merging thread fails, we should end up with more
+        // than one segment here
+        assert_eq!(1, index.searchable_segments().unwrap().len());
+    }
+
+    #[bench]
+    fn bench_multi_value_ff_merge_many_segments(b: &mut Bencher) {
+        let num_docs = 100_000;
+        b.iter(|| {
+            bench_multi_value_ff_merge_opt(num_docs, 1_000, crate::indexer::NoMergePolicy);
+        });
+    }
+
+    #[bench]
+    fn bench_multi_value_ff_merge_many_segments_log_merge(b: &mut Bencher) {
+        let num_docs = 100_000;
+        b.iter(|| {
+            let merge_policy = crate::indexer::LogMergePolicy::default();
+            bench_multi_value_ff_merge_opt(num_docs, 1_000, merge_policy);
+        });
+    }
+
+    #[bench]
+    fn bench_multi_value_ff_merge_few_segments(b: &mut Bencher) {
+        let num_docs = 100_000;
+        b.iter(|| {
+            bench_multi_value_ff_merge_opt(num_docs, 33_000, crate::indexer::NoMergePolicy);
+        });
+    }
+
     fn multi_values(num_docs: usize, vals_per_doc: usize) -> Vec<Vec<u64>> {
         let mut vals = vec![];
         for _i in 0..num_docs {
@@ -3,6 +3,7 @@ use std::sync::Mutex;

 use fastfield_codecs::{Column, MonotonicallyMappableToU64, VecColumn};
 use fnv::FnvHashMap;
+use measure_time::{debug_time, trace_time};

 use crate::fastfield::{value_to_u64, CompositeFastFieldSerializer, FastFieldType};
 use crate::indexer::doc_id_mapping::DocIdMapping;
@@ -146,6 +147,13 @@ impl MultiValuedFastFieldWriter {
         {
             self.doc_index.push(self.vals.len() as u64);
             let col = VecColumn::from(&self.doc_index[..]);
+
+            trace_time!(
+                "segment-serialize-multi-fast-field-idx, num_vals {}, field_id {:?}",
+                col.num_vals(),
+                self.field()
+            );
+
             if let Some(doc_id_map) = doc_id_map {
                 let multi_value_start_index = MultivalueStartIndex::new(&col, doc_id_map);
                 serializer.create_auto_detect_u64_fast_field_with_idx(
@@ -158,6 +166,12 @@ impl MultiValuedFastFieldWriter {
             }
         }
         {
+            trace_time!(
+                "segment-serialize-multi-fast-field-values, num_vals {}, field_id {:?}",
+                self.vals.len(),
+                self.field()
+            );
+
             // Writing the values themselves.
             // TODO FIXME: Use less memory.
             let mut values: Vec<u64> = Vec::new();
@@ -4,6 +4,7 @@ use std::io;
 use common;
 use fastfield_codecs::{Column, MonotonicallyMappableToU64};
 use fnv::FnvHashMap;
+use measure_time::{debug_time, trace_time};
 use tantivy_bitpacker::BlockedBitpacker;

 use super::multivalued::MultiValuedFastFieldWriter;
@@ -215,6 +216,7 @@ impl FastFieldsWriter {
         mapping: &HashMap<Field, FnvHashMap<UnorderedTermId, TermOrdinal>>,
         doc_id_map: Option<&DocIdMapping>,
     ) -> io::Result<()> {
+        debug_time!("segment-serialize-all-fast-fields",);
         for field_writer in self.term_id_writers {
             let field = field_writer.field();
             field_writer.serialize(serializer, mapping.get(&field), doc_id_map)?;
@@ -367,6 +369,11 @@ impl IntFastFieldWriter {
             num_vals: self.val_count as u64,
         };

+        trace_time!(
+            "segment-serialize-single-value-field, field_id {:?}",
+            self.field()
+        );
+
         serializer.create_auto_detect_u64_fast_field(self.field, fastfield_accessor)?;

         Ok(())
@@ -246,18 +246,27 @@ impl DeleteCursor {
 mod tests {

     use super::{DeleteOperation, DeleteQueue};
-    use crate::schema::{Field, Term};
+    use crate::query::{Explanation, Scorer, Weight};
+    use crate::{DocId, Score, SegmentReader};
+
+    struct DummyWeight;
+    impl Weight for DummyWeight {
+        fn scorer(&self, _reader: &SegmentReader, _boost: Score) -> crate::Result<Box<dyn Scorer>> {
+            Err(crate::TantivyError::InternalError("dummy impl".to_owned()))
+        }
+
+        fn explain(&self, _reader: &SegmentReader, _doc: DocId) -> crate::Result<Explanation> {
+            Err(crate::TantivyError::InternalError("dummy impl".to_owned()))
+        }
+    }

     #[test]
     fn test_deletequeue() {
         let delete_queue = DeleteQueue::new();

-        let make_op = |i: usize| {
-            let field = Field::from_field_id(1u32);
-            DeleteOperation {
-                opstamp: i as u64,
-                term: Term::from_field_u64(field, i as u64),
-            }
+        let make_op = |i: usize| DeleteOperation {
+            opstamp: i as u64,
+            target: Box::new(DummyWeight),
         };

         delete_queue.push(make_op(1));
@@ -11,7 +11,6 @@ use super::segment_updater::SegmentUpdater;
 use super::{AddBatch, AddBatchReceiver, AddBatchSender, PreparedCommit};
 use crate::core::{Index, Segment, SegmentComponent, SegmentId, SegmentMeta, SegmentReader};
 use crate::directory::{DirectoryLock, GarbageCollectionResult, TerminatingWrite};
-use crate::docset::{DocSet, TERMINATED};
 use crate::error::TantivyError;
 use crate::fastfield::write_alive_bitset;
 use crate::indexer::delete_queue::{DeleteCursor, DeleteQueue};
@@ -20,8 +19,9 @@ use crate::indexer::index_writer_status::IndexWriterStatus;
 use crate::indexer::operation::DeleteOperation;
 use crate::indexer::stamper::Stamper;
 use crate::indexer::{MergePolicy, SegmentEntry, SegmentWriter};
+use crate::query::{Query, TermQuery};
 use crate::schema::{Document, IndexRecordOption, Term};
-use crate::{FutureResult, Opstamp};
+use crate::{FutureResult, IndexReader, Opstamp};

 // Size of the margin for the `memory_arena`. A segment is closed when the remaining memory
 // in the `memory_arena` goes below MARGIN_IN_BYTES.
@@ -57,6 +57,7 @@ pub struct IndexWriter {
     _directory_lock: Option<DirectoryLock>,

     index: Index,
+    index_reader: IndexReader,

     memory_arena_in_bytes_per_thread: usize,
@@ -92,19 +93,14 @@ fn compute_deleted_bitset(

         // A delete operation should only affect
         // document that were inserted before it.
-        let inverted_index = segment_reader.inverted_index(delete_op.term.field())?;
-        if let Some(mut docset) =
-            inverted_index.read_postings(&delete_op.term, IndexRecordOption::Basic)?
-        {
-            let mut doc_matching_deleted_term = docset.doc();
-            while doc_matching_deleted_term != TERMINATED {
-                if doc_opstamps.is_deleted(doc_matching_deleted_term, delete_op.opstamp) {
-                    alive_bitset.remove(doc_matching_deleted_term);
+        delete_op
+            .target
+            .for_each(segment_reader, &mut |doc_matching_delete_query, _| {
+                if doc_opstamps.is_deleted(doc_matching_delete_query, delete_op.opstamp) {
+                    alive_bitset.remove(doc_matching_delete_query);
                     might_have_changed = true;
                 }
-                doc_matching_deleted_term = docset.advance();
-            }
-        }
+            })?;
         delete_cursor.advance();
     }
     Ok(might_have_changed)
@@ -302,6 +298,7 @@ impl IndexWriter {

             memory_arena_in_bytes_per_thread,
             index: index.clone(),
+            index_reader: index.reader()?,

             index_writer_status: IndexWriterStatus::from(document_receiver),
             operation_sender: document_sender,
@@ -666,10 +663,33 @@ impl IndexWriter {
     /// Like adds, the deletion itself will be visible
     /// only after calling `commit()`.
     pub fn delete_term(&self, term: Term) -> Opstamp {
+        let query = TermQuery::new(term, IndexRecordOption::Basic);
+        // For backward compatibility, if Term is invalid for the index, do nothing but return an
+        // Opstamp
+        self.delete_query(Box::new(query))
+            .unwrap_or_else(|_| self.stamper.stamp())
+    }
+
+    /// Delete all documents matching a given query.
+    /// Returns an `Err` if the query can't be executed.
+    ///
+    /// Delete operation only affects documents that
+    /// were added in previous commits, and documents
+    /// that were added previously in the same commit.
+    ///
+    /// Like adds, the deletion itself will be visible
+    /// only after calling `commit()`.
+    #[doc(hidden)]
+    pub fn delete_query(&self, query: Box<dyn Query>) -> crate::Result<Opstamp> {
+        let weight = query.weight(&self.index_reader.searcher(), false)?;
+
         let opstamp = self.stamper.stamp();
-        let delete_operation = DeleteOperation { opstamp, term };
+        let delete_operation = DeleteOperation {
+            opstamp,
+            target: weight,
+        };
         self.delete_queue.push(delete_operation);
-        opstamp
+        Ok(opstamp)
     }

     /// Returns the opstamp of the last successful commit.
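For orientation, a hedged usage sketch of the new `delete_query` entry point, modeled on the test added further down in this patch; the writer and `id_field` here are assumed to already exist and are not defined by this hunk:

```rust
use tantivy::query::TermQuery;
use tantivy::schema::{Field, IndexRecordOption};
use tantivy::{IndexWriter, Opstamp, Term};

// Deletes every document whose `id_field` equals `id`; like `delete_term`,
// the deletion only becomes visible after the next `commit()`.
fn delete_by_id(writer: &IndexWriter, id_field: Field, id: u64) -> tantivy::Result<Opstamp> {
    let term = Term::from_field_u64(id_field, id);
    let query = TermQuery::new(term, IndexRecordOption::Basic);
    writer.delete_query(Box::new(query))
}
```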
@@ -738,10 +758,17 @@ impl IndexWriter {
         let (batch_opstamp, stamps) = self.get_batch_opstamps(count);

         let mut adds = AddBatch::default();

         for (user_op, opstamp) in user_operations_it.zip(stamps) {
             match user_op {
                 UserOperation::Delete(term) => {
-                    let delete_operation = DeleteOperation { opstamp, term };
+                    let query = TermQuery::new(term, IndexRecordOption::Basic);
+                    let weight = query.weight(&self.index_reader.searcher(), false)?;
+
+                    let delete_operation = DeleteOperation {
+                        opstamp,
+                        target: weight,
+                    };
                     self.delete_queue.push(delete_operation);
                 }
                 UserOperation::Add(document) => {
@@ -786,7 +813,7 @@ mod tests {
     use crate::directory::error::LockError;
     use crate::error::*;
     use crate::indexer::NoMergePolicy;
-    use crate::query::{QueryParser, TermQuery};
+    use crate::query::{BooleanQuery, Occur, Query, QueryParser, TermQuery};
     use crate::schema::{
         self, Cardinality, Facet, FacetOptions, IndexRecordOption, NumericOptions,
         TextFieldIndexing, TextOptions, FAST, INDEXED, STORED, STRING, TEXT,
@@ -1418,10 +1445,72 @@ mod tests {
         Ok(())
     }

+    #[test]
+    fn test_delete_query_with_sort_by_field() -> crate::Result<()> {
+        let mut schema_builder = schema::Schema::builder();
+        let id_field =
+            schema_builder.add_u64_field("id", schema::INDEXED | schema::STORED | schema::FAST);
+        let schema = schema_builder.build();
+
+        let settings = IndexSettings {
+            sort_by_field: Some(IndexSortByField {
+                field: "id".to_string(),
+                order: Order::Desc,
+            }),
+            ..Default::default()
+        };
+
+        let index = Index::builder()
+            .schema(schema)
+            .settings(settings)
+            .create_in_ram()?;
+        let index_reader = index.reader()?;
+        let mut index_writer = index.writer_for_tests()?;
+
+        // create and delete docs in same commit
+        for id in 0u64..5u64 {
+            index_writer.add_document(doc!(id_field => id))?;
+        }
+        for id in 1u64..4u64 {
+            let term = Term::from_field_u64(id_field, id);
+            let not_term = Term::from_field_u64(id_field, 2);
+            let term = Box::new(TermQuery::new(term, Default::default()));
+            let not_term = Box::new(TermQuery::new(not_term, Default::default()));
+
+            let query: BooleanQuery = vec![
+                (Occur::Must, term as Box<dyn Query>),
+                (Occur::MustNot, not_term as Box<dyn Query>),
+            ]
+            .into();
+
+            index_writer.delete_query(Box::new(query))?;
+        }
+        for id in 5u64..10u64 {
+            index_writer.add_document(doc!(id_field => id))?;
+        }
+        index_writer.commit()?;
+        index_reader.reload()?;
+
+        let searcher = index_reader.searcher();
+        assert_eq!(searcher.segment_readers().len(), 1);
+
+        let segment_reader = searcher.segment_reader(0);
+        assert_eq!(segment_reader.num_docs(), 8);
+        assert_eq!(segment_reader.max_doc(), 10);
+        let fast_field_reader = segment_reader.fast_fields().u64(id_field)?;
+        let in_order_alive_ids: Vec<u64> = segment_reader
+            .doc_ids_alive()
+            .map(|doc| fast_field_reader.get_val(doc as u64))
+            .collect();
+        assert_eq!(&in_order_alive_ids[..], &[9, 8, 7, 6, 5, 4, 2, 0]);
+        Ok(())
+    }
+
     #[derive(Debug, Clone, Copy)]
     enum IndexingOp {
         AddDoc { id: u64 },
         DeleteDoc { id: u64 },
+        DeleteDocQuery { id: u64 },
         Commit,
         Merge,
     }
@@ -1429,6 +1518,7 @@ mod tests {
     fn balanced_operation_strategy() -> impl Strategy<Value = IndexingOp> {
         prop_oneof![
             (0u64..20u64).prop_map(|id| IndexingOp::DeleteDoc { id }),
+            (0u64..20u64).prop_map(|id| IndexingOp::DeleteDocQuery { id }),
             (0u64..20u64).prop_map(|id| IndexingOp::AddDoc { id }),
             (0u64..1u64).prop_map(|_| IndexingOp::Commit),
             (0u64..1u64).prop_map(|_| IndexingOp::Merge),
@@ -1437,7 +1527,8 @@ mod tests {

     fn adding_operation_strategy() -> impl Strategy<Value = IndexingOp> {
         prop_oneof![
-            10 => (0u64..100u64).prop_map(|id| IndexingOp::DeleteDoc { id }),
+            5 => (0u64..100u64).prop_map(|id| IndexingOp::DeleteDoc { id }),
+            5 => (0u64..100u64).prop_map(|id| IndexingOp::DeleteDocQuery { id }),
             50 => (0u64..100u64).prop_map(|id| IndexingOp::AddDoc { id }),
             2 => (0u64..1u64).prop_map(|_| IndexingOp::Commit),
             1 => (0u64..1u64).prop_map(|_| IndexingOp::Merge),
@@ -1457,6 +1548,10 @@ mod tests {
                     existing_ids.remove(&id);
                     deleted_ids.insert(id);
                 }
+                IndexingOp::DeleteDocQuery { id } => {
+                    existing_ids.remove(&id);
+                    deleted_ids.insert(id);
+                }
                 _ => {}
             }
         }
@@ -1539,6 +1634,11 @@ mod tests {
                 IndexingOp::DeleteDoc { id } => {
                     index_writer.delete_term(Term::from_field_u64(id_field, id));
                 }
+                IndexingOp::DeleteDocQuery { id } => {
+                    let term = Term::from_field_u64(id_field, id);
+                    let query = TermQuery::new(term, Default::default());
+                    index_writer.delete_query(Box::new(query))?;
+                }
                 IndexingOp::Commit => {
                     index_writer.commit()?;
                 }
@@ -4,7 +4,7 @@ use std::sync::Arc;

 use fastfield_codecs::VecColumn;
 use itertools::Itertools;
-use measure_time::debug_time;
+use measure_time::{debug_time, trace_time};

 use crate::core::{Segment, SegmentReader};
 use crate::docset::{DocSet, TERMINATED};
@@ -250,7 +250,11 @@ impl IndexMerger {
         mut term_ord_mappings: HashMap<Field, TermOrdinalMapping>,
         doc_id_mapping: &SegmentDocIdMapping,
     ) -> crate::Result<()> {
-        debug_time!("write-fast-fields");
+        debug_time!(
+            "merge-all-fast-fields, num_segments {}, num docs new segment {}",
+            self.readers.len(),
+            doc_id_mapping.len()
+        );

         for (field, field_entry) in self.schema.fields() {
             let field_type = field_entry.field_type();
@@ -311,6 +315,12 @@ impl IndexMerger {
         doc_id_mapping: &SegmentDocIdMapping,
     ) -> crate::Result<()> {
         let fast_field_accessor = SortedDocIdColumn::new(&self.readers, doc_id_mapping, field);
+        trace_time!(
+            "merge-single-fast-field, num_vals {}, num_segments {}, field_id {:?}",
+            fast_field_accessor.num_vals(),
+            self.readers.len(),
+            field
+        );
         fast_field_serializer.create_auto_detect_u64_fast_field(field, fast_field_accessor)?;

         Ok(())
@@ -458,6 +468,12 @@ impl IndexMerger {
         fast_field_serializer: &mut CompositeFastFieldSerializer,
         doc_id_mapping: &SegmentDocIdMapping,
     ) -> crate::Result<Vec<u64>> {
+        trace_time!(
+            "merge-multi-fast-field-idx, num_segments {}, field_id {:?}",
+            self.readers.len(),
+            field
+        );
+
         let reader_ordinal_and_field_accessors = self
             .readers
             .iter()
@@ -488,7 +504,7 @@ impl IndexMerger {
         fast_field_serializer: &mut CompositeFastFieldSerializer,
         doc_id_mapping: &SegmentDocIdMapping,
     ) -> crate::Result<()> {
-        debug_time!("write-term-id-fast-field");
+        trace_time!("write-term-id-fast-field");

         // Multifastfield consists of 2 fastfields.
         // The first serves as an index into the second one and is strictly increasing.
@@ -571,6 +587,13 @@ impl IndexMerger {

         let fastfield_accessor =
             SortedDocIdMultiValueColumn::new(&self.readers, doc_id_mapping, &offsets, field);
+        trace_time!(
+            "merge-multi-fast-field-values, num_vals {}, num_segments {}, field_id {:?}",
+            fastfield_accessor.num_vals(),
+            self.readers.len(),
+            field
+        );
+
         fast_field_serializer.create_auto_detect_u64_fast_field_with_idx(
             field,
             fastfield_accessor,
@@ -624,7 +647,7 @@ impl IndexMerger {
         fieldnorm_reader: Option<FieldNormReader>,
         doc_id_mapping: &SegmentDocIdMapping,
     ) -> crate::Result<Option<TermOrdinalMapping>> {
-        debug_time!("write-postings-for-field");
+        debug_time!("write-postings-for-field {:?}", indexed_field);
         let mut positions_buffer: Vec<u32> = Vec::with_capacity(1_000);
         let mut delta_computer = DeltaComputer::new();

@@ -827,7 +850,7 @@ impl IndexMerger {
         debug!("write-storable-field");

         if !doc_id_mapping.is_trivial() {
-            debug!("non-trivial-doc-id-mapping");
+            debug!("non-trivial-doc-id-mapping (index is sorted)");

             let store_readers: Vec<_> = self
                 .readers
@@ -855,7 +878,7 @@ impl IndexMerger {
                 }
             }
         } else {
-            debug!("trivial-doc-id-mapping");
+            debug!("trivial-doc-id-mapping (index is not sorted)");
             for reader in &self.readers {
                 let store_reader = reader.get_store_reader(1)?;
                 if reader.has_deletes()
@@ -1,20 +1,11 @@
+use crate::query::Weight;
 use crate::schema::{Document, Term};
 use crate::Opstamp;

 /// Timestamped Delete operation.
-#[derive(Clone, Eq, PartialEq, Debug)]
 pub struct DeleteOperation {
     pub opstamp: Opstamp,
-    pub term: Term,
-}
-
-impl Default for DeleteOperation {
-    fn default() -> Self {
-        DeleteOperation {
-            opstamp: 0u64,
-            term: Term::new(),
-        }
-    }
+    pub target: Box<dyn Weight>,
 }

 /// Timestamped Add operation.