mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-01-03 15:52:55 +00:00
Adde demux operation (#1150)
* add merge for DeleteBitSet, allow custom DeleteBitSet on merge * forward delete bitsets on merge, add tests * add demux operation and tests
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
Tantivy 0.16.1
|
||||
========================
|
||||
- Major Bugfix on multivalued fastfield. #1151
|
||||
- Demux operation (@PSeitz)
|
||||
|
||||
Tantivy 0.16.0
|
||||
=========================
|
||||
|
||||
@@ -192,7 +192,6 @@ impl BitSet {
|
||||
|
||||
/// Deserialize a `BitSet`.
|
||||
///
|
||||
#[cfg(test)]
|
||||
pub fn deserialize(mut data: &[u8]) -> io::Result<Self> {
|
||||
let max_value: u32 = u32::from_le_bytes(data[..4].try_into().unwrap());
|
||||
data = &data[4..];
|
||||
@@ -247,7 +246,22 @@ impl BitSet {
|
||||
}
|
||||
}
|
||||
|
||||
/// Intersect with serialized bitset
|
||||
pub fn intersect_update(&mut self, other: &ReadSerializedBitSet) {
|
||||
self.intersect_update_with_iter(other.iter_tinysets());
|
||||
}
|
||||
|
||||
/// Intersect with tinysets
|
||||
fn intersect_update_with_iter(&mut self, other: impl Iterator<Item = TinySet>) {
|
||||
self.len = 0;
|
||||
for (left, right) in self.tinysets.iter_mut().zip(other) {
|
||||
*left = left.intersect(right);
|
||||
self.len += left.len() as u64;
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the number of elements in the `BitSet`.
|
||||
#[inline]
|
||||
pub fn len(&self) -> usize {
|
||||
self.len as usize
|
||||
}
|
||||
@@ -297,6 +311,7 @@ impl BitSet {
|
||||
.map(|delta_bucket| bucket + delta_bucket as u32)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn max_value(&self) -> u32 {
|
||||
self.max_value
|
||||
}
|
||||
@@ -334,7 +349,7 @@ impl ReadSerializedBitSet {
|
||||
/// Iterate the tinyset on the fly from serialized data.
|
||||
///
|
||||
#[inline]
|
||||
fn iter_tinysets<'a>(&'a self) -> impl Iterator<Item = TinySet> + 'a {
|
||||
fn iter_tinysets(&self) -> impl Iterator<Item = TinySet> + '_ {
|
||||
assert!((self.data.len()) % 8 == 0);
|
||||
self.data.chunks_exact(8).map(move |chunk| {
|
||||
let tinyset: TinySet = TinySet::deserialize(chunk.try_into().unwrap()).unwrap();
|
||||
@@ -401,6 +416,46 @@ mod tests {
|
||||
assert_eq!(bitset.len(), 4);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_bitset_intersect() {
|
||||
let bitset_serialized = {
|
||||
let mut bitset = BitSet::with_max_value_and_full(5);
|
||||
bitset.remove(1);
|
||||
bitset.remove(3);
|
||||
let mut out = vec![];
|
||||
bitset.serialize(&mut out).unwrap();
|
||||
|
||||
ReadSerializedBitSet::open(OwnedBytes::new(out))
|
||||
};
|
||||
|
||||
let mut bitset = BitSet::with_max_value_and_full(5);
|
||||
bitset.remove(1);
|
||||
bitset.intersect_update(&bitset_serialized);
|
||||
|
||||
assert!(bitset.contains(0));
|
||||
assert!(!bitset.contains(1));
|
||||
assert!(bitset.contains(2));
|
||||
assert!(!bitset.contains(3));
|
||||
assert!(bitset.contains(4));
|
||||
|
||||
bitset.intersect_update_with_iter(vec![TinySet::singleton(0)].into_iter());
|
||||
|
||||
assert!(bitset.contains(0));
|
||||
assert!(!bitset.contains(1));
|
||||
assert!(!bitset.contains(2));
|
||||
assert!(!bitset.contains(3));
|
||||
assert!(!bitset.contains(4));
|
||||
assert_eq!(bitset.len(), 1);
|
||||
|
||||
bitset.intersect_update_with_iter(vec![TinySet::singleton(1)].into_iter());
|
||||
assert!(!bitset.contains(0));
|
||||
assert!(!bitset.contains(1));
|
||||
assert!(!bitset.contains(2));
|
||||
assert!(!bitset.contains(3));
|
||||
assert!(!bitset.contains(4));
|
||||
assert_eq!(bitset.len(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_read_serialized_bitset_empty() {
|
||||
let mut bitset = BitSet::with_max_value(5);
|
||||
|
||||
@@ -439,7 +439,7 @@ impl Index {
|
||||
/// Creates a multithreaded writer
|
||||
///
|
||||
/// Tantivy will automatically define the number of threads to use, but
|
||||
/// no more than [`MAX_NUM_THREAD`] threads.
|
||||
/// no more than 8 threads.
|
||||
/// `overall_heap_size_in_bytes` is the total target memory usage that will be split
|
||||
/// between a given number of threads.
|
||||
///
|
||||
|
||||
@@ -88,7 +88,7 @@ impl Searcher {
|
||||
&self.segment_readers
|
||||
}
|
||||
|
||||
/// Returns the segment_reader associated with the given segment_ordinal
|
||||
/// Returns the segment_reader associated with the given segment_ord
|
||||
pub fn segment_reader(&self, segment_ord: u32) -> &SegmentReader {
|
||||
&self.segment_readers[segment_ord as usize]
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ use crate::core::SegmentId;
|
||||
use crate::directory::CompositeFile;
|
||||
use crate::directory::FileSlice;
|
||||
use crate::error::DataCorruption;
|
||||
use crate::fastfield::union_alive_bitset;
|
||||
use crate::fastfield::AliveBitSet;
|
||||
use crate::fastfield::FacetReader;
|
||||
use crate::fastfield::FastFieldReaders;
|
||||
@@ -140,6 +141,14 @@ impl SegmentReader {
|
||||
|
||||
/// Open a new segment for reading.
|
||||
pub fn open(segment: &Segment) -> crate::Result<SegmentReader> {
|
||||
Self::open_with_custom_alive_set(segment, None)
|
||||
}
|
||||
|
||||
/// Open a new segment for reading.
|
||||
pub fn open_with_custom_alive_set(
|
||||
segment: &Segment,
|
||||
custom_bitset: Option<AliveBitSet>,
|
||||
) -> crate::Result<SegmentReader> {
|
||||
let termdict_file = segment.open_read(SegmentComponent::Terms)?;
|
||||
let termdict_composite = CompositeFile::open(&termdict_file)?;
|
||||
|
||||
@@ -164,22 +173,35 @@ impl SegmentReader {
|
||||
let fast_fields_composite = CompositeFile::open(&fast_fields_data)?;
|
||||
let fast_field_readers =
|
||||
Arc::new(FastFieldReaders::new(schema.clone(), fast_fields_composite));
|
||||
|
||||
let fieldnorm_data = segment.open_read(SegmentComponent::FieldNorms)?;
|
||||
let fieldnorm_readers = FieldNormReaders::open(fieldnorm_data)?;
|
||||
|
||||
let mut num_docs = segment.meta().num_docs();
|
||||
let max_doc = segment.meta().max_doc();
|
||||
|
||||
let alive_bitset_opt = if segment.meta().has_deletes() {
|
||||
let alive_bitset_bytes = segment.open_read(SegmentComponent::Delete)?.read_bytes()?;
|
||||
let alive_bitset = AliveBitSet::open(alive_bitset_bytes);
|
||||
let delete_data = segment.open_read(SegmentComponent::Delete)?;
|
||||
let mut alive_bitset = AliveBitSet::open(delete_data.read_bytes()?);
|
||||
|
||||
if let Some(provided_bitset) = custom_bitset {
|
||||
assert_eq!(max_doc, provided_bitset.bitset().max_value());
|
||||
alive_bitset = union_alive_bitset(&alive_bitset, &provided_bitset)?;
|
||||
num_docs = alive_bitset.num_alive_docs() as u32;
|
||||
}
|
||||
Some(alive_bitset)
|
||||
} else {
|
||||
if let Some(provided_bitset) = custom_bitset {
|
||||
num_docs = provided_bitset.num_alive_docs() as u32;
|
||||
Some(provided_bitset)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
Ok(SegmentReader {
|
||||
inv_idx_reader_cache: Default::default(),
|
||||
max_doc: segment.meta().max_doc(),
|
||||
num_docs: segment.meta().num_docs(),
|
||||
num_docs,
|
||||
max_doc,
|
||||
termdict_composite,
|
||||
postings_composite,
|
||||
fast_fields_readers: fast_field_readers,
|
||||
|
||||
@@ -6,6 +6,20 @@ use ownedbytes::OwnedBytes;
|
||||
use std::io;
|
||||
use std::io::Write;
|
||||
|
||||
/// Merges (intersects) two AliveBitSet in a new one.
|
||||
/// The two bitsets need to have the same max_value.
|
||||
pub fn union_alive_bitset(left: &AliveBitSet, right: &AliveBitSet) -> crate::Result<AliveBitSet> {
|
||||
assert_eq!(left.bitset().max_value(), right.bitset().max_value());
|
||||
|
||||
let mut merged_bitset = BitSet::deserialize(left.data().as_slice())?;
|
||||
merged_bitset.intersect_update(right.bitset());
|
||||
|
||||
let mut alive_bitset_buffer = vec![];
|
||||
write_alive_bitset(&merged_bitset, &mut alive_bitset_buffer)?;
|
||||
|
||||
Ok(AliveBitSet::open(OwnedBytes::new(alive_bitset_buffer)))
|
||||
}
|
||||
|
||||
/// Write a alive `BitSet`
|
||||
///
|
||||
/// where `alive_bitset` is the set of alive `DocId`.
|
||||
@@ -22,6 +36,7 @@ pub struct AliveBitSet {
|
||||
num_alive_docs: usize,
|
||||
bitset: ReadSerializedBitSet,
|
||||
num_bytes: ByteCount,
|
||||
data: OwnedBytes,
|
||||
}
|
||||
|
||||
impl AliveBitSet {
|
||||
@@ -38,14 +53,21 @@ impl AliveBitSet {
|
||||
Self::open(alive_bitset_bytes)
|
||||
}
|
||||
|
||||
pub(crate) fn from_bitset(bitset: &BitSet) -> AliveBitSet {
|
||||
let mut out = vec![];
|
||||
write_alive_bitset(bitset, &mut out).unwrap();
|
||||
AliveBitSet::open(OwnedBytes::new(out))
|
||||
}
|
||||
|
||||
/// Opens a delete bitset given its file.
|
||||
pub fn open(bytes: OwnedBytes) -> AliveBitSet {
|
||||
let num_bytes = bytes.len();
|
||||
let bitset = ReadSerializedBitSet::open(bytes);
|
||||
let bitset = ReadSerializedBitSet::open(bytes.clone());
|
||||
AliveBitSet {
|
||||
num_alive_docs: bitset.len(),
|
||||
bitset,
|
||||
num_bytes,
|
||||
data: bytes,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -61,7 +83,7 @@ impl AliveBitSet {
|
||||
!self.is_alive(doc)
|
||||
}
|
||||
|
||||
/// Iterate over the alive docids.
|
||||
/// Iterate over the alive doc_ids.
|
||||
#[inline]
|
||||
pub fn iter_alive(&self) -> impl Iterator<Item = DocId> + '_ {
|
||||
self.bitset.iter()
|
||||
@@ -82,6 +104,11 @@ impl AliveBitSet {
|
||||
pub fn space_usage(&self) -> ByteCount {
|
||||
self.num_bytes
|
||||
}
|
||||
|
||||
/// Get underlying bytes.
|
||||
pub(crate) fn data(&self) -> OwnedBytes {
|
||||
self.data.clone()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -23,6 +23,7 @@ values stored.
|
||||
Read access performance is comparable to that of an array lookup.
|
||||
*/
|
||||
|
||||
pub use self::alive_bitset::union_alive_bitset;
|
||||
pub use self::alive_bitset::write_alive_bitset;
|
||||
pub use self::alive_bitset::AliveBitSet;
|
||||
pub use self::bytes::{BytesFastFieldReader, BytesFastFieldWriter};
|
||||
|
||||
323
src/indexer/demuxer.rs
Normal file
323
src/indexer/demuxer.rs
Normal file
@@ -0,0 +1,323 @@
|
||||
use common::BitSet;
|
||||
use itertools::Itertools;
|
||||
|
||||
use crate::fastfield::AliveBitSet;
|
||||
use crate::{merge_filtered_segments, Directory, Index, IndexSettings, Segment, SegmentOrdinal};
|
||||
/// DemuxMapping can be used to reorganize data from multiple segments.
|
||||
///
|
||||
/// DemuxMapping is useful in a multitenant settings, in which each document might actually belong to a different tenant.
|
||||
/// It allows to reorganize documents as follows:
|
||||
///
|
||||
/// e.g. if you have two tenant ids TENANT_A and TENANT_B and two segments with
|
||||
/// the documents (simplified)
|
||||
/// Seg 1 [TENANT_A, TENANT_B]
|
||||
/// Seg 2 [TENANT_A, TENANT_B]
|
||||
///
|
||||
/// You may want to group your documents to
|
||||
/// Seg 1 [TENANT_A, TENANT_A]
|
||||
/// Seg 2 [TENANT_B, TENANT_B]
|
||||
///
|
||||
/// Demuxing is the tool for that.
|
||||
/// Semantically you can define a mapping from [old segment ordinal, old doc_id] -> [new segment ordinal].
|
||||
#[derive(Debug, Default)]
|
||||
pub struct DemuxMapping {
|
||||
/// [index old segment ordinal] -> [index doc_id] = new segment ordinal
|
||||
mapping: Vec<DocIdToSegmentOrdinal>,
|
||||
}
|
||||
|
||||
/// DocIdToSegmentOrdinal maps from doc_id within a segment to the new segment ordinal for demuxing.
|
||||
///
|
||||
/// For every source segment there is a `DocIdToSegmentOrdinal` to distribute its doc_ids.
|
||||
#[derive(Debug, Default)]
|
||||
pub struct DocIdToSegmentOrdinal {
|
||||
doc_id_index_to_segment_ord: Vec<SegmentOrdinal>,
|
||||
}
|
||||
|
||||
impl DocIdToSegmentOrdinal {
|
||||
/// Creates a new DocIdToSegmentOrdinal with size of num_doc_ids.
|
||||
/// Initially all doc_ids point to segment ordinal 0 and need to be set
|
||||
/// the via `set` method.
|
||||
pub fn with_max_doc(max_doc: usize) -> Self {
|
||||
DocIdToSegmentOrdinal {
|
||||
doc_id_index_to_segment_ord: vec![0; max_doc],
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the number of documents in this mapping.
|
||||
/// It should be equal to the `max_doc` of the segment it targets.
|
||||
pub fn max_doc(&self) -> u32 {
|
||||
self.doc_id_index_to_segment_ord.len() as u32
|
||||
}
|
||||
|
||||
/// Associates a doc_id with an output `SegmentOrdinal`.
|
||||
pub fn set(&mut self, doc_id: u32, segment_ord: SegmentOrdinal) {
|
||||
self.doc_id_index_to_segment_ord[doc_id as usize] = segment_ord;
|
||||
}
|
||||
|
||||
/// Iterates over the new SegmentOrdinal in the order of the doc_id.
|
||||
pub fn iter(&self) -> impl Iterator<Item = SegmentOrdinal> + '_ {
|
||||
self.doc_id_index_to_segment_ord.iter().cloned()
|
||||
}
|
||||
}
|
||||
|
||||
impl DemuxMapping {
|
||||
/// Adds a DocIdToSegmentOrdinal. The order of the pus calls
|
||||
/// defines the old segment ordinal. e.g. first push = ordinal 0.
|
||||
pub fn add(&mut self, segment_mapping: DocIdToSegmentOrdinal) {
|
||||
self.mapping.push(segment_mapping);
|
||||
}
|
||||
|
||||
/// Returns the old number of segments.
|
||||
pub fn get_old_num_segments(&self) -> usize {
|
||||
self.mapping.len()
|
||||
}
|
||||
}
|
||||
|
||||
fn docs_for_segment_ord(
|
||||
doc_id_to_segment_ord: &DocIdToSegmentOrdinal,
|
||||
target_segment_ord: SegmentOrdinal,
|
||||
) -> AliveBitSet {
|
||||
let mut bitset = BitSet::with_max_value(doc_id_to_segment_ord.max_doc());
|
||||
for doc_id in doc_id_to_segment_ord
|
||||
.iter()
|
||||
.enumerate()
|
||||
.filter(|(_doc_id, new_segment_ord)| *new_segment_ord == target_segment_ord)
|
||||
.map(|(doc_id, _)| doc_id)
|
||||
{
|
||||
// add document if segment ordinal = target segment ordinal
|
||||
bitset.insert(doc_id as u32);
|
||||
}
|
||||
AliveBitSet::from_bitset(&bitset)
|
||||
}
|
||||
|
||||
fn get_alive_bitsets(
|
||||
demux_mapping: &DemuxMapping,
|
||||
target_segment_ord: SegmentOrdinal,
|
||||
) -> Vec<AliveBitSet> {
|
||||
demux_mapping
|
||||
.mapping
|
||||
.iter()
|
||||
.map(|doc_id_to_segment_ord| {
|
||||
docs_for_segment_ord(doc_id_to_segment_ord, target_segment_ord)
|
||||
})
|
||||
.collect_vec()
|
||||
}
|
||||
|
||||
/// Demux the segments according to `demux_mapping`. See `DemuxMapping`.
|
||||
/// The number of output_directories need to match max new segment ordinal from `demux_mapping`.
|
||||
///
|
||||
/// The ordinal of `segments` need to match the ordinals provided in `demux_mapping`.
|
||||
pub fn demux<Dir: Directory>(
|
||||
segments: &[Segment],
|
||||
demux_mapping: &DemuxMapping,
|
||||
target_settings: IndexSettings,
|
||||
output_directories: Vec<Dir>,
|
||||
) -> crate::Result<Vec<Index>> {
|
||||
let mut indices = vec![];
|
||||
for (target_segment_ord, output_directory) in output_directories.into_iter().enumerate() {
|
||||
let delete_bitsets = get_alive_bitsets(demux_mapping, target_segment_ord as u32)
|
||||
.into_iter()
|
||||
.map(|bitset| Some(bitset))
|
||||
.collect_vec();
|
||||
let index = merge_filtered_segments(
|
||||
segments,
|
||||
target_settings.clone(),
|
||||
delete_bitsets,
|
||||
output_directory,
|
||||
)?;
|
||||
indices.push(index);
|
||||
}
|
||||
Ok(indices)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::{
|
||||
collector::TopDocs,
|
||||
directory::RamDirectory,
|
||||
query::QueryParser,
|
||||
schema::{Schema, TEXT},
|
||||
DocAddress, Term,
|
||||
};
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_demux_map_to_deletebitset() {
|
||||
let max_value = 2;
|
||||
let mut demux_mapping = DemuxMapping::default();
|
||||
//segment ordinal 0 mapping
|
||||
let mut doc_id_to_segment = DocIdToSegmentOrdinal::with_max_doc(max_value);
|
||||
doc_id_to_segment.set(0, 1);
|
||||
doc_id_to_segment.set(1, 0);
|
||||
demux_mapping.add(doc_id_to_segment);
|
||||
|
||||
//segment ordinal 1 mapping
|
||||
let mut doc_id_to_segment = DocIdToSegmentOrdinal::with_max_doc(max_value);
|
||||
doc_id_to_segment.set(0, 1);
|
||||
doc_id_to_segment.set(1, 1);
|
||||
demux_mapping.add(doc_id_to_segment);
|
||||
{
|
||||
let bit_sets_for_demuxing_to_segment_ord_0 =
|
||||
get_alive_bitsets(&demux_mapping, 0, &[max_value as u32, max_value as u32]);
|
||||
|
||||
assert_eq!(
|
||||
bit_sets_for_demuxing_to_segment_ord_0[0].is_deleted(0),
|
||||
true
|
||||
);
|
||||
assert_eq!(
|
||||
bit_sets_for_demuxing_to_segment_ord_0[0].is_deleted(1),
|
||||
false
|
||||
);
|
||||
assert_eq!(
|
||||
bit_sets_for_demuxing_to_segment_ord_0[1].is_deleted(0),
|
||||
true
|
||||
);
|
||||
assert_eq!(
|
||||
bit_sets_for_demuxing_to_segment_ord_0[1].is_deleted(1),
|
||||
true
|
||||
);
|
||||
}
|
||||
|
||||
{
|
||||
let bit_sets_for_demuxing_to_segment_ord_1 =
|
||||
get_alive_bitsets(&demux_mapping, 1, &[max_value as u32, max_value as u32]);
|
||||
|
||||
assert_eq!(
|
||||
bit_sets_for_demuxing_to_segment_ord_1[0].is_deleted(0),
|
||||
false
|
||||
);
|
||||
assert_eq!(
|
||||
bit_sets_for_demuxing_to_segment_ord_1[0].is_deleted(1),
|
||||
true
|
||||
);
|
||||
assert_eq!(
|
||||
bit_sets_for_demuxing_to_segment_ord_1[1].is_deleted(0),
|
||||
false
|
||||
);
|
||||
assert_eq!(
|
||||
bit_sets_for_demuxing_to_segment_ord_1[1].is_deleted(1),
|
||||
false
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_demux_segments() -> crate::Result<()> {
|
||||
let first_index = {
|
||||
let mut schema_builder = Schema::builder();
|
||||
let text_field = schema_builder.add_text_field("text", TEXT);
|
||||
let index = Index::create_in_ram(schema_builder.build());
|
||||
let mut index_writer = index.writer_for_tests()?;
|
||||
index_writer.add_document(doc!(text_field=>"texto1"));
|
||||
index_writer.add_document(doc!(text_field=>"texto2"));
|
||||
index_writer.commit()?;
|
||||
index
|
||||
};
|
||||
|
||||
let second_index = {
|
||||
let mut schema_builder = Schema::builder();
|
||||
let text_field = schema_builder.add_text_field("text", TEXT);
|
||||
let index = Index::create_in_ram(schema_builder.build());
|
||||
let mut index_writer = index.writer_for_tests()?;
|
||||
index_writer.add_document(doc!(text_field=>"texto3"));
|
||||
index_writer.add_document(doc!(text_field=>"texto4"));
|
||||
index_writer.delete_term(Term::from_field_text(text_field, "4"));
|
||||
|
||||
index_writer.commit()?;
|
||||
index
|
||||
};
|
||||
|
||||
let mut segments: Vec<Segment> = Vec::new();
|
||||
segments.extend(first_index.searchable_segments()?);
|
||||
segments.extend(second_index.searchable_segments()?);
|
||||
|
||||
let target_settings = first_index.settings().clone();
|
||||
|
||||
let mut demux_mapping = DemuxMapping::default();
|
||||
{
|
||||
let max_value = 2;
|
||||
//segment ordinal 0 mapping
|
||||
let mut doc_id_to_segment = DocIdToSegmentOrdinal::with_max_doc(max_value);
|
||||
doc_id_to_segment.set(0, 1);
|
||||
doc_id_to_segment.set(1, 0);
|
||||
demux_mapping.add(doc_id_to_segment);
|
||||
|
||||
//segment ordinal 1 mapping
|
||||
let mut doc_id_to_segment = DocIdToSegmentOrdinal::with_max_doc(max_value);
|
||||
doc_id_to_segment.set(0, 1);
|
||||
doc_id_to_segment.set(1, 1);
|
||||
demux_mapping.add(doc_id_to_segment);
|
||||
}
|
||||
assert_eq!(demux_mapping.get_old_num_segments(), 2);
|
||||
|
||||
let demuxed_indices = demux(
|
||||
&segments,
|
||||
&demux_mapping,
|
||||
target_settings,
|
||||
vec![RamDirectory::default(), RamDirectory::default()],
|
||||
)?;
|
||||
|
||||
{
|
||||
let index = &demuxed_indices[0];
|
||||
|
||||
let segments = index.searchable_segments()?;
|
||||
assert_eq!(segments.len(), 1);
|
||||
|
||||
let segment_metas = segments[0].meta();
|
||||
assert_eq!(segment_metas.num_deleted_docs(), 0);
|
||||
assert_eq!(segment_metas.num_docs(), 1);
|
||||
|
||||
let searcher = index.reader().unwrap().searcher();
|
||||
{
|
||||
let text_field = index.schema().get_field("text").unwrap();
|
||||
|
||||
let do_search = |term: &str| {
|
||||
let query = QueryParser::for_index(&index, vec![text_field])
|
||||
.parse_query(term)
|
||||
.unwrap();
|
||||
let top_docs: Vec<(f32, DocAddress)> =
|
||||
searcher.search(&query, &TopDocs::with_limit(3)).unwrap();
|
||||
|
||||
top_docs.iter().map(|el| el.1.doc_id).collect::<Vec<_>>()
|
||||
};
|
||||
|
||||
assert_eq!(do_search("texto1"), vec![] as Vec<u32>);
|
||||
assert_eq!(do_search("texto2"), vec![0]);
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
let index = &demuxed_indices[1];
|
||||
|
||||
let segments = index.searchable_segments()?;
|
||||
assert_eq!(segments.len(), 1);
|
||||
|
||||
let segment_metas = segments[0].meta();
|
||||
assert_eq!(segment_metas.num_deleted_docs(), 0);
|
||||
assert_eq!(segment_metas.num_docs(), 3);
|
||||
|
||||
let searcher = index.reader().unwrap().searcher();
|
||||
{
|
||||
let text_field = index.schema().get_field("text").unwrap();
|
||||
|
||||
let do_search = |term: &str| {
|
||||
let query = QueryParser::for_index(&index, vec![text_field])
|
||||
.parse_query(term)
|
||||
.unwrap();
|
||||
let top_docs: Vec<(f32, DocAddress)> =
|
||||
searcher.search(&query, &TopDocs::with_limit(3)).unwrap();
|
||||
|
||||
top_docs.iter().map(|el| el.1.doc_id).collect::<Vec<_>>()
|
||||
};
|
||||
|
||||
assert_eq!(do_search("texto1"), vec![0]);
|
||||
assert_eq!(do_search("texto2"), vec![] as Vec<u32>);
|
||||
assert_eq!(do_search("texto3"), vec![1]);
|
||||
assert_eq!(do_search("texto4"), vec![2]);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -11,12 +11,12 @@ use std::{cmp::Reverse, ops::Index};
|
||||
|
||||
/// Struct to provide mapping from new doc_id to old doc_id and segment.
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct SegmentDocidMapping {
|
||||
pub(crate) struct SegmentDocIdMapping {
|
||||
new_doc_id_to_old_and_segment: Vec<(DocId, SegmentOrdinal)>,
|
||||
is_trivial: bool,
|
||||
}
|
||||
|
||||
impl SegmentDocidMapping {
|
||||
impl SegmentDocIdMapping {
|
||||
pub(crate) fn new(
|
||||
new_doc_id_to_old_and_segment: Vec<(DocId, SegmentOrdinal)>,
|
||||
is_trivial: bool,
|
||||
@@ -40,14 +40,14 @@ impl SegmentDocidMapping {
|
||||
self.is_trivial
|
||||
}
|
||||
}
|
||||
impl Index<usize> for SegmentDocidMapping {
|
||||
impl Index<usize> for SegmentDocIdMapping {
|
||||
type Output = (DocId, SegmentOrdinal);
|
||||
|
||||
fn index(&self, idx: usize) -> &Self::Output {
|
||||
&self.new_doc_id_to_old_and_segment[idx]
|
||||
}
|
||||
}
|
||||
impl IntoIterator for SegmentDocidMapping {
|
||||
impl IntoIterator for SegmentDocIdMapping {
|
||||
type Item = (DocId, SegmentOrdinal);
|
||||
type IntoIter = std::vec::IntoIter<Self::Item>;
|
||||
|
||||
|
||||
@@ -164,15 +164,8 @@ pub(crate) fn advance_deletes(
|
||||
target_opstamp,
|
||||
)?;
|
||||
|
||||
// TODO optimize
|
||||
// It should be possible to do something smarter by manipulation bitsets directly
|
||||
// to compute this union.
|
||||
if let Some(seg_alive_bitset) = segment_reader.alive_bitset() {
|
||||
for doc in 0u32..max_doc {
|
||||
if seg_alive_bitset.is_deleted(doc) {
|
||||
alive_bitset.remove(doc);
|
||||
}
|
||||
}
|
||||
alive_bitset.intersect_update(seg_alive_bitset.bitset());
|
||||
}
|
||||
|
||||
let num_alive_docs: u32 = alive_bitset.len() as u32;
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use crate::error::DataCorruption;
|
||||
use crate::fastfield::AliveBitSet;
|
||||
use crate::fastfield::CompositeFastFieldSerializer;
|
||||
use crate::fastfield::DynamicFastFieldReader;
|
||||
use crate::fastfield::FastFieldDataAccess;
|
||||
@@ -9,7 +10,7 @@ use crate::fastfield::MultiValuedFastFieldReader;
|
||||
use crate::fieldnorm::FieldNormsSerializer;
|
||||
use crate::fieldnorm::FieldNormsWriter;
|
||||
use crate::fieldnorm::{FieldNormReader, FieldNormReaders};
|
||||
use crate::indexer::doc_id_mapping::SegmentDocidMapping;
|
||||
use crate::indexer::doc_id_mapping::SegmentDocIdMapping;
|
||||
use crate::indexer::SegmentSerializer;
|
||||
use crate::postings::Postings;
|
||||
use crate::postings::{InvertedIndexSerializer, SegmentPostings};
|
||||
@@ -156,16 +157,38 @@ impl IndexMerger {
|
||||
schema: Schema,
|
||||
index_settings: IndexSettings,
|
||||
segments: &[Segment],
|
||||
) -> crate::Result<IndexMerger> {
|
||||
let delete_bitsets = segments.iter().map(|_| None).collect_vec();
|
||||
Self::open_with_custom_alive_set(schema, index_settings, segments, delete_bitsets)
|
||||
}
|
||||
|
||||
// Create merge with a custom delete set.
|
||||
// For every Segment, a delete bitset can be provided, which
|
||||
// will be merged with the existing bit set. Make sure the index
|
||||
// corresponds to the segment index.
|
||||
//
|
||||
// If `None` is provided for custom alive set, the regular alive set will be used.
|
||||
// If a delete_bitsets is provided, the union between the provided and regular
|
||||
// alive set will be used.
|
||||
//
|
||||
// This can be used to merge but also apply an additional filter.
|
||||
// One use case is demux, which is basically taking a list of
|
||||
// segments and partitions them e.g. by a value in a field.
|
||||
pub fn open_with_custom_alive_set(
|
||||
schema: Schema,
|
||||
index_settings: IndexSettings,
|
||||
segments: &[Segment],
|
||||
alive_bitset_opt: Vec<Option<AliveBitSet>>,
|
||||
) -> crate::Result<IndexMerger> {
|
||||
let mut readers = vec![];
|
||||
let mut max_doc: u32 = 0u32;
|
||||
for segment in segments {
|
||||
for (segment, new_alive_bitset_opt) in segments.iter().zip(alive_bitset_opt.into_iter()) {
|
||||
if segment.meta().num_docs() > 0 {
|
||||
let reader = SegmentReader::open(segment)?;
|
||||
max_doc += reader.num_docs();
|
||||
let reader =
|
||||
SegmentReader::open_with_custom_alive_set(segment, new_alive_bitset_opt)?;
|
||||
readers.push(reader);
|
||||
}
|
||||
}
|
||||
let max_doc = readers.iter().map(|reader| reader.num_docs()).sum();
|
||||
if let Some(sort_by_field) = index_settings.sort_by_field.as_ref() {
|
||||
readers = Self::sort_readers_by_min_sort_field(readers, sort_by_field)?;
|
||||
}
|
||||
@@ -213,7 +236,7 @@ impl IndexMerger {
|
||||
fn write_fieldnorms(
|
||||
&self,
|
||||
mut fieldnorms_serializer: FieldNormsSerializer,
|
||||
doc_id_mapping: &SegmentDocidMapping,
|
||||
doc_id_mapping: &SegmentDocIdMapping,
|
||||
) -> crate::Result<()> {
|
||||
let fields = FieldNormsWriter::fields_with_fieldnorm(&self.schema);
|
||||
let mut fieldnorms_data = Vec::with_capacity(self.max_doc as usize);
|
||||
@@ -241,7 +264,7 @@ impl IndexMerger {
|
||||
&self,
|
||||
fast_field_serializer: &mut CompositeFastFieldSerializer,
|
||||
mut term_ord_mappings: HashMap<Field, TermOrdinalMapping>,
|
||||
doc_id_mapping: &SegmentDocidMapping,
|
||||
doc_id_mapping: &SegmentDocIdMapping,
|
||||
) -> crate::Result<()> {
|
||||
debug_time!("write_fast_fields");
|
||||
|
||||
@@ -292,7 +315,7 @@ impl IndexMerger {
|
||||
&self,
|
||||
field: Field,
|
||||
fast_field_serializer: &mut CompositeFastFieldSerializer,
|
||||
doc_id_mapping: &SegmentDocidMapping,
|
||||
doc_id_mapping: &SegmentDocIdMapping,
|
||||
) -> crate::Result<()> {
|
||||
let (min_value, max_value) = self.readers.iter().map(|reader|{
|
||||
let u64_reader: DynamicFastFieldReader<u64> = reader
|
||||
@@ -324,17 +347,17 @@ impl IndexMerger {
|
||||
num_vals: doc_id_mapping.len() as u64,
|
||||
};
|
||||
#[derive(Clone)]
|
||||
struct SortedDocidFieldAccessProvider<'a> {
|
||||
doc_id_mapping: &'a SegmentDocidMapping,
|
||||
struct SortedDocIdFieldAccessProvider<'a> {
|
||||
doc_id_mapping: &'a SegmentDocIdMapping,
|
||||
fast_field_readers: &'a Vec<DynamicFastFieldReader<u64>>,
|
||||
}
|
||||
impl<'a> FastFieldDataAccess for SortedDocidFieldAccessProvider<'a> {
|
||||
impl<'a> FastFieldDataAccess for SortedDocIdFieldAccessProvider<'a> {
|
||||
fn get_val(&self, doc: u64) -> u64 {
|
||||
let (doc_id, reader_ordinal) = self.doc_id_mapping[doc as usize];
|
||||
self.fast_field_readers[reader_ordinal as usize].get(doc_id)
|
||||
}
|
||||
}
|
||||
let fastfield_accessor = SortedDocidFieldAccessProvider {
|
||||
let fastfield_accessor = SortedDocIdFieldAccessProvider {
|
||||
doc_id_mapping,
|
||||
fast_field_readers: &fast_field_readers,
|
||||
};
|
||||
@@ -416,7 +439,7 @@ impl IndexMerger {
|
||||
pub(crate) fn generate_doc_id_mapping(
|
||||
&self,
|
||||
sort_by_field: &IndexSortByField,
|
||||
) -> crate::Result<SegmentDocidMapping> {
|
||||
) -> crate::Result<SegmentDocIdMapping> {
|
||||
let reader_ordinal_and_field_accessors =
|
||||
self.get_reader_with_sort_field_accessor(sort_by_field)?;
|
||||
// Loading the field accessor on demand causes a 15x regression
|
||||
@@ -459,7 +482,7 @@ impl IndexMerger {
|
||||
})
|
||||
.map(|(doc_id, reader_with_id, _)| (doc_id, reader_with_id)),
|
||||
);
|
||||
Ok(SegmentDocidMapping::new(sorted_doc_ids, false))
|
||||
Ok(SegmentDocIdMapping::new(sorted_doc_ids, false))
|
||||
}
|
||||
|
||||
// Creating the index file to point into the data, generic over `BytesFastFieldReader` and
|
||||
@@ -468,7 +491,7 @@ impl IndexMerger {
|
||||
fn write_1_n_fast_field_idx_generic<T: MultiValueLength>(
|
||||
field: Field,
|
||||
fast_field_serializer: &mut CompositeFastFieldSerializer,
|
||||
doc_id_mapping: &SegmentDocidMapping,
|
||||
doc_id_mapping: &SegmentDocIdMapping,
|
||||
reader_and_field_accessors: &[(&SegmentReader, T)],
|
||||
) -> crate::Result<Vec<u64>> {
|
||||
let mut total_num_vals = 0u64;
|
||||
@@ -527,7 +550,7 @@ impl IndexMerger {
|
||||
&self,
|
||||
field: Field,
|
||||
fast_field_serializer: &mut CompositeFastFieldSerializer,
|
||||
doc_id_mapping: &SegmentDocidMapping,
|
||||
doc_id_mapping: &SegmentDocIdMapping,
|
||||
) -> crate::Result<Vec<u64>> {
|
||||
let reader_ordinal_and_field_accessors = self.readers.iter().map(|reader|{
|
||||
let u64s_reader: MultiValuedFastFieldReader<u64> = reader.fast_fields()
|
||||
@@ -549,7 +572,7 @@ impl IndexMerger {
|
||||
field: Field,
|
||||
term_ordinal_mappings: &TermOrdinalMapping,
|
||||
fast_field_serializer: &mut CompositeFastFieldSerializer,
|
||||
doc_id_mapping: &SegmentDocidMapping,
|
||||
doc_id_mapping: &SegmentDocIdMapping,
|
||||
) -> crate::Result<()> {
|
||||
debug_time!("write_hierarchical_facet_field");
|
||||
|
||||
@@ -598,7 +621,7 @@ impl IndexMerger {
|
||||
|
||||
/// Creates a mapping if the segments are stacked. this is helpful to merge codelines between index
|
||||
/// sorting and the others
|
||||
pub(crate) fn get_doc_id_from_concatenated_data(&self) -> crate::Result<SegmentDocidMapping> {
|
||||
pub(crate) fn get_doc_id_from_concatenated_data(&self) -> crate::Result<SegmentDocIdMapping> {
|
||||
let total_num_new_docs = self
|
||||
.readers
|
||||
.iter()
|
||||
@@ -618,13 +641,13 @@ impl IndexMerger {
|
||||
})
|
||||
.flatten(),
|
||||
);
|
||||
Ok(SegmentDocidMapping::new(mapping, true))
|
||||
Ok(SegmentDocIdMapping::new(mapping, true))
|
||||
}
|
||||
fn write_multi_fast_field(
|
||||
&self,
|
||||
field: Field,
|
||||
fast_field_serializer: &mut CompositeFastFieldSerializer,
|
||||
doc_id_mapping: &SegmentDocidMapping,
|
||||
doc_id_mapping: &SegmentDocIdMapping,
|
||||
) -> crate::Result<()> {
|
||||
// Multifastfield consists in 2 fastfields.
|
||||
// The first serves as an index into the second one and is stricly increasing.
|
||||
@@ -680,16 +703,16 @@ impl IndexMerger {
|
||||
min_value,
|
||||
};
|
||||
|
||||
struct SortedDocidMultiValueAccessProvider<'a> {
|
||||
doc_id_mapping: &'a SegmentDocidMapping,
|
||||
struct SortedDocIdMultiValueAccessProvider<'a> {
|
||||
doc_id_mapping: &'a SegmentDocIdMapping,
|
||||
fast_field_readers: &'a Vec<MultiValuedFastFieldReader<u64>>,
|
||||
offsets: Vec<u64>,
|
||||
}
|
||||
impl<'a> FastFieldDataAccess for SortedDocidMultiValueAccessProvider<'a> {
|
||||
impl<'a> FastFieldDataAccess for SortedDocIdMultiValueAccessProvider<'a> {
|
||||
fn get_val(&self, pos: u64) -> u64 {
|
||||
// use the offsets index to find the doc_id which will contain the position.
|
||||
// the offsets are stricly increasing so we can do a simple search on it.
|
||||
let new_docid = self
|
||||
let new_doc_id = self
|
||||
.offsets
|
||||
.iter()
|
||||
.position(|&offset| offset > pos)
|
||||
@@ -697,10 +720,10 @@ impl IndexMerger {
|
||||
- 1;
|
||||
|
||||
// now we need to find the position of `pos` in the multivalued bucket
|
||||
let num_pos_covered_until_now = self.offsets[new_docid];
|
||||
let num_pos_covered_until_now = self.offsets[new_doc_id];
|
||||
let pos_in_values = pos - num_pos_covered_until_now;
|
||||
|
||||
let (old_doc_id, reader_ordinal) = self.doc_id_mapping[new_docid as usize];
|
||||
let (old_doc_id, reader_ordinal) = self.doc_id_mapping[new_doc_id as usize];
|
||||
let num_vals = self.fast_field_readers[reader_ordinal as usize].get_len(old_doc_id);
|
||||
assert!(num_vals >= pos_in_values);
|
||||
let mut vals = vec![];
|
||||
@@ -709,7 +732,7 @@ impl IndexMerger {
|
||||
vals[pos_in_values as usize]
|
||||
}
|
||||
}
|
||||
let fastfield_accessor = SortedDocidMultiValueAccessProvider {
|
||||
let fastfield_accessor = SortedDocIdMultiValueAccessProvider {
|
||||
doc_id_mapping,
|
||||
fast_field_readers: &ff_readers,
|
||||
offsets,
|
||||
@@ -748,7 +771,7 @@ impl IndexMerger {
|
||||
&self,
|
||||
field: Field,
|
||||
fast_field_serializer: &mut CompositeFastFieldSerializer,
|
||||
doc_id_mapping: &SegmentDocidMapping,
|
||||
doc_id_mapping: &SegmentDocIdMapping,
|
||||
) -> crate::Result<()> {
|
||||
let reader_and_field_accessors = self
|
||||
.readers
|
||||
@@ -784,7 +807,7 @@ impl IndexMerger {
|
||||
field_type: &FieldType,
|
||||
serializer: &mut InvertedIndexSerializer,
|
||||
fieldnorm_reader: Option<FieldNormReader>,
|
||||
doc_id_mapping: &SegmentDocidMapping,
|
||||
doc_id_mapping: &SegmentDocIdMapping,
|
||||
) -> crate::Result<Option<TermOrdinalMapping>> {
|
||||
debug_time!("write_postings_for_field");
|
||||
let mut positions_buffer: Vec<u32> = Vec::with_capacity(1_000);
|
||||
@@ -823,8 +846,8 @@ impl IndexMerger {
|
||||
segment_local_map
|
||||
})
|
||||
.collect();
|
||||
for (new_doc_id, (old_doc_id, segment_ordinal)) in doc_id_mapping.iter().enumerate() {
|
||||
let segment_map = &mut merged_doc_id_map[*segment_ordinal as usize];
|
||||
for (new_doc_id, (old_doc_id, segment_ord)) in doc_id_mapping.iter().enumerate() {
|
||||
let segment_map = &mut merged_doc_id_map[*segment_ord as usize];
|
||||
segment_map[*old_doc_id as usize] = Some(new_doc_id as DocId);
|
||||
}
|
||||
|
||||
@@ -866,7 +889,7 @@ impl IndexMerger {
|
||||
let mut total_doc_freq = 0;
|
||||
|
||||
// Let's compute the list of non-empty posting lists
|
||||
for (segment_ord, term_info) in merged_terms.current_segment_ordinals_and_term_infos() {
|
||||
for (segment_ord, term_info) in merged_terms.current_segment_ords_and_term_infos() {
|
||||
let segment_reader = &self.readers[segment_ord];
|
||||
let inverted_index: &InvertedIndexReader = &*field_readers[segment_ord];
|
||||
let segment_postings = inverted_index
|
||||
@@ -916,9 +939,9 @@ impl IndexMerger {
|
||||
// there is at least one document.
|
||||
let term_freq = segment_postings.term_freq();
|
||||
segment_postings.positions(&mut positions_buffer);
|
||||
// if doc_id_mapping exists, the docids are reordered, they are
|
||||
// if doc_id_mapping exists, the doc_ids are reordered, they are
|
||||
// not just stacked. The field serializer expects monotonically increasing
|
||||
// docids, so we collect and sort them first, before writing.
|
||||
// doc_ids, so we collect and sort them first, before writing.
|
||||
//
|
||||
// I think this is not strictly necessary, it would be possible to
|
||||
// avoid the loading into a vec via some form of kmerge, but then the merge
|
||||
@@ -958,7 +981,7 @@ impl IndexMerger {
|
||||
&self,
|
||||
serializer: &mut InvertedIndexSerializer,
|
||||
fieldnorm_readers: FieldNormReaders,
|
||||
doc_id_mapping: &SegmentDocidMapping,
|
||||
doc_id_mapping: &SegmentDocIdMapping,
|
||||
) -> crate::Result<HashMap<Field, TermOrdinalMapping>> {
|
||||
let mut term_ordinal_mappings = HashMap::new();
|
||||
for (field, field_entry) in self.schema.fields() {
|
||||
@@ -981,7 +1004,7 @@ impl IndexMerger {
|
||||
fn write_storable_fields(
|
||||
&self,
|
||||
store_writer: &mut StoreWriter,
|
||||
doc_id_mapping: &SegmentDocidMapping,
|
||||
doc_id_mapping: &SegmentDocIdMapping,
|
||||
) -> crate::Result<()> {
|
||||
debug_time!("write_storable_fields");
|
||||
|
||||
@@ -1573,7 +1596,7 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_merge_facets_sort_asc() {
|
||||
// In the merge case this will go through the docid mapping code
|
||||
// In the merge case this will go through the doc_id mapping code
|
||||
test_merge_facets(
|
||||
Some(IndexSettings {
|
||||
sort_by_field: Some(IndexSortByField {
|
||||
@@ -1584,7 +1607,7 @@ mod tests {
|
||||
}),
|
||||
true,
|
||||
);
|
||||
// In the merge case this will not go through the docid mapping code, because the data is
|
||||
// In the merge case this will not go through the doc_id mapping code, because the data is
|
||||
// sorted and disjunct
|
||||
test_merge_facets(
|
||||
Some(IndexSettings {
|
||||
@@ -1600,7 +1623,7 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_merge_facets_sort_desc() {
|
||||
// In the merge case this will go through the docid mapping code
|
||||
// In the merge case this will go through the doc_id mapping code
|
||||
test_merge_facets(
|
||||
Some(IndexSettings {
|
||||
sort_by_field: Some(IndexSortByField {
|
||||
@@ -1611,7 +1634,7 @@ mod tests {
|
||||
}),
|
||||
true,
|
||||
);
|
||||
// In the merge case this will not go through the docid mapping code, because the data is
|
||||
// In the merge case this will not go through the doc_id mapping code, because the data is
|
||||
// sorted and disjunct
|
||||
test_merge_facets(
|
||||
Some(IndexSettings {
|
||||
|
||||
@@ -554,7 +554,7 @@ mod bench_sorted_index_merge {
|
||||
.expect("Failed to find a reader for single fast field. This is a tantivy bug and it should never happen.");
|
||||
(doc_id, reader, u64_reader)
|
||||
});
|
||||
// add values in order of the new docids
|
||||
// add values in order of the new doc_ids
|
||||
let mut val = 0;
|
||||
for (doc_id, _reader, field_reader) in sorted_doc_ids {
|
||||
val = field_reader.get(*doc_id);
|
||||
@@ -567,7 +567,7 @@ mod bench_sorted_index_merge {
|
||||
Ok(())
|
||||
}
|
||||
#[bench]
|
||||
fn create_sorted_index_create_docid_mapping(b: &mut Bencher) -> crate::Result<()> {
|
||||
fn create_sorted_index_create_doc_id_mapping(b: &mut Bencher) -> crate::Result<()> {
|
||||
let sort_by_field = IndexSortByField {
|
||||
field: "intval".to_string(),
|
||||
order: Order::Desc,
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
pub mod delete_queue;
|
||||
|
||||
pub mod demuxer;
|
||||
pub mod doc_id_mapping;
|
||||
mod doc_opstamp_mapping;
|
||||
pub mod index_writer;
|
||||
@@ -26,7 +27,8 @@ pub use self::prepared_commit::PreparedCommit;
|
||||
pub use self::segment_entry::SegmentEntry;
|
||||
pub use self::segment_manager::SegmentManager;
|
||||
pub use self::segment_serializer::SegmentSerializer;
|
||||
pub use self::segment_updater::merge_segments;
|
||||
pub use self::segment_updater::merge_filtered_segments;
|
||||
pub use self::segment_updater::merge_indices;
|
||||
pub use self::segment_writer::SegmentWriter;
|
||||
|
||||
/// Alias for the default merge policy, which is the `LogMergePolicy`.
|
||||
|
||||
@@ -7,6 +7,7 @@ use crate::core::SegmentId;
|
||||
use crate::core::SegmentMeta;
|
||||
use crate::core::META_FILEPATH;
|
||||
use crate::directory::{Directory, DirectoryClone, GarbageCollectionResult};
|
||||
use crate::fastfield::AliveBitSet;
|
||||
use crate::indexer::delete_queue::DeleteCursor;
|
||||
use crate::indexer::index_writer::advance_deletes;
|
||||
use crate::indexer::merge_operation::MergeOperationInventory;
|
||||
@@ -159,7 +160,7 @@ fn merge(
|
||||
/// meant to work if you have an IndexWriter running for the origin indices, or
|
||||
/// the destination Index.
|
||||
#[doc(hidden)]
|
||||
pub fn merge_segments<Dir: Directory>(
|
||||
pub fn merge_indices<Dir: Directory>(
|
||||
indices: &[Index],
|
||||
output_directory: Dir,
|
||||
) -> crate::Result<Index> {
|
||||
@@ -170,19 +171,8 @@ pub fn merge_segments<Dir: Directory>(
|
||||
));
|
||||
}
|
||||
|
||||
let target_schema = indices[0].schema();
|
||||
let target_settings = indices[0].settings().clone();
|
||||
|
||||
// let's check that all of the indices have the same schema
|
||||
if indices
|
||||
.iter()
|
||||
.skip(1)
|
||||
.any(|index| index.schema() != target_schema)
|
||||
{
|
||||
return Err(crate::TantivyError::InvalidArgument(
|
||||
"Attempt to merge different schema indices".to_string(),
|
||||
));
|
||||
}
|
||||
// let's check that all of the indices have the same index settings
|
||||
if indices
|
||||
.iter()
|
||||
@@ -199,13 +189,61 @@ pub fn merge_segments<Dir: Directory>(
|
||||
segments.extend(index.searchable_segments()?);
|
||||
}
|
||||
|
||||
let mut merged_index = Index::create(output_directory, target_schema.clone(), target_settings)?;
|
||||
let non_filter = segments.iter().map(|_| None).collect::<Vec<_>>();
|
||||
merge_filtered_segments(&segments, target_settings, non_filter, output_directory)
|
||||
}
|
||||
|
||||
/// Advanced: Merges a list of segments from different indices in a new index.
|
||||
/// Additional you can provide a delete bitset for each segment to ignore doc_ids.
|
||||
///
|
||||
/// Returns `TantivyError` if the the indices list is empty or their
|
||||
/// schemas don't match.
|
||||
///
|
||||
/// `output_directory`: is assumed to be empty.
|
||||
///
|
||||
/// # Warning
|
||||
/// This function does NOT check or take the `IndexWriter` is running. It is not
|
||||
/// meant to work if you have an IndexWriter running for the origin indices, or
|
||||
/// the destination Index.
|
||||
#[doc(hidden)]
|
||||
pub fn merge_filtered_segments<Dir: Directory>(
|
||||
segments: &[Segment],
|
||||
target_settings: IndexSettings,
|
||||
filter_doc_ids: Vec<Option<AliveBitSet>>,
|
||||
output_directory: Dir,
|
||||
) -> crate::Result<Index> {
|
||||
if segments.is_empty() {
|
||||
// If there are no indices to merge, there is no need to do anything.
|
||||
return Err(crate::TantivyError::InvalidArgument(
|
||||
"No segments given to marge".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
let target_schema = segments[0].schema();
|
||||
|
||||
// let's check that all of the indices have the same schema
|
||||
if segments
|
||||
.iter()
|
||||
.skip(1)
|
||||
.any(|index| index.schema() != target_schema)
|
||||
{
|
||||
return Err(crate::TantivyError::InvalidArgument(
|
||||
"Attempt to merge different schema indices".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
let mut merged_index = Index::create(
|
||||
output_directory,
|
||||
target_schema.clone(),
|
||||
target_settings.clone(),
|
||||
)?;
|
||||
let merged_segment = merged_index.new_segment();
|
||||
let merged_segment_id = merged_segment.id();
|
||||
let merger: IndexMerger = IndexMerger::open(
|
||||
let merger: IndexMerger = IndexMerger::open_with_custom_alive_set(
|
||||
merged_index.schema(),
|
||||
merged_index.settings().clone(),
|
||||
&segments[..],
|
||||
filter_doc_ids,
|
||||
)?;
|
||||
let segment_serializer = SegmentSerializer::for_segment(merged_segment, true)?;
|
||||
let num_docs = merger.write(segment_serializer)?;
|
||||
@@ -225,7 +263,7 @@ pub fn merge_segments<Dir: Directory>(
|
||||
);
|
||||
|
||||
let index_meta = IndexMeta {
|
||||
index_settings: indices[0].load_metas()?.index_settings, // index_settings of all segments should be the same
|
||||
index_settings: target_settings, // index_settings of all segments should be the same
|
||||
segments: vec![segment_meta],
|
||||
schema: target_schema,
|
||||
opstamp: 0u64,
|
||||
@@ -646,11 +684,18 @@ impl SegmentUpdater {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::merge_segments;
|
||||
use super::merge_indices;
|
||||
use crate::collector::TopDocs;
|
||||
use crate::directory::RamDirectory;
|
||||
use crate::fastfield::AliveBitSet;
|
||||
use crate::indexer::merge_policy::tests::MergeWheneverPossible;
|
||||
use crate::indexer::merger::IndexMerger;
|
||||
use crate::indexer::segment_updater::merge_filtered_segments;
|
||||
use crate::query::QueryParser;
|
||||
use crate::schema::*;
|
||||
use crate::DocAddress;
|
||||
use crate::Index;
|
||||
use crate::Segment;
|
||||
|
||||
#[test]
|
||||
fn test_delete_during_merge() -> crate::Result<()> {
|
||||
@@ -693,6 +738,50 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn delete_all_docs_min() -> crate::Result<()> {
|
||||
let mut schema_builder = Schema::builder();
|
||||
let text_field = schema_builder.add_text_field("text", TEXT);
|
||||
let index = Index::create_in_ram(schema_builder.build());
|
||||
|
||||
// writing the segment
|
||||
let mut index_writer = index.writer_for_tests()?;
|
||||
|
||||
for _ in 0..10 {
|
||||
index_writer.add_document(doc!(text_field=>"a"));
|
||||
index_writer.add_document(doc!(text_field=>"b"));
|
||||
}
|
||||
index_writer.commit()?;
|
||||
|
||||
let seg_ids = index.searchable_segment_ids()?;
|
||||
// docs exist, should have at least 1 segment
|
||||
assert!(!seg_ids.is_empty());
|
||||
|
||||
let term = Term::from_field_text(text_field, "a");
|
||||
index_writer.delete_term(term);
|
||||
index_writer.commit()?;
|
||||
|
||||
let term = Term::from_field_text(text_field, "b");
|
||||
index_writer.delete_term(term);
|
||||
index_writer.commit()?;
|
||||
|
||||
index_writer.wait_merging_threads()?;
|
||||
|
||||
let reader = index.reader()?;
|
||||
assert_eq!(reader.searcher().num_docs(), 0);
|
||||
|
||||
let seg_ids = index.searchable_segment_ids()?;
|
||||
assert!(seg_ids.is_empty());
|
||||
|
||||
reader.reload()?;
|
||||
assert_eq!(reader.searcher().num_docs(), 0);
|
||||
// empty segments should be erased
|
||||
assert!(index.searchable_segment_metas()?.is_empty());
|
||||
assert!(reader.searcher().segment_readers().is_empty());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn delete_all_docs() -> crate::Result<()> {
|
||||
let mut schema_builder = Schema::builder();
|
||||
@@ -797,7 +886,7 @@ mod tests {
|
||||
|
||||
assert_eq!(indices.len(), 3);
|
||||
let output_directory = RamDirectory::default();
|
||||
let index = merge_segments(&indices, output_directory)?;
|
||||
let index = merge_indices(&indices, output_directory)?;
|
||||
assert_eq!(index.schema(), schema);
|
||||
|
||||
let segments = index.searchable_segments()?;
|
||||
@@ -811,7 +900,7 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_merge_empty_indices_array() {
|
||||
let merge_result = merge_segments(&[], RamDirectory::default());
|
||||
let merge_result = merge_indices(&[], RamDirectory::default());
|
||||
assert!(merge_result.is_err());
|
||||
}
|
||||
|
||||
@@ -838,9 +927,191 @@ mod tests {
|
||||
};
|
||||
|
||||
// mismatched schema index list
|
||||
let result = merge_segments(&[first_index, second_index], RamDirectory::default());
|
||||
let result = merge_indices(&[first_index, second_index], RamDirectory::default());
|
||||
assert!(result.is_err());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_merge_filtered_segments() -> crate::Result<()> {
|
||||
let first_index = {
|
||||
let mut schema_builder = Schema::builder();
|
||||
let text_field = schema_builder.add_text_field("text", TEXT);
|
||||
let index = Index::create_in_ram(schema_builder.build());
|
||||
let mut index_writer = index.writer_for_tests()?;
|
||||
index_writer.add_document(doc!(text_field=>"some text 1"));
|
||||
index_writer.add_document(doc!(text_field=>"some text 2"));
|
||||
index_writer.commit()?;
|
||||
index
|
||||
};
|
||||
|
||||
let second_index = {
|
||||
let mut schema_builder = Schema::builder();
|
||||
let text_field = schema_builder.add_text_field("text", TEXT);
|
||||
let index = Index::create_in_ram(schema_builder.build());
|
||||
let mut index_writer = index.writer_for_tests()?;
|
||||
index_writer.add_document(doc!(text_field=>"some text 3"));
|
||||
index_writer.add_document(doc!(text_field=>"some text 4"));
|
||||
index_writer.delete_term(Term::from_field_text(text_field, "4"));
|
||||
|
||||
index_writer.commit()?;
|
||||
index
|
||||
};
|
||||
|
||||
let mut segments: Vec<Segment> = Vec::new();
|
||||
segments.extend(first_index.searchable_segments()?);
|
||||
segments.extend(second_index.searchable_segments()?);
|
||||
|
||||
let target_settings = first_index.settings().clone();
|
||||
|
||||
let filter_segment_1 = AliveBitSet::for_test_from_deleted_docs(&[1], 2);
|
||||
let filter_segment_2 = AliveBitSet::for_test_from_deleted_docs(&[0], 2);
|
||||
|
||||
let filter_segments = vec![Some(filter_segment_1), Some(filter_segment_2)];
|
||||
|
||||
let merged_index = merge_filtered_segments(
|
||||
&segments,
|
||||
target_settings,
|
||||
filter_segments,
|
||||
RamDirectory::default(),
|
||||
)?;
|
||||
|
||||
let segments = merged_index.searchable_segments()?;
|
||||
assert_eq!(segments.len(), 1);
|
||||
|
||||
let segment_metas = segments[0].meta();
|
||||
assert_eq!(segment_metas.num_deleted_docs(), 0);
|
||||
assert_eq!(segment_metas.num_docs(), 1);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_merge_single_filtered_segments() -> crate::Result<()> {
|
||||
let first_index = {
|
||||
let mut schema_builder = Schema::builder();
|
||||
let text_field = schema_builder.add_text_field("text", TEXT);
|
||||
let index = Index::create_in_ram(schema_builder.build());
|
||||
let mut index_writer = index.writer_for_tests()?;
|
||||
index_writer.add_document(doc!(text_field=>"test text"));
|
||||
index_writer.add_document(doc!(text_field=>"some text 2"));
|
||||
|
||||
index_writer.add_document(doc!(text_field=>"some text 3"));
|
||||
index_writer.add_document(doc!(text_field=>"some text 4"));
|
||||
|
||||
index_writer.delete_term(Term::from_field_text(text_field, "4"));
|
||||
|
||||
index_writer.commit()?;
|
||||
index
|
||||
};
|
||||
|
||||
let mut segments: Vec<Segment> = Vec::new();
|
||||
segments.extend(first_index.searchable_segments()?);
|
||||
|
||||
let target_settings = first_index.settings().clone();
|
||||
|
||||
let filter_segment = AliveBitSet::for_test_from_deleted_docs(&[0], 4);
|
||||
|
||||
let filter_segments = vec![Some(filter_segment)];
|
||||
|
||||
let index = merge_filtered_segments(
|
||||
&segments,
|
||||
target_settings,
|
||||
filter_segments,
|
||||
RamDirectory::default(),
|
||||
)?;
|
||||
|
||||
let segments = index.searchable_segments()?;
|
||||
assert_eq!(segments.len(), 1);
|
||||
|
||||
let segment_metas = segments[0].meta();
|
||||
assert_eq!(segment_metas.num_deleted_docs(), 0);
|
||||
assert_eq!(segment_metas.num_docs(), 2);
|
||||
|
||||
let searcher = index.reader().unwrap().searcher();
|
||||
{
|
||||
let text_field = index.schema().get_field("text").unwrap();
|
||||
|
||||
let do_search = |term: &str| {
|
||||
let query = QueryParser::for_index(&index, vec![text_field])
|
||||
.parse_query(term)
|
||||
.unwrap();
|
||||
let top_docs: Vec<(f32, DocAddress)> =
|
||||
searcher.search(&query, &TopDocs::with_limit(3)).unwrap();
|
||||
|
||||
top_docs.iter().map(|el| el.1.doc_id).collect::<Vec<_>>()
|
||||
};
|
||||
|
||||
assert_eq!(do_search("test"), vec![] as Vec<u32>);
|
||||
assert_eq!(do_search("text"), vec![0, 1]);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_apply_doc_id_filter_in_merger() -> crate::Result<()> {
|
||||
let first_index = {
|
||||
let mut schema_builder = Schema::builder();
|
||||
let text_field = schema_builder.add_text_field("text", TEXT);
|
||||
let index = Index::create_in_ram(schema_builder.build());
|
||||
let mut index_writer = index.writer_for_tests()?;
|
||||
index_writer.add_document(doc!(text_field=>"some text 1"));
|
||||
index_writer.add_document(doc!(text_field=>"some text 2"));
|
||||
|
||||
index_writer.add_document(doc!(text_field=>"some text 3"));
|
||||
index_writer.add_document(doc!(text_field=>"some text 4"));
|
||||
|
||||
index_writer.delete_term(Term::from_field_text(text_field, "4"));
|
||||
|
||||
index_writer.commit()?;
|
||||
index
|
||||
};
|
||||
|
||||
let mut segments: Vec<Segment> = Vec::new();
|
||||
segments.extend(first_index.searchable_segments()?);
|
||||
|
||||
let target_settings = first_index.settings().clone();
|
||||
{
|
||||
let filter_segment = AliveBitSet::for_test_from_deleted_docs(&[1], 4);
|
||||
let filter_segments = vec![Some(filter_segment)];
|
||||
let target_schema = segments[0].schema();
|
||||
let merged_index = Index::create(
|
||||
RamDirectory::default(),
|
||||
target_schema.clone(),
|
||||
target_settings.clone(),
|
||||
)?;
|
||||
let merger: IndexMerger = IndexMerger::open_with_custom_alive_set(
|
||||
merged_index.schema(),
|
||||
merged_index.settings().clone(),
|
||||
&segments[..],
|
||||
filter_segments,
|
||||
)?;
|
||||
|
||||
let doc_ids_alive: Vec<_> = merger.readers[0].doc_ids_alive().collect();
|
||||
assert_eq!(doc_ids_alive, vec![0, 2]);
|
||||
}
|
||||
|
||||
{
|
||||
let filter_segments = vec![None];
|
||||
let target_schema = segments[0].schema();
|
||||
let merged_index = Index::create(
|
||||
RamDirectory::default(),
|
||||
target_schema.clone(),
|
||||
target_settings.clone(),
|
||||
)?;
|
||||
let merger: IndexMerger = IndexMerger::open_with_custom_alive_set(
|
||||
merged_index.schema(),
|
||||
merged_index.settings().clone(),
|
||||
&segments[..],
|
||||
filter_segments,
|
||||
)?;
|
||||
|
||||
let doc_ids_alive: Vec<_> = merger.readers[0].doc_ids_alive().collect();
|
||||
assert_eq!(doc_ids_alive, vec![0, 1, 2]);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -169,7 +169,9 @@ pub use crate::core::{
|
||||
};
|
||||
pub use crate::core::{InvertedIndexReader, SegmentReader};
|
||||
pub use crate::directory::Directory;
|
||||
pub use crate::indexer::merge_segments;
|
||||
pub use crate::indexer::demuxer::*;
|
||||
pub use crate::indexer::merge_filtered_segments;
|
||||
pub use crate::indexer::merge_indices;
|
||||
pub use crate::indexer::operation::UserOperation;
|
||||
pub use crate::indexer::IndexWriter;
|
||||
pub use crate::postings::Postings;
|
||||
|
||||
@@ -112,9 +112,9 @@ pub mod tests {
|
||||
#[test]
|
||||
fn test_doc_store_iter_with_delete_bug_1077() -> crate::Result<()> {
|
||||
// this will cover deletion of the first element in a checkpoint
|
||||
let deleted_docids = (200..300).collect::<Vec<_>>();
|
||||
let deleted_doc_ids = (200..300).collect::<Vec<_>>();
|
||||
let alive_bitset =
|
||||
AliveBitSet::for_test_from_deleted_docs(&deleted_docids, NUM_DOCS as u32);
|
||||
AliveBitSet::for_test_from_deleted_docs(&deleted_doc_ids, NUM_DOCS as u32);
|
||||
|
||||
let path = Path::new("store");
|
||||
let directory = RamDirectory::create();
|
||||
|
||||
@@ -151,7 +151,7 @@ impl StoreReader {
|
||||
&'b self,
|
||||
alive_bitset: Option<&'a AliveBitSet>,
|
||||
) -> impl Iterator<Item = crate::Result<OwnedBytes>> + 'b {
|
||||
let last_docid = self
|
||||
let last_doc_id = self
|
||||
.block_checkpoints()
|
||||
.last()
|
||||
.map(|checkpoint| checkpoint.doc_range.end)
|
||||
@@ -164,7 +164,7 @@ impl StoreReader {
|
||||
let mut block_start_pos = 0;
|
||||
let mut num_skipped = 0;
|
||||
let mut reset_block_pos = false;
|
||||
(0..last_docid)
|
||||
(0..last_doc_id)
|
||||
.filter_map(move |doc_id| {
|
||||
// filter_map is only used to resolve lifetime issues between the two closures on
|
||||
// the outer variables
|
||||
|
||||
@@ -77,7 +77,7 @@ impl<'a> TermMerger<'a> {
|
||||
/// This method may be called
|
||||
/// iff advance() has been called before
|
||||
/// and "true" was returned.
|
||||
pub fn current_segment_ordinals_and_term_infos<'b: 'a>(
|
||||
pub fn current_segment_ords_and_term_infos<'b: 'a>(
|
||||
&'b self,
|
||||
) -> impl 'b + Iterator<Item = (usize, TermInfo)> {
|
||||
self.current_segment_and_term_ordinals
|
||||
|
||||
Reference in New Issue
Block a user