mirror of https://github.com/quickwit-oss/tantivy.git
synced 2025-12-28 04:52:55 +00:00

Compare commits: raphael_be...debug-posi (7 commits)
| Author | SHA1 | Date |
|---|---|---|
|  | 1a91973ab0 |  |
|  | 8a7ca64b16 |  |
|  | 6c485bfd8a |  |
|  | 67f53289ef |  |
|  | f632be8258 |  |
|  | 6847af74ad |  |
|  | 5baa91fdf3 |  |
@@ -1,3 +1,12 @@
Tantivy 0.15.3
=========================
- Major bugfix. Deleting documents was broken when the index was sorted by a field. (@appaquet, @fulmicoton) #1101

Tantivy 0.15.2
========================
- Major bugfix. DocStore still panics when a deleted doc is at the beginning of a block. (@appaquet) #1088

Tantivy 0.15.1
=========================
- Major bugfix. DocStore panics when first block is deleted. (@appaquet) #1077
@@ -1,6 +1,6 @@
[package]
name = "tantivy"
version = "0.15.1"
version = "0.15.3"
authors = ["Paul Masurel <paul.masurel@gmail.com>"]
license = "MIT"
categories = ["database-implementations", "data-structures"]

@@ -12,6 +12,11 @@ readme = "README.md"
keywords = ["search", "information", "retrieval"]
edition = "2018"

[[bin]]
name = "debug_position"
path = "src/debug_position.rs"

[dependencies]
base64 = "0.13"
byteorder = "1.4.3"
@@ -16,7 +16,6 @@ pub use self::index_meta::{
pub use self::inverted_index_reader::InvertedIndexReader;
pub use self::searcher::Searcher;
pub use self::segment::Segment;
pub use self::segment::SerializableSegment;
pub use self::segment_component::SegmentComponent;
pub use self::segment_id::SegmentId;
pub use self::segment_reader::SegmentReader;
@@ -1,13 +1,12 @@
use super::SegmentComponent;
use crate::core::Index;
use crate::core::SegmentId;
use crate::core::SegmentMeta;
use crate::directory::error::{OpenReadError, OpenWriteError};
use crate::directory::Directory;
use crate::directory::{FileSlice, WritePtr};
use crate::indexer::segment_serializer::SegmentSerializer;
use crate::schema::Schema;
use crate::Opstamp;
use crate::{core::Index, indexer::doc_id_mapping::DocIdMapping};
use std::fmt;
use std::path::PathBuf;

@@ -90,20 +89,3 @@ impl Segment {
        Ok(write)
    }
}

pub trait SerializableSegment {
    /// Writes a view of a segment by pushing information
    /// to the `SegmentSerializer`.
    ///
    /// # Returns
    /// The number of documents in the segment.
    ///
    /// doc_id_map is used when index is created and sorted, to map to the new doc_id order.
    /// It is not used by the `IndexMerger`, since the doc_id_mapping on cross-segments works
    /// differently
    fn write(
        &self,
        serializer: SegmentSerializer,
        doc_id_map: Option<&DocIdMapping>,
    ) -> crate::Result<u32>;
}
src/debug_position.rs (new file, 77 lines)
@@ -0,0 +1,77 @@
use std::panic;

use futures::executor::block_on;
use tantivy;
use tantivy::DocSet;
use tantivy::Postings;
use tantivy::Searcher;
use tantivy::TERMINATED;
use tantivy::merge_policy;
use tantivy::merge_policy::DefaultMergePolicy;
use tantivy::merge_policy::MergePolicy;
use tantivy::schema::Field;
use tantivy::schema::IndexRecordOption;

fn test_field(searcher: &Searcher, field: Field) -> tantivy::Result<()> {
    for segment_reader in searcher.segment_readers() {
        println!("\n\n====\nsegment {:?}", segment_reader.segment_id());
        println!("maxdoc {} del {} ", segment_reader.max_doc(), segment_reader.num_deleted_docs());
        let inv_idx = segment_reader.inverted_index(field)?;
        let termdict = inv_idx.terms();
        println!("num terms {}", termdict.num_terms());
        let mut terms = termdict.stream()?;
        while terms.advance() {
            let term_info = terms.value();
            let mut postings = inv_idx.read_postings_from_terminfo(term_info, tantivy::schema::IndexRecordOption::WithFreqsAndPositions)?;
            let mut seen_doc = 0;
            while postings.doc() != TERMINATED {
                let mut postings_clone = postings.clone();
                // println!("termord {} seen_doc {} termpositions {:?} docfreq {}", terms.term_ord(), seen_doc, term_info.positions_range, term_info.doc_freq);
                let mut positions = Vec::new();
                postings_clone.positions(&mut positions);
                seen_doc += 1;
                postings.advance();
            }
        }
    }
    Ok(())
}

fn main() -> tantivy::Result<()> {
    let index = tantivy::Index::open_in_dir(".")?;
    let reader = index.reader()?;
    let searcher = reader.searcher();
    let schema = index.schema();
    for (field, field_entry) in schema.fields() {
        let field_type = field_entry.field_type();
        let has_position = field_type.get_index_record_option()
            .map(|opt| opt == IndexRecordOption::WithFreqsAndPositions)
            .unwrap_or(false);
        if !has_position {
            continue;
        }
        test_field(&*searcher, field)?;
    }

    println!("GC");
    let mut index_writer = index.writer_with_num_threads(1, 100_000_000)?;
    block_on(index_writer.garbage_collect_files())?;

    print!("----- validating checksum");
    index.validate_checksum()?;

    print!("----- success");

    let default_merge_policy = DefaultMergePolicy::default();
    let segment_metas = index.searchable_segment_metas()?;
    let merge_candidates = default_merge_policy.compute_merge_candidates(&segment_metas);
    println!("{:?}", merge_candidates);
    for merge_candidate in merge_candidates {
        println!("merge_candidate {:?}", merge_candidate);
        let future = index_writer.merge(&merge_candidate.0[..]);
        let seg = block_on(future)?;
        println!("seg {:?} ", seg);
    }
    Ok(())
}
@@ -4,11 +4,12 @@ use crate::common::HasLen;
use crate::directory::OwnedBytes;
use std::fmt;
use std::ops::Range;
use std::panic::{RefUnwindSafe, UnwindSafe};
use std::sync::{Arc, Weak};
use std::{io, ops::Deref};

pub type ArcBytes = Arc<dyn Deref<Target = [u8]> + Send + Sync + 'static>;
pub type WeakArcBytes = Weak<dyn Deref<Target = [u8]> + Send + Sync + 'static>;
pub type ArcBytes = Arc<dyn Deref<Target = [u8]> + Send + Sync + UnwindSafe + RefUnwindSafe + 'static>;
pub type WeakArcBytes = Weak<dyn Deref<Target = [u8]> + Send + Sync + UnwindSafe + RefUnwindSafe + 'static>;

/// Objects that represents files sections in tantivy.
///

@@ -40,7 +41,7 @@ impl<T: Deref<Target = [u8]>> HasLen for T {

impl<B> From<B> for FileSlice
where
    B: StableDeref + Deref<Target = [u8]> + 'static + Send + Sync,
    B: StableDeref + Deref<Target = [u8]> + 'static + Send + Sync + UnwindSafe + RefUnwindSafe,
{
    fn from(bytes: B) -> FileSlice {
        FileSlice::new(Box::new(OwnedBytes::new(bytes)))
@@ -20,6 +20,7 @@ use std::fs::OpenOptions;
use std::fs::{self, File};
use std::io::{self, Seek, SeekFrom};
use std::io::{BufWriter, Read, Write};
use std::panic::{RefUnwindSafe, UnwindSafe};
use std::path::{Path, PathBuf};
use std::result;
use std::sync::Arc;

@@ -314,7 +315,7 @@ impl TerminatingWrite for SafeFileWriter {
}

#[derive(Clone)]
struct MmapArc(Arc<dyn Deref<Target = [u8]> + Send + Sync>);
struct MmapArc(Arc<dyn Deref<Target = [u8]> + Send + Sync + RefUnwindSafe + UnwindSafe>);

impl Deref for MmapArc {
    type Target = [u8];
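The hunks above add `UnwindSafe + RefUnwindSafe` bounds to the `ArcBytes`/`WeakArcBytes` aliases, to the `From<B> for FileSlice` impl, and to `MmapArc`, presumably so that values of these types can be moved across a `std::panic::catch_unwind` boundary without an `AssertUnwindSafe` wrapper. A minimal standalone sketch of what the two extra marker bounds buy (the `PanicSafeBytes` alias is illustrative, not tantivy's):

```rust
use std::ops::Deref;
use std::panic::{self, RefUnwindSafe, UnwindSafe};
use std::sync::Arc;

// Illustrative alias mirroring the new ArcBytes bounds: the two extra marker
// traits are what make the closure below `UnwindSafe`.
type PanicSafeBytes = Arc<dyn Deref<Target = [u8]> + Send + Sync + UnwindSafe + RefUnwindSafe>;

fn main() {
    let bytes: PanicSafeBytes = Arc::new(vec![1u8, 2, 3]);
    // `catch_unwind` requires an `UnwindSafe` closure; `Arc<T>` is `UnwindSafe`
    // only when `T: RefUnwindSafe`, which the trait object now guarantees.
    let len = panic::catch_unwind(move || bytes.len()).expect("no panic");
    assert_eq!(len, 3);
}
```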
@@ -84,11 +84,11 @@ impl BytesFastFieldWriter {
        &'a self,
        doc_id_map: Option<&'b DocIdMapping>,
    ) -> impl Iterator<Item = &'b [u8]> {
        let doc_id_iter = if let Some(doc_id_map) = doc_id_map {
            Box::new(doc_id_map.iter_old_doc_ids().cloned()) as Box<dyn Iterator<Item = u32>>
        let doc_id_iter: Box<dyn Iterator<Item = u32>> = if let Some(doc_id_map) = doc_id_map {
            Box::new(doc_id_map.iter_old_doc_ids())
        } else {
            Box::new(self.doc_index.iter().enumerate().map(|el| el.0 as u32))
                as Box<dyn Iterator<Item = u32>>
            let max_doc = self.doc_index.len() as u32;
            Box::new(0..max_doc)
        };
        doc_id_iter.map(move |doc_id| self.get_values_for_doc_id(doc_id))
    }
@@ -103,11 +103,11 @@ impl MultiValuedFastFieldWriter {
        &'a self,
        doc_id_map: Option<&'b DocIdMapping>,
    ) -> impl Iterator<Item = &'b [u64]> {
        let doc_id_iter = if let Some(doc_id_map) = doc_id_map {
            Box::new(doc_id_map.iter_old_doc_ids().cloned()) as Box<dyn Iterator<Item = u32>>
        let doc_id_iter: Box<dyn Iterator<Item = u32>> = if let Some(doc_id_map) = doc_id_map {
            Box::new(doc_id_map.iter_old_doc_ids())
        } else {
            Box::new(self.doc_index.iter().enumerate().map(|el| el.0 as u32))
                as Box<dyn Iterator<Item = u32>>
            let max_doc = self.doc_index.len() as DocId;
            Box::new(0..max_doc)
        };
        doc_id_iter.map(move |doc_id| self.get_values_for_doc_id(doc_id))
    }
@@ -284,7 +284,7 @@ impl IntFastFieldWriter {
        let mut single_field_serializer = serializer.new_u64_fast_field(self.field, min, max)?;
        if let Some(doc_id_map) = doc_id_map {
            for doc_id in doc_id_map.iter_old_doc_ids() {
                single_field_serializer.add_val(self.vals.get(*doc_id as usize))?;
                single_field_serializer.add_val(self.vals.get(doc_id as usize))?;
            }
        } else {
            for val in self.vals.iter() {
@@ -98,7 +98,7 @@ impl FieldNormsWriter {
            let mut mapped_fieldnorm_values = vec![];
            mapped_fieldnorm_values.resize(fieldnorm_values.len(), 0u8);
            for (new_doc_id, old_doc_id) in doc_id_map.iter_old_doc_ids().enumerate() {
                mapped_fieldnorm_values[new_doc_id] = fieldnorm_values[*old_doc_id as usize];
                mapped_fieldnorm_values[new_doc_id] = fieldnorm_values[old_doc_id as usize];
            }
            fieldnorms_serializer.serialize_field(field, &mapped_fieldnorm_values)?;
        } else {
@@ -24,8 +24,8 @@ impl DocIdMapping {
        self.new_doc_id_to_old[doc_id as usize]
    }
    /// iterate over old doc_ids in order of the new doc_ids
    pub fn iter_old_doc_ids(&self) -> std::slice::Iter<'_, DocId> {
        self.new_doc_id_to_old.iter()
    pub fn iter_old_doc_ids(&self) -> impl Iterator<Item = DocId> + Clone + '_ {
        self.new_doc_id_to_old.iter().cloned()
    }
}
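The fast-field and fieldnorm hunks above all follow from this signature change: `iter_old_doc_ids` now yields owned `DocId`s instead of `&DocId`, so callers drop their `*doc_id` derefs and `.cloned()` calls, and the two iterator branches can be written against one explicitly annotated `Box<dyn Iterator<Item = u32>>` binding. A minimal standalone sketch of that pattern, with illustrative names standing in for tantivy's types:

```rust
// Stand-in for tantivy's DocIdMapping: old doc ids listed in new doc-id order.
struct DocIdMapping {
    new_doc_id_to_old: Vec<u32>,
}

impl DocIdMapping {
    // Yield owned ids (as in the hunk above), so callers need no deref.
    fn iter_old_doc_ids(&self) -> impl Iterator<Item = u32> + Clone + '_ {
        self.new_doc_id_to_old.iter().cloned()
    }
}

// Serialize values either in remapped order or in natural 0..max_doc order.
// Annotating the binding lets both branches coerce to the same boxed iterator
// type without a per-branch `as Box<dyn Iterator<Item = u32>>` cast.
fn values_in_serialization_order(doc_id_map: Option<&DocIdMapping>, vals: &[u64]) -> Vec<u64> {
    let max_doc = vals.len() as u32;
    let doc_id_iter: Box<dyn Iterator<Item = u32>> = if let Some(doc_id_map) = doc_id_map {
        Box::new(doc_id_map.iter_old_doc_ids())
    } else {
        Box::new(0..max_doc)
    };
    doc_id_iter.map(|doc_id| vals[doc_id as usize]).collect()
}

fn main() {
    let mapping = DocIdMapping { new_doc_id_to_old: vec![2, 0, 1] };
    assert_eq!(values_in_serialization_order(Some(&mapping), &[10, 11, 12]), vec![12, 10, 11]);
    assert_eq!(values_in_serialization_order(None, &[10, 11, 12]), vec![10, 11, 12]);
}
```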
@@ -14,35 +14,27 @@ use crate::Opstamp;
// The doc to opstamp mapping stores precisely an array
// indexed by doc id and storing the opstamp of the document.
//
// This mapping is (for the moment) stricly increasing
// because of the way document id are allocated.
// This mapping is NOT necessarily increasing, because
// we might be sorting documents according to a fast field.
#[derive(Clone)]
pub enum DocToOpstampMapping<'a> {
    WithMap(&'a [Opstamp]),
    None,
}

impl<'a> From<&'a [u64]> for DocToOpstampMapping<'a> {
    fn from(opstamps: &[Opstamp]) -> DocToOpstampMapping {
        DocToOpstampMapping::WithMap(opstamps)
    }
}

impl<'a> DocToOpstampMapping<'a> {
    /// Given an opstamp return the limit doc id L
    /// such that all doc id D such that
    // D >= L iff opstamp(D) >= than `target_opstamp`.
    //
    // The edge case opstamp = some doc opstamp is in practise
    // never called.
    pub fn compute_doc_limit(&self, target_opstamp: Opstamp) -> DocId {
        match *self {
            DocToOpstampMapping::WithMap(ref doc_opstamps) => {
                match doc_opstamps.binary_search(&target_opstamp) {
                    Ok(doc_id) | Err(doc_id) => doc_id as DocId,
                }
    /// Assess whether a document should be considered deleted given that it contains
    /// a deleted term that was deleted at the opstamp: `delete_opstamp`.
    ///
    /// This function returns true if the `DocToOpstamp` mapping is none or if
    /// the `doc_opstamp` is anterior to the delete opstamp.
    pub fn is_deleted(&self, doc_id: DocId, delete_opstamp: Opstamp) -> bool {
        match self {
            Self::WithMap(doc_opstamps) => {
                let doc_opstamp = doc_opstamps[doc_id as usize];
                doc_opstamp < delete_opstamp
            }
            DocToOpstampMapping::None => DocId::max_value(),
            Self::None => true,
        }
    }
}
@@ -55,40 +47,17 @@ mod tests {
    #[test]
    fn test_doc_to_opstamp_mapping_none() {
        let doc_to_opstamp_mapping = DocToOpstampMapping::None;
        assert_eq!(
            doc_to_opstamp_mapping.compute_doc_limit(1),
            u32::max_value()
        );
        assert!(doc_to_opstamp_mapping.is_deleted(1u32, 0u64));
        assert!(doc_to_opstamp_mapping.is_deleted(1u32, 2u64));
    }

    #[test]
    fn test_doc_to_opstamp_mapping_complex() {
        {
            let doc_to_opstamp_mapping = DocToOpstampMapping::from(&[][..]);
            assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(0u64), 0);
            assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(2u64), 0);
        }
        {
            let doc_to_opstamp_mapping = DocToOpstampMapping::from(&[1u64][..]);
            assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(0u64), 0);
            assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(2u64), 1);
        }
        {
            let doc_to_opstamp_mapping =
                DocToOpstampMapping::from(&[1u64, 12u64, 17u64, 23u64][..]);
            assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(0u64), 0);
            for i in 2u64..13u64 {
                assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(i), 1);
            }
            for i in 13u64..18u64 {
                assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(i), 2);
            }
            for i in 18u64..24u64 {
                assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(i), 3);
            }
            for i in 24u64..30u64 {
                assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(i), 4);
            }
        }
    fn test_doc_to_opstamp_mapping_with_map() {
        let doc_to_opstamp_mapping = DocToOpstampMapping::WithMap(&[5u64, 1u64, 0u64, 4u64, 3u64]);
        assert_eq!(doc_to_opstamp_mapping.is_deleted(0u32, 2u64), false);
        assert_eq!(doc_to_opstamp_mapping.is_deleted(1u32, 2u64), true);
        assert_eq!(doc_to_opstamp_mapping.is_deleted(2u32, 2u64), true);
        assert_eq!(doc_to_opstamp_mapping.is_deleted(3u32, 2u64), false);
        assert_eq!(doc_to_opstamp_mapping.is_deleted(4u32, 2u64), false);
    }
}
@@ -106,22 +106,18 @@ fn compute_deleted_bitset(
    }

    // A delete operation should only affect
    // document that were inserted after it.
    //
    // Limit doc helps identify the first document
    // that may be affected by the delete operation.
    let limit_doc = doc_opstamps.compute_doc_limit(delete_op.opstamp);
    // document that were inserted before it.
    let inverted_index = segment_reader.inverted_index(delete_op.term.field())?;
    if let Some(mut docset) =
        inverted_index.read_postings(&delete_op.term, IndexRecordOption::Basic)?
    {
        let mut deleted_doc = docset.doc();
        while deleted_doc != TERMINATED {
            if deleted_doc < limit_doc {
                delete_bitset.insert(deleted_doc);
        let mut doc_matching_deleted_term = docset.doc();
        while doc_matching_deleted_term != TERMINATED {
            if doc_opstamps.is_deleted(doc_matching_deleted_term, delete_op.opstamp) {
                delete_bitset.insert(doc_matching_deleted_term);
                might_have_changed = true;
            }
            deleted_doc = docset.advance();
            doc_matching_deleted_term = docset.advance();
        }
    }
    delete_cursor.advance();
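The hunk above appears to be the heart of the #1101 fix: instead of a single `limit_doc` cutoff computed by binary search (which was only valid while doc ids were assigned in opstamp order), each document matching the deleted term is now checked individually against its own opstamp. A small self-contained sketch of that rule, using plain arrays in place of tantivy's postings and bitset types; the opstamp values are taken from the `test_doc_to_opstamp_mapping_with_map` test above:

```rust
// Opstamps indexed by doc id. In a sorted segment this array is no longer
// increasing, which is why a per-document check replaces the old binary-search cutoff.
fn docs_to_delete(doc_opstamps: &[u64], docs_matching_term: &[u32], delete_opstamp: u64) -> Vec<u32> {
    docs_matching_term
        .iter()
        .copied()
        // A delete only affects documents added before it, i.e. documents whose
        // opstamp is strictly lower than the delete operation's opstamp.
        .filter(|&doc| doc_opstamps[doc as usize] < delete_opstamp)
        .collect()
}

fn main() {
    let doc_opstamps = [5u64, 1, 0, 4, 3];
    // A delete at opstamp 2 whose term matches docs 0, 1 and 2 removes only docs 1 and 2.
    assert_eq!(docs_to_delete(&doc_opstamps, &[0, 1, 2], 2), vec![1, 2]);
}
```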
@@ -230,14 +226,8 @@ fn index_documents(

    let segment_with_max_doc = segment.with_max_doc(max_doc);

    let last_docstamp: Opstamp = *(doc_opstamps.last().unwrap());

    let delete_bitset_opt = apply_deletes(
        &segment_with_max_doc,
        &mut delete_cursor,
        &doc_opstamps,
        last_docstamp,
    )?;
    let delete_bitset_opt =
        apply_deletes(&segment_with_max_doc, &mut delete_cursor, &doc_opstamps)?;

    let meta = segment_with_max_doc.meta().clone();
    meta.untrack_temp_docstore();
@@ -247,19 +237,26 @@ fn index_documents(
    Ok(true)
}

/// `doc_opstamps` is required to be non-empty.
fn apply_deletes(
    segment: &Segment,
    mut delete_cursor: &mut DeleteCursor,
    doc_opstamps: &[Opstamp],
    last_docstamp: Opstamp,
) -> crate::Result<Option<BitSet>> {
    if delete_cursor.get().is_none() {
        // if there are no delete operation in the queue, no need
        // to even open the segment.
        return Ok(None);
    }

    let max_doc_opstamp: Opstamp = doc_opstamps
        .iter()
        .cloned()
        .max()
        .expect("Empty DocOpstamp is forbidden");

    let segment_reader = SegmentReader::open(segment)?;
    let doc_to_opstamps = DocToOpstampMapping::from(doc_opstamps);
    let doc_to_opstamps = DocToOpstampMapping::WithMap(doc_opstamps);

    let max_doc = segment.meta().max_doc();
    let mut deleted_bitset = BitSet::with_max_value(max_doc);

@@ -268,7 +265,7 @@ fn apply_deletes(
        &segment_reader,
        &mut delete_cursor,
        &doc_to_opstamps,
        last_docstamp,
        max_doc_opstamp,
    )?;
    Ok(if may_have_deletes {
        Some(deleted_bitset)
@@ -784,17 +781,22 @@ impl Drop for IndexWriter {

#[cfg(test)]
mod tests {
    use proptest::prelude::*;
    use proptest::prop_oneof;
    use proptest::strategy::Strategy;

    use super::super::operation::UserOperation;
    use crate::collector::TopDocs;
    use crate::directory::error::LockError;
    use crate::error::*;
    use crate::fastfield::FastFieldReader;
    use crate::indexer::NoMergePolicy;
    use crate::query::TermQuery;
    use crate::schema::{self, IndexRecordOption, STRING};
    use crate::schema::{self, IndexRecordOption, FAST, INDEXED, STRING};
    use crate::Index;
    use crate::ReloadPolicy;
    use crate::Term;
    use crate::{IndexSettings, IndexSortByField, Order};

    #[test]
    fn test_operations_group() {
@@ -1282,6 +1284,177 @@ mod tests {
        assert!(commit_again.is_ok());
    }

    #[test]
    fn test_delete_with_sort_by_field() -> crate::Result<()> {
        let mut schema_builder = schema::Schema::builder();
        let id_field =
            schema_builder.add_u64_field("id", schema::INDEXED | schema::STORED | schema::FAST);
        let schema = schema_builder.build();

        let settings = IndexSettings {
            sort_by_field: Some(IndexSortByField {
                field: "id".to_string(),
                order: Order::Desc,
            }),
            ..Default::default()
        };

        let index = Index::builder()
            .schema(schema)
            .settings(settings)
            .create_in_ram()?;
        let index_reader = index.reader()?;
        let mut index_writer = index.writer_for_tests()?;

        // create and delete docs in same commit
        for id in 0u64..5u64 {
            index_writer.add_document(doc!(id_field => id));
        }
        for id in 2u64..4u64 {
            index_writer.delete_term(Term::from_field_u64(id_field, id));
        }
        for id in 5u64..10u64 {
            index_writer.add_document(doc!(id_field => id));
        }
        index_writer.commit()?;
        index_reader.reload()?;

        let searcher = index_reader.searcher();
        assert_eq!(searcher.segment_readers().len(), 1);

        let segment_reader = searcher.segment_reader(0);
        assert_eq!(segment_reader.num_docs(), 8);
        assert_eq!(segment_reader.max_doc(), 10);
        let fast_field_reader = segment_reader.fast_fields().u64(id_field)?;
        let in_order_alive_ids: Vec<u64> = segment_reader
            .doc_ids_alive()
            .map(|doc| fast_field_reader.get(doc))
            .collect();
        assert_eq!(&in_order_alive_ids[..], &[9, 8, 7, 6, 5, 4, 1, 0]);
        Ok(())
    }

    #[derive(Debug, Clone, Copy)]
    enum IndexingOp {
        AddDoc { id: u64 },
        DeleteDoc { id: u64 },
    }

    fn operation_strategy() -> impl Strategy<Value = IndexingOp> {
        prop_oneof![
            (0u64..10u64).prop_map(|id| IndexingOp::DeleteDoc { id }),
            (0u64..10u64).prop_map(|id| IndexingOp::AddDoc { id }),
        ]
    }

    fn expected_ids(ops: &[IndexingOp]) -> Vec<u64> {
        let mut ids = Vec::new();
        for &op in ops {
            match op {
                IndexingOp::AddDoc { id } => {
                    ids.push(id);
                }
                IndexingOp::DeleteDoc { id } => {
                    ids.retain(|&id_val| id_val != id);
                }
            }
        }
        ids.sort();
        ids
    }

    fn test_operation_strategy(ops: &[IndexingOp]) -> crate::Result<()> {
        let mut schema_builder = schema::Schema::builder();
        let id_field = schema_builder.add_u64_field("id", FAST | INDEXED);
        let schema = schema_builder.build();
        let settings = IndexSettings {
            sort_by_field: Some(IndexSortByField {
                field: "id".to_string(),
                order: Order::Asc,
            }),
            ..Default::default()
        };
        let index = Index::builder()
            .schema(schema)
            .settings(settings)
            .create_in_ram()?;
        let mut index_writer = index.writer_for_tests()?;
        for &op in ops {
            match op {
                IndexingOp::AddDoc { id } => {
                    index_writer.add_document(doc!(id_field=>id));
                }
                IndexingOp::DeleteDoc { id } => {
                    index_writer.delete_term(Term::from_field_u64(id_field, id));
                }
            }
        }
        index_writer.commit()?;
        let searcher = index.reader()?.searcher();
        let ids: Vec<u64> = searcher
            .segment_readers()
            .iter()
            .flat_map(|segment_reader| {
                let ff_reader = segment_reader.fast_fields().u64(id_field).unwrap();
                segment_reader
                    .doc_ids_alive()
                    .map(move |doc| ff_reader.get(doc))
            })
            .collect();

        assert_eq!(ids, expected_ids(ops));
        Ok(())
    }

    proptest! {
        #[test]
        fn test_delete_with_sort_proptest(ops in proptest::collection::vec(operation_strategy(), 1..10)) {
            assert!(test_operation_strategy(&ops[..]).is_ok());
        }
    }

    #[test]
    fn test_delete_with_sort_by_field_last_opstamp_is_not_max() -> crate::Result<()> {
        let mut schema_builder = schema::Schema::builder();
        let sort_by_field = schema_builder.add_u64_field("sort_by", FAST);
        let id_field = schema_builder.add_u64_field("id", INDEXED);
        let schema = schema_builder.build();

        let settings = IndexSettings {
            sort_by_field: Some(IndexSortByField {
                field: "sort_by".to_string(),
                order: Order::Asc,
            }),
            ..Default::default()
        };

        let index = Index::builder()
            .schema(schema)
            .settings(settings)
            .create_in_ram()?;
        let mut index_writer = index.writer_for_tests()?;

        // We add a doc...
        index_writer.add_document(doc!(sort_by_field => 2u64, id_field => 0u64));
        // And remove it.
        index_writer.delete_term(Term::from_field_u64(id_field, 0u64));
        // We add another doc.
        index_writer.add_document(doc!(sort_by_field=>1u64, id_field => 0u64));

        // The expected result is a segment with
        // maxdoc = 2
        // numdoc = 1.
        index_writer.commit()?;

        let searcher = index.reader()?.searcher();
        assert_eq!(searcher.segment_readers().len(), 1);

        let segment_reader = searcher.segment_reader(0);
        assert_eq!(segment_reader.max_doc(), 2);
        assert_eq!(segment_reader.num_deleted_docs(), 1);
        Ok(())
    }

    #[test]
    fn test_index_doc_missing_field() {
        let mut schema_builder = schema::Schema::builder();
@@ -1,4 +1,3 @@
use super::doc_id_mapping::DocIdMapping;
use crate::error::DataCorruption;
use crate::fastfield::CompositeFastFieldSerializer;
use crate::fastfield::DeleteBitSet;

@@ -18,11 +17,11 @@ use crate::schema::{Field, Schema};
use crate::store::StoreWriter;
use crate::termdict::TermMerger;
use crate::termdict::TermOrdinal;
use crate::IndexSortByField;
use crate::{common::HasLen, fastfield::MultiValueLength};
use crate::{common::MAX_DOC_LIMIT, IndexSettings};
use crate::{core::Segment, indexer::doc_id_mapping::expect_field_id_for_sort_field};
use crate::{core::SegmentReader, Order};
use crate::{core::SerializableSegment, IndexSortByField};
use crate::{
    docset::{DocSet, TERMINATED},
    SegmentOrdinal,

@@ -928,14 +927,14 @@ impl IndexMerger {
                // I think this is not strictly necessary, it would be possible to
                // avoid the loading into a vec via some form of kmerge, but then the merge
                // logic would deviate much more from the stacking case (unsorted index)
                let delta_positions = delta_computer.compute_delta(&positions_buffer);
                if doc_id_mapping.is_some() {
                    doc_id_and_positions.push((
                        remapped_doc_id,
                        term_freq,
                        positions_buffer.to_vec(),
                        delta_positions.to_vec(),
                    ));
                } else {
                    let delta_positions = delta_computer.compute_delta(&positions_buffer);
                    field_serializer.write_doc(remapped_doc_id, term_freq, delta_positions);
                }
            }
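In the merger hunk just above, `compute_delta` is hoisted before the branch so that, when a doc-id mapping is present, the buffered positions are stored already delta-encoded rather than absolute, matching what the unsorted branch feeds to `write_doc`. A minimal sketch of that encoding, under the assumption that tantivy's `DeltaComputer` turns absolute token positions into gaps (the function below is illustrative, not tantivy's):

```rust
// Turn absolute token positions into gaps between consecutive positions.
fn compute_delta(positions: &[u32]) -> Vec<u32> {
    let mut prev = 0u32;
    positions
        .iter()
        .map(|&pos| {
            let delta = pos - prev;
            prev = pos;
            delta
        })
        .collect()
}

fn main() {
    // Positions 3, 7, 12 are stored as the gaps 3, 4, 5.
    assert_eq!(compute_delta(&[3, 7, 12]), vec![3, 4, 5]);
}
```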
@@ -1042,14 +1041,13 @@ impl IndexMerger {
        }
        Ok(())
    }
}

impl SerializableSegment for IndexMerger {
    fn write(
        &self,
        mut serializer: SegmentSerializer,
        _: Option<&DocIdMapping>,
    ) -> crate::Result<u32> {
    /// Writes the merged segment by pushing information
    /// to the `SegmentSerializer`.
    ///
    /// # Returns
    /// The number of documents in the resulting segment.
    pub fn write(&self, mut serializer: SegmentSerializer) -> crate::Result<u32> {
        let doc_id_mapping = if let Some(sort_by_field) = self.index_settings.sort_by_field.as_ref()
        {
            // If the documents are already sorted and stackable, we ignore the mapping and execute
@@ -5,7 +5,6 @@ use crate::core::IndexSettings;
use crate::core::Segment;
use crate::core::SegmentId;
use crate::core::SegmentMeta;
use crate::core::SerializableSegment;
use crate::core::META_FILEPATH;
use crate::directory::{Directory, DirectoryClone, GarbageCollectionResult};
use crate::indexer::delete_queue::DeleteCursor;

@@ -140,7 +139,7 @@ fn merge(
    // ... we just serialize this index merger in our new segment to merge the segments.
    let segment_serializer = SegmentSerializer::for_segment(merged_segment.clone(), true)?;

    let num_docs = merger.write(segment_serializer, None)?;
    let num_docs = merger.write(segment_serializer)?;

    let merged_segment_id = merged_segment.id();

@@ -209,7 +208,7 @@ pub fn merge_segments<Dir: Directory>(
        &segments[..],
    )?;
    let segment_serializer = SegmentSerializer::for_segment(merged_segment, true)?;
    let num_docs = merger.write(segment_serializer, None)?;
    let num_docs = merger.write(segment_serializer)?;

    let segment_meta = merged_index.new_segment_meta(merged_segment_id, num_docs);
@@ -12,12 +12,12 @@ use crate::schema::Schema;
use crate::schema::Term;
use crate::schema::Value;
use crate::schema::{Field, FieldEntry};
use crate::store::StoreReader;
use crate::tokenizer::{BoxTokenStream, PreTokenizedStream};
use crate::tokenizer::{FacetTokenizer, TextAnalyzer};
use crate::tokenizer::{TokenStreamChain, Tokenizer};
use crate::Opstamp;
use crate::{core::Segment, store::StoreWriter};
use crate::{core::SerializableSegment, store::StoreReader};
use crate::{DocId, SegmentComponent};

/// Computes the initial size of the hash table.

@@ -36,6 +36,20 @@ fn initial_table_size(per_thread_memory_budget: usize) -> crate::Result<usize> {
    }
}

fn remap_doc_opstamps(
    opstamps: Vec<Opstamp>,
    doc_id_mapping_opt: Option<&DocIdMapping>,
) -> Vec<Opstamp> {
    if let Some(doc_id_mapping_opt) = doc_id_mapping_opt {
        doc_id_mapping_opt
            .iter_old_doc_ids()
            .map(|doc| opstamps[doc as usize])
            .collect()
    } else {
        opstamps
    }
}

/// A `SegmentWriter` is in charge of creating segment index from a
/// set of documents.
///
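The new `remap_doc_opstamps` helper in the hunk above reorders the per-document opstamps into the sorted segment's doc-id order, so that the opstamp array handed back by the segment writer lines up with the final doc ids used by the delete logic. A small usage sketch of the remapping itself (the values are illustrative):

```rust
fn main() {
    // Opstamps recorded in insertion order (indexed by old doc id).
    let opstamps: Vec<u64> = vec![10, 11, 12];
    // Old doc ids listed in the new, sorted doc-id order.
    let new_doc_id_to_old: Vec<u32> = vec![2, 0, 1];
    // Same remapping as remap_doc_opstamps: the opstamp of new doc i is the
    // opstamp of the old doc that ended up at position i.
    let remapped: Vec<u64> = new_doc_id_to_old
        .iter()
        .map(|&old_doc| opstamps[old_doc as usize])
        .collect();
    assert_eq!(remapped, vec![12, 10, 11]);
}
```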
@@ -112,14 +126,15 @@ impl SegmentWriter {
            .clone()
            .map(|sort_by_field| get_doc_id_mapping_from_field(sort_by_field, &self))
            .transpose()?;
        write(
        remap_and_write(
            &self.multifield_postings,
            &self.fast_field_writers,
            &self.fieldnorms_writer,
            self.segment_serializer,
            mapping.as_ref(),
        )?;
        Ok(self.doc_opstamps)
        let doc_opstamps = remap_doc_opstamps(self.doc_opstamps, mapping.as_ref());
        Ok(doc_opstamps)
    }

    pub fn mem_usage(&self) -> usize {

@@ -315,8 +330,12 @@ impl SegmentWriter {
    }
}

// This method is used as a trick to workaround the borrow checker
fn write(
/// This method is used as a trick to workaround the borrow checker
/// Writes a view of a segment by pushing information
/// to the `SegmentSerializer`.
///
/// `doc_id_map` is used to map to the new doc_id order.
fn remap_and_write(
    multifield_postings: &MultiFieldPostingsWriter,
    fast_field_writers: &FastFieldsWriter,
    fieldnorms_writer: &FieldNormsWriter,

@@ -340,6 +359,7 @@ fn write(
        &term_ord_map,
        doc_id_map,
    )?;

    // finalize temp docstore and create version, which reflects the doc_id_map
    if let Some(doc_id_map) = doc_id_map {
        let store_write = serializer

@@ -356,31 +376,16 @@ fn write(
            .segment()
            .open_read(SegmentComponent::TempStore)?,
        )?;

        for old_doc_id in doc_id_map.iter_old_doc_ids() {
            let doc_bytes = store_read.get_document_bytes(*old_doc_id)?;
            let doc_bytes = store_read.get_document_bytes(old_doc_id)?;
            serializer.get_store_writer().store_bytes(&doc_bytes)?;
        }
    }
    serializer.close()?;
    Ok(())
}

impl SerializableSegment for SegmentWriter {
    fn write(
        &self,
        serializer: SegmentSerializer,
        doc_id_map: Option<&DocIdMapping>,
    ) -> crate::Result<u32> {
        let max_doc = self.max_doc;
        write(
            &self.multifield_postings,
            &self.fast_field_writers,
            &self.fieldnorms_writer,
            serializer,
            doc_id_map,
        )?;
        Ok(max_doc)
    }
    serializer.close()?;

    Ok(())
}

#[cfg(test)]
@@ -133,12 +133,22 @@ pub mod tests {
                format!("Doc {}", i)
            );
        }

        for (_, doc) in store.iter(Some(&delete_bitset)).enumerate() {
            let doc = doc?;
            let title_content = doc.get_first(field_title).unwrap().text().unwrap();
            if !title_content.starts_with("Doc ") {
                panic!("unexpected title_content {}", title_content);
            }

            let id = title_content
                .strip_prefix("Doc ")
                .unwrap()
                .parse::<u32>()
                .unwrap();
            if delete_bitset.is_deleted(id) {
                panic!("unexpected deleted document {}", id);
            }
        }

        Ok(())
@@ -171,11 +171,6 @@ impl StoreReader {
            .filter_map(move |doc_id| {
                // filter_map is only used to resolve lifetime issues between the two closures on
                // the outer variables
                let alive = delete_bitset.map_or(true, |bitset| bitset.is_alive(doc_id));
                if !alive {
                    // we keep the number of skipped documents to move forward in the map block
                    num_skipped += 1;
                }

                // check move to next checkpoint
                if doc_id >= curr_checkpoint.as_ref().unwrap().doc_range.end {

@@ -187,6 +182,7 @@ impl StoreReader {
                    num_skipped = 0;
                }

                let alive = delete_bitset.map_or(true, |bitset| bitset.is_alive(doc_id));
                if alive {
                    let ret = Some((curr_block.clone(), num_skipped, reset_block_pos));
                    // the map block will move over the num_skipped, so we reset to 0

@@ -194,6 +190,8 @@ impl StoreReader {
                    reset_block_pos = false;
                    ret
                } else {
                    // we keep the number of skipped documents to move forward in the map block
                    num_skipped += 1;
                    None
                }
            })