Mirror of https://github.com/quickwit-oss/tantivy.git
Closes #1101: fix delete documents with sort by field

Co-authored-by: Andre-Philippe Paquet <appaquet@gmail.com>
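The bug in a nutshell: when an index is sorted by a fast field, documents are reordered at serialization time, so doc ids no longer follow insertion order. The old delete logic assumed that every document below a computed "limit doc id" had been added before the delete operation, which only holds for insertion-ordered segments; on a sorted segment it could delete the wrong documents. The sketch below reproduces the affected scenario. It is modeled on the test added in this commit (see the index_writer tests hunk further down) and assumes the 0.15-era public API used there; it is an illustration, not part of the patch.

    // Sketch: a sorted index with adds and deletes in the same commit,
    // the situation this commit fixes. Assumes tantivy 0.15.x as in this diff.
    use tantivy::schema::{self, Schema};
    use tantivy::{doc, Index, IndexSettings, IndexSortByField, Order, Term};

    fn main() -> tantivy::Result<()> {
        let mut schema_builder = Schema::builder();
        let id_field =
            schema_builder.add_u64_field("id", schema::INDEXED | schema::STORED | schema::FAST);
        let schema = schema_builder.build();
        let settings = IndexSettings {
            sort_by_field: Some(IndexSortByField {
                field: "id".to_string(),
                order: Order::Desc,
            }),
            ..Default::default()
        };
        let index = Index::builder()
            .schema(schema)
            .settings(settings)
            .create_in_ram()?;
        let mut writer = index.writer(50_000_000)?;
        for id in 0u64..5u64 {
            writer.add_document(doc!(id_field => id));
        }
        // Before the fix, deleting by term in the same commit could remove
        // the wrong documents once the segment was sorted by `id`.
        writer.delete_term(Term::from_field_u64(id_field, 2u64));
        writer.commit()?;
        Ok(())
    }
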
@@ -1,3 +1,7 @@
+Tantivy 0.15.2
+=========================
+- Major bugfix. Deleting documents was broken when the index was sorted by a field. (@appaquet, @fulmicoton) #1101
+
 Tantivy 0.15.1
 =========================
 - Major bugfix. DocStore panics when first block is deleted. (@appaquet) #1077

@@ -16,7 +16,6 @@ pub use self::index_meta::{
 pub use self::inverted_index_reader::InvertedIndexReader;
 pub use self::searcher::Searcher;
 pub use self::segment::Segment;
-pub use self::segment::SerializableSegment;
 pub use self::segment_component::SegmentComponent;
 pub use self::segment_id::SegmentId;
 pub use self::segment_reader::SegmentReader;

@@ -1,13 +1,12 @@
 use super::SegmentComponent;
+use crate::core::Index;
 use crate::core::SegmentId;
 use crate::core::SegmentMeta;
 use crate::directory::error::{OpenReadError, OpenWriteError};
 use crate::directory::Directory;
 use crate::directory::{FileSlice, WritePtr};
 use crate::indexer::segment_serializer::SegmentSerializer;
 use crate::schema::Schema;
 use crate::Opstamp;
-use crate::{core::Index, indexer::doc_id_mapping::DocIdMapping};
 use std::fmt;
 use std::path::PathBuf;

@@ -90,20 +89,3 @@ impl Segment {
         Ok(write)
     }
 }
-
-pub trait SerializableSegment {
-    /// Writes a view of a segment by pushing information
-    /// to the `SegmentSerializer`.
-    ///
-    /// # Returns
-    /// The number of documents in the segment.
-    ///
-    /// doc_id_map is used when index is created and sorted, to map to the new doc_id order.
-    /// It is not used by the `IndexMerger`, since the doc_id_mapping on cross-segments works
-    /// differently
-    fn write(
-        &self,
-        serializer: SegmentSerializer,
-        doc_id_map: Option<&DocIdMapping>,
-    ) -> crate::Result<u32>;
-}

@@ -83,11 +83,11 @@ impl BytesFastFieldWriter {
         &'a self,
         doc_id_map: Option<&'b DocIdMapping>,
     ) -> impl Iterator<Item = &'b [u8]> {
-        let doc_id_iter = if let Some(doc_id_map) = doc_id_map {
-            Box::new(doc_id_map.iter_old_doc_ids().cloned()) as Box<dyn Iterator<Item = u32>>
+        let doc_id_iter: Box<dyn Iterator<Item = u32>> = if let Some(doc_id_map) = doc_id_map {
+            Box::new(doc_id_map.iter_old_doc_ids())
         } else {
-            Box::new(self.doc_index.iter().enumerate().map(|el| el.0 as u32))
-                as Box<dyn Iterator<Item = u32>>
+            let max_doc = self.doc_index.len() as u32;
+            Box::new(0..max_doc)
         };
         doc_id_iter.map(move |doc_id| self.get_values_for_doc_id(doc_id))
     }

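Note on the hunk above (and the identical one just below for the multi-valued writer): the branch result is now typed up front as a boxed iterator so that both arms can return different concrete iterators, either the old-to-new doc id mapping or a plain 0..max_doc range. A minimal standalone sketch of that shape, with illustrative names rather than tantivy's:

    // Either iterate old doc ids in sorted (new) order, or fall back to 0..max_doc.
    fn doc_id_iter(doc_id_map: Option<&[u32]>, max_doc: u32) -> Box<dyn Iterator<Item = u32> + '_> {
        if let Some(map) = doc_id_map {
            Box::new(map.iter().cloned())
        } else {
            Box::new(0..max_doc)
        }
    }

    fn main() {
        assert_eq!(doc_id_iter(Some(&[2, 0, 1]), 3).collect::<Vec<_>>(), vec![2, 0, 1]);
        assert_eq!(doc_id_iter(None, 3).collect::<Vec<_>>(), vec![0, 1, 2]);
    }
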
@@ -102,11 +102,11 @@ impl MultiValuedFastFieldWriter {
         &'a self,
         doc_id_map: Option<&'b DocIdMapping>,
     ) -> impl Iterator<Item = &'b [u64]> {
-        let doc_id_iter = if let Some(doc_id_map) = doc_id_map {
-            Box::new(doc_id_map.iter_old_doc_ids().cloned()) as Box<dyn Iterator<Item = u32>>
+        let doc_id_iter: Box<dyn Iterator<Item = u32>> = if let Some(doc_id_map) = doc_id_map {
+            Box::new(doc_id_map.iter_old_doc_ids())
         } else {
-            Box::new(self.doc_index.iter().enumerate().map(|el| el.0 as u32))
-                as Box<dyn Iterator<Item = u32>>
+            let max_doc = self.doc_index.len() as DocId;
+            Box::new(0..max_doc)
        };
         doc_id_iter.map(move |doc_id| self.get_values_for_doc_id(doc_id))
     }

@@ -296,7 +296,7 @@ impl IntFastFieldWriter {
         if let Some(doc_id_map) = doc_id_map {
             let iter = doc_id_map
                 .iter_old_doc_ids()
-                .map(|doc_id| self.vals.get(*doc_id as usize));
+                .map(|doc_id| self.vals.get(doc_id as usize));
             serializer.create_auto_detect_u64_fast_field(
                 self.field,
                 stats,

@@ -98,7 +98,7 @@ impl FieldNormsWriter {
             let mut mapped_fieldnorm_values = vec![];
             mapped_fieldnorm_values.resize(fieldnorm_values.len(), 0u8);
             for (new_doc_id, old_doc_id) in doc_id_map.iter_old_doc_ids().enumerate() {
-                mapped_fieldnorm_values[new_doc_id] = fieldnorm_values[*old_doc_id as usize];
+                mapped_fieldnorm_values[new_doc_id] = fieldnorm_values[old_doc_id as usize];
             }
             fieldnorms_serializer.serialize_field(field, &mapped_fieldnorm_values)?;
         } else {

@@ -24,8 +24,8 @@ impl DocIdMapping {
         self.new_doc_id_to_old[doc_id as usize]
     }
     /// iterate over old doc_ids in order of the new doc_ids
-    pub fn iter_old_doc_ids(&self) -> std::slice::Iter<'_, DocId> {
-        self.new_doc_id_to_old.iter()
+    pub fn iter_old_doc_ids(&self) -> impl Iterator<Item = DocId> + Clone + '_ {
+        self.new_doc_id_to_old.iter().cloned()
     }
 }

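With this change `iter_old_doc_ids` yields owned `DocId` values instead of `&DocId` references, which is why the call sites above drop `.cloned()` and the `*old_doc_id` dereferences. A self-contained sketch of the new shape; the field name comes from the diff, everything else is assumed for illustration:

    type DocId = u32;

    // Maps each new (sorted) doc id to the old doc id it came from.
    struct DocIdMapping {
        new_doc_id_to_old: Vec<DocId>,
    }

    impl DocIdMapping {
        // Returns owned DocIds, so callers no longer deref or clone.
        fn iter_old_doc_ids(&self) -> impl Iterator<Item = DocId> + Clone + '_ {
            self.new_doc_id_to_old.iter().cloned()
        }
    }

    fn main() {
        let mapping = DocIdMapping { new_doc_id_to_old: vec![2, 0, 1] };
        let old_ids: Vec<DocId> = mapping.iter_old_doc_ids().collect();
        assert_eq!(old_ids, vec![2, 0, 1]);
    }
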
@@ -14,35 +14,27 @@ use crate::Opstamp;
 // The doc to opstamp mapping stores precisely an array
 // indexed by doc id and storing the opstamp of the document.
 //
-// This mapping is (for the moment) stricly increasing
-// because of the way document id are allocated.
+// This mapping is NOT necessarily increasing, because
+// we might be sorting documents according to a fast field.
 #[derive(Clone)]
 pub enum DocToOpstampMapping<'a> {
     WithMap(&'a [Opstamp]),
     None,
 }
 
-impl<'a> From<&'a [u64]> for DocToOpstampMapping<'a> {
-    fn from(opstamps: &[Opstamp]) -> DocToOpstampMapping {
-        DocToOpstampMapping::WithMap(opstamps)
-    }
-}
-
 impl<'a> DocToOpstampMapping<'a> {
-    /// Given an opstamp return the limit doc id L
-    /// such that all doc id D such that
-    // D >= L iff opstamp(D) >= than `target_opstamp`.
-    //
-    // The edge case opstamp = some doc opstamp is in practise
-    // never called.
-    pub fn compute_doc_limit(&self, target_opstamp: Opstamp) -> DocId {
-        match *self {
-            DocToOpstampMapping::WithMap(ref doc_opstamps) => {
-                match doc_opstamps.binary_search(&target_opstamp) {
-                    Ok(doc_id) | Err(doc_id) => doc_id as DocId,
-                }
+    /// Assess whether a document should be considered deleted given that it contains
+    /// a deleted term that was deleted at the opstamp: `delete_opstamp`.
+    ///
+    /// This function returns true if the `DocToOpstamp` mapping is none or if
+    /// the `doc_opstamp` is anterior to the delete opstamp.
+    pub fn is_deleted(&self, doc_id: DocId, delete_opstamp: Opstamp) -> bool {
+        match self {
+            Self::WithMap(doc_opstamps) => {
+                let doc_opstamp = doc_opstamps[doc_id as usize];
+                doc_opstamp < delete_opstamp
             }
-            DocToOpstampMapping::None => DocId::max_value(),
+            Self::None => true,
         }
     }
 }

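This is the heart of the fix. The removed `compute_doc_limit` binary-searched the opstamp array for a single "limit doc id", which is only correct when opstamps increase with doc id; once a segment is sorted by a fast field they do not. The new `is_deleted` checks each document individually: it is deleted iff its own opstamp is strictly smaller than the delete's opstamp. A tiny sketch of that rule, reusing the values of the new unit test below:

    // Per-document rule behind `is_deleted` (values taken from the test below).
    fn is_deleted(doc_opstamps: &[u64], doc_id: u32, delete_opstamp: u64) -> bool {
        doc_opstamps[doc_id as usize] < delete_opstamp
    }

    fn main() {
        // Opstamps per doc id after sorting: no longer monotonically increasing.
        let doc_opstamps = [5u64, 1, 0, 4, 3];
        assert!(!is_deleted(&doc_opstamps, 0, 2)); // added after the delete, kept
        assert!(is_deleted(&doc_opstamps, 1, 2)); // added before the delete, removed
        assert!(is_deleted(&doc_opstamps, 2, 2));
        assert!(!is_deleted(&doc_opstamps, 3, 2));
        assert!(!is_deleted(&doc_opstamps, 4, 2));
    }
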
@@ -55,40 +47,17 @@ mod tests {
     #[test]
     fn test_doc_to_opstamp_mapping_none() {
         let doc_to_opstamp_mapping = DocToOpstampMapping::None;
-        assert_eq!(
-            doc_to_opstamp_mapping.compute_doc_limit(1),
-            u32::max_value()
-        );
+        assert!(doc_to_opstamp_mapping.is_deleted(1u32, 0u64));
+        assert!(doc_to_opstamp_mapping.is_deleted(1u32, 2u64));
     }
 
     #[test]
-    fn test_doc_to_opstamp_mapping_complex() {
-        {
-            let doc_to_opstamp_mapping = DocToOpstampMapping::from(&[][..]);
-            assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(0u64), 0);
-            assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(2u64), 0);
-        }
-        {
-            let doc_to_opstamp_mapping = DocToOpstampMapping::from(&[1u64][..]);
-            assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(0u64), 0);
-            assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(2u64), 1);
-        }
-        {
-            let doc_to_opstamp_mapping =
-                DocToOpstampMapping::from(&[1u64, 12u64, 17u64, 23u64][..]);
-            assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(0u64), 0);
-            for i in 2u64..13u64 {
-                assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(i), 1);
-            }
-            for i in 13u64..18u64 {
-                assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(i), 2);
-            }
-            for i in 18u64..24u64 {
-                assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(i), 3);
-            }
-            for i in 24u64..30u64 {
-                assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(i), 4);
-            }
-        }
+    fn test_doc_to_opstamp_mapping_with_map() {
+        let doc_to_opstamp_mapping = DocToOpstampMapping::WithMap(&[5u64, 1u64, 0u64, 4u64, 3u64]);
+        assert_eq!(doc_to_opstamp_mapping.is_deleted(0u32, 2u64), false);
+        assert_eq!(doc_to_opstamp_mapping.is_deleted(1u32, 2u64), true);
+        assert_eq!(doc_to_opstamp_mapping.is_deleted(2u32, 2u64), true);
+        assert_eq!(doc_to_opstamp_mapping.is_deleted(3u32, 2u64), false);
+        assert_eq!(doc_to_opstamp_mapping.is_deleted(4u32, 2u64), false);
     }
 }

@@ -106,22 +106,18 @@ fn compute_deleted_bitset(
         }
 
         // A delete operation should only affect
-        // document that were inserted after it.
-        //
-        // Limit doc helps identify the first document
-        // that may be affected by the delete operation.
-        let limit_doc = doc_opstamps.compute_doc_limit(delete_op.opstamp);
+        // document that were inserted before it.
         let inverted_index = segment_reader.inverted_index(delete_op.term.field())?;
         if let Some(mut docset) =
             inverted_index.read_postings(&delete_op.term, IndexRecordOption::Basic)?
         {
-            let mut deleted_doc = docset.doc();
-            while deleted_doc != TERMINATED {
-                if deleted_doc < limit_doc {
-                    delete_bitset.insert(deleted_doc);
+            let mut doc_matching_deleted_term = docset.doc();
+            while doc_matching_deleted_term != TERMINATED {
+                if doc_opstamps.is_deleted(doc_matching_deleted_term, delete_op.opstamp) {
+                    delete_bitset.insert(doc_matching_deleted_term);
                     might_have_changed = true;
                 }
-                deleted_doc = docset.advance();
+                doc_matching_deleted_term = docset.advance();
             }
         }
         delete_cursor.advance();

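Applying that rule, the delete pass now walks every document matching the deleted term and tests its opstamp, instead of comparing doc ids against a precomputed limit. A rough standalone equivalent of the new loop, using plain slices in place of tantivy's postings and bitset types:

    // `matching_docs` stands in for the postings of the deleted term,
    // the returned Vec for the delete bitset.
    fn docs_to_delete(matching_docs: &[u32], doc_opstamps: &[u64], delete_opstamp: u64) -> Vec<u32> {
        matching_docs
            .iter()
            .copied()
            .filter(|&doc| doc_opstamps[doc as usize] < delete_opstamp)
            .collect()
    }

    fn main() {
        let doc_opstamps = [5u64, 1, 0, 4, 3];
        // Suppose docs 0, 1 and 3 contain the deleted term.
        assert_eq!(docs_to_delete(&[0, 1, 3], &doc_opstamps, 2), vec![1]);
    }
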
@@ -230,14 +226,8 @@ fn index_documents(
 
     let segment_with_max_doc = segment.with_max_doc(max_doc);
 
-    let last_docstamp: Opstamp = *(doc_opstamps.last().unwrap());
-
-    let delete_bitset_opt = apply_deletes(
-        &segment_with_max_doc,
-        &mut delete_cursor,
-        &doc_opstamps,
-        last_docstamp,
-    )?;
+    let delete_bitset_opt =
+        apply_deletes(&segment_with_max_doc, &mut delete_cursor, &doc_opstamps)?;
 
     let meta = segment_with_max_doc.meta().clone();
     meta.untrack_temp_docstore();

@@ -247,19 +237,26 @@ fn index_documents(
     Ok(true)
 }
 
+/// `doc_opstamps` is required to be non-empty.
 fn apply_deletes(
     segment: &Segment,
     mut delete_cursor: &mut DeleteCursor,
     doc_opstamps: &[Opstamp],
-    last_docstamp: Opstamp,
 ) -> crate::Result<Option<BitSet>> {
     if delete_cursor.get().is_none() {
         // if there are no delete operation in the queue, no need
         // to even open the segment.
         return Ok(None);
     }
+
+    let max_doc_opstamp: Opstamp = doc_opstamps
+        .iter()
+        .cloned()
+        .max()
+        .expect("Empty DocOpstamp is forbidden");
+
     let segment_reader = SegmentReader::open(segment)?;
-    let doc_to_opstamps = DocToOpstampMapping::from(doc_opstamps);
+    let doc_to_opstamps = DocToOpstampMapping::WithMap(doc_opstamps);
 
     let max_doc = segment.meta().max_doc();
     let mut deleted_bitset = BitSet::with_max_value(max_doc);

@@ -268,7 +265,7 @@ fn apply_deletes(
         &segment_reader,
         &mut delete_cursor,
         &doc_to_opstamps,
-        last_docstamp,
+        max_doc_opstamp,
     )?;
     Ok(if may_have_deletes {
         Some(deleted_bitset)

@@ -784,17 +781,22 @@ impl Drop for IndexWriter {
 
 #[cfg(test)]
 mod tests {
+    use proptest::prelude::*;
+    use proptest::prop_oneof;
+    use proptest::strategy::Strategy;
+
     use super::super::operation::UserOperation;
     use crate::collector::TopDocs;
     use crate::directory::error::LockError;
     use crate::error::*;
+    use crate::fastfield::FastFieldReader;
     use crate::indexer::NoMergePolicy;
     use crate::query::TermQuery;
-    use crate::schema::{self, IndexRecordOption, STRING};
+    use crate::schema::{self, IndexRecordOption, FAST, INDEXED, STRING};
     use crate::Index;
     use crate::ReloadPolicy;
     use crate::Term;
+    use crate::{IndexSettings, IndexSortByField, Order};
 
     #[test]
     fn test_operations_group() {

@@ -1282,6 +1284,177 @@ mod tests {
         assert!(commit_again.is_ok());
     }
 
+    #[test]
+    fn test_delete_with_sort_by_field() -> crate::Result<()> {
+        let mut schema_builder = schema::Schema::builder();
+        let id_field =
+            schema_builder.add_u64_field("id", schema::INDEXED | schema::STORED | schema::FAST);
+        let schema = schema_builder.build();
+
+        let settings = IndexSettings {
+            sort_by_field: Some(IndexSortByField {
+                field: "id".to_string(),
+                order: Order::Desc,
+            }),
+            ..Default::default()
+        };
+
+        let index = Index::builder()
+            .schema(schema)
+            .settings(settings)
+            .create_in_ram()?;
+        let index_reader = index.reader()?;
+        let mut index_writer = index.writer_for_tests()?;
+
+        // create and delete docs in same commit
+        for id in 0u64..5u64 {
+            index_writer.add_document(doc!(id_field => id));
+        }
+        for id in 2u64..4u64 {
+            index_writer.delete_term(Term::from_field_u64(id_field, id));
+        }
+        for id in 5u64..10u64 {
+            index_writer.add_document(doc!(id_field => id));
+        }
+        index_writer.commit()?;
+        index_reader.reload()?;
+
+        let searcher = index_reader.searcher();
+        assert_eq!(searcher.segment_readers().len(), 1);
+
+        let segment_reader = searcher.segment_reader(0);
+        assert_eq!(segment_reader.num_docs(), 8);
+        assert_eq!(segment_reader.max_doc(), 10);
+        let fast_field_reader = segment_reader.fast_fields().u64(id_field)?;
+        let in_order_alive_ids: Vec<u64> = segment_reader
+            .doc_ids_alive()
+            .map(|doc| fast_field_reader.get(doc))
+            .collect();
+        assert_eq!(&in_order_alive_ids[..], &[9, 8, 7, 6, 5, 4, 1, 0]);
+        Ok(())
+    }
+
+    #[derive(Debug, Clone, Copy)]
+    enum IndexingOp {
+        AddDoc { id: u64 },
+        DeleteDoc { id: u64 },
+    }
+
+    fn operation_strategy() -> impl Strategy<Value = IndexingOp> {
+        prop_oneof![
+            (0u64..10u64).prop_map(|id| IndexingOp::DeleteDoc { id }),
+            (0u64..10u64).prop_map(|id| IndexingOp::AddDoc { id }),
+        ]
+    }
+
+    fn expected_ids(ops: &[IndexingOp]) -> Vec<u64> {
+        let mut ids = Vec::new();
+        for &op in ops {
+            match op {
+                IndexingOp::AddDoc { id } => {
+                    ids.push(id);
+                }
+                IndexingOp::DeleteDoc { id } => {
+                    ids.retain(|&id_val| id_val != id);
+                }
+            }
+        }
+        ids.sort();
+        ids
+    }
+
+    fn test_operation_strategy(ops: &[IndexingOp]) -> crate::Result<()> {
+        let mut schema_builder = schema::Schema::builder();
+        let id_field = schema_builder.add_u64_field("id", FAST | INDEXED);
+        let schema = schema_builder.build();
+        let settings = IndexSettings {
+            sort_by_field: Some(IndexSortByField {
+                field: "id".to_string(),
+                order: Order::Asc,
+            }),
+            ..Default::default()
+        };
+        let index = Index::builder()
+            .schema(schema)
+            .settings(settings)
+            .create_in_ram()?;
+        let mut index_writer = index.writer_for_tests()?;
+        for &op in ops {
+            match op {
+                IndexingOp::AddDoc { id } => {
+                    index_writer.add_document(doc!(id_field=>id));
+                }
+                IndexingOp::DeleteDoc { id } => {
+                    index_writer.delete_term(Term::from_field_u64(id_field, id));
+                }
+            }
+        }
+        index_writer.commit()?;
+        let searcher = index.reader()?.searcher();
+        let ids: Vec<u64> = searcher
+            .segment_readers()
+            .iter()
+            .flat_map(|segment_reader| {
+                let ff_reader = segment_reader.fast_fields().u64(id_field).unwrap();
+                segment_reader
+                    .doc_ids_alive()
+                    .map(move |doc| ff_reader.get(doc))
+            })
+            .collect();
+
+        assert_eq!(ids, expected_ids(ops));
+        Ok(())
+    }
+
+    proptest! {
+        #[test]
+        fn test_delete_with_sort_proptest(ops in proptest::collection::vec(operation_strategy(), 1..10)) {
+            assert!(test_operation_strategy(&ops[..]).is_ok());
+        }
+    }
+
+    #[test]
+    fn test_delete_with_sort_by_field_last_opstamp_is_not_max() -> crate::Result<()> {
+        let mut schema_builder = schema::Schema::builder();
+        let sort_by_field = schema_builder.add_u64_field("sort_by", FAST);
+        let id_field = schema_builder.add_u64_field("id", INDEXED);
+        let schema = schema_builder.build();
+
+        let settings = IndexSettings {
+            sort_by_field: Some(IndexSortByField {
+                field: "sort_by".to_string(),
+                order: Order::Asc,
+            }),
+            ..Default::default()
+        };
+
+        let index = Index::builder()
+            .schema(schema)
+            .settings(settings)
+            .create_in_ram()?;
+        let mut index_writer = index.writer_for_tests()?;
+
+        // We add a doc...
+        index_writer.add_document(doc!(sort_by_field => 2u64, id_field => 0u64));
+        // And remove it.
+        index_writer.delete_term(Term::from_field_u64(id_field, 0u64));
+        // We add another doc.
+        index_writer.add_document(doc!(sort_by_field=>1u64, id_field => 0u64));
+
+        // The expected result is a segment with
+        // maxdoc = 2
+        // numdoc = 1.
+        index_writer.commit()?;
+
+        let searcher = index.reader()?.searcher();
+        assert_eq!(searcher.segment_readers().len(), 1);
+
+        let segment_reader = searcher.segment_reader(0);
+        assert_eq!(segment_reader.max_doc(), 2);
+        assert_eq!(segment_reader.num_deleted_docs(), 1);
+        Ok(())
+    }
+
     #[test]
     fn test_index_doc_missing_field() {
         let mut schema_builder = schema::Schema::builder();

@@ -1,4 +1,3 @@
-use super::doc_id_mapping::DocIdMapping;
 use crate::error::DataCorruption;
 use crate::fastfield::CompositeFastFieldSerializer;
 use crate::fastfield::DeleteBitSet;

@@ -19,11 +18,11 @@ use crate::schema::{Field, Schema};
 use crate::store::StoreWriter;
 use crate::termdict::TermMerger;
 use crate::termdict::TermOrdinal;
+use crate::IndexSortByField;
 use crate::{common::HasLen, fastfield::MultiValueLength};
 use crate::{common::MAX_DOC_LIMIT, IndexSettings};
 use crate::{core::Segment, indexer::doc_id_mapping::expect_field_id_for_sort_field};
 use crate::{core::SegmentReader, Order};
-use crate::{core::SerializableSegment, IndexSortByField};
 use crate::{
     docset::{DocSet, TERMINATED},
     SegmentOrdinal,

@@ -1084,14 +1083,13 @@ impl IndexMerger {
         }
         Ok(())
     }
-}
 
-impl SerializableSegment for IndexMerger {
-    fn write(
-        &self,
-        mut serializer: SegmentSerializer,
-        _: Option<&DocIdMapping>,
-    ) -> crate::Result<u32> {
+    /// Writes the merged segment by pushing information
+    /// to the `SegmentSerializer`.
+    ///
+    /// # Returns
+    /// The number of documents in the resulting segment.
+    pub fn write(&self, mut serializer: SegmentSerializer) -> crate::Result<u32> {
         let doc_id_mapping = if let Some(sort_by_field) = self.index_settings.sort_by_field.as_ref()
         {
             // If the documents are already sorted and stackable, we ignore the mapping and execute

@@ -5,7 +5,6 @@ use crate::core::IndexSettings;
 use crate::core::Segment;
 use crate::core::SegmentId;
 use crate::core::SegmentMeta;
-use crate::core::SerializableSegment;
 use crate::core::META_FILEPATH;
 use crate::directory::{Directory, DirectoryClone, GarbageCollectionResult};
 use crate::indexer::delete_queue::DeleteCursor;

@@ -140,7 +139,7 @@ fn merge(
     // ... we just serialize this index merger in our new segment to merge the segments.
     let segment_serializer = SegmentSerializer::for_segment(merged_segment.clone(), true)?;
 
-    let num_docs = merger.write(segment_serializer, None)?;
+    let num_docs = merger.write(segment_serializer)?;
 
     let merged_segment_id = merged_segment.id();
 

@@ -209,7 +208,7 @@ pub fn merge_segments<Dir: Directory>(
         &segments[..],
     )?;
     let segment_serializer = SegmentSerializer::for_segment(merged_segment, true)?;
-    let num_docs = merger.write(segment_serializer, None)?;
+    let num_docs = merger.write(segment_serializer)?;
 
     let segment_meta = merged_index.new_segment_meta(merged_segment_id, num_docs);
 

@@ -12,12 +12,12 @@ use crate::schema::Schema;
 use crate::schema::Term;
 use crate::schema::Value;
 use crate::schema::{Field, FieldEntry};
+use crate::store::StoreReader;
 use crate::tokenizer::{BoxTokenStream, PreTokenizedStream};
 use crate::tokenizer::{FacetTokenizer, TextAnalyzer};
 use crate::tokenizer::{TokenStreamChain, Tokenizer};
 use crate::Opstamp;
 use crate::{core::Segment, store::StoreWriter};
-use crate::{core::SerializableSegment, store::StoreReader};
 use crate::{DocId, SegmentComponent};
 
 /// Computes the initial size of the hash table.

@@ -36,6 +36,20 @@ fn initial_table_size(per_thread_memory_budget: usize) -> crate::Result<usize> {
     }
 }
 
+fn remap_doc_opstamps(
+    opstamps: Vec<Opstamp>,
+    doc_id_mapping_opt: Option<&DocIdMapping>,
+) -> Vec<Opstamp> {
+    if let Some(doc_id_mapping_opt) = doc_id_mapping_opt {
+        doc_id_mapping_opt
+            .iter_old_doc_ids()
+            .map(|doc| opstamps[doc as usize])
+            .collect()
+    } else {
+        opstamps
+    }
+}
+
 /// A `SegmentWriter` is in charge of creating segment index from a
 /// set of documents.
 ///

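When a segment is sorted, the per-document opstamps gathered at indexing time are still in insertion (old doc id) order, so they have to be permuted through the same old-to-new mapping before deletes are applied; without sorting they pass through unchanged. A standalone sketch of that remapping with made-up values (the real function takes an `Option<&DocIdMapping>`, a plain slice is used here):

    // Reorder opstamps so that index i holds the opstamp of *new* doc id i.
    // `doc_id_mapping` lists, for each new doc id, the old doc id it came from.
    fn remap_doc_opstamps(opstamps: Vec<u64>, doc_id_mapping: Option<&[u32]>) -> Vec<u64> {
        match doc_id_mapping {
            Some(mapping) => mapping.iter().map(|&old| opstamps[old as usize]).collect(),
            None => opstamps,
        }
    }

    fn main() {
        // Old doc ids 0..3 were assigned opstamps 10, 11, 12, 13.
        let opstamps = vec![10u64, 11, 12, 13];
        // After sorting, new doc 0 is old doc 3, new doc 1 is old doc 1, ...
        let mapping = [3u32, 1, 0, 2];
        assert_eq!(remap_doc_opstamps(opstamps, Some(&mapping)), vec![13, 11, 10, 12]);
    }
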
@@ -112,14 +126,15 @@ impl SegmentWriter {
             .clone()
             .map(|sort_by_field| get_doc_id_mapping_from_field(sort_by_field, &self))
             .transpose()?;
-        write(
+        remap_and_write(
             &self.multifield_postings,
             &self.fast_field_writers,
             &self.fieldnorms_writer,
             self.segment_serializer,
             mapping.as_ref(),
         )?;
-        Ok(self.doc_opstamps)
+        let doc_opstamps = remap_doc_opstamps(self.doc_opstamps, mapping.as_ref());
+        Ok(doc_opstamps)
     }
 
     pub fn mem_usage(&self) -> usize {

@@ -315,8 +330,12 @@ impl SegmentWriter {
     }
 }
 
-// This method is used as a trick to workaround the borrow checker
-fn write(
+/// This method is used as a trick to workaround the borrow checker
+/// Writes a view of a segment by pushing information
+/// to the `SegmentSerializer`.
+///
+/// `doc_id_map` is used to map to the new doc_id order.
+fn remap_and_write(
     multifield_postings: &MultiFieldPostingsWriter,
     fast_field_writers: &FastFieldsWriter,
     fieldnorms_writer: &FieldNormsWriter,

@@ -340,6 +359,7 @@ fn write(
         &term_ord_map,
         doc_id_map,
     )?;
 
+    // finalize temp docstore and create version, which reflects the doc_id_map
     if let Some(doc_id_map) = doc_id_map {
         let store_write = serializer

@@ -356,31 +376,16 @@ fn write(
             .segment()
             .open_read(SegmentComponent::TempStore)?,
         )?;
 
         for old_doc_id in doc_id_map.iter_old_doc_ids() {
-            let doc_bytes = store_read.get_document_bytes(*old_doc_id)?;
+            let doc_bytes = store_read.get_document_bytes(old_doc_id)?;
             serializer.get_store_writer().store_bytes(&doc_bytes)?;
         }
     }
-    serializer.close()?;
-    Ok(())
-}
-
-impl SerializableSegment for SegmentWriter {
-    fn write(
-        &self,
-        serializer: SegmentSerializer,
-        doc_id_map: Option<&DocIdMapping>,
-    ) -> crate::Result<u32> {
-        let max_doc = self.max_doc;
-        write(
-            &self.multifield_postings,
-            &self.fast_field_writers,
-            &self.fieldnorms_writer,
-            serializer,
-            doc_id_map,
-        )?;
-        Ok(max_doc)
-    }
+    serializer.close()?;
+
+    Ok(())
 }
 
 #[cfg(test)]