Compare commits

...

7 Commits

Author SHA1 Message Date
Andre-Philippe Paquet
1a91973ab0 fix merge delta positions with remapping 2021-08-02 21:04:11 -04:00
Paul Masurel
8a7ca64b16 Added merge 2021-08-02 07:29:49 -04:00
Paul Masurel
6c485bfd8a added tool to detect position corruption 2021-08-02 07:29:48 -04:00
Paul Masurel
67f53289ef Bumped tantivy version to 0.15.3 in Cargo.toml 2021-06-30 16:25:59 +09:00
Paul Masurel
f632be8258 Closes #1101 fix delete documents with sort by field
Closes #1101

* fix delete documents with sort by field

Co-authored-by: Andre-Philippe Paquet <appaquet@gmail.com>
2021-06-30 16:22:32 +09:00
Paul Masurel
6847af74ad Hotfix 0.15.2 2021-06-16 22:15:55 +09:00
Andre-Philippe Paquet
5baa91fdf3 fix store reader iterator, take 2 2021-06-16 22:13:19 +09:00
19 changed files with 382 additions and 156 deletions

View File

@@ -1,3 +1,12 @@
Tantivy 0.15.3
=========================
- Major bugfix. Deleting documents was broken when the index was sorted by a field. (@appaquet, @fulmicoton) #1101
Tantivy 0.15.2
========================
- Major bugfix. DocStore still panics when a deleted doc is at the beginning of a block. (@appaquet) #1088
Tantivy 0.15.1
=========================
- Major bugfix. DocStore panics when first block is deleted. (@appaquet) #1077

View File

@@ -1,6 +1,6 @@
[package]
name = "tantivy"
version = "0.15.1"
version = "0.15.3"
authors = ["Paul Masurel <paul.masurel@gmail.com>"]
license = "MIT"
categories = ["database-implementations", "data-structures"]
@@ -12,6 +12,11 @@ readme = "README.md"
keywords = ["search", "information", "retrieval"]
edition = "2018"
[[bin]]
name = "debug_position"
path = "src/debug_position.rs"
[dependencies]
base64 = "0.13"
byteorder = "1.4.3"

View File

@@ -16,7 +16,6 @@ pub use self::index_meta::{
pub use self::inverted_index_reader::InvertedIndexReader;
pub use self::searcher::Searcher;
pub use self::segment::Segment;
pub use self::segment::SerializableSegment;
pub use self::segment_component::SegmentComponent;
pub use self::segment_id::SegmentId;
pub use self::segment_reader::SegmentReader;

View File

@@ -1,13 +1,12 @@
use super::SegmentComponent;
use crate::core::Index;
use crate::core::SegmentId;
use crate::core::SegmentMeta;
use crate::directory::error::{OpenReadError, OpenWriteError};
use crate::directory::Directory;
use crate::directory::{FileSlice, WritePtr};
use crate::indexer::segment_serializer::SegmentSerializer;
use crate::schema::Schema;
use crate::Opstamp;
use crate::{core::Index, indexer::doc_id_mapping::DocIdMapping};
use std::fmt;
use std::path::PathBuf;
@@ -90,20 +89,3 @@ impl Segment {
Ok(write)
}
}
pub trait SerializableSegment {
/// Writes a view of a segment by pushing information
/// to the `SegmentSerializer`.
///
/// # Returns
/// The number of documents in the segment.
///
/// doc_id_map is used when index is created and sorted, to map to the new doc_id order.
/// It is not used by the `IndexMerger`, since the doc_id_mapping on cross-segments works
/// differently
fn write(
&self,
serializer: SegmentSerializer,
doc_id_map: Option<&DocIdMapping>,
) -> crate::Result<u32>;
}

77
src/debug_position.rs Normal file
View File

@@ -0,0 +1,77 @@
use std::panic;
use futures::executor::block_on;
use tantivy;
use tantivy::DocSet;
use tantivy::Postings;
use tantivy::Searcher;
use tantivy::TERMINATED;
use tantivy::merge_policy;
use tantivy::merge_policy::DefaultMergePolicy;
use tantivy::merge_policy::MergePolicy;
use tantivy::schema::Field;
use tantivy::schema::IndexRecordOption;
fn test_field(searcher: &Searcher, field: Field) -> tantivy::Result<()> {
for segment_reader in searcher.segment_readers() {
println!("\n\n====\nsegment {:?}", segment_reader.segment_id());
println!("maxdoc {} del {} ", segment_reader.max_doc(), segment_reader.num_deleted_docs());
let inv_idx = segment_reader.inverted_index(field)?;
let termdict = inv_idx.terms();
println!("num terms {}", termdict.num_terms());
let mut terms = termdict.stream()?;
while terms.advance() {
let term_info = terms.value();
let mut postings = inv_idx.read_postings_from_terminfo(term_info, tantivy::schema::IndexRecordOption::WithFreqsAndPositions)?;
let mut seen_doc = 0;
while postings.doc() != TERMINATED {
let mut postings_clone= postings.clone();
// println!("termord {} seen_doc {} termpositions {:?} docfreq {}", terms.term_ord(), seen_doc, term_info.positions_range, term_info.doc_freq);
let mut positions = Vec::new();
postings_clone.positions(&mut positions);
seen_doc += 1;
postings.advance();
}
}
}
Ok(())
}
fn main() -> tantivy::Result<()> {
let index = tantivy::Index::open_in_dir(".")?;
let reader = index.reader()?;
let searcher = reader.searcher();
let schema = index.schema();
for (field, field_entry) in schema.fields() {
let field_type = field_entry.field_type();
let has_position = field_type.get_index_record_option()
.map(|opt| opt == IndexRecordOption::WithFreqsAndPositions)
.unwrap_or(false);
if !has_position {
continue;
}
test_field(&*searcher, field)?;
}
println!("GC");
let mut index_writer = index.writer_with_num_threads(1, 100_000_000)?;
block_on(index_writer.garbage_collect_files())?;
print!("----- validdating checksum");
index.validate_checksum()?;
print!("----- success");
let default_merge_policy = DefaultMergePolicy::default();
let segment_metas = index.searchable_segment_metas()?;
let merge_candidates = default_merge_policy.compute_merge_candidates(&segment_metas);
println!("{:?}", merge_candidates);
for merge_candidate in merge_candidates {
println!("merge_candidate {:?}", merge_candidate);
let future = index_writer.merge(&merge_candidate.0[..]);
let seg = block_on(future)?;
println!("seg {:?} ", seg);
}
Ok(())
}

View File

@@ -4,11 +4,12 @@ use crate::common::HasLen;
use crate::directory::OwnedBytes;
use std::fmt;
use std::ops::Range;
use std::panic::{RefUnwindSafe, UnwindSafe};
use std::sync::{Arc, Weak};
use std::{io, ops::Deref};
pub type ArcBytes = Arc<dyn Deref<Target = [u8]> + Send + Sync + 'static>;
pub type WeakArcBytes = Weak<dyn Deref<Target = [u8]> + Send + Sync + 'static>;
pub type ArcBytes = Arc<dyn Deref<Target = [u8]> + Send + Sync + UnwindSafe + RefUnwindSafe + 'static>;
pub type WeakArcBytes = Weak<dyn Deref<Target = [u8]> + Send + Sync + UnwindSafe + RefUnwindSafe + 'static>;
/// Objects that represents files sections in tantivy.
///
@@ -40,7 +41,7 @@ impl<T: Deref<Target = [u8]>> HasLen for T {
impl<B> From<B> for FileSlice
where
B: StableDeref + Deref<Target = [u8]> + 'static + Send + Sync,
B: StableDeref + Deref<Target = [u8]> + 'static + Send + Sync + UnwindSafe + RefUnwindSafe,
{
fn from(bytes: B) -> FileSlice {
FileSlice::new(Box::new(OwnedBytes::new(bytes)))

View File

@@ -20,6 +20,7 @@ use std::fs::OpenOptions;
use std::fs::{self, File};
use std::io::{self, Seek, SeekFrom};
use std::io::{BufWriter, Read, Write};
use std::panic::{RefUnwindSafe, UnwindSafe};
use std::path::{Path, PathBuf};
use std::result;
use std::sync::Arc;
@@ -314,7 +315,7 @@ impl TerminatingWrite for SafeFileWriter {
}
#[derive(Clone)]
struct MmapArc(Arc<dyn Deref<Target = [u8]> + Send + Sync>);
struct MmapArc(Arc<dyn Deref<Target = [u8]> + Send + Sync + RefUnwindSafe + UnwindSafe>);
impl Deref for MmapArc {
type Target = [u8];

View File

@@ -84,11 +84,11 @@ impl BytesFastFieldWriter {
&'a self,
doc_id_map: Option<&'b DocIdMapping>,
) -> impl Iterator<Item = &'b [u8]> {
let doc_id_iter = if let Some(doc_id_map) = doc_id_map {
Box::new(doc_id_map.iter_old_doc_ids().cloned()) as Box<dyn Iterator<Item = u32>>
let doc_id_iter: Box<dyn Iterator<Item = u32>> = if let Some(doc_id_map) = doc_id_map {
Box::new(doc_id_map.iter_old_doc_ids())
} else {
Box::new(self.doc_index.iter().enumerate().map(|el| el.0 as u32))
as Box<dyn Iterator<Item = u32>>
let max_doc = self.doc_index.len() as u32;
Box::new(0..max_doc)
};
doc_id_iter.map(move |doc_id| self.get_values_for_doc_id(doc_id))
}

View File

@@ -103,11 +103,11 @@ impl MultiValuedFastFieldWriter {
&'a self,
doc_id_map: Option<&'b DocIdMapping>,
) -> impl Iterator<Item = &'b [u64]> {
let doc_id_iter = if let Some(doc_id_map) = doc_id_map {
Box::new(doc_id_map.iter_old_doc_ids().cloned()) as Box<dyn Iterator<Item = u32>>
let doc_id_iter: Box<dyn Iterator<Item = u32>> = if let Some(doc_id_map) = doc_id_map {
Box::new(doc_id_map.iter_old_doc_ids())
} else {
Box::new(self.doc_index.iter().enumerate().map(|el| el.0 as u32))
as Box<dyn Iterator<Item = u32>>
let max_doc = self.doc_index.len() as DocId;
Box::new(0..max_doc)
};
doc_id_iter.map(move |doc_id| self.get_values_for_doc_id(doc_id))
}

View File

@@ -284,7 +284,7 @@ impl IntFastFieldWriter {
let mut single_field_serializer = serializer.new_u64_fast_field(self.field, min, max)?;
if let Some(doc_id_map) = doc_id_map {
for doc_id in doc_id_map.iter_old_doc_ids() {
single_field_serializer.add_val(self.vals.get(*doc_id as usize))?;
single_field_serializer.add_val(self.vals.get(doc_id as usize))?;
}
} else {
for val in self.vals.iter() {

View File

@@ -98,7 +98,7 @@ impl FieldNormsWriter {
let mut mapped_fieldnorm_values = vec![];
mapped_fieldnorm_values.resize(fieldnorm_values.len(), 0u8);
for (new_doc_id, old_doc_id) in doc_id_map.iter_old_doc_ids().enumerate() {
mapped_fieldnorm_values[new_doc_id] = fieldnorm_values[*old_doc_id as usize];
mapped_fieldnorm_values[new_doc_id] = fieldnorm_values[old_doc_id as usize];
}
fieldnorms_serializer.serialize_field(field, &mapped_fieldnorm_values)?;
} else {

View File

@@ -24,8 +24,8 @@ impl DocIdMapping {
self.new_doc_id_to_old[doc_id as usize]
}
/// iterate over old doc_ids in order of the new doc_ids
pub fn iter_old_doc_ids(&self) -> std::slice::Iter<'_, DocId> {
self.new_doc_id_to_old.iter()
pub fn iter_old_doc_ids(&self) -> impl Iterator<Item = DocId> + Clone + '_ {
self.new_doc_id_to_old.iter().cloned()
}
}

View File

@@ -14,35 +14,27 @@ use crate::Opstamp;
// The doc to opstamp mapping stores precisely an array
// indexed by doc id and storing the opstamp of the document.
//
// This mapping is (for the moment) stricly increasing
// because of the way document id are allocated.
// This mapping is NOT necessarily increasing, because
// we might be sorting documents according to a fast field.
#[derive(Clone)]
pub enum DocToOpstampMapping<'a> {
WithMap(&'a [Opstamp]),
None,
}
impl<'a> From<&'a [u64]> for DocToOpstampMapping<'a> {
fn from(opstamps: &[Opstamp]) -> DocToOpstampMapping {
DocToOpstampMapping::WithMap(opstamps)
}
}
impl<'a> DocToOpstampMapping<'a> {
/// Given an opstamp return the limit doc id L
/// such that all doc id D such that
// D >= L iff opstamp(D) >= than `target_opstamp`.
//
// The edge case opstamp = some doc opstamp is in practise
// never called.
pub fn compute_doc_limit(&self, target_opstamp: Opstamp) -> DocId {
match *self {
DocToOpstampMapping::WithMap(ref doc_opstamps) => {
match doc_opstamps.binary_search(&target_opstamp) {
Ok(doc_id) | Err(doc_id) => doc_id as DocId,
}
/// Assess whether a document should be considered deleted given that it contains
/// a deleted term that was deleted at the opstamp: `delete_opstamp`.
///
/// This function returns true if the `DocToOpstamp` mapping is none or if
/// the `doc_opstamp` is anterior to the delete opstamp.
pub fn is_deleted(&self, doc_id: DocId, delete_opstamp: Opstamp) -> bool {
match self {
Self::WithMap(doc_opstamps) => {
let doc_opstamp = doc_opstamps[doc_id as usize];
doc_opstamp < delete_opstamp
}
DocToOpstampMapping::None => DocId::max_value(),
Self::None => true,
}
}
}
@@ -55,40 +47,17 @@ mod tests {
#[test]
fn test_doc_to_opstamp_mapping_none() {
let doc_to_opstamp_mapping = DocToOpstampMapping::None;
assert_eq!(
doc_to_opstamp_mapping.compute_doc_limit(1),
u32::max_value()
);
assert!(doc_to_opstamp_mapping.is_deleted(1u32, 0u64));
assert!(doc_to_opstamp_mapping.is_deleted(1u32, 2u64));
}
#[test]
fn test_doc_to_opstamp_mapping_complex() {
{
let doc_to_opstamp_mapping = DocToOpstampMapping::from(&[][..]);
assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(0u64), 0);
assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(2u64), 0);
}
{
let doc_to_opstamp_mapping = DocToOpstampMapping::from(&[1u64][..]);
assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(0u64), 0);
assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(2u64), 1);
}
{
let doc_to_opstamp_mapping =
DocToOpstampMapping::from(&[1u64, 12u64, 17u64, 23u64][..]);
assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(0u64), 0);
for i in 2u64..13u64 {
assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(i), 1);
}
for i in 13u64..18u64 {
assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(i), 2);
}
for i in 18u64..24u64 {
assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(i), 3);
}
for i in 24u64..30u64 {
assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(i), 4);
}
}
fn test_doc_to_opstamp_mapping_with_map() {
let doc_to_opstamp_mapping = DocToOpstampMapping::WithMap(&[5u64, 1u64, 0u64, 4u64, 3u64]);
assert_eq!(doc_to_opstamp_mapping.is_deleted(0u32, 2u64), false);
assert_eq!(doc_to_opstamp_mapping.is_deleted(1u32, 2u64), true);
assert_eq!(doc_to_opstamp_mapping.is_deleted(2u32, 2u64), true);
assert_eq!(doc_to_opstamp_mapping.is_deleted(3u32, 2u64), false);
assert_eq!(doc_to_opstamp_mapping.is_deleted(4u32, 2u64), false);
}
}

View File

@@ -106,22 +106,18 @@ fn compute_deleted_bitset(
}
// A delete operation should only affect
// document that were inserted after it.
//
// Limit doc helps identify the first document
// that may be affected by the delete operation.
let limit_doc = doc_opstamps.compute_doc_limit(delete_op.opstamp);
// document that were inserted before it.
let inverted_index = segment_reader.inverted_index(delete_op.term.field())?;
if let Some(mut docset) =
inverted_index.read_postings(&delete_op.term, IndexRecordOption::Basic)?
{
let mut deleted_doc = docset.doc();
while deleted_doc != TERMINATED {
if deleted_doc < limit_doc {
delete_bitset.insert(deleted_doc);
let mut doc_matching_deleted_term = docset.doc();
while doc_matching_deleted_term != TERMINATED {
if doc_opstamps.is_deleted(doc_matching_deleted_term, delete_op.opstamp) {
delete_bitset.insert(doc_matching_deleted_term);
might_have_changed = true;
}
deleted_doc = docset.advance();
doc_matching_deleted_term = docset.advance();
}
}
delete_cursor.advance();
@@ -230,14 +226,8 @@ fn index_documents(
let segment_with_max_doc = segment.with_max_doc(max_doc);
let last_docstamp: Opstamp = *(doc_opstamps.last().unwrap());
let delete_bitset_opt = apply_deletes(
&segment_with_max_doc,
&mut delete_cursor,
&doc_opstamps,
last_docstamp,
)?;
let delete_bitset_opt =
apply_deletes(&segment_with_max_doc, &mut delete_cursor, &doc_opstamps)?;
let meta = segment_with_max_doc.meta().clone();
meta.untrack_temp_docstore();
@@ -247,19 +237,26 @@ fn index_documents(
Ok(true)
}
/// `doc_opstamps` is required to be non-empty.
fn apply_deletes(
segment: &Segment,
mut delete_cursor: &mut DeleteCursor,
doc_opstamps: &[Opstamp],
last_docstamp: Opstamp,
) -> crate::Result<Option<BitSet>> {
if delete_cursor.get().is_none() {
// if there are no delete operation in the queue, no need
// to even open the segment.
return Ok(None);
}
let max_doc_opstamp: Opstamp = doc_opstamps
.iter()
.cloned()
.max()
.expect("Empty DocOpstamp is forbidden");
let segment_reader = SegmentReader::open(segment)?;
let doc_to_opstamps = DocToOpstampMapping::from(doc_opstamps);
let doc_to_opstamps = DocToOpstampMapping::WithMap(doc_opstamps);
let max_doc = segment.meta().max_doc();
let mut deleted_bitset = BitSet::with_max_value(max_doc);
@@ -268,7 +265,7 @@ fn apply_deletes(
&segment_reader,
&mut delete_cursor,
&doc_to_opstamps,
last_docstamp,
max_doc_opstamp,
)?;
Ok(if may_have_deletes {
Some(deleted_bitset)
@@ -784,17 +781,22 @@ impl Drop for IndexWriter {
#[cfg(test)]
mod tests {
use proptest::prelude::*;
use proptest::prop_oneof;
use proptest::strategy::Strategy;
use super::super::operation::UserOperation;
use crate::collector::TopDocs;
use crate::directory::error::LockError;
use crate::error::*;
use crate::fastfield::FastFieldReader;
use crate::indexer::NoMergePolicy;
use crate::query::TermQuery;
use crate::schema::{self, IndexRecordOption, STRING};
use crate::schema::{self, IndexRecordOption, FAST, INDEXED, STRING};
use crate::Index;
use crate::ReloadPolicy;
use crate::Term;
use crate::{IndexSettings, IndexSortByField, Order};
#[test]
fn test_operations_group() {
@@ -1282,6 +1284,177 @@ mod tests {
assert!(commit_again.is_ok());
}
#[test]
fn test_delete_with_sort_by_field() -> crate::Result<()> {
let mut schema_builder = schema::Schema::builder();
let id_field =
schema_builder.add_u64_field("id", schema::INDEXED | schema::STORED | schema::FAST);
let schema = schema_builder.build();
let settings = IndexSettings {
sort_by_field: Some(IndexSortByField {
field: "id".to_string(),
order: Order::Desc,
}),
..Default::default()
};
let index = Index::builder()
.schema(schema)
.settings(settings)
.create_in_ram()?;
let index_reader = index.reader()?;
let mut index_writer = index.writer_for_tests()?;
// create and delete docs in same commit
for id in 0u64..5u64 {
index_writer.add_document(doc!(id_field => id));
}
for id in 2u64..4u64 {
index_writer.delete_term(Term::from_field_u64(id_field, id));
}
for id in 5u64..10u64 {
index_writer.add_document(doc!(id_field => id));
}
index_writer.commit()?;
index_reader.reload()?;
let searcher = index_reader.searcher();
assert_eq!(searcher.segment_readers().len(), 1);
let segment_reader = searcher.segment_reader(0);
assert_eq!(segment_reader.num_docs(), 8);
assert_eq!(segment_reader.max_doc(), 10);
let fast_field_reader = segment_reader.fast_fields().u64(id_field)?;
let in_order_alive_ids: Vec<u64> = segment_reader
.doc_ids_alive()
.map(|doc| fast_field_reader.get(doc))
.collect();
assert_eq!(&in_order_alive_ids[..], &[9, 8, 7, 6, 5, 4, 1, 0]);
Ok(())
}
#[derive(Debug, Clone, Copy)]
enum IndexingOp {
AddDoc { id: u64 },
DeleteDoc { id: u64 },
}
fn operation_strategy() -> impl Strategy<Value = IndexingOp> {
prop_oneof![
(0u64..10u64).prop_map(|id| IndexingOp::DeleteDoc { id }),
(0u64..10u64).prop_map(|id| IndexingOp::AddDoc { id }),
]
}
fn expected_ids(ops: &[IndexingOp]) -> Vec<u64> {
let mut ids = Vec::new();
for &op in ops {
match op {
IndexingOp::AddDoc { id } => {
ids.push(id);
}
IndexingOp::DeleteDoc { id } => {
ids.retain(|&id_val| id_val != id);
}
}
}
ids.sort();
ids
}
fn test_operation_strategy(ops: &[IndexingOp]) -> crate::Result<()> {
let mut schema_builder = schema::Schema::builder();
let id_field = schema_builder.add_u64_field("id", FAST | INDEXED);
let schema = schema_builder.build();
let settings = IndexSettings {
sort_by_field: Some(IndexSortByField {
field: "id".to_string(),
order: Order::Asc,
}),
..Default::default()
};
let index = Index::builder()
.schema(schema)
.settings(settings)
.create_in_ram()?;
let mut index_writer = index.writer_for_tests()?;
for &op in ops {
match op {
IndexingOp::AddDoc { id } => {
index_writer.add_document(doc!(id_field=>id));
}
IndexingOp::DeleteDoc { id } => {
index_writer.delete_term(Term::from_field_u64(id_field, id));
}
}
}
index_writer.commit()?;
let searcher = index.reader()?.searcher();
let ids: Vec<u64> = searcher
.segment_readers()
.iter()
.flat_map(|segment_reader| {
let ff_reader = segment_reader.fast_fields().u64(id_field).unwrap();
segment_reader
.doc_ids_alive()
.map(move |doc| ff_reader.get(doc))
})
.collect();
assert_eq!(ids, expected_ids(ops));
Ok(())
}
proptest! {
#[test]
fn test_delete_with_sort_proptest(ops in proptest::collection::vec(operation_strategy(), 1..10)) {
assert!(test_operation_strategy(&ops[..]).is_ok());
}
}
#[test]
fn test_delete_with_sort_by_field_last_opstamp_is_not_max() -> crate::Result<()> {
let mut schema_builder = schema::Schema::builder();
let sort_by_field = schema_builder.add_u64_field("sort_by", FAST);
let id_field = schema_builder.add_u64_field("id", INDEXED);
let schema = schema_builder.build();
let settings = IndexSettings {
sort_by_field: Some(IndexSortByField {
field: "sort_by".to_string(),
order: Order::Asc,
}),
..Default::default()
};
let index = Index::builder()
.schema(schema)
.settings(settings)
.create_in_ram()?;
let mut index_writer = index.writer_for_tests()?;
// We add a doc...
index_writer.add_document(doc!(sort_by_field => 2u64, id_field => 0u64));
// And remove it.
index_writer.delete_term(Term::from_field_u64(id_field, 0u64));
// We add another doc.
index_writer.add_document(doc!(sort_by_field=>1u64, id_field => 0u64));
// The expected result is a segment with
// maxdoc = 2
// numdoc = 1.
index_writer.commit()?;
let searcher = index.reader()?.searcher();
assert_eq!(searcher.segment_readers().len(), 1);
let segment_reader = searcher.segment_reader(0);
assert_eq!(segment_reader.max_doc(), 2);
assert_eq!(segment_reader.num_deleted_docs(), 1);
Ok(())
}
#[test]
fn test_index_doc_missing_field() {
let mut schema_builder = schema::Schema::builder();

View File

@@ -1,4 +1,3 @@
use super::doc_id_mapping::DocIdMapping;
use crate::error::DataCorruption;
use crate::fastfield::CompositeFastFieldSerializer;
use crate::fastfield::DeleteBitSet;
@@ -18,11 +17,11 @@ use crate::schema::{Field, Schema};
use crate::store::StoreWriter;
use crate::termdict::TermMerger;
use crate::termdict::TermOrdinal;
use crate::IndexSortByField;
use crate::{common::HasLen, fastfield::MultiValueLength};
use crate::{common::MAX_DOC_LIMIT, IndexSettings};
use crate::{core::Segment, indexer::doc_id_mapping::expect_field_id_for_sort_field};
use crate::{core::SegmentReader, Order};
use crate::{core::SerializableSegment, IndexSortByField};
use crate::{
docset::{DocSet, TERMINATED},
SegmentOrdinal,
@@ -928,14 +927,14 @@ impl IndexMerger {
// I think this is not strictly necessary, it would be possible to
// avoid the loading into a vec via some form of kmerge, but then the merge
// logic would deviate much more from the stacking case (unsorted index)
let delta_positions = delta_computer.compute_delta(&positions_buffer);
if doc_id_mapping.is_some() {
doc_id_and_positions.push((
remapped_doc_id,
term_freq,
positions_buffer.to_vec(),
delta_positions.to_vec(),
));
} else {
let delta_positions = delta_computer.compute_delta(&positions_buffer);
field_serializer.write_doc(remapped_doc_id, term_freq, delta_positions);
}
}
@@ -1042,14 +1041,13 @@ impl IndexMerger {
}
Ok(())
}
}
impl SerializableSegment for IndexMerger {
fn write(
&self,
mut serializer: SegmentSerializer,
_: Option<&DocIdMapping>,
) -> crate::Result<u32> {
/// Writes the merged segment by pushing information
/// to the `SegmentSerializer`.
///
/// # Returns
/// The number of documents in the resulting segment.
pub fn write(&self, mut serializer: SegmentSerializer) -> crate::Result<u32> {
let doc_id_mapping = if let Some(sort_by_field) = self.index_settings.sort_by_field.as_ref()
{
// If the documents are already sorted and stackable, we ignore the mapping and execute

View File

@@ -5,7 +5,6 @@ use crate::core::IndexSettings;
use crate::core::Segment;
use crate::core::SegmentId;
use crate::core::SegmentMeta;
use crate::core::SerializableSegment;
use crate::core::META_FILEPATH;
use crate::directory::{Directory, DirectoryClone, GarbageCollectionResult};
use crate::indexer::delete_queue::DeleteCursor;
@@ -140,7 +139,7 @@ fn merge(
// ... we just serialize this index merger in our new segment to merge the segments.
let segment_serializer = SegmentSerializer::for_segment(merged_segment.clone(), true)?;
let num_docs = merger.write(segment_serializer, None)?;
let num_docs = merger.write(segment_serializer)?;
let merged_segment_id = merged_segment.id();
@@ -209,7 +208,7 @@ pub fn merge_segments<Dir: Directory>(
&segments[..],
)?;
let segment_serializer = SegmentSerializer::for_segment(merged_segment, true)?;
let num_docs = merger.write(segment_serializer, None)?;
let num_docs = merger.write(segment_serializer)?;
let segment_meta = merged_index.new_segment_meta(merged_segment_id, num_docs);

View File

@@ -12,12 +12,12 @@ use crate::schema::Schema;
use crate::schema::Term;
use crate::schema::Value;
use crate::schema::{Field, FieldEntry};
use crate::store::StoreReader;
use crate::tokenizer::{BoxTokenStream, PreTokenizedStream};
use crate::tokenizer::{FacetTokenizer, TextAnalyzer};
use crate::tokenizer::{TokenStreamChain, Tokenizer};
use crate::Opstamp;
use crate::{core::Segment, store::StoreWriter};
use crate::{core::SerializableSegment, store::StoreReader};
use crate::{DocId, SegmentComponent};
/// Computes the initial size of the hash table.
@@ -36,6 +36,20 @@ fn initial_table_size(per_thread_memory_budget: usize) -> crate::Result<usize> {
}
}
fn remap_doc_opstamps(
opstamps: Vec<Opstamp>,
doc_id_mapping_opt: Option<&DocIdMapping>,
) -> Vec<Opstamp> {
if let Some(doc_id_mapping_opt) = doc_id_mapping_opt {
doc_id_mapping_opt
.iter_old_doc_ids()
.map(|doc| opstamps[doc as usize])
.collect()
} else {
opstamps
}
}
/// A `SegmentWriter` is in charge of creating segment index from a
/// set of documents.
///
@@ -112,14 +126,15 @@ impl SegmentWriter {
.clone()
.map(|sort_by_field| get_doc_id_mapping_from_field(sort_by_field, &self))
.transpose()?;
write(
remap_and_write(
&self.multifield_postings,
&self.fast_field_writers,
&self.fieldnorms_writer,
self.segment_serializer,
mapping.as_ref(),
)?;
Ok(self.doc_opstamps)
let doc_opstamps = remap_doc_opstamps(self.doc_opstamps, mapping.as_ref());
Ok(doc_opstamps)
}
pub fn mem_usage(&self) -> usize {
@@ -315,8 +330,12 @@ impl SegmentWriter {
}
}
// This method is used as a trick to workaround the borrow checker
fn write(
/// This method is used as a trick to workaround the borrow checker
/// Writes a view of a segment by pushing information
/// to the `SegmentSerializer`.
///
/// `doc_id_map` is used to map to the new doc_id order.
fn remap_and_write(
multifield_postings: &MultiFieldPostingsWriter,
fast_field_writers: &FastFieldsWriter,
fieldnorms_writer: &FieldNormsWriter,
@@ -340,6 +359,7 @@ fn write(
&term_ord_map,
doc_id_map,
)?;
// finalize temp docstore and create version, which reflects the doc_id_map
if let Some(doc_id_map) = doc_id_map {
let store_write = serializer
@@ -356,31 +376,16 @@ fn write(
.segment()
.open_read(SegmentComponent::TempStore)?,
)?;
for old_doc_id in doc_id_map.iter_old_doc_ids() {
let doc_bytes = store_read.get_document_bytes(*old_doc_id)?;
let doc_bytes = store_read.get_document_bytes(old_doc_id)?;
serializer.get_store_writer().store_bytes(&doc_bytes)?;
}
}
serializer.close()?;
Ok(())
}
impl SerializableSegment for SegmentWriter {
fn write(
&self,
serializer: SegmentSerializer,
doc_id_map: Option<&DocIdMapping>,
) -> crate::Result<u32> {
let max_doc = self.max_doc;
write(
&self.multifield_postings,
&self.fast_field_writers,
&self.fieldnorms_writer,
serializer,
doc_id_map,
)?;
Ok(max_doc)
}
serializer.close()?;
Ok(())
}
#[cfg(test)]

View File

@@ -133,12 +133,22 @@ pub mod tests {
format!("Doc {}", i)
);
}
for (_, doc) in store.iter(Some(&delete_bitset)).enumerate() {
let doc = doc?;
let title_content = doc.get_first(field_title).unwrap().text().unwrap();
if !title_content.starts_with("Doc ") {
panic!("unexpected title_content {}", title_content);
}
let id = title_content
.strip_prefix("Doc ")
.unwrap()
.parse::<u32>()
.unwrap();
if delete_bitset.is_deleted(id) {
panic!("unexpected deleted document {}", id);
}
}
Ok(())

View File

@@ -171,11 +171,6 @@ impl StoreReader {
.filter_map(move |doc_id| {
// filter_map is only used to resolve lifetime issues between the two closures on
// the outer variables
let alive = delete_bitset.map_or(true, |bitset| bitset.is_alive(doc_id));
if !alive {
// we keep the number of skipped documents to move forward in the map block
num_skipped += 1;
}
// check move to next checkpoint
if doc_id >= curr_checkpoint.as_ref().unwrap().doc_range.end {
@@ -187,6 +182,7 @@ impl StoreReader {
num_skipped = 0;
}
let alive = delete_bitset.map_or(true, |bitset| bitset.is_alive(doc_id));
if alive {
let ret = Some((curr_block.clone(), num_skipped, reset_block_pos));
// the map block will move over the num_skipped, so we reset to 0
@@ -194,6 +190,8 @@ impl StoreReader {
reset_block_pos = false;
ret
} else {
// we keep the number of skipped documents to move forward in the map block
num_skipped += 1;
None
}
})