mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2025-12-28 04:52:55 +00:00
Compare commits
12 Commits
column-rea
...
debug_time
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
dbd3aed24a | ||
|
|
7707b8a6e1 | ||
|
|
dac7da780e | ||
|
|
20c87903b2 | ||
|
|
f9c3947803 | ||
|
|
e9a384bb15 | ||
|
|
d231671fe2 | ||
|
|
fa3d786a2f | ||
|
|
75aafeeb9b | ||
|
|
6f066c7f65 | ||
|
|
22e56aaee3 | ||
|
|
d641979127 |
@@ -259,11 +259,7 @@ impl BitSet {
|
||||
// we do not check saturated els.
|
||||
let higher = el / 64u32;
|
||||
let lower = el % 64u32;
|
||||
self.len += if self.tinysets[higher as usize].insert_mut(lower) {
|
||||
1
|
||||
} else {
|
||||
0
|
||||
};
|
||||
self.len += u64::from(self.tinysets[higher as usize].insert_mut(lower));
|
||||
}
|
||||
|
||||
/// Inserts an element in the `BitSet`
|
||||
@@ -272,11 +268,7 @@ impl BitSet {
|
||||
// we do not check saturated els.
|
||||
let higher = el / 64u32;
|
||||
let lower = el % 64u32;
|
||||
self.len -= if self.tinysets[higher as usize].remove_mut(lower) {
|
||||
1
|
||||
} else {
|
||||
0
|
||||
};
|
||||
self.len -= u64::from(self.tinysets[higher as usize].remove_mut(lower));
|
||||
}
|
||||
|
||||
/// Returns true iff the elements is in the `BitSet`.
|
||||
|
||||
@@ -161,8 +161,7 @@ impl FixedSize for u8 {
|
||||
|
||||
impl BinarySerializable for bool {
|
||||
fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
|
||||
let val = if *self { 1 } else { 0 };
|
||||
writer.write_u8(val)
|
||||
writer.write_u8(u8::from(*self))
|
||||
}
|
||||
fn deserialize<R: Read>(reader: &mut R) -> io::Result<bool> {
|
||||
let val = reader.read_u8()?;
|
||||
|
||||
@@ -17,7 +17,7 @@ rand = {version="0.8.3", optional= true}
|
||||
fastdivide = "0.4"
|
||||
log = "0.4"
|
||||
itertools = { version = "0.10.3" }
|
||||
measure_time = { version="0.8.2", optional=true}
|
||||
measure_time = { version="0.8.2" }
|
||||
|
||||
[dev-dependencies]
|
||||
more-asserts = "0.3.0"
|
||||
@@ -25,7 +25,7 @@ proptest = "1.0.0"
|
||||
rand = "0.8.3"
|
||||
|
||||
[features]
|
||||
bin = ["prettytable-rs", "rand", "measure_time"]
|
||||
bin = ["prettytable-rs", "rand"]
|
||||
default = ["bin"]
|
||||
unstable = []
|
||||
|
||||
|
||||
@@ -75,7 +75,7 @@ impl FastFieldCodec for BitpackedCodec {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn estimate(column: &impl Column) -> Option<f32> {
|
||||
fn estimate(column: &dyn Column) -> Option<f32> {
|
||||
let num_bits = compute_num_bits(column.max_value());
|
||||
let num_bits_uncompressed = 64;
|
||||
Some(num_bits as f32 / num_bits_uncompressed as f32)
|
||||
|
||||
@@ -71,7 +71,7 @@ impl FastFieldCodec for BlockwiseLinearCodec {
|
||||
}
|
||||
|
||||
// Estimate first_chunk and extrapolate
|
||||
fn estimate(column: &impl crate::Column) -> Option<f32> {
|
||||
fn estimate(column: &dyn crate::Column) -> Option<f32> {
|
||||
if column.num_vals() < 10 * CHUNK_SIZE as u64 {
|
||||
return None;
|
||||
}
|
||||
@@ -100,7 +100,7 @@ impl FastFieldCodec for BlockwiseLinearCodec {
|
||||
Some(num_bits as f32 / num_bits_uncompressed as f32)
|
||||
}
|
||||
|
||||
fn serialize(column: &dyn crate::Column, wrt: &mut impl io::Write) -> io::Result<()> {
|
||||
fn serialize(column: &dyn Column, wrt: &mut impl io::Write) -> io::Result<()> {
|
||||
// The BitpackedReader assumes a normalized vector.
|
||||
assert_eq!(column.min_value(), 0);
|
||||
let mut buffer = Vec::with_capacity(CHUNK_SIZE);
|
||||
|
||||
@@ -123,7 +123,7 @@ trait FastFieldCodec: 'static {
|
||||
///
|
||||
/// The column iterator should be preferred over using column `get_val` method for
|
||||
/// performance reasons.
|
||||
fn serialize(column: &dyn Column<u64>, write: &mut impl Write) -> io::Result<()>;
|
||||
fn serialize(column: &dyn Column, write: &mut impl Write) -> io::Result<()>;
|
||||
|
||||
/// Returns an estimate of the compression ratio.
|
||||
/// If the codec is not applicable, returns `None`.
|
||||
@@ -132,7 +132,7 @@ trait FastFieldCodec: 'static {
|
||||
///
|
||||
/// It could make sense to also return a value representing
|
||||
/// computational complexity.
|
||||
fn estimate(column: &impl Column) -> Option<f32>;
|
||||
fn estimate(column: &dyn Column) -> Option<f32>;
|
||||
}
|
||||
|
||||
pub const ALL_CODEC_TYPES: [FastFieldCodecType; 3] = [
|
||||
|
||||
@@ -69,11 +69,17 @@ impl Line {
|
||||
|
||||
// Same as train, but the intercept is only estimated from provided sample positions
|
||||
pub fn estimate(ys: &dyn Column, sample_positions: &[u64]) -> Self {
|
||||
Self::train_from(ys, sample_positions.iter().cloned())
|
||||
Self::train_from(
|
||||
ys,
|
||||
sample_positions
|
||||
.iter()
|
||||
.cloned()
|
||||
.map(|pos| (pos, ys.get_val(pos))),
|
||||
)
|
||||
}
|
||||
|
||||
// Intercept is only computed from provided positions
|
||||
fn train_from(ys: &dyn Column, positions: impl Iterator<Item = u64>) -> Self {
|
||||
fn train_from(ys: &dyn Column, positions_and_values: impl Iterator<Item = (u64, u64)>) -> Self {
|
||||
let num_vals = if let Some(num_vals) = NonZeroU64::new(ys.num_vals() - 1) {
|
||||
num_vals
|
||||
} else {
|
||||
@@ -114,11 +120,8 @@ impl Line {
|
||||
intercept: 0,
|
||||
};
|
||||
let heuristic_shift = y0.wrapping_sub(MID_POINT);
|
||||
line.intercept = positions
|
||||
.map(|pos| {
|
||||
let y = ys.get_val(pos);
|
||||
y.wrapping_sub(line.eval(pos))
|
||||
})
|
||||
line.intercept = positions_and_values
|
||||
.map(|(pos, y)| y.wrapping_sub(line.eval(pos)))
|
||||
.min_by_key(|&val| val.wrapping_sub(heuristic_shift))
|
||||
.unwrap_or(0u64); //< Never happens.
|
||||
line
|
||||
@@ -135,7 +138,10 @@ impl Line {
|
||||
/// This function is only invariable by translation if all of the
|
||||
/// `ys` are packaged into half of the space. (See heuristic below)
|
||||
pub fn train(ys: &dyn Column) -> Self {
|
||||
Self::train_from(ys, 0..ys.num_vals())
|
||||
Self::train_from(
|
||||
ys,
|
||||
ys.iter().enumerate().map(|(pos, val)| (pos as u64, val)),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -121,7 +121,7 @@ impl FastFieldCodec for LinearCodec {
|
||||
/// where the local maxima for the deviation of the calculated value are and
|
||||
/// the offset to shift all values to >=0 is also unknown.
|
||||
#[allow(clippy::question_mark)]
|
||||
fn estimate(column: &impl Column) -> Option<f32> {
|
||||
fn estimate(column: &dyn Column) -> Option<f32> {
|
||||
if column.num_vals() < 3 {
|
||||
return None; // disable compressor for this case
|
||||
}
|
||||
|
||||
@@ -36,11 +36,7 @@ impl MonotonicallyMappableToU64 for i64 {
|
||||
impl MonotonicallyMappableToU64 for bool {
|
||||
#[inline(always)]
|
||||
fn to_u64(self) -> u64 {
|
||||
if self {
|
||||
1
|
||||
} else {
|
||||
0
|
||||
}
|
||||
u64::from(self)
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
|
||||
@@ -23,7 +23,8 @@ use std::sync::Arc;
|
||||
|
||||
use common::{BinarySerializable, VInt};
|
||||
use fastdivide::DividerU64;
|
||||
use log::warn;
|
||||
use log::{trace, warn};
|
||||
use measure_time::trace_time;
|
||||
use ownedbytes::OwnedBytes;
|
||||
|
||||
use crate::bitpacked::BitpackedCodec;
|
||||
@@ -183,6 +184,7 @@ fn detect_codec(
|
||||
) -> Option<FastFieldCodecType> {
|
||||
let mut estimations = Vec::new();
|
||||
for &codec in codecs {
|
||||
trace_time!("estimate time for codec: {:?}", codec);
|
||||
let estimation_opt = match codec {
|
||||
FastFieldCodecType::Bitpacked => BitpackedCodec::estimate(&column),
|
||||
FastFieldCodecType::Linear => LinearCodec::estimate(&column),
|
||||
@@ -202,6 +204,7 @@ fn detect_codec(
|
||||
// codecs
|
||||
estimations.retain(|estimation| !estimation.0.is_nan() && estimation.0 != f32::MAX);
|
||||
estimations.sort_by(|(score_left, _), (score_right, _)| score_left.total_cmp(score_right));
|
||||
trace!("Chosen Codec {:?}", estimations.first()?.1);
|
||||
Some(estimations.first()?.1)
|
||||
}
|
||||
|
||||
@@ -210,6 +213,12 @@ fn serialize_given_codec(
|
||||
codec_type: FastFieldCodecType,
|
||||
output: &mut impl io::Write,
|
||||
) -> io::Result<()> {
|
||||
trace_time!(
|
||||
"Serialize time for codec: {:?}, num_vals {}",
|
||||
codec_type,
|
||||
column.num_vals()
|
||||
);
|
||||
|
||||
match codec_type {
|
||||
FastFieldCodecType::Bitpacked => {
|
||||
BitpackedCodec::serialize(&column, output)?;
|
||||
|
||||
@@ -425,7 +425,7 @@ impl SegmentHistogramCollector {
|
||||
let bucket = &mut self.buckets[bucket_pos];
|
||||
bucket.doc_count += 1;
|
||||
if let Some(sub_aggregation) = self.sub_aggregations.as_mut() {
|
||||
(&mut sub_aggregation[bucket_pos]).collect(doc, bucket_with_accessor)?;
|
||||
sub_aggregation[bucket_pos].collect(doc, bucket_with_accessor)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -57,7 +57,7 @@ impl SegmentId {
|
||||
/// Picking the first 8 chars is ok to identify
|
||||
/// segments in a display message (e.g. a5c4dfcb).
|
||||
pub fn short_uuid_string(&self) -> String {
|
||||
(&self.0.as_simple().to_string()[..8]).to_string()
|
||||
self.0.as_simple().to_string()[..8].to_string()
|
||||
}
|
||||
|
||||
/// Returns a segment uuid string.
|
||||
|
||||
@@ -472,6 +472,8 @@ mod tests {
|
||||
// There are more tests in directory/mod.rs
|
||||
// The following tests are specific to the MmapDirectory
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
use common::HasLen;
|
||||
|
||||
use super::*;
|
||||
@@ -610,7 +612,14 @@ mod tests {
|
||||
mmap_directory.get_cache_info().mmapped.len()
|
||||
);
|
||||
}
|
||||
assert!(mmap_directory.get_cache_info().mmapped.is_empty());
|
||||
Ok(())
|
||||
// This test failed on CI. The last Mmap is dropped from the merging thread so there might
|
||||
// be a race condition indeed.
|
||||
for _ in 0..10 {
|
||||
if mmap_directory.get_cache_info().mmapped.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
std::thread::sleep(Duration::from_millis(200));
|
||||
}
|
||||
panic!("The cache still contains information. One of the Mmap has not been dropped.");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -136,6 +136,20 @@ impl RamDirectory {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
/// Deep clones the directory.
|
||||
///
|
||||
/// Ulterior writes on one of the copy
|
||||
/// will not affect the other copy.
|
||||
pub fn deep_clone(&self) -> RamDirectory {
|
||||
let inner_clone = InnerDirectory {
|
||||
fs: self.fs.read().unwrap().fs.clone(),
|
||||
watch_router: Default::default(),
|
||||
};
|
||||
RamDirectory {
|
||||
fs: Arc::new(RwLock::new(inner_clone)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the sum of the size of the different files
|
||||
/// in the [`RamDirectory`].
|
||||
pub fn total_mem_usage(&self) -> usize {
|
||||
@@ -256,4 +270,23 @@ mod tests {
|
||||
assert_eq!(directory_copy.atomic_read(path_atomic).unwrap(), msg_atomic);
|
||||
assert_eq!(directory_copy.atomic_read(path_seq).unwrap(), msg_seq);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_ram_directory_deep_clone() {
|
||||
let dir = RamDirectory::default();
|
||||
let test = Path::new("test");
|
||||
let test2 = Path::new("test2");
|
||||
dir.atomic_write(test, b"firstwrite").unwrap();
|
||||
let dir_clone = dir.deep_clone();
|
||||
assert_eq!(
|
||||
dir_clone.atomic_read(test).unwrap(),
|
||||
dir.atomic_read(test).unwrap()
|
||||
);
|
||||
dir.atomic_write(test, b"original").unwrap();
|
||||
dir_clone.atomic_write(test, b"clone").unwrap();
|
||||
dir_clone.atomic_write(test2, b"clone2").unwrap();
|
||||
assert_eq!(dir.atomic_read(test).unwrap(), b"original");
|
||||
assert_eq!(&dir_clone.atomic_read(test).unwrap(), b"clone");
|
||||
assert_eq!(&dir_clone.atomic_read(test2).unwrap(), b"clone2");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -402,6 +402,74 @@ mod bench {
|
||||
use crate::schema::{Cardinality, NumericOptions, Schema};
|
||||
use crate::Document;
|
||||
|
||||
fn bench_multi_value_ff_merge_opt(
|
||||
num_docs: usize,
|
||||
segments_every_n_docs: usize,
|
||||
merge_policy: impl crate::indexer::MergePolicy + 'static,
|
||||
) {
|
||||
let mut builder = crate::schema::SchemaBuilder::new();
|
||||
|
||||
let fast_multi =
|
||||
crate::schema::NumericOptions::default().set_fast(Cardinality::MultiValues);
|
||||
let multi_field = builder.add_f64_field("f64s", fast_multi);
|
||||
|
||||
let index = crate::Index::create_in_ram(builder.build());
|
||||
|
||||
let mut writer = index.writer_for_tests().unwrap();
|
||||
writer.set_merge_policy(Box::new(merge_policy));
|
||||
|
||||
for i in 0..num_docs {
|
||||
let mut doc = crate::Document::new();
|
||||
doc.add_f64(multi_field, 0.24);
|
||||
doc.add_f64(multi_field, 0.27);
|
||||
doc.add_f64(multi_field, 0.37);
|
||||
if i % 3 == 0 {
|
||||
doc.add_f64(multi_field, 0.44);
|
||||
}
|
||||
|
||||
writer.add_document(doc).unwrap();
|
||||
if i % segments_every_n_docs == 0 {
|
||||
writer.commit().unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
writer.wait_merging_threads().unwrap();
|
||||
let mut writer = index.writer_for_tests().unwrap();
|
||||
let segment_ids = index.searchable_segment_ids().unwrap();
|
||||
writer.merge(&segment_ids).wait().unwrap();
|
||||
}
|
||||
|
||||
// If a merging thread fails, we should end up with more
|
||||
// than one segment here
|
||||
assert_eq!(1, index.searchable_segments().unwrap().len());
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn bench_multi_value_ff_merge_many_segments(b: &mut Bencher) {
|
||||
let num_docs = 100_000;
|
||||
b.iter(|| {
|
||||
bench_multi_value_ff_merge_opt(num_docs, 1_000, crate::indexer::NoMergePolicy);
|
||||
});
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn bench_multi_value_ff_merge_many_segments_log_merge(b: &mut Bencher) {
|
||||
let num_docs = 100_000;
|
||||
b.iter(|| {
|
||||
let merge_policy = crate::indexer::LogMergePolicy::default();
|
||||
bench_multi_value_ff_merge_opt(num_docs, 1_000, merge_policy);
|
||||
});
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn bench_multi_value_ff_merge_few_segments(b: &mut Bencher) {
|
||||
let num_docs = 100_000;
|
||||
b.iter(|| {
|
||||
bench_multi_value_ff_merge_opt(num_docs, 33_000, crate::indexer::NoMergePolicy);
|
||||
});
|
||||
}
|
||||
|
||||
fn multi_values(num_docs: usize, vals_per_doc: usize) -> Vec<Vec<u64>> {
|
||||
let mut vals = vec![];
|
||||
for _i in 0..num_docs {
|
||||
|
||||
@@ -3,6 +3,7 @@ use std::sync::Mutex;
|
||||
|
||||
use fastfield_codecs::{Column, MonotonicallyMappableToU64, VecColumn};
|
||||
use fnv::FnvHashMap;
|
||||
use measure_time::{debug_time, trace_time};
|
||||
|
||||
use crate::fastfield::{value_to_u64, CompositeFastFieldSerializer, FastFieldType};
|
||||
use crate::indexer::doc_id_mapping::DocIdMapping;
|
||||
@@ -146,6 +147,13 @@ impl MultiValuedFastFieldWriter {
|
||||
{
|
||||
self.doc_index.push(self.vals.len() as u64);
|
||||
let col = VecColumn::from(&self.doc_index[..]);
|
||||
|
||||
trace_time!(
|
||||
"segment-serialize-multi-fast-field-idx, num_vals {}, field_id {:?}",
|
||||
col.num_vals(),
|
||||
self.field()
|
||||
);
|
||||
|
||||
if let Some(doc_id_map) = doc_id_map {
|
||||
let multi_value_start_index = MultivalueStartIndex::new(&col, doc_id_map);
|
||||
serializer.create_auto_detect_u64_fast_field_with_idx(
|
||||
@@ -158,6 +166,12 @@ impl MultiValuedFastFieldWriter {
|
||||
}
|
||||
}
|
||||
{
|
||||
trace_time!(
|
||||
"segment-serialize-multi-fast-field-values, num_vals {}, field_id {:?}",
|
||||
self.vals.len(),
|
||||
self.field()
|
||||
);
|
||||
|
||||
// Writing the values themselves.
|
||||
// TODO FIXME: Use less memory.
|
||||
let mut values: Vec<u64> = Vec::new();
|
||||
|
||||
@@ -4,6 +4,7 @@ use std::io;
|
||||
use common;
|
||||
use fastfield_codecs::{Column, MonotonicallyMappableToU64};
|
||||
use fnv::FnvHashMap;
|
||||
use measure_time::{debug_time, trace_time};
|
||||
use tantivy_bitpacker::BlockedBitpacker;
|
||||
|
||||
use super::multivalued::MultiValuedFastFieldWriter;
|
||||
@@ -215,6 +216,7 @@ impl FastFieldsWriter {
|
||||
mapping: &HashMap<Field, FnvHashMap<UnorderedTermId, TermOrdinal>>,
|
||||
doc_id_map: Option<&DocIdMapping>,
|
||||
) -> io::Result<()> {
|
||||
debug_time!("segment-serialize-all-fast-fields",);
|
||||
for field_writer in self.term_id_writers {
|
||||
let field = field_writer.field();
|
||||
field_writer.serialize(serializer, mapping.get(&field), doc_id_map)?;
|
||||
@@ -367,6 +369,11 @@ impl IntFastFieldWriter {
|
||||
num_vals: self.val_count as u64,
|
||||
};
|
||||
|
||||
trace_time!(
|
||||
"segment-serialize-single-value-field, field_id {:?}",
|
||||
self.field()
|
||||
);
|
||||
|
||||
serializer.create_auto_detect_u64_fast_field(self.field, fastfield_accessor)?;
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -246,18 +246,27 @@ impl DeleteCursor {
|
||||
mod tests {
|
||||
|
||||
use super::{DeleteOperation, DeleteQueue};
|
||||
use crate::schema::{Field, Term};
|
||||
use crate::query::{Explanation, Scorer, Weight};
|
||||
use crate::{DocId, Score, SegmentReader};
|
||||
|
||||
struct DummyWeight;
|
||||
impl Weight for DummyWeight {
|
||||
fn scorer(&self, _reader: &SegmentReader, _boost: Score) -> crate::Result<Box<dyn Scorer>> {
|
||||
Err(crate::TantivyError::InternalError("dummy impl".to_owned()))
|
||||
}
|
||||
|
||||
fn explain(&self, _reader: &SegmentReader, _doc: DocId) -> crate::Result<Explanation> {
|
||||
Err(crate::TantivyError::InternalError("dummy impl".to_owned()))
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_deletequeue() {
|
||||
let delete_queue = DeleteQueue::new();
|
||||
|
||||
let make_op = |i: usize| {
|
||||
let field = Field::from_field_id(1u32);
|
||||
DeleteOperation {
|
||||
opstamp: i as u64,
|
||||
term: Term::from_field_u64(field, i as u64),
|
||||
}
|
||||
let make_op = |i: usize| DeleteOperation {
|
||||
opstamp: i as u64,
|
||||
target: Box::new(DummyWeight),
|
||||
};
|
||||
|
||||
delete_queue.push(make_op(1));
|
||||
|
||||
@@ -11,7 +11,6 @@ use super::segment_updater::SegmentUpdater;
|
||||
use super::{AddBatch, AddBatchReceiver, AddBatchSender, PreparedCommit};
|
||||
use crate::core::{Index, Segment, SegmentComponent, SegmentId, SegmentMeta, SegmentReader};
|
||||
use crate::directory::{DirectoryLock, GarbageCollectionResult, TerminatingWrite};
|
||||
use crate::docset::{DocSet, TERMINATED};
|
||||
use crate::error::TantivyError;
|
||||
use crate::fastfield::write_alive_bitset;
|
||||
use crate::indexer::delete_queue::{DeleteCursor, DeleteQueue};
|
||||
@@ -20,8 +19,9 @@ use crate::indexer::index_writer_status::IndexWriterStatus;
|
||||
use crate::indexer::operation::DeleteOperation;
|
||||
use crate::indexer::stamper::Stamper;
|
||||
use crate::indexer::{MergePolicy, SegmentEntry, SegmentWriter};
|
||||
use crate::query::{Query, TermQuery};
|
||||
use crate::schema::{Document, IndexRecordOption, Term};
|
||||
use crate::{FutureResult, Opstamp};
|
||||
use crate::{FutureResult, IndexReader, Opstamp};
|
||||
|
||||
// Size of the margin for the `memory_arena`. A segment is closed when the remaining memory
|
||||
// in the `memory_arena` goes below MARGIN_IN_BYTES.
|
||||
@@ -57,6 +57,7 @@ pub struct IndexWriter {
|
||||
_directory_lock: Option<DirectoryLock>,
|
||||
|
||||
index: Index,
|
||||
index_reader: IndexReader,
|
||||
|
||||
memory_arena_in_bytes_per_thread: usize,
|
||||
|
||||
@@ -92,19 +93,14 @@ fn compute_deleted_bitset(
|
||||
|
||||
// A delete operation should only affect
|
||||
// document that were inserted before it.
|
||||
let inverted_index = segment_reader.inverted_index(delete_op.term.field())?;
|
||||
if let Some(mut docset) =
|
||||
inverted_index.read_postings(&delete_op.term, IndexRecordOption::Basic)?
|
||||
{
|
||||
let mut doc_matching_deleted_term = docset.doc();
|
||||
while doc_matching_deleted_term != TERMINATED {
|
||||
if doc_opstamps.is_deleted(doc_matching_deleted_term, delete_op.opstamp) {
|
||||
alive_bitset.remove(doc_matching_deleted_term);
|
||||
delete_op
|
||||
.target
|
||||
.for_each(segment_reader, &mut |doc_matching_delete_query, _| {
|
||||
if doc_opstamps.is_deleted(doc_matching_delete_query, delete_op.opstamp) {
|
||||
alive_bitset.remove(doc_matching_delete_query);
|
||||
might_have_changed = true;
|
||||
}
|
||||
doc_matching_deleted_term = docset.advance();
|
||||
}
|
||||
}
|
||||
})?;
|
||||
delete_cursor.advance();
|
||||
}
|
||||
Ok(might_have_changed)
|
||||
@@ -302,6 +298,7 @@ impl IndexWriter {
|
||||
|
||||
memory_arena_in_bytes_per_thread,
|
||||
index: index.clone(),
|
||||
index_reader: index.reader()?,
|
||||
|
||||
index_writer_status: IndexWriterStatus::from(document_receiver),
|
||||
operation_sender: document_sender,
|
||||
@@ -666,10 +663,33 @@ impl IndexWriter {
|
||||
/// Like adds, the deletion itself will be visible
|
||||
/// only after calling `commit()`.
|
||||
pub fn delete_term(&self, term: Term) -> Opstamp {
|
||||
let query = TermQuery::new(term, IndexRecordOption::Basic);
|
||||
// For backward compatibility, if Term is invalid for the index, do nothing but return an
|
||||
// Opstamp
|
||||
self.delete_query(Box::new(query))
|
||||
.unwrap_or_else(|_| self.stamper.stamp())
|
||||
}
|
||||
|
||||
/// Delete all documents matching a given query.
|
||||
/// Returns an `Err` if the query can't be executed.
|
||||
///
|
||||
/// Delete operation only affects documents that
|
||||
/// were added in previous commits, and documents
|
||||
/// that were added previously in the same commit.
|
||||
///
|
||||
/// Like adds, the deletion itself will be visible
|
||||
/// only after calling `commit()`.
|
||||
#[doc(hidden)]
|
||||
pub fn delete_query(&self, query: Box<dyn Query>) -> crate::Result<Opstamp> {
|
||||
let weight = query.weight(&self.index_reader.searcher(), false)?;
|
||||
|
||||
let opstamp = self.stamper.stamp();
|
||||
let delete_operation = DeleteOperation { opstamp, term };
|
||||
let delete_operation = DeleteOperation {
|
||||
opstamp,
|
||||
target: weight,
|
||||
};
|
||||
self.delete_queue.push(delete_operation);
|
||||
opstamp
|
||||
Ok(opstamp)
|
||||
}
|
||||
|
||||
/// Returns the opstamp of the last successful commit.
|
||||
@@ -738,10 +758,17 @@ impl IndexWriter {
|
||||
let (batch_opstamp, stamps) = self.get_batch_opstamps(count);
|
||||
|
||||
let mut adds = AddBatch::default();
|
||||
|
||||
for (user_op, opstamp) in user_operations_it.zip(stamps) {
|
||||
match user_op {
|
||||
UserOperation::Delete(term) => {
|
||||
let delete_operation = DeleteOperation { opstamp, term };
|
||||
let query = TermQuery::new(term, IndexRecordOption::Basic);
|
||||
let weight = query.weight(&self.index_reader.searcher(), false)?;
|
||||
|
||||
let delete_operation = DeleteOperation {
|
||||
opstamp,
|
||||
target: weight,
|
||||
};
|
||||
self.delete_queue.push(delete_operation);
|
||||
}
|
||||
UserOperation::Add(document) => {
|
||||
@@ -786,7 +813,7 @@ mod tests {
|
||||
use crate::directory::error::LockError;
|
||||
use crate::error::*;
|
||||
use crate::indexer::NoMergePolicy;
|
||||
use crate::query::{QueryParser, TermQuery};
|
||||
use crate::query::{BooleanQuery, Occur, Query, QueryParser, TermQuery};
|
||||
use crate::schema::{
|
||||
self, Cardinality, Facet, FacetOptions, IndexRecordOption, NumericOptions,
|
||||
TextFieldIndexing, TextOptions, FAST, INDEXED, STORED, STRING, TEXT,
|
||||
@@ -1418,10 +1445,72 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_delete_query_with_sort_by_field() -> crate::Result<()> {
|
||||
let mut schema_builder = schema::Schema::builder();
|
||||
let id_field =
|
||||
schema_builder.add_u64_field("id", schema::INDEXED | schema::STORED | schema::FAST);
|
||||
let schema = schema_builder.build();
|
||||
|
||||
let settings = IndexSettings {
|
||||
sort_by_field: Some(IndexSortByField {
|
||||
field: "id".to_string(),
|
||||
order: Order::Desc,
|
||||
}),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let index = Index::builder()
|
||||
.schema(schema)
|
||||
.settings(settings)
|
||||
.create_in_ram()?;
|
||||
let index_reader = index.reader()?;
|
||||
let mut index_writer = index.writer_for_tests()?;
|
||||
|
||||
// create and delete docs in same commit
|
||||
for id in 0u64..5u64 {
|
||||
index_writer.add_document(doc!(id_field => id))?;
|
||||
}
|
||||
for id in 1u64..4u64 {
|
||||
let term = Term::from_field_u64(id_field, id);
|
||||
let not_term = Term::from_field_u64(id_field, 2);
|
||||
let term = Box::new(TermQuery::new(term, Default::default()));
|
||||
let not_term = Box::new(TermQuery::new(not_term, Default::default()));
|
||||
|
||||
let query: BooleanQuery = vec![
|
||||
(Occur::Must, term as Box<dyn Query>),
|
||||
(Occur::MustNot, not_term as Box<dyn Query>),
|
||||
]
|
||||
.into();
|
||||
|
||||
index_writer.delete_query(Box::new(query))?;
|
||||
}
|
||||
for id in 5u64..10u64 {
|
||||
index_writer.add_document(doc!(id_field => id))?;
|
||||
}
|
||||
index_writer.commit()?;
|
||||
index_reader.reload()?;
|
||||
|
||||
let searcher = index_reader.searcher();
|
||||
assert_eq!(searcher.segment_readers().len(), 1);
|
||||
|
||||
let segment_reader = searcher.segment_reader(0);
|
||||
assert_eq!(segment_reader.num_docs(), 8);
|
||||
assert_eq!(segment_reader.max_doc(), 10);
|
||||
let fast_field_reader = segment_reader.fast_fields().u64(id_field)?;
|
||||
let in_order_alive_ids: Vec<u64> = segment_reader
|
||||
.doc_ids_alive()
|
||||
.map(|doc| fast_field_reader.get_val(doc as u64))
|
||||
.collect();
|
||||
assert_eq!(&in_order_alive_ids[..], &[9, 8, 7, 6, 5, 4, 2, 0]);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
enum IndexingOp {
|
||||
AddDoc { id: u64 },
|
||||
DeleteDoc { id: u64 },
|
||||
DeleteDocQuery { id: u64 },
|
||||
Commit,
|
||||
Merge,
|
||||
}
|
||||
@@ -1429,6 +1518,7 @@ mod tests {
|
||||
fn balanced_operation_strategy() -> impl Strategy<Value = IndexingOp> {
|
||||
prop_oneof![
|
||||
(0u64..20u64).prop_map(|id| IndexingOp::DeleteDoc { id }),
|
||||
(0u64..20u64).prop_map(|id| IndexingOp::DeleteDocQuery { id }),
|
||||
(0u64..20u64).prop_map(|id| IndexingOp::AddDoc { id }),
|
||||
(0u64..1u64).prop_map(|_| IndexingOp::Commit),
|
||||
(0u64..1u64).prop_map(|_| IndexingOp::Merge),
|
||||
@@ -1437,7 +1527,8 @@ mod tests {
|
||||
|
||||
fn adding_operation_strategy() -> impl Strategy<Value = IndexingOp> {
|
||||
prop_oneof![
|
||||
10 => (0u64..100u64).prop_map(|id| IndexingOp::DeleteDoc { id }),
|
||||
5 => (0u64..100u64).prop_map(|id| IndexingOp::DeleteDoc { id }),
|
||||
5 => (0u64..100u64).prop_map(|id| IndexingOp::DeleteDocQuery { id }),
|
||||
50 => (0u64..100u64).prop_map(|id| IndexingOp::AddDoc { id }),
|
||||
2 => (0u64..1u64).prop_map(|_| IndexingOp::Commit),
|
||||
1 => (0u64..1u64).prop_map(|_| IndexingOp::Merge),
|
||||
@@ -1457,6 +1548,10 @@ mod tests {
|
||||
existing_ids.remove(&id);
|
||||
deleted_ids.insert(id);
|
||||
}
|
||||
IndexingOp::DeleteDocQuery { id } => {
|
||||
existing_ids.remove(&id);
|
||||
deleted_ids.insert(id);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
@@ -1539,6 +1634,11 @@ mod tests {
|
||||
IndexingOp::DeleteDoc { id } => {
|
||||
index_writer.delete_term(Term::from_field_u64(id_field, id));
|
||||
}
|
||||
IndexingOp::DeleteDocQuery { id } => {
|
||||
let term = Term::from_field_u64(id_field, id);
|
||||
let query = TermQuery::new(term, Default::default());
|
||||
index_writer.delete_query(Box::new(query))?;
|
||||
}
|
||||
IndexingOp::Commit => {
|
||||
index_writer.commit()?;
|
||||
}
|
||||
|
||||
@@ -4,7 +4,7 @@ use std::sync::Arc;
|
||||
|
||||
use fastfield_codecs::VecColumn;
|
||||
use itertools::Itertools;
|
||||
use measure_time::debug_time;
|
||||
use measure_time::{debug_time, trace_time};
|
||||
|
||||
use crate::core::{Segment, SegmentReader};
|
||||
use crate::docset::{DocSet, TERMINATED};
|
||||
@@ -250,7 +250,11 @@ impl IndexMerger {
|
||||
mut term_ord_mappings: HashMap<Field, TermOrdinalMapping>,
|
||||
doc_id_mapping: &SegmentDocIdMapping,
|
||||
) -> crate::Result<()> {
|
||||
debug_time!("write-fast-fields");
|
||||
debug_time!(
|
||||
"merge-all-fast-fields, num_segments {}, num docs new segment {}",
|
||||
self.readers.len(),
|
||||
doc_id_mapping.len()
|
||||
);
|
||||
|
||||
for (field, field_entry) in self.schema.fields() {
|
||||
let field_type = field_entry.field_type();
|
||||
@@ -311,6 +315,12 @@ impl IndexMerger {
|
||||
doc_id_mapping: &SegmentDocIdMapping,
|
||||
) -> crate::Result<()> {
|
||||
let fast_field_accessor = SortedDocIdColumn::new(&self.readers, doc_id_mapping, field);
|
||||
trace_time!(
|
||||
"merge-single-fast-field, num_vals {}, num_segments {}, field_id {:?}",
|
||||
fast_field_accessor.num_vals(),
|
||||
self.readers.len(),
|
||||
field
|
||||
);
|
||||
fast_field_serializer.create_auto_detect_u64_fast_field(field, fast_field_accessor)?;
|
||||
|
||||
Ok(())
|
||||
@@ -458,6 +468,12 @@ impl IndexMerger {
|
||||
fast_field_serializer: &mut CompositeFastFieldSerializer,
|
||||
doc_id_mapping: &SegmentDocIdMapping,
|
||||
) -> crate::Result<Vec<u64>> {
|
||||
trace_time!(
|
||||
"merge-multi-fast-field-idx, num_segments {}, field_id {:?}",
|
||||
self.readers.len(),
|
||||
field
|
||||
);
|
||||
|
||||
let reader_ordinal_and_field_accessors = self
|
||||
.readers
|
||||
.iter()
|
||||
@@ -488,7 +504,7 @@ impl IndexMerger {
|
||||
fast_field_serializer: &mut CompositeFastFieldSerializer,
|
||||
doc_id_mapping: &SegmentDocIdMapping,
|
||||
) -> crate::Result<()> {
|
||||
debug_time!("write-term-id-fast-field");
|
||||
trace_time!("write-term-id-fast-field");
|
||||
|
||||
// Multifastfield consists of 2 fastfields.
|
||||
// The first serves as an index into the second one and is strictly increasing.
|
||||
@@ -571,6 +587,13 @@ impl IndexMerger {
|
||||
|
||||
let fastfield_accessor =
|
||||
SortedDocIdMultiValueColumn::new(&self.readers, doc_id_mapping, &offsets, field);
|
||||
trace_time!(
|
||||
"merge-multi-fast-field-values, num_vals {}, num_segments {}, field_id {:?}",
|
||||
fastfield_accessor.num_vals(),
|
||||
self.readers.len(),
|
||||
field
|
||||
);
|
||||
|
||||
fast_field_serializer.create_auto_detect_u64_fast_field_with_idx(
|
||||
field,
|
||||
fastfield_accessor,
|
||||
@@ -624,7 +647,7 @@ impl IndexMerger {
|
||||
fieldnorm_reader: Option<FieldNormReader>,
|
||||
doc_id_mapping: &SegmentDocIdMapping,
|
||||
) -> crate::Result<Option<TermOrdinalMapping>> {
|
||||
debug_time!("write-postings-for-field");
|
||||
debug_time!("write-postings-for-field {:?}", indexed_field);
|
||||
let mut positions_buffer: Vec<u32> = Vec::with_capacity(1_000);
|
||||
let mut delta_computer = DeltaComputer::new();
|
||||
|
||||
@@ -827,7 +850,7 @@ impl IndexMerger {
|
||||
debug!("write-storable-field");
|
||||
|
||||
if !doc_id_mapping.is_trivial() {
|
||||
debug!("non-trivial-doc-id-mapping");
|
||||
debug!("non-trivial-doc-id-mapping (index is sorted)");
|
||||
|
||||
let store_readers: Vec<_> = self
|
||||
.readers
|
||||
@@ -855,7 +878,7 @@ impl IndexMerger {
|
||||
}
|
||||
}
|
||||
} else {
|
||||
debug!("trivial-doc-id-mapping");
|
||||
debug!("trivial-doc-id-mapping (index is not sorted)");
|
||||
for reader in &self.readers {
|
||||
let store_reader = reader.get_store_reader(1)?;
|
||||
if reader.has_deletes()
|
||||
|
||||
@@ -1,20 +1,11 @@
|
||||
use crate::query::Weight;
|
||||
use crate::schema::{Document, Term};
|
||||
use crate::Opstamp;
|
||||
|
||||
/// Timestamped Delete operation.
|
||||
#[derive(Clone, Eq, PartialEq, Debug)]
|
||||
pub struct DeleteOperation {
|
||||
pub opstamp: Opstamp,
|
||||
pub term: Term,
|
||||
}
|
||||
|
||||
impl Default for DeleteOperation {
|
||||
fn default() -> Self {
|
||||
DeleteOperation {
|
||||
opstamp: 0u64,
|
||||
term: Term::new(),
|
||||
}
|
||||
}
|
||||
pub target: Box<dyn Weight>,
|
||||
}
|
||||
|
||||
/// Timestamped Add operation.
|
||||
|
||||
Reference in New Issue
Block a user