Compare commits

..

1 Commits

Author SHA1 Message Date
Luca Cominardi
57a270393d feat: allow provided doc id mappings for segment serialization
Expose merge-time doc id mapping control and add a single-segment finalize path that accepts an explicit new-doc-id to old-doc-id permutation. This lets callers choose document order without adding a persistent sort field.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-25 18:31:58 +02:00
12 changed files with 373 additions and 312 deletions

View File

@@ -281,16 +281,12 @@ impl BitSet {
}
/// Inserts an element in the `BitSet`
///
/// Returns true if the set changed.
#[inline]
pub fn insert(&mut self, el: u32) -> bool {
pub fn insert(&mut self, el: u32) {
// we do not check saturated els.
let higher = el / 64u32;
let lower = el % 64u32;
let changed = self.tinysets[higher as usize].insert_mut(lower);
self.len += u64::from(changed);
changed
self.len += u64::from(self.tinysets[higher as usize].insert_mut(lower));
}
/// Inserts an element in the `BitSet`

View File

@@ -931,9 +931,7 @@ fn build_allowed_term_ids_for_str(
// add matches
allowed = Some(BitSet::with_max_value(allowed_capacity));
let allowed = allowed.as_mut().unwrap();
for_each_matching_term_ord(str_col, include, |ord| {
let _ = allowed.insert(ord);
})?;
for_each_matching_term_ord(str_col, include, |ord| allowed.insert(ord))?;
};
if let Some(exclude) = exclude {

View File

@@ -1,14 +1,14 @@
use crate::collector::Count;
use crate::directory::{RamDirectory, WatchCallback};
use crate::index::SegmentId;
use crate::indexer::{DocIdMapping, LogMergePolicy, NoMergePolicy};
use crate::indexer::{LogMergePolicy, NoMergePolicy};
use crate::postings::Postings;
use crate::query::TermQuery;
use crate::schema::{Field, IndexRecordOption, Schema, Value, INDEXED, STORED, STRING, TEXT};
use crate::schema::{Field, IndexRecordOption, Schema, INDEXED, STRING, TEXT};
use crate::tokenizer::TokenizerManager;
use crate::{
Directory, DocAddress, DocSet, Index, IndexBuilder, IndexReader, IndexSettings, IndexWriter,
ReloadPolicy, TantivyDocument, Term,
Directory, DocSet, Index, IndexBuilder, IndexReader, IndexSettings, IndexWriter, ReloadPolicy,
TantivyDocument, Term,
};
#[test]
@@ -300,49 +300,6 @@ fn test_single_segment_index_writer() -> crate::Result<()> {
Ok(())
}
#[test]
fn test_single_segment_index_writer_with_doc_id_mapping() -> crate::Result<()> {
let mut schema_builder = Schema::builder();
let text_field = schema_builder.add_text_field("text", TEXT | STORED);
let schema = schema_builder.build();
let directory = RamDirectory::default();
let mut settings = IndexSettings::default();
settings.manual_doc_id_mapping = true;
let mut single_segment_index_writer = Index::builder()
.schema(schema)
.settings(settings)
.single_segment_index_writer(directory, 15_000_000)?;
single_segment_index_writer.add_document(doc!(text_field=>"alpha beta"))?;
single_segment_index_writer.add_document(doc!())?;
single_segment_index_writer.add_document(doc!(text_field=>"gamma"))?;
let mapping = DocIdMapping::from_new_id_to_old_id(vec![2, 1, 0]);
let index = single_segment_index_writer.finalize_with_doc_id_mapping(&mapping)?;
let searcher = index.reader()?.searcher();
let segment_reader = searcher.segment_reader(0);
let fieldnorm_reader = segment_reader.get_fieldnorms_reader(text_field)?;
assert_eq!(fieldnorm_reader.fieldnorm(0), 1);
assert_eq!(fieldnorm_reader.fieldnorm(1), 0);
assert_eq!(fieldnorm_reader.fieldnorm(2), 2);
let doc_0 = searcher.doc::<TantivyDocument>(DocAddress::new(0, 0))?;
assert_eq!(
doc_0.get_first(text_field).and_then(|val| val.as_str()),
Some("gamma")
);
let doc_1 = searcher.doc::<TantivyDocument>(DocAddress::new(0, 1))?;
assert!(doc_1.get_first(text_field).is_none());
let doc_2 = searcher.doc::<TantivyDocument>(DocAddress::new(0, 2))?;
assert_eq!(
doc_2.get_first(text_field).and_then(|val| val.as_str()),
Some("alpha beta")
);
Ok(())
}
#[test]
fn test_merging_segment_update_docfreq() {
let mut schema_builder = Schema::builder();

View File

@@ -250,10 +250,6 @@ pub struct IndexSettings {
/// provided in `IndexSortByField`
#[serde(skip_serializing_if = "Option::is_none")]
pub sort_by_field: Option<IndexSortByField>,
/// If true, enables caller-provided doc id mappings at segment finalization time.
#[doc(hidden)]
#[serde(skip)]
pub manual_doc_id_mapping: bool,
/// The `Compressor` used to compress the doc store.
#[serde(default)]
pub docstore_compression: Compressor,
@@ -277,7 +273,6 @@ impl Default for IndexSettings {
fn default() -> Self {
Self {
sort_by_field: None,
manual_doc_id_mapping: false,
docstore_compression: Compressor::default(),
docstore_blocksize: default_docstore_blocksize(),
docstore_compress_dedicated_thread: true,
@@ -465,7 +460,6 @@ mod tests {
field: "text".to_string(),
order: Order::Asc,
}),
manual_doc_id_mapping: false,
docstore_compression: crate::store::Compressor::Zstd(ZstdCompressor {
compression_level: Some(4),
}),
@@ -535,7 +529,6 @@ mod tests {
index_settings,
IndexSettings {
sort_by_field: None,
manual_doc_id_mapping: false,
docstore_compression: Compressor::default(),
docstore_compress_dedicated_thread: true,
docstore_blocksize: 16_384
@@ -554,18 +547,6 @@ mod tests {
serde_json::from_value(index_settings_json).unwrap();
assert_eq!(index_settings_deser, index_settings);
}
{
index_settings.manual_doc_id_mapping = true;
let index_settings_json = serde_json::to_value(&index_settings).unwrap();
assert_eq!(
index_settings_json,
serde_json::json!({
"docstore_compression": "lz4",
"docstore_blocksize": 16384
})
);
index_settings.manual_doc_id_mapping = false;
}
{
index_settings.docstore_compress_dedicated_thread = false;
let index_settings_json = serde_json::to_value(&index_settings).unwrap();

View File

@@ -7,16 +7,20 @@ use super::SegmentWriter;
use crate::schema::{Field, Schema};
use crate::{DocAddress, DocId, IndexSortByField, TantivyError};
/// Describes how the document ID mapping was produced during a merge.
#[derive(Copy, Clone, Eq, PartialEq)]
pub enum MappingType {
/// Segments are concatenated in order with no deletes; doc IDs are contiguous ranges.
Stacked,
/// Segments are concatenated in order but some documents have been deleted and are skipped.
StackedWithDeletes,
/// Documents have been reordered, for instance by a sort field or by caller-provided order.
Shuffled,
}
/// Struct to provide mapping from new doc_id to old doc_id and segment.
#[derive(Clone)]
pub(crate) struct SegmentDocIdMapping {
pub struct SegmentDocIdMapping {
pub(crate) new_doc_id_to_old_doc_addr: Vec<DocAddress>,
pub(crate) alive_bitsets: Vec<Option<ReadOnlyBitSet>>,
mapping_type: MappingType,
@@ -35,6 +39,24 @@ impl SegmentDocIdMapping {
}
}
/// Build a `Shuffled` mapping from an explicit permutation of [`DocAddress`]es.
///
/// `new_doc_id_to_old_doc_addr[new_id]` gives the source segment and doc id for
/// the document that should appear at position `new_id` in the merged segment.
/// `alive_bitsets` must contain one entry per source segment, in the same order
/// as the segments passed to [`IndexMerger::open_with_custom_alive_set`].
pub fn new_shuffled(
new_doc_id_to_old_doc_addr: Vec<DocAddress>,
alive_bitsets: Vec<Option<ReadOnlyBitSet>>,
) -> Self {
Self {
new_doc_id_to_old_doc_addr,
alive_bitsets,
mapping_type: MappingType::Shuffled,
}
}
/// Returns the [`MappingType`] that describes how this mapping was constructed.
pub fn mapping_type(&self) -> MappingType {
self.mapping_type
}
@@ -71,10 +93,7 @@ pub struct DocIdMapping {
}
impl DocIdMapping {
/// Creates a `DocIdMapping` from a mapping of new doc ids to old doc ids.
///
/// The caller MUST ensure that `new_doc_id_to_old` is a permutation of the
/// segment's old doc ids, with every old doc id appearing exactly once.
/// Constructs a [`DocIdMapping`] from a vector mapping each new doc ID to its old doc ID.
pub fn from_new_id_to_old_id(new_doc_id_to_old: Vec<DocId>) -> Self {
let max_doc = new_doc_id_to_old.len();
let old_max_doc = new_doc_id_to_old
@@ -94,39 +113,37 @@ impl DocIdMapping {
}
/// returns the new doc_id for the old doc_id
pub(crate) fn get_new_doc_id(&self, doc_id: DocId) -> DocId {
pub fn get_new_doc_id(&self, doc_id: DocId) -> DocId {
self.old_doc_id_to_new[doc_id as usize]
}
/// returns the old doc_id for the new doc_id
pub fn get_old_doc_id(&self, doc_id: DocId) -> DocId {
self.new_doc_id_to_old[doc_id as usize]
}
/// iterate over old doc_ids in order of the new doc_ids
pub(crate) fn iter_old_doc_ids(&self) -> impl Iterator<Item = DocId> + Clone + '_ {
pub fn iter_old_doc_ids(&self) -> impl Iterator<Item = DocId> + Clone + '_ {
self.new_doc_id_to_old.iter().cloned()
}
/// returns the new doc_ids in order of the old doc_ids
pub(crate) fn old_to_new_ids(&self) -> &[DocId] {
/// Returns a slice mapping each old doc ID to its corresponding new doc ID.
pub fn old_to_new_ids(&self) -> &[DocId] {
&self.old_doc_id_to_new[..]
}
/// Remaps a given array to the new doc ids.
pub(crate) fn remap<T: Copy>(&self, els: &[T]) -> Vec<T> {
pub fn remap<T: Copy>(&self, els: &[T]) -> Vec<T> {
self.new_doc_id_to_old
.iter()
.map(|old_doc| els[*old_doc as usize])
.collect()
}
/// returns the number of new doc_ids
pub(crate) fn len(&self) -> usize {
/// Returns the number of new doc IDs in this mapping.
pub fn num_new_doc_ids(&self) -> usize {
self.new_doc_id_to_old.len()
}
}
#[cfg(test)]
impl DocIdMapping {
/// returns the old doc_id for the new doc_id
fn get_old_doc_id(&self, doc_id: DocId) -> DocId {
self.new_doc_id_to_old[doc_id as usize]
/// Returns the number of old doc IDs covered by this mapping.
pub fn num_old_doc_ids(&self) -> usize {
self.old_doc_id_to_new.len()
}
}

View File

@@ -113,6 +113,7 @@ fn estimate_total_num_tokens(readers: &[SegmentReader], field: Field) -> crate::
Ok(total_num_tokens)
}
/// Merges multiple index segments into one segment.
pub struct IndexMerger {
index_settings: IndexSettings,
schema: Schema,
@@ -218,6 +219,7 @@ impl IndexMerger {
.any(|doc_id| col.first(doc_id).is_none())
}
/// Opens an [`IndexMerger`] over the given segments using their existing delete sets.
pub fn open(
schema: Schema,
index_settings: IndexSettings,
@@ -239,6 +241,9 @@ impl IndexMerger {
// This can be used to merge but also apply an additional filter.
// One use case is demux, which is basically taking a list of
// segments and partitions them e.g. by a value in a field.
/// Opens an [`IndexMerger`] with a custom alive set per segment.
///
/// Each entry in `alive_bitset_opt` corresponds to the segment at the same ordinal.
pub fn open_with_custom_alive_set(
schema: Schema,
index_settings: IndexSettings,
@@ -947,7 +952,7 @@ impl IndexMerger {
///
/// # Returns
/// The number of documents in the resulting segment.
pub fn write(&self, mut serializer: SegmentSerializer) -> crate::Result<u32> {
pub fn write(&self, serializer: SegmentSerializer) -> crate::Result<u32> {
let doc_id_mapping = if let Some(sort_by_field) = self.index_settings.sort_by_field.as_ref()
{
if self.is_disjunct_and_sorted_on_sort_property(sort_by_field)? {
@@ -958,6 +963,27 @@ impl IndexMerger {
} else {
self.get_doc_id_from_concatenated_data()?
};
self.write_with_mapping(serializer, doc_id_mapping)
}
/// Like [`IndexMerger::write`], but uses the caller-supplied `doc_id_mapping` instead of
/// deriving one from an index sort field.
///
/// The mapping must cover all live documents across every segment passed to
/// [`IndexMerger::open_with_custom_alive_set`].
pub fn write_with_doc_id_mapping(
&self,
serializer: SegmentSerializer,
doc_id_mapping: SegmentDocIdMapping,
) -> crate::Result<u32> {
self.write_with_mapping(serializer, doc_id_mapping)
}
fn write_with_mapping(
&self,
mut serializer: SegmentSerializer,
doc_id_mapping: SegmentDocIdMapping,
) -> crate::Result<u32> {
debug!("write-fieldnorms");
if let Some(fieldnorms_serializer) = serializer.extract_fieldnorms_serializer() {
self.write_fieldnorms(fieldnorms_serializer, &doc_id_mapping)?;

View File

@@ -8,7 +8,7 @@
pub(crate) mod delete_queue;
pub(crate) mod path_to_unordered_id;
pub(crate) mod doc_id_mapping;
pub mod doc_id_mapping;
mod doc_opstamp_mapping;
mod flat_map_with_buffer;
pub(crate) mod index_writer;
@@ -17,7 +17,8 @@ pub(crate) mod indexing_term;
mod log_merge_policy;
mod merge_operation;
pub(crate) mod merge_policy;
pub(crate) mod merger;
/// Segment merger APIs for combining multiple existing segments.
pub mod merger;
mod merger_sorted_index_test;
pub(crate) mod operation;
pub(crate) mod prepared_commit;
@@ -33,16 +34,19 @@ mod stamper;
use crossbeam_channel as channel;
use smallvec::SmallVec;
pub use self::doc_id_mapping::DocIdMapping;
pub use self::doc_id_mapping::SegmentDocIdMapping;
pub use self::index_writer::{advance_deletes, IndexWriter, IndexWriterOptions};
pub use self::log_merge_policy::LogMergePolicy;
pub use self::merge_operation::MergeOperation;
pub use self::merge_policy::{MergeCandidate, MergePolicy, NoMergePolicy};
pub use self::merger::IndexMerger;
pub use self::operation::{AddOperation, DeleteOperation, UserOperation};
pub use self::prepared_commit::PreparedCommit;
pub use self::segment_entry::SegmentEntry;
pub(crate) use self::segment_serializer::SegmentSerializer;
pub use self::segment_updater::{merge_filtered_segments, merge_indices};
pub use self::segment_updater::{
merge_filtered_segments, merge_indices, merge_segments_with_doc_id_mapping,
};
pub use self::segment_writer::SegmentWriter;
pub use self::single_segment_index_writer::SingleSegmentIndexWriter;

View File

@@ -4,13 +4,14 @@ use crate::directory::WritePtr;
use crate::fieldnorm::FieldNormsSerializer;
use crate::index::{Segment, SegmentComponent};
use crate::postings::InvertedIndexSerializer;
use crate::store::{Compressor, StoreWriter};
use crate::store::StoreWriter;
/// Segment serializer is in charge of laying out on disk
/// the data accumulated and sorted by the `SegmentWriter`.
pub struct SegmentSerializer {
segment: Segment,
pub(crate) store_writer: StoreWriter,
store_is_temp: bool,
fast_field_write: WritePtr,
fieldnorms_serializer: Option<FieldNormsSerializer>,
postings_serializer: InvertedIndexSerializer,
@@ -18,25 +19,29 @@ pub struct SegmentSerializer {
impl SegmentSerializer {
/// Creates a new `SegmentSerializer`.
pub fn for_segment(
mut segment: Segment,
is_in_merge: bool,
) -> crate::Result<SegmentSerializer> {
pub fn for_segment(segment: Segment, is_in_merge: bool) -> crate::Result<SegmentSerializer> {
// If the segment is going to be sorted, we stream the docs first to a temporary file.
// In the merge case this is not necessary because we can kmerge the already sorted
// segments
let remapping_required = segment.index().settings().sort_by_field.is_some() && !is_in_merge;
Self::for_segment_with_remapping_required(segment, remapping_required)
}
/// Creates a new `SegmentSerializer` with an explicit remapping requirement.
pub fn for_segment_with_remapping_required(
mut segment: Segment,
remapping_required: bool,
) -> crate::Result<SegmentSerializer> {
let settings = segment.index().settings().clone();
let remapping_required =
(settings.sort_by_field.is_some() || settings.manual_doc_id_mapping) && !is_in_merge;
let store_writer = if remapping_required {
let store_write = segment.open_write(SegmentComponent::TempStore)?;
StoreWriter::new(
store_write,
Compressor::None,
crate::store::Compressor::None,
// We want fast random access on the docs, so we choose a small block size.
// If this is zero, the skip index will contain too many checkpoints and
// therefore will be relatively slow.
16_000,
16000,
settings.docstore_compress_dedicated_thread,
)?
} else {
@@ -58,6 +63,7 @@ impl SegmentSerializer {
Ok(SegmentSerializer {
segment,
store_writer,
store_is_temp: remapping_required,
fast_field_write,
fieldnorms_serializer: Some(fieldnorms_serializer),
postings_serializer,
@@ -77,6 +83,10 @@ impl SegmentSerializer {
&mut self.segment
}
pub fn store_is_temp(&self) -> bool {
self.store_is_temp
}
/// Accessor to the `PostingsSerializer`.
pub fn get_postings_serializer(&mut self) -> &mut InvertedIndexSerializer {
&mut self.postings_serializer

View File

@@ -15,6 +15,7 @@ use crate::directory::{Directory, DirectoryClone, GarbageCollectionResult};
use crate::fastfield::AliveBitSet;
use crate::index::{Index, IndexMeta, IndexSettings, Segment, SegmentId, SegmentMeta};
use crate::indexer::delete_queue::DeleteCursor;
use crate::indexer::doc_id_mapping::SegmentDocIdMapping;
use crate::indexer::index_writer::advance_deletes;
use crate::indexer::merge_operation::MergeOperationInventory;
use crate::indexer::merger::IndexMerger;
@@ -255,6 +256,81 @@ pub fn merge_filtered_segments<T: Into<Box<dyn Directory>>>(
Ok(merged_index)
}
/// Like [`merge_filtered_segments`], but uses a caller-supplied [`SegmentDocIdMapping`]
/// to control the final document order.
///
/// The mapping should be built from the same segments, in the same order, passed here.
///
/// # Warning
/// Same caveats as [`merge_filtered_segments`]: no live `IndexWriter` is allowed.
#[doc(hidden)]
pub fn merge_segments_with_doc_id_mapping<T: Into<Box<dyn Directory>>>(
segments: &[Segment],
target_settings: IndexSettings,
filter_doc_ids: Vec<Option<AliveBitSet>>,
doc_id_mapping: SegmentDocIdMapping,
output_directory: T,
) -> crate::Result<Index> {
if segments.is_empty() {
return Err(crate::TantivyError::InvalidArgument(
"No segments given to merge".to_string(),
));
}
let target_schema = segments[0].schema();
if segments
.iter()
.skip(1)
.any(|segment| segment.schema() != target_schema)
{
return Err(crate::TantivyError::InvalidArgument(
"Attempt to merge different schema indices".to_string(),
));
}
let mut merged_index = Index::create(
output_directory,
target_schema.clone(),
target_settings.clone(),
)?;
let merged_segment = merged_index.new_segment();
let merged_segment_id = merged_segment.id();
let merger = IndexMerger::open_with_custom_alive_set(
merged_index.schema(),
merged_index.settings().clone(),
segments,
filter_doc_ids,
)?;
let segment_serializer = SegmentSerializer::for_segment(merged_segment, true)?;
let num_docs = merger.write_with_doc_id_mapping(segment_serializer, doc_id_mapping)?;
let segment_meta = merged_index.new_segment_meta(merged_segment_id, num_docs);
let stats = format!(
"Segments Merge (external reordering): [{}]",
segments
.iter()
.fold(String::new(), |sum, current| format!(
"{sum}{} ",
current.meta().id().uuid_string()
))
.trim_end()
);
let index_meta = IndexMeta {
index_settings: target_settings,
segments: vec![segment_meta],
schema: target_schema,
opstamp: 0u64,
payload: Some(stats),
};
save_metas(&index_meta, merged_index.directory_mut())?;
Ok(merged_index)
}
pub(crate) struct InnerSegmentUpdater {
// we keep a copy of the current active IndexMeta to
// avoid loading the file every time we need it in the

View File

@@ -1,5 +1,5 @@
use columnar::MonotonicallyMappableToU64;
use common::{BitSet, JsonPathWriter};
use common::JsonPathWriter;
use itertools::Itertools;
use tokenizer_api::BoxTokenStream;
@@ -18,7 +18,9 @@ use crate::postings::{
use crate::schema::document::{Document, Value};
use crate::schema::{FieldEntry, FieldType, Schema, DATE_TIME_PRECISION_INDEXED};
use crate::store::{StoreReader, StoreWriter};
use crate::tokenizer::{FacetTokenizer, PreTokenizedStream, TextAnalyzer, Tokenizer};
use crate::tokenizer::{
FacetTokenizer, PreTokenizedStream, TextAnalyzer, Tokenizer, TokenizerManager,
};
use crate::{DocId, Opstamp, TantivyError};
/// Computes the initial size of the hash table.
@@ -90,8 +92,42 @@ impl SegmentWriter {
let schema = segment.schema();
let tokenizer_manager = segment.index().tokenizers().clone();
let tokenizer_manager_fast_field = segment.index().fast_field_tokenizer().clone();
let table_size = compute_initial_table_size(memory_budget_in_bytes)?;
let segment_serializer = SegmentSerializer::for_segment(segment, false)?;
Self::for_segment_serializer(
memory_budget_in_bytes,
schema,
tokenizer_manager,
tokenizer_manager_fast_field,
segment_serializer,
)
}
pub(crate) fn for_segment_with_provided_doc_id_mapping(
memory_budget_in_bytes: usize,
segment: Segment,
) -> crate::Result<Self> {
let schema = segment.schema();
let tokenizer_manager = segment.index().tokenizers().clone();
let tokenizer_manager_fast_field = segment.index().fast_field_tokenizer().clone();
let segment_serializer =
SegmentSerializer::for_segment_with_remapping_required(segment, true)?;
Self::for_segment_serializer(
memory_budget_in_bytes,
schema,
tokenizer_manager,
tokenizer_manager_fast_field,
segment_serializer,
)
}
fn for_segment_serializer(
memory_budget_in_bytes: usize,
schema: Schema,
tokenizer_manager: TokenizerManager,
tokenizer_manager_fast_field: TokenizerManager,
segment_serializer: SegmentSerializer,
) -> crate::Result<Self> {
let table_size = compute_initial_table_size(memory_budget_in_bytes)?;
let per_field_postings_writers = PerFieldPostingsWriter::for_schema(&schema);
let per_field_text_analyzers = schema
.fields()
@@ -136,8 +172,10 @@ impl SegmentWriter {
/// Lay on disk the current content of the `SegmentWriter`
///
/// Finalize consumes the `SegmentWriter`, so that it cannot be used afterwards.
pub fn finalize(self) -> crate::Result<Vec<u64>> {
/// Finalize consumes the `SegmentWriter`, so that it cannot
/// be used afterwards.
pub fn finalize(mut self) -> crate::Result<Vec<u64>> {
self.fieldnorms_writer.fill_up_to_max_doc(self.max_doc);
let mapping: Option<DocIdMapping> = self
.segment_serializer
.segment()
@@ -147,54 +185,31 @@ impl SegmentWriter {
.clone()
.map(|sort_by_field| get_doc_id_mapping_from_field(sort_by_field, &self))
.transpose()?;
self.finalize_inner(mapping.as_ref())
self.finalize_with_mapping(mapping.as_ref())
}
/// Lay on disk the current content of the `SegmentWriter` using the given doc id mapping.
/// Lay on disk the current content of the `SegmentWriter`, using a caller-provided document
/// order.
///
/// The mapping must cover all documents in this segment and maps the segment's original doc ids
/// to the doc ids that should be written on disk.
///
/// Finalize consumes the `SegmentWriter`, so that it cannot be used afterwards.
pub fn finalize_with_doc_id_mapping(self, mapping: &DocIdMapping) -> crate::Result<Vec<u64>> {
// Ensure the segment writer was created in remap mode so the docstore can be reordered.
if !self
.segment_serializer
.segment()
.index()
.settings()
.manual_doc_id_mapping
{
return Err(TantivyError::InvalidArgument(
"IndexSettings::manual_doc_id_mapping must be set to true".to_string(),
));
}
// Check that the mapping eventually covers all documents in the segment.
if mapping.len() != self.max_doc as usize {
return Err(TantivyError::InvalidArgument(format!(
"Mapping must cover all documents in this segment. Expected {} documents, got {}",
self.max_doc,
mapping.len()
/// `new_doc_id_to_old_doc_id[new_id]` is the old document id of the document that should be
/// serialized at `new_id`.
pub fn finalize_with_doc_id_mapping(
mut self,
new_doc_id_to_old_doc_id: Vec<DocId>,
) -> crate::Result<Vec<u64>> {
if new_doc_id_to_old_doc_id.len() != self.max_doc as usize {
return Err(crate::TantivyError::InvalidArgument(format!(
"provided doc id mapping length {} does not match segment max_doc {}",
new_doc_id_to_old_doc_id.len(),
self.max_doc
)));
}
// Check that the mapping is a permutation of the segment doc ids.
let mut seen_doc_ids = BitSet::with_max_value(self.max_doc);
for old_doc_id in mapping.iter_old_doc_ids() {
if old_doc_id >= self.max_doc || !seen_doc_ids.insert(old_doc_id) {
return Err(TantivyError::InvalidArgument(
"Mapping must be a permutation of the segment doc ids".to_string(),
));
}
}
self.finalize_inner(Some(mapping))
self.fieldnorms_writer.fill_up_to_max_doc(self.max_doc);
let mapping = DocIdMapping::from_new_id_to_old_id(new_doc_id_to_old_doc_id);
self.finalize_with_mapping(Some(&mapping))
}
fn finalize_inner(mut self, mapping: Option<&DocIdMapping>) -> crate::Result<Vec<u64>> {
// Pad before remapping; the mapping indexes fieldnorms by old doc id.
self.fieldnorms_writer.fill_up_to_max_doc(self.max_doc);
fn finalize_with_mapping(self, mapping: Option<&DocIdMapping>) -> crate::Result<Vec<u64>> {
remap_and_write(
self.schema,
&self.per_field_postings_writers,
@@ -202,6 +217,7 @@ impl SegmentWriter {
self.fast_field_writers,
&self.fieldnorms_writer,
self.segment_serializer,
self.max_doc,
mapping,
)?;
let doc_opstamps = remap_doc_opstamps(self.doc_opstamps, mapping);
@@ -466,6 +482,7 @@ fn remap_and_write(
fast_field_writers: FastFieldsWriter,
fieldnorms_writer: &FieldNormsWriter,
mut serializer: SegmentSerializer,
max_doc: DocId,
doc_id_map: Option<&DocIdMapping>,
) -> crate::Result<()> {
debug!("remap-and-write");
@@ -487,9 +504,10 @@ fn remap_and_write(
debug!("fastfield-serialize");
fast_field_writers.serialize(serializer.get_fast_field_write(), doc_id_map)?;
// finalize temp docstore and create version, which reflects the doc_id_map
if let Some(doc_id_map) = doc_id_map {
debug!("resort-docstore");
// Finalize the temp docstore and create the final store. With a mapping, the final store
// reflects the new doc id order; without one, it preserves the insertion order.
if serializer.store_is_temp() {
debug!("rewrite-docstore");
let store_write = serializer
.segment_mut()
.open_write(SegmentComponent::Store)?;
@@ -509,7 +527,10 @@ fn remap_and_write(
1, /* The docstore is configured to have one doc per block, and each doc is
* accessed only once: we don't need caching. */
)?;
for old_doc_id in doc_id_map.iter_old_doc_ids() {
let old_doc_ids = doc_id_map
.map(|doc_id_map| doc_id_map.iter_old_doc_ids().collect::<Vec<_>>())
.unwrap_or_else(|| (0..max_doc).collect::<Vec<_>>());
for old_doc_id in old_doc_ids {
let doc_bytes = store_read.get_document_bytes(old_doc_id)?;
serializer.get_store_writer().store_bytes(&doc_bytes)?;
}
@@ -531,7 +552,6 @@ mod tests {
use crate::collector::{Count, TopDocs};
use crate::directory::RamDirectory;
use crate::fastfield::FastValue;
use crate::indexer::doc_id_mapping::DocIdMapping;
use crate::postings::{Postings, TermInfo};
use crate::query::{PhraseQuery, QueryParser};
use crate::schema::{
@@ -544,7 +564,7 @@ mod tests {
use crate::tokenizer::{PreTokenizedString, Token};
use crate::{
DateTime, Directory, DocAddress, DocSet, Index, IndexWriter, SegmentReader,
TantivyDocument, TantivyError, Term, TERMINATED,
TantivyDocument, Term, TERMINATED,
};
#[test]
@@ -1183,118 +1203,4 @@ mod tests {
"Schema error: 'Error getting tokenizer for field: title'"
);
}
/// Builds a `SegmentWriter` with a fast `u64` field and a text field that only some
/// documents populate, so the text field is missing fieldnorms on some docs.
///
/// The `texts` slice provides, for each document, an optional text value. The order
/// number is always recorded in the `order` fast field so callers can recover the
/// original document via that value.
fn build_segment_writer_with_doc_id_mapping(
texts: &[Option<&str>],
) -> (Index, crate::Segment, super::SegmentWriter) {
let mut schema_builder = Schema::builder();
schema_builder.add_u64_field("order", FAST | STORED);
schema_builder.add_text_field("text", TEXT);
let schema = schema_builder.build();
let mut index = Index::create_in_ram(schema);
index.settings_mut().manual_doc_id_mapping = true;
let segment = index.new_segment();
let order = index.schema().get_field("order").unwrap();
let text = index.schema().get_field("text").unwrap();
let mut segment_writer =
super::SegmentWriter::for_segment(15_000_000, segment.clone()).unwrap();
for (opstamp, text_opt) in texts.iter().enumerate() {
let mut doc = TantivyDocument::default();
doc.add_u64(order, opstamp as u64);
if let Some(text_value) = text_opt {
doc.add_text(text, *text_value);
}
segment_writer
.add_document(crate::indexer::AddOperation {
opstamp: opstamp as u64,
document: doc,
})
.unwrap();
}
(index, segment, segment_writer)
}
#[test]
fn test_finalize_with_doc_id_mapping_rejects_wrong_length() {
let (_index, _segment, segment_writer) =
build_segment_writer_with_doc_id_mapping(&[Some("a"), Some("b"), Some("c")]);
// Mapping only covers 2 of the 3 documents.
let mapping = DocIdMapping::from_new_id_to_old_id(vec![1, 0]);
let err = segment_writer
.finalize_with_doc_id_mapping(&mapping)
.unwrap_err();
assert!(
matches!(err, TantivyError::InvalidArgument(_)),
"unexpected error: {err:?}"
);
}
#[test]
fn test_finalize_with_doc_id_mapping_rejects_out_of_range() {
let (_index, _segment, segment_writer) =
build_segment_writer_with_doc_id_mapping(&[Some("a"), Some("b")]);
// Doc id 5 does not exist in this segment.
let mapping = DocIdMapping::from_new_id_to_old_id(vec![5, 0]);
let err = segment_writer
.finalize_with_doc_id_mapping(&mapping)
.unwrap_err();
assert!(
matches!(err, TantivyError::InvalidArgument(_)),
"unexpected error: {err:?}"
);
}
#[test]
fn test_finalize_with_doc_id_mapping_rejects_duplicates() {
let (_index, _segment, segment_writer) =
build_segment_writer_with_doc_id_mapping(&[Some("a"), Some("b"), Some("c")]);
// Old doc id 0 appears twice while doc id 2 is missing. The length still matches
// `max_doc`, so this must be caught by the permutation check.
let mapping = DocIdMapping::from_new_id_to_old_id(vec![0, 1, 0]);
let err = segment_writer
.finalize_with_doc_id_mapping(&mapping)
.unwrap_err();
assert!(
matches!(err, TantivyError::InvalidArgument(_)),
"unexpected error: {err:?}"
);
}
#[test]
fn test_finalize_with_doc_id_mapping_remaps_missing_fieldnorms() -> crate::Result<()> {
// doc 0: "alpha beta" (2 tokens)
// doc 1: <no text> (missing fieldnorm -> 0)
// doc 2: "gamma" (1 token)
// doc 3: <no text> (missing fieldnorm -> 0)
let (index, segment, segment_writer) = build_segment_writer_with_doc_id_mapping(&[
Some("alpha beta"),
None,
Some("gamma"),
None,
]);
let max_doc = segment_writer.max_doc();
// Reverse the documents. New doc id i maps to old doc id (3 - i).
let mapping = DocIdMapping::from_new_id_to_old_id(vec![3, 2, 1, 0]);
segment_writer.finalize_with_doc_id_mapping(&mapping)?;
let segment = segment.with_max_doc(max_doc);
let segment_reader = SegmentReader::open(&segment)?;
let text = index.schema().get_field("text").unwrap();
let fieldnorm_reader = segment_reader.get_fieldnorms_reader(text)?;
// After remapping, fieldnorms follow the reversed order:
// new 0 <- old 3 (0), new 1 <- old 2 (1), new 2 <- old 1 (0), new 3 <- old 0 (2)
assert_eq!(fieldnorm_reader.fieldnorm(0), 0);
assert_eq!(fieldnorm_reader.fieldnorm(1), 1);
assert_eq!(fieldnorm_reader.fieldnorm(2), 0);
assert_eq!(fieldnorm_reader.fieldnorm(3), 2);
Ok(())
}
}

View File

@@ -2,9 +2,9 @@ use std::marker::PhantomData;
use crate::indexer::operation::AddOperation;
use crate::indexer::segment_updater::save_metas;
use crate::indexer::{DocIdMapping, SegmentWriter};
use crate::indexer::SegmentWriter;
use crate::schema::document::Document;
use crate::{Directory, Index, IndexMeta, Opstamp, Segment, TantivyDocument};
use crate::{Directory, DocId, Index, IndexMeta, Opstamp, Segment, TantivyDocument};
#[doc(hidden)]
pub struct SingleSegmentIndexWriter<D: Document = TantivyDocument> {
@@ -17,7 +17,8 @@ pub struct SingleSegmentIndexWriter<D: Document = TantivyDocument> {
impl<D: Document> SingleSegmentIndexWriter<D> {
pub fn new(index: Index, mem_budget: usize) -> crate::Result<Self> {
let segment = index.new_segment();
let segment_writer = SegmentWriter::for_segment(mem_budget, segment.clone())?;
let segment_writer =
SegmentWriter::for_segment_with_provided_doc_id_mapping(mem_budget, segment.clone())?;
Ok(Self {
segment_writer,
segment,
@@ -38,39 +39,125 @@ impl<D: Document> SingleSegmentIndexWriter<D> {
}
pub fn finalize(self) -> crate::Result<Index> {
let Self {
segment,
segment_writer,
..
} = self;
let max_doc = segment_writer.max_doc();
segment_writer.finalize()?;
Self::finalize_inner(segment, max_doc)
let max_doc = self.segment_writer.max_doc();
self.segment_writer.finalize()?;
finalize_segment(self.segment, max_doc)
}
pub fn finalize_with_doc_id_mapping(self, mapping: &DocIdMapping) -> crate::Result<Index> {
let Self {
segment,
segment_writer,
..
} = self;
let max_doc = segment_writer.max_doc();
segment_writer.finalize_with_doc_id_mapping(mapping)?;
Self::finalize_inner(segment, max_doc)
}
fn finalize_inner(segment: Segment, max_doc: u32) -> crate::Result<Index> {
let segment: Segment = segment.with_max_doc(max_doc);
let index = segment.index();
let index_meta = IndexMeta {
index_settings: index.settings().clone(),
segments: vec![segment.meta().clone()],
schema: index.schema(),
opstamp: 0,
payload: None,
};
save_metas(&index_meta, index.directory())?;
index.directory().sync_directory()?;
Ok(index.clone())
/// Finalizes this single-segment index using a caller-provided document order.
///
/// `new_doc_id_to_old_doc_id[new_id]` is the old insertion doc id of the document that should
/// be serialized at `new_id`.
pub fn finalize_with_doc_id_mapping(
self,
new_doc_id_to_old_doc_id: Vec<DocId>,
) -> crate::Result<Index> {
let max_doc = self.segment_writer.max_doc();
self.segment_writer
.finalize_with_doc_id_mapping(new_doc_id_to_old_doc_id)?;
finalize_segment(self.segment, max_doc)
}
}
fn finalize_segment(segment: Segment, max_doc: DocId) -> crate::Result<Index> {
let segment: Segment = segment.with_max_doc(max_doc);
let index = segment.index();
let index_meta = IndexMeta {
index_settings: index.settings().clone(),
segments: vec![segment.meta().clone()],
schema: index.schema(),
opstamp: 0,
payload: None,
};
save_metas(&index_meta, index.directory())?;
index.directory().sync_directory()?;
Ok(segment.index().clone())
}
#[cfg(test)]
mod tests {
use crate::collector::TopDocs;
use crate::directory::RamDirectory;
use crate::query::QueryParser;
use crate::schema::{
IndexRecordOption, NumericOptions, Schema, TextFieldIndexing, TextOptions, Value, STORED,
};
use crate::{Index, ReloadPolicy, TantivyDocument};
#[test]
fn test_finalize_with_doc_id_mapping() -> crate::Result<()> {
let mut schema_builder = Schema::builder();
let id_field = schema_builder.add_u64_field("id", NumericOptions::default().set_fast());
let text_field = schema_builder.add_text_field(
"text",
TextOptions::default().set_stored().set_indexing_options(
TextFieldIndexing::default()
.set_index_option(IndexRecordOption::WithFreqs)
.set_fieldnorms(true),
),
);
let stored_field = schema_builder.add_text_field("stored", STORED);
let schema = schema_builder.build();
let mut writer = Index::builder()
.schema(schema)
.single_segment_index_writer(RamDirectory::create(), 15_000_000)?;
writer.add_document(doc!(
id_field => 10u64,
text_field => "alpha beta",
stored_field => "old-0",
))?;
writer.add_document(doc!(
id_field => 20u64,
text_field => "alpha",
stored_field => "old-1",
))?;
writer.add_document(doc!(
id_field => 30u64,
text_field => "beta",
stored_field => "old-2",
))?;
let index = writer.finalize_with_doc_id_mapping(vec![2, 0, 1])?;
let reader = index
.reader_builder()
.reload_policy(ReloadPolicy::Manual)
.try_into()?;
let searcher = reader.searcher();
let segment_reader = searcher.segment_reader(0);
let fast_field = segment_reader
.fast_fields()
.u64("id")?
.first_or_default_col(0);
assert_eq!(fast_field.get_val(0), 30u64);
assert_eq!(fast_field.get_val(1), 10u64);
assert_eq!(fast_field.get_val(2), 20u64);
let fieldnorm_reader = segment_reader.get_fieldnorms_reader(text_field)?;
assert_eq!(fieldnorm_reader.fieldnorm(0), 1);
assert_eq!(fieldnorm_reader.fieldnorm(1), 2);
assert_eq!(fieldnorm_reader.fieldnorm(2), 1);
let mut stored_values = Vec::new();
for doc_id in 0..segment_reader.max_doc() {
let doc: TantivyDocument = segment_reader.get_store_reader(1024)?.get(doc_id)?;
let stored_value = doc
.get_first(stored_field)
.and_then(|value| value.as_str())
.unwrap();
stored_values.push(stored_value.to_string());
}
assert_eq!(stored_values, ["old-2", "old-0", "old-1"]);
let query = QueryParser::for_index(&index, vec![text_field]).parse_query("beta")?;
let top_docs: Vec<(_, _)> =
searcher.search(&query, &TopDocs::with_limit(10).order_by_score())?;
let doc_ids = top_docs
.into_iter()
.map(|(_, doc_address)| doc_address.doc_id)
.collect::<Vec<_>>();
assert_eq!(doc_ids, [0, 1]);
Ok(())
}
}

View File

@@ -229,7 +229,10 @@ pub use crate::index::{
Index, IndexBuilder, IndexMeta, IndexSettings, IndexSortByField, InvertedIndexReader, Order,
Segment, SegmentMeta, SegmentReader,
};
pub use crate::indexer::{IndexWriter, SingleSegmentIndexWriter};
pub use crate::indexer::{
merge_segments_with_doc_id_mapping, IndexMerger, IndexWriter, SegmentDocIdMapping,
SingleSegmentIndexWriter,
};
pub use crate::schema::{Document, TantivyDocument, Term};
/// Index format version.